Administrator
2024-02-27 122def357140a4a504710a57fe2bc1a8020aa7b1
huaxin_client/l2_data_manager.py
@@ -30,9 +30,10 @@
# L2上传数据管理器
class L2DataUploadManager:
    # order_ipc_hosts:远程host
    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
                 market_data_queue: multiprocessing.Queue):
                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
        self.order_queue_distribute_manager = order_queue_distribute_manager
        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
        self.market_data_queue = market_data_queue
@@ -42,6 +43,8 @@
        self.upload_l2_data_task_dict = {}
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        # 订单
        self.l2_order_upload_protocol = L2DataUploadProtocolManager(order_ipc_hosts)
    # 设置订单过滤条件
    # special_price:过滤的1手的价格
@@ -141,6 +144,8 @@
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        # 分配订单上传协议
        self.l2_order_upload_protocol.distribute_upload_host(code)
        if code not in self.upload_l2_data_task_dict:
            t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -161,9 +166,13 @@
            self.temp_transaction_queue_dict.pop(code)
        if code in self.upload_l2_data_task_dict:
            self.upload_l2_data_task_dict.pop(code)
        self.l2_order_upload_protocol.release_distributed_upload_host(code)
    def __upload_l2_data(self, code, _queue, datas):
        _queue.put_nowait((code, datas, time.time()))
    def __upload_l2_order_data(self, code, datas):
        self.l2_order_upload_protocol.upload_data_as_json(code, (code, datas, time.time()))
    # 处理订单数据并上传
    def __run_upload_order_task(self, code):
@@ -182,7 +191,8 @@
                if temp_list:
                    # 上传数据
                    self.__upload_l2_data(code, upload_queue, temp_list)
                    # self.__upload_l2_data(code, upload_queue, temp_list)
                    self.__upload_l2_order_data(code, temp_list)
                    temp_list = []
                else:
                    if code not in self.temp_order_queue_dict:
@@ -225,7 +235,6 @@
                pass
# L2上传数据协议管理器
class L2DataUploadProtocolManager:
    # ipchosts IPC协议
@@ -235,7 +244,7 @@
        self.socket_client_dict = {}
        # 保存代码分配的client 格式:{code:(host, socket)}
        self.code_socket_client_dict = {}
        self.lock = threading.Lock()
        self.rlock = threading.RLock()
        context = zmq.Context()
        for host in self.ipchosts:
            socket = context.socket(zmq.REQ)
@@ -244,7 +253,6 @@
    # 获取
    def __get_available_ipchost(self):
        try:
            if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
                raise Exception("无可用host")
            used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
@@ -252,26 +260,24 @@
                if host not in used_hosts:
                    return host, self.socket_client_dict[host]
            raise Exception("无可用host")
        finally:
            self.lock.release()
    # 分配HOST
    def distribute_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            host_info = self.__get_available_ipchost()
            if host_info:
                self.code_socket_client_dict[code] = host_info
        finally:
            self.lock.release()
            self.rlock.release()
    def release_distributed_upload_host(self, code):
        self.lock.acquire()
        self.rlock.acquire()
        try:
            if code in self.code_socket_client_dict:
                self.code_socket_client_dict.pop(code)
        finally:
            self.lock.release()
            self.rlock.release()
    def upload_data_as_json(self, code, data):
        if code not in self.code_socket_client_dict:
@@ -386,6 +392,7 @@
    pass
def run_test():
    t = threading.Thread(target=lambda: __test(), daemon=True)
    t.start()