From 46f51dfb83f6e6a2784676bde64577e5f6f28cf0 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 11 三月 2025 14:31:34 +0800 Subject: [PATCH] 新版L2订阅/L2成交处理时间日志 --- huaxin_client/l2_data_manager_v2.py | 24 +++++++++++++----------- 1 files changed, 13 insertions(+), 11 deletions(-) diff --git a/huaxin_client/l2_data_manager_v2.py b/huaxin_client/l2_data_manager_v2.py index 8f1667d..8076fd6 100644 --- a/huaxin_client/l2_data_manager_v2.py +++ b/huaxin_client/l2_data_manager_v2.py @@ -2,6 +2,7 @@ import json import logging import marshal +import multiprocessing import queue import threading import time @@ -35,13 +36,20 @@ # L2涓婁紶鏁版嵁绠$悊鍣� class L2DataUploadManager: + """ + L2閫愮瑪濮旀墭/L2閫愮瑪鎴愪氦锛氶�氳繃鍏变韩鍐呭瓨+ZMQ涓婁紶鏁版嵁 + L2甯傚満琛屾儏锛� 閫氳繃鏅�氭暟鎹笂浼犻槦鍒楀啓鍏� + L2璁㈤槄鐨勪唬鐮侊細 閫氳繃鏅�氭暟鎹笂浼犻槦鍒楀啓鍏� + """ TYPE_DELEGATE = 1 TYPE_TRANSACTION = 2 TYPE_MARKET = 3 - def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager): + def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager, + common_data_upload_queue: multiprocessing.Queue): self.data_channel_distribute_manager = data_channel_distribute_manager + self.common_data_upload_queue = common_data_upload_queue # 浠g爜鍒嗛厤鐨勫璞� self.temp_order_queue_dict = {} self.temp_transaction_queue_dict = {} @@ -144,13 +152,9 @@ data['SellNo'], data['ExecType'], time.time())) def add_market_data(self, data): - # 鍔犲叆涓婁紶闃熷垪 - # self.market_data_queue.put_nowait(data) code = data['securityID'] - # TODO 鏀逛负zmq鍙戦�� - callback = self.data_channel_distribute_manager.get_distributed_channel(code) - if callback: - callback.OnMarketData(code, data) + # 鏀逛负闃熷垪鍥炶皟鍙戦�� + self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)}) # 鍒嗛厤涓婁紶闃熷垪 def distribute_upload_queue(self, code, _target_codes=None): @@ -192,9 +196,6 @@ if code in self.upload_l2_data_task_dict: self.upload_l2_data_task_dict.pop(code) - - def __upload_l2_data(self, code, _queue, datas): - _queue.put_nowait(marshal.dumps([code, datas, time.time()])) # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� def __run_upload_order_task(self, code): @@ -265,7 +266,8 @@ (code, temp_list)) # 閫氱煡鑾峰彇鏁版嵁 _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host) - _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) + _socket.send( + msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) _socket.recv_string() temp_list = [] else: -- Gitblit v1.8.0