Administrator
2023-02-09 b74016d3ba3750cd27fee83675449da8f1da3926
l2/cancel_buy_strategy.py
@@ -15,11 +15,12 @@
import l2_data_util
from db import redis_manager
import tool
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log, l2_data_log
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn
from log import logger_buy_1_volumn, logger_l2_h_cancel, logger_l2_s_cancel
class SecondCancelBigNumComputer:
@@ -108,10 +109,13 @@
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, total_data, threadId,
                    need_cancel=True):
        if start_index == 375:
            print("进入调试")
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        l2_log.cancel_debug(threadId, code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
@@ -197,8 +201,10 @@
                        if cancel_num / max(buy_num, 1) > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / buy_num, 2))
                                buy_num, round(cancel_num / max(buy_num,1), 2))
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
@@ -213,15 +219,16 @@
class HourCancelBigNumComputer:
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    __buyL2SafeCountManager = BuyL2SafeCountManager()
    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __save_watch_index_set(cls, code, datas):
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        key = f"h_cancel_watch_indexs-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps(list(datas)))
        cls.__getRedis().setex(key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
    # 保存成交进度
    @classmethod
@@ -229,9 +236,9 @@
        key = f"h_cancel_watch_indexs-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
            return None, -1, False
        val = json.loads(val)
        return val
        return val[0], val[1], val[2]
    # 保存结束位置
    @classmethod
@@ -272,13 +279,13 @@
        # 守护30s以外的数据
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) <= 30:
            return False, None
        watch_indexs = cls.__get_watch_index_set(code)
        watch_indexs = cls.__get_watch_index_set(code)[0]
        watch_indexs_dict = {}
        # 监听的总数
        total_nums = 0
        for indexs in watch_indexs:
            watch_indexs_dict[indexs[0]] = indexs
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[1]
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
        if watch_indexs is None:
            l2_log.cancel_debug(threadId, code, "H撤没获取到监听范围数据")
@@ -289,6 +296,13 @@
        l2_log.cancel_debug(threadId, code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
        if place_order_count <= 1:
            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
        elif place_order_count <= 2:
            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
        else:
            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
        process_index = start_index
        try:
            for i in range(start_index, end_index + 1):
@@ -305,19 +319,14 @@
                                                                                         code))
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        if place_order_count <= 1:
                            cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
                        elif place_order_count <= 2:
                            cancel_rate_threshold = constant.H_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.H_CANCEL_THIRD_RATE
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            logger_l2_h_cancel.info(f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
        return False, None
@@ -331,16 +340,66 @@
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map, is_default=False):
        """Record the trade progress index for ``code`` and recompute the H-cancel watch data.

        Args:
            code: Code the progress belongs to.
            index: New trade progress index.
            total_data: L2 data sequence; the ``"index"`` of its last element is logged.
            local_today_num_operate_map: Passed through to the watch computation calls.
            is_default: NOTE(review): rebound by the first ``get_traded_index`` call
                below, so the caller's value is not what ``if is_default`` tests — confirm.
        """
        l2_log.cancel_debug(0, code, "成交进度:{}", index)
        # NOTE(review): this unpacking overwrites the ``is_default`` parameter; the same
        # call is repeated two lines below with a separate name, so this line looks
        # like a leftover — confirm.
        last_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        logger_l2_h_cancel.info(f"code-{code} 成交进度:{index} 数据结束位置:"+str(total_data[-1]["index"]))
        last_index, last_is_default = cls.__tradeBuyQueue.get_traded_index(code)
        # Trade progress
        if is_default:
            cls.__tradeBuyQueue.set_default_traded_index(code, index)
        # NOTE(review): the nested check inside the else branch repeats this exact
        # condition, so it can never be true there — confirm the intended structure.
        if last_index is None or last_index != index:
            cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
            cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
        else:
            if last_index is None or last_index != index:
                cls.__tradeBuyQueue.set_traded_index(code, index)
                cls.__compute_watch_indexs(code, total_data, local_today_num_operate_map)
    # 涨停买是否撤单
    @classmethod
    def __get_limit_up_buy_no_canceled_count(cls, index, total_data, local_today_num_operate_map):
        data =None
        try:
            data = total_data[index]
        except:
            print("")
        val = data["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            # 判断当前买是否已经买撤
            cancel_datas = local_today_num_operate_map.get(
                "{}-{}-{}".format(val["num"], "1", val["price"]))
            canceled = False
            if cancel_datas:
                for cancel_data in cancel_datas:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                     local_today_num_operate_map)
                    if buy_index == index:
                        canceled = True
                        count = data["re"] - cancel_data["re"]
                        if count > 0:
                            return count
                        break
            if not canceled:
                count = data["re"]
                return count
        return 0
        # 计算排名前N的大单
    @classmethod
    def compute_watch_end_index(cls, code, total_data, local_today_num_operate_map):
    def __compute_top_n_num(cls, start_index, total_data, local_today_num_operate_map, count):
        # 找到还未撤的TOPN大单
        watch_set = set()
        for i in range(start_index, total_data[-1]["index"] + 1):
            not_cancel_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            if not_cancel_count > 0:
                watch_set.add((i, total_data[i]["val"]["num"], not_cancel_count))
        # 针按照手数排序
        watch_list = list(watch_set)
        watch_list.sort(key=lambda tup: tup[1])
        watch_list.reverse()
        watch_list = watch_list[:count]
        watch_set = set(watch_list)
        return watch_set
    @classmethod
    def __compute_watch_indexs(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        threshold_money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        if threshold_money < constant.H_CANCEL_MIN_MONEY:
@@ -352,46 +411,43 @@
        total_num = 0
        watch_set = set()
        total_count = 0
        # 暂时不需要使用
        process_index = -1
        finished = False
        safe_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        for i in range(trade_progress_index, total_data[-1]["index"] + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
            process_index = i
            left_count = cls.__get_limit_up_buy_no_canceled_count(i, total_data, local_today_num_operate_map)
            if left_count > 0:
                data = total_data[i]
                val = data["val"]
                total_num += val["num"] * data["re"]
                # 判断当前买是否已经买撤
                cancel_datas = local_today_num_operate_map.get(
                    "{}-{}-{}".format(val["num"], "1", val["price"]))
                canceled = False
                if cancel_datas:
                    for cancel_data in cancel_datas:
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                         local_today_num_operate_map)
                        if buy_index == i:
                            # 已经买撤
                            total_num -= buy_data["val"]["num"] * cancel_data["re"]
                            canceled = True
                            count = data["re"] - cancel_data["re"]
                            if count > 0:
                                total_count += count
                                watch_set.add((i, count))
                            break
                if not canceled:
                    count = data["re"]
                    total_count += count
                    watch_set.add((i, count))
                total_count += left_count
                watch_set.add((i, val["num"], left_count))
                # 判断是否达到阈值
                if total_num >= threshold_num and total_count >= constant.H_CANCEL_MIN_COUNT:
                if total_count >= safe_count:  # and total_num >= threshold_num
                    finished = True
                    # 最小8笔
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(list(watch_set)))
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
                    break
        # 计算TOP N大单
        top_n_watch_set = cls.__compute_top_n_num(trade_progress_index, total_data, local_today_num_operate_map,
                                                  safe_count)
        logger_l2_h_cancel.info(f"code-{code}  H撤监控临单:{watch_set}")
        logger_l2_h_cancel.info(f"code-{code}  H撤监控较大单:{top_n_watch_set}")
        final_watch_set = set.union(watch_set, top_n_watch_set)
        final_watch_list = list(final_watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤最终监控大单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set(code, watch_set)
        cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
        # 删除原来的计算数据
        cls.__del_compute_data(code)
    @classmethod
    def get_watch_indexs(cls, code):
        """Return the stored H-cancel watch data for ``code`` as read by ``__get_watch_index_set``."""
        return cls.__get_watch_index_set(code)
        # NOTE(review): the statement below is unreachable after the return above.
        # It looks like the intended replacement (first element only) — confirm
        # which of the two lines should remain.
        return cls.__get_watch_index_set(code)[0]
# --------------------------------封单额变化撤------------------------