From 7effd6cbe7ba570c91fc47ff3971df6fb686759d Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 30 七月 2025 11:23:51 +0800
Subject: [PATCH] bug修复

---
 l2/l2_transaction_data_processor.py |  167 ++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 110 insertions(+), 57 deletions(-)

diff --git a/l2/l2_transaction_data_processor.py b/l2/l2_transaction_data_processor.py
index 89dbfa3..2a62378 100644
--- a/l2/l2_transaction_data_processor.py
+++ b/l2/l2_transaction_data_processor.py
@@ -33,13 +33,22 @@
 class HuaXinTransactionDatasProcessor:
     __statistic_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=constant.HUAXIN_L2_MAX_CODES_COUNT + 2)
     __TradeBuyQueue = transaction_progress.TradeBuyQueue()
+    # 闈炴定鍋滄垚浜ゆ椂闂�
+    __not_limit_up_time_dict = {}
 
     # 璁$畻鎴愪氦杩涘害
     @classmethod
-    def __compute_latest_trade_progress(cls, code, fdatas):
+    def __compute_latest_trade_progress(cls, code, fdatas, buy_exec_index=None):
+        """
+        璁$畻鐪熷疄涓嬪崟浣嶇疆
+        @param code:
+        @param fdatas:
+        @param buy_exec_index:
+        @return:鐪熷疄涓嬪崟浣嶇疆, 鏄惁鏄繎浼艰绠�
+        """
         buyno_map = l2_data_util.local_today_buyno_map.get(code)
         if not buyno_map:
-            return None
+            return None, False
         buy_progress_index = None
         for i in range(len(fdatas) - 1, -1, -1):
             d = fdatas[i]
@@ -49,7 +58,37 @@
                 if L2DataUtil.is_limit_up_price_buy(buyno_map[buy_no]["val"]):
                     buy_progress_index = buyno_map[buy_no]["index"]
                 break
-        return buy_progress_index
+        if buy_progress_index is None and buy_exec_index is not None and buy_exec_index >= 0:
+            # 娌℃湁鎵惧埌鐪熷疄鎴愪氦杩涘害浣嶄笖鏈変拱鍏ユ墽琛屼綅缃�
+            # 鏍规嵁鏈�杩戠殑鎴愪氦涔板崟鍙疯绠楃湡瀹炴垚浜や綅缃�
+            try:
+                latest_buy_order_no = fdatas[-1][0][6]
+                total_datas = l2_data_util.local_today_datas.get(code)
+                if tool.trade_time_sub(total_datas[-1]["val"]["time"], total_datas[buy_exec_index]["val"]["time"]) < 60:
+                    # 涓嬪崟60s鍐呮墠杩欐牱璁$畻
+                    # 鍙栨渶鏂扮殑鎴愪氦涔板崟鍙凤紝鍦ㄤ袱涓定鍋滃鎵樹拱涔嬮棿鐨勫悗涓�涓暟鎹�
+                    index_1 = None
+                    max_index = total_datas[-1]["index"]
+                    for i in range(max_index, -1, -1):
+                        data = total_datas[i]
+                        val = data['val']
+                        if not L2DataUtil.is_limit_up_price_buy(val):
+                            continue
+                        if val['orderNo'] < latest_buy_order_no:
+                            index_1 = i
+                            break
+                    if index_1 is not None:
+                        for i in range(index_1 + 1, max_index + 1):
+                            data = total_datas[i]
+                            val = data['val']
+                            if not L2DataUtil.is_limit_up_price_buy(val):
+                                continue
+                            if val['orderNo'] > latest_buy_order_no:
+                                buy_progress_index = i
+                                return buy_progress_index, True
+            except  Exception as e:
+                async_log_util.info(logger_debug, f"璁$畻鐪熷疄鎴愪氦杩涘害浣嶅嚭閿欙細{str(e)}")
+        return buy_progress_index, False
 
     @classmethod
     def statistic_big_order_infos(cls, code, fdatas, order_begin_pos: OrderBeginPosInfo):
@@ -60,9 +99,11 @@
         @return:
         """
 
-        @dask.delayed
         def statistic_big_buy_data():
+            use_time_list = []
+            __start_time = time.time()
             buy_datas, bigger_buy_datas = HuaXinBuyOrderManager.statistic_big_buy_data(code, fdatas, limit_up_price)
+            use_time_list.append((time.time() - __start_time, "涔板崟缁熻"))
             if buy_datas:
                 BigOrderDealManager().add_buy_datas(code, buy_datas)
                 active_big_buy_orders = []
@@ -72,6 +113,7 @@
                             # (涔板崟鍙�, 鎴愪氦閲戦, 鏈�鍚庢垚浜ゆ椂闂�)
                             active_big_buy_orders.append((x[0], x[2], x[4]))
                 EveryLimitupBigDealOrderManager.add_big_buy_order_deal(code, active_big_buy_orders)
+                use_time_list.append((time.time() - __start_time, "涔板崟缁熻缁撴灉澶勭悊"))
             try:
                 is_placed_order = l2_data_manager.TradePointManager.is_placed_order(order_begin_pos)
                 if is_placed_order:
@@ -89,26 +131,40 @@
                                                                            order_begin_pos.buy_single_index)
             except Exception as e:
                 logger_debug.exception(e)
+            if use_time_list and use_time_list[-1][0] > 0.005:
+                l2_log.info(code, hx_logger_l2_upload,
+                            f"涔板崟缁熻+澶勭悊鑰楁椂锛歿use_time_list[-1][0]}  璇︽儏:{use_time_list}")
+
             return buy_datas
 
-        @dask.delayed
         def statistic_big_sell_data():
+            use_time_list = []
+            __start_time = time.time()
             sell_datas = HuaXinSellOrderStatisticManager.statistic_big_sell_data(code, fdatas)
             if sell_datas:
                 BigOrderDealManager().add_sell_datas(code, sell_datas)
+            use_time_list.append((time.time() - __start_time, "鍗栧崟缁熻"))
+            if use_time_list and use_time_list[-1][0] > 0.005:
+                l2_log.info(code, hx_logger_l2_upload,
+                            f"鍗栧崟缁熻+澶勭悊鑰楁椂锛歿use_time_list[-1][0]}  璇︽儏:{use_time_list}")
             return sell_datas
 
-        @dask.delayed
         def statistic_big_data(f1_, f2_):
             temp_data = f1_, f2_
             return temp_data
 
         limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
-        # 骞惰澶勭悊涔板崟涓庡崠鍗�
-        f1 = statistic_big_buy_data()
-        f2 = statistic_big_sell_data()
-        dask_result = statistic_big_data(f1, f2)
-        buy_datas, sell_datas = dask_result.compute()
+
+        if False and len(fdatas) > 100:
+            # 骞惰澶勭悊涔板崟涓庡崠鍗�
+            # 瓒呰繃100鏉℃暟鎹墠闇�瑕佸苟琛屽鐞�
+            f1 = dask.delayed(statistic_big_buy_data)()
+            f2 = dask.delayed(statistic_big_sell_data)()
+            dask_result = dask.delayed(statistic_big_data)(f1, f2)
+            buy_datas, sell_datas = dask_result.compute()
+        else:
+            buy_datas = statistic_big_buy_data()
+            sell_datas = statistic_big_sell_data()
         # L鎾ょ殑姣斾緥涓庝拱鍗栧ぇ鍗曟棤鐩存帴鍏崇郴浜�
         # if buy_datas or sell_datas:
         #     buy_money = BigOrderDealManager().get_total_buy_money(code)
@@ -134,16 +190,23 @@
         temp_time_dict.clear()
 
         __start_time = time.time()
-        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
         # 璁剧疆鎴愪氦浠�
         try:
             current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
-            if limit_up_price > fdatas[-1][0][1]:
-                # 娌℃湁娑ㄥ仠
-                EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
-                radical_buy_strategy.clear_data(code)
-        except:
-            pass
+            if not fdatas[-1][2]:
+                if code not in cls.__not_limit_up_time_dict:
+                    cls.__not_limit_up_time_dict[code] = fdatas[-1][5]
+                last_time = cls.__not_limit_up_time_dict[code]
+                # 鐐告澘鏃堕棿鎸佺画500ms浠ヤ笂绠楃偢鏉�
+                if tool.trade_time_sub_with_ms(fdatas[-1][5], last_time) > 500:
+                    # 娌℃湁娑ㄥ仠
+                    EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
+                    radical_buy_strategy.clear_data(code, msg=f"娌℃湁娑ㄥ仠锛歿fdatas[-1][0]}")
+            else:
+                if code in cls.__not_limit_up_time_dict:
+                    cls.__not_limit_up_time_dict.pop(code)
+        except Exception as e:
+            async_log_util.error(logger_debug, f"L2鎴愪氦寮�鏉胯绠楅敊璇細{str(e)}")
 
         total_datas = l2_data_util.local_today_datas.get(code)
         use_time_list = []
@@ -171,21 +234,12 @@
             _start_time = time.time()
 
             try:
+                last_data = fdatas[-1]
                 # 缁熻涓婃澘鏃堕棿
-                try:
-                    for d in fdatas:
-                        if d[1]:
-                            # 涓诲姩涔�
-                            if d[2]:
-                                # 娑ㄥ仠
-                                current_price_process_manager.set_latest_not_limit_up_time(code, d[5])
-                        else:
-                            # 涓诲姩鍗栵紙鏉夸笂锛�
-                            if d[2]:
-                                L2LimitUpSellDataManager.clear_data(code)
-                                break
-                except:
-                    pass
+                if last_data[1] and last_data[2]:
+                    current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
+                if not last_data[1] and last_data[2]:
+                    L2LimitUpSellDataManager.clear_data(code)
                 big_sell_order_info = None
                 # 缁熻鍗栧崟
                 big_sell_order_info = HuaXinSellOrderStatisticManager.statistic_continue_limit_up_sell_transaction_datas(
@@ -234,16 +288,16 @@
             # if big_money_count > 0:
             #     LCancelRateManager.compute_big_num_deal_rate(code)
 
-            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
+            buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas,
+                                                                                 order_begin_pos.buy_exec_index)
 
             if buy_progress_index is not None:
                 buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
                                                                                   total_datas)
-                async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code,
-                                    buy_progress_index)
+                l2_log.info(code, logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} is_similar-{}", code, buy_progress_index, is_similar)
                 if is_placed_order:
-                    NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
-                                                                  buy_progress_index)
+                    # NewGCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
+                    #                                               buy_progress_index)
                     LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
                                                                buy_progress_index,
                                                                total_datas)
@@ -268,11 +322,10 @@
                                                                 cancel_type=trade_constant.CANCEL_TYPE_W)
                         except:
                             pass
-
-                    SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
-                                                                  buy_progress_index)
-                    HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
-                                                                     buy_progress_index)
+                    # SCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
+                    #                                               buy_progress_index)
+                    # HourCancelBigNumComputer().set_transaction_index(code, order_begin_pos.buy_single_index,
+                    #                                                  buy_progress_index)
             else:
                 pass
             if is_placed_order:
@@ -324,18 +377,19 @@
             # 缁熻娑ㄥ仠涓诲姩鍗栨垚浜わ紝涓轰簡F鎾ゅ噯澶囨暟鎹�
             HuaXinSellOrderStatisticManager.statistic_active_sell_deal_volume(code, fdatas, limit_up_price)
             # 璁$畻鎴愪氦杩涘害
-            buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
-            if buy_progress_index is not None:
+            _buy_progress_index, _is_similar = cls.__compute_latest_trade_progress(code, fdatas,
+                                                                                   order_begin_pos.buy_exec_index)
+            if _buy_progress_index is not None:
                 total_datas = l2_data_util.local_today_datas.get(code)
-                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
+                buy_progress_index_changed = cls.__TradeBuyQueue.set_traded_index(code, _buy_progress_index,
                                                                                   total_datas)
-                async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code,
-                                    buy_progress_index)
+                async_log_util.info(logger_l2_trade_buy_queue, "鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} similar-{}", code,
+                                    _buy_progress_index, _is_similar)
                 if is_placed_order:
                     LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index,
-                                                               buy_progress_index,
+                                                               _buy_progress_index,
                                                                total_datas)
-                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index)
+                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, _buy_progress_index)
                     if cancel_result[0]:
                         L2TradeDataProcessor.cancel_buy(code, f"F鎾�:{cancel_result[1]}",
                                                         cancel_type=trade_constant.CANCEL_TYPE_F)
@@ -344,7 +398,7 @@
         # =====鏍煎紡鍖栨暟鎹�=====
         # 鏁村舰鏁版嵁锛屾牸寮忥細[(鏁版嵁鏈韩, 鏄惁涓诲姩涔�, 鏄惁娑ㄥ仠, 鎬绘垚浜ら, 涓嶅惈ms鏃堕棿锛屽惈ms鏃堕棿)]
         use_time_list = []
-        __start_time = int(time.time()*1000)
+        __start_time = int(time.time() * 1000)
         fdatas = [
             [d, d[6] > d[7], limit_up_price == d[1], d[1] * d[2], '', '']
             for d in o_datas]
@@ -356,7 +410,7 @@
             d[4] = d[5][:8]
         temp_time_dict.clear()
         _start_time = int(time.time() * 1000)
-        use_time_list.append((_start_time - __start_time , "鏁版嵁鏁村舰"))
+        use_time_list.append((_start_time - __start_time, "鏁版嵁鏁村舰"))
 
         try:
 
@@ -364,7 +418,7 @@
             # 璁剧疆鎴愪氦浠�
             try:
                 current_price_process_manager.set_trade_price(code, fdatas[-1][0][1])
-                if limit_up_price > fdatas[-1][0][1]:
+                if not fdatas[-1][2]:
                     # 娌℃湁娑ㄥ仠
                     EveryLimitupBigDealOrderManager.open_limit_up(code, f"鏈�鏂版垚浜や环锛歿fdatas[-1][0][1]}")
                     radical_buy_strategy.clear_data(code)
@@ -379,8 +433,7 @@
                     current_price_process_manager.set_latest_not_limit_up_time(code, last_data[5])
                 elif not last_data[1] and last_data[2]:
                     # 娑ㄥ仠涓诲姩鍗�
-                    if last_data[2]:
-                        L2LimitUpSellDataManager.clear_data(code)
+                    L2LimitUpSellDataManager.clear_data(code)
             except:
                 pass
 
@@ -395,7 +448,7 @@
                 _start_time = int(time.time() * 1000)
                 use_time_list.append((_start_time - __start_time, "澶勭悊娑ㄥ仠鍗�"))
                 # 鍥炶皟鏁版嵁
-                if filter_datas:
+                if filter_datas is not None:
                     l2_log.info(code, logger_l2_trade, f"鏈�鍚庝竴绗旀定鍋滃崠琚悆锛歿filter_datas[0]}")
                     data_callback.l2_trade_single_callback.OnLastLimitUpSellDeal(code, filter_datas[0][0])
 
@@ -404,7 +457,7 @@
 
                 # 濡傛灉鏄鍔ㄤ拱灏辨洿鏂版垚浜よ繘搴�
                 if not fdatas[-1][1]:
-                    buy_progress_index = cls.__compute_latest_trade_progress(code, fdatas)
+                    buy_progress_index, is_similar = cls.__compute_latest_trade_progress(code, fdatas)
                     if buy_progress_index is not None:
                         total_datas = l2_data_util.local_today_datas.get(code)
                         cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index,
@@ -423,4 +476,4 @@
             _start_time = int(time.time() * 1000)
             if _start_time - __start_time > 5:
                 l2_log.info(code, hx_logger_l2_upload,
-                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿_start_time - __start_time} 鏁版嵁鏁伴噺锛歿len(fdatas)}  璇︽儏:{use_time_list}")
\ No newline at end of file
+                            f"{code}澶勭悊鎴愪氦鐢ㄦ椂锛歿_start_time - __start_time} 鏁版嵁鏁伴噺锛歿len(fdatas)}  璇︽儏:{use_time_list}")

--
Gitblit v1.8.0