From ae8d76a456b64c1c6c4ebf11b6ec33b7df217b1a Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 18 八月 2025 13:43:46 +0800 Subject: [PATCH] bug修复 --- l2/subscript/l2_subscript_manager.py | 93 +++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 90 insertions(+), 3 deletions(-) diff --git a/l2/subscript/l2_subscript_manager.py b/l2/subscript/l2_subscript_manager.py index 8532f7c..6d947ed 100644 --- a/l2/subscript/l2_subscript_manager.py +++ b/l2/subscript/l2_subscript_manager.py @@ -4,6 +4,17 @@ import math import multiprocessing import random +import threading +import time + +import zmq + +from huaxin_client import l2_data_transform_protocol +from log_module import async_log_util +from log_module.log import logger_debug +from utils import shared_memery_util + +process_manager = None class TargetCodeProcessManager: @@ -29,7 +40,12 @@ # 浠g爜鎵�鍦ㄩ槦鍒桰D self.__code_queue_dict = {} - def add_codes(self, codes: set): + def set_codes(self, codes: set): + """ + 璁剧疆璁㈤槄浠g爜 + @param codes: + @return: 杩斿洖闃熷垪涓庡搴斿垎閰嶇殑浠g爜锛歔(闃熷垪瀵硅薄, {"浠g爜1","浠g爜2"}),...] + """ add_codes = codes - self.__code_queue_dict.keys() del_codes = self.__code_queue_dict.keys() - codes # 鍒犻櫎浠g爜 @@ -44,7 +60,7 @@ for code in add_codes: # 瀵绘壘鏈弧鐨勯槦鍒� for queue_id in self.__queue_codes: - count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues))) + count_per_process = self.__max_code_count_per_queue_dict.get(queue_id) if len(self.__queue_codes[queue_id]) >= count_per_process: # 闃熷垪宸叉弧 continue @@ -52,6 +68,10 @@ self.__queue_codes[queue_id].add(code) self.__code_queue_dict[code] = queue_id break + # 鍒嗛厤璁㈤槄淇℃伅 + logger_debug.info(f"璁㈤槄閫氶亾鍒嗛厤锛歿self.__code_queue_dict}") + return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in + self.__queue_codes] def get_queues_with_codes(self): """ @@ -64,8 +84,75 @@ return results +class L2DataListener: + """ + L2鏁版嵁鐩戝惉 + """ + + last_log_time = time.time() + + def __init__(self, channel_list): + """ + + @param channel_list:channel_list:[((鍏变韩鍐呭瓨缂栧彿,濮旀墭鍏变韩鍐呭瓨鏁扮粍, zmq鍦板潃),(鍏变韩鍐呭瓨缂栧彿,鎴愪氦鍏变韩鍐呭瓨鏁扮粍, zmq鍦板潃))] + """ + self.channel_list = channel_list + # 璁剧疆鍏变韩鍐呭瓨缂栧彿涓庡叡浜唴瀛樻暟缁勬槧灏� + self.shared_memery_num_object_dict = {} + for channel in self.channel_list: + self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1] + self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1] + + def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack): + """ + 鍒涘缓鏁版嵁鐩戝惉鍣� + @param + @return: + """ + for channel in self.channel_list: + channel_delegate = channel[0] + channel_deal = channel[1] + threading.Thread(target=self.__create_l2_zmq_server, args=(channel_delegate[2], l2_data_callback,), + daemon=True).start() + threading.Thread(target=self.__create_l2_zmq_server, args=(channel_deal[2], l2_data_callback,), + daemon=True).start() + + def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack): + """ + 鍒涘缓L2閫愮瑪濮旀墭/鎴愪氦zmq鏈嶅姟 + @param ipc_addr: + @return: + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(ipc_addr) + while True: + data = socket.recv() + try: + # 鎺ユ敹鏁版嵁 + data = msgpack.unpackb(data) + shared_memery_id = data["data"]["memery_number"] + datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id)) + + if time.time() - self.last_log_time > 10: + async_log_util.info(logger_debug, f"L2-V2鑾峰彇鍒版暟鎹細{datas}") + self.last_log_time = time.time() + if data["type"] == 1: + # 濮旀墭 + code, data_list, timestamp = datas[0], datas[1], datas[2] + l2_data_callback.OnL2Order(code, data_list, timestamp) + elif data["type"] == 2: + # 鎴愪氦 + code, data_list = datas[0], datas[1] + l2_data_callback.OnL2Transaction(code, data_list) + except Exception as e: + pass + finally: + socket.send_string("SUCCESS") + + if __name__ == "__main__": - queues = [multiprocessing.Queue() for i in range(7)] + queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)] manager = TargetCodeProcessManager(queues, 10) counts = [70, 60, 50, 10] for i in range(4): -- Gitblit v1.8.0