From 984e59be6787f06b927d5ec612f443f54e145044 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 13 三月 2025 17:41:30 +0800
Subject: [PATCH] 真实下单位修改/L2成交处理速度提升

---
 l2/l2_transaction_data_processor.py |  142 +++++++++++++++++++++++++++++------------------
 1 files changed, 88 insertions(+), 54 deletions(-)

diff --git a/l2/l2_transaction_data_processor.py b/l2/l2_transaction_data_processor.py
index a818747..cf58136 100644
--- a/l2/l2_transaction_data_processor.py
+++ b/l2/l2_transaction_data_processor.py
@@ -1,6 +1,8 @@
 import logging
 import time
 
+import dask
+
 import constant
 from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer
 from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer, LCancelRateManager
@@ -32,11 +34,11 @@
 
     # 璁$畻鎴愪氦杩涘害
     @classmethod
-    def __compute_latest_trade_progress(cls, code, buyno_map, datas):
+    def __compute_latest_trade_progress(cls, code, buyno_map, fdatas):
         buy_progress_index = None
-        for i in range(len(datas) - 1, -1, -1):
-            d = datas[i]
-            buy_no = f"{d[6]}"
+        for i in range(len(fdatas) - 1, -1, -1):
+            d = fdatas[i]
+            buy_no = f"{d[0][6]}"
             if buyno_map and buy_no in buyno_map:
                 # 鎴愪氦杩涘害浣嶅繀椤绘槸娑ㄥ仠涔�
                 if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
@@ -45,61 +47,94 @@
         return buy_progress_index
 
     @classmethod
-    def statistic_big_order_infos(cls, code, datas, order_begin_pos: OrderBeginPosInfo):
+    def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo):
         """
         缁熻澶у崟鎴愪氦
         @param code:
-        @param datas:
+        @param fdatas: 鏍煎紡锛歔(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)]
         @return:
         """
-        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
-        buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, datas, limit_up_price)
-        if buy_datas:
-            BigOrderDealManager().add_buy_datas(code, buy_datas)
-            active_big_buy_orders = []
+
+        @dask.delayed
+        def statistic_big_buy_data():
+            buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
             if buy_datas:
-                for x in buy_datas:
-                    if x[0] > x[6]:
-                        # (涔板崟鍙�, 鎴愪氦閲戦, 鏈�鍚庢垚浜ゆ椂闂�)
-                        active_big_buy_orders.append((x[0], x[2], x[4]))
-            EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
-        try:
-            is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
-            if is_placed_order:
-                if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
-                    RadicalBuyDataManager.big_order_deal(code)
+                BigOrderDealManager().add_buy_datas(code, buy_datas)
+                active_big_buy_orders = []
+                if buy_datas:
+                    for x in buy_datas:
+                        if x[0] > x[6]:
+                            # (涔板崟鍙�, 鎴愪氦閲戦, 鏈�鍚庢垚浜ゆ椂闂�)
+                            active_big_buy_orders.append((x[0], x[2], x[4]))
+                EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
+            try:
+                is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
+                if is_placed_order:
+                    if order_begin_pos and order_begin_pos.mode == OrderBeginPosInfo.MODE_RADICAL:
+                        RadicalBuyDataManager.big_order_deal(code)
 
-                if bigger_buy_datas:
-                    # 鏈夊ぇ浜�50w鐨勫ぇ鍗曟垚浜�
-                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
-                    if buyno_map:
-                        for buy_data in bigger_buy_datas:
-                            order_no = f"{buy_data[0]}"
-                            if order_no in buyno_map:
-                                LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
-                                                                       order_begin_pos.buy_single_index)
-        except Exception as e:
-            logger_debug.exception(e)
+                    if bigger_buy_datas:
+                        # 鏈夊ぇ浜�50w鐨勫ぇ鍗曟垚浜�
+                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
+                        if buyno_map:
+                            for buy_data in bigger_buy_datas:
+                                order_no = f"{buy_data[0]}"
+                                if order_no in buyno_map:
+                                    LCancelBigNumComputer().add_deal_index(code, buyno_map[order_no]["index"],
+                                                                           order_begin_pos.buy_single_index)
+            except Exception as e:
+                logger_debug.exception(e)
+            return buy_datas
 
-        sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, datas)
-        if sell_datas:
-            BigOrderDealManager().add_sell_datas(code, sell_datas)
+        @dask.delayed
+        def statistic_big_sell_data():
+            sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
+            if sell_datas:
+                BigOrderDealManager().add_sell_datas(code, sell_datas)
+            return sell_datas
 
+        @dask.delayed
+        def statistic_big_data(f1_, f2_):
+            temp_data = f1_, f2_
+            return temp_data
+
+        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
+        # 骞惰澶勭悊涔板崟涓庡崠鍗�
+        f1 = statistic_big_buy_data()
+        f2 = statistic_big_sell_data()
+        dask_result = statistic_big_data(f1, f2)
+        buy_datas, sell_datas = dask_result.compute()
         if buy_datas or sell_datas:
             buy_money = BigOrderDealManager().get_total_buy_money(code)
             sell_money = BigOrderDealManager().get_total_sell_money(code)
             LCancelRateManager.set_big_num_deal_info(code, buy_money, sell_money)
 
     @classmethod
-    def process_huaxin_transaction_datas(cls, code, datas):
+    def process_huaxin_transaction_datas(cls, code, o_datas):
+        # TODO 鏁村舰鏁版嵁锛屾牸寮忥細[(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)]
+        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
+        # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
+        #                   data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
+        #                   data['SellNo'], data['ExecType']))
+        fdatas = [
+            [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
+            for d in o_datas]
+        temp_time_dict = {}
+        for d in fdatas:
+            if d[3] not in temp_time_dict:
+                temp_time_dict[d[3]] = l2_huaxin_util.convert_time(d[3], with_ms=True)
+            d[5] = temp_time_dict.get(d[3])
+            d[4] = d[5][:8]
+        temp_time_dict.clear()
+
         __start_time = time.time()
         limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
         # 璁剧疆鎴愪氦浠�
         try:
-            current_price_process_manager.set_trade_price(code, datas[-1][1])
-            if limit_up_price > datas[-1][1]:
+            current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
+            if limit_up_price > fdatas[-1][0][1]:
                 # 娌℃湁娑ㄥ仠
-                EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿datas[-1][1]}")
+                EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
                 radical_buy_strategy.clear_data(code)
         except:
             pass
@@ -116,13 +151,13 @@
             is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
 
             _start_time = time.time()
-            L2LimitUpSellDataManager.set_deal_datas(code, datas)
+            L2LimitUpSellDataManager.set_deal_datas(code, fdatas)
             use_time_list.append(("缁熻娑ㄥ仠鍗栨垚浜�", time.time() - _start_time))
             _start_time = time.time()
             #  澶у崟缁熻
             # cls.__statistic_thread_pool.submit(cls.statistic_big_order_infos, code, datas, order_begin_pos)
             try:
-                cls.statistic_big_order_infos(code, datas, order_begin_pos)
+                cls.statistic_big_order_infos(code, fdatas, order_begin_pos)
             except Exception as e:
                 async_log_util.error(hx_logger_l2_debug, f"缁熻澶у崟鍑洪敊锛歿str(e)}")
             use_time_list.append(("缁熻澶у崟鏁版嵁", time.time() - _start_time))
@@ -132,24 +167,22 @@
             try:
                 # 缁熻涓婃澘鏃堕棿
                 try:
-                    for d in datas:
-                        if d[6] > d[7]:
+                    for d in fdatas:
+                        if d[1]:
                             # 涓诲姩涔�
-                            if d[1] == limit_up_price:
+                            if d[2]:
                                 # 娑ㄥ仠
-                                current_price_process_manager.set_latest_not_limit_up_time(code,
-                                                                                           l2_huaxin_util.convert_time(
-                                                                                               d[3], with_ms=True))
+                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
                         else:
                             # 涓诲姩鍗栵紙鏉夸笂锛�
-                            if d[1] == limit_up_price:
+                            if d[2]:
                                 L2LimitUpSellDataManager.clear_data(code)
                                 break
                 except:
                     pass
 
                 # 缁熻鍗栧崟
-                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas, limit_up_price)
+                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, fdatas, limit_up_price)
 
                 use_time_list.append(("澶勭悊鍗栧崟鎴愪氦鏁版嵁", time.time() - _start_time))
                 _start_time = time.time()
@@ -171,10 +204,10 @@
                                                                                             order_begin_pos)
                         cancel_type = trade_constant.CANCEL_TYPE_P
                     # 鍒ゆ柇鏃堕棿鏄惁涓庢湰鍦版椂闂寸浉宸�5s浠ヤ笂
-                    if tool.trade_time_sub(tool.get_now_time_str(), l2_huaxin_util.convert_time(datas[-1][3])) > 10:
+                    if tool.trade_time_sub(tool.get_now_time_str(), fdatas[-1][4]) > 10:
                         now_seconds = int(tool.get_now_time_str().replace(":", ""))
                         if now_seconds < int("093100"):  # or int("130000") <= now_seconds < int("130200"):
-                            need_cancel, cancel_msg = True, f"鎴愪氦鏃堕棿涓庢湰鍦版椂闂寸浉宸�10S浠ヤ笂锛寋l2_huaxin_util.convert_time(datas[-1][3])}"
+                            need_cancel, cancel_msg = True, f"鎴愪氦鏃堕棿涓庢湰鍦版椂闂寸浉宸�10S浠ヤ笂锛寋fdatas[-1][4]}"
                             cancel_type = trade_constant.CANCEL_TYPE_L2_DELAY
                     if need_cancel:
                         L2TradeDataProcessor.cancel_buy(code, cancel_msg, cancel_type=cancel_type)
@@ -183,7 +216,7 @@
                     use_time_list.append(("澶勭悊鍗栧崟鐩稿叧鎾ゆ暟鎹�", time.time() - _start_time))
                     _start_time = time.time()
                 # 缁熻娑ㄥ仠鍗栨垚浜�
-                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, datas, limit_up_price)
+                HuaXinSellOrderStatisticManager.statistic_total_deal_volume(code, fdatas, limit_up_price)
                 use_time_list.append(("缁熻鎴愪氦閲忔暟鎹�", time.time() - _start_time))
             except Exception as e:
                 async_log_util.error(logger_debug, f"鍗栧崟缁熻寮傚父锛歿big_sell_order_info}")
@@ -193,7 +226,7 @@
             # if big_money_count > 0:
             #     LCancelRateManager.compute_big_num_deal_rate(code)
 
-            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, datas)
+            buy_progress_index = cls.__compute_latest_trade_progress(code, buyno_map, fdatas)
 
             if buy_progress_index is not None:
                 buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -246,4 +279,5 @@
             use_time = int((time.time() - __start_time) * 1000)
             if use_time > 5:
                 l2_log.info(code, hx_logger_l2_upload,
-                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿use_time} 鏁版嵁鏁伴噺锛歿len(datas)}  璇︽儏:{use_time_list}")
+                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿use_time} 鏁版嵁鏁伴噺锛歿len(fdatas)}  璇︽儏:{use_time_list}")
+

--
Gitblit v1.8.0