From 2f2516749615da866e96d8d24e499b7ecbb63a3e Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 23 六月 2025 12:28:52 +0800 Subject: [PATCH] 默认交易模式变更/真实下单位置计算位置修改 --- data_parser/transaction_big_order_parser.py | 163 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 158 insertions(+), 5 deletions(-) diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py index 2ff872e..84dae46 100644 --- a/data_parser/transaction_big_order_parser.py +++ b/data_parser/transaction_big_order_parser.py @@ -2,11 +2,16 @@ 澶у崟鎴愪氦鏁版嵁瑙f瀽鍣� """ import os +import re from multiprocessing import Pool import pandas as pd from utils import tool + + +def print_log(*args): + print(f"[{tool.get_now_time_str()}]", *args) class BigOrderDealParser: @@ -54,7 +59,7 @@ child_path = os.path.join(children_dir, f"{index}.csv") if os.path.exists(child_path): return - print(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid()) + print_log(f"澶勭悊Transaction绗瑊index}鎵规", os.getpid()) df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float) df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] @@ -62,7 +67,7 @@ df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] df = df[df["TradeAmount"] > 0] # 鍒ゆ柇鏄惁涓虹┖ - # print(df.empty) + # print_log(df.empty) # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 grouped = df.groupby(['SecurityID', 'BuyNo']) # 搴旂敤鑱氬悎鍑芥暟 @@ -105,7 +110,7 @@ child_path = os.path.join(children_dir, f"{index}.csv") if os.path.exists(child_path): return - print(f"澶勭悊NGTSTick绗瑊index}鎵规") + print_log(f"澶勭悊NGTSTick绗瑊index}鎵规") df = df[df["TickType"] == 'T'] df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float) df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) @@ -141,9 +146,10 @@ """ combined_path = os.path.join(dir_path, 'combined.csv') if os.path.exists(combined_path): + print_log("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪") return 
file_list = os.listdir(dir_path) - file_list.sort(key=lambda x: int(x.split(".")[0])) + file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0])) df_list = [] for file in file_list: df = pd.read_csv(os.path.join(dir_path, file)) @@ -151,17 +157,164 @@ continue df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) df_list.append(df) + print_log("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list)) combined_df = pd.concat(df_list, ignore_index=True) + + print_log("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒") # 淇濆瓨缁撴灉 combined_df.to_csv(combined_path, index=False) + print_log("鍐欏叆鏂囦欢瀹屾垚锛�") def concat_pre_transactions(dir_path): __concat_pre_datas(dir_path) +def concat_pre_ngtsticks(dir_path): + __concat_pre_datas(dir_path) + + +def process_combined_transaction(dir_path): + """ + 澶勭悊鎷兼帴璧锋潵鐨勬暟鎹� + @param dir_path: + @return: + """ + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + df = pd.read_csv(combined_path) + df_copy = df.copy() + grouped = df_copy.groupby(["SecurityID"]) + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(pd.Series({})) + # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format) + print_log(chunk_result.to_string( + index=False, # 涓嶆樉绀虹储寮� + justify='left', # 宸﹀榻� + float_format='%.3f' # 娴偣鏁版牸寮� + )) + + +__combined_df_cache = {} + + +def extract_big_order_of_all(dir_path, process_count=4): + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + codes = extract_big_order_codes(dir_path) + print_log("鎬讳唬鐮佹暟閲忥細", len(codes)) + for code in codes: + extract_big_order_of_code(dir_path, code) + + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + output_path = os.path.join(dir_path, f"big_buy_{code}.csv") + if os.path.exists(output_path): + print_log("璺緞宸插瓨鍦�:", output_path) + return + df = 
__combined_df_cache.get(combined_path, None) + if df is None: + df = pd.read_csv(combined_path) + __combined_df_cache[combined_path] = df + args = [(dir_path, code, df) for code in codes] + # 鏂板啓娉� + with Pool(processes=process_count) as pool: + pool.map(__extract_big_order_of_code, args) + + +def __extract_big_order_of_code(args): + def first_last(group): + """ + 鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉� + @param group: + @return: + """ + return pd.Series({ + 'SecurityID': group['SecurityID'].iloc[0], + 'BuyNo': group['BuyNo'].iloc[0], + 'TotalVolume': group['TotalVolume'].sum(), + 'TotalAmount': group['TotalAmount'].sum(), + 'EndTime': group['EndTime'].iloc[-1], + 'EndPrice': group['EndPrice'].iloc[-1], + 'StartTime': group['StartTime'].iloc[0], + 'StartPrice': group['StartPrice'].iloc[0] + }) + + dir_path, code, df = args[0], args[1], args[2] + output_path = os.path.join(dir_path, f"big_buy_{code}.csv") + if os.path.exists(output_path): + print_log("璺緞宸插瓨鍦�:", output_path) + return + df_copy = df.copy() + if code: + df_copy = df_copy[df_copy["SecurityID"] == int(code)] + if df_copy.empty: + print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖") + return + df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df_copy.groupby(['SecurityID', 'BuyNo']) + grouped_result = grouped.apply(first_last) + grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000] + # print_log(grouped_result) + # 閬嶅巻鍐呭 + grouped_result.to_csv(output_path, index=False) + print_log(f"淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}") + + +def extract_big_order_of_code(dir_path, code): + """ + 鎻愬彇浠g爜鐨勫ぇ鍗� + @param dir_path: 鏁版嵁鐩綍 + @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴 + @return: + """ + + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + + df = __combined_df_cache.get(combined_path, None) + if df is None: + df = pd.read_csv(combined_path) + __combined_df_cache[combined_path] = df + 
__extract_big_order_of_code((dir_path, code, df)) + + +def extract_big_order_codes(dir_path): + """ + 瀵煎嚭澶у崟浠g爜 + @param dir_path: 鏁版嵁鐩綍 + @param code: + @return: + """ + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + df = pd.read_csv(combined_path) + df_copy = df.copy() + if df_copy.empty: + print_log("鐩爣浠g爜瀵瑰簲鎴愪氦鏁版嵁涓虹┖") + return + df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df_copy.groupby(['SecurityID']) + return set(grouped.groups.keys()) + + if __name__ == "__main__": + print_log(1, 2, 3) # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv") - concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test") + # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test") + # extract_big_order_codes("E:/娴嬭瘯鏁版嵁/Transaction_Test") + extract_big_order_of_all("E:/娴嬭瘯鏁版嵁/Transaction_Test") -- Gitblit v1.8.0