Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
L2订阅管理
"""
import math
import multiprocessing
import random
import threading
import time
 
import msgpack
import zmq
 
from huaxin_client import l2_data_transform_protocol
from log_module import async_log_util
from log_module.log import logger_debug
from utils import shared_memery_util
 
# Module-level holder, initialised to None; nothing in the visible part of this file assigns it.
# NOTE(review): presumably set by external code to a shared TargetCodeProcessManager - confirm.
process_manager = None
 
 
class TargetCodeProcessManager:
    """
    Distributes subscribed target codes over a fixed set of communication queues.

    Queues are tracked by ``id(queue)``; the queue objects themselves are never
    read from or written to here, they are only handed back to the caller
    together with the codes assigned to them. A code is assigned to at most one
    queue, and each queue has its own capacity.
    """

    def __init__(self, com_queues: list, max_code_count_per_queue_list):
        """
        Initialise the manager.
        @param com_queues: list of communication queues (e.g. multiprocessing.Queue)
        @param max_code_count_per_queue_list: capacities aligned by index with ``com_queues``;
               a single int is also accepted and is applied to every queue
        """
        if isinstance(max_code_count_per_queue_list, int):
            # Uniform capacity: expand it so the per-queue lookup below stays the same.
            max_counts = [max_code_count_per_queue_list] * len(com_queues)
        else:
            max_counts = max_code_count_per_queue_list
        self.__com_queues = com_queues
        # Queue ID -> maximum number of codes that queue may hold
        self.__max_code_count_per_queue_dict = {id(q): max_counts[i] for i, q in enumerate(com_queues)}
        # Queue ID -> queue object
        self.__com_queue_id_object_dict = {id(q): q for q in com_queues}
        # Queue ID -> codes currently assigned, format: {queue_id: {"code1", "code2"}}
        self.__queue_codes = {id(q): set() for q in com_queues}
        # Code -> ID of the queue it is assigned to
        self.__code_queue_dict = {}

    def set_codes(self, codes: set):
        """
        Replace the subscription with ``codes``.

        Codes that are no longer wanted are removed from their queue; codes that are
        already assigned stay where they are; new codes go to the first queue (in
        construction order) that still has room. New codes that do not fit anywhere
        are left unassigned and reported in the debug log.
        @param codes: the complete set of codes that should be subscribed
        @return: [(queue object, {"code1", "code2"}), ...] - the sets are the manager's
                 live internal sets, not copies
        """
        wanted = set(codes)
        add_codes = wanted - self.__code_queue_dict.keys()
        del_codes = self.__code_queue_dict.keys() - wanted

        # Release the codes that are no longer subscribed
        for code in del_codes:
            queue_id = self.__code_queue_dict.pop(code)
            self.__queue_codes[queue_id].discard(code)

        # Place every new code into the first queue that is not full
        unassigned = []
        for code in add_codes:
            for queue_id, assigned in self.__queue_codes.items():
                if len(assigned) >= self.__max_code_count_per_queue_dict[queue_id]:
                    # Queue is full, try the next one
                    continue
                assigned.add(code)
                self.__code_queue_dict[code] = queue_id
                break
            else:
                # Every queue is at capacity
                unassigned.append(code)

        if unassigned:
            logger_debug.info(f"No free subscription queue for codes: {unassigned}")
        logger_debug.info(f"订阅通道分配:{self.__code_queue_dict}")
        return self.get_queues_with_codes()

    def get_queues_with_codes(self):
        """
        Return the current assignment of codes to queues.
        @return: [(queue object, {codes})] in queue construction order
        """
        return [(self.__com_queue_id_object_dict.get(queue_id), assigned)
                for queue_id, assigned in self.__queue_codes.items()]
 
 
class L2DataListener:
    """
    Receives L2 data notifications over zmq, reads the payload from the shared
    memory array named in the notification and forwards it to a callback.
    """

    # Time of the last sample log line. Read through ``self``; the first write in a
    # server loop creates an instance attribute that shadows this class default.
    last_log_time = time.time()

    def __init__(self, channel_list):
        """
        @param channel_list: [((shared memory number, order shared memory array, zmq address),
                               (shared memory number, transaction shared memory array, zmq address))]
        """
        self.channel_list = channel_list
        # Shared memory number -> shared memory array, for both halves of every channel
        self.shared_memery_num_object_dict = {}
        for channel in self.channel_list:
            for memery_number, memery_array in (channel[0][:2], channel[1][:2]):
                self.shared_memery_num_object_dict[memery_number] = memery_array

    def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Start the listeners: two daemon threads per channel, one bound to the order
        address and one bound to the transaction address.
        @param l2_data_callback: receives OnL2Order / OnL2Transaction calls
        @return: None
        """
        for channel in self.channel_list:
            channel_delegate = channel[0]
            channel_deal = channel[1]
            for ipc_addr in (channel_delegate[2], channel_deal[2]):
                threading.Thread(target=self.__create_l2_zmq_server, args=(ipc_addr, l2_data_callback,),
                                 daemon=True).start()

    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Serve L2 order / transaction notifications on ``ipc_addr`` forever.

        Each request is a msgpack message whose ``data.memery_number`` selects the shared
        memory array to read and whose ``type`` selects the callback (1: order,
        2: transaction). A "SUCCESS" reply is always sent, even when handling fails.
        @param ipc_addr: zmq address to bind the REP socket to
        @param l2_data_callback: receives the decoded data
        @return: never returns
        """
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv()
            try:
                # Decode the notification and fetch the payload it points to
                data = msgpack.unpackb(data)
                shared_memery_id = data["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))

                # Log a sample of the traffic at most once every 10 seconds
                if time.time() - self.last_log_time > 10:
                    async_log_util.info(logger_debug, f"L2-V2获取到数据:{datas}")
                    self.last_log_time = time.time()
                if data["type"] == 1:
                    # Order
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
                    l2_data_callback.OnL2Order(code, data_list, timestamp)
                elif data["type"] == 2:
                    # Transaction
                    code, data_list = datas[0], datas[1]
                    l2_data_callback.OnL2Transaction(code, data_list)
            except Exception as e:
                # Keep the server loop alive, but leave a trace instead of hiding the failure
                async_log_util.info(logger_debug, f"L2-V2 data handling failed on {ipc_addr}: {e!r}")
            finally:
                # The reply is sent on every path so the requesting side is always answered
                socket.send_string("SUCCESS")
 
if __name__ == "__main__":
    # Manual smoke test: subscribe several random code sets in a row and check that
    # the codes held by the queues always equal the set that was requested.
    queues = [multiprocessing.Queue(maxsize=1024) for _ in range(7)]
    # The constructor takes one capacity per queue, aligned by index
    manager = TargetCodeProcessManager(queues, [10] * len(queues))
    counts = [70, 60, 50, 10]
    for count in counts:
        # Up to ``count`` random codes (random duplicates collapse in the set)
        codes = set()
        for _ in range(count):
            code = random.randint(1, 1000000)
            code = str(code).zfill(6)
            codes.add(code)
        print(codes)
        manager.set_codes(codes)
        results = manager.get_queues_with_codes()
        fcodes = set()
        for r in results:
            fcodes |= r[1]

        # Any difference means a code was lost or a stale one was kept
        if codes - fcodes or fcodes - codes:
            print("订阅出错")

        print(results)