Administrator
2022-12-09 954e42723fab626b33f6dbff9246bd235981fe7a
l2_data_manager_new.py
@@ -217,14 +217,15 @@
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                           capture_timestamp)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
@@ -258,7 +259,7 @@
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index)
                                                                           buy_single_index, buy_exec_index)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
@@ -368,7 +369,7 @@
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
            # 水下捞且板块中的票小于21不能买
            # 水下捞且板块中的票小于16不能买
            if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                    industry) <= 16:
                return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
@@ -518,7 +519,8 @@
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            # 涨停封单额计算
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False)
            L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index,
                                                     buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据")
@@ -628,7 +630,13 @@
            count = threshold_count - sub_threshold_count
            if count < 3:
                count = 3
            return round(count*buy1_factor)
            count = round(count * buy1_factor)
            # 最高30笔,最低8笔
            if count > 30:
                count = 30
            if count < 8:
                count = 8
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
@@ -846,6 +854,17 @@
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
    @classmethod
    def test3(cls):
        code = "002693"
        load_l2_data(code, True)
        start_index = 334
        end_index = 341
        buy_single_index = 152
        cls.random_key[code] = random.randint(0, 100000)
        L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                 buy_single_index)
# 涨停封单额统计
class L2LimitUpMoneyStatisticUtil:
@@ -969,12 +988,13 @@
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True):
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True):
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
        # 记录计算的坐标
        time_dict_num_index = {}
        # 坐标-量的map
        num_dict = {}
        # 统计时间分布
        time_dict = {}
@@ -1022,8 +1042,39 @@
        time_list = []
        # 到当前时间累积的买1量
        time_total_num_dict = {}
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index])
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            if big_money_num_manager.is_big_num(data["val"]):
                if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                    cancel_big_num_count += int(data["re"])
                    # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                    # 获取是否在买入执行信号周围2s
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data["val"],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_data is not None:
                        # 相差1s
                        buy_time = buy_data["val"]["time"]
                        if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                            cancel_big_num_count += int(data["re"])
                elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                    cancel_big_num_count -= int(data["re"])
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
                if cancel_big_num_count < 10:
                    threshold_rate = threshold_rate - cancel_big_num_count * 0.01
                else:
                    threshold_rate = threshold_rate - 10 * 0.01
            time_ = data["val"]["time"]
            if time_ not in time_start_index_dict:
                # 记录每一秒的开始位置
@@ -1048,12 +1099,27 @@
                # 上1s的总数
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5:
                        last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                    # 相邻2s内的数据减小50%
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
                                                                     total_num)
                    break
                # 记录中有上2个数据
                if len(time_list) >= 2:
                    # 倒数第2个数据
                    last_2_second_total_volumn = time_total_num_dict.get(time_list[-2])
                    if last_2_second_total_volumn > 0:
                        if last_2_second_total_volumn > last_second_total_volumn > total_num:
                            dif = last_2_second_total_volumn - total_num
                            if dif / last_2_second_total_volumn >= threshold_rate:
                                cancel_index = i
                                cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                     last_2_second_total_volumn,
                                                                                                     last_second_total_volumn,
                                                                                                     total_num)
                                break
        if not with_cancel:
            cancel_index = None
@@ -1072,7 +1138,82 @@
        return None, None
# 涨停卖统计
class L2LimitUpSellStatisticUtil:
    """Accumulate limit-up-price sell volume per code and flag a cancel.

    Two Redis keys are kept for every code:

    * ``limit_up_sell_num-<code>``   - running total of sell volume
    * ``limit_up_sell_index-<code>`` - last data index already processed

    Both keys are written with the expiry returned by ``tool.get_expire()``
    so that they age out together.
    """
    _redisManager = redis_manager.RedisManager(0)

    @classmethod
    def __get_redis(cls):
        """Return the Redis client provided by the shared manager."""
        return cls._redisManager.getRedis()

    # Add to the accumulated sell volume
    @classmethod
    def __incre_sell_data(cls, code, num):
        """Increase the stored sell volume of ``code`` by ``num``."""
        key = "limit_up_sell_num-{}".format(code)
        redis = cls.__get_redis()
        redis.incrby(key, num)
        # INCRBY creates the key without a TTL; give it the same expiry as the
        # index key so a stale total cannot outlive the processed-index marker.
        redis.expire(key, tool.get_expire())

    @classmethod
    def __get_sell_data(cls, code):
        """Return the stored sell volume of ``code`` (0 when nothing is stored)."""
        key = "limit_up_sell_num-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return 0
        return int(val)

    @classmethod
    def __save_process_index(cls, code, index):
        """Persist the last processed data index of ``code`` with an expiry."""
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), index)

    @classmethod
    def __get_process_index(cls, code):
        """Return the last processed data index of ``code`` (-1 when none)."""
        key = "limit_up_sell_index-{}".format(code)
        val = cls.__get_redis().get(key)
        if val is None:
            return -1
        return int(val)

    # Clear stored data
    @classmethod
    def delete(cls, code):
        """Remove both the volume counter and the processed index of ``code``."""
        key = "limit_up_sell_num-{}".format(code)
        cls.__get_redis().delete(key)
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().delete(key)

    # Process data; returns the index that triggers a cancel, if any
    @classmethod
    def process(cls, code, start_index, end_index, buy_exec_index):
        """Accumulate limit-up sell volume over ``start_index..end_index``.

        Rows before ``buy_exec_index`` and rows at or below the stored
        processed index are skipped.

        Returns:
            The index of the row at which the accumulated volume first exceeds
            the threshold, or ``None`` when the threshold is not exceeded.
        """
        # Threshold for limit-up sell volume
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = zyltgb * 0.015 // (limit_up_price * 100)

        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        # Loop-invariant lookup, hoisted out of the loop
        total_datas = local_today_datas.get(code)

        # Volume found in this call; written to Redis once after the loop
        added_num = 0
        first_index = max(start_index, buy_exec_index, process_index + 1)
        for i in range(first_index, end_index + 1):
            val = total_datas[i]["val"]
            if not L2DataUtil.is_limit_up_price_sell(val):
                continue
            num = int(val["num"])
            added_num += num
            total_num += num
            if total_num > threshold_num:
                cancel_index = i
                break
        if added_num > 0:
            cls.__incre_sell_data(code, added_num)

        if cancel_index is not None:
            # Always greater than the stored index because of first_index
            new_process_index = cancel_index
        else:
            # Never move the marker backwards: a call whose end_index is below
            # the stored index would otherwise cause rows to be counted twice.
            new_process_index = max(process_index, end_index)
        # Save the processed position
        cls.__save_process_index(code, new_process_index)
        return cancel_index
# Script entry point: when this file is executed directly, run the
# L2TradeDataProcessor debug helpers below.
if __name__ == "__main__":
    L2TradeDataProcessor.test2()
    L2TradeDataProcessor.test3()
    # Separator line printed after the helpers return
    print("----------------------")
    # L2TradeDataProcessor.test()