Administrator
2024-03-18 59159700fa6300d663140bc44f570ebc90e1998d
L2日志修改
4个文件已修改
149 ■■■■ 已修改文件
huaxin_client/l2_data_manager.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 84 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py
@@ -37,7 +37,8 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts,
                 data_callback_distribute_manager: CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
@@ -141,7 +142,6 @@
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
@@ -204,7 +204,14 @@
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time())
                    __start_time = time.time()
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
                                                                                                   time.time())
                    use_time = time.time() - __start_time
                    if use_time > 0.01:
                        # 记录10ms以上的数据
                        huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s  结束数据:{temp_list[-1]}")
                    temp_list = []
                else:
@@ -235,7 +242,8 @@
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
                                                                                                         temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_transaction_queue_dict:
l2/l2_log.py
@@ -1,25 +1,88 @@
import threading
import constant
from log_module import async_log_util
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
    logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \
    logger_l2_g_cancel
# 日志队列分配管理器
# 为每一个代码分配专门的管理器
class CodeLogQueueDistributeManager:
    def __init__(self, l2_channel_count):
        self.async_log_managers = [async_log_util.AsyncLogManager() for i in range(l2_channel_count)]
        # 存放代码分配的日志索引 如:{"000333":2}
        self.distributed_log_dict = {}
    def __get_avaiable_log_manager(self):
        distributed_indexes = set([self.distributed_log_dict[x] for x in self.distributed_log_dict])
        all_indexes = set([i for i in range(0, len(self.async_log_managers))])
        available_indexes = all_indexes - distributed_indexes
        if not available_indexes:
            raise Exception("已分配完")
        return available_indexes.pop()
    def distribute_log_manager(self, code):
        if code in self.distributed_log_dict:
            return
        log_manager_index = self.__get_avaiable_log_manager()
        if log_manager_index is not None:
            self.distributed_log_dict[code] = log_manager_index
    def realase_log_manager(self, code):
        if code in self.distributed_log_dict:
            self.distributed_log_dict.pop(code)
    # 设置L2订阅代码
    def set_l2_subscript_codes(self, codes):
        codes = set(codes)
        now_codes = set([code for code in self.distributed_log_dict])
        del_codes = now_codes - codes
        add_codes = codes - now_codes
        for c in del_codes:
            self.realase_log_manager(c)
        for c in add_codes:
            self.distribute_log_manager(c)
    def get_log_manager(self, code):
        if code in self.distributed_log_dict:
            return self.async_log_managers[self.distributed_log_dict[code]]
        return None
    # 运行同步服务
    def run_async(self):
        for m in self.async_log_managers:
            threading.Thread(target=m.run_sync, daemon=True).start()
codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)
threadIds = {}
def __add_async_log(logger_, code, content, *args):
    try:
        full_content = ""
        if len(args) > 0:
            async_log_util.debug(logger_,
                                 ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
            full_content = ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args)
        else:
            async_log_util.debug(logger_,
                                 "thread-id={} code={}  ".format(threadIds.get(code), code) + content)
            full_content = "thread-id={} code={}  ".format(threadIds.get(code), code) + content
        async_log_manager = codeLogQueueDistributeManager.get_log_manager(code)
        if async_log_manager:
            async_log_manager.debug(logger_, full_content)
        else:
            async_log_util.debug(logger_, full_content)
    except Exception as e:
        logger_l2_error.exception(e)
def debug(code, content, *args):
    __add_async_log(logger_l2_trade, code, content, *args)
def info(code, logger_, content, *args):
    __add_async_log(logger_, code, content, *args)
def buy_debug(code, content, *args):
@@ -45,11 +108,14 @@
def d_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_d_cancel, code, content, *args)
def f_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_f_cancel, code, content, *args)
def g_cancel_debug(code, content, *args):
    __add_async_log(logger_l2_g_cancel, code, content, *args)
# 交易记录
def trade_record(code, type, content, *args):
@@ -61,3 +127,13 @@
        async_log_util.debug(logger_trade_record,
                             "thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                         type) + content)
if __name__ == "__main__":
    codes = []
    for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT):
        codes.append(f"{i}" * 6)
    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
    codes[0] = "123456"
    codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
    codeLogQueueDistributeManager.get_log_manager("000333")
log_module/async_log_util.py
@@ -50,55 +50,33 @@
huaxin_l2_log = AsyncLogManager()
log_queue = queue.Queue()
def __add_log(logger, time_out_log, method, *args):
    start_time = time.time()
    log_queue.put_nowait((logger, start_time, method, args))
    if time_out_log:
        end_time = time.time()
        sub_time = end_time - start_time
        if sub_time > 0.01:
            # 记录日志保存慢的日志
            __add_log(logger_debug, False, f"保存到日志队列用时:{sub_time}s")
__common_log = AsyncLogManager()
def debug(logger, *args):
    __add_log(logger, True, "debug", *args)
    __common_log.debug(logger, *args)
def info(logger, *args):
    __add_log(logger, True, "info", *args)
    __common_log.info(logger, *args)
def warning(logger, *args):
    __add_log(logger, True, "warning", *args)
    __common_log.warning(logger, *args)
def error(logger, *args):
    __add_log(logger, True, "error", *args)
    __common_log.error(logger, *args)
def exception(logger, *args):
    __add_log(logger, True, "exception", *args)
    __common_log.exception(logger, *args)
# 运行同步日志
def run_sync():
    logger_system.info(f"async_log 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            val = log_queue.get()
            time_s = val[1]
            cmd = val[2]
            method = getattr(val[0], cmd)
            d = list(val[3])
            d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
            d = tuple(d)
            method(*d)
        except:
            pass
    __common_log.run_sync()
if __name__ == "__main__":
trade/huaxin/huaxin_trade_server.py
@@ -223,6 +223,8 @@
                            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas)
                            # 上传数据
                            codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                            l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
                            fresults = []
                            if codes:
                                for code in codes:
@@ -384,8 +386,8 @@
        use_time = int((time.time() - timestamp) * 1000)
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{use_time}-{thread_id}#{_datas}")
        l2_log.info(code, hx_logger_l2_orderdetail,
                    f"{code}#耗时:{use_time}-{thread_id}#数量:{len(_datas)}#{_datas[-1]}")
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
@@ -1241,7 +1243,7 @@
            trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache())
            fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count,
                     "cost_price": 0, "cost_price_rate":0,
                     "cost_price": 0, "cost_price_rate": 0,
                     "code_info": (code, code_name), "desc": "".join(desc_list)}
            if positions:
                for d in positions:
@@ -1683,7 +1685,7 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        order_queues, transaction_queues,
        market_queue, queue_l1_trade_w_strategy_r,order_ipc_hosts):
        market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # 执行一些初始化数据
@@ -1721,6 +1723,9 @@
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()
        # 同步L2的异步日志
        l2_log.codeLogQueueDistributeManager.run_async()
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()