From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 02 十一月 2023 11:23:09 +0800
Subject: [PATCH] L2进程与策略进程分开

---
 huaxin_client/l2_client.py |  108 +++++++++++++++++++++--------------------------------
 1 files changed, 43 insertions(+), 65 deletions(-)

diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py
index 4da4698..74d425c 100644
--- a/huaxin_client/l2_client.py
+++ b/huaxin_client/l2_client.py
@@ -6,12 +6,15 @@
 import queue
 import threading
 import time
+from typing import List
 
 from huaxin_client import command_manager, l2_data_transform_protocol
 from huaxin_client import constant
 from huaxin_client import l2_data_manager
 import lev2mdapi
+from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager
 from huaxin_client.command_manager import L2ActionCallback
+from huaxin_client.l2_data_manager import L2DataUploadManager
 from log_module import log, async_log_util
 from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
     logger_local_huaxin_g_cancel, logger_l2_codes_subscript
@@ -40,7 +43,6 @@
 SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                  b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
 SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
-spi = None
 set_codes_data_queue = queue.Queue()
 market_code_dict = {}
 
@@ -56,10 +58,11 @@
 
     # 涔板叆鐨勫ぇ鍗曡鍗曞彿
 
-    def __init__(self, api):
+    def __init__(self, api, l2_data_upload_manager):
         lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
         self.__api = api
         self.is_login = False
+        self.l2_data_upload_manager = l2_data_upload_manager
 
     def __split_codes(self, codes):
         szse_codes = []
@@ -124,12 +127,16 @@
         add_codes = codes - self.subscripted_codes
         del_codes = self.subscripted_codes - codes
         print("add del codes", add_codes, del_codes)
-        for c in codes:
-            l2_data_manager.add_target_code(c)
-        for c in del_codes:
-            l2_data_manager.del_target_code(c)
-        for c in add_codes:
-            l2_data_manager.run_upload_task(c, l2_data_callback)
+        try:
+            for c in del_codes:
+                self.l2_data_upload_manager.release_distributed_upload_queue(c)
+                l2_data_manager.del_target_code(c)
+            for c in codes:
+                self.l2_data_upload_manager.distribute_upload_queue(c)
+                l2_data_manager.add_target_code(c)
+        except Exception as e:
+            logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}")
+            logger_system.exception(e)
         self.__subscribe(add_codes)
         self.__unsubscribe(del_codes)
 
@@ -268,8 +275,7 @@
                      (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
                  ]}
             market_code_dict[pDepthMarketData['SecurityID']] = time.time()
-
-            l2_data_manager.add_market_data(d)
+            self.l2_data_upload_manager.add_market_data(d)
         except:
             pass
 
@@ -289,12 +295,6 @@
         min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
         # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁
         if pTransaction['ExecType'] == b"2":
-            # G鎾ゆ暟鎹殏鏃舵敞閲�
-            # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
-            # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos:
-            #     # 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡
-            #     l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo'])
-            #     async_log_util.info(logger_local_huaxin_g_cancel, f"G鎾ゆ挙鍗曪細{code} - {pTransaction['BuyNo']}")
             if min_volume is None:
                 # 榛樿绛涢��50w
                 if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
@@ -320,8 +320,7 @@
                 item["Side"] = "2"
             # 娣辫瘉鎾ゅ崟
             print("閫愮瑪濮旀墭", item)
-
-            l2_data_manager.add_l2_order_detail(item, 0, True)
+            self.l2_data_upload_manager.add_l2_order_detail(item, 0, True)
         else:
             if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                 # 娑ㄥ仠浠�
@@ -338,7 +337,7 @@
                 #     return
                 # self.__last_transaction_keys_dict[code] = key
                 # print("閫愮瑪鎴愪氦", item)
-                l2_data_manager.add_transaction_detail(item)
+                self.l2_data_upload_manager.add_transaction_detail(item)
 
     def OnRtnOrderDetail(self, pOrderDetail):
         can_listen = False
@@ -346,7 +345,8 @@
         start_time = 0
         if code in self.special_code_volume_for_order_dict:
             start_time = time.time()
-            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
+            if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[
+                'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']:
                 # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗�
                 if self.special_code_volume_for_order_dict[code][1] > time.time():
                     # 鐗规畩閲忕洃鍚�
@@ -354,13 +354,6 @@
                 else:
                     self.special_code_volume_for_order_dict.pop(code)
         if not can_listen:
-            # 鏆傛椂娉ㄩ噴鎺塆鎾ょ浉鍏虫暟鎹骇鐢�
-            # if pOrderDetail['OrderStatus'] == b'D':
-            #     transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code)
-            #     if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos:
-            #         # 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡
-            #         l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO'])
-            #         async_log_util.info(logger_local_huaxin_g_cancel, f"G鎾ゆ挙鍗曪細{code} - {pOrderDetail['OrderNO']}")
             min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
             if min_volume is None:
                 # 榛樿绛涢��50w
@@ -376,7 +369,7 @@
                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
-        l2_data_manager.add_l2_order_detail(item, start_time)
+        self.l2_data_upload_manager.add_l2_order_detail(item, start_time)
 
     def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum,
                             FirstLevelSellOrderVolumes):
@@ -481,7 +474,7 @@
 
 class MyL2ActionCallback(L2ActionCallback):
 
-    def OnSetL2Position(self, client_id, request_id, codes_data):
+    def OnSetL2Position(self, codes_data):
         print("L2璁㈤槄鏁伴噺锛�", len(codes_data))
         logger_l2_codes_subscript.info("鍗庨懌L2浠g爜澶勭悊闃熷垪鑾峰彇鍒版暟鎹細鏁伴噺-{}", len(codes_data))
         try:
@@ -490,7 +483,7 @@
             logging.exception(e)
 
 
-def __init_l2():
+def __init_l2(l2_data_upload_manager):
     print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion())
     # case 1: Tcp鏂瑰紡
     # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP
@@ -502,7 +495,7 @@
     # case 2闈炵紦瀛樻ā寮�
     api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False)
     global spi
-    spi = Lev2MdSpi(api)
+    spi = Lev2MdSpi(api, l2_data_upload_manager)
     api.RegisterSpi(spi)
     # -------------------姝e紡妯″紡-------------------------------------
     if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST:
@@ -522,32 +515,21 @@
     api.Init()
 
 
-def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue):
+def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
     logger_system.info(f"l2_client __receive_from_pipe_trade 绾跨▼ID:{tool.get_thread_id()}")
     while True:
         try:
             value = queue_trade_w_l2_r.get()
             if value:
-                value = value.decode("utf-8")
+                if type(value) == bytes:
+                    value = value.decode("utf-8")
                 data = json.loads(value)
-                if data["type"] == "listen_volume":
+                _type = data["type"]
+                if _type == "listen_volume":
                     volume = data["data"]["volume"]
                     code = data["data"]["code"]
                     spi.set_code_special_watch_volume(code, volume)
-        except Exception as e:
-            logging.exception(e)
-
-
-def __receive_from_pipe_strategy(pipe_):
-    logger_system.info(f"l2_client __receive_from_pipe_strategy 绾跨▼ID:{tool.get_thread_id()}")
-    while True:
-        # print("__receive_from_pipe_strategy")
-        try:
-            val = pipe_.recv()
-            if val:
-                print("L2瀹㈡埛绔帴鍙楀埌鏁版嵁")
-                data = json.loads(val)
-                if data["data"]["type"] == "l2_cmd":
+                elif _type == "l2_cmd":
                     l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
         except Exception as e:
             logging.exception(e)
@@ -556,28 +538,24 @@
 pipe_strategy = None
 
 
-def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy,
-        _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
+def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue],
+        transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None:
     logger_system.info("L2杩涚▼ID锛歿}", os.getpid())
     logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}")
     try:
         log.close_print()
-        if queue_trade_w_l2_r is not None:
-            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True)
+        if queue_r is not None:
+            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
             t1.start()
-        if _pipe_strategy is not None:
-            global pipe_strategy
-            pipe_strategy = _pipe_strategy
-            t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
-            t1.start()
-        __init_l2()
-        global l2_data_callback
-        l2_data_callback = _l2_data_callback
-        l2_data_manager.run_upload_common(l2_data_callback)
-        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
+
+        # 鍒濆鍖�
+        order_queue_distribute_manager = CodeQueueDistributeManager(order_queues)
+        transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues)
+        l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager,
+                                                     transaction_queue_distribute_manager, market_queue)
+        __init_l2(l2_data_upload_manager)
+        l2_data_manager.run_upload_common()
         l2_data_manager.run_log()
-        l2_data_manager.run_upload_daemon(l2_data_callback)
-        # l2_data_manager.run_test(l2_data_callback)
         global l2CommandManager
         l2CommandManager = command_manager.L2CommandManager()
         l2CommandManager.init(MyL2ActionCallback())
@@ -589,6 +567,6 @@
 
 
 if __name__ == "__main__":
-    run(None, None, None)
+    # run(None, None, None)
     # spi.set_codes_data([("000333", 12000)])
     input()

--
Gitblit v1.8.0