Administrator
2025-03-11 2c04737679957659e3eae0e3b69469ae5d7c5095
新版L2与老版L2模式切换
4个文件已修改
34 ■■■■ 已修改文件
constant.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/subscript/l2_subscript_manager.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
task/task_manager.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -230,4 +230,7 @@
# 忽略板块流入的市场强度
IGNORE_BLOCK_IN_MONEY_MARKET_STRONG = 101
# 新版L2监听
IS_L2_NEW = False
l2/subscript/l2_subscript_manager.py
@@ -5,11 +5,14 @@
import multiprocessing
import random
import threading
import time
import msgpack
import zmq
from huaxin_client import l2_data_transform_protocol
from log_module import async_log_util
from log_module.log import logger_debug
from utils import shared_memery_util
process_manager = None
@@ -85,6 +88,8 @@
    L2数据监听
    """
    last_log_time = time.time()
    def __init__(self, channel_list):
        """
@@ -123,10 +128,14 @@
        while True:
            data = socket.recv()
            try:
                #接收数据
                # 接收数据
                data = msgpack.unpackb(data)
                shared_memery_id = data["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
                if time.time() - self.last_log_time > 10:
                    async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}")
                    self.last_log_time = time.time()
                if data["type"] == 1:
                    # 委托
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
main.py
@@ -179,7 +179,7 @@
        # 测试L2单独进程
        if True:
        if constant.IS_L2_NEW:
            __create_l2_subscript()
        else:
            # 将L2的进程改为线程执行
task/task_manager.py
@@ -4,9 +4,11 @@
import threading
import time
import constant
from db import redis_manager_delegate as redis_manager
from l2 import l2_log
from l2.huaxin import huaxin_target_codes_manager
from l2.subscript import l2_subscript_manager
from log_module import async_log_util
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
from servers.huaxin_trade_server import TradeServerProcessor
@@ -63,9 +65,19 @@
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas}
                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    if constant.IS_L2_NEW:
                        process_manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager\
                            .process_manager
                        queue_codes_list = process_manager.set_codes(set(codes))
                        code_data_dict = {d[0]: d for d in datas}
                        for queue_codes in queue_codes_list:
                            root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                         "data": [code_data_dict.get(c) for c in queue_codes[1]]}
                            queue_codes[0].put_nowait(json.dumps(root_data))
                    else:
                        root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                     "data": datas}
                        queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    # 如果在9:25-9:29 需要加载板块
                    # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"):
                    #     for d in datas: