Administrator
2023-08-25 45b79f58ec7e8aa82bc086f636d89a5ec253b0e7
S撤改进
2个文件已修改
261 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 251 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -33,6 +33,8 @@
    __s_cancel_real_place_order_index_cache = {}
    # 成交位置
    __s_cancel_transaction_index_cache = {}
    # S撤是否初始化数据,当真实下单位置与成交位到来时才进行赋值
    __s_cancel_inited_data = {}
    __instance = None
@@ -88,6 +90,11 @@
            return cache_result[1]
        return -1, 0, 0
    def __del_compute_data_cache(self, code):
        CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code)
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.delete_async(self.__db, key)
    # 设置真实下单位置
    def __save_real_place_order_index(self, code, index):
        CodeDataCacheUtil.set_cache(self.__s_cancel_real_place_order_index_cache, code, index)
@@ -117,6 +124,7 @@
        CodeDataCacheUtil.clear_cache(self.__s_big_num_cancel_compute_data_cache, code)
        CodeDataCacheUtil.clear_cache(self.__s_cancel_real_place_order_index_cache, code)
        CodeDataCacheUtil.clear_cache(self.__s_cancel_transaction_index_cache, code)
        CodeDataCacheUtil.clear_cache(self.__s_cancel_inited_data, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code), "s_cancel_real_place_order_index-{}".format(code)]
        for key in ks:
            RedisUtils.delete_async(self.__db, key)
@@ -124,6 +132,10 @@
    # 设置真实下单位置
    def set_real_place_order_index(self, code, index):
        """Record the real place-order index for ``code``.

        Public entry point that forwards both arguments unchanged to the
        private ``__save_real_place_order_index`` helper.

        :param code: code the index belongs to
        :param index: real place-order index to store
        """
        self.__save_real_place_order_index(code, index)
    # 设置成交进度位
    def set_transaction_index(self, code, index):
        """Record the transaction-progress index for ``code``.

        The value is written straight into the
        ``__s_cancel_transaction_index_cache`` mapping keyed by ``code``;
        nothing else is written by this method.

        :param code: code the index belongs to
        :param index: transaction-progress index to store
        """
        self.__s_cancel_transaction_index_cache[code] = index
    def clear_data(self):
        ks = ["s_big_num_cancel_compute_data-*", "s_cancel_real_place_order_index-*"]
@@ -191,105 +203,165 @@
            return False, None
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.s_cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        real_place_order_index = self.__s_cancel_real_place_order_index_cache.get(code)
        transaction_index = self.__s_cancel_transaction_index_cache.get(code)
        if tool.trade_time_sub(total_data[end_index]["val"]["time"],
                               total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
            # 结束位置超过了执行位置30s,需要重新确认结束位置
            for i in range(end_index, start_index - 1, -1):
                if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
                    end_index = i
                    break
        # 获取处理进度
        process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = process_index_old
        if process_index_old == -1:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                       total_data, volume_rate_index)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            process_index = buy_exec_index
        # 强制固定为1s
        range_seconds = 1  # self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
        # 获取真实下单位置
        place_order_index = self.__get_real_place_order_index_cache(code)
        cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
        try:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                process_index = i
                if process_index_old >= i:
                    # 已经处理过的数据不需要处理
                    continue
                if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                    continue
                if L2DataUtil.is_limit_up_price_buy(val):
                    if place_order_index is not None and place_order_index < data["index"]:
                        # 不能比下单位置后
        if real_place_order_index and transaction_index:
            # S撤计算范围:成交位-真实下单位
            if not self.__s_cancel_inited_data.get(code):
                l2_log.s_cancel_debug(code, "S撤初始化,成交位:{} 下单位:{}", transaction_index, real_place_order_index)
                # 清除之前的计算数据
                self.__s_cancel_inited_data[code] = True
                self.__del_compute_data_cache(code)
                # 计算未撤单的订单手数
                left_big_num = 0
                for i in range(transaction_index + 1, real_place_order_index):
                    data = total_data[code]
                    val = data["val"]
                    if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                        continue
                    # 如果在囊括时间范围内就可以计算买
                    if tool.trade_time_sub(val["time"], buy_exec_time) <= range_seconds:
                        buy_num += data["re"] * int(val["num"])
                elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map.get(
                                                                                                         code))
                    if buy_index is not None and buy_single_index <= buy_index:
                        if place_order_index and place_order_index >= buy_index:
                    # 获取
                    if L2DataUtil.is_limit_up_price_buy(val):
                        left_big_num += val["num"] * data["re"]
                    elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                        # 查询买入位置
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                         local_today_num_operate_map.get(
                                                                                                             code))
                        if buy_index is not None and transaction_index + 1 <= buy_index <= real_place_order_index and i < start_index:
                            left_big_num -= val["num"] * data["re"]
                l2_log.s_cancel_debug(code, "S撤初始化结果,left_big_num:{}", left_big_num)
                self.__save_compute_data(code, real_place_order_index, left_big_num, 0)
                # 保存信息
            process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
            process_index = process_index_old
            cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
            try:
                for i in range(start_index, end_index + 1):
                    data = total_data[i]
                    val = data["val"]
                    process_index = i
                    if process_index_old >= i:
                        # 已经处理过的数据不需要处理
                        continue
                    if L2DataUtil.is_limit_up_price_buy_cancel(val):
                        if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                            continue
                        # 查询买入位置
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                         local_today_num_operate_map.get(
                                                                                                             code))
                        if buy_index is not None and transaction_index < buy_index < real_place_order_index:
                            cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                        # 买入时间在囊括范围内
                        elif tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                 total_data[buy_index]["val"]["time"]) >= 0:
                            cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                    elif buy_index is None:
                        # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                        min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                         val["cancelTimeUnit"])
                        # 只判断S级撤销,只有s级撤销才有可能相等
                        if max_space - min_space <= 1:
                            buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                            if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                    buy_time.replace(":", "")):
                                # 买入时间在囊括范围内
                                if tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                       buy_time) >= 0:
                                    cancel_num += data["re"] * int(val["num"])
                            if need_cancel:
                                rate__ = round(cancel_num / max(buy_num, 1), 2)
                                if rate__ > cancel_rate_threshold:
                                    return True, total_data[i]
            finally:
                # 保存处理进度与数据
                self.__save_compute_data(code, process_index, buy_num, cancel_num)
        else:
            # S撤计算位置为信号起始位置执行位1s之后
            if tool.trade_time_sub(total_data[end_index]["val"]["time"],
                                   total_data[buy_exec_index]["val"]["time"]) > constant.S_CANCEL_EXPIRE_TIME:
                # 结束位置超过了执行位置60s,需要重新确认结束位置
                for i in range(end_index, start_index - 1, -1):
                    if total_data[end_index]["val"]["time"] != total_data[i]["val"]["time"]:
                        end_index = i
                        break
            # 获取处理进度
            process_index_old, buy_num, cancel_num = self.__get_compute_data_cache(code)
                    # 保存数据
                    if need_cancel:
                        rate__ = round(cancel_num / max(buy_num, 1), 2)
                        if rate__ > cancel_rate_threshold:
                            l2_log.trade_record(code, "S撤范围", "'start_index':{} , 'end_index':{} ,'range_seconds':{}",
                                                buy_single_index,
                                                i,
                                                range_seconds)
                            l2_log.trade_record(code, "S撤", "'index':{} , 'rate':{} , 'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            return True, total_data[i]
        finally:
            # 如果start_index与buy_single_index相同,即是下单后的第一次计算
            # 需要查询买入信号之前的同1s是否有涨停撤的数据
            process_index = process_index_old
            l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{} 目标比例:{} 计算时间范围:{}", start_index, end_index,
                                cancel_num,
                                buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold, range_seconds)
            if process_index_old == -1:
                # 第1次计算需要计算买入信号-执行位的净值
                left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                           total_data, volume_rate_index)
                buy_num += left_big_num
                # 设置买入信号-买入执行位的数据不需要处理
                process_index = buy_exec_index
            # 强制固定为1s
            range_seconds = 1  # self.__sCancelParamsManager.get_buy_time_range(buy_volume_rate_index)
            # 获取真实下单位置
            place_order_index = self.__get_real_place_order_index_cache(code)
            # 保存处理进度与数据
            self.__save_compute_data(code, process_index, buy_num, cancel_num)
            cancel_rate_threshold = self.__sCancelParamsManager.get_cancel_rate(volume_rate_index)
            try:
                for i in range(start_index, end_index + 1):
                    data = total_data[i]
                    val = data["val"]
                    process_index = i
                    if process_index_old >= i:
                        # 已经处理过的数据不需要处理
                        continue
                    if val["num"] * float(val["price"]) <= constant.S_CANCEL_MIN_MONEY * 100:
                        continue
                    if L2DataUtil.is_limit_up_price_buy(val):
                        if place_order_index is not None and place_order_index < data["index"]:
                            # 不能比下单位置后
                            continue
                        # 如果在囊括时间范围内就可以计算买
                        if tool.trade_time_sub(val["time"], buy_exec_time) <= range_seconds:
                            buy_num += data["re"] * int(val["num"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                        # 查询买入位置
                        buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                         local_today_num_operate_map.get(
                                                                                                             code))
                        if buy_index is not None and buy_single_index <= buy_index:
                            if place_order_index and place_order_index >= buy_index:
                                cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                            # 买入时间在囊括范围内
                            elif tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                     total_data[buy_index]["val"]["time"]) >= 0:
                                cancel_num += total_data[buy_index]["re"] * int(total_data[buy_index]["val"]["num"])
                        elif buy_index is None:
                            # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                            min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
                                                                                             val["cancelTimeUnit"])
                            # 只判断S级撤销,只有s级撤销才有可能相等
                            if max_space - min_space <= 1:
                                buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                                if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                        buy_time.replace(":", "")):
                                    # 买入时间在囊括范围内
                                    if tool.trade_time_sub(tool.trade_time_add_second(buy_exec_time, range_seconds),
                                                           buy_time) >= 0:
                                        cancel_num += data["re"] * int(val["num"])
                        # 保存数据
                        if need_cancel:
                            rate__ = round(cancel_num / max(buy_num, 1), 2)
                            if rate__ > cancel_rate_threshold:
                                l2_log.trade_record(code, "S撤范围",
                                                    "'start_index':{} , 'end_index':{} ,'range_seconds':{}",
                                                    buy_single_index,
                                                    i,
                                                    range_seconds)
                                l2_log.trade_record(code, "S撤", "'index':{} , 'rate':{} , 'target_rate':{}", i, rate__,
                                                    cancel_rate_threshold)
                                return True, total_data[i]
            finally:
                l2_log.cancel_debug(code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{} 目标比例:{} 计算时间范围:{}", start_index, end_index,
                                    cancel_num,
                                    buy_num, round(cancel_num / max(buy_num, 1), 2), cancel_rate_threshold,
                                    range_seconds)
                # 保存处理进度与数据
                self.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
    # 撤单成功
    def cancel_success(self, code):
        """Handle a successful order cancellation for ``code``.

        Delegates to the private ``__clear_data`` helper for this code.
        NOTE(review): ``__clear_data`` is not fully visible here; presumably it
        drops the per-code caches and their persisted keys -- confirm.

        :param code: code whose order was cancelled
        """
        self.__clear_data(code)
    #下单成功
    # 下单成功
    def place_order_success(self, code):
        """Handle a successfully placed order for ``code``.

        Delegates to the private ``__clear_data`` helper for this code, the
        same call that ``cancel_success`` makes.
        NOTE(review): ``__clear_data`` is not fully visible here; presumably it
        resets per-code state left over from a previous order -- confirm.

        :param code: code whose order was placed
        """
        self.__clear_data(code)
@@ -1113,7 +1185,8 @@
            data = total_datas[index]
            if L2DataUtil.is_limit_up_price_buy(data["val"]):
                # 获取是否在买入执行信号周围2s
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, index, total_datas,
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, index,
                                                                                                      total_datas,
                                                                                                      local_operate_map)
                if left_count > 0:
                    buy_nums += left_count * data["val"]["num"]
trade/huaxin/trade_server.py
@@ -25,7 +25,7 @@
from huaxin_client.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer
    GCancelBigNumComputer, SecondCancelBigNumComputer
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_manager_new import L2TradeDataProcessor
@@ -253,7 +253,13 @@
                                            buy_progress_index,
                                            total_datas,
                                            num_operate_map)
                                        dask.compute(f1, f2, f3)
                                        f4 = dask.delayed(
                                            SecondCancelBigNumComputer().set_transaction_index)(
                                            code,
                                            buy_progress_index)
                                        dask.compute(f1, f2, f3, f4)
                            except Exception as e:
                                hx_logger_l2_transaction.exception(e)
                        finally: