Administrator
2023-11-15 dcb8ca6aebca9cef3672c007438f3f459988a921
交易查询分离
7个文件已修改
83 ■■■■ 已修改文件
huaxin_client/command_manager.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_threadpool.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -74,9 +74,10 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read_for_trade: multiprocessing.Queue,queue_strategy_trade_read_for_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_trade_read = queue_strategy_trade_read
        cls.queue_strategy_trade_read = queue_strategy_trade_read_for_trade
        cls.queue_strategy_trade_read_trade_read = queue_strategy_trade_read_for_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
@@ -128,6 +129,26 @@
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command(_type, None, val)
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
                    logging.exception(e)
        except Exception as e:
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    @classmethod
    def run_process_read_command(cls, queue_strategy_trade_read_trade: multiprocessing.Queue):
        if queue_strategy_trade_read_trade is None:
            return
        # 本地命令接收
        try:
            while True:
                try:
                    val = queue_strategy_trade_read_trade.get()
                    if val:
                        _type = val["type"]
                        if _type != "test":
                            async_log_util.info(logger_local_huaxin_contact_debug, f"接受到信息: {val}")
                        cls.process_command_thread_pool.submit(lambda: cls.process_command(_type, None, val))
                except Exception as e:
                    async_log_util.exception(logger_local_huaxin_trade_debug, e)
@@ -138,11 +159,15 @@
    # 维护连接数的稳定
    def run(self, blocking=True):
        if blocking:
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
            self.run_process_command(self.queue_strategy_trade_read)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_trade_read), daemon=True)
            t1.start()
            t1 = threading.Thread(target=lambda: self.run_process_read_command(self.queue_strategy_trade_read_trade_read), daemon=True)
            t1.start()
# L2指令管理
huaxin_client/trade_client.py
@@ -885,7 +885,7 @@
class MyTradeActionCallback(command_manager.TradeActionCallback):
    __tradeSimpleApi = TradeSimpleApi()
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    trade_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=30)
    def OnTrade(self, client_id, request_id, sk, type_, data):
        if type_ == 1:
@@ -1144,7 +1144,7 @@
def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None,
        queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None):
        queue_strategy_trade_read=None,queue_strategy_trade_read_for_read=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1163,7 +1163,7 @@
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read)
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read, queue_strategy_trade_read_for_read)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
    except Exception as e:
l2/cancel_buy_strategy.py
@@ -1012,7 +1012,7 @@
                MIN_MONEYS = [300, 200, 100, 50]
                watch_indexes = set()
                for min_money in MIN_MONEYS:
                    for i in range(end_index, re_start_index, -1):
                    for i in range(end_index, re_start_index - 1, -1):
                        try:
                            data = total_datas[i]
                            val = data['val']
main.py
@@ -24,7 +24,8 @@
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_,
                      market_queue_):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
@@ -46,7 +47,7 @@
    t1.start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_,
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_, order_queues_,
                            transaction_queues_, market_queue_)
@@ -92,6 +93,7 @@
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
        queue_strategy_w_trade_r_for_read = multiprocessing.Queue()
        # 策略读交易写
        queue_strategy_r_trade_w = multiprocessing.Queue()
@@ -107,7 +109,8 @@
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=huaxin_client.trade_client.run,
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,))
            args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                  queue_strategy_w_trade_r_for_read))
        tradeProcess.start()
        # 创建L2通信队列
@@ -127,6 +130,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read,
                          order_queues, transaction_queues, market_queue)
        # 将tradeServer作为主进程
test/test_threadpool.py
@@ -66,4 +66,13 @@
if __name__ == '__main__':
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    thread_pool.submit(lambda: test1("123123"))
    input()
    thread_pool.submit(lambda: test1("123123"))
    thread_pool.submit(lambda: test1("123123"))
    thread_pool.submit(lambda: test1("123123"))
    while True:
        running_count = 0
        for future in thread_pool._threads:
            if future.is_alive():
                running_count += 1
        print(running_count)
        time.sleep(1)
trade/huaxin/huaxin_trade_api.py
@@ -118,9 +118,12 @@
# 设置交易通信队列
# 暂时不会使用该方法
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_):
    global queue_strategy_w_trade_r
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, queue_strategy_w_trade_r_for_read_):
    global queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    queue_strategy_w_trade_r_for_read = queue_strategy_w_trade_r_for_read_
    t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __run_save_data(), daemon=True)
@@ -295,7 +298,7 @@
# 网络请求
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True):
def __request(_type, data, request_id=None, blocking=False, is_pipe=True, log_enable=True,is_trade=False):
    if not request_id:
        request_id = __get_request_id(_type)
    try:
@@ -307,7 +310,11 @@
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        queue_strategy_w_trade_r.put_nowait(root_data)
        if is_trade:
            queue_strategy_w_trade_r.put_nowait(root_data)
        else:
            queue_strategy_w_trade_r_for_read.put_nowait(root_data)
        use_time = int((time.time() - start_time) * 1000)
        if use_time > 10:
            async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
@@ -374,7 +381,7 @@
        order_ref = huaxin_util.create_order_ref()
    if not request_id:
        request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
    for i in range(2):
    for i in range(1):
        request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                               {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
                                "direction": direction,
@@ -385,7 +392,7 @@
                                "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking},
                               request_id=request_id,
                               blocking=blocking,
                               is_pipe=is_pipe_channel_normal())
                               is_pipe=is_pipe_channel_normal(),is_trade=True)
    try:
        if blocking:
            return __read_response(request_id, blocking)
@@ -421,7 +428,7 @@
                                "orderRef": orderRef,
                                "orderActionRef": order_action_ref,
                                "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                               is_pipe=is_pipe_channel_normal())
                               is_pipe=is_pipe_channel_normal(),is_trade=True)
    try:
        return __read_response(request_id, blocking)
    finally:
trade/huaxin/huaxin_trade_server.py
@@ -876,7 +876,7 @@
l2DataListenManager: L2DataListenManager = None
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, order_queues, transaction_queues,
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read, order_queues, transaction_queues,
        market_queue):
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
@@ -896,7 +896,7 @@
        l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue)
        # 启动交易服务
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r)
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,queue_strategy_w_trade_r_for_read)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)