"""
|
L2订阅管理
|
"""
|
import math
|
import multiprocessing
|
import random
|
import threading
|
import time
|
|
import msgpack
|
import zmq
|
|
from huaxin_client import l2_data_transform_protocol
|
from log_module import async_log_util
|
from log_module.log import logger_debug
|
from utils import shared_memery_util
|
|
process_manager = None
|
|
|
class TargetCodeProcessManager:
|
"""
|
目标代码的进程管理
|
"""
|
|
def __init__(self, com_queues: list, max_code_count_per_queue_list):
|
"""
|
初始化
|
@param com_queues:list<multiprocessing.Queue> 通信队列
|
@param max_code_count_per_queue: 每个队列最大的代码数量
|
"""
|
self.__com_queues = com_queues
|
self.__max_code_count_per_queue_dict = {id(com_queues[i]): max_code_count_per_queue_list[i] for i in
|
range(len(com_queues))}
|
# 队列ID与队列对象的映射
|
self.__com_queue_id_object_dict = {id(q): q for q in com_queues}
|
# 队列ID对应的代码,格式:{"队列ID":{"代码1","代码2"}}
|
self.__queue_codes = {}
|
for q in com_queues:
|
self.__queue_codes[id(q)] = set()
|
# 代码所在队列ID
|
self.__code_queue_dict = {}
|
|
def set_codes(self, codes: set):
|
"""
|
设置订阅代码
|
@param codes:
|
@return: 返回队列与对应分配的代码:[(队列对象, {"代码1","代码2"}),...]
|
"""
|
add_codes = codes - self.__code_queue_dict.keys()
|
del_codes = self.__code_queue_dict.keys() - codes
|
# 删除代码
|
if del_codes:
|
for code in del_codes:
|
if code in self.__code_queue_dict:
|
queue_id = self.__code_queue_dict[code]
|
self.__queue_codes[queue_id].discard(code)
|
self.__code_queue_dict.pop(code)
|
|
# 为新增代码分配队列
|
for code in add_codes:
|
# 寻找未满的队列
|
for queue_id in self.__queue_codes:
|
count_per_process = self.__max_code_count_per_queue_dict.get(queue_id)
|
if len(self.__queue_codes[queue_id]) >= count_per_process:
|
# 队列已满
|
continue
|
# 队列未满,分配代码
|
self.__queue_codes[queue_id].add(code)
|
self.__code_queue_dict[code] = queue_id
|
break
|
# 分配订阅信息
|
logger_debug.info(f"订阅通道分配:{self.__code_queue_dict}")
|
return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in
|
self.__queue_codes]
|
|
def get_queues_with_codes(self):
|
"""
|
获取队列分配的代码
|
@return: [(队列对象,{代码集合})]
|
"""
|
results = []
|
for queue_id in self.__queue_codes:
|
results.append((self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes.get(queue_id)))
|
return results
|
|
|
class L2DataListener:
|
"""
|
L2数据监听
|
"""
|
|
last_log_time = time.time()
|
|
def __init__(self, channel_list):
|
"""
|
|
@param channel_list:channel_list:[((共享内存编号,委托共享内存数组, zmq地址),(共享内存编号,成交共享内存数组, zmq地址))]
|
"""
|
self.channel_list = channel_list
|
# 设置共享内存编号与共享内存数组映射
|
self.shared_memery_num_object_dict = {}
|
for channel in self.channel_list:
|
self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1]
|
self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1]
|
|
def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
|
"""
|
创建数据监听器
|
@param
|
@return:
|
"""
|
for channel in self.channel_list:
|
channel_delegate = channel[0]
|
channel_deal = channel[1]
|
threading.Thread(target=self.__create_l2_zmq_server, args=(channel_delegate[2], l2_data_callback,),
|
daemon=True).start()
|
threading.Thread(target=self.__create_l2_zmq_server, args=(channel_deal[2], l2_data_callback,),
|
daemon=True).start()
|
|
def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
|
"""
|
创建L2逐笔委托/成交zmq服务
|
@param ipc_addr:
|
@return:
|
"""
|
context = zmq.Context()
|
socket = context.socket(zmq.REP)
|
socket.bind(ipc_addr)
|
while True:
|
data = socket.recv()
|
try:
|
# 接收数据
|
data = msgpack.unpackb(data)
|
shared_memery_id = data["data"]["memery_number"]
|
datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
|
|
if time.time() - self.last_log_time > 10:
|
async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}")
|
self.last_log_time = time.time()
|
if data["type"] == 1:
|
# 委托
|
code, data_list, timestamp = datas[0], datas[1], datas[2]
|
l2_data_callback.OnL2Order(code, data_list, timestamp)
|
elif data["type"] == 2:
|
# 成交
|
code, data_list = datas[0], datas[1]
|
l2_data_callback.OnL2Transaction(code, data_list)
|
except Exception as e:
|
pass
|
finally:
|
socket.send_string("SUCCESS")
|
|
|
if __name__ == "__main__":
|
queues = [multiprocessing.Queue(maxsize=1024) for i in range(7)]
|
manager = TargetCodeProcessManager(queues, 10)
|
counts = [70, 60, 50, 10]
|
for i in range(4):
|
codes = set()
|
for i in range(counts[i]):
|
code = random.randint(1, 1000000)
|
code = str(code).zfill(6)
|
codes.add(code)
|
print(codes)
|
manager.add_codes(codes)
|
results = manager.get_queues_with_codes()
|
fcodes = set()
|
for r in results:
|
fcodes |= r[1]
|
|
if codes - fcodes or fcodes - codes:
|
print("订阅出错")
|
|
print(results)
|