From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 02 十一月 2023 11:23:09 +0800
Subject: [PATCH] L2进程与策略进程分开

---
 huaxin_client/l2_data_manager.py |  315 +++++++++------------------------------------------
 1 files changed, 59 insertions(+), 256 deletions(-)

diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 9a4ae30..0c00579 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -1,19 +1,18 @@
 # -*- coding: utf-8 -*-
 import json
 import logging
+import multiprocessing
 import queue
-import random
 import threading
 import time
-from huaxin_client import socket_util, l2_data_transform_protocol
+from huaxin_client import socket_util
 
 from huaxin_client.client_network import SendResponseSkManager
 
 # 娲诲姩鏃堕棿
-from huaxin_client.l2_data_transform_protocol import L2DataCallBack
-from log_module import log_export, async_log_util
-from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
-    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
+from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
+from log_module import  async_log_util
+from log_module.log import logger_local_huaxin_l2_error, logger_system
 from utils import tool
 
 order_detail_upload_active_time_dict = {}
@@ -24,12 +23,51 @@
 target_codes = set()
 target_codes_add_time = {}
 common_queue = queue.Queue()
-trading_canceled_queue = queue.Queue()
-log_buy_no_queue = queue.Queue()
-# 涔板叆璁㈠崟鍙风殑瀛楀吀
-buy_order_nos_dict = {}
-# 鏈�杩戠殑澶у崟鎴愪氦鍗曞彿
-latest_big_order_transaction_orders_dict = {}
+
+
+# L2涓婁紶鏁版嵁绠$悊鍣�
+class L2DataUploadManager:
+    def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager,
+                 transaction_queue_distribute_manager: CodeQueueDistributeManager,
+                 market_data_queue: multiprocessing.Queue):
+        self.order_queue_distribute_manager = order_queue_distribute_manager
+        self.transaction_queue_distribute_manager = transaction_queue_distribute_manager
+        self.market_data_queue = market_data_queue
+
+    # 娣诲姞濮旀墭璇︽儏
+    def add_l2_order_detail(self, data, start_time, istransaction=False):
+        code = data["SecurityID"]
+        queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
+        if not queue_info:
+            return
+        queue_info[1].put_nowait(
+            (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
+             data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
+
+    # 娣诲姞閫愮瑪鎴愪氦
+    def add_transaction_detail(self, data):
+        code = data["SecurityID"]
+        queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
+        if not queue_info:
+            return
+        # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜�
+        queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
+                                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
+                                  data['SellNo'], data['ExecType']))
+
+    def add_market_data(self, data):
+        # 鍔犲叆涓婁紶闃熷垪
+        self.market_data_queue.put_nowait(data)
+
+    # 鍒嗛厤涓婁紶闃熷垪
+    def distribute_upload_queue(self, code):
+        self.order_queue_distribute_manager.distribute_queue(code)
+        self.transaction_queue_distribute_manager.distribute_queue(code)
+
+    # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒�
+    def release_distributed_upload_queue(self, code):
+        self.order_queue_distribute_manager.release_distribute_queue(code)
+        self.transaction_queue_distribute_manager.release_distribute_queue(code)
 
 
 def add_target_code(code):
@@ -42,89 +80,6 @@
     target_codes.discard(code)
     if code in target_codes_add_time:
         target_codes_add_time.pop(code)
-
-
-# 鑾峰彇鏈�杩戠殑澶у崟鎴愪氦璁㈠崟鍙�
-def get_latest_transaction_order_nos(code):
-    return latest_big_order_transaction_orders_dict.get(code)
-
-
-# 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡
-def trading_order_canceled(code_, order_no):
-    trading_canceled_queue.put((code_, order_no))
-
-
-# 娣诲姞濮旀墭璇︽儏
-def add_l2_order_detail(data, start_time, istransaction=False):
-    code = data["SecurityID"]
-    # 寮傛鏃ュ織璁板綍
-    if code not in tmep_order_detail_queue_dict:
-        tmep_order_detail_queue_dict[code] = queue.Queue()
-    # 鍘熸潵鐨勬牸寮�
-    # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
-    #                 "Volume": pOrderDetail['Volume'],
-    #                 "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
-    #                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
-    #                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
-    #                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
-
-    # 鐢ㄤ簬G鎾ょ殑鏁版嵁锛屾殏鏃舵敞閲�
-    # if data['Side'] == "1":
-    #     # 璁板綍鎵�鏈変拱鍏ョ殑璁㈠崟鍙�
-    #     if data['SecurityID'] not in buy_order_nos_dict:
-    #         buy_order_nos_dict[data['SecurityID']] = set()
-    #     buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
-    #     # 涔板叆璁㈠崟鍙烽渶瑕佽褰曟棩蹇�
-    #     async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}")
-
-    tmep_order_detail_queue_dict[code].put(
-        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
-         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
-
-
-# 娣诲姞閫愮瑪鎴愪氦
-def add_transaction_detail(data):
-    code = data["SecurityID"]
-    if code not in tmep_transaction_queue_dict:
-        tmep_transaction_queue_dict[code] = queue.Queue()
-    # 鍘熸潵鐨勬牸寮�
-    #  item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
-    #                     "TradeVolume": pTransaction['TradeVolume'],
-    #                     "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
-    #                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'],
-    #                     "ExecType": pTransaction['ExecType'].decode()}
-
-    # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜�
-    code = data['SecurityID']
-    # G鎾ょ浉鍏虫暟鎹搷浣滄殏鏃舵敞閲�
-    # if code in buy_order_nos_dict:
-    #     if data['BuyNo'] in buy_order_nos_dict[code]:
-    #         try:
-    #             temp_list = latest_big_order_transaction_orders_dict.get(code)
-    #             if not temp_list:
-    #                 temp_list = []
-    #             if temp_list:
-    #                 if temp_list[-1] != data['BuyNo']:
-    #                     # 涓嶅姞鍏ラ噸澶嶈鍗曞彿
-    #                     temp_list.append(data['BuyNo'])
-    #                     if len(temp_list) > 10:
-    #                         # 鏈�澶氬姞10涓鍗曞彿
-    #                         temp_list = temp_list[-10:]
-    #             else:
-    #                 temp_list.append(data['BuyNo'])
-    #             latest_big_order_transaction_orders_dict[code] = temp_list
-    #         except:
-    #             pass
-    tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
-                                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
-                                           data['SellNo'], data['ExecType']))
-
-
-def add_market_data(data):
-    code = data['securityID']
-    # 鍔犲叆涓婁紶闃熷垪
-    common_queue.put((code, "l2_market_data", data))
-
 
 def add_subscript_codes(codes):
     print("add_subscript_codes", codes)
@@ -165,7 +120,6 @@
 
 # 涓婁紶鏁版嵁
 def upload_data(code, _type, datas, new_sk=False):
-    uid = random.randint(0, 100000)
     key = f"{_type}_{code}"
     fdata = json.dumps(
         {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
@@ -180,76 +134,16 @@
         logging.exception(e)
     finally:
         pass
-        # print("璇锋眰缁撴潫", uid, result)
-        # logger_local_huaxin_l2_upload.info(
-        #     f"{code} 涓婁紶鏁版嵁鑰楁椂-{_type}锛� {round((time.time() - start_time) * 1000, 1)} 鏁版嵁閲�:{len(datas)}")
-    # print("涓婁紶缁撴灉", result)
 
 
-# 寰幆璇诲彇涓婁紶鏁版嵁
-def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
-    if code not in tmep_order_detail_queue_dict:
-        tmep_order_detail_queue_dict[code] = queue.Queue()
-    if True:
-        while True:
-            # print("order task")
-            try:
-                if code not in target_codes:
-                    break
-                order_detail_upload_active_time_dict[code] = time.time()
-                udatas = []
-                while not tmep_order_detail_queue_dict[code].empty():
-                    temp = tmep_order_detail_queue_dict[code].get()
-                    udatas.append(temp)
-                if udatas:
-                    # start_time = time.time()
-                    # upload_data(code, "l2_order", udatas)
-                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
-                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
-                    # use_time = int((time.time() - start_time) * 1000)
-                    # if use_time > 10:
-                    #     async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-涓婁紶浠g爜鑰楁椂锛歿use_time}ms")
-                else:
-                    # 娌℃湁鏁版嵁鐨勬椂鍊欓渶绛夊緟锛屾湁鏁版嵁鏃朵笉闇�绛夊緟
-                    time.sleep(0.001)
-            except Exception as e:
-                hx_logger_contact_debug.exception(e)
-                logger_local_huaxin_l2_error.error(f"涓婁紶璁㈠崟鏁版嵁鍑洪敊锛歿str(e)}")
-                pass
-
-
-def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
-    if code not in tmep_transaction_queue_dict:
-        tmep_transaction_queue_dict[code] = queue.Queue()
-    while True:
-        # print("trans task")
-        try:
-            if code not in target_codes:
-                break
-            transaction_upload_active_time_dict[code] = time.time()
-            udatas = []
-            while not tmep_transaction_queue_dict[code].empty():
-                temp = tmep_transaction_queue_dict[code].get()
-                udatas.append(temp)
-            if udatas:
-                # upload_data(code, "l2_trans", udatas)
-                l2_data_callback.OnL2Transaction(code, udatas)
-            time.sleep(0.01)
-        except Exception as e:
-            logger_local_huaxin_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}")
-
-
-def __run_upload_common(l2_data_callback: L2DataCallBack):
+def __run_upload_common():
     print("__run_upload_common")
     logger_system.info(f"l2_client __run_upload_common 绾跨▼ID:{tool.get_thread_id()}")
     while True:
         try:
             while not common_queue.empty():
                 temp = common_queue.get()
-                if temp[1] == "l2_market_data":
-                    l2_data_callback.OnMarketData(temp[0], temp[2])
-                else:
-                    upload_data(temp[0], temp[1], temp[2])
+                upload_data(temp[0], temp[1], temp[2])
 
         except Exception as e:
             logger_local_huaxin_l2_error.exception(e)
@@ -258,121 +152,30 @@
             time.sleep(0.01)
 
 
-def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
-    print("__run_upload_trading_canceled")
-    logger_system.info(f"l2_client __run_upload_trading_canceled 绾跨▼ID:{tool.get_thread_id()}")
-    while True:
-        try:
-            temp = trading_canceled_queue.get()
-            if temp:
-                logger_local_huaxin_g_cancel.info(f"鍑嗗涓婃姤锛歿temp}")
-                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
-                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
-                logger_local_huaxin_g_cancel.info(f"涓婃姤鎴愬姛锛歿temp}")
-        except Exception as e:
-            logger_local_huaxin_l2_error.exception(e)
-            logger_local_huaxin_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}")
-
-
 def __run_log():
     print("__run_log")
     logger_system.info(f"l2_client __run_log 绾跨▼ID:{tool.get_thread_id()}")
     async_log_util.huaxin_l2_log.run_sync()
 
 
-__upload_order_threads = {}
-__upload_trans_threads = {}
-
-
-# 杩愯涓婁紶浠诲姟
-def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
-    try:
-        # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛�
-        if code not in target_codes:
-            return
-        # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛�
-        if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[
-            code] > 2:
-            t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
-            t.start()
-            __upload_order_threads[code] = t
-
-        if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[
-            code] > 2:
-            t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
-            t.start()
-            __upload_trans_threads[code] = t
-    finally:
-        pass
-
-
-def run_upload_common(l2_data_callback: L2DataCallBack):
-    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
-    t.start()
-
-
-def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
-    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
+# 閲囩敤socket浼犺緭鏁版嵁
+def run_upload_common():
+    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
     t.start()
 
 
 def run_log():
-    # G鎾ょ浉鍏虫暟鎹紝鏆傛椂娉ㄩ噴
-    # fdatas = log_export.load_huaxin_local_buy_no()
-    # global buy_order_nos_dict
-    # buy_order_nos_dict = fdatas
     t = threading.Thread(target=lambda: __run_log(), daemon=True)
     t.start()
 
 
-# 杩愯瀹堟姢绾跨▼
-def run_upload_daemon(_l2_data_callback):
-    def upload_daemon():
-        logger_system.info(f"l2_client upload_daemon 绾跨▼ID:{tool.get_thread_id()}")
-        while True:
-            try:
-                for code in target_codes_add_time:
-                    # 鐩爣浠g爜鍔犲叆2s涔嬪悗鍚姩瀹堟姢
-                    if time.time() - target_codes_add_time[code] > 2:
-                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
-                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback),
-                                                 daemon=True)
-                            t.start()
-                            __upload_order_threads[code] = t
-                            logger_local_huaxin_l2_upload.info(f"閲嶆柊鍒涘缓L2璁㈠崟涓婁紶绾跨▼锛歿code}")
-                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
-                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback),
-                                                 daemon=True)
-                            t.start()
-                            __upload_trans_threads[code] = t
-                            logger_local_huaxin_l2_upload.info(f"閲嶆柊鍒涘缓L2鎴愪氦涓婁紶绾跨▼锛歿code}")
-            except:
-                pass
-            finally:
-                time.sleep(3)
-
-    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
-    t.start()
-
-
-def __test(_l2_data_callback):
+def __test():
     code = "002073"
-    if code not in tmep_order_detail_queue_dict:
-        tmep_order_detail_queue_dict[code] = queue.Queue()
-    target_codes.add(code)
-    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
-    t.start()
-    while True:
-        try:
-            tmep_order_detail_queue_dict[code].put_nowait(
-                ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
-            time.sleep(5)
-        except:
-            pass
+    pass
 
 
-def run_test(_l2_data_callback):
-    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
+def run_test():
+    t = threading.Thread(target=lambda: __test(), daemon=True)
     t.start()
 
 

--
Gitblit v1.8.0