From 15199f8e93fe48e6261c99eadf6673d788db3a80 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期日, 17 三月 2024 22:56:10 +0800
Subject: [PATCH] L2进程与策略进程合并

---
 huaxin_client/l2_data_manager.py |   33 +++++++++++++++++++++++++++------
 1 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index ec5e72f..793d5e0 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -6,12 +6,15 @@
 import queue
 import threading
 import time
+
+import constant
 from huaxin_client import socket_util
 
 from huaxin_client.client_network import SendResponseSkManager
 
 # 娲诲姩鏃堕棿
-from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
+from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager
+from huaxin_client.l2_data_transform_protocol import L2DataCallBack
 from log_module import async_log_util
 from log_module.async_log_util import huaxin_l2_log
 from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
@@ -34,13 +37,17 @@
 class L2DataUploadManager:
     def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
                  transaction_queue_distribute_manager: CodeQueueDistributeManager,
-                 market_data_queue: multiprocessing.Queue, order_ipc_hosts):
+                 market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager):
+
         self.order_queue_distribute_manager = order_queue_distribute_manager
         self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
         self.market_data_queue = market_data_queue
+        self.data_callback_distribute_manager = data_callback_distribute_manager
+        # 浠g爜鍒嗛厤鐨勫璞�
         self.temp_order_queue_dict = {}
         self.temp_transaction_queue_dict = {}
         self.temp_log_queue_dict = {}
+
         self.filter_order_condition_dict = {}
         self.upload_l2_data_task_dict = {}
         self.l2_order_codes = set()
@@ -119,7 +126,11 @@
 
     def add_market_data(self, data):
         # 鍔犲叆涓婁紶闃熷垪
-        self.market_data_queue.put_nowait(data)
+        # self.market_data_queue.put_nowait(data)
+        code = data['securityID']
+        callback = self.data_callback_distribute_manager.get_distributed_callback(code)
+        if callback:
+            callback.OnMarketData(code, data)
 
     # 鍒嗛厤涓婁紶闃熷垪
     def distribute_upload_queue(self, code):
@@ -127,6 +138,9 @@
             self.order_queue_distribute_manager.distribute_queue(code)
         if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
             self.transaction_queue_distribute_manager.distribute_queue(code)
+        if not self.data_callback_distribute_manager.get_distributed_callback(code):
+            self.data_callback_distribute_manager.distribute_callback(code)
+
 
         if code not in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code] = collections.deque()
@@ -135,7 +149,8 @@
         if code not in self.temp_log_queue_dict:
             self.temp_log_queue_dict[code] = queue.Queue()
         # 鍒嗛厤璁㈠崟涓婁紶鍗忚
-        self.l2_order_upload_protocol.distribute_upload_host(code)
+        if not constant.is_windows():
+            self.l2_order_upload_protocol.distribute_upload_host(code)
 
         if code not in self.upload_l2_data_task_dict:
             t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
@@ -151,6 +166,7 @@
         self.order_queue_distribute_manager.release_distribute_queue(code)
         self.transaction_queue_distribute_manager.release_distribute_queue(code)
         self.l2_order_upload_protocol.release_distributed_upload_host(code)
+        self.data_callback_distribute_manager.release_distribute_callback(code)
         if code in self.temp_order_queue_dict:
             self.temp_order_queue_dict[code].clear()
             self.temp_order_queue_dict.pop(code)
@@ -187,8 +203,10 @@
                 if temp_list:
                     # 涓婁紶鏁版嵁
                     # self.__upload_l2_data(code, upload_queue, temp_list)
-                    self.__upload_l2_order_data(code, temp_list)
+                    # self.__upload_l2_order_data(code, temp_list)
+                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time())
                     temp_list = []
+
                 else:
                     if code not in self.temp_order_queue_dict:
                         self.l2_order_codes.discard(code)
@@ -216,7 +234,8 @@
                         temp_list.append(data)
                 if temp_list:
                     # 涓婁紶鏁版嵁
-                    self.__upload_l2_data(code, upload_queue, temp_list)
+                    # self.__upload_l2_data(code, upload_queue, temp_list)
+                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list)
                     temp_list = []
                 else:
                     if code not in self.temp_transaction_queue_dict:
@@ -254,6 +273,8 @@
         self.code_socket_client_dict = {}
         self.rlock = threading.RLock()
         context = zmq.Context()
+        if constant.is_windows():
+            return
         for host in self.ipchosts:
             socket = context.socket(zmq.REQ)
             socket.connect(host)

--
Gitblit v1.8.0