Administrator
2025-03-11 46f51dfb83f6e6a2784676bde64577e5f6f28cf0
l2/subscript/l2_subscript_manager.py
@@ -4,6 +4,15 @@
import math
import multiprocessing
import random
import threading
import msgpack
import zmq
from huaxin_client import l2_data_transform_protocol
from utils import shared_memery_util
process_manager = None
class TargetCodeProcessManager:
@@ -29,7 +38,12 @@
        # 代码所在队列ID
        self.__code_queue_dict = {}
    def add_codes(self, codes: set):
    def set_codes(self, codes: set):
        """
        设置订阅代码
        @param codes:
        @return: 返回队列与对应分配的代码:[(队列对象, {"代码1","代码2"}),...]
        """
        add_codes = codes - self.__code_queue_dict.keys()
        del_codes = self.__code_queue_dict.keys() - codes
        # 删除代码
@@ -44,7 +58,7 @@
        for code in add_codes:
            # 寻找未满的队列
            for queue_id in self.__queue_codes:
                count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues)))
                count_per_process = self.__max_code_count_per_queue_dict.get(queue_id)
                if len(self.__queue_codes[queue_id]) >= count_per_process:
                    # 队列已满
                    continue
@@ -52,6 +66,8 @@
                self.__queue_codes[queue_id].add(code)
                self.__code_queue_dict[code] = queue_id
                break
        return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in
                self.__queue_codes]
    def get_queues_with_codes(self):
        """
@@ -64,6 +80,67 @@
        return results
class L2DataListener:
    """
    L2 data listener.

    Binds one ZMQ REP socket per channel endpoint and, for every notification
    received, reads the payload from the shared-memory array named in the
    notification and forwards it to an L2 data callback.
    """

    def __init__(self, channel_list):
        """
        @param channel_list: [((shared memory number, order shared memory array, zmq address),
                               (shared memory number, transaction shared memory array, zmq address))]
        """
        self.channel_list = channel_list
        # Map each shared memory number to its shared memory array so that an
        # incoming notification can be resolved to the array it refers to.
        self.shared_memery_num_object_dict = {}
        for channel in self.channel_list:
            self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1]
            self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1]

    def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Create the data listeners.

        Starts two daemon threads per channel: one serving the order endpoint
        and one serving the transaction endpoint. Returns immediately; the
        threads keep running in the background.

        @param l2_data_callback: callback object receiving OnL2Order / OnL2Transaction calls
        @return: None
        """
        for channel in self.channel_list:
            channel_delegate = channel[0]
            channel_deal = channel[1]
            threading.Thread(target=self.__create_l2_zmq_server, args=(channel_delegate[2], l2_data_callback,),
                             daemon=True).start()
            threading.Thread(target=self.__create_l2_zmq_server, args=(channel_deal[2], l2_data_callback,),
                             daemon=True).start()

    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Run the ZMQ server for L2 tick-by-tick order / transaction notifications.

        Loops forever: receives one msgpack-encoded message, dispatches it to
        the callback, and always answers "SUCCESS" so the REQ/REP exchange
        never stalls, even when handling the message failed.

        @param ipc_addr: address the REP socket is bound to
        @param l2_data_callback: callback object receiving the decoded data
        @return: never returns
        """
        # Imported locally so this block does not depend on a module-level import.
        import logging

        logger = logging.getLogger(__name__)

        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            raw_message = socket.recv()
            try:
                # Decode the notification and load the payload it points to.
                message = msgpack.unpackb(raw_message)
                shared_memery_id = message["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
                message_type = message["type"]
                if message_type == 1:
                    # Order
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
                    l2_data_callback.OnL2Order(code, data_list, timestamp)
                elif message_type == 2:
                    # Transaction
                    code, data_list = datas[0], datas[1]
                    l2_data_callback.OnL2Transaction(code, data_list)
                else:
                    logger.warning("Ignoring L2 message with unknown type %r on %s", message_type, ipc_addr)
            except Exception:
                # Keep the server loop alive, but record the failure instead of
                # silently discarding it.
                logger.exception("Failed to handle L2 message on %s", ipc_addr)
            finally:
                # A REP socket must reply before it can receive again.
                socket.send_string("SUCCESS")
if __name__ == "__main__":
    queues = [multiprocessing.Queue() for i in range(7)]
    manager = TargetCodeProcessManager(queues, 10)