From ae8d76a456b64c1c6c4ebf11b6ec33b7df217b1a Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 18 八月 2025 13:43:46 +0800
Subject: [PATCH] bug修复

---
 l2/subscript/l2_subscript_manager.py |   93 +++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/l2/subscript/l2_subscript_manager.py b/l2/subscript/l2_subscript_manager.py
index 8532f7c..6d947ed 100644
--- a/l2/subscript/l2_subscript_manager.py
+++ b/l2/subscript/l2_subscript_manager.py
@@ -4,6 +4,17 @@
 import math
 import multiprocessing
 import random
+import threading
+import time
+
+import zmq
+
+from huaxin_client import l2_data_transform_protocol
+from log_module import async_log_util
+from log_module.log import logger_debug
+from utils import shared_memery_util
+
+process_manager = None
 
 
 class TargetCodeProcessManager:
@@ -29,7 +40,12 @@
         # 浠g爜鎵�鍦ㄩ槦鍒桰D
         self.__code_queue_dict = {}
 
-    def add_codes(self, codes: set):
+    def set_codes(self, codes: set):
+        """
+        璁剧疆璁㈤槄浠g爜
+        @param codes:
+        @return: 杩斿洖闃熷垪涓庡搴斿垎閰嶇殑浠g爜锛歔(闃熷垪瀵硅薄, {"浠g爜1","浠g爜2"}),...]
+        """
         add_codes = codes - self.__code_queue_dict.keys()
         del_codes = self.__code_queue_dict.keys() - codes
         # 鍒犻櫎浠g爜
@@ -44,7 +60,7 @@
         for code in add_codes:
             # 瀵绘壘鏈弧鐨勯槦鍒�
             for queue_id in self.__queue_codes:
-                count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues)))
+                count_per_process = self.__max_code_count_per_queue_dict.get(queue_id)
                 if len(self.__queue_codes[queue_id]) >= count_per_process:
                     # 闃熷垪宸叉弧
                     continue
@@ -52,6 +68,10 @@
                 self.__queue_codes[queue_id].add(code)
                 self.__code_queue_dict[code] = queue_id
                 break
+        # 鍒嗛厤璁㈤槄淇℃伅
+        logger_debug.info(f"璁㈤槄閫氶亾鍒嗛厤锛歿self.__code_queue_dict}")
+        return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in
+                self.__queue_codes]
 
     def get_queues_with_codes(self):
         """
@@ -64,8 +84,75 @@
         return results
 
 
+class L2DataListener:
+    """
+    L2鏁版嵁鐩戝惉
+    """
+
+    last_log_time = time.time()
+
+    def __init__(self, channel_list):
+        """
+
+        @param channel_list:channel_list:[((鍏变韩鍐呭瓨缂栧彿,濮旀墭鍏变韩鍐呭瓨鏁扮粍, zmq鍦板潃),(鍏变韩鍐呭瓨缂栧彿,鎴愪氦鍏变韩鍐呭瓨鏁扮粍, zmq鍦板潃))]
+        """
+        self.channel_list = channel_list
+        # 璁剧疆鍏变韩鍐呭瓨缂栧彿涓庡叡浜唴瀛樻暟缁勬槧灏�
+        self.shared_memery_num_object_dict = {}
+        for channel in self.channel_list:
+            self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1]
+            self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1]
+
+    def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
+        """
+        鍒涘缓鏁版嵁鐩戝惉鍣�
+        @param
+        @return:
+        """
+        for channel in self.channel_list:
+            channel_delegate = channel[0]
+            channel_deal = channel[1]
+            threading.Thread(target=self.__create_l2_zmq_server, args=(channel_delegate[2], l2_data_callback,),
+                             daemon=True).start()
+            threading.Thread(target=self.__create_l2_zmq_server, args=(channel_deal[2], l2_data_callback,),
+                             daemon=True).start()
+
+    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
+        """
+        鍒涘缓L2閫愮瑪濮旀墭/鎴愪氦zmq鏈嶅姟
+        @param ipc_addr:
+        @return:
+        """
+        context = zmq.Context()
+        socket = context.socket(zmq.REP)
+        socket.bind(ipc_addr)
+        while True:
+            data = socket.recv()
+            try:
+                # 鎺ユ敹鏁版嵁
+                data = msgpack.unpackb(data)
+                shared_memery_id = data["data"]["memery_number"]
+                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
+
+                if time.time() - self.last_log_time > 10:
+                    async_log_util.info(logger_debug, f"L2-V2鑾峰彇鍒版暟鎹細{datas}")
+                    self.last_log_time = time.time()
+                if data["type"] == 1:
+                    # 濮旀墭
+                    code, data_list, timestamp = datas[0], datas[1], datas[2]
+                    l2_data_callback.OnL2Order(code, data_list, timestamp)
+                elif data["type"] == 2:
+                    # 鎴愪氦
+                    code, data_list = datas[0], datas[1]
+                    l2_data_callback.OnL2Transaction(code, data_list)
+            except Exception as e:
+                pass
+            finally:
+                socket.send_string("SUCCESS")
+
+
 if __name__ == "__main__":
-    queues = [multiprocessing.Queue() for i in range(7)]
+    queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)]
     manager = TargetCodeProcessManager(queues, 10)
     counts = [70, 60, 50, 10]
     for i in range(4):

--
Gitblit v1.8.0