Administrator
2024-03-17 15199f8e93fe48e6261c99eadf6673d788db3a80
huaxin_client/l2_data_manager.py
@@ -6,12 +6,15 @@
import queue
import threading
import time
import constant
from huaxin_client import socket_util
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -34,13 +37,17 @@
class L2DataUploadManager:
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # 代码分配的对象
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}
        self.filter_order_condition_dict = {}
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
@@ -119,7 +126,11 @@
    def add_market_data(self, data):
        # 加入上传队列
        self.market_data_queue.put_nowait(data)
        # self.market_data_queue.put_nowait(data)
        code = data['securityID']
        callback = self.data_callback_distribute_manager.get_distributed_callback(code)
        if callback:
            callback.OnMarketData(code, data)
    # 分配上传队列
    def distribute_upload_queue(self, code):
@@ -127,6 +138,9 @@
            self.order_queue_distribute_manager.distribute_queue(code)
        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
            self.transaction_queue_distribute_manager.distribute_queue(code)
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)
        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
@@ -135,7 +149,8 @@
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if not constant.is_windows():
            self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -151,6 +166,7 @@
        self.order_queue_distribute_manager.release_distribute_queue(code)
        self.transaction_queue_distribute_manager.release_distribute_queue(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
@@ -187,8 +203,10 @@
                if temp_list:
                    # 上传数据
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    # self.__upload_l2_order_data(code, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time())
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
                        self.l2_order_codes.discard(code)
@@ -216,7 +234,8 @@
                        temp_list.append(data)
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_transaction_queue_dict:
@@ -254,6 +273,8 @@
        self.code_socket_client_dict = {}
        self.rlock = threading.RLock()
        context = zmq.Context()
        if constant.is_windows():
            return
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
            socket.connect(host)