Administrator
2024-02-27 122def357140a4a504710a57fe2bc1a8020aa7b1
l2/l2_data_listen_manager.py
@@ -5,6 +5,8 @@
import threading
import time
import zmq
from log_module import async_log_util
from log_module.log import logger_debug
@@ -22,6 +24,7 @@
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}
        self.zmq_context = zmq.Context()
    # 接收L2逐笔委托数据
    def __recive_l2_orders(self, q: multiprocessing.Queue):
@@ -102,16 +105,42 @@
            finally:
                self.__l2_market_active_time_dict[__id] = time.time()
    def __create_ipc_server(self, host):
        socket = self.zmq_context.socket(zmq.REP)
        socket.bind(host)
        count = 0
        while True:
            try:
                data = socket.recv_json()
                self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2])
                socket.send_string("SUCCESS")
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 100:
                    count = 0
                    # 记录活跃时间,每100次记录一次
                    self.__l2_order_active_time_dict[host] = time.time()
    # 创建订单的IPC服务
    def __create_ipc_server_hosts(self, order_ipc_hosts):
        for host in order_ipc_hosts:
            threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start()
    # 接收L2数据
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        for q in order_queues:
            t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
            t1.start()
    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
        # TODO 暂时不通过队列接收数据
        # for q in order_queues:
        #     t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True)
        #     t1.start()
        for q in transaction_queues:
            t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True)
            t2.start()
        t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True)
        t3.start()
        # 接收订单hosts
        self.__create_ipc_server_hosts(order_ipc_hosts)
    def get_active_count(self, type_):
        expire_time = time.time() - 5