Administrator
2023-08-23 9d655c667cb40ba64e3f6fe5400c9e8c139e9988
添加异步日志
6个文件已修改
1个文件已添加
105 ■■■■ 已修改文件
huaxin_client/l1_client.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -135,7 +135,8 @@
    __latest_subscript_codes.clear()
    for c in codes:
        __latest_subscript_codes.add(c)
    logger_local_huaxin_l1.info(f"新增加订阅的代码:{add_codes}")
    if add_codes:
        logger_local_huaxin_l1.info(f"新增加订阅的代码:{add_codes}")
def run(pipe_l2):
l2/l2_data_manager_new.py
@@ -7,6 +7,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
from log_module import async_log_util
from third_data import kpl_data_manager, block_info
from trade.deal_big_money_manager import DealComputeProgressManager
from utils import global_util, ths_industry_util, tool
@@ -1210,8 +1211,8 @@
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= threshold_count:
                        logger_l2_trade_buy.info(
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
                        async_log_util.info(logger_l2_trade_buy,
                                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if _val["num"] >= bigger_num:
                    # 只统计59万以上的金额
l2/l2_log.py
@@ -1,23 +1,26 @@
from log_module import async_log_util
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade
threadIds = {}
def debug(code, content, *args):
    logger_l2_trade.debug(("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
    async_log_util.debug(logger_l2_trade,
                         ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
def buy_debug(code, content, *args):
    logger_l2_trade_buy.debug(
        ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
    async_log_util.debug(logger_l2_trade_buy,
                         ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
def cancel_debug(code, content, *args):
    logger_l2_trade_cancel.debug(
        ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
    async_log_util.debug(logger_l2_trade_cancel,
                         ("thread-id={} code={}  ".format(threadIds.get(code), code) + content).format(*args))
# 交易记录
def trade_record(code, type, content, *args):
    logger_trade_record.debug(
        ("thread-id={} code={} type={} data=".format(threadIds.get(code), code, type) + content).format(*args))
    async_log_util.debug(logger_trade_record,
                         ("thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                      type) + content).format(*args))
log_module/async_log_util.py
New file
@@ -0,0 +1,50 @@
"""
异步日志管理器
"""
import queue
from log_module.log import logger_debug
log_queue = queue.Queue()
def __add_log(logger, method, *args):
    log_queue.put_nowait((logger, method, args))
def debug(logger, *args):
    __add_log(logger, "debug", *args)
def info(logger, *args):
    __add_log(logger, "info", *args)
def warning(logger, *args):
    __add_log(logger, "warning", *args)
def error(logger, *args):
    __add_log(logger, "error", *args)
def exception(logger, *args):
    __add_log(logger, "exception", *args)
# 运行同步日志
def run_sync():
    while True:
        try:
            val = log_queue.get()
            cmd = val[1]
            method = getattr(val[0], cmd)
            method(*val[2])
        except:
            pass
if __name__ == "__main__":
    logger_debug.warning()
    info(logger_debug, "*-{}", "test")
    run_sync()
test/test.py
@@ -1,22 +1,6 @@
import queue
import time
from huaxin_client import l1_subscript_codes_manager
from log_module import async_log_util
from log_module.log import logger_debug
_dict = {}
_queue = queue.Queue()
def add(index):
    start_time = time.time()
    _queue.put(index)
    end_time = time.time()
    logger_debug.debug(end_time - start_time)
if __name__ == "__main__":
    # l1_subscript_codes_manager.save_codes(["600100", "600102"], ["000123", "000146"])
    code_sh,codes_sz =l1_subscript_codes_manager.request_l1_subscript_target_codes()
    l1_subscript_codes_manager.save_codes(code_sh,codes_sz)
    async_log_util.add_log(logger_debug, "error", "测试错误")
    async_log_util.run_sync()
trade/huaxin/trade_server.py
@@ -28,6 +28,7 @@
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_manager_new import L2TradeDataProcessor
from log_module import async_log_util
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
from third_data import block_info, kpl_api, kpl_data_manager
@@ -707,6 +708,10 @@
    t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
    t1.start()
    # 同步异步日志
    t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
    t1.start()
    print("create TradeServer")
    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
    t1.start()
trade/trade_manager.py
@@ -12,6 +12,7 @@
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from output import kp_client_msg_manager
from trade import trade_data_manager, l2_trade_util, trade_juejin, trade_huaxin
@@ -435,7 +436,7 @@
        raise ex
    print("开始买入")
    logger_trade.info("{}开始买入".format(code))
    async_log_util.info(logger_trade, "{}开始买入".format(code))
    __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)