constant.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2/subscript/l2_subscript_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
main.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
task/task_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
constant.py
@@ -230,4 +230,7 @@ # 忽略板块流入的市场强度 IGNORE_BLOCK_IN_MONEY_MARKET_STRONG = 101 # 新版L2监听 IS_L2_NEW = False l2/subscript/l2_subscript_manager.py
@@ -5,11 +5,14 @@ import multiprocessing import random import threading import time import msgpack import zmq from huaxin_client import l2_data_transform_protocol from log_module import async_log_util from log_module.log import logger_debug from utils import shared_memery_util process_manager = None @@ -85,6 +88,8 @@ L2数据监听 """ last_log_time = time.time() def __init__(self, channel_list): """ @@ -123,10 +128,14 @@ while True: data = socket.recv() try: #接收数据 # 接收数据 data = msgpack.unpackb(data) shared_memery_id = data["data"]["memery_number"] datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id)) if time.time() - self.last_log_time > 10: async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}") self.last_log_time = time.time() if data["type"] == 1: # 委托 code, data_list, timestamp = datas[0], datas[1], datas[2] main.py
@@ -179,7 +179,7 @@ # 测试L2单独进程 if True: if constant.IS_L2_NEW: __create_l2_subscript() else: # 将L2的进程改为线程执行 task/task_manager.py
@@ -4,9 +4,11 @@ import threading import time import constant from db import redis_manager_delegate as redis_manager from l2 import l2_log from l2.huaxin import huaxin_target_codes_manager from l2.subscript import l2_subscript_manager from log_module import async_log_util from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug from servers.huaxin_trade_server import TradeServerProcessor @@ -63,9 +65,19 @@ codes = [d[0] for d in datas] for code in codes: block_info.init_code(code) root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": datas} queue_other_w_l2_r.put_nowait(json.dumps(root_data)) if constant.IS_L2_NEW: process_manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager\ .process_manager queue_codes_list = process_manager.set_codes(set(codes)) code_data_dict = {d[0]: d for d in datas} for queue_codes in queue_codes_list: root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": [code_data_dict.get(c) for c in queue_codes[1]]} queue_codes[0].put_nowait(json.dumps(root_data)) else: root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": datas} queue_other_w_l2_r.put_nowait(json.dumps(root_data)) # 如果在9:25-9:29 需要加载板块 # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"): # for d in datas: