From 46f51dfb83f6e6a2784676bde64577e5f6f28cf0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 11 三月 2025 14:31:34 +0800
Subject: [PATCH] 新版L2订阅/L2成交处理时间日志

---
 huaxin_client/l2_data_manager_v2.py |   24 +++++++++++++-----------
 1 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/huaxin_client/l2_data_manager_v2.py b/huaxin_client/l2_data_manager_v2.py
index 8f1667d..8076fd6 100644
--- a/huaxin_client/l2_data_manager_v2.py
+++ b/huaxin_client/l2_data_manager_v2.py
@@ -2,6 +2,7 @@
 import json
 import logging
 import marshal
+import multiprocessing
 import queue
 import threading
 import time
@@ -35,13 +36,20 @@
 
 # L2涓婁紶鏁版嵁绠$悊鍣�
 class L2DataUploadManager:
+    """
+    L2閫愮瑪濮旀墭/L2閫愮瑪鎴愪氦锛氶�氳繃鍏变韩鍐呭瓨+ZMQ涓婁紶鏁版嵁
+    L2甯傚満琛屾儏锛� 閫氳繃鏅�氭暟鎹笂浼犻槦鍒楀啓鍏�
+    L2璁㈤槄鐨勪唬鐮侊細 閫氳繃鏅�氭暟鎹笂浼犻槦鍒楀啓鍏�
+    """
 
     TYPE_DELEGATE = 1
     TYPE_TRANSACTION = 2
     TYPE_MARKET = 3
 
-    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager):
+    def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager,
+                 common_data_upload_queue: multiprocessing.Queue):
         self.data_channel_distribute_manager = data_channel_distribute_manager
+        self.common_data_upload_queue = common_data_upload_queue
         # 浠g爜鍒嗛厤鐨勫璞�
         self.temp_order_queue_dict = {}
         self.temp_transaction_queue_dict = {}
@@ -144,13 +152,9 @@
                       data['SellNo'], data['ExecType'], time.time()))
 
     def add_market_data(self, data):
-        # 鍔犲叆涓婁紶闃熷垪
-        # self.market_data_queue.put_nowait(data)
         code = data['securityID']
-        # TODO 鏀逛负zmq鍙戦��
-        callback = self.data_channel_distribute_manager.get_distributed_channel(code)
-        if callback:
-            callback.OnMarketData(code, data)
+        #  鏀逛负闃熷垪鍥炶皟鍙戦��
+        self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)})
 
     # 鍒嗛厤涓婁紶闃熷垪
     def distribute_upload_queue(self, code, _target_codes=None):
@@ -192,9 +196,6 @@
 
         if code in self.upload_l2_data_task_dict:
             self.upload_l2_data_task_dict.pop(code)
-
-    def __upload_l2_data(self, code, _queue, datas):
-        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
 
     # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
     def __run_upload_order_task(self, code):
@@ -265,7 +266,8 @@
                                                  (code, temp_list))
                     # 閫氱煡鑾峰彇鏁版嵁
                     _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host)
-                    _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
+                    _socket.send(
+                        msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}}))
                     _socket.recv_string()
                     temp_list = []
                 else:

--
Gitblit v1.8.0