Administrator
2024-03-21 c81e244207a297280c602d40fb3ce0a365fb23e5
L2数据本地加载优化
4个文件已修改
72 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -154,6 +154,7 @@
            total_money = total_num * 100 * big_sell_order_info[1][-1][2]
            deal_rate = round(total_deal_money / total_money, 4)
            if deal_rate >= 0.3:
                l2_log.s_cancel_debug(code, f"S撤触发的卖单:{big_sell_order_info}")
                return True, f"有300w大卖单成交比例:{deal_rate}({total_deal_money}/{total_money})"
        if total_deal_money >= 100 * 10000:
@@ -923,8 +924,8 @@
            l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
            total_datas = local_today_datas.get(code)
            if re_compute > 0 and tool.trade_time_sub(total_datas[-1]["val"]["time"],
                                                      total_datas[buy_single_index]["val"]["time"]) < 2 * 60:
                # 间隔超过2分钟才能重新计算
                                                      total_datas[buy_single_index]["val"]["time"]) < 2 * 60 and min_cancel_time_with_ms is None:
                # 封单额稳了以后,间隔超过2分钟才能重新计算
                l2_log.l_cancel_debug(code, f"要间隔2分钟过后才能重新计算")
                return
            if total_datas:
l2/l2_data_manager_new.py
@@ -308,9 +308,8 @@
    def process_huaxin(cls, code, origin_datas):
        datas = None
        try:
            l2_data_log.l2_time_log(code, "开始加载历史数据")
            # 加载历史的L2数据
            is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False)
            is_normal = l2.l2_data_util.l2_data_is_normal(code)
            if not is_normal:
                # 数据不正常需要禁止交易
                l2_trade_util.forbidden_trade(code, msg="L2历史数据异常")
@@ -319,11 +318,9 @@
            total_datas = local_today_datas.get(code)
            if total_datas:
                _start_index = total_datas[-1]["index"] + 1
            l2_data_log.l2_time_log(code, "开始格式化原始数据")
            datas = l2_huaxin_util.get_format_l2_datas(code, origin_datas,
                                                       gpcode_manager.get_limit_up_price(code), _start_index)
            __start_time = round(t.time() * 1000)
            l2_data_log.l2_time_log(code, "开始处理数据")
            if len(datas) > 0:
                cls.process_add_datas(code, datas, 0, __start_time)
        except Exception as e:
@@ -332,7 +329,6 @@
            logger_l2_error.exception(e)
        finally:
            if datas:
                l2_data_log.l2_time_log(code, "开始保存数据")
                l2.l2_data_util.save_l2_data(code, None, datas)
            origin_datas.clear()
@@ -341,13 +337,17 @@
        real_order_index = huaxin_delegate_postion_manager.recompute_for_slow_time(code, order_info,
                                                                                   pre_real_order_index)
        if real_order_index:
            exec_data = order_info[2]
            order_begin_pos = cls.__get_order_begin_pos(
                code)
            if order_begin_pos and order_begin_pos.buy_exec_index == exec_data["index"]:
                cls.set_real_place_order_index(code, real_order_index, order_begin_pos)
                async_log_util.info(logger_real_place_order_position,
                                    f"真实下单位矫正:{code}-{real_order_index}  下单数据:{order_info}")
            try:
                exec_data = order_info[2]
                order_begin_pos = cls.__get_order_begin_pos(
                    code)
                async_log_util.info(logger_debug, f"下单位矫正:真实下单位-{real_order_index} 订单信息-{order_info}  下单信息-{order_begin_pos}")
                if order_begin_pos and order_begin_pos.buy_exec_index == exec_data["index"]:
                    cls.set_real_place_order_index(code, real_order_index, order_begin_pos)
                    async_log_util.info(logger_real_place_order_position,
                                        f"真实下单位矫正:{code}-{real_order_index}  下单数据:{order_info}")
            except Exception as e:
                logger_debug.exception(e)
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
@@ -360,7 +360,6 @@
            l2.l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            l2.l2_data_util.load_buy_no_map(local_today_buyno_map, code, add_datas)
            l2.l2_data_util.load_canceled_buy_no_map(local_today_canceled_buyno_map, code, add_datas)
            l2_data_log.l2_time_log(code, "process_add_datas 加载完数据")
            if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
                try:
                    if constant.TEST:
@@ -452,7 +451,6 @@
                state = cls.__CodesTradeStateManager.get_trade_state_cache(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                l2_data_log.l2_time_log(code, "process_add_datas 开始处理")
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
@@ -1463,7 +1461,6 @@
            # 记录没下单原因
            async_log_util.info(logger_l2_not_buy_reasons, f"{code}#{not_buy_msg}")
            _start_time = t.time()
        l2_data_log.l2_time_log(code, "__start_compute_buy 结束")
    # 获取下单起始信号
    @classmethod
l2/l2_data_util.py
@@ -79,6 +79,28 @@
    return True
# Whether the cached L2 data looks normal
def l2_data_is_normal(code):
    """Check the cached L2 records of ``code`` for a count/index mismatch.

    Looks up ``local_today_datas[code]`` and compares the number of cached
    records with the ``"index"`` field of the last record plus one.

    :param code: key used to look up the records in ``local_today_datas``
    :return: ``False`` when fewer records are cached than the last record's
        ``"index"`` + 1; ``True`` otherwise, including when nothing is cached
        for ``code``
    """
    cached = local_today_datas.get(code)
    if not cached:
        # Nothing cached (missing or empty): treated as normal.
        return True
    # NOTE(review): presumably "index" is a 0-based running sequence number,
    # so a shorter list would mean records are missing - confirm with the writer.
    expected_count = cached[-1]["index"] + 1
    return not (len(cached) < expected_count)
# Load the L2 data of every code
def load_l2_data_all(force=False):
    """Fill the in-memory L2 caches from ``log_export.load_l2_from_log()``.

    For every code returned by the log export, the record list is stored in
    ``local_today_datas`` and the three derived maps (num-operate, buy-no,
    canceled-buy-no) are reloaded from the cached list.

    :param force: when truthy, the cached list of each code is overwritten with
        the exported one; otherwise only codes missing from the cache are
        added. The flag is also forwarded as the last positional argument of
        the three map loaders.
    """
    exported = log_export.load_l2_from_log()
    # (loader, destination map) pairs, applied in this order for each code.
    map_loaders = (
        (load_num_operate_map, local_today_num_operate_map),
        (load_buy_no_map, local_today_buyno_map),
        (load_canceled_buy_no_map, local_today_canceled_buyno_map),
    )
    for code in exported:
        # Overwrite when forced; otherwise keep whatever is already cached.
        if force or code not in local_today_datas:
            local_today_datas[code] = exported[code]
        for loader, destination in map_loaders:
            loader(destination, code, local_today_datas.get(code), force)
# 将数据根据num-operate分类
def load_num_operate_map(local_today_num_operate_map, code, source_datas, clear=False):
    if local_today_num_operate_map.get(code) is None:
trade/huaxin/huaxin_trade_server.py
@@ -29,7 +29,7 @@
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
from huaxin_client.client_network import SendResponseSkManager
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
    l2_data_source_util, cancel_buy_strategy, l2_data_log
from l2.cancel_buy_strategy import LCancelBigNumComputer, GCancelBigNumComputer, SCancelBigNumComputer, \
    LCancelRateManager, DCancelBigNumComputer
@@ -396,7 +396,7 @@
            use_time = time.time() - now_time
            if use_time > 0.01:
                l2_data_log.l2_time_log(code,
                                    f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(use_time*1000,2)}ms")
                                        f"处理L2逐笔委托结束:处理数据数量: {len(_datas)} 最终处理时间:{round(use_time * 1000, 2)}ms")
    @classmethod
    def l2_transaction(cls, code, datas):
@@ -1200,9 +1200,9 @@
    def OnGetActiveListenCount(self, client_id, request_id):
        try:
            order = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            order = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER)
            transaction = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION)
            market = 0  # l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET)
            result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}}
            self.send_response(result, client_id, request_id)
        except Exception as e:
@@ -1681,14 +1681,16 @@
    schedule.every().day.at("09:00:00").do(huaxin_trade_data_update.add_position_list)
    schedule.every().day.at("09:10:00").do(huaxin_trade_data_update.add_position_list)
    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_w_strategy_r):
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
        block_info.init()
        __init()
        # 启动外部接口监听
        manager = outside_api_command_manager.ApiCommandManager()
        manager.init(middle_api_protocol.SERVER_HOST,
@@ -1726,7 +1728,7 @@
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
        __init()
        laddr = "0.0.0.0", 10008
        try: