"""
|
L2数据监听
|
"""
|
import multiprocessing
|
import threading
|
import time
|
|
import zmq
|
|
from log_module import async_log_util
|
from log_module.log import logger_debug
|
|
# Module-level activity tables (presumably receiver id -> last-active timestamp).
# NOTE(review): L2DataListenManager keeps its own per-instance dicts with the same
# names and does not read these two; confirm whether anything still uses them.
__l2_order_active_time_dict = {}
__l2_transaction_active_time_dict = {}
|
|
|
class L2DataListenManager:
    """Receive L2 data on background threads and forward it to a callback object.

    Transaction and market data are polled from queues; order data is received
    as JSON requests on ZeroMQ REP sockets. Every receiver loop periodically
    stores a ``time.time()`` timestamp so that :meth:`get_active_count` can
    report how many receivers have been running recently.
    """

    TYPE_ORDER = "order"
    TYPE_TRANSACTION = "transaction"
    TYPE_MARKET = "market"

    def __init__(self, l2_data_callback):
        """Create the manager.

        Args:
            l2_data_callback: Object whose ``OnL2Order``, ``OnL2Transaction``
                and ``OnMarketData`` methods are invoked with received data.
        """
        self.my_l2_data_callback = l2_data_callback
        # Receiver key (queue id or IPC host) -> time.time() of its last heartbeat.
        self.__l2_order_active_time_dict = {}
        self.__l2_transaction_active_time_dict = {}
        self.__l2_market_active_time_dict = {}
        self.zmq_context = zmq.Context()

    def __recive_l2_orders(self, q: multiprocessing.Queue):
        """Forward L2 tick-by-tick order items from ``q`` to ``OnL2Order``; never returns.

        The first three elements of each item are passed positionally to the
        callback. When the queue is empty the loop sleeps for 1 ms.
        """
        queue_id = id(q)
        count = 0
        while True:
            try:
                if not q.empty():
                    item = q.get()
                    self.my_l2_data_callback.OnL2Order(item[0], item[1], item[2])
                else:
                    time.sleep(0.001)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 100:
                    count = 0
                    # Record the heartbeat only once the counter exceeds 100 iterations.
                    self.__l2_order_active_time_dict[queue_id] = time.time()

    def __recive_transaction_orders(self, q: multiprocessing.Queue):
        """Forward L2 tick-by-tick transaction items from ``q`` to ``OnL2Transaction``; never returns.

        The first two elements of each item are passed positionally to the
        callback. When the queue is empty the loop sleeps for 5 ms.
        """
        queue_id = id(q)
        count = 0
        while True:
            try:
                if not q.empty():
                    item = q.get()
                    self.my_l2_data_callback.OnL2Transaction(item[0], item[1])
                else:
                    time.sleep(0.005)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 50:
                    count = 0
                    # Record the heartbeat only once the counter exceeds 50 iterations.
                    self.__l2_transaction_active_time_dict[queue_id] = time.time()

    def __recive_l2_markets(self, q: multiprocessing.Queue):
        """Forward market items from ``q`` to ``OnMarketData``; never returns.

        Each item is passed together with its ``'securityID'`` entry. When the
        queue is empty the loop sleeps for 2 ms. The heartbeat is refreshed on
        every iteration.
        """
        queue_id = id(q)
        while True:
            try:
                if not q.empty():
                    item = q.get()
                    self.my_l2_data_callback.OnMarketData(item['securityID'], item)
                else:
                    time.sleep(0.002)
            except Exception as e:
                async_log_util.exception(logger_debug, e)
            finally:
                self.__l2_market_active_time_dict[queue_id] = time.time()

    def __create_ipc_server(self, host):
        """Serve order data on a REP socket bound to ``host``; never returns.

        Each JSON request is indexed as ``data[0], data[1], data[2]`` and passed
        to ``OnL2Order``; the reply is the string ``"SUCCESS"``. The heartbeat
        is stored in the order table under ``host``.
        """
        rep_socket = self.zmq_context.socket(zmq.REP)
        rep_socket.bind(host)
        count = 0
        while True:
            try:
                data = rep_socket.recv_json()
                self.my_l2_data_callback.OnL2Order(data[0], data[1], data[2])
                rep_socket.send_string("SUCCESS")
            except Exception as e:
                # NOTE(review): if the callback raises, no reply is sent for that
                # request. ZeroMQ REP sockets alternate recv/send, so later recv
                # calls are expected to fail until a reply goes out - confirm and
                # decide what clients should be told on failure.
                async_log_util.exception(logger_debug, e)
            finally:
                count += 1
                if count > 100:
                    count = 0
                    # Record the heartbeat only once the counter exceeds 100 iterations.
                    self.__l2_order_active_time_dict[host] = time.time()

    def __create_ipc_server_hosts(self, order_ipc_hosts):
        """Start one daemon thread running an order IPC server per host."""
        for host in order_ipc_hosts:
            # Pass the host through ``args`` instead of closing over the loop
            # variable: a lambda would read ``host`` only when the thread runs,
            # by which time the loop may already have moved on.
            threading.Thread(target=self.__create_ipc_server, args=(host,), daemon=True).start()

    def receive_l2_data(self, order_queues, transaction_queues, market_queue, order_ipc_hosts):
        """Start the daemon receiver threads and return immediately.

        Args:
            order_queues: Currently unused (see the TODO below).
            transaction_queues: Iterable of queues; one receiver thread per queue.
            market_queue: Single queue of market items.
            order_ipc_hosts: Iterable of endpoints; one REP server thread per host.
        """
        # TODO: order data is temporarily not received through queues, so
        # ``order_queues`` is ignored and orders arrive via the IPC servers.
        for q in transaction_queues:
            # ``args`` binds this iteration's queue to its own thread (a lambda
            # closing over ``q`` could observe a later queue).
            threading.Thread(target=self.__recive_transaction_orders, args=(q,), daemon=True).start()
        threading.Thread(target=self.__recive_l2_markets, args=(market_queue,), daemon=True).start()
        # Start the order IPC servers.
        self.__create_ipc_server_hosts(order_ipc_hosts)

    def get_active_count(self, type_):
        """Return how many receivers of ``type_`` recorded a heartbeat in the last 5 seconds.

        Args:
            type_: One of ``TYPE_ORDER``, ``TYPE_TRANSACTION`` or ``TYPE_MARKET``.

        Returns:
            The number of active receivers; 0 for any other ``type_``.
        """
        if type_ == self.TYPE_ORDER:
            heartbeats = self.__l2_order_active_time_dict
        elif type_ == self.TYPE_TRANSACTION:
            heartbeats = self.__l2_transaction_active_time_dict
        elif type_ == self.TYPE_MARKET:
            heartbeats = self.__l2_market_active_time_dict
        else:
            return 0
        expire_time = time.time() - 5
        # Iterate over a snapshot: receiver threads insert their key on their
        # first heartbeat, which would otherwise break iteration over the dict.
        return sum(1 for last_active in tuple(heartbeats.values()) if last_active > expire_time)
|