import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import constant
|
from db import redis_manager_delegate as redis_manager
|
from l2 import l2_log
|
from l2.huaxin import huaxin_target_codes_manager
|
from l2.subscript import l2_subscript_manager
|
from log_module import async_log_util
|
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
|
from servers.huaxin_trade_server import TradeServerProcessor
|
from third_data import block_info
|
from trade.huaxin import huaxin_trade_data_update
|
from trade.huaxin.huaxin_trade_api import ClientSocketManager
|
from utils import tool
|
|
|
def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue):
    """
    Consume JSON messages from the L1 queue and apply target-code updates.

    Each non-empty item read from the queue is decoded with json.loads. Only
    messages whose "type" equals "set_target_codes" are acted on: such a
    message is skipped when its "time" field (compared against
    time.time() * 1000, i.e. treated as milliseconds) is more than 10 seconds
    old, otherwise the decoded message is handed to
    TradeServerProcessor.set_target_codes. Any exception raised while handling
    a message is logged through logger_debug and the loop moves on.

    @param queue_l1_w_strategy_r: queue to read from; when it is None the
                                  function returns right after logging the
                                  thread id
    @return: None (the loop never ends while a queue is supplied)
    """
    logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is None:
        return
    while True:
        try:
            raw_message = queue_l1_w_strategy_r.get()
            if not raw_message:
                continue
            message = json.loads(raw_message)
            message_type = message["type"]
            sent_at_ms = message.get("time")
            if message_type != "set_target_codes":
                # Other message types are ignored by this listener
                continue
            async_log_util.info(logger_l2_codes_subscript, "策略接收到数据")
            # Drop messages that are more than 10 seconds old
            age_ms = time.time() * 1000 - sent_at_ms
            if age_ms > 10 * 1000:
                continue
            TradeServerProcessor.set_target_codes(message)
        except Exception as e:
            # Keep the listener alive whatever a single message does
            logger_debug.exception(e)
|
|
|
def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    """
    Poll pending L2 subscription batches and forward them to the L2 queues.

    Runs forever. On every iteration one batch is popped from
    HuaXinL2SubscriptCodesManager; a batch is indexed as
    (creation time, rows, request id). Batches that are 20 seconds old or
    older, or that carry no rows, are skipped. For the remaining batches every
    code is passed to block_info.init_code and the rows are serialised to JSON
    and queued:
    - when constant.IS_L2_NEW is truthy, the codes are distributed through
      l2_subscript_manager.process_manager.set_codes and each returned
      (queue, codes) entry receives only the rows of its own codes;
    - otherwise all rows are put on queue_other_w_l2_r.
    Exceptions are logged and the loop sleeps 0.01s after every iteration.

    @param queue_other_w_l2_r: queue that receives the whole batch when
                               constant.IS_L2_NEW is falsy
    @return: None (never returns)
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            batch = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if not batch:
                continue
            created_at, rows, request_id = batch[0], batch[1], batch[2]
            async_log_util.info(logger_l2_codes_subscript, "({})读取L2代码处理队列:数量-{}", request_id, len(rows))
            # Only batches younger than 20 seconds are handled, and only when
            # they actually carry rows.
            # Row format: (code, current price, rise rate, volume, time)
            is_fresh = time.time() - created_at < 20
            if not is_fresh or not rows:
                continue

            # Collect the codes again from the rows
            codes = [row[0] for row in rows]
            for code in codes:
                block_info.init_code(code)

            if constant.IS_L2_NEW:
                manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager.process_manager
                assignments = manager.set_codes(set(codes))
                rows_by_code = {row[0]: row for row in rows}
                for assignment in assignments:
                    # assignment[0] is the destination queue, assignment[1] its codes
                    payload = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                               "data": [rows_by_code.get(c) for c in assignment[1]]}
                    assignment[0].put_nowait(json.dumps(payload))
            else:
                payload = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                           "data": rows}
                queue_other_w_l2_r.put_nowait(json.dumps(payload))
            # NOTE: a disabled step that loaded blocks for each code between
            # 09:25 and 09:29 used to run here.
            async_log_util.info(logger_l2_codes_subscript, "({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(rows))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # Short pause between polls, also after skipped batches and errors
            time.sleep(0.01)
|
|
|
def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r):
    """
    Start the data listeners and background services, then block forever.

    Calls huaxin_trade_data_update.run(), starts daemon threads for the L1
    target-code listener, the L2 subscription listener, the redis run loop and
    the async log writers, starts the L2 code-log distribution, and finally
    sleeps in an endless loop.

    @param queue_other_w_l2_r: queue handed to the L2 subscription listener
    @param queue_l1_w_strategy_r: queue handed to the L1 target-code listener
    @return: None (never returns)
    """

    def _start_daemon(target, name=None):
        # Every background worker here runs as a daemon thread
        threading.Thread(target=target, name=name, daemon=True).start()

    # Trade data update task
    huaxin_trade_data_update.run()

    # Receive data coming from L1
    _start_daemon(lambda: __listen_l1_target_codes(queue_l1_w_strategy_r))

    # Receive L2 subscriptions
    _start_daemon(lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r))

    # Asynchronous redis synchronisation service
    _start_daemon(redis_manager.RedisUtils.run_loop, name="redis")

    # Flush the asynchronous logs
    _start_daemon(lambda: async_log_util.run_sync())

    # Flush the asynchronous L2 logs
    l2_log.codeLogQueueDistributeManager.run_async()
    _start_daemon(lambda: async_log_util.l2_data_log.run_sync())

    # Presumably keeps the caller alive so the daemon threads keep running
    while True:
        time.sleep(5)
|