From b51b2ae184fad5aaf37a78903987e064f192d430 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 26 五月 2025 11:35:20 +0800
Subject: [PATCH] 大单解析修改

---
 l2/l2_transaction_data_processor.py |  104 +++++++++++++++++++++++++++++----------------------
 1 files changed, 59 insertions(+), 45 deletions(-)

diff --git a/l2/l2_transaction_data_processor.py b/l2/l2_transaction_data_processor.py
index 89dbfa3..aac5019 100644
--- a/l2/l2_transaction_data_processor.py
+++ b/l2/l2_transaction_data_processor.py
@@ -33,6 +33,8 @@
 class HuaXinTransactionDatasProcessor:
     __statistic_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=constant.HUAXIN_L2_MAX_CODES_COUNT + 2)
     __TradeBuyQueue = transaction_progress.TradeBuyQueue()
+    # 闈炴定鍋滄垚浜ゆ椂闂�
+    __not_limit_up_time_dict = {}
 
     # 璁$畻鎴愪氦杩涘害
     @classmethod
@@ -60,9 +62,11 @@
         @return:
         """
 
-        @dask.delayed
         def statistic_big_buy_data():
+            use_time_list = []
+            __start_time = time.time()
             buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
+            use_time_list.append((time.time() - __start_time, "涔板崟缁熻"))
             if buy_datas:
                 BigOrderDealManager().add_buy_datas(code, buy_datas)
                 active_big_buy_orders = []
@@ -72,6 +76,7 @@
                             # (涔板崟鍙�, 鎴愪氦閲戦, 鏈�鍚庢垚浜ゆ椂闂�)
                             active_big_buy_orders.append((x[0], x[2], x[4]))
                 EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
+                use_time_list.append((time.time() - __start_time, "涔板崟缁熻缁撴灉澶勭悊"))
             try:
                 is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                 if is_placed_order:
@@ -89,26 +94,40 @@
                                                                            order_begin_pos.buy_single_index)
             except Exception as e:
                 logger_debug.exception(e)
+            if use_time_list and use_time_list[-1][0] > 0.005:
+                l2_log.info(code, hx_logger_l2_upload,
+                            f"涔板崟缁熻+澶勭悊鑰楁椂锛歿use_time_list[-1][0]}  璇︽儏:{use_time_list}")
+
             return buy_datas
 
-        @dask.delayed
         def statistic_big_sell_data():
+            use_time_list = []
+            __start_time = time.time()
             sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
             if sell_datas:
                 BigOrderDealManager().add_sell_datas(code, sell_datas)
+            use_time_list.append((time.time() - __start_time, "鍗栧崟缁熻"))
+            if use_time_list and use_time_list[-1][0] > 0.005:
+                l2_log.info(code, hx_logger_l2_upload,
+                            f"鍗栧崟缁熻+澶勭悊鑰楁椂锛歿use_time_list[-1][0]}  璇︽儏:{use_time_list}")
             return sell_datas
 
-        @dask.delayed
         def statistic_big_data(f1_, f2_):
             temp_data = f1_, f2_
             return temp_data
 
         limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
-        # 骞惰澶勭悊涔板崟涓庡崠鍗�
-        f1 = statistic_big_buy_data()
-        f2 = statistic_big_sell_data()
-        dask_result = statistic_big_data(f1, f2)
-        buy_datas, sell_datas = dask_result.compute()
+
+        if False and len(fdatas) > 100:
+            # 骞惰澶勭悊涔板崟涓庡崠鍗�
+            # 瓒呰繃100鏉℃暟鎹墠闇�瑕佸苟琛屽鐞�
+            f1 = dask.delayed(statistic_big_buy_data)()
+            f2 = dask.delayed(statistic_big_sell_data)()
+            dask_result = dask.delayed(statistic_big_data)(f1, f2)
+            buy_datas, sell_datas = dask_result.compute()
+        else:
+            buy_datas = statistic_big_buy_data()
+            sell_datas = statistic_big_sell_data()
         # L鎾ょ殑姣斾緥涓庝拱鍗栧ぇ鍗曟棤鐩存帴鍏崇郴浜�
         # if buy_datas or sell_datas:
         #     buy_money = BigOrderDealManager().get_total_buy_money(code)
@@ -134,16 +153,23 @@
         temp_time_dict.clear()
 
         __start_time = time.time()
-        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
         # 璁剧疆鎴愪氦浠�
         try:
             current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
-            if limit_up_price > fdatas[-1][0][1]:
-                # 娌℃湁娑ㄥ仠
-                EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
-                radical_buy_strategy.clear_data(code)
-        except:
-            pass
+            if not fdatas[-1][2]:
+                if code not in cls.__not_limit_up_time_dict:
+                    cls.__not_limit_up_time_dict[code] = fdatas[-1][5]
+                last_time = cls.__not_limit_up_time_dict[code]
+                # 鐐告澘鏃堕棿鎸佺画500ms浠ヤ笂绠楃偢鏉�
+                if tool.trade_time_sub_with_ms(fdatas[-1][5], last_time) > 500:
+                    # 娌℃湁娑ㄥ仠
+                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
+                    radical_buy_strategy.clear_data(code, msg=f"娌℃湁娑ㄥ仠锛歿fdatas[-1][0]}")
+            else:
+                if code in cls.__not_limit_up_time_dict:
+                    cls.__not_limit_up_time_dict.pop(code)
+        except Exception as e:
+            async_log_util.error(logger_debug, f"L2鎴愪氦寮�鏉胯绠楅敊璇細{str(e)}")
 
         total_datas = l2_data_util.local_today_datas.get(code)
         use_time_list = []
@@ -171,21 +197,12 @@
             _start_time = time.time()
 
             try:
+                last_data = fdatas[-1]
                 # 缁熻涓婃澘鏃堕棿
-                try:
-                    for d in fdatas:
-                        if d[1]:
-                            # 涓诲姩涔�
-                            if d[2]:
-                                # 娑ㄥ仠
-                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
-                        else:
-                            # 涓诲姩鍗栵紙鏉夸笂锛�
-                            if d[2]:
-                                L2LimitUpSellDataManager.clear_data(code)
-                                break
-                except:
-                    pass
+                if last_data[1] and last_data[2]:
+                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
+                if not last_data[1] and last_data[2]:
+                    L2LimitUpSellDataManager.clear_data(code)
                 big_sell_order_info = None
                 # 缁熻鍗栧崟
                 big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
@@ -239,11 +256,10 @@
             if buy_progress_index is not None:
                 buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                   total_datas)
-                async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code,
-                                    buy_progress_index)
+                l2_log.info(code, logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, buy_progress_index)
                 if is_placed_order:
-                    NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
-                                                                  buy_progress_index)
+                    # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
+                    #                                               buy_progress_index)
                     LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                                buy_progress_index,
                                                                total_datas)
@@ -268,11 +284,10 @@
                                                                 cancel_type=trade_constant.CANCEL_TYPE_W)
                         except:
                             pass
-
-                    SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
-                                                                  buy_progress_index)
-                    HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
-                                                                     buy_progress_index)
+                    # SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
+                    #                                               buy_progress_index)
+                    # HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
+                    #                                                  buy_progress_index)
             else:
                 pass
             if is_placed_order:
@@ -344,7 +359,7 @@
         # =====鏍煎紡鍖栨暟鎹�=====
         # 鏁村舰鏁版嵁锛屾牸寮忥細[(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)]
         use_time_list = []
-        __start_time = int(time.time()*1000)
+        __start_time = int(time.time() * 1000)
         fdatas = [
             [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
             for d in o_datas]
@@ -356,7 +371,7 @@
             d[4] = d[5][:8]
         temp_time_dict.clear()
         _start_time = int(time.time() * 1000)
-        use_time_list.append((_start_time - __start_time , "鏁版嵁鏁村舰"))
+        use_time_list.append((_start_time - __start_time, "鏁版嵁鏁村舰"))
 
         try:
 
@@ -364,7 +379,7 @@
             # 璁剧疆鎴愪氦浠�
             try:
                 current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
-                if limit_up_price > fdatas[-1][0][1]:
+                if not fdatas[-1][2]:
                     # 娌℃湁娑ㄥ仠
                     EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
                     radical_buy_strategy.clear_data(code)
@@ -379,8 +394,7 @@
                     current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                 elif not last_data[1] and last_data[2]:
                     # 娑ㄥ仠涓诲姩鍗�
-                    if last_data[2]:
-                        L2LimitUpSellDataManager.clear_data(code)
+                    L2LimitUpSellDataManager.clear_data(code)
             except:
                 pass
 
@@ -395,7 +409,7 @@
                 _start_time = int(time.time() * 1000)
                 use_time_list.append((_start_time - __start_time, "澶勭悊娑ㄥ仠鍗�"))
                 # 鍥炶皟鏁版嵁
-                if filter_datas:
+                if filter_datas is not None:
                     l2_log.info(code, logger_l2_trade, f"鏈�鍚庝竴绗旀定鍋滃崠琚悆锛歿filter_datas[0]}")
                     data_callback.l2_trade_single_callback.OnLastLimitUpSellDeal(code, filter_datas[0][0])
 
@@ -423,4 +437,4 @@
             _start_time = int(time.time() * 1000)
             if _start_time - __start_time > 5:
                 l2_log.info(code, hx_logger_l2_upload,
-                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿_start_time - __start_time} 鏁版嵁鏁伴噺锛歿len(fdatas)}  璇︽儏:{use_time_list}")
\ No newline at end of file
+                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿_start_time - __start_time} 鏁版嵁鏁伴噺锛歿len(fdatas)}  璇︽儏:{use_time_list}")

--
Gitblit v1.8.0