Administrator
2023-02-16 92cb2dd75ea37b64b174f42ddd0b5b17d6a4634a
l2/cancel_buy_strategy.py
@@ -18,7 +18,7 @@
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log, l2_data_log
from l2 import l2_log, l2_data_log, l2_data_source_util
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn, logger_l2_h_cancel, logger_l2_s_cancel
@@ -85,9 +85,9 @@
                    left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                 local_today_num_operate_map.get(
                                                                                                     code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    if buy_index - buy_single_index < fire_count:
                        left_big_num -= 0
@@ -107,17 +107,17 @@
        return left_big_num
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    need_cancel=True):
        if start_index == 375:
            print("进入调试")
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) >  constant.S_CANCEL_EXPIRE_TIME:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
@@ -171,11 +171,11 @@
                    buy_num += data["re"] * int(val["num"])
                elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None and buy_single_index <= buy_index:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                    elif buy_index is None:
                        # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
@@ -202,8 +202,8 @@
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug( code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / max(buy_num,1), 2))
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / max(buy_num, 1), 2))
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
@@ -225,6 +225,7 @@
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    # 保存成交位置到执行位置的揽括范围数据
    @classmethod
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        key = f"h_cancel_watch_indexs-{code}"
@@ -240,7 +241,41 @@
        val = json.loads(val)
        return val[0], val[1], val[2]
    # 保存结束位置
    # 保存执行位置后面的守护数据
    @classmethod
    def __save_watch_index_set_after_exec(cls, code, datas, process_index, total_count, big_num_count, finished):
        key = f"h_cancel_watch_indexs_exec-{code}"
        cls.__getRedis().setex(key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
    @classmethod
    def __get_watch_index_set_after_exec(cls, code):
        key = f"h_cancel_watch_indexs_exec-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return [], -1, 0, 0, False
        val = json.loads(val)
        return val[0], val[1], val[2], val[3], val[4]
    # 保存成交进度
    @classmethod
    def __save_traded_progress(cls, code, origin_process_index, latest_process_index):
        key = "h_cancel_traded_progress-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
        key = "h_cancel_traded_progress-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    # 保存结算位置
    @classmethod
    def __save_compute_data(cls, code, process_index, cancel_num):
        key = "h_cancel_compute_data-{}".format(code)
@@ -262,38 +297,49 @@
    @classmethod
    def __clear_data(cls, code):
        ks = ["h_cancel_compute_data-{}".format(code)]
        ks = ["h_cancel_compute_data-{}".format(code), f"h_cancel_watch_indexs_exec-{code}",
              f"h_cancel_watch_indexs-{code}",f"h_cancel_traded_progress-{code}"]
        for key in ks:
            cls.__getRedis().delete(key)
    @classmethod
    def clear_data(cls):
        ks = ["h_cancel_compute_data-*"]
        for key in ks:
            keys = cls.__getRedis().keys(key)
            for k in keys:
                cls.__getRedis().delete(k)
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map):
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        if time_space >=  constant.S_CANCEL_EXPIRE_TIME - 1:
            # 开始计算需要监控的单
            cls.__compute_watch_indexs_after_exec(code, buy_exec_index, total_data, local_today_num_operate_map)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, threadId):
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
        if time_space <=  constant.S_CANCEL_EXPIRE_TIME:
            return False, None
        watch_indexs = cls.__get_watch_index_set(code)[0]
            # 获取成交进度
        origin_progress_index, latest_progress_index = cls.__get_traded_progress(code)
        # 监听的数据
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
        if watch_indexs is None:
            l2_log.cancel_debug(code, "H撤没获取到监听范围数据")
            return False, None
        if origin_progress_index is not None:
            # 获取成交位置到执行位置的监控数据
            watch_indexs = cls.__get_watch_index_set(code)[0]
            # 监听的总数
            for indexs in watch_indexs:
                index = indexs[0]
                if index < latest_progress_index:
                    continue
                # 只计算最近的执行位之后的数据
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        # 获取到执行位后的监听数据
        datas, process_index, total_count, big_num_count, finished = cls.__get_watch_index_set_after_exec(code)
        if datas:
            for indexs in datas:
                index = indexs[0]
                watch_indexs_dict[index] = indexs
                total_nums += total_data[index]["val"]["num"] * indexs[2]
        processed_index, cancel_num = cls.__get_compute_data(code)
        l2_log.cancel_debug( code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
@@ -314,19 +360,18 @@
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map)
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_num += data["re"] * val["num"]
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
                            return True, data
        finally:
            l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            logger_l2_h_cancel.info(f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            logger_l2_h_cancel.info(
                f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
@@ -335,26 +380,29 @@
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map, True)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map, is_default=False):
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:"+str(total_data[-1]["index"]))
        last_index, last_is_default = cls.__tradeBuyQueue.get_traded_index(code)
        # 成交进度
        if is_default:
            cls.__tradeBuyQueue.set_default_traded_index(code, index)
            cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
    def set_trade_progress(cls, code, data_time, buy_exec_index, index, total_data, local_today_num_operate_map):
        cls.__tradeBuyQueue.set_traded_index(code, index)
        # 如果获取时间与执行时间小于29则不需要处理
        if buy_exec_index is None or buy_exec_index < 0 or tool.trade_time_sub(data_time, total_data[buy_exec_index]["val"]["time"]) < constant.S_CANCEL_EXPIRE_TIME - 1:
            return
        # 保存成交进度
        origin_index, latest_index = cls.__get_traded_progress(code)
        if origin_index is None:
            cls.__save_traded_progress(code, index, index)
            # 计算揽括范围
            cls.__compute_watch_indexs_between_traded_exec(code, index, buy_exec_index, total_data,
                                                           local_today_num_operate_map)
        else:
            if last_index is None or last_index != index:
                cls.__tradeBuyQueue.set_traded_index(code, index)
                cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
            cls.__save_traded_progress(code, origin_index, index)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:" + str(total_data[-1]["index"]))
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, index, total_data, local_today_num_operate_map):
        data =None
    def __get_limit_up_buy_no_canceled_count(cls, code, index, total_data, local_today_num_operate_map):
        data = None
        try:
            data = total_data[index]
        except:
@@ -367,8 +415,9 @@
            canceled = False
            if cancel_datas:
                for cancel_data in cancel_datas:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                     local_today_num_operate_map)
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        canceled = True
                        count = data["re"] - cancel_data["re"]
@@ -382,12 +431,14 @@
        # 计算排名前N的大单
    # 过时数据
    @classmethod
    def __compute_top_n_num(cls, start_index, total_data, local_today_num_operate_map, count):
    def __compute_top_n_num(cls, code, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                        local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
@@ -398,56 +449,82 @@
        watch_set = set(watch_list)
        return watch_set
    # 从成交位置到执行位置
    @classmethod
    def __compute_watch_indexs(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        threshold_money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        if threshold_money < constant.H_CANCEL_MIN_MONEY:
            threshold_money = constant.H_CANCEL_MIN_MONEY
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        if trade_progress_index is None:
            raise Exception("尚未获取到成交进度")
        total_num = 0
        watch_set = set()
    def __compute_watch_indexs_between_traded_exec(cls, code, progress_index, buy_exec_index, total_data,
                                                   local_today_num_operate_map):
        total_count = 0
        # 暂时不需要使用
        process_index = -1
        finished = False
        safe_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        for i in range(trade_progress_index, total_data[-1]["index"] + 1):
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
        watch_set = set()
        big_num_count = 0
        for i in range(progress_index, buy_exec_index):
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                total_num += val["num"] * data["re"]
                if val["num"] * float(val["price"]) <= 9900:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控成交位到执行位:{final_watch_list}")
        cls.__save_watch_index_set(code, final_watch_list, buy_exec_index, True)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
    # 计算执行位置之后的需要监听的数据
    @classmethod
    def __compute_watch_indexs_after_exec(cls, code, buy_exec_index, total_data, local_today_num_operate_map):
        watch_list, process_index_old, total_count_old, big_num_count_old, finish = cls.__get_watch_index_set_after_exec(
            code)
        if watch_list and finish:
            # 已经计算完了不需要再进行计算
            return
        watch_set = set()
        if watch_list:
            for data in watch_list:
                watch_set.add((data[0], data[1], data[2]))
        # 暂时不需要使用
        process_index = process_index_old
        finished = False
        big_num_count = big_num_count_old
        total_count = total_count_old
        for i in range(buy_exec_index, total_data[-1]["index"] + 1):
            if i <= process_index_old:
                continue
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(code, i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                if val["num"] * float(val["price"]) <= 9900:
                    continue
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                if l2_data_util.is_big_money(val):
                    big_num_count += data["re"]
                # 判断是否达到阈值
                if total_count >= safe_count:  # and total_num >= threshold_num
                if total_count >= constant.H_CANCEL_MIN_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                    finished = True
                    # 最小8笔
                    l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
                    break
        # 计算TOP N大单
        top_n_watch_set = cls.__compute_top_n_num(trade_progress_index, total_data, local_today_num_operate_map,
                                                  safe_count)
        logger_l2_h_cancel.info(f"code-{code}  H撤监控临单:{watch_set}")
        logger_l2_h_cancel.info(f"code-{code}  H撤监控较大单:{top_n_watch_set}")
        final_watch_set = set.union(watch_set, top_n_watch_set)
        final_watch_list = list(final_watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code} 安全笔数:{safe_count}  H撤最终监控大单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        return cls.__get_watch_index_set(code)[0]
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤监控执行位相邻单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set_after_exec(code, final_watch_list, process_index, total_count, big_num_count,
                                              finished)
        # 删除原来的计算数据
        # cls.__del_compute_data(code)
# --------------------------------封单额变化撤------------------------
@@ -535,17 +612,15 @@
                    break
        else:
            keys_ = cls.__get_l2_second_money_record_keys(code, "*")
            key_list=[]
            key_list = []
            for k in keys_:
                time__ = k.split("-")[-1]
                key_list.append((int(time__),k))
                key_list.append((int(time__), k))
            key_list.sort(key=lambda tup: tup[0])
            for t in key_list:
                if t[0] <= int(time_):
                    keys.append(t[1])
                    break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
@@ -622,7 +697,7 @@
    # 返回取消的标志数据
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, random_key, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index,
                     with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
@@ -697,14 +772,13 @@
                if big_money_num_manager.is_big_num(data["val"]):
                    if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                        cancel_big_num_count += int(data["re"])
                        # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                        # 获取是否在买入执行信号周围2s
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                         local_today_num_operate_map.get(
                                                                                             code))
                        if buy_index is not None and buy_data is not None:
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                         local_today_num_operate_map.get(
                                                                                                             code))
                        if buy_index is not None:
                            # 相差1s
                            buy_time = buy_data["val"]["time"]
                            buy_time = total_datas[buy_index]["val"]["time"]
                            if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2:
                                cancel_big_num_count += int(data["re"])
@@ -740,7 +814,7 @@
                # 如果是减小项
                if val < 0:
                    # 当前量小于最大量的24%则需要取消
                    if exec_time_offset >= 30:
                    if exec_time_offset >=  constant.S_CANCEL_EXPIRE_TIME:
                        if total_num <= min_volumn_big and max_buy1_volume * 0.24 > total_num:
                            cancel_index = i
                            cancel_msg = "封板额小于最高封板额的24% {}/{}".format(total_num, max_buy1_volume)
@@ -853,7 +927,7 @@
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, random_key, code, start_index, end_index, buy_exec_index):
    def process(cls, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)