Administrator
2023-08-31 b217ca6db84c273f0d1c24eed3fae6bec2431dbe
添加系统日志
6个文件已修改
200 ■■■■■ 已修改文件
huaxin_client/l2_client.py 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_record_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -575,27 +575,30 @@
def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack)->None:
    logger_system.info("L2进程ID:{}", os.getpid())
    log.close_print()
    if pipe_trade is not None:
        t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True)
        t1.start()
    if _pipe_strategy is not None:
        global pipe_strategy
        pipe_strategy = _pipe_strategy
        t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
        t1.start()
    __init_l2()
    try:
        log.close_print()
        if pipe_trade is not None:
            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True)
            t1.start()
        if _pipe_strategy is not None:
            global pipe_strategy
            pipe_strategy = _pipe_strategy
            t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
            t1.start()
        __init_l2()
    global l2_data_callback
    l2_data_callback = _l2_data_callback
    l2_data_manager.run_upload_common(l2_data_callback)
    l2_data_manager.run_upload_trading_canceled(l2_data_callback)
    l2_data_manager.run_log()
    # l2_data_manager.run_test(l2_data_callback)
    global l2CommandManager
    l2CommandManager = command_manager.L2CommandManager()
    l2CommandManager.init(MyL2ActionCallback())
    logger_system.info("L2订阅服务启动成功")
        global l2_data_callback
        l2_data_callback = _l2_data_callback
        l2_data_manager.run_upload_common(l2_data_callback)
        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
        l2_data_manager.run_log()
        # l2_data_manager.run_test(l2_data_callback)
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
    while True:
        time.sleep(2)
huaxin_client/trade_client.py
@@ -1028,24 +1028,27 @@
def run(trade_response_: TradeResponse, pipe_l2=None, pipe_strategy=None):
    logger_system.info("交易进程ID:{}", os.getpid())
    __init_trade_data_server()
    global l2pipe
    l2pipe = pipe_l2
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        __init_trade_data_server()
        global l2pipe
        l2pipe = pipe_l2
    global strategy_pipe
    strategy_pipe = pipe_strategy
        global strategy_pipe
        strategy_pipe = pipe_strategy
    global trade_response
    trade_response = trade_response_
        global trade_response
        trade_response = trade_response_
    t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True)
    t1.start()
        t1 = threading.Thread(target=lambda: trade_client_server.run(), daemon=True)
        t1.start()
    global tradeCommandManager
    tradeCommandManager = command_manager.TradeCommandManager()
    tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
    logger_system.info("华鑫交易服务启动")
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, pipe_strategy)
        logger_system.info("华鑫交易服务启动")
    except Exception as e:
        logger_system.exception(e)
    # 不需要运行命令解析
    # tradeCommandManager.run()
    while True:
l2/cancel_buy_strategy.py
@@ -1195,7 +1195,9 @@
                        if left_count > 0:
                            buy_nums += left_count * data["val"]["num"]
                            if buy_nums > threshold_num:
                                l2_log.l_cancel_debug(code, f"LX阻断L撤撤单:{buy_nums}/{threshold_num}")
                                return False, "LX阻断L撤撤单"
                l2_log.l_cancel_debug(code, f"LX尚未阻断L撤撤单:{buy_nums}/{threshold_num}")
                return can_cancel, cancel_data
            except Exception as e:
                l2_log.l_cancel_debug(code, f"LX撤单计算异常:{str(e)}")
main.py
@@ -42,11 +42,13 @@
    t1.start()
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), daemon=True)
    t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    # 启动华鑫交易服务
    t1 = threading.Thread(target=huaxin_client.trade_client.run, args=(trade_server.my_trade_response, ptl2_trade, pst_trade),
    t1 = threading.Thread(target=huaxin_client.trade_client.run,
                          args=(trade_server.my_trade_response, ptl2_trade, pst_trade),
                          daemon=True)
    t1.start()
@@ -78,31 +80,35 @@
if __name__ == '__main__':
    logger_l2_trade.info("启动程序")
    logger_system.info("启动程序--------")
    log.close_print()
    # 策略与server间的通信
    pss_server, pss_strategy = multiprocessing.Pipe()
    # 策略与交易间的通信
    pst_trade, pst_strategy = multiprocessing.Pipe()
    # 交易与l2之间的通信
    ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
    # 策略与l2之间的通信
    psl2_strategy, psl2_l2 = multiprocessing.Pipe()
    try:
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 策略与交易间的通信
        pst_trade, pst_strategy = multiprocessing.Pipe()
        # 交易与l2之间的通信
        ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
        # 策略与l2之间的通信
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
    # l1与策略间的通信
    pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
        # l1与策略间的通信
        pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
    # 托管环境下不创建
    # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
    # serverProcess.start()
    logger_system.info("主进程ID:{}", os.getpid())
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())
    tradeServerProcess = multiprocessing.Process(target=createTradeServer,
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2,ptl2_trade,pst_trade))
    tradeServerProcess.start()
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,))
        l1Process.start()
    # L1订阅数据
    huaxin_client.l1_client.run(pl1t_l1)
    # 将tradeServer作为主进程
    tradeServerProcess.join()
        # 主进程
        createTradeServer(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, ptl2_trade, pst_trade)
        # 将tradeServer作为主进程
        l1Process.join()
    except Exception as e:
        logger_system.exception(e)
trade/huaxin/huaxin_trade_record_manager.py
@@ -393,5 +393,8 @@
if __name__ == "__main__":
    results = DelegateRecordManager.list_by_day('20230704', '1970-01-01')
    update_time = None
    results, update_time = DelegateRecordManager.list_by_day(
        tool.get_now_date_str("%Y%m%d"), update_time)
    print(results)
trade/huaxin/trade_server.py
@@ -798,38 +798,41 @@
def run(pipe_trade, pipe_l1, pipe_l2, trade_cmd_callback):
    # 执行一些初始化数据
    block_info.init()
    # 启动外部接口监听
    manager = outside_api_command_manager.ApiCommandManager()
    manager.init(middle_api_protocol.SERVER_HOST,
                 middle_api_protocol.SERVER_PORT,
                 OutsideApiCommandCallback())
    manager.run(blocking=False)
    # 启动交易服务
    huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback)
    # 监听l1那边传过来的代码
    t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
    t1.start()
    # 同步异步日志
    t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
    t1.start()
    logger_system.info("create TradeServer")
    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
    t1.start()
    laddr = "0.0.0.0", 10008
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    try:
        tcpserver.serve_forever()
        # 执行一些初始化数据
        block_info.init()
        # 启动外部接口监听
        manager = outside_api_command_manager.ApiCommandManager()
        manager.init(middle_api_protocol.SERVER_HOST,
                     middle_api_protocol.SERVER_PORT,
                     OutsideApiCommandCallback())
        manager.run(blocking=False)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(pipe_trade, trade_cmd_callback)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
        t1.start()
        # 同步异步日志
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
        laddr = "0.0.0.0", 10008
        tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
        try:
            tcpserver.serve_forever()
        except Exception as e:
            logger_system.exception(e)
            logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
if __name__ == "__main__":