From d163fc446359d66afa10e2ab63e860887aa8732c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 19 八月 2025 01:33:11 +0800 Subject: [PATCH] 连续涨停时间记录/新增大单概览接口 --- test/test_pyzmq.py | 90 +++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 82 insertions(+), 8 deletions(-) diff --git a/test/test_pyzmq.py b/test/test_pyzmq.py index 17d9a12..f136d8b 100644 --- a/test/test_pyzmq.py +++ b/test/test_pyzmq.py @@ -1,7 +1,6 @@ import multiprocessing import threading import time - import zmq @@ -11,14 +10,17 @@ print("缁戝畾鎴愬姛", index) while True: msg = socket.recv_json() - print(f"Got Msg({index}): {msg}") + use_time = (time.time() - msg['time']) * 1000 + print(f"Got Msg({index}-{time.time()}): 鑰楁椂锛歿use_time}") socket.send_string("SUCCESS") def create_server(): context = zmq.Context() - for i in range(5): + for i in range(70): threading.Thread(target=lambda: __create_server(context, i), daemon=True).start() + while True: + time.sleep(5) socket_client_dict = {} @@ -27,15 +29,21 @@ def send_data(context, i): if i not in socket_client_dict: socket = context.socket(zmq.REQ) - result = socket.connect(f"ipc://order_{i}.ipc") - print("杩炴帴缁撴灉锛�", result) + socket.connect(f"ipc://order_{i}.ipc") socket_client_dict[i] = socket + print(f"寮�濮嬪彂閫佹秷鎭�({i})锛�") + datas = [] + max_count = 500 + if i % 3 == 0: + max_count = 100 + for j in range(max_count): + datas.append(('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0)) - socket_client_dict[i].send_json({'msg': i}) + socket_client_dict[i].send_json({'data': datas, "time": time.time()}) response = socket_client_dict[i].recv_string() -if __name__ == "__main__": +if __name__ == "__main__1": serverProcess = multiprocessing.Process( target=create_server) @@ -44,7 +52,73 @@ time.sleep(5) context = zmq.Context() while True: - for i in range(5): + print("=======鍑嗗鍙戦�佹秷鎭�========") + for i in range(70): threading.Thread(target=lambda: send_data(context, i), daemon=True).start() time.sleep(5) input() + + +# L2涓婁紶鏁版嵁鍗忚绠$悊鍣� +class L2DataUploadProtocolManager: + + # ipchosts IPC鍗忚 + def __init__(self, ipchosts): + self.ipchosts = ipchosts + # 鎵�鏈夌殑client + self.socket_client_dict = {} + # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)} + self.code_socket_client_dict = {} + self.rlock = threading.RLock() + context = zmq.Context() + for host in self.ipchosts: + socket = context.socket(zmq.REQ) + socket.connect(host) + self.socket_client_dict[host] = socket + + # 鑾峰彇 + def __get_available_ipchost(self): + if len(self.code_socket_client_dict) >= len(self.socket_client_dict): + raise Exception("鏃犲彲鐢╤ost") + used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) + for host in self.socket_client_dict: + if host not in used_hosts: + return host, self.socket_client_dict[host] + raise Exception("鏃犲彲鐢╤ost") + + # 鍒嗛厤HOST + def distribute_upload_host(self, code): + self.rlock.acquire() + try: + host_info = self.__get_available_ipchost() + if host_info: + self.code_socket_client_dict[code] = host_info + finally: + self.rlock.release() + + def release_distributed_upload_host(self, code): + self.rlock.acquire() + try: + if code in self.code_socket_client_dict: + self.code_socket_client_dict.pop(code) + finally: + self.rlock.release() + + def upload_data_as_json(self, code, data): + if code not in self.code_socket_client_dict: + raise Exception("灏氭湭鍒嗛厤host") + host, socket = self.code_socket_client_dict[code] + socket.send_json(data) + socket.recv_string() + + +if __name__ == "__main__": + ipclist = [] + for i in range(0, 70): + ipclist.append(f"ipc://l2order{i}.ipc") + manager = L2DataUploadProtocolManager(ipclist) + code = "000333" + # for i in range(0, 70): + # manager.distribute_upload_host(f"{i}" * 6) + # manager.distribute_upload_host(code) + manager.upload_data_as_json(code, {"test": "test"}) -- Gitblit v1.8.0