From 59159700fa6300d663140bc44f570ebc90e1998d Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 18 三月 2024 14:36:05 +0800 Subject: [PATCH] L2日志修改 --- l2/l2_log.py | 84 ++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 80 insertions(+), 4 deletions(-) diff --git a/l2/l2_log.py b/l2/l2_log.py index 4b1208d..b98e6b6 100644 --- a/l2/l2_log.py +++ b/l2/l2_log.py @@ -1,25 +1,88 @@ +import threading + +import constant from log_module import async_log_util from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \ logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \ logger_l2_g_cancel + + +# 鏃ュ織闃熷垪鍒嗛厤绠$悊鍣� +# 涓烘瘡涓�涓唬鐮佸垎閰嶄笓闂ㄧ殑绠$悊鍣� +class CodeLogQueueDistributeManager: + def __init__(self, l2_channel_count): + self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)] + # 瀛樻斁浠g爜鍒嗛厤鐨勬棩蹇楃储寮� 濡傦細{"000333":2} + self.distributed_log_dict = {} + + def __get_avaiable_log_manager(self): + distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict]) + all_indexes = set([i for i in range(0, len(self.async_log_managers))]) + available_indexes = all_indexes - distributed_indexes + if not available_indexes: + raise Exception("宸插垎閰嶅畬") + return available_indexes.pop() + + def distribute_log_manager(self, code): + if code in self.distributed_log_dict: + return + log_manager_index = self.__get_avaiable_log_manager() + if log_manager_index is not None: + self.distributed_log_dict[code] = log_manager_index + + def realase_log_manager(self, code): + if code in self.distributed_log_dict: + self.distributed_log_dict.pop(code) + + # 璁剧疆L2璁㈤槄浠g爜 + def set_l2_subscript_codes(self, codes): + codes = set(codes) + now_codes = set([code for code in self.distributed_log_dict]) + del_codes = now_codes - codes + add_codes = codes - now_codes + for c in del_codes: + self.realase_log_manager(c) + for c in add_codes: + self.distribute_log_manager(c) + + def get_log_manager(self, code): + if code in self.distributed_log_dict: + return self.async_log_managers[self.distributed_log_dict[code]] + return None + + # 杩愯鍚屾鏈嶅姟 + def run_async(self): + for m in self.async_log_managers: + threading.Thread(target=m.run_sync, daemon=True).start() + + +codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT) threadIds = {} def __add_async_log(logger_, code, content, *args): try: + full_content = "" if len(args) > 0: - async_log_util.debug(logger_, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + full_content = ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args) else: - async_log_util.debug(logger_, - "thread-id={} code={} ".format(threadIds.get(code), code) + content) + full_content = "thread-id={} code={} ".format(threadIds.get(code), code) + content + async_log_manager = codeLogQueueDistributeManager.get_log_manager(code) + if async_log_manager: + async_log_manager.debug(logger_, full_content) + else: + async_log_util.debug(logger_, full_content) except Exception as e: logger_l2_error.exception(e) def debug(code, content, *args): __add_async_log(logger_l2_trade, code, content, *args) + + +def info(code, logger_, content, *args): + __add_async_log(logger_, code, content, *args) def buy_debug(code, content, *args): @@ -45,11 +108,14 @@ def d_cancel_debug(code, content, *args): __add_async_log(logger_l2_d_cancel, code, content, *args) + def f_cancel_debug(code, content, *args): __add_async_log(logger_l2_f_cancel, code, content, *args) + def g_cancel_debug(code, content, *args): __add_async_log(logger_l2_g_cancel, code, content, *args) + # 浜ゆ槗璁板綍 def trade_record(code, type, content, *args): @@ -61,3 +127,13 @@ async_log_util.debug(logger_trade_record, "thread-id={} code={} type={} data=".format(threadIds.get(code), code, type) + content) + + +if __name__ == "__main__": + codes = [] + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + codes.append(f"{i}" * 6) + codeLogQueueDistributeManager.set_l2_subscript_codes(codes) + codes[0] = "123456" + codeLogQueueDistributeManager.set_l2_subscript_codes(codes) + codeLogQueueDistributeManager.get_log_manager("000333") -- Gitblit v1.8.0