From 23cecbb7d4ddf4149cd6b106bfaad415db45bcc3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 18 三月 2024 14:53:58 +0800 Subject: [PATCH] L2日志修改 --- huaxin_client/l2_client.py | 4 ++-- trade/huaxin/huaxin_trade_server.py | 4 ++-- l2/l2_data_listen_manager.py | 5 +---- main.py | 11 ++++------- huaxin_client/l2_data_manager.py | 12 +----------- 5 files changed, 10 insertions(+), 26 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 1649e59..50709d0 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -590,7 +590,7 @@ def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], - transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None: + transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, data_callbacks:list) -> None: logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}") @@ -605,7 +605,7 @@ transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, - transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager) + transaction_queue_distribute_manager, market_queue, data_callback_distribute_manager) __init_l2(l2_data_upload_manager) l2_data_manager.run_upload_common() l2_data_manager.run_log() diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 0e26cd3..442d396 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -14,7 +14,6 @@ # 娲诲姩鏃堕棿 from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager -from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module import async_log_util from log_module.async_log_util import huaxin_l2_log from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ @@ -37,7 +36,7 @@ class L2DataUploadManager: def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, transaction_queue_distribute_manager: CodeQueueDistributeManager, - market_data_queue: multiprocessing.Queue, order_ipc_hosts, + market_data_queue: multiprocessing.Queue, data_callback_distribute_manager: CodeDataCallbackDistributeManager): self.order_queue_distribute_manager = order_queue_distribute_manager @@ -53,7 +52,6 @@ self.upload_l2_data_task_dict = {} self.l2_order_codes = set() self.l2_transaction_codes = set() - self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts) # 璁剧疆璁㈠崟杩囨护鏉′欢 # special_price:杩囨护鐨�1鎵嬬殑浠锋牸 @@ -148,10 +146,6 @@ self.temp_transaction_queue_dict[code] = collections.deque() if code not in self.temp_log_queue_dict: self.temp_log_queue_dict[code] = queue.Queue() - # 鍒嗛厤璁㈠崟涓婁紶鍗忚 - if not constant.is_windows(): - self.l2_order_upload_protocol.distribute_upload_host(code) - if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) t1.start() @@ -165,7 +159,6 @@ def release_distributed_upload_queue(self, code): self.order_queue_distribute_manager.release_distribute_queue(code) self.transaction_queue_distribute_manager.release_distribute_queue(code) - self.l2_order_upload_protocol.release_distributed_upload_host(code) self.data_callback_distribute_manager.release_distribute_callback(code) if code in self.temp_order_queue_dict: self.temp_order_queue_dict[code].clear() @@ -181,9 +174,6 @@ def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait(marshal.dumps([code, datas, time.time()])) - - def __upload_l2_order_data(self, code, datas): - self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time())) # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� def __run_upload_order_task(self, code): diff --git a/l2/l2_data_listen_manager.py b/l2/l2_data_listen_manager.py index f35f63d..6727eb9 100644 --- a/l2/l2_data_listen_manager.py +++ b/l2/l2_data_listen_manager.py @@ -134,7 +134,7 @@ threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start() # 鎺ユ敹L2鏁版嵁 - def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts): + def receive_l2_data(self, order_queues, transaction_queues, market_queue): # TODO 鏆傛椂涓嶉�氳繃闃熷垪鎺ユ敹鏁版嵁 # for q in order_queues: # t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) @@ -144,9 +144,6 @@ t2.start() t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) t3.start() - # 鎺ユ敹璁㈠崟hosts - if not constant.is_windows(): - self.__create_ipc_server_hosts(order_ipc_hosts) def get_active_count(self, type_): expire_time = time.time() - 5 diff --git a/main.py b/main.py index bcc1459..f73fdc2 100644 --- a/main.py +++ b/main.py @@ -29,7 +29,7 @@ queue_l1_w_strategy_r_: multiprocessing.Queue, queue_strategy_w_trade_r_: multiprocessing.Queue, queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_, - market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_): + market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_): logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid()) log.close_print() # 鍒濆鍖栧弬鏁� @@ -53,7 +53,7 @@ huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, order_queues_, transaction_queues_, market_queue_, - queue_l1_trade_w_strategy_r_, order_ipc_hosts_) + queue_l1_trade_w_strategy_r_) # 涓绘湇鍔� @@ -131,14 +131,11 @@ # 鍒涘缓L2閫氫俊闃熷垪 order_queues = [] transaction_queues = [] - order_ipc_hosts = [] market_queue = multiprocessing.Queue() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): order_queues.append(multiprocessing.Queue()) for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): transaction_queues.append(multiprocessing.Queue()) - for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): - order_ipc_hosts.append(f"ipc://l2order{i}.ipc") # 姝ゅ灏哃2鐨勮繘绋嬩笌绛栫暐杩涚▼鍚堝苟 # L2 @@ -148,14 +145,14 @@ # l2Process.start() # 灏哃2鐨勮繘绋嬫敼涓鸿繘绋嬫墽琛� threading.Thread(target=huaxin_client.l2_client.run, args=( - queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts, + queue_other_w_l2_r, order_queues, transaction_queues, market_queue, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() # 涓昏繘绋� createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w, - queue_l1_trade_w_strategy_r, order_ipc_hosts) + queue_l1_trade_w_strategy_r) # 灏唗radeServer浣滀负涓昏繘绋� l1Process.join() diff --git a/trade/huaxin/huaxin_trade_server.py b/trade/huaxin/huaxin_trade_server.py index fcf118d..b46d721 100644 --- a/trade/huaxin/huaxin_trade_server.py +++ b/trade/huaxin/huaxin_trade_server.py @@ -1685,7 +1685,7 @@ def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, order_queues, transaction_queues, - market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts): + market_queue, queue_l1_trade_w_strategy_r): logger_system.info(f"trade_server 绾跨▼ID:{tool.get_thread_id()}") try: # 鎵ц涓�浜涘垵濮嬪寲鏁版嵁 @@ -1701,7 +1701,7 @@ # 鐩戝惉L2鏁版嵁 global l2DataListenManager l2DataListenManager = L2DataListenManager(my_l2_data_callback) - l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts) + l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue) # 鍚姩浜ゆ槗鏈嶅姟 huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, -- Gitblit v1.8.0