Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import json
import logging
import multiprocessing
import threading
import time
 
import constant
from db import redis_manager_delegate as redis_manager
from l2 import l2_log
from l2.huaxin import huaxin_target_codes_manager
from l2.subscript import l2_subscript_manager
from log_module import async_log_util
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
from servers.huaxin_trade_server import TradeServerProcessor
from third_data import block_info
from trade.huaxin import huaxin_trade_data_update
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from utils import tool
 
 
def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logger_debug.exception(e)
 
 
def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    """
    监听L2订阅目标代码
    @param queue_other_w_l2_r:
    @return:
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if _datas:
                times = _datas[0]
                datas = _datas[1]
                request_id = _datas[2]
                async_log_util.info(logger_l2_codes_subscript, "({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                # 只处理20s内的数据
                if time.time() - times < 20:
                    # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间)
                    if not datas:
                        # 没有数据需要处理
                        continue
 
                    # 再次获取代码
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    if constant.IS_L2_NEW:
                        process_manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager\
                            .process_manager
                        queue_codes_list = process_manager.set_codes(set(codes))
                        code_data_dict = {d[0]: d for d in datas}
                        for queue_codes in queue_codes_list:
                            root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                         "data": [code_data_dict.get(c) for c in queue_codes[1]]}
                            queue_codes[0].put_nowait(json.dumps(root_data))
                    else:
                        root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                     "data": datas}
                        queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    # 如果在9:25-9:29 需要加载板块
                    # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"):
                    #     for d in datas:
                    #         threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0],
                    #                                                                                gpcode_manager.get_price(
                    #                                                                                    d[0]),
                    #                                                                                float(d[2]),
                    #                                                                                KPLLimitUpDataRecordManager.get_current_reasons()),
                    #                          daemon=True).start()
                    #         time.sleep(0.2)
                    async_log_util.info(logger_l2_codes_subscript, "({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            time.sleep(0.01)
 
 
def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r):
    """
    运行数据监听器
    @param queue_other_w_l2_r:
    @return:
    """
    # 交易数据更新任务
    huaxin_trade_data_update.run()
 
    # 接收来自L1的数据
    threading.Thread(target=lambda: __listen_l1_target_codes(queue_l1_w_strategy_r), daemon=True).start()
 
    # 接收L2订阅
    threading.Thread(target=lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r), daemon=True).start()
    # 运行异步redis同步服务
    threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True).start()
    # 同步异步日志
    threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
    # 同步L2的异步日志
    l2_log.codeLogQueueDistributeManager.run_async()
    threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start()
    while True:
        time.sleep(5)