From 59159700fa6300d663140bc44f570ebc90e1998d Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 18 三月 2024 14:36:05 +0800
Subject: [PATCH] L2日志修改

---
 l2/l2_log.py |   84 ++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/l2/l2_log.py b/l2/l2_log.py
index 4b1208d..b98e6b6 100644
--- a/l2/l2_log.py
+++ b/l2/l2_log.py
@@ -1,25 +1,88 @@
+import threading
+
+import constant
 from log_module import async_log_util
 from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
     logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \
     logger_l2_g_cancel
+
+
+# 鏃ュ織闃熷垪鍒嗛厤绠$悊鍣�
+# 涓烘瘡涓�涓唬鐮佸垎閰嶄笓闂ㄧ殑绠$悊鍣�
+class CodeLogQueueDistributeManager:
+    def __init__(self, l2_channel_count):
+        self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)]
+        # 瀛樻斁浠g爜鍒嗛厤鐨勬棩蹇楃储寮� 濡傦細{"000333":2}
+        self.distributed_log_dict = {}
+
+    def __get_avaiable_log_manager(self):
+        distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict])
+        all_indexes = set([i for i in range(0, len(self.async_log_managers))])
+        available_indexes = all_indexes - distributed_indexes
+        if not available_indexes:
+            raise Exception("宸插垎閰嶅畬")
+        return available_indexes.pop()
+
+    def distribute_log_manager(self, code):
+        if code in self.distributed_log_dict:
+            return
+        log_manager_index = self.__get_avaiable_log_manager()
+        if log_manager_index is not None:
+            self.distributed_log_dict[code] = log_manager_index
+
+    def realase_log_manager(self, code):
+        if code in self.distributed_log_dict:
+            self.distributed_log_dict.pop(code)
+
+    # 璁剧疆L2璁㈤槄浠g爜
+    def set_l2_subscript_codes(self, codes):
+        codes = set(codes)
+        now_codes = set([code for code in self.distributed_log_dict])
+        del_codes = now_codes - codes
+        add_codes = codes - now_codes
+        for c in del_codes:
+            self.realase_log_manager(c)
+        for c in add_codes:
+            self.distribute_log_manager(c)
+
+    def get_log_manager(self, code):
+        if code in self.distributed_log_dict:
+            return self.async_log_managers[self.distributed_log_dict[code]]
+        return None
+
+    # 杩愯鍚屾鏈嶅姟
+    def run_async(self):
+        for m in self.async_log_managers:
+            threading.Thread(target=m.run_sync, daemon=True).start()
+
+
+codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)
 
 threadIds = {}
 
 
 def __add_async_log(logger_, code, content, *args):
     try:
+        full_content = ""
         if len(args) > 0:
-            async_log_util.debug(logger_,
-                                 ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
+            full_content = ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args)
         else:
-            async_log_util.debug(logger_,
-                                 "thread-id={} code={}  ".format(threadIds.get(code), code) + content)
+            full_content = "thread-id={} code={}  ".format(threadIds.get(code), code) + content
+        async_log_manager = codeLogQueueDistributeManager.get_log_manager(code)
+        if async_log_manager:
+            async_log_manager.debug(logger_, full_content)
+        else:
+            async_log_util.debug(logger_, full_content)
     except Exception as e:
         logger_l2_error.exception(e)
 
 
 def debug(code, content, *args):
     __add_async_log(logger_l2_trade, code, content, *args)
+
+
+def info(code, logger_, content, *args):
+    __add_async_log(logger_, code, content, *args)
 
 
 def buy_debug(code, content, *args):
@@ -45,11 +108,14 @@
 def d_cancel_debug(code, content, *args):
     __add_async_log(logger_l2_d_cancel, code, content, *args)
 
+
 def f_cancel_debug(code, content, *args):
     __add_async_log(logger_l2_f_cancel, code, content, *args)
 
+
 def g_cancel_debug(code, content, *args):
     __add_async_log(logger_l2_g_cancel, code, content, *args)
+
 
 # 浜ゆ槗璁板綍
 def trade_record(code, type, content, *args):
@@ -61,3 +127,13 @@
         async_log_util.debug(logger_trade_record,
                              "thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                          type) + content)
+
+
+if __name__ == "__main__":
+    codes = []
+    for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
+        codes.append(f"{i}" * 6)
+    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
+    codes[0] = "123456"
+    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
+    codeLogQueueDistributeManager.get_log_manager("000333")

--
Gitblit v1.8.0