Administrator
2023-09-05 778889b13087125e0eacfa5f0d69e8bb95f82916
L撤撤单策略修改
2个文件已修改
153 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -30,6 +30,7 @@
def set_real_place_position(code, index):
    """Forward the real order placement index of ``code`` to the cancel computers.

    Calls, in this order, ``DCancelBigNumComputer().set_real_order_index``,
    ``SecondCancelBigNumComputer().set_real_place_order_index`` and
    ``LCancelBigNumComputer().set_real_place_order_index`` with the same
    ``code`` and ``index``.
    """
    DCancelBigNumComputer().set_real_order_index(code, index)
    SecondCancelBigNumComputer().set_real_place_order_index(code, index)
    LCancelBigNumComputer().set_real_place_order_index(code, index)
class SecondCancelBigNumComputer:
@@ -1004,6 +1005,7 @@
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __last_trade_progress_dict = {}
    __real_place_order_index_dict = {}
    __cancel_watch_index_cache = {}
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
@@ -1049,6 +1051,12 @@
                self.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __set_watch_indexes(self, code, indexes):
        self.__cancel_watch_index_cache[code] = indexes
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
        for index in indexes:
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __get_watch_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}")
@@ -1067,66 +1075,60 @@
            self.del_watch_index(code)
            if code in self.__last_trade_progress_dict:
                self.__last_trade_progress_dict.pop(code)
            if code in self.__real_place_order_index_dict:
                self.__real_place_order_index_dict.pop(code)
        else:
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            for k in keys:
                code = k.replace("l_cancel_watch_index-", "")
                self.del_watch_index(code)
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    def set_trade_progress(self, code, index, total_data):
        # 求动态m值
        volume_rate = code_volumn_manager.get_volume_rate(code)
        volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
        m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        m_val_num = int(m_val / (float(limit_up_price) * 100))
        threshold_num = m_val_num
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if self.__last_trade_progress_dict.get(code) == index:
            # 成交进度尚未发生变化且已经监听到了足够的数据
            if len(old_watch_indexes) >= constant.L_CANCEL_MIN_WATCH_COUNT:
                if old_watch_indexes:
                    total_num = 0
                    for i in old_watch_indexes:
                        data = total_data[i]
                        val = data['val']
                        total_num += val['num'] * data['re']
                    if total_num > threshold_num:
                        return
        self.__last_trade_progress_dict[code] = index
    # 计算观察索引,倒序计算
    def compute_watch_index(self, code, start_index, end_index):
        total_datas = local_today_datas.get(code)
        if total_datas:
            MIN_MONEYS = [300, 200, 100, 50]
            for min_money in MIN_MONEYS:
        watch_indexes = set()
        start_index = index + 1
        end_index = total_data[-1]["index"]
        total_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
                for i in range(end_index - 1, start_index, -1):
                    data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # 小金额过滤
            if float(val['price'])*val['num'] < constant.L_CANCEL_MIN_MONEY * 100:
                    if float(val['price']) * val['num'] < min_money * 100:
                continue
            # 判断当前订单是否已经撤单
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i, total_data,
                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i,
                                                                                                          total_datas,
                                                                                                  local_today_num_operate_map.get(
                                                                                                      code))
            if left_count > 0:
                watch_indexes.add(i)
                total_num += val['num'] * data['re']
                if len(watch_indexes) >= constant.L_CANCEL_MIN_WATCH_COUNT and total_num > threshold_num:
                        if len(watch_indexes) >= 5:
                    break
        l2_log.l_cancel_debug(code,
                              f"设置监听范围,成交进度-{index} , 数据范围:{start_index}-{end_index} 监听范围-{watch_indexes} 纯买手数:{total_num}/{threshold_num}")
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        self.__add_watch_indexes(code, add_indexes)
        self.__del_watch_indexes(code, delete_indexes)
                if watch_indexes:
                    self.__set_watch_indexes(code, watch_indexes)
                    l2_log.l_cancel_debug(code, f"设置监听范围, 数据范围:{start_index}-{end_index} 监听范围-{watch_indexes}")
                    break
    # 设置真实下单位置
    def set_real_place_order_index(self, code, index):
        self.__real_place_order_index_dict[code] = index
        if self.__last_trade_progress_dict.get(code):
            self.compute_watch_index(code, self.__last_trade_progress_dict.get(code), index)
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    def set_trade_progress(self, code, index, total_data):
        # 已经有计算的无法触发计算
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if old_watch_indexes:
            return
        if self.__last_trade_progress_dict.get(code) == index:
            return
        self.__last_trade_progress_dict[code] = index
        if self.__real_place_order_index_dict.get(code):
            # 触发计算
            self.compute_watch_index(code, self.__last_trade_progress_dict.get(code), self.__real_place_order_index_dict.get(code))
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data,
                              _local_today_num_operate_map, is_first_code):
@@ -1135,9 +1137,9 @@
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes])
        # 计算监听的总条数
        total_num = 0
        total_count = 0
        for wi in watch_indexes:
            total_num += total_data[wi]["re"] * int(total_data[wi]["val"]["num"])
            total_count += total_data[wi]["re"]
        # 判断撤单中是否有监听中的索引
        need_compute = False
        for i in range(start_index, end_index + 1):
@@ -1152,14 +1154,14 @@
                    break
        if need_compute:
            # 计算撤单比例
            canceled_num = 0
            canceled_count = 0
            for wi in watch_indexes:
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code,
                                                                                                      wi,
                                                                                                      total_data,
                                                                                                      _local_today_num_operate_map)
                canceled_num += (total_data[wi]["re"] - left_count) * int(total_data[wi]["val"]["num"])
            rate = round(canceled_num / total_num, 3)
                canceled_count += total_data[wi]["re"] - left_count
            rate = round(canceled_count / total_count, 3)
            l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},已撤单比例:{rate}")
            if rate >= constant.L_CANCEL_RATE:
                return True, total_data[-1]
@@ -1175,42 +1177,6 @@
            return False, None
        can_cancel, cancel_data = self.__compute_need_cancel(code, buy_exec_index, start_index, end_index, total_data,
                                                             _local_today_num_operate_map, is_first_code)
        if can_cancel:
            # 判断成交进度位置到当前位置的净买入
            try:
                place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
                if place_order_index is None:
                    raise Exception("未获取到下单真实位置")
                # 获取到真实成交位置
                transaction_index = self.__last_trade_progress_dict.get(code)
                if transaction_index is None:
                    raise Exception("尚未获取到真实成交位置")
                # 获取m值
                volume_rate = code_volumn_manager.get_volume_rate(code)
                volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
                m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0]
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                m_val_num = int(m_val / (float(limit_up_price) * 100))
                threshold_num = int(m_val_num * 1.2)
                buy_nums = 0
                for index in range(transaction_index + 1, place_order_index):
                    data = total_data[index]
                    if L2DataUtil.is_limit_up_price_buy(data["val"]):
                        # 获取是否在买入执行信号周围2s
                        left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code,
                                                                                                              index,
                                                                                                              total_data,
                                                                                                              local_today_num_operate_map)
                        if left_count > 0:
                            buy_nums += left_count * data["val"]["num"]
                            if buy_nums > threshold_num:
                                l2_log.l_cancel_debug(code, f"LX阻断L撤撤单:{buy_nums}/{threshold_num}  成交位置-{transaction_index} 真实下单位置-{place_order_index}")
                                return False, "LX阻断L撤撤单"
                l2_log.l_cancel_debug(code, f"LX尚未阻断L撤撤单:{buy_nums}/{threshold_num} 成交位置-{transaction_index} 真实下单位置-{place_order_index}")
                return can_cancel, cancel_data
            except Exception as e:
                l2_log.l_cancel_debug(code, f"LX撤单计算异常:{str(e)}")
                return can_cancel, cancel_data
        return can_cancel, cancel_data
    def place_order_success(self, code):
l2/l2_data_manager_new.py
@@ -18,7 +18,7 @@
from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin
from l2 import safe_count_manager, l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress
    transaction_progress, cancel_buy_strategy
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \
    LCancelBigNumComputer
from l2.l2_data_manager import L2DataException
@@ -290,8 +290,7 @@
    @classmethod
    def set_real_place_order_index(cls, code, index):
        cls.__DCancelBigNumComputer.set_real_order_index(code, index)
        cls.__SecondCancelBigNumComputer.set_real_place_order_index(code, index)
        cancel_buy_strategy.set_real_place_position(code, index)
    # 处理华鑫L2数据
    @classmethod
@@ -601,9 +600,6 @@
                cls.__trade_thread_lock_dict[code] = threading.RLock()
            cls.__trade_thread_lock_dict[code].acquire()
            try:
                l2_log.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
@@ -1286,6 +1282,11 @@
if __name__ == "__main__":
    # Ad-hoc manual checks, executed only when this module is run as a script.
    # Print the set of codes (first field of every record) taken from
    # kpl_data_manager.get_current_limit_up_data_records(1)[0][1].
    yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1]
    yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records])
    print(yesterday_codes)
    # Disabled copy of the three statements above:
    # yesterday_limit_up_data_records = kpl_data_manager.get_current_limit_up_data_records(1)[0][1]
    # yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records])
    # print(yesterday_codes)
    # Load the L2 data of one hard-coded code and compute its watch indexes
    # over a fixed index window.
    code = "002137"
    # NOTE(review): `l2` is used as a bare package name here, while the imports
    # visible in this view only use `from l2 import ...` - confirm that
    # `l2.l2_data_util` is actually bound at this point.
    l2.l2_data_util.load_l2_data(code)
    start_index = 200
    end_index = 259
    LCancelBigNumComputer().compute_watch_index(code, start_index, end_index)