From bb2c58fb407a3783b3704b33df6a154207ae4199 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 13 三月 2024 14:39:05 +0800 Subject: [PATCH] 更改L2数据接收方式 --- huaxin_client/l2_data_manager.py | 16 +++++++++++----- 1 files changed, 11 insertions(+), 5 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 9a34109..ec5e72f 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -34,7 +34,7 @@ class L2DataUploadManager: def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, transaction_queue_distribute_manager: CodeQueueDistributeManager, - market_data_queue: multiprocessing.Queue): + market_data_queue: multiprocessing.Queue, order_ipc_hosts): self.order_queue_distribute_manager = order_queue_distribute_manager self.transaction_queue_distribute_manager = transaction_queue_distribute_manager self.market_data_queue = market_data_queue @@ -45,6 +45,7 @@ self.upload_l2_data_task_dict = {} self.l2_order_codes = set() self.l2_transaction_codes = set() + self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) # 璁剧疆璁㈠崟杩囨护鏉′欢 # special_price:杩囨护鐨�1鎵嬬殑浠锋牸 @@ -133,7 +134,8 @@ self.temp_transaction_queue_dict[code] = collections.deque() if code not in self.temp_log_queue_dict: self.temp_log_queue_dict[code] = queue.Queue() - + # 鍒嗛厤璁㈠崟涓婁紶鍗忚 + self.l2_order_upload_protocol.distribute_upload_host(code) if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) @@ -148,6 +150,7 @@ def release_distributed_upload_queue(self, code): self.order_queue_distribute_manager.release_distribute_queue(code) self.transaction_queue_distribute_manager.release_distribute_queue(code) + self.l2_order_upload_protocol.release_distributed_upload_host(code) if code in self.temp_order_queue_dict: self.temp_order_queue_dict[code].clear() self.temp_order_queue_dict.pop(code) @@ -162,6 +165,9 @@ def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait(marshal.dumps([code, datas, time.time()])) + + def __upload_l2_order_data(self, code, datas): + self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time())) # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� def __run_upload_order_task(self, code): @@ -180,8 +186,8 @@ if temp_list: # 涓婁紶鏁版嵁 - self.__upload_l2_data(code, upload_queue, temp_list) - # self.__upload_l2_order_data(code, temp_list) + # self.__upload_l2_data(code, upload_queue, temp_list) + self.__upload_l2_order_data(code, temp_list) temp_list = [] else: if code not in self.temp_order_queue_dict: @@ -289,7 +295,7 @@ if code not in self.code_socket_client_dict: raise Exception("灏氭湭鍒嗛厤host") host, socket = self.code_socket_client_dict[code] - socket.send_json(data) + socket.send(marshal.dumps(data)) socket.recv_string() -- Gitblit v1.8.0