From 365491c1fcf523994035e4bd28d8b5872dd6ec98 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 31 七月 2025 14:47:48 +0800 Subject: [PATCH] 除权采用掘金更新K线 --- l2/l2_log.py | 139 +++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 123 insertions(+), 16 deletions(-) diff --git a/l2/l2_log.py b/l2/l2_log.py index dca9f37..71df443 100644 --- a/l2/l2_log.py +++ b/l2/l2_log.py @@ -1,42 +1,149 @@ +import logging +import threading + +import constant from log_module import async_log_util from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \ - logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel + logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \ + logger_l2_g_cancel, logger_l2_j_cancel, logger_debug + + +# 鏃ュ織闃熷垪鍒嗛厤绠$悊鍣� +# 涓烘瘡涓�涓唬鐮佸垎閰嶄笓闂ㄧ殑绠$悊鍣� +class CodeLogQueueDistributeManager: + def __init__(self, l2_channel_count): + self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)] + # 瀛樻斁浠g爜鍒嗛厤鐨勬棩蹇楃储寮� 濡傦細{"000333":2} + self.distributed_log_dict = {} + + def __get_avaiable_log_manager(self): + distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict]) + all_indexes = set([i for i in range(0, len(self.async_log_managers))]) + available_indexes = all_indexes - distributed_indexes + if not available_indexes: + raise Exception("宸插垎閰嶅畬") + return available_indexes.pop() + + def distribute_log_manager(self, code): + if code in self.distributed_log_dict: + return + log_manager_index = self.__get_avaiable_log_manager() + if log_manager_index is not None: + self.distributed_log_dict[code] = log_manager_index + + def realase_log_manager(self, code): + if code in self.distributed_log_dict: + self.distributed_log_dict.pop(code) + + # 璁剧疆L2璁㈤槄浠g爜 + def set_l2_subscript_codes(self, codes): + codes = set(codes) + old_codes = set([code for code in self.distributed_log_dict]) + del_codes = old_codes - codes + add_codes = codes - old_codes + for c in del_codes: + self.realase_log_manager(c) + for c in add_codes: + try: + self.distribute_log_manager(c) + except Exception as e: + pass + # logging.exception(e) + # logger_debug.error(f"L2鏃ュ織鍒嗛厤鍑洪敊锛氭柊浠g爜鎬绘暟{len(codes)}") + + def get_log_manager(self, code): + if code in self.distributed_log_dict: + return self.async_log_managers[self.distributed_log_dict[code]] + return None + + # 杩愯鍚屾鏈嶅姟 + def run_async(self): + for m in self.async_log_managers: + threading.Thread(target=lambda: m.run_sync(True), daemon=True).start() + + +codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT) threadIds = {} +def __add_async_log(logger_, code, content, *args): + try: + full_content = "" + if len(args) > 0: + full_content = ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args) + else: + full_content = "thread-id={} code={} ".format(threadIds.get(code), code) + content + async_log_manager = codeLogQueueDistributeManager.get_log_manager(code) + if async_log_manager: + async_log_manager.debug(logger_, full_content) + else: + async_log_util.debug(logger_, full_content) + except Exception as e: + logger_l2_error.exception(e) + + def debug(code, content, *args): - async_log_util.debug(logger_l2_trade, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_trade, code, content, *args) + + +def info(code, logger_, content, *args): + __add_async_log(logger_, code, content, *args) def buy_debug(code, content, *args): - async_log_util.debug(logger_l2_trade_buy, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_trade_buy, code, content, *args) def cancel_debug(code, content, *args): - async_log_util.debug(logger_l2_trade_cancel, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_trade_cancel, code, content, *args) def s_cancel_debug(code, content, *args): - async_log_util.debug(logger_l2_s_cancel, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_s_cancel, code, content, *args) def h_cancel_debug(code, content, *args): - async_log_util.debug(logger_l2_h_cancel, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_h_cancel, code, content, *args) def l_cancel_debug(code, content, *args): - async_log_util.debug(logger_l2_l_cancel, - ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)) + __add_async_log(logger_l2_l_cancel, code, content, *args) + + +def d_cancel_debug(code, content, *args): + __add_async_log(logger_l2_d_cancel, code, content, *args) + + +def f_cancel_debug(code, content, *args): + __add_async_log(logger_l2_f_cancel, code, content, *args) + + +def g_cancel_debug(code, content, *args): + __add_async_log(logger_l2_g_cancel, code, content, *args) + + +def j_cancel_debug(code, content, *args): + __add_async_log(logger_l2_j_cancel, code, content, *args) # 浜ゆ槗璁板綍 def trade_record(code, type, content, *args): - async_log_util.debug(logger_trade_record, - ("thread-id={} code={} type={} data=".format(threadIds.get(code), code, - type) + content).format(*args)) + if len(args) > 0: + async_log_util.debug(logger_trade_record, + ("thread-id={} code={} type={} data=".format(threadIds.get(code), code, + type) + content).format(*args)) + else: + async_log_util.debug(logger_trade_record, + "thread-id={} code={} type={} data=".format(threadIds.get(code), code, + type) + content) + + +if __name__ == "__main__": + codes = [] + for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): + codes.append(f"{i}" * 6) + codeLogQueueDistributeManager.set_l2_subscript_codes(codes) + codes[0] = "123456" + codeLogQueueDistributeManager.set_l2_subscript_codes(codes) + codeLogQueueDistributeManager.get_log_manager("000333") -- Gitblit v1.8.0