From 15199f8e93fe48e6261c99eadf6673d788db3a80 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期日, 17 三月 2024 22:56:10 +0800 Subject: [PATCH] L2进程与策略进程合并 --- huaxin_client/l2_data_manager.py | 33 +++++++++++++++++++++++++++------ 1 files changed, 27 insertions(+), 6 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index ec5e72f..793d5e0 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -6,12 +6,15 @@ import queue import threading import time + +import constant from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager # 娲诲姩鏃堕棿 -from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager +from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager +from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module import async_log_util from log_module.async_log_util import huaxin_l2_log from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ @@ -34,13 +37,17 @@ class L2DataUploadManager: def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, transaction_queue_distribute_manager: CodeQueueDistributeManager, - market_data_queue: multiprocessing.Queue, order_ipc_hosts): + market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager): + self.order_queue_distribute_manager = order_queue_distribute_manager self.transaction_queue_distribute_manager = transaction_queue_distribute_manager self.market_data_queue = market_data_queue + self.data_callback_distribute_manager = data_callback_distribute_manager + # 浠g爜鍒嗛厤鐨勫璞� self.temp_order_queue_dict = {} self.temp_transaction_queue_dict = {} self.temp_log_queue_dict = {} + self.filter_order_condition_dict = {} self.upload_l2_data_task_dict = {} self.l2_order_codes = set() @@ -119,7 +126,11 @@ def add_market_data(self, data): # 鍔犲叆涓婁紶闃熷垪 - self.market_data_queue.put_nowait(data) + # self.market_data_queue.put_nowait(data) + code = data['securityID'] + callback = self.data_callback_distribute_manager.get_distributed_callback(code) + if callback: + callback.OnMarketData(code, data) # 鍒嗛厤涓婁紶闃熷垪 def distribute_upload_queue(self, code): @@ -127,6 +138,9 @@ self.order_queue_distribute_manager.distribute_queue(code) if not self.transaction_queue_distribute_manager.get_distributed_queue(code): self.transaction_queue_distribute_manager.distribute_queue(code) + if not self.data_callback_distribute_manager.get_distributed_callback(code): + self.data_callback_distribute_manager.distribute_callback(code) + if code not in self.temp_order_queue_dict: self.temp_order_queue_dict[code] = collections.deque() @@ -135,7 +149,8 @@ if code not in self.temp_log_queue_dict: self.temp_log_queue_dict[code] = queue.Queue() # 鍒嗛厤璁㈠崟涓婁紶鍗忚 - self.l2_order_upload_protocol.distribute_upload_host(code) + if not constant.is_windows(): + self.l2_order_upload_protocol.distribute_upload_host(code) if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) @@ -151,6 +166,7 @@ self.order_queue_distribute_manager.release_distribute_queue(code) self.transaction_queue_distribute_manager.release_distribute_queue(code) self.l2_order_upload_protocol.release_distributed_upload_host(code) + self.data_callback_distribute_manager.release_distribute_callback(code) if code in self.temp_order_queue_dict: self.temp_order_queue_dict[code].clear() self.temp_order_queue_dict.pop(code) @@ -187,8 +203,10 @@ if temp_list: # 涓婁紶鏁版嵁 # self.__upload_l2_data(code, upload_queue, temp_list) - self.__upload_l2_order_data(code, temp_list) + # self.__upload_l2_order_data(code, temp_list) + self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time()) temp_list = [] + else: if code not in self.temp_order_queue_dict: self.l2_order_codes.discard(code) @@ -216,7 +234,8 @@ temp_list.append(data) if temp_list: # 涓婁紶鏁版嵁 - self.__upload_l2_data(code, upload_queue, temp_list) + # self.__upload_l2_data(code, upload_queue, temp_list) + self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list) temp_list = [] else: if code not in self.temp_transaction_queue_dict: @@ -254,6 +273,8 @@ self.code_socket_client_dict = {} self.rlock = threading.RLock() context = zmq.Context() + if constant.is_windows(): + return for host in self.ipchosts: socket = context.socket(zmq.REQ) socket.connect(host) -- Gitblit v1.8.0