From d163fc446359d66afa10e2ab63e860887aa8732c Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期二, 19 八月 2025 01:33:11 +0800
Subject: [PATCH] 连续涨停时间记录/新增大单概览接口

---
 test/test_pyzmq.py |   90 +++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/test/test_pyzmq.py b/test/test_pyzmq.py
index 17d9a12..f136d8b 100644
--- a/test/test_pyzmq.py
+++ b/test/test_pyzmq.py
@@ -1,7 +1,6 @@
 import multiprocessing
 import threading
 import time
-
 import zmq
 
 
@@ -11,14 +10,17 @@
     print("缁戝畾鎴愬姛", index)
     while True:
         msg = socket.recv_json()
-        print(f"Got Msg({index}): {msg}")
+        use_time = (time.time() - msg['time']) * 1000
+        print(f"Got Msg({index}-{time.time()}): 鑰楁椂锛歿use_time}")
         socket.send_string("SUCCESS")
 
 
 def create_server():
     context = zmq.Context()
-    for i in range(5):
+    for i in range(70):
         threading.Thread(target=lambda: __create_server(context, i), daemon=True).start()
+    while True:
+        time.sleep(5)
 
 
 socket_client_dict = {}
@@ -27,15 +29,21 @@
 def send_data(context, i):
     if i not in socket_client_dict:
         socket = context.socket(zmq.REQ)
-        result = socket.connect(f"ipc://order_{i}.ipc")
-        print("杩炴帴缁撴灉锛�", result)
+        socket.connect(f"ipc://order_{i}.ipc")
         socket_client_dict[i] = socket
+    print(f"寮�濮嬪彂閫佹秷鎭�({i})锛�")
+    datas = []
+    max_count = 500
+    if i % 3 == 0:
+        max_count = 100
+    for j in range(max_count):
+        datas.append(('002510', 4.07, 148700, '1', '2', 95433140, 2012, 9207862, 9207862, 'A', 1708998873.1507974, 0))
 
-    socket_client_dict[i].send_json({'msg': i})
+    socket_client_dict[i].send_json({'data': datas, "time": time.time()})
     response = socket_client_dict[i].recv_string()
 
 
-if __name__ == "__main__":
+if __name__ == "__main__1":
 
     serverProcess = multiprocessing.Process(
         target=create_server)
@@ -44,7 +52,73 @@
     time.sleep(5)
     context = zmq.Context()
     while True:
-        for i in range(5):
+        print("=======鍑嗗鍙戦�佹秷鎭�========")
+        for i in range(70):
             threading.Thread(target=lambda: send_data(context, i), daemon=True).start()
         time.sleep(5)
     input()
+
+
+# L2涓婁紶鏁版嵁鍗忚绠$悊鍣�
+class L2DataUploadProtocolManager:
+
+    # ipchosts IPC鍗忚
+    def __init__(self, ipchosts):
+        self.ipchosts = ipchosts
+        # 鎵�鏈夌殑client
+        self.socket_client_dict = {}
+        # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)}
+        self.code_socket_client_dict = {}
+        self.rlock = threading.RLock()
+        context = zmq.Context()
+        for host in self.ipchosts:
+            socket = context.socket(zmq.REQ)
+            socket.connect(host)
+            self.socket_client_dict[host] = socket
+
+    # 鑾峰彇
+    def __get_available_ipchost(self):
+        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
+            raise Exception("鏃犲彲鐢╤ost")
+        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
+        for host in self.socket_client_dict:
+            if host not in used_hosts:
+                return host, self.socket_client_dict[host]
+        raise Exception("鏃犲彲鐢╤ost")
+
+    # 鍒嗛厤HOST
+    def distribute_upload_host(self, code):
+        self.rlock.acquire()
+        try:
+            host_info = self.__get_available_ipchost()
+            if host_info:
+                self.code_socket_client_dict[code] = host_info
+        finally:
+            self.rlock.release()
+
+    def release_distributed_upload_host(self, code):
+        self.rlock.acquire()
+        try:
+            if code in self.code_socket_client_dict:
+                self.code_socket_client_dict.pop(code)
+        finally:
+            self.rlock.release()
+
+    def upload_data_as_json(self, code, data):
+        if code not in self.code_socket_client_dict:
+            raise Exception("灏氭湭鍒嗛厤host")
+        host, socket = self.code_socket_client_dict[code]
+        socket.send_json(data)
+        socket.recv_string()
+
+
+if __name__ == "__main__":
+    ipclist = []
+    for i in range(0, 70):
+        ipclist.append(f"ipc://l2order{i}.ipc")
+    manager = L2DataUploadProtocolManager(ipclist)
+    code = "000333"
+    # for i in range(0, 70):
+    #     manager.distribute_upload_host(f"{i}" * 6)
+    # manager.distribute_upload_host(code)
+    manager.upload_data_as_json(code, {"test": "test"})

--
Gitblit v1.8.0