Administrator
3 天以前 5f034f7a6733b03e0d08d7920ec6de1b1517c421
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
代码队列管理
"""
import copy
 
import zmq
 
from log_module.log import logger_local_huaxin_l2_error
 
 
class CodeQueueDistributeManager:
    """Allocate queues to codes so that each queue serves at most one code.

    Queues are kept as ``(index, queue)`` pairs, where ``index`` is the
    queue's position in the list handed to the constructor.
    """

    def __init__(self, queue_list: list):
        """Store the queues as ``(index, queue)`` pairs with no assignments yet.

        @param queue_list: queues available for assignment
        """
        self.queue_list = list(enumerate(queue_list))
        # code -> (index, queue) pair currently assigned to that code
        self.distibuted_code_queue_dict = {}

    def get_available_queue(self):
        """Return the first ``(index, queue)`` pair not assigned to any code.

        @return: the free pair, or None when every queue is assigned
        """
        taken = {pair[0] for pair in self.distibuted_code_queue_dict.values()}
        return next((pair for pair in self.queue_list if pair[0] not in taken), None)

    def distribute_queue(self, code):
        """Assign a queue to ``code``, reusing its existing assignment if any.

        @param code: key the queue is assigned to
        @return: the ``(index, queue)`` pair assigned to ``code``
        @raise Exception: when no queue is free
        """
        assigned = self.distibuted_code_queue_dict
        if code not in assigned:
            free_pair = self.get_available_queue()
            if not free_pair:
                raise Exception("无可用的队列")
            assigned[code] = free_pair
        return assigned.get(code)

    def get_distributed_queue(self, code):
        """Return the ``(index, queue)`` pair assigned to ``code``, or None."""
        return self.distibuted_code_queue_dict.get(code)

    def release_distribute_queue(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_queue_dict.pop(code, None)

    def get_free_queue_count(self):
        """Return the number of queues minus the number of assigned codes."""
        return len(self.queue_list) - len(self.distibuted_code_queue_dict)
 
 
# 回调对象分配
class CodeDataCallbackDistributeManager:
    """Assign callback objects to codes so each callback serves at most one code.

    Callbacks are kept as ``(id(callback), callback)`` pairs; the ``id`` value
    is what identifies a callback as already assigned.
    """

    def __init__(self, callback_list: list):
        """Store the callbacks as ``(id, callback)`` pairs with no assignments yet.

        @param callback_list: callback objects available for assignment
        """
        self.callback_list = [(id(callback), callback) for callback in callback_list]
        # code -> (id, callback) pair currently assigned to that code
        self.distibuted_code_callback_dict = {}

    def _distributed_callback_ids(self):
        """Return the ids of all callbacks that are currently assigned."""
        return {info[0] for info in self.distibuted_code_callback_dict.values()}

    def get_available_callback(self):
        """Return the first ``(id, callback)`` pair not assigned to any code.

        @return: the free pair, or None when every callback is assigned
        """
        used_ids = self._distributed_callback_ids()
        for callback_info in self.callback_list:
            if callback_info[0] not in used_ids:
                return callback_info
        return None

    def distribute_callback(self, code, target_codes=None):
        """Assign a callback to ``code``, reusing its existing assignment if any.

        When no callback is free and ``target_codes`` is non-empty, the
        assignments of codes that are not in ``target_codes`` are released and
        the allocation is attempted once more.

        @param code: key the callback is assigned to
        @param target_codes: optional collection of codes that should keep
            their assignment; None or empty disables the release step
        @return: the ``(id, callback)`` pair assigned to ``code``
        @raise Exception: when no callback is free even after the release step
        """
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)
        callback_info = self.get_available_callback()
        if not callback_info:
            # Snapshot taken before releasing, so the log shows what was occupied.
            distibuted_callbacks_ids = self._distributed_callback_ids()
            if target_codes:
                # Collect first, release afterwards: the dict must not change
                # size while it is being iterated.
                need_release_codes = [
                    assigned_code
                    for assigned_code in self.distibuted_code_callback_dict
                    if assigned_code not in target_codes
                ]
                for assigned_code in need_release_codes:
                    self.release_distribute_callback(assigned_code)
                if need_release_codes:
                    # Something was freed, so the request can be satisfied now.
                    callback_info = self.get_available_callback()
            if not callback_info:
                logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}")
                logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
                raise Exception("无可用的回调对象")
        self.distibuted_code_callback_dict[code] = callback_info
        return callback_info

    def get_distributed_callback(self, code):
        """Return the callback object assigned to ``code``, or None."""
        if code in self.distibuted_code_callback_dict:
            return self.distibuted_code_callback_dict.get(code)[1]
        else:
            return None

    def release_distribute_callback(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_callback_dict.pop(code, None)

    def get_free_queue_count(self):
        """Return the number of callbacks minus the number of assigned codes."""
        return len(self.callback_list) - len(self.distibuted_code_callback_dict)

    def get_distributed_codes(self):
        """Return a snapshot of the codes that currently hold a callback.

        A new set is built because a dict keys view cannot be deep-copied
        (``copy.deepcopy`` raises TypeError on it) and would otherwise track
        later changes to the assignments.

        @return: set of assigned codes, independent of later changes
        """
        return set(self.distibuted_code_callback_dict)
 
 
# 数据通道分配
class CodeDataChannelDistributeManager:
    """Assign data channels to codes so each channel serves at most one code.

    Channels are kept as ``(id(channel), channel)`` pairs; the ``id`` value is
    what identifies a channel as already assigned. One zmq REQ socket is
    connected per notification address found in the channels.
    """

    def __init__(self, channel_list: list):
        """Store the channels and connect a REQ socket to every channel address.

        Each channel is a pair of entries; per the original note an entry is
        (tick-by-tick order shared memory, tick-by-tick transaction shared
        memory, notification address).

        @param channel_list: [((number, multiprocessing.Array, zmq_address),
            (number, multiprocessing.Array, zmq_address))]
        """
        self.channel_list = [(id(channel_data), channel_data) for channel_data in channel_list]
        # code -> (id, channel) pair currently assigned to that code
        self.distibuted_code_channel_dict = {}
        # zmq address -> connected REQ socket
        self.zmq_host_socket_dict = {}
        self.zmq_context = zmq.Context()
        for channel in channel_list:
            for endpoint in (channel[0], channel[1]):
                host = endpoint[2]
                if host in self.zmq_host_socket_dict:
                    # Reuse the socket already connected to this address
                    # instead of creating one that would replace it.
                    continue
                socket = self.zmq_context.socket(zmq.REQ)
                socket.connect(host)
                self.zmq_host_socket_dict[host] = socket

    def get_zmq_socket(self, host):
        """Return the socket connected to ``host``, or None if there is none."""
        return self.zmq_host_socket_dict.get(host)

    def _distributed_channel_ids(self):
        """Return the ids of all channels that are currently assigned."""
        return {info[0] for info in self.distibuted_code_channel_dict.values()}

    def get_available_channel(self):
        """Return the first ``(id, channel)`` pair not assigned to any code.

        @return: the free pair, or None when every channel is assigned
        """
        used_ids = self._distributed_channel_ids()
        for channel_info in self.channel_list:
            if channel_info[0] not in used_ids:
                return channel_info
        return None

    def distribute_channel(self, code, target_codes=None):
        """Assign a channel to ``code``, reusing its existing assignment if any.

        When no channel is free and ``target_codes`` is non-empty, the
        assignments of codes that are not in ``target_codes`` are released and
        the allocation is attempted once more.

        @param code: key the channel is assigned to
        @param target_codes: optional collection of codes that should keep
            their assignment; None or empty disables the release step
        @return: the ``(id, channel)`` pair assigned to ``code``
        @raise Exception: when no channel is free even after the release step
        """
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)
        channel_info = self.get_available_channel()
        if not channel_info:
            # Snapshot taken before releasing, so the log shows what was occupied.
            distibuted_callbacks_ids = self._distributed_channel_ids()
            if target_codes:
                # Collect first, release afterwards: the dict must not change
                # size while it is being iterated.
                need_release_codes = [
                    assigned_code
                    for assigned_code in self.distibuted_code_channel_dict
                    if assigned_code not in target_codes
                ]
                for assigned_code in need_release_codes:
                    self.release_distribute_channel(assigned_code)
                if need_release_codes:
                    # Something was freed, so the request can be satisfied now.
                    channel_info = self.get_available_channel()
            if not channel_info:
                logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_channel_dict.keys()}")
                logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
                raise Exception("无可用的回调对象")
        self.distibuted_code_channel_dict[code] = channel_info
        return channel_info

    def get_distributed_channel(self, code):
        """Return the channel assigned to ``code``, or None."""
        if code in self.distibuted_code_channel_dict:
            return self.distibuted_code_channel_dict.get(code)[1]
        else:
            return None

    def release_distribute_channel(self, code):
        """Drop the assignment of ``code``; unknown codes are ignored."""
        self.distibuted_code_channel_dict.pop(code, None)

    def get_free_channel_count(self):
        """Return the number of channels minus the number of assigned codes."""
        return len(self.channel_list) - len(self.distibuted_code_channel_dict)

    def get_distributed_codes(self):
        """Return a snapshot of the codes that currently hold a channel.

        A new set is built because a dict keys view cannot be deep-copied
        (``copy.deepcopy`` raises TypeError on it) and would otherwise track
        later changes to the assignments.

        @return: set of assigned codes, independent of later changes
        """
        return set(self.distibuted_code_channel_dict)