Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
l2_data_manager.py
@@ -11,21 +11,22 @@
import big_money_num_manager
import code_data_util
import constant
import data_process
import global_data_loader
import global_util
import industry_codes_sort
import l2_data_log
import l2_data_util
import gpcode_manager
import l2_trade_factor
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
from trade_data_manager import TradeBuyDataManager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process,logger_l2_data
import trade_data_manager
import limit_up_time_manager
_redisManager = redis_manager.RedisManager(1)
@@ -187,18 +188,25 @@
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            value = redis.get(k)
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas= []
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
@tool.async_call
@@ -251,23 +259,27 @@
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas,data
    return day, client, channel, code, capture_time, process_time, datas, data
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    """Cache the latest L2 snapshot for ``code`` and persist newly added rows.

    Args:
        code: stock code; used in the Redis key and as the key of the
            in-memory caches.
        datas: latest L2 data; serialized with ``json.dumps`` into Redis and
            stored in ``local_latest_datas``.
        add_datas: newly added rows; when non-empty they are logged and
            passed to ``saveL2Data``.

    NOTE(review): the "save latest data" sequence appears twice below, once
    unconditionally and once inside the ``add_datas`` branch. This looks like
    old and new revisions of the same code sitting side by side - confirm
    which copy is intended.
    """
    redis = _redisManager.getRedis()
    # Save the latest data (timed, elapsed milliseconds are reported to l2_data_log)
    __start_time = round(t.time()* 1000)
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
    # Store in memory
    local_latest_datas[code] = datas
    __set_l2_data_latest_count(code, len(datas))
    # Only save when there is newly added data
    if len(add_datas) > 0:
        # Save the latest data
        __start_time = round(t.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
        # Store in memory
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            # Record the added rows in the L2 data log; a logging failure must
            # not prevent the rows from being saved below.
            logger_l2_data.info("{}-{}",code,add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
@@ -396,10 +408,8 @@
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        # TODO 测试的时候开启,方便记录大单数据
        #l2_data_util.save_big_data(code, same_time_num, data)
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(cls, time_str):
@@ -426,6 +436,7 @@
            return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
@@ -785,7 +796,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes,code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
@@ -817,6 +828,7 @@
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
@@ -890,7 +902,7 @@
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            TradeBuyDataManager.remove_buy_position_info(code)
            trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
@@ -1043,13 +1055,13 @@
    @classmethod
    def __get_threshmoney(cls, code):
        money,msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return money
    # 获取预估挂买位
    @classmethod
    def __get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
@@ -1916,223 +1928,6 @@
        cls.__save_recod(code, process_index, count)
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
    """Statistics of the limit-up sealed-order quantity derived from L2 data.

    Two kinds of records are kept in Redis:

    * ``l2_limit_up_second_money-<code>-<HHMMSS>``: a JSON triple
      ``(num, from_index, to_index)`` with the net quantity accumulated for
      that second and the index range it was computed from.
    * ``l2_limit_up_money-<code>``: a JSON pair ``(num, index)`` holding the
      latest corrected total and the index it is valid up to.

    ``process_data`` uses these records to decide whether a cancel condition
    has been reached.
    """

    _redisManager = redis_manager.RedisManager(1)

    @classmethod
    def __get_redis(cls):
        """Return the Redis client provided by the class-level manager."""
        return cls._redisManager.getRedis()

    # Set the per-second limit-up sealed-order record of the L2 data
    @classmethod
    def __set_l2_second_money_record(cls, code, time, num, from_index, to_index):
        """Add ``num`` to the per-second record of ``code`` at ``time``.

        When no record exists yet it is created from the arguments;
        otherwise ``num`` is added to the stored quantity and the stored end
        index is moved to ``to_index`` (the stored start index is kept).
        """
        old_num, old_from, old_to = cls.__get_l2_second_money_record(code, time)
        if old_num is None:
            old_num = num
            old_from = from_index
            old_to = to_index
        else:
            old_num += num
            old_to = to_index
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((old_num, old_from, old_to)))

    @classmethod
    def __get_l2_second_money_record(cls, code, time):
        """Return ``(num, from_index, to_index)`` for the given second.

        Returns ``(None, None, None)`` when no record is stored.
        """
        key = "l2_limit_up_second_money-{}-{}".format(code, time.replace(":", ""))
        val = cls.__get_redis().get(key)
        return cls.__format_second_money_record_val(val)

    @classmethod
    def __format_second_money_record_val(cls, val):
        """Decode a stored per-second record into a 3-tuple.

        ``val`` is the raw JSON value read from Redis, or ``None`` when the
        key is missing, in which case ``(None, None, None)`` is returned.
        """
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]

    @classmethod
    def __get_l2_second_money_record_keys(cls, code, time_regex):
        """Return the per-second record keys of ``code`` matching ``time_regex``.

        ``time_regex`` is inserted into the key pattern passed to the Redis
        ``keys`` call (e.g. ``"0942*"``).
        """
        key = "l2_limit_up_second_money-{}-{}".format(code, time_regex)
        keys = cls.__get_redis().keys(key)
        return keys

    # Set the latest sealed-order record of the L2 data
    @classmethod
    def __set_l2_latest_money_record(cls, code, index, num):
        """Store ``(num, index)`` as the latest record of ``code``."""
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), json.dumps((num, index)))

    # Returns quantity, index
    @classmethod
    def __get_l2_latest_money_record(cls, code):
        """Return ``(num, index)`` of the latest record, or ``(0, -1)`` if absent."""
        key = "l2_limit_up_money-{}".format(code)
        result = cls.__get_redis().get(key)
        if result:
            result = json.loads(result)
            return result[0], result[1]
        else:
            return 0, -1

    # Correct the data
    # The correction looks up the per-second records on both sides of the
    # correction time in order to determine the end index of the computation
    @classmethod
    def verify_num(cls, code, num, time_str):
        """Correct the latest record of ``code`` with an externally known total.

        Args:
            code: stock code.
            num: the corrected total to store.
            time_str: time of the correction, formatted ``HH:MM:SS``.

        The per-second records are searched with a progressively wider key
        pattern (same minute, same hour, then any time). As soon as two
        neighbouring records enclose ``time_str`` the end index of the
        matching record is stored together with ``num`` as the latest
        record. Nothing is stored when no enclosing pair is found.
        """
        time_ = time_str.replace(":", "")
        key = None
        for i in range(4, -2, -2):
            # i = 4, 2, 0 -> prefix HHMM, HH, "" (minute / hour / whole day)
            time_regex = "{}*".format(time_[:i])
            keys_ = cls.__get_l2_second_money_record_keys(code, time_regex)
            if keys_ and len(keys_) > 1:
                # Keys are not returned in time order; sort by the trailing HHMMSS
                keys = sorted(keys_, key=lambda k: int(k.split("-")[-1]))
                # Walk neighbouring pairs (at least two elements are present)
                for earlier, later in zip(keys, keys[1:]):
                    time_1 = earlier.split("-")[-1]
                    time_2 = later.split("-")[-1]
                    if int(time_1) <= int(time_) <= int(time_2):
                        # Within this time range: an exact hit on the later
                        # record uses it, otherwise the earlier one is used
                        key = later if time_ == time_2 else earlier
                        break
            if key:
                val = cls.__get_redis().get(key)
                old_num, old_from, old_to = cls.__format_second_money_record_val(val)
                end_index = old_to
                # Save the latest data
                cls.__set_l2_latest_money_record(code, end_index, num)
                break

    # Compute the quantity, used for the limit-up sealed-order computation
    @classmethod
    def __compute_num(cls, code, data, buy_single_data):
        """Return the signed quantity that ``data`` contributes to the total.

        * limit-up buy-cancel or sell: negative ``num * re``;
        * sell-cancel for which ``is_sell_index_before_target`` is true
          (per the original note: the matching order lies before the buy
          signal): ``0``;
        * anything else: positive ``num * re``.

        ``data["re"]`` is presumably a repeat count for identical rows -
        TODO confirm against ``format_l2_data``.
        """
        if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]) or L2DataUtil.is_sell(data["val"]):
            # Limit-up buy-cancel and sell
            return 0 - int(data["val"]["num"]) * data["re"]
        else:
            # Sell-cancel
            if L2DataUtil.is_sell_cancel(data["val"]):
                # Skip the row when the helper reports it as lying before the
                # buy signal; otherwise it is counted
                if l2_data_util.is_sell_index_before_target(data, buy_single_data,
                                                            local_today_num_operate_map.get(code)):
                    return 0
            return int(data["val"]["num"]) * data["re"]

    @classmethod
    def clear(cls, code):
        """Delete the latest record of ``code`` from Redis."""
        key = "l2_limit_up_money-{}".format(code)
        cls.__get_redis().delete(key)

    # Returns the data that triggered the cancel
    # with_cancel: whether a cancel decision should be reported
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True):
        """Process rows ``start_index..end_index`` (inclusive) of today's L2 data.

        The per-second records are updated for the processed range, then the
        running total is accumulated from the latest stored record (or from
        the buy signal when none exists) and checked against the cancel
        conditions.

        Args:
            code: stock code; rows are read from ``local_today_datas[code]``.
            start_index: first row index to process.
            end_index: last row index to process (inclusive).
            buy_single_begin_index: index of the row where the buy signal
                starts; used as the fallback starting point and as the
                reference row for ``__compute_num``.
            with_cancel: when ``False`` a detected cancel is discarded.

        Returns:
            ``(row, message)`` for the row that triggered a cancel, or
            ``(None, None)`` when there is none.
        """
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # Index range each second was computed from
        time_dict_num_index = {}
        num_dict = {}
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            time_ = val["time"]
            if time_ not in time_dict_num:
                time_dict_num[time_] = 0
                time_dict_num_index[time_] = {"s": i, "e": i}
            time_dict_num_index[time_]["e"] = i
            num = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            num_dict[i] = num
            time_dict_num[time_] = time_dict_num[time_] + num
        for t_ in time_dict_num:
            cls.__set_l2_second_money_record(code, t_, time_dict_num[t_], time_dict_num_index[t_]["s"],
                                             time_dict_num_index[t_]["e"])

        print("保存涨停封单额时间:", round(t.time() * 1000) - start_time)

        # Accumulate from the latest stored total
        total_num, index = cls.__get_l2_latest_money_record(code)
        if index == -1:
            # No corrected record is available: start from the buy signal
            index = buy_single_begin_index - 1
            total_num = 0
        # TODO: optimise this computation
        cancel_index = None
        cancel_msg = None
        # Minimum quantity; the message below describes it as 10 million.
        # Presumably limit_up_price * 100 is the value of one lot - TODO confirm
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        # Start index of each distinct time
        time_start_index_dict = {}
        # Times in order of first appearance
        time_list = []
        # Running total at the moment each time first appears
        time_total_num_dict = {}
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                # Record where this second starts
                time_start_index_dict[time_] = i
                # Record the time distribution
                time_list.append(time_)
                # Total accumulated before this second
                time_total_num_dict[time_] = total_num
            val = num_dict.get(i)
            if val is None:
                # Row lies before start_index, so it was not computed above
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # Only a decreasing row inside the processed range can trigger a cancel
            if val < 0 and start_index <= i <= end_index:
                # Running total dropped below the minimum
                if total_num < min_volumn:
                    cancel_index = i
                    cancel_msg = "封单金额小于1000万"
                    break
                # Total recorded when the most recent second started
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5:
                    # The total shrank by at least 50% relative to that value
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                     total_num)
                    break
        if not with_cancel:
            cancel_index = None

        print("封单额计算时间:", round(t.time() * 1000) - start_time)
        # NOTE: the running total is deliberately not written back here;
        # within this class the latest record is only written by verify_num.
        l2_data_log.l2_time(code, round(t.time() * 1000) - start_time, "l2数据封单额计算时间",
                            False)
        # Compare against None explicitly: index 0 is a valid cancel position
        # and must not be mistaken for "no cancel".
        if cancel_index is not None:
            return total_datas[cancel_index], cancel_msg
        return None, None
def __get_time_second(time_str):
    ts = time_str.split(":")
@@ -2379,7 +2174,4 @@
if __name__ == "__main__":
    # Manual debugging entry point: load the L2 data of one code, run the
    # sealed-order correction for it, then clear the L2 data of another code.
    # Process data
    code = "002898"
    load_l2_data(code)
    L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")
    clear_l2_data("603912")