import logging import threading import constant from log_module import async_log_util from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \ logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \ logger_l2_g_cancel, logger_l2_j_cancel, logger_debug # 日志队列分配管理器 # 为每一个代码分配专门的管理器 class CodeLogQueueDistributeManager: def __init__(self, l2_channel_count): self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)] # 存放代码分配的日志索引 如:{"000333":2} self.distributed_log_dict = {} def __get_avaiable_log_manager(self): distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict]) all_indexes = set([i for i in range(0, len(self.async_log_managers))]) available_indexes = all_indexes - distributed_indexes if not available_indexes: raise Exception("已分配完") return available_indexes.pop() def distribute_log_manager(self, code): if code in self.distributed_log_dict: return log_manager_index = self.__get_avaiable_log_manager() if log_manager_index is not None: self.distributed_log_dict[code] = log_manager_index def realase_log_manager(self, code): if code in self.distributed_log_dict: self.distributed_log_dict.pop(code) # 设置L2订阅代码 def set_l2_subscript_codes(self, codes): codes = set(codes) old_codes = set([code for code in self.distributed_log_dict]) del_codes = old_codes - codes add_codes = codes - old_codes for c in del_codes: self.realase_log_manager(c) for c in add_codes: try: self.distribute_log_manager(c) except Exception as e: pass # logging.exception(e) # logger_debug.error(f"L2日志分配出错:新代码总数{len(codes)}") def get_log_manager(self, code): if code in self.distributed_log_dict: return self.async_log_managers[self.distributed_log_dict[code]] return None # 运行同步服务 def run_async(self): for m in self.async_log_managers: threading.Thread(target=lambda: m.run_sync(True), daemon=True).start() codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT) threadIds = {} def __add_async_log(logger_, code, content, *args): try: full_content = "" if len(args) > 0: full_content = ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args) else: full_content = "thread-id={} code={} ".format(threadIds.get(code), code) + content async_log_manager = codeLogQueueDistributeManager.get_log_manager(code) if async_log_manager: async_log_manager.debug(logger_, full_content) else: async_log_util.debug(logger_, full_content) except Exception as e: logger_l2_error.exception(e) def debug(code, content, *args): __add_async_log(logger_l2_trade, code, content, *args) def info(code, logger_, content, *args): __add_async_log(logger_, code, content, *args) def buy_debug(code, content, *args): __add_async_log(logger_l2_trade_buy, code, content, *args) def cancel_debug(code, content, *args): __add_async_log(logger_l2_trade_cancel, code, content, *args) def s_cancel_debug(code, content, *args): __add_async_log(logger_l2_s_cancel, code, content, *args) def h_cancel_debug(code, content, *args): __add_async_log(logger_l2_h_cancel, code, content, *args) def l_cancel_debug(code, content, *args): __add_async_log(logger_l2_l_cancel, code, content, *args) def d_cancel_debug(code, content, *args): __add_async_log(logger_l2_d_cancel, code, content, *args) def f_cancel_debug(code, content, *args): __add_async_log(logger_l2_f_cancel, code, content, *args) def g_cancel_debug(code, content, *args): __add_async_log(logger_l2_g_cancel, code, content, *args) def j_cancel_debug(code, content, *args): __add_async_log(logger_l2_j_cancel, code, content, *args) # 交易记录 def trade_record(code, type, content, *args): if len(args) > 0: async_log_util.debug(logger_trade_record, ("thread-id={} code={} type={} data=".format(threadIds.get(code), code, type) + content).format(*args)) else: async_log_util.debug(logger_trade_record, "thread-id={} code={} type={} data=".format(threadIds.get(code), code, type) + content) if __name__ == "__main__": codes = [] for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): codes.append(f"{i}" * 6) codeLogQueueDistributeManager.set_l2_subscript_codes(codes) codes[0] = "123456" codeLogQueueDistributeManager.set_l2_subscript_codes(codes) codeLogQueueDistributeManager.get_log_manager("000333")