From 23cecbb7d4ddf4149cd6b106bfaad415db45bcc3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 18 三月 2024 14:53:58 +0800
Subject: [PATCH] L2日志修改

---
 huaxin_client/l2_client.py          |    4 ++--
 trade/huaxin/huaxin_trade_server.py |    4 ++--
 l2/l2_data_listen_manager.py        |    5 +----
 main.py                             |   11 ++++-------
 huaxin_client/l2_data_manager.py    |   12 +-----------
 5 files changed, 10 insertions(+), 26 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index 1649e59..50709d0 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -590,7 +590,7 @@
 
 
 def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
-        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None:
+        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, data_callbacks:list) -> None:
 
     logger_system.info("L2杩涚▼ID锛歿}", os.getpid())
     logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}")
@@ -605,7 +605,7 @@
         transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
         data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
         l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
-                                                     transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager)
+                                                     transaction_queue_distribute_manager, market_queue, data_callback_distribute_manager)
         __init_l2(l2_data_upload_manager)
         l2_data_manager.run_upload_common()
         l2_data_manager.run_log()
diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 0e26cd3..442d396 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -14,7 +14,6 @@
 
 # 娲诲姩鏃堕棿
 from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
-from huaxin_client.l2_data_transform_protocol import L2DataCallBack
 from log_module import async_log_util
 from log_module.async_log_util import huaxin_l2_log
 from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -37,7 +36,7 @@
 class L2DataUploadManager:
     def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                  transaction_queue_distribute_manager: CodeQueueDistributeManager,
-                 market_data_queue: multiprocessing.Queue, order_ipc_hosts,
+                 market_data_queue: multiprocessing.Queue,
                  data_callback_distribute_manager: CodeDataCallbackDistributeManager):
 
         self.order_queue_distribute_manager = order_queue_distribute_manager
@@ -53,7 +52,6 @@
         self.upload_l2_data_task_dict = {}
         self.l2_order_codes = set()
         self.l2_transaction_codes = set()
-        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
 
     # 璁剧疆璁㈠崟杩囨护鏉′欢
     # special_price:杩囨护鐨�1鎵嬬殑浠锋牸
@@ -148,10 +146,6 @@
             self.temp_transaction_queue_dict[code] = collections.deque()
         if code not in self.temp_log_queue_dict:
             self.temp_log_queue_dict[code] = queue.Queue()
-        # 鍒嗛厤璁㈠崟涓婁紶鍗忚
-        if not constant.is_windows():
-            self.l2_order_upload_protocol.distribute_upload_host(code)
-
         if code not in self.upload_l2_data_task_dict:
             t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
             t1.start()
@@ -165,7 +159,6 @@
     def release_distributed_upload_queue(self, code):
         self.order_queue_distribute_manager.release_distribute_queue(code)
         self.transaction_queue_distribute_manager.release_distribute_queue(code)
-        self.l2_order_upload_protocol.release_distributed_upload_host(code)
         self.data_callback_distribute_manager.release_distribute_callback(code)
         if code in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code].clear()
@@ -181,9 +174,6 @@
 
     def __upload_l2_data(self, code, _queue, datas):
         _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
-
-    def __upload_l2_order_data(self, code, datas):
-        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
 
     # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
     def __run_upload_order_task(self, code):
diff --git a/l2/l2_data_listen_manager.py b/l2/l2_data_listen_manager.py
index f35f63d..6727eb9 100644
--- a/l2/l2_data_listen_manager.py
+++ b/l2/l2_data_listen_manager.py
@@ -134,7 +134,7 @@
             threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
 
     # 鎺ユ敹L2鏁版嵁
-    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
+    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
         # TODO 鏆傛椂涓嶉�氳繃闃熷垪鎺ユ敹鏁版嵁
         # for q in order_queues:
         #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
@@ -144,9 +144,6 @@
             t2.start()
         t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
         t3.start()
-        # 鎺ユ敹璁㈠崟hosts
-        if not constant.is_windows():
-            self.__create_ipc_server_hosts(order_ipc_hosts)
 
     def get_active_count(self, type_):
         expire_time = time.time() - 5
diff --git a/main.py b/main.py
index bcc1459..f73fdc2 100644
--- a/main.py
+++ b/main.py
@@ -29,7 +29,7 @@
                       queue_l1_w_strategy_r_: multiprocessing.Queue,
                       queue_strategy_w_trade_r_: multiprocessing.Queue,
                       queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
-                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_, order_ipc_hosts_):
+                      market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_):
     logger_system.info("绛栫暐杩涚▼ID锛歿}", os.getpid())
     log.close_print()
     # 鍒濆鍖栧弬鏁�
@@ -53,7 +53,7 @@
     huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                             queue_strategy_w_trade_r_for_read_, order_queues_,
                             transaction_queues_, market_queue_,
-                            queue_l1_trade_w_strategy_r_, order_ipc_hosts_)
+                            queue_l1_trade_w_strategy_r_)
 
 
 # 涓绘湇鍔�
@@ -131,14 +131,11 @@
         # 鍒涘缓L2閫氫俊闃熷垪
         order_queues = []
         transaction_queues = []
-        order_ipc_hosts = []
         market_queue = multiprocessing.Queue()
         for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
             order_queues.append(multiprocessing.Queue())
         for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
             transaction_queues.append(multiprocessing.Queue())
-        for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
-            order_ipc_hosts.append(f"ipc://l2order{i}.ipc")
 
         # 姝ゅ灏哃2鐨勮繘绋嬩笌绛栫暐杩涚▼鍚堝苟
         # L2
@@ -148,14 +145,14 @@
         # l2Process.start()
         # 灏哃2鐨勮繘绋嬫敼涓鸿繘绋嬫墽琛�
         threading.Thread(target=huaxin_client.l2_client.run, args=(
-            queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,
+            queue_other_w_l2_r, order_queues, transaction_queues, market_queue,
             huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
 
         # 涓昏繘绋�
         createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                           queue_strategy_w_trade_r_for_read,
                           order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w,
-                          queue_l1_trade_w_strategy_r, order_ipc_hosts)
+                          queue_l1_trade_w_strategy_r)
 
         # 灏唗radeServer浣滀负涓昏繘绋�
         l1Process.join()
diff --git a/trade/huaxin/huaxin_trade_server.py b/trade/huaxin/huaxin_trade_server.py
index fcf118d..b46d721 100644
--- a/trade/huaxin/huaxin_trade_server.py
+++ b/trade/huaxin/huaxin_trade_server.py
@@ -1685,7 +1685,7 @@
 
 def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
         order_queues, transaction_queues,
-        market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts):
+        market_queue, queue_l1_trade_w_strategy_r):
     logger_system.info(f"trade_server 绾跨▼ID:{tool.get_thread_id()}")
     try:
         # 鎵ц涓�浜涘垵濮嬪寲鏁版嵁
@@ -1701,7 +1701,7 @@
         # 鐩戝惉L2鏁版嵁
         global l2DataListenManager
         l2DataListenManager = L2DataListenManager(my_l2_data_callback)
-        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue, order_ipc_hosts)
+        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
 
         # 鍚姩浜ゆ槗鏈嶅姟
         huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,

--
Gitblit v1.8.0