From e3f4cf3bacac6eda31ddea6aaf70ebb883788817 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 28 五月 2025 17:38:07 +0800
Subject: [PATCH] L2成交大单解析

---
 l2_data_parser.py                           |    8 +++-
 l2_test.py                                  |    4 +-
 data_parser/transaction_big_order_parser.py |   59 ++++++++++++++++++++---------
 3 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py
index aad957a..84dae46 100644
--- a/data_parser/transaction_big_order_parser.py
+++ b/data_parser/transaction_big_order_parser.py
@@ -201,7 +201,7 @@
 __combined_df_cache = {}
 
 
-def extract_big_order_of_all(dir_path):
+def extract_big_order_of_all(dir_path, process_count=4):
     combined_path = os.path.join(dir_path, 'combined.csv')
     if not os.path.exists(combined_path):
         print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
@@ -211,15 +211,25 @@
     for code in codes:
         extract_big_order_of_code(dir_path, code)
 
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
+    if os.path.exists(output_path):
+        print_log("璺緞宸插瓨鍦�:", output_path)
+        return
+    df = __combined_df_cache.get(combined_path, None)
+    if df is None:
+        df = pd.read_csv(combined_path)
+        __combined_df_cache[combined_path] = df
+    args = [(code, df) for code in codes]
+    # 鏂板啓娉�
+    with Pool(processes=process_count) as pool:
+        pool.map(__extract_big_order_of_code, args)
 
-def extract_big_order_of_code(dir_path, code):
-    """
-    鎻愬彇浠g爜鐨勫ぇ鍗�
-    @param dir_path: 鏁版嵁鐩綍
-    @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴
-    @return:
-    """
 
+def __extract_big_order_of_code(args):
     def first_last(group):
         """
             鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉�
@@ -237,18 +247,11 @@
             'StartPrice': group['StartPrice'].iloc[0]
         })
 
-    combined_path = os.path.join(dir_path, 'combined.csv')
-    if not os.path.exists(combined_path):
-        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
-        return
+    dir_path, code, df = args[0], args[1], args[2]
     output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
     if os.path.exists(output_path):
         print_log("璺緞宸插瓨鍦�:", output_path)
         return
-    df = __combined_df_cache.get(combined_path, None)
-    if df is None:
-        df = pd.read_csv(combined_path)
-        __combined_df_cache[combined_path] = df
     df_copy = df.copy()
     if code:
         df_copy = df_copy[df_copy["SecurityID"] == int(code)]
@@ -264,6 +267,26 @@
     # 閬嶅巻鍐呭
     grouped_result.to_csv(output_path, index=False)
     print_log(f"[{tool.get_now_time_str()}]淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}")
+
+
+def extract_big_order_of_code(dir_path, code):
+    """
+    鎻愬彇浠g爜鐨勫ぇ鍗�
+    @param dir_path: 鏁版嵁鐩綍
+    @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴
+    @return:
+    """
+
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+
+    df = __combined_df_cache.get(combined_path, None)
+    if df is None:
+        df = pd.read_csv(combined_path)
+        __combined_df_cache[combined_path] = df
+    __extract_big_order_of_code((dir_path, code, df))
 
 
 def extract_big_order_codes(dir_path):
@@ -289,9 +312,9 @@
 
 
 if __name__ == "__main__":
-    log(1,2,3)
+    print_log(1, 2, 3)
     # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv")
     # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv")
     # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test")
     # extract_big_order_codes("E:/娴嬭瘯鏁版嵁/Transaction_Test")
-    extract_big_order_of_code("E:/娴嬭瘯鏁版嵁/Transaction_Test")
+    extract_big_order_of_all("E:/娴嬭瘯鏁版嵁/Transaction_Test")
diff --git a/l2_data_parser.py b/l2_data_parser.py
index 3d72c80..5433d19 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -407,8 +407,12 @@
             # 鍛戒护妯″紡  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
             # 鏍规嵁code鎻愬彇澶у崟
             if not code:
-                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick")
-                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction")
+                # indexed_data = [f"/home/userzjj/ftp/{day}/NGTSTick", f"/home/userzjj/ftp/{day}/Transaction"]
+                # # 鏂板啓娉�
+                # with Pool(processes=2) as pool:
+                #     pool.map(transaction_big_order_parser.extract_big_order_of_all, indexed_data)
+                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", process_count=10)
+                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", process_count=10)
             else:
                 if tool.is_sh_code(code):
                     transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code)
diff --git a/l2_test.py b/l2_test.py
index 8db7571..161d834 100644
--- a/l2_test.py
+++ b/l2_test.py
@@ -124,8 +124,8 @@
             volume = zylt_volume_map.get(code)
             # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环
             k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
-            if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
-                # 鑷敱娴侀�氬競鍊煎湪30浜�-300浜夸互涓�
+            if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
+                # 鑷敱娴侀�氬競鍊煎湪10浜�-300浜夸互涓�
                 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
                 if limit_up_price > k_bars[0]["high"]:
                     # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环

--
Gitblit v1.8.0