From 91d1a35eac17c9fbaea387191587cd1d860a7931 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 15 八月 2025 18:42:56 +0800
Subject: [PATCH] 接口修改

---
 huaxin_client/l2_data_manager.py |  240 +++++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 182 insertions(+), 58 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 1ab2a47..f47415a 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -1,21 +1,25 @@
 # -*- coding: utf-8 -*-
 import json
 import logging
-import multiprocessing
+import marshal
 import queue
 import threading
 import time
+
+import constant
 from huaxin_client import socket_util
 
 from huaxin_client.client_network import SendResponseSkManager
 
 # 娲诲姩鏃堕棿
-from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
+from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
 from log_module import async_log_util
 from log_module.async_log_util import huaxin_l2_log
-from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript
+from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
+    logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail
 from utils import tool
 import collections
+import zmq
 
 order_detail_upload_active_time_dict = {}
 transaction_upload_active_time_dict = {}
@@ -24,38 +28,37 @@
 tmep_transaction_queue_dict = {}
 target_codes = set()
 target_codes_add_time = {}
-common_queue = queue.Queue()
+common_queue = queue.Queue(maxsize=1000)
 
 
 # L2涓婁紶鏁版嵁绠$悊鍣�
 class L2DataUploadManager:
-    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
-                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
-                 market_data_queue: multiprocessing.Queue):
-        self.order_queue_distribute_manager = order_queue_distribute_manager
-        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
-        self.market_data_queue = market_data_queue
+    def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
+        self.data_callback_distribute_manager = data_callback_distribute_manager
+        # 浠g爜鍒嗛厤鐨勫璞�
         self.temp_order_queue_dict = {}
         self.temp_transaction_queue_dict = {}
+        self.temp_log_queue_dict = {}
+
         self.filter_order_condition_dict = {}
         self.upload_l2_data_task_dict = {}
         self.l2_order_codes = set()
         self.l2_transaction_codes = set()
 
     # 璁剧疆璁㈠崟杩囨护鏉′欢
-    def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None,
-                                    special_volumes_expire_time=None):
-        if special_volumes is None:
+    # special_price:杩囨护鐨�1鎵嬬殑浠锋牸
+    def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes):
+        if not special_volumes:
             special_volumes = set()
-        if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time:
-            self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price)
-            huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
-                               f"({code})甯歌杩囨护鏉′欢璁剧疆锛歿self.filter_order_condition_dict[code]}")
-        else:
-            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes,
-                                                      special_volumes_expire_time]
-            huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
-                               f"({code})涓嬪崟鍚庤繃婊ゆ潯浠惰缃細{self.filter_order_condition_dict[code]}")
+        # if code not in self.filter_order_condition_dict:
+        try:
+            # (鏈�灏忕殑閲�, 娑ㄥ仠浠锋牸, 褰卞瓙鍗曚环鏍�, 涔扮殑閲�, 搴熷純浣跨敤, 鐗规畩鐨勯噺闆嗗悎)
+            self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume,
+                                                       int(min_volume) // 50, set(special_volumes))]
+            # huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
+            #                    f"({code})甯歌杩囨护鏉′欢璁剧疆锛歿self.filter_order_condition_dict[code]}")
+        except Exception as e:
+            logger_debug.error(f"{str(e)} - min_volume-{min_volume}")
 
     # 杩囨护璁㈠崟
     def __filter_order(self, item):
@@ -64,13 +67,23 @@
             # item[2]涓洪噺
             if item[2] >= filter_condition[0][0]:
                 return item
-            if filter_condition[1] and item[2] in filter_condition[1]:
-                if filter_condition[2] and time.time() > filter_condition[2]:
-                    # 瓒呮椂浜嗭紝闇�瑕佹竻闄ょ壒娈婇噺鏁版嵁
-                    filter_condition[1] = set()
-                    filter_condition[2] = None
-                    return None
+            # 1鎵嬬殑涔板崟婊¤冻浠锋牸
+            # if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001:
+            #     return item
+            # 涔伴噺
+            if item[2] == filter_condition[0][3]:
                 return item
+
+            # 鎵�鏈夌殑娑ㄥ仠鍗�
+            if item[3] != '1':
+                # 鍗栦笌鍗栨挙
+                if abs(item[1] - filter_condition[0][1]) < 0.001:
+                    # 娑ㄥ仠浠�
+                    return item
+            else:
+                if item[2] in filter_condition[0][5]:
+                    # 鐗规畩鎵嬫暟
+                    return item
             return None
         return item
         # 杩囨护璁㈠崟
@@ -94,10 +107,16 @@
         # queue_info[1].put_nowait(
         #     (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         #      data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
+        # if data['Volume'] == 100:
+        #     log_queue = self.temp_log_queue_dict.get(code)
+        #     if log_queue:
+        #         log_queue.put_nowait(data)
 
         q: collections.deque = self.temp_order_queue_dict.get(code)
-        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
-                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
+        if q is not None:
+            q.append(
+                (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
+                 data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
 
     # 娣诲姞閫愮瑪鎴愪氦
     def add_transaction_detail(self, data):
@@ -112,54 +131,68 @@
         #                           data['SellNo'], data['ExecType']))
 
         q: collections.deque = self.temp_transaction_queue_dict.get(code)
-        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
-                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
-                  data['SellNo'], data['ExecType']))
+        if q is not None:
+            q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
+                      data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
+                      data['SellNo'], data['ExecType'], time.time()))
 
     def add_market_data(self, data):
         # 鍔犲叆涓婁紶闃熷垪
-        self.market_data_queue.put_nowait(data)
+        # self.market_data_queue.put_nowait(data)
+        code = data['securityID']
+        callback = self.data_callback_distribute_manager.get_distributed_callback(code)
+        if callback:
+            callback.OnMarketData(code, data)
 
     # 鍒嗛厤涓婁紶闃熷垪
-    def distribute_upload_queue(self, code):
-        if not self.order_queue_distribute_manager.get_distributed_queue(code):
-            self.order_queue_distribute_manager.distribute_queue(code)
-        if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
-            self.transaction_queue_distribute_manager.distribute_queue(code)
+    def distribute_upload_queue(self, code, _target_codes=None):
+        """
+        鍒嗛厤涓婁紶闃熷垪
+        @param code: 浠g爜
+        @param _target_codes: 鎵�鏈夌殑鐩爣浠g爜
+        @return:
+        """
+        if not self.data_callback_distribute_manager.get_distributed_callback(code):
+            self.data_callback_distribute_manager.distribute_callback(code, _target_codes)
+
         if code not in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code] = collections.deque()
         if code not in self.temp_transaction_queue_dict:
             self.temp_transaction_queue_dict[code] = collections.deque()
-
+        if code not in self.temp_log_queue_dict:
+            self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000)
         if code not in self.upload_l2_data_task_dict:
             t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
             t1.start()
             t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
             t2.start()
+            # t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True)
+            # t3.start()
             self.upload_l2_data_task_dict[code] = (t1, t2)
         # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒�
 
     def release_distributed_upload_queue(self, code):
-        self.order_queue_distribute_manager.release_distribute_queue(code)
-        self.transaction_queue_distribute_manager.release_distribute_queue(code)
+        self.data_callback_distribute_manager.release_distribute_callback(code)
         if code in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code].clear()
             self.temp_order_queue_dict.pop(code)
         if code in self.temp_transaction_queue_dict:
             self.temp_transaction_queue_dict[code].clear()
             self.temp_transaction_queue_dict.pop(code)
+        if code in self.temp_log_queue_dict:
+            self.temp_log_queue_dict.pop(code)
+
         if code in self.upload_l2_data_task_dict:
             self.upload_l2_data_task_dict.pop(code)
 
     def __upload_l2_data(self, code, _queue, datas):
-        _queue.put_nowait((code, datas, time.time()))
+        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))
 
     # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼�
     def __run_upload_order_task(self, code):
         q: collections.deque = self.temp_order_queue_dict.get(code)
         temp_list = []
-        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
-        upload_queue = queue_info[1]
+        filter_condition = self.filter_order_condition_dict.get(code)
         while True:
             try:
                 while len(q) > 0:
@@ -171,8 +204,22 @@
 
                 if temp_list:
                     # 涓婁紶鏁版嵁
-                    self.__upload_l2_data(code, upload_queue, temp_list)
-                    temp_list = []
+                    # self.__upload_l2_data(code, upload_queue, temp_list)
+                    # self.__upload_l2_order_data(code, temp_list)
+                    __start_time = time.time()
+                    last_data = temp_list[-1]
+                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
+                                                                                                   time.time())
+                    use_time = time.time() - __start_time
+                    if use_time > 0.01:
+                        # 璁板綍10ms浠ヤ笂鐨勬暟鎹�
+                        huaxin_l2_log.info(logger_local_huaxin_l2_error, f"鑰楁椂:{use_time}s  缁撴潫鏁版嵁锛歿last_data}")
+
+                    # 璁板綍鎵�鏈夌殑璁㈠崟鍙�
+                    if filter_condition:
+                        huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail,
+                                           f"{[(x[0], x[1], x[2], x[4], x[8]) for x in temp_list if x[2] >= filter_condition[0][0]]}")
+                    temp_list.clear()
                 else:
                     if code not in self.temp_order_queue_dict:
                         self.l2_order_codes.discard(code)
@@ -183,13 +230,11 @@
             except Exception as e:
                 logging.exception(e)
             finally:
-                pass
+                temp_list.clear()
 
     # 澶勭悊鎴愪氦鏁版嵁骞朵笂浼�
     def __run_upload_transaction_task(self, code):
         q: collections.deque = self.temp_transaction_queue_dict.get(code)
-        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
-        upload_queue = queue_info[1]
         temp_list = []
         while True:
             try:
@@ -200,18 +245,91 @@
                         temp_list.append(data)
                 if temp_list:
                     # 涓婁紶鏁版嵁
-                    self.__upload_l2_data(code, upload_queue, temp_list)
+                    # self.__upload_l2_data(code, upload_queue, temp_list)
+                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
+                                                                                                         temp_list)
                     temp_list = []
                 else:
                     if code not in self.temp_transaction_queue_dict:
                         self.l2_transaction_codes.discard(code)
                         break
                     self.l2_transaction_codes.add(code)
-                    time.sleep(0.002)
+                    time.sleep(0.001)
             except:
                 pass
             finally:
-                pass
+                temp_list.clear()
+
+    def __run_log_task(self, code):
+        q: queue.Queue = self.temp_log_queue_dict.get(code)
+        while True:
+            try:
+                temp = q.get(timeout=10)
+                huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
+                                   f"{temp}")
+            except:
+                time.sleep(0.02)
+            finally:
+                if code not in self.temp_log_queue_dict:
+                    break
+
+
+class L2DataUploadProtocolManager:
+
+    # ipchosts IPC鍗忚
+    def __init__(self, ipchosts):
+        self.ipchosts = ipchosts
+        # 鎵�鏈夌殑client
+        self.socket_client_dict = {}
+        # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)}
+        self.code_socket_client_dict = {}
+        self.rlock = threading.RLock()
+        context = zmq.Context()
+        if constant.is_windows():
+            return
+        for host in self.ipchosts:
+            socket = context.socket(zmq.REQ)
+            socket.connect(host)
+            self.socket_client_dict[host] = socket
+
+    # 鑾峰彇
+    def __get_available_ipchost(self):
+        if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
+            raise Exception("鏃犲彲鐢╤ost")
+        used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
+        for host in self.socket_client_dict:
+            if host not in used_hosts:
+                return host, self.socket_client_dict[host]
+        raise Exception("鏃犲彲鐢╤ost")
+
+    # 鍒嗛厤HOST
+    def distribute_upload_host(self, code):
+        if code in self.code_socket_client_dict:
+            return
+        self.rlock.acquire()
+        try:
+            host_info = self.__get_available_ipchost()
+            if host_info:
+                self.code_socket_client_dict[code] = host_info
+        finally:
+            self.rlock.release()
+
+    def release_distributed_upload_host(self, code):
+        if code not in self.code_socket_client_dict:
+            return
+        self.rlock.acquire()
+        try:
+            if code in self.code_socket_client_dict:
+                self.code_socket_client_dict.pop(code)
+        finally:
+            self.rlock.release()
+
+    def upload_data_as_json(self, code, data):
+        if code not in self.code_socket_client_dict:
+            raise Exception("灏氭湭鍒嗛厤host")
+        host, socket = self.code_socket_client_dict[code]
+        socket.send(marshal.dumps(data))
+        socket.recv_string()
 
 
 def add_target_code(code):
@@ -227,7 +345,7 @@
 
 
 def add_subscript_codes(codes):
-    print("add_subscript_codes", codes)
+    # print("add_subscript_codes", codes)
     # 鍔犲叆涓婁紶闃熷垪
     common_queue.put(('', "l2_subscript_codes", list(codes)))
 
@@ -251,7 +369,7 @@
             return True
         else:
             # 鍐嶆鍙戦��
-            print("鍐嶆鍙戦��")
+            # print("鍐嶆鍙戦��")
             return __send_response(sk, msg)
     except ConnectionResetError as e:
         SendResponseSkManager.del_send_response_sk(type)
@@ -282,7 +400,7 @@
 
 
 def __run_upload_common():
-    print("__run_upload_common")
+    # print("__run_upload_common")
     logger_system.info(f"l2_client __run_upload_common 绾跨▼ID:{tool.get_thread_id()}")
     while True:
         try:
@@ -298,7 +416,7 @@
 
 
 def __run_log():
-    print("__run_log")
+    # print("__run_log")
     logger_system.info(f"l2_client __run_log 绾跨▼ID:{tool.get_thread_id()}")
     async_log_util.huaxin_l2_log.run_sync()
 
@@ -325,4 +443,10 @@
 
 
 def test():
-    pass
+    ipclist = []
+    for i in range(0, 70):
+        ipclist.append(f"ipc://l2order{i}.ipc")
+    manager = L2DataUploadProtocolManager(ipclist)
+    code = "000333"
+    manager.distribute_upload_host(code)
+    manager.upload_data_as_json(code, {"test": "test"})

--
Gitblit v1.8.0