Administrator
2024-03-13 bb2c58fb407a3783b3704b33df6a154207ae4199
huaxin_client/l2_data_manager.py
@@ -34,7 +34,7 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
@@ -45,6 +45,7 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -133,7 +134,8 @@
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -148,6 +150,7 @@
    def release_distributed_upload_queue(self, code):
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
@@ -162,6 +165,9 @@
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -180,8 +186,8 @@
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
@@ -289,7 +295,7 @@
        if code not in self.code_socket_client_dict:
            raise Exception("尚未分配host")
        host, socket = self.code_socket_client_dict[code]
        socket.send_json(data)
        socket.send(marshal.dumps(data))
        socket.recv_string()