""" L2数据监听 """ import multiprocessing import threading import time from log_module import async_log_util from log_module.log import logger_debug __l2_order_active_time_dict = {} __l2_transaction_active_time_dict = {} class L2DataListenManager: TYPE_ORDER = "order" TYPE_TRANSACTION = "transaction" TYPE_MARKET = "market" def __init__(self, l2_data_callback): self.my_l2_data_callback = l2_data_callback self.__l2_order_active_time_dict = {} self.__l2_transaction_active_time_dict = {} self.__l2_market_active_time_dict = {} # 接收L2逐笔委托数据 def __recive_l2_orders(self, q: multiprocessing.Queue): __id = id(q) count = 0 while True: # datas_dict = {} try: if not q.empty(): item = q.get() self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2]) else: time.sleep(0.001) # while not q.empty(): # item = q.get() # if item[0] not in datas_dict: # datas_dict[item[0]] = [] # datas_dict[item[0]].append(item) # if datas_dict: # for c in datas_dict: # self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10]) # else: # time.sleep(0.002) except Exception as e: async_log_util.exception(logger_debug, e) finally: # datas_dict.clear() count += 1 if count > 100: count = 0 # 记录活跃时间,每100次记录一次 self.__l2_order_active_time_dict[__id] = time.time() # 接收L2逐笔成交数据 def __recive_transaction_orders(self, q: multiprocessing.Queue): __id = id(q) # datas_dict = {} count = 0 while True: try: # while not q.empty(): # item = q.get() # if item[0] not in datas_dict: # datas_dict[item[0]] = [] # datas_dict[item[0]].append(item) # if datas_dict: # for c in datas_dict: # self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c]) # else: # time.sleep(0.01) if not q.empty(): item = q.get() self.my_l2_data_callback.OnL2Transaction(item[0], item[1]) else: time.sleep(0.005) except Exception as e: async_log_util.exception(logger_debug, e) finally: # datas_dict.clear() count += 1 if count > 50: count = 0 # 记录活跃时间,每100次记录一次 self.__l2_transaction_active_time_dict[__id] = time.time() def __recive_l2_markets(self, q: multiprocessing.Queue): __id = id(q) while True: try: if not q.empty(): item = q.get() self.my_l2_data_callback.OnMarketData(item['securityID'], item) else: time.sleep(0.002) except Exception as e: async_log_util.exception(logger_debug, e) finally: self.__l2_market_active_time_dict[__id] = time.time() # 接收L2数据 def receive_l2_data(self, order_queues, transaction_queues, market_queue): for q in order_queues: t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) t1.start() for q in transaction_queues: t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True) t2.start() t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) t3.start() def get_active_count(self, type_): expire_time = time.time() - 5 active_count = 0 if type_ == self.TYPE_ORDER: for _id in self.__l2_order_active_time_dict: if self.__l2_order_active_time_dict[_id] > expire_time: active_count += 1 elif type_ == self.TYPE_TRANSACTION: for _id in self.__l2_transaction_active_time_dict: if self.__l2_transaction_active_time_dict[_id] > expire_time: active_count += 1 elif type_ == self.TYPE_MARKET: for _id in self.__l2_market_active_time_dict: if self.__l2_market_active_time_dict[_id] > expire_time: active_count += 1 return active_count