From 2f2516749615da866e96d8d24e499b7ecbb63a3e Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 23 Jun 2025 12:28:52 +0800
Subject: [PATCH] Change the default trading mode / adjust where the real order placement position is calculated

---
 data_parser/transaction_big_order_parser.py |  163 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 154 insertions(+), 9 deletions(-)

diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py
index b1d98d0..84dae46 100644
--- a/data_parser/transaction_big_order_parser.py
+++ b/data_parser/transaction_big_order_parser.py
@@ -10,6 +10,10 @@
 from utils import tool
 
 
+def print_log(*args):
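+    """Print with a timestamp prefix from tool.get_now_time_str()."""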
+    print(f"[{tool.get_now_time_str()}]", *args)
+
+
 class BigOrderDealParser:
 
     @classmethod
@@ -55,7 +59,7 @@
     child_path = os.path.join(children_dir, f"{index}.csv")
     if os.path.exists(child_path):
         return
-    print(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid())
+    print_log(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid())
     df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
     df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
     df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
@@ -63,7 +67,7 @@
     df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
     df = df[df["TradeAmount"] > 0]
     # Check whether the DataFrame is empty
-    # print(df.empty)
+    # print_log(df.empty)
     # Group by SecurityID and BuyNo
     grouped = df.groupby(['SecurityID', 'BuyNo'])
     # Apply the aggregation function
@@ -106,7 +110,7 @@
     child_path = os.path.join(children_dir, f"{index}.csv")
     if os.path.exists(child_path):
         return
-    print(f"澶勭悊NGTSTick绗瑊index}鎵规")
+    log(f"澶勭悊NGTSTick绗瑊index}鎵规")
     df = df[df["TickType"] == 'T']
     df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
     df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
@@ -142,7 +146,7 @@
     """
     combined_path = os.path.join(dir_path, 'combined.csv')
     if os.path.exists(combined_path):
-        print("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪")
+        print_log("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪")
         return
     file_list = os.listdir(dir_path)
     file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
@@ -153,23 +157,164 @@
             continue
         df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
         df_list.append(df)
-    print("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list))
+    print_log("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list))
 
     combined_df = pd.concat(df_list, ignore_index=True)
 
-    print("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒")
+    print_log("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒")
     # 淇濆瓨缁撴灉
     combined_df.to_csv(combined_path, index=False)
-    print("鍐欏叆鏂囦欢瀹屾垚锛�")
+    print_log("鍐欏叆鏂囦欢瀹屾垚锛�")
 
 
 def concat_pre_transactions(dir_path):
     __concat_pre_datas(dir_path)
 
+
 def concat_pre_ngtsticks(dir_path):
     __concat_pre_datas(dir_path)
 
+
+def process_combined_transaction(dir_path):
+    """
+    Process the concatenated (combined) data and print a per-code summary.
+    @param dir_path: data directory
+    @return:
+    """
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+    df = pd.read_csv(combined_path)
+    df_copy = df.copy()
+    grouped = df_copy.groupby(["SecurityID"])
+    # Apply the aggregation function.
+    # NOTE: the original passed pd.Series({}) to apply(), which is not callable and
+    # would raise TypeError; the per-code sums below are an assumed placeholder.
+    chunk_result = grouped.agg(TotalVolume=("TotalVolume", "sum"),
+                               TotalAmount=("TotalAmount", "sum")).reset_index()
+    # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format)
+    print_log(chunk_result.to_string(
+        index=False,  # do not print the index
+        justify='left',  # left-align columns
+        float_format='%.3f'  # floating point format
+    ))
+
+
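+# Module-level cache of the combined.csv DataFrame, keyed by file path, so that
+# repeated single-code extractions do not re-read the CSV from disk.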
+__combined_df_cache = {}
+
+
+def extract_big_order_of_all(dir_path, process_count=4):
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("Combined data does not exist")
+        return
+    codes = extract_big_order_codes(dir_path)
+    if not codes:
+        return
+    print_log("Total number of codes:", len(codes))
+    # Previous serial approach:
+    # for code in codes:
+    #     extract_big_order_of_code(dir_path, code)
+    df = __combined_df_cache.get(combined_path, None)
+    if df is None:
+        df = pd.read_csv(combined_path)
+        __combined_df_cache[combined_path] = df
+    args = [(dir_path, code, df) for code in codes]
+    # New approach: extract each code's big buy orders in a worker pool
+    with Pool(processes=process_count) as pool:
+        pool.map(__extract_big_order_of_code, args)
+
+
+def __extract_big_order_of_code(args):
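+    # Worker entry point for Pool.map; args is a (dir_path, code, df) tuple.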
+    def first_last(group):
+        """
+            鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉�
+            @param group:
+            @return:
+            """
+        return pd.Series({
+            'SecurityID': group['SecurityID'].iloc[0],
+            'BuyNo': group['BuyNo'].iloc[0],
+            'TotalVolume': group['TotalVolume'].sum(),
+            'TotalAmount': group['TotalAmount'].sum(),
+            'EndTime': group['EndTime'].iloc[-1],
+            'EndPrice': group['EndPrice'].iloc[-1],
+            'StartTime': group['StartTime'].iloc[0],
+            'StartPrice': group['StartPrice'].iloc[0]
+        })
+
+    dir_path, code, df = args[0], args[1], args[2]
+    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
+    if os.path.exists(output_path):
+        print_log("璺緞宸插瓨鍦�:", output_path)
+        return
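+    # Work on a copy so the shared/cached DataFrame is not mutated.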
+    df_copy = df.copy()
+    if code:
+        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
+    if df_copy.empty:
+        print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
+        return
+    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
+    # Group by SecurityID and BuyNo
+    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
+    grouped_result = grouped.apply(first_last)
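+    # Keep only aggregated buy orders whose total traded amount exceeds 500,000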
+    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
+    # print_log(grouped_result)
+    # Save the filtered result
+    grouped_result.to_csv(output_path, index=False)
+    print_log(f"[{tool.get_now_time_str()}]淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}")
+
+
+def extract_big_order_of_code(dir_path, code):
+    """
+    Extract the big buy orders for a security code.
+    @param dir_path: data directory
+    @param code: empty means export all codes
+    @return:
+    """
+
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+
+    df = __combined_df_cache.get(combined_path, None)
+    if df is None:
+        df = pd.read_csv(combined_path)
+        __combined_df_cache[combined_path] = df
+    __extract_big_order_of_code((dir_path, code, df))
+
+
+def extract_big_order_codes(dir_path):
+    """
+    Collect the set of security codes (SecurityID) present in the combined data.
+    @param dir_path: data directory
+    @return:
+    """
+    combined_path = os.path.join(dir_path, 'combined.csv')
+    if not os.path.exists(combined_path):
+        print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�")
+        return
+    df = pd.read_csv(combined_path)
+    df_copy = df.copy()
+    if df_copy.empty:
+        print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖")
+        return
+    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
+    # Group by SecurityID
+    grouped = df_copy.groupby(['SecurityID'])
+    return set(grouped.groups.keys())
+
+
 if __name__ == "__main__":
-    pre_process_transactions("E:/测试数据/Transaction_Test.csv")
+    print_log(1, 2, 3)
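+    # Typical pipeline order (paths below are the author's local test data):
+    # pre_process_transactions -> concat_pre_transactions -> extract_big_order_of_all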
+    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
     # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
-    concat_pre_transactions("E:/测试数据/Transaction_Test")
+    # concat_pre_transactions("E:/测试数据/Transaction_Test")
+    # extract_big_order_codes("E:/测试数据/Transaction_Test")
+    extract_big_order_of_all("E:/测试数据/Transaction_Test")

--
Gitblit v1.8.0