From 2f2516749615da866e96d8d24e499b7ecbb63a3e Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 23 六月 2025 12:28:52 +0800
Subject: [PATCH] 默认交易模式变更/真实下单位置计算位置修改

---
 data_parser/transaction_big_order_parser.py |  118 +++++++++++++++++++++++++++++++++++++----------------------
 1 files changed, 74 insertions(+), 44 deletions(-)

diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py
index 31f3eb9..84dae46 100644
--- a/data_parser/transaction_big_order_parser.py
+++ b/data_parser/transaction_big_order_parser.py
@@ -10,6 +10,10 @@
 from utils import tool
 
 
+def print_log(*args):  # print *args behind a "[<current time>]" prefix
+    print("[{}]".format(tool.get_now_time_str()), *args)
+
+
 class BigOrderDealParser:
 
     @classmethod
@@ -55,7 +59,7 @@
     child_path = os.path.join(children_dir, f"{index}.csv")
     if os.path.exists(child_path):
         return
-    print(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid())
+    print_log(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid())
     df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
     df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
     df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
@@ -63,7 +67,7 @@
     df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
     df = df[df["TradeAmount"] > 0]
     # 鍒ゆ柇鏄惁涓虹┖
-    # print(df.empty)
+    # print_log(df.empty)
     # 鎸塖ecurityID鍜孊uyNo鍒嗙粍
     grouped = df.groupby(['SecurityID', 'BuyNo'])
     # 搴旂敤鑱氬悎鍑芥暟
@@ -106,7 +110,7 @@
     child_path = os.path.join(children_dir, f"{index}.csv")
     if os.path.exists(child_path):
         return
-    print(f"澶勭悊NGTSTick绗瑊index}鎵规")
+    log(f"澶勭悊NGTSTick绗瑊index}鎵规")
     df = df[df["TickType"] == 'T']
     df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
     df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
@@ -142,7 +146,7 @@
     """
     combined_path = os.path.join(dir_path, 'combined.csv')
     if os.path.exists(combined_path):
-        print("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪")
+        print_log("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪")
         return
     file_list = os.listdir(dir_path)
     file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
@@ -153,14 +157,14 @@
             continue
         df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
         df_list.append(df)
-    print("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list))
+    print_log("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list))
 
     combined_df = pd.concat(df_list, ignore_index=True)
 
-    print("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒")
+    print_log("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒")
     # 淇濆瓨缁撴灉
     combined_df.to_csv(combined_path, index=False)
-    print("鍐欏叆鏂囦欢瀹屾垚锛�")
+    print_log("鍐欏叆鏂囦欢瀹屾垚锛�")
 
 
 def concat_pre_transactions(dir_path):
@@ -179,7 +183,7 @@
     """
     combined_path = os.path.join(dir_path, 'combined.csv')
     if not os.path.exists(combined_path):
-        print("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
         return
     df = pd.read_csv(combined_path)
     df_copy = df.copy()
@@ -187,21 +191,45 @@
     # 搴旂敤鑱氬悎鍑芥暟
     chunk_result = grouped.apply(pd.Series({}))
     # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format)
-    print(chunk_result.to_string(
+    print_log(chunk_result.to_string(
         index=False,  # 涓嶆樉绀虹储寮�
         justify='left',  # 宸﹀榻�
         float_format='%.3f'  # 娴偣鏁版牸寮�
     ))
 
 
-def extract_big_order_of_code(dir_path, code=None):
-    """
-    鎻愬彇浠g爜鐨勫ぇ鍗�
-    @param dir_path: 鏁版嵁鐩綍
-    @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴
-    @return:
-    """
+__combined_df_cache = {}
 
+
+def extract_big_order_of_all(dir_path, process_count=4):
+    """
+    Export the big-order CSV of every code found in ``combined.csv``.
+
+    Work is fanned out to a process pool; each task is handed
+    ``(dir_path, code, df)``, the tuple layout the worker unpacks.
+    @param dir_path: directory that holds ``combined.csv``
+    @param process_count: number of worker processes
+    @return:
+    """
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+    codes = extract_big_order_codes(dir_path)
+    # The lookup returns None when the data is missing or empty.
+    if codes is None or len(codes) == 0:
+        return
+    print_log("鎬讳唬鐮佹暟閲忥細", len(codes))
+    # Load the combined frame once and share it through the module cache.
+    df = __combined_df_cache.get(combined_path)
+    if df is None:
+        df = pd.read_csv(combined_path)
+        __combined_df_cache[combined_path] = df
+    with Pool(processes=process_count) as pool:
+        pool.map(__extract_big_order_of_code, [(dir_path, code, df) for code in codes])
+
+
+def __extract_big_order_of_code(args):
     def first_last(group):
         """
             鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉�
@@ -219,35 +247,46 @@
             'StartPrice': group['StartPrice'].iloc[0]
         })
 
-    combined_path = os.path.join(dir_path, 'combined.csv')
-    if not os.path.exists(combined_path):
-        print("鎷兼帴鏁版嵁涓嶅瓨鍦�")
-        return
-
-    if code:
-        output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
-    else:
-        output_path = os.path.join(dir_path, f"big_buy.csv")
-
+    dir_path, code, df = args[0], args[1], args[2]
+    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
     if os.path.exists(output_path):
-        print("璺緞宸插瓨鍦�")
+        print_log("璺緞宸插瓨鍦�:", output_path)
         return
-    df = pd.read_csv(combined_path)
     df_copy = df.copy()
     if code:
         df_copy = df_copy[df_copy["SecurityID"] == int(code)]
     if df_copy.empty:
-        print("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
+        print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
         return
     df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
     # 鎸塖ecurityID鍜孊uyNo鍒嗙粍
     grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
     grouped_result = grouped.apply(first_last)
     grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
-    # print(grouped_result)
+    # print_log(grouped_result)
     # 閬嶅巻鍐呭
     grouped_result.to_csv(output_path, index=False)
-    print(f"淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}")
+    print_log(f"淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}")
+
+
+def extract_big_order_of_code(dir_path, code):
+    """
+    Extract the big orders of a single code from ``combined.csv``.
+    @param dir_path: data directory
+    @param code: code to export; a falsy value skips the per-code filter
+    @return:
+    """
+
+    source_csv = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(source_csv):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+
+    # Read the combined CSV only on the first request for this path.
+    if __combined_df_cache.get(source_csv, None) is None:
+        __combined_df_cache[source_csv] = pd.read_csv(source_csv)
+    frame = __combined_df_cache[source_csv]
+    __extract_big_order_of_code((dir_path, code, frame))
 
 
 def extract_big_order_codes(dir_path):
@@ -257,24 +296,14 @@
     @param code:
     @return:
     """
-
-    def first_last(group):
-        """
-            鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉�
-            @param group:
-            @return:
-            """
-        return pd.Series({
-        })
-
     combined_path = os.path.join(dir_path, 'combined.csv')
     if not os.path.exists(combined_path):
-        print("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
         return
     df = pd.read_csv(combined_path)
     df_copy = df.copy()
     if df_copy.empty:
-        print("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
+        print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
         return
     df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
     # 鎸塖ecurityID鍜孊uyNo鍒嗙粍
@@ -283,8 +312,9 @@
 
 
 if __name__ == "__main__":
+    print_log(1, 2, 3)
     # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv")
     # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv")
     # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test")
     # extract_big_order_codes("E:/娴嬭瘯鏁版嵁/Transaction_Test")
-    extract_big_order_of_code("E:/娴嬭瘯鏁版嵁/Transaction_Test")
+    extract_big_order_of_all("E:/娴嬭瘯鏁版嵁/Transaction_Test")

--
Gitblit v1.8.0