""" L2数据监听 """ import marshal import multiprocessing import threading import time import zmq from log_module import async_log_util from log_module.log import logger_debug __l2_order_active_time_dict = {} __l2_transaction_active_time_dict = {} class L2DataListenManager: TYPE_ORDER = "order" TYPE_TRANSACTION = "transaction" TYPE_MARKET = "market" def __init__(self, l2_data_callback): self.my_l2_data_callback = l2_data_callback self.__l2_order_active_time_dict = {} self.__l2_transaction_active_time_dict = {} self.__l2_market_active_time_dict = {} self.zmq_context = zmq.Context() # 接收L2逐笔委托数据 def __recive_l2_orders(self, q: multiprocessing.Queue): __id = id(q) count = 0 while True: # datas_dict = {} try: # if not q.empty(): item = q.get() item = marshal.loads(item) self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2]) # else: # time.sleep(0.001) # while not q.empty(): # item = q.get() # if item[0] not in datas_dict: # datas_dict[item[0]] = [] # datas_dict[item[0]].append(item) # if datas_dict: # for c in datas_dict: # self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10]) # else: # time.sleep(0.002) except Exception as e: async_log_util.exception(logger_debug, e) finally: # datas_dict.clear() count += 1 if count > 100: count = 0 # 记录活跃时间,每100次记录一次 self.__l2_order_active_time_dict[__id] = time.time() # 接收L2逐笔成交数据 def __recive_transaction_orders(self, q: multiprocessing.Queue): __id = id(q) # datas_dict = {} count = 0 while True: try: # while not q.empty(): # item = q.get() # if item[0] not in datas_dict: # datas_dict[item[0]] = [] # datas_dict[item[0]].append(item) # if datas_dict: # for c in datas_dict: # self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c]) # else: # time.sleep(0.01) if not q.empty(): item = q.get() item = marshal.loads(item) self.my_l2_data_callback.OnL2Transaction(item[0], item[1]) else: time.sleep(0.005) except Exception as e: async_log_util.exception(logger_debug, e) finally: # datas_dict.clear() count += 1 if count > 50: count = 0 # 记录活跃时间,每100次记录一次 self.__l2_transaction_active_time_dict[__id] = time.time() def __recive_l2_markets(self, q: multiprocessing.Queue): __id = id(q) while True: try: if not q.empty(): item = q.get() self.my_l2_data_callback.OnMarketData(item['securityID'], item) else: time.sleep(0.002) except Exception as e: async_log_util.exception(logger_debug, e) finally: self.__l2_market_active_time_dict[__id] = time.time() def __create_ipc_server(self, host): socket = self.zmq_context.socket(zmq.REP) socket.bind(host) count = 0 while True: try: data = socket.recv() data = marshal.loads(data) self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2]) socket.send_string("SUCCESS") except Exception as e: async_log_util.exception(logger_debug, e) finally: count += 1 if count > 100: count = 0 # 记录活跃时间,每100次记录一次 self.__l2_order_active_time_dict[host] = time.time() # 创建订单的IPC服务 def __create_ipc_server_hosts(self, order_ipc_hosts): for host in order_ipc_hosts: threading.Thread(target=lambda: self.__create_ipc_server(host), daemon=True).start() # 接收L2数据 def receive_l2_data(self, order_queues, transaction_queues, market_queue): # TODO 暂时不通过队列接收数据 # for q in order_queues: # t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) # t1.start() for q in transaction_queues: t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True) t2.start() t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) t3.start() def get_active_count(self, type_): expire_time = time.time() - 5 active_count = 0 if type_ == self.TYPE_ORDER: for _id in self.__l2_order_active_time_dict: if self.__l2_order_active_time_dict[_id] > expire_time: active_count += 1 elif type_ == self.TYPE_TRANSACTION: for _id in self.__l2_transaction_active_time_dict: if self.__l2_transaction_active_time_dict[_id] > expire_time: active_count += 1 elif type_ == self.TYPE_MARKET: for _id in self.__l2_market_active_time_dict: if self.__l2_market_active_time_dict[_id] > expire_time: active_count += 1 return active_count