From 122def357140a4a504710a57fe2bc1a8020aa7b1 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 27 二月 2024 17:57:39 +0800 Subject: [PATCH] 新版L2数据传输协议测试 --- huaxin_client/l2_data_manager.py | 41 ++++++++++++++++++++++++----------------- 1 files changed, 24 insertions(+), 17 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 0dd5bf7..ccddadf 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -30,9 +30,10 @@ # L2涓婁紶鏁版嵁绠$悊鍣� class L2DataUploadManager: + # order_ipc_hosts:杩滅▼host def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, transaction_queue_distribute_manager: CodeQueueDistributeManager, - market_data_queue: multiprocessing.Queue): + market_data_queue: multiprocessing.Queue, order_ipc_hosts): self.order_queue_distribute_manager = order_queue_distribute_manager self.transaction_queue_distribute_manager = transaction_queue_distribute_manager self.market_data_queue = market_data_queue @@ -42,6 +43,8 @@ self.upload_l2_data_task_dict = {} self.l2_order_codes = set() self.l2_transaction_codes = set() + # 璁㈠崟 + self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) # 璁剧疆璁㈠崟杩囨护鏉′欢 # special_price:杩囨护鐨�1鎵嬬殑浠锋牸 @@ -141,6 +144,8 @@ self.temp_order_queue_dict[code] = collections.deque() if code not in self.temp_transaction_queue_dict: self.temp_transaction_queue_dict[code] = collections.deque() + # 鍒嗛厤璁㈠崟涓婁紶鍗忚 + self.l2_order_upload_protocol.distribute_upload_host(code) if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) @@ -161,9 +166,13 @@ self.temp_transaction_queue_dict.pop(code) if code in self.upload_l2_data_task_dict: self.upload_l2_data_task_dict.pop(code) + self.l2_order_upload_protocol.release_distributed_upload_host(code) def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait((code, datas, time.time())) + + def __upload_l2_order_data(self, code, datas): + self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time())) # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� def __run_upload_order_task(self, code): @@ -182,7 +191,8 @@ if temp_list: # 涓婁紶鏁版嵁 - self.__upload_l2_data(code, upload_queue, temp_list) + # self.__upload_l2_data(code, upload_queue, temp_list) + self.__upload_l2_order_data(code, temp_list) temp_list = [] else: if code not in self.temp_order_queue_dict: @@ -225,7 +235,6 @@ pass -# L2涓婁紶鏁版嵁鍗忚绠$悊鍣� class L2DataUploadProtocolManager: # ipchosts IPC鍗忚 @@ -235,7 +244,7 @@ self.socket_client_dict = {} # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)} self.code_socket_client_dict = {} - self.lock = threading.Lock() + self.rlock = threading.RLock() context = zmq.Context() for host in self.ipchosts: socket = context.socket(zmq.REQ) @@ -244,34 +253,31 @@ # 鑾峰彇 def __get_available_ipchost(self): - try: - if len(self.code_socket_client_dict) >= len(self.socket_client_dict): - raise Exception("鏃犲彲鐢╤ost") - used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) - for host in self.socket_client_dict: - if host not in used_hosts: - return host, self.socket_client_dict[host] + if len(self.code_socket_client_dict) >= len(self.socket_client_dict): raise Exception("鏃犲彲鐢╤ost") - finally: - self.lock.release() + used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) + for host in self.socket_client_dict: + if host not in used_hosts: + return host, self.socket_client_dict[host] + raise Exception("鏃犲彲鐢╤ost") # 鍒嗛厤HOST def distribute_upload_host(self, code): - self.lock.acquire() + self.rlock.acquire() try: host_info = self.__get_available_ipchost() if host_info: self.code_socket_client_dict[code] = host_info finally: - self.lock.release() + self.rlock.release() def release_distributed_upload_host(self, code): - self.lock.acquire() + self.rlock.acquire() try: if code in self.code_socket_client_dict: self.code_socket_client_dict.pop(code) finally: - self.lock.release() + self.rlock.release() def upload_data_as_json(self, code, data): if code not in self.code_socket_client_dict: @@ -386,6 +392,7 @@ pass + def run_test(): t = threading.Thread(target=lambda: __test(), daemon=True) t.start() -- Gitblit v1.8.0