"""
|
L2数据监听
|
"""
|
import multiprocessing
|
import threading
|
import time
|
|
from log_module import async_log_util
|
from log_module.log import logger_debug
|
|
# Module-level active-time registries. In the code visible here the
# L2DataListenManager class keeps its own per-instance dictionaries and
# does not read these.
__l2_order_active_time_dict: dict = {}
__l2_transaction_active_time_dict: dict = {}
|
|
|
class L2DataListenManager:
    """Drain L2 data queues on background threads and forward items to a callback.

    The callback object passed to the constructor must provide
    ``OnL2Order(code, items, extra)``, ``OnL2Transaction(code, items)`` and
    ``OnMarketData(security_id, item)``.

    Every listener thread periodically stores a timestamp keyed by
    ``id(queue)``; :meth:`get_active_count` uses those timestamps to report
    how many listeners of a given type have been alive recently.
    """

    TYPE_ORDER = "order"
    TYPE_TRANSACTION = "transaction"
    TYPE_MARKET = "market"

    def __init__(self, l2_data_callback):
        """Store the callback and create empty per-type active-time registries.

        Args:
            l2_data_callback: object exposing ``OnL2Order``,
                ``OnL2Transaction`` and ``OnMarketData``.
        """
        self.my_l2_data_callback = l2_data_callback
        # id(queue) -> last time the corresponding listener reported in.
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}

    @staticmethod
    def __drain_grouped(q):
        """Pull every currently available item from *q*, grouped by ``item[0]``.

        Returns:
            dict mapping ``item[0]`` to the list of items carrying that key,
            in arrival order. Empty when the queue had nothing to read.
        """
        grouped = {}
        while not q.empty():
            item = q.get()
            grouped.setdefault(item[0], []).append(item)
        return grouped

    # Receive L2 tick-by-tick order data
    def __recive_l2_orders(self, q: multiprocessing.Queue):
        """Loop forever forwarding order items from *q* to ``OnL2Order``.

        Items are batched per ``item[0]``. The third callback argument is the
        element at index 10 of the first item in the batch (its meaning is not
        visible in this file). Exceptions are logged and the loop continues;
        the rest of the batch that raised is dropped.
        """
        queue_id = id(q)
        count = 0
        while True:
            try:
                grouped = self.__drain_grouped(q)
                if grouped:
                    for code, items in grouped.items():
                        self.my_l2_data_callback.OnL2Order(code, items, items[0][10])
                else:
                    # Nothing to read: back off briefly instead of spinning.
                    time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            count += 1
            if count > 100:
                count = 0
                # Record the active time roughly once every 100 iterations.
                self.__l2_order_active_time_dict[queue_id] = time.time()

    # Receive L2 tick-by-tick transaction data
    def __recive_transaction_orders(self, q: multiprocessing.Queue):
        """Loop forever forwarding transaction items from *q* to ``OnL2Transaction``.

        Items are batched per ``item[0]``. Exceptions are logged and the loop
        continues; the rest of the batch that raised is dropped.
        """
        queue_id = id(q)
        count = 0
        while True:
            try:
                grouped = self.__drain_grouped(q)
                if grouped:
                    for code, items in grouped.items():
                        self.my_l2_data_callback.OnL2Transaction(code, items)
                else:
                    # Nothing to read: back off briefly instead of spinning.
                    time.sleep(0.01)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            count += 1
            if count > 50:
                count = 0
                # Record the active time roughly once every 50 iterations.
                self.__l2_transaction_active_time_dict[queue_id] = time.time()

    def __recive_l2_markets(self, q: multiprocessing.Queue):
        """Loop forever forwarding market items from *q* to ``OnMarketData``.

        Each item is indexed with ``'securityID'`` to obtain the first
        callback argument. The active time is refreshed on every iteration,
        whether or not an item was read or an exception was logged.
        """
        queue_id = id(q)
        while True:
            try:
                if not q.empty():
                    item = q.get()
                    self.my_l2_data_callback.OnMarketData(item['securityID'], item)
                else:
                    time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                self.__l2_market_active_time_dict[queue_id] = time.time()

    # Receive L2 data
    def receive_l2_data(self, order_queues, transaction_queues, market_queue):
        """Start one daemon listener thread per queue.

        Args:
            order_queues: iterable of queues carrying order items.
            transaction_queues: iterable of queues carrying transaction items.
            market_queue: single queue carrying market items.
        """
        # The queue is handed over through ``args`` rather than captured by a
        # lambda: a lambda would look up the loop variable only when the
        # thread runs, by which time the loop may already have moved on to
        # another queue.
        for q in order_queues:
            threading.Thread(target=self.__recive_l2_orders, args=(q,), daemon=True).start()
        for q in transaction_queues:
            threading.Thread(target=self.__recive_transaction_orders, args=(q,), daemon=True).start()
        threading.Thread(target=self.__recive_l2_markets, args=(market_queue,), daemon=True).start()

    def get_active_count(self, type_):
        """Return how many listeners of *type_* reported in within the last 5 seconds.

        Args:
            type_: one of ``TYPE_ORDER``, ``TYPE_TRANSACTION`` or ``TYPE_MARKET``.

        Returns:
            The number of registered queues whose last recorded active time is
            newer than five seconds ago; 0 for an unrecognised type.
        """
        expire_time = time.time() - 5
        if type_ == self.TYPE_ORDER:
            active_times = self.__l2_order_active_time_dict
        elif type_ == self.TYPE_TRANSACTION:
            active_times = self.__l2_transaction_active_time_dict
        elif type_ == self.TYPE_MARKET:
            active_times = self.__l2_market_active_time_dict
        else:
            return 0
        # Iterate over a snapshot: listener threads may insert a new key
        # while we are counting, which would break iteration over the live dict.
        return sum(1 for stamp in list(active_times.values()) if stamp > expire_time)
|