Administrator
2024-11-21 a0f4a1d5bed0b4be8be122e90d2f95b76f178a94
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
import logging
import multiprocessing
import threading
import time
 
from db import redis_manager_delegate as redis_manager
from l2 import l2_log
from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
from servers.huaxin_trade_server import TradeServerProcessor
from third_data import block_info
from trade.huaxin import huaxin_trade_data_update
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from utils import tool
 
 
def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logger_debug.exception(e)
 
 
def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    """
    监听L2订阅目标代码
    @param queue_other_w_l2_r:
    @return:
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if _datas:
                times = _datas[0]
                datas = _datas[1]
                request_id = _datas[2]
                logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                # 只处理20s内的数据
                if time.time() - times < 20:
                    # 获取涨停列表中的数据
                    # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间)
                    if not datas:
                        # 没有数据需要处理
                        continue
 
                    # 再次获取代码
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas}
                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    # 如果在9:25-9:29 需要加载板块
                    # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"):
                    #     for d in datas:
                    #         threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0],
                    #                                                                                gpcode_manager.get_price(
                    #                                                                                    d[0]),
                    #                                                                                float(d[2]),
                    #                                                                                KPLLimitUpDataRecordManager.get_current_reasons()),
                    #                          daemon=True).start()
                    #         time.sleep(0.2)
                    logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            time.sleep(0.01)
 
 
def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r):
    """
    运行数据监听器
    @param queue_other_w_l2_r:
    @return:
    """
    # 交易数据更新任务
    huaxin_trade_data_update.run()
 
    # 接收来自L1的数据
    threading.Thread(target=lambda: __listen_l1_target_codes(queue_l1_w_strategy_r), daemon=True).start()
 
    # 接收L2订阅
    threading.Thread(target=lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r), daemon=True).start()
    # 运行异步redis同步服务
    threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True).start()
    # 同步异步日志
    threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
    # 同步L2的异步日志
    l2_log.codeLogQueueDistributeManager.run_async()
    threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start()
    while True:
        time.sleep(5)