import json import logging import multiprocessing import threading import time import constant from db import redis_manager_delegate as redis_manager from l2 import l2_log from l2.huaxin import huaxin_target_codes_manager from l2.subscript import l2_subscript_manager from log_module import async_log_util from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug from servers.huaxin_trade_server import TradeServerProcessor from third_data import block_info from trade.huaxin import huaxin_trade_data_update from trade.huaxin.huaxin_trade_api import ClientSocketManager from utils import tool def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue): logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}") if queue_l1_w_strategy_r is not None: while True: try: val = queue_l1_w_strategy_r.get() if val: val = json.loads(val) # 处理数据 type_ = val["type"] timestamp = val.get("time") # 大于10s的数据放弃处理 if type_ == "set_target_codes": async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据") if time.time() * 1000 - timestamp > 10 * 1000: continue TradeServerProcessor.set_target_codes(val) except Exception as e: logger_debug.exception(e) def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue): """ 监听L2订阅目标代码 @param queue_other_w_l2_r: @return: """ logger_system.info("启动读取L2订阅代码队列") while True: try: _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop() if _datas: times = _datas[0] datas = _datas[1] request_id = _datas[2] async_log_util.info(logger_l2_codes_subscript, "({})读取L2代码处理队列:数量-{}", request_id, len(datas)) # 只处理20s内的数据 if time.time() - times < 20: # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间) if not datas: # 没有数据需要处理 continue # 再次获取代码 codes = [d[0] for d in datas] for code in codes: block_info.init_code(code) if constant.IS_L2_NEW: process_manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager\ .process_manager queue_codes_list = process_manager.set_codes(set(codes)) code_data_dict = {d[0]: d for d in datas} for queue_codes in queue_codes_list: root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": [code_data_dict.get(c) for c in queue_codes[1]]} queue_codes[0].put_nowait(json.dumps(root_data)) else: root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": datas} queue_other_w_l2_r.put_nowait(json.dumps(root_data)) # 如果在9:25-9:29 需要加载板块 # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"): # for d in datas: # threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0], # gpcode_manager.get_price( # d[0]), # float(d[2]), # KPLLimitUpDataRecordManager.get_current_reasons()), # daemon=True).start() # time.sleep(0.2) async_log_util.info(logger_l2_codes_subscript, "({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas)) except Exception as e: logging.exception(e) logger_l2_codes_subscript.exception(e) finally: time.sleep(0.01) def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r): """ 运行数据监听器 @param queue_other_w_l2_r: @return: """ # 交易数据更新任务 huaxin_trade_data_update.run() # 接收来自L1的数据 threading.Thread(target=lambda: __listen_l1_target_codes(queue_l1_w_strategy_r), daemon=True).start() # 接收L2订阅 threading.Thread(target=lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r), daemon=True).start() # 运行异步redis同步服务 threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True).start() # 同步异步日志 threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start() # 同步L2的异步日志 l2_log.codeLogQueueDistributeManager.run_async() threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start() while True: time.sleep(5)