| | |
| | | import json |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | import zmq |
| | | |
| | | |
| | |
| | | socket.connect(f"ipc://order_{i}.ipc") |
| | | socket_client_dict[i] = socket |
| | | print(f"开始发送消息({i}):") |
| | | msg = "test" |
| | | for i in range(1000): |
| | | msg += "test123123" |
| | | datas = [] |
| | | max_count = 500 |
| | | if i % 3 == 0: |
| | | max_count = 100 |
| | | for j in range(max_count): |
| | | datas.append(('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0)) |
| | | |
| | | socket_client_dict[i].send_json({'msg': msg, "time": time.time()}) |
| | | socket_client_dict[i].send_json({'data': datas, "time": time.time()}) |
| | | response = socket_client_dict[i].recv_string() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | if __name__ == "__main__1": |
| | | |
| | | serverProcess = multiprocessing.Process( |
| | | target=create_server) |
| | |
| | | threading.Thread(target=lambda: send_data(context, i), daemon=True).start() |
| | | time.sleep(5) |
| | | input() |
| | | |
| | | |
| | | # L2上传数据协议管理器 |
| | | class L2DataUploadProtocolManager: |
| | | |
| | | # ipchosts IPC协议 |
| | | def __init__(self, ipchosts): |
| | | self.ipchosts = ipchosts |
| | | # 所有的client |
| | | self.socket_client_dict = {} |
| | | # 保存代码分配的client 格式:{code:(host, socket)} |
| | | self.code_socket_client_dict = {} |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | | socket.connect(host) |
| | | self.socket_client_dict[host] = socket |
| | | |
| | | # 获取 |
| | | def __get_available_ipchost(self): |
| | | if len(self.code_socket_client_dict) >= len(self.socket_client_dict): |
| | | raise Exception("无可用host") |
| | | used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) |
| | | for host in self.socket_client_dict: |
| | | if host not in used_hosts: |
| | | return host, self.socket_client_dict[host] |
| | | raise Exception("无可用host") |
| | | |
| | | # 分配HOST |
| | | def distribute_upload_host(self, code): |
| | | self.rlock.acquire() |
| | | try: |
| | | host_info = self.__get_available_ipchost() |
| | | if host_info: |
| | | self.code_socket_client_dict[code] = host_info |
| | | finally: |
| | | self.rlock.release() |
| | | |
| | | def release_distributed_upload_host(self, code): |
| | | self.rlock.acquire() |
| | | try: |
| | | if code in self.code_socket_client_dict: |
| | | self.code_socket_client_dict.pop(code) |
| | | finally: |
| | | self.rlock.release() |
| | | |
| | | def upload_data_as_json(self, code, data): |
| | | if code not in self.code_socket_client_dict: |
| | | raise Exception("尚未分配host") |
| | | host, socket = self.code_socket_client_dict[code] |
| | | socket.send_json(data) |
| | | socket.recv_string() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | ipclist = [] |
| | | for i in range(0, 70): |
| | | ipclist.append(f"ipc://l2order{i}.ipc") |
| | | manager = L2DataUploadProtocolManager(ipclist) |
| | | code = "000333" |
| | | # for i in range(0, 70): |
| | | # manager.distribute_upload_host(f"{i}" * 6) |
| | | # manager.distribute_upload_host(code) |
| | | manager.upload_data_as_json(code, {"test": "test"}) |