Administrator
2023-11-22 1e64a42737bb6cc7192c68633d3c168ca150da97
L下动态更新一次/加载订单是否成交
8个文件已修改
279 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/code_price_manager.py 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_export_util.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -15,6 +15,7 @@
import l2_data_util
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from l2.code_price_manager import Buy1PriceManager
from l2.l2_data_manager import OrderBeginPosInfo
from l2.l2_sell_manager import L2LimitUpSellManager
from log_module import async_log_util
@@ -486,12 +487,12 @@
        transaction_index = self.__transaction_progress_index_dict.get(code)
        if transaction_index:
            # 不能计算成交进度以前的数据
            start_compute_index = max(transaction_index + 1, start_compute_index)
            start_compute_index = transaction_index + 1  # max(transaction_index + 1, start_compute_index)
        total_datas = local_today_datas.get(code)
        # -----------------计算H上-------------------
        watch_indexes_up = set()
        for i in range(start_compute_index, real_place_order_index):
        for i in range(real_place_order_index - 1, start_compute_index + 1, -1):
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
@@ -505,53 +506,37 @@
                                                                                                         code))
            if left_count > 0:
                watch_indexes_up.add(i)
                if len(watch_indexes_up) >= 3:
                    break
        # ------------------计算H下-----------------------
        # 计算结束位置
        total_num = 0
        # 获取m值数据
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        thresh_hold_money = Buy1PriceManager().get_latest_buy1_money(code)
        thresh_hold_money = thresh_hold_money
        thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
        end_index = real_place_order_index + 1
        watch_indexes = set()
        for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
            # 看是否撤单
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     local_today_canceled_buyno_map.get(
                                                                                                         code))
            if left_count > 0:
                watch_indexes.add(i)
                total_num += left_count * val["num"]
                if total_num > thresh_hold_num:
                    end_index = i
                count = len(watch_indexes)
                # 最小5笔,最大10笔
                if (total_num > thresh_hold_num and count >= 5) or count >= 10:
                    break
        MIN_MONEYS = [300, 200, 100, 50]
        watch_indexes = set()
        for min_money in MIN_MONEYS:
            for i in range(real_place_order_index + 1, end_index + 1):
                # 看是否撤单
                data = total_datas[i]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                # 小金额过滤
                if float(val['price']) * val['num'] < min_money * 100:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                         total_datas,
                                                                                                         local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count > 0:
                    watch_indexes.add(i)
                    if len(watch_indexes) >= 5:
                        break
            if len(watch_indexes) >= 5:
                break
        if watch_indexes or watch_indexes_up:
            watch_indexes |= watch_indexes_up
            self.__save_watch_index_set(code, buy_single_index, watch_indexes)
@@ -905,7 +890,7 @@
    __redis_manager = redis_manager.RedisManager(0)
    __last_trade_progress_dict = {}
    __real_place_order_index_dict = {}
    __cancel_watch_index_cache = {}
    __cancel_watch_index_info_cache = {}
    # 成交位附近临近大单索引
    __near_by_trade_progress_index_cache = {}
@@ -926,11 +911,14 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*")
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val)
                val = RedisUtils.get(__redis, k)
                if val:
                    val = json.loads(val)
                    val[2] = set(val[2])
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_info_cache, code, val)
            keys = RedisUtils.keys(__redis, "l_cancel_real_place_order_index-*")
            for k in keys:
@@ -955,30 +943,11 @@
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def __add_watch_indexes(self, code, indexes):
        if not indexes:
            return
        if code not in self.__cancel_watch_index_cache:
            self.__cancel_watch_index_cache[code] = set()
        for index in indexes:
            self.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
    def __del_watch_indexes(self, code, indexes):
        if not indexes:
            return
        for index in indexes:
            if code in self.__cancel_watch_index_cache:
                self.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __set_watch_indexes(self, code, buy_single_index, indexes):
        self.__cancel_watch_index_cache[code] = indexes
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
        for index in indexes:
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __set_watch_indexes(self, code, buy_single_index, re_compute: int, indexes):
        self.__cancel_watch_index_info_cache[code] = (buy_single_index, re_compute, indexes)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
        RedisUtils.setex_async(self.__db, f"l_cancel_watch_index_info-{code}", tool.get_expire(),
                               (buy_single_index, re_compute, list(indexes)))
        if indexes:
            trade_record_log_util.add_cancel_watch_indexes_log(code,
                                                               trade_record_log_util.CancelWatchIndexesInfo(
@@ -986,14 +955,11 @@
                                                                   buy_single_index,
                                                                   list(indexes)))
    def __get_watch_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}")
    def __get_watch_indexes_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code)
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return set()
        return None
    def __set_near_by_trade_progress_indexes(self, code, buy_single_index, indexes):
        if indexes:
@@ -1019,8 +985,8 @@
        return None
    def del_watch_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_info_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
    def clear(self, code=None):
        if code:
@@ -1029,9 +995,9 @@
                self.__real_place_order_index_dict.pop(code)
                RedisUtils.delete_async(self.__db, f"l_cancel_real_place_order_index-{code}")
        else:
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index_info-*")
            for k in keys:
                code = k.replace("l_cancel_watch_index-", "")
                code = k.replace("l_cancel_watch_index_info-", "")
                if code in self.__last_trade_progress_dict:
                    self.__last_trade_progress_dict.pop(code)
                if code in self.__real_place_order_index_dict:
@@ -1041,8 +1007,23 @@
            for k in keys:
                RedisUtils.delete(self.__get_redis(), k)
        # 重新计算L上
    def re_compute_l_down_watch_indexes(self, code):
        watch_index_info = self.__cancel_watch_index_info_cache.get(code)
        if not watch_index_info or watch_index_info[1] > 0:
            return
        # 获取成交进度位与真实下单位置
        real_place_order_index = self.__real_place_order_index_dict.get(code)
        last_trade_progress_index = self.__last_trade_progress_dict.get(code)
        if not real_place_order_index or not last_trade_progress_index:
            return
        self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1, real_place_order_index,
                                 re_compute=1)
    # 计算观察索引,倒序计算
    def compute_watch_index(self, code, buy_single_index, start_index, end_index):
    # re_compute:是否是重新计算的
    def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0):
        try:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
            total_datas = local_today_datas.get(code)
@@ -1145,8 +1126,8 @@
                            if left_count > 0:
                                watch_indexes.add(i)
                                break
                    self.__set_watch_indexes(code, buy_single_index, watch_indexes)
                    l2_log.l_cancel_debug(code, f"设置监听范围, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
                    self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes)
                    l2_log.l_cancel_debug(code, f"设置监听范围{ '(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
        except Exception as e:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}")
            async_log_util.exception(logger_l2_l_cancel, e)
@@ -1244,9 +1225,10 @@
    # 已经成交的索引
    def add_deal_index(self, code, index, buy_single_index):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return
        watch_indexes = watch_indexes_info[2]
        if index not in watch_indexes:
            return
        if buy_single_index is None:
@@ -1256,7 +1238,7 @@
        if real_place_order_index and real_place_order_index > index:
            total_datas = local_today_datas.get(code)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            for j in range(index + 1 , real_place_order_index):
            for j in range(index + 1, real_place_order_index):
                data = total_datas[j]
                val = data['val']
                if data["index"] in watch_indexes:
@@ -1274,13 +1256,13 @@
                if left_count > 0:
                    watch_indexes.add(data["index"])
                    break
        self.__set_watch_indexes(code, buy_single_index, watch_indexes)
        self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes)
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes])
        watch_indexes = set([int(i) for i in watch_indexes_info[2]])
        # 计算监听的总条数
        total_num = 0
        for wi in watch_indexes:
@@ -1338,7 +1320,6 @@
        # 监听范围小于5笔不生效
        if len(watch_indexes) < 5:
            return False, None
        # 计算监听的总条数
        # 权重
@@ -1422,8 +1403,8 @@
    # L后是否还有可能撤单
    def __is_l_down_can_cancel(self, code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return True
        trade_index = self.__last_trade_progress_dict.get(code)
        if trade_index is None:
@@ -1432,7 +1413,7 @@
        total_datas = local_today_datas.get(code)
        total_deal_nums = 0
        total_nums = 1
        for index in watch_indexes:
        for index in watch_indexes_info[2]:
            data = total_datas[index]
            val = data["val"]
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
l2/code_price_manager.py
@@ -20,6 +20,10 @@
    __latest_data = {}
    __current_buy_1_price = {}
    __buy1_price_info_cache = {}
    # 买1的金额
    __latest_buy1_money_dict = {}
    # 最近3分钟内的买1金额
    __latest_3m_buy1_money_list_dict = {}
    __open_limit_up_lowest_price_cache = {}
    __instance = None
@@ -137,13 +141,10 @@
    # 处理
    def process(self, code, buy_1_price, time_str, limit_up_price, sell_1_price, sell_1_volumn):
        data_str = f"{buy_1_price},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
    def process(self, code, buy_1_price, buy_1_volume, time_str, limit_up_price, sell_1_price, sell_1_volumn):
        data_str = f"{buy_1_price},{buy_1_volume},{time_str},{limit_up_price},{sell_1_price},{sell_1_volumn}"
        if self.__latest_data.get(code) == data_str:
            return
        self.__latest_data[code] = data_str
        # 保存买1价格
        self.__save_buy1_price(code, buy_1_price)
        # 记录日志
        logger_trade_queue_price_info.info(
@@ -151,6 +152,18 @@
        # 买1价格不能小于1块
        if float(buy_1_price) < 1.0:
            return
        ## 记录最近的买1金额
        if code not in self.__latest_3m_buy1_money_list_dict:
            self.__latest_3m_buy1_money_list_dict[code] = []
        self.__latest_3m_buy1_money_list_dict[code].append((time_str, int(buy_1_price * buy_1_volume)))
        if len(self.__latest_3m_buy1_money_list_dict[code]) > 80:
            self.__latest_3m_buy1_money_list_dict[code] = self.__latest_3m_buy1_money_list_dict[code][-80:]
        self.__latest_data[code] = data_str
        self.__latest_buy1_money_dict[code] = int(buy_1_price * buy_1_volume)
        # 保存买1价格
        self.__save_buy1_price(code, buy_1_price)
        is_limit_up = abs(float(limit_up_price) - float(buy_1_price)) < 0.01
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
@@ -232,6 +245,13 @@
        if limit_up_time is None:
            self.__save_buy1_price_info(code, time_str, None)
    # 获取最近的买1金额
    def get_latest_buy1_money(self, code):
        return self.__latest_buy1_money_dict.get(code)
    def get_latest_3m_buy1_money_list(self, code):
        return self.__latest_3m_buy1_money_list_dict.get(code)
if __name__ == "__main__":
    print(Buy1PriceManager().get_limit_up_info("002777"))
log_module/log_export.py
@@ -338,6 +338,25 @@
    return fdatas
# 加载l2订单成交数据
def load_huaxin_deal_record(code):
    path = f"{constant.get_path_prefix()}/logs/huaxin/l2/transaction_desc.{tool.get_now_date_str()}.log"
    # 格式:[(订单号,手数,开始成交时间,成交结束时间,下单手数)]
    fdatas = []
    lines = __load_file_content(path)
    for line in lines:
        data_index = line.find(f"{code}")
        if data_index > 0:
            line = line.split(" - ")[1]
            time_str = line[line.find("[") + 1:line.find("[") + 9]
            data = line[line.find("]") + 1:].strip()
            code = data.split("#")[0]
            data = data.split("#")[1]
            data = eval(data)
            fdatas.append(data)
    return fdatas
def load_kpl_reason_changes():
    path = f"{constant.get_path_prefix()}/logs/gp/kpl/kpl_limit_up_reason_change.{tool.get_now_date_str()}.log"
    fdatas = []
@@ -438,7 +457,7 @@
if __name__ == '__main__':
    fdatas = load_huaxin_transaction_map()
    fdatas = load_huaxin_deal_record("002528")
    print(len(fdatas))
    # print(get_h_cancel_compute_info("603912"))
server.py
@@ -389,7 +389,7 @@
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_time, limit_up_price,
                            code_price_manager.Buy1PriceManager().process(code, buy_one_price, buy_one_volumn, buy_time, limit_up_price,
                                                                          sell_one_price, sell_one_volumn)
                            _start_time = time.time()
                            msg += "买1价格处理:" + f"{_start_time - __start_time} "
test/l2_trade_test.py
@@ -23,7 +23,7 @@
from third_data import kpl_util, kpl_data_manager, block_info
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodePlateKeyBuyManager, KPLCodeJXBlockManager
from third_data.kpl_data_manager import KPLDataManager
from trade import trade_data_manager, current_price_process_manager, l2_trade_util
from trade import trade_data_manager, current_price_process_manager, l2_trade_util, trade_manager
from trade.trade_queue_manager import THSBuy1VolumnManager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
@@ -85,10 +85,11 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        trade_manager.TradeStateManager().open_buy()
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "000026"
        code = "002528"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -151,7 +152,7 @@
        current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2))
        pss_server, pss_strategy = multiprocessing.Pipe()
        huaxin_trade_api.run_pipe_trade(pss_server, None)
        huaxin_trade_api.run_pipe_trade(pss_server, None, None)
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
third_data/code_plate_key_manager.py
@@ -521,11 +521,12 @@
                          before_blocks_dict):
        # 加载涨停代码的目标板块
        def load_code_block():
            for d in limit_up_record_datas:
                if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict:
                    code_limit_up_reason_dict[d[3]] = list(before_blocks_dict.get(d[3]))[0]
                else:
                    code_limit_up_reason_dict[d[3]] = d[2]
            if limit_up_record_datas:
                for d in limit_up_record_datas:
                    if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict:
                        code_limit_up_reason_dict[d[3]] = list(before_blocks_dict.get(d[3]))[0]
                    else:
                        code_limit_up_reason_dict[d[3]] = d[2]
            return code_limit_up_reason_dict
        if current_limit_up_datas is None:
trade/huaxin/huaxin_trade_server.py
@@ -1,5 +1,6 @@
import concurrent.futures
import contextlib
import copy
import datetime
import hashlib
import io
@@ -15,6 +16,7 @@
import time
import dask
import numpy
import psutil
import requests
@@ -294,7 +296,7 @@
    def l2_order(cls, code, _datas, timestamp):
        now_timestamp = int(time.time() * 1000)
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{int((time.time() - timestamp)*1000)}-{now_timestamp}#{_datas}")
                            f"{code}#耗时:{int((time.time() - timestamp) * 1000)}-{now_timestamp}#{_datas}")
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
@@ -324,9 +326,34 @@
        if limit_up_price is not None:
            # 处理买1,卖1信息
            code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
            code_price_manager.Buy1PriceManager().process(code, buy_1_price, buy_1_volume, time_str,
                                                          limit_up_price,
                                                          sell_1_price, sell_1_volume // 100)
            latest_3m_buy1_money_list = code_price_manager.Buy1PriceManager().get_latest_3m_buy1_money_list(code)
            # 如果时涨停状态
            if abs(float(limit_up_price) - float(buy_1_price)) < 0.001:
                # 是否处于下单状态
                state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or constant.TEST:
                    if latest_3m_buy1_money_list and tool.trade_time_sub(latest_3m_buy1_money_list[-1][0],
                                                                         latest_3m_buy1_money_list[0][0]) >= 2 * 60:
                        # 2分钟以内,标准差在10%以内
                        c_start_index = None
                        for i in range(len(latest_3m_buy1_money_list) - 1, -1, -1):
                            if tool.trade_time_sub(latest_3m_buy1_money_list[-1][0],
                                                   latest_3m_buy1_money_list[i][0]) >= 2 * 60:
                                c_start_index = i
                                break
                        if c_start_index is not None:
                            latest_3m_buy1_money_list = copy.deepcopy(latest_3m_buy1_money_list[c_start_index:])
                            latest_3m_buy1_money_list = [x[1] for x in latest_3m_buy1_money_list]
                            avg_val = numpy.mean(numpy.array(latest_3m_buy1_money_list))
                            max_val = max(latest_3m_buy1_money_list)
                            min_val = min(latest_3m_buy1_money_list)
                            if abs(max_val - avg_val) / avg_val < 0.1 and abs(min_val - avg_val) / avg_val < 0.1:
                                # 买1封单额平稳
                                LCancelBigNumComputer().re_compute_l_down_watch_indexes(code)
            cls.__KPLCodeJXBlockManager.load_jx_blocks(code, buy_1_price, limit_up_price)
            # 更新板块信息
            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
@@ -883,7 +910,8 @@
l2DataListenManager: L2DataListenManager = None
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read, order_queues, transaction_queues,
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
@@ -903,7 +931,8 @@
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read)
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                                        queue_strategy_w_trade_r_for_read)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)
@@ -932,17 +961,11 @@
if __name__ == "__main__":
    code = "002640"
    code = "002528"
    global_data_loader.init()
    kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
    l2_data_util.load_l2_data(code, False, False)
    # DealOrderNoManager().add_orderno(code, "18972810")
    # DealOrderNoManager().add_orderno(code, "18972232")
    # DealOrderNoManager().add_orderno(code, "18972434")
    total_deal_nums = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code))
    print("成交大单手数", total_deal_nums)
    thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    if limit_up_price:
        rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
        LCancelRateManager().set_big_num_deal_rate(code, rate)
        print("撤单比例", LCancelRateManager().get_cancel_rate(code))
    datas = log_export.load_l2_market_data()
    datas = datas[code]
    for data in datas:
        TradeServerProcessor.l2_market_data(code, data)
utils/data_export_util.py
@@ -36,11 +36,16 @@
    process_indexs = log_export.get_l2_process_position(code, date)
    trade_indexs = log_export.get_l2_trade_position(code, date)
    real_position_indexes = log_export.get_real_place_order_positions(code, date)
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes)
    deal_list = log_export.load_huaxin_deal_record(code)
    deal_list_dict={}
    for d in deal_list:
        deal_list_dict[d[0]] = d
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict)
    return fdatas
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes):
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict):
    def find_process_index(index):
        for i in range(0, len(process_indexs)):
            if process_indexs[i][0] <= index <= process_indexs[i][1]:
@@ -153,6 +158,9 @@
                    cancel_info = "{}-{}".format(cancel_data["index"], cancel_data["val"]["time"])
                except Exception as e:
                    logging.exception(e)
            else:
                if data["val"].get("orderNo") in deal_list_dict:
                    cancel_info = deal_list_dict[ data["val"].get("orderNo")][3]
        format_data.append(cancel_info)
        cancel_order_info = None
        if trade_info: