import logging
|
import threading
|
|
import constant
|
from log_module import async_log_util
|
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
|
logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \
|
logger_l2_g_cancel, logger_l2_j_cancel, logger_debug
|
|
|
# 日志队列分配管理器
|
# 为每一个代码分配专门的管理器
|
class CodeLogQueueDistributeManager:
|
def __init__(self, l2_channel_count):
|
self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)]
|
# 存放代码分配的日志索引 如:{"000333":2}
|
self.distributed_log_dict = {}
|
|
def __get_avaiable_log_manager(self):
|
distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict])
|
all_indexes = set([i for i in range(0, len(self.async_log_managers))])
|
available_indexes = all_indexes - distributed_indexes
|
if not available_indexes:
|
raise Exception("已分配完")
|
return available_indexes.pop()
|
|
def distribute_log_manager(self, code):
|
if code in self.distributed_log_dict:
|
return
|
log_manager_index = self.__get_avaiable_log_manager()
|
if log_manager_index is not None:
|
self.distributed_log_dict[code] = log_manager_index
|
|
def realase_log_manager(self, code):
|
if code in self.distributed_log_dict:
|
self.distributed_log_dict.pop(code)
|
|
# 设置L2订阅代码
|
def set_l2_subscript_codes(self, codes):
|
codes = set(codes)
|
old_codes = set([code for code in self.distributed_log_dict])
|
del_codes = old_codes - codes
|
add_codes = codes - old_codes
|
for c in del_codes:
|
self.realase_log_manager(c)
|
for c in add_codes:
|
try:
|
self.distribute_log_manager(c)
|
except Exception as e:
|
pass
|
# logging.exception(e)
|
# logger_debug.error(f"L2日志分配出错:新代码总数{len(codes)}")
|
|
def get_log_manager(self, code):
|
if code in self.distributed_log_dict:
|
return self.async_log_managers[self.distributed_log_dict[code]]
|
return None
|
|
# 运行同步服务
|
def run_async(self):
|
for m in self.async_log_managers:
|
threading.Thread(target=lambda: m.run_sync(True), daemon=True).start()
|
|
|
codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)
|
|
threadIds = {}
|
|
|
def __add_async_log(logger_, code, content, *args):
|
try:
|
full_content = ""
|
if len(args) > 0:
|
full_content = ("thread-id={} code={} ".format(threadIds.get(code), code) + content).format(*args)
|
else:
|
full_content = "thread-id={} code={} ".format(threadIds.get(code), code) + content
|
async_log_manager = codeLogQueueDistributeManager.get_log_manager(code)
|
if async_log_manager:
|
async_log_manager.debug(logger_, full_content)
|
else:
|
async_log_util.debug(logger_, full_content)
|
except Exception as e:
|
logger_l2_error.exception(e)
|
|
|
def debug(code, content, *args):
|
__add_async_log(logger_l2_trade, code, content, *args)
|
|
|
def info(code, logger_, content, *args):
|
__add_async_log(logger_, code, content, *args)
|
|
|
def buy_debug(code, content, *args):
|
__add_async_log(logger_l2_trade_buy, code, content, *args)
|
|
|
def cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_trade_cancel, code, content, *args)
|
|
|
def s_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_s_cancel, code, content, *args)
|
|
|
def h_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_h_cancel, code, content, *args)
|
|
|
def l_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_l_cancel, code, content, *args)
|
|
|
def d_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_d_cancel, code, content, *args)
|
|
|
def f_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_f_cancel, code, content, *args)
|
|
|
def g_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_g_cancel, code, content, *args)
|
|
|
def j_cancel_debug(code, content, *args):
|
__add_async_log(logger_l2_j_cancel, code, content, *args)
|
|
|
# 交易记录
|
def trade_record(code, type, content, *args):
|
if len(args) > 0:
|
async_log_util.debug(logger_trade_record,
|
("thread-id={} code={} type={} data=".format(threadIds.get(code), code,
|
type) + content).format(*args))
|
else:
|
async_log_util.debug(logger_trade_record,
|
"thread-id={} code={} type={} data=".format(threadIds.get(code), code,
|
type) + content)
|
|
|
if __name__ == "__main__":
|
codes = []
|
for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
|
codes.append(f"{i}" * 6)
|
codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
|
codes[0] = "123456"
|
codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
|
codeLogQueueDistributeManager.get_log_manager("000333")
|