From cc2c7a8514d3d6eea4f73b5875c2500883d165f0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 26 May 2025 22:57:20 +0800
Subject: [PATCH] Add automatic whitelisting interface

---
 data_parser/transaction_big_order_parser.py | 203 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 203 insertions(+), 0 deletions(-)

diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py
new file mode 100644
index 0000000..48be723
--- /dev/null
+++ b/data_parser/transaction_big_order_parser.py
@@ -0,0 +1,203 @@
+"""
+Parser for big-order deal data.
+"""
+import os
+import re
+from multiprocessing import Pool
+
+import pandas as pd
+
+from utils import tool
+
+
+class BigOrderDealParser:
+
+    @classmethod
+    def str_to_float(cls, s):
+        try:
+            # Strip the unit suffix (text after "@") and convert to a float
+            return round(float(s.split("@")[0]), 2)
+        except:
+            return float("nan")
+
+    @classmethod
+    def code_format(cls, s):
+        try:
+            # Left-pad the security code with zeros to six digits
+            code = "{0:0>6}".format(s)
+            return code
+        except:
+            return ''
+
+
+def __pre_process_transactions_detail(args):
+    def first_last(group):
+        """
+        Aggregate one (SecurityID, BuyNo) group: totals plus the first and last trade.
+        @param group:
+        @return:
+        """
+        return pd.Series({
+            'TotalAmount': group['TradeAmount'].sum(),
+            'TotalVolume': group['TradeVolume'].sum(),
+            'StartTime': group['TradeTime'].iloc[0],
+            'StartPrice': group['TradePrice'].iloc[0],
+            'EndTime': group['TradeTime'].iloc[-1],
+            'EndPrice': group['TradePrice'].iloc[-1]
+        })
+
+    chunk_index, chunk_data = args[0]
+    csv_path = args[1]
+    df = chunk_data.copy()
+    index = chunk_index + 1
+    children_dir = csv_path.replace(".csv", "")
+    if not os.path.exists(children_dir):
+        os.makedirs(children_dir)
+    child_path = os.path.join(children_dir, f"{index}.csv")
+    if os.path.exists(child_path):
+        return
+    print(f"Processing Transaction chunk {index}", os.getpid())
+    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
+    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
+    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
+    # Compute the trade amount
+    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
+    df = df[df["TradeAmount"] > 0]
+    # Check whether the chunk is empty
+    # print(df.empty)
+    # Group by SecurityID and BuyNo
+    grouped = df.groupby(['SecurityID', 'BuyNo'])
+    # Apply the aggregation function
+    chunk_result = grouped.apply(first_last)
+    if not df.empty:
+        chunk_result = chunk_result.reset_index()
+        chunk_result.to_csv(child_path, index=False)
+
+
+def pre_process_transactions(csv_path, process_count=4):
+    chunk_size = 200000
+    # Read the CSV in chunks
+    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+    indexed_data = list(enumerate(chunks))
+    args = [(x, csv_path) for x in indexed_data]
+    # Process the chunks with a worker pool
+    with Pool(processes=process_count) as pool:
+        pool.map(__pre_process_transactions_detail, args)
+
+
+def __pre_process_ngtsticks(args):
+    def first_last(group):
+        return pd.Series({
+            'TotalAmount': group['TradeMoney'].sum(),
+            'TotalVolume': group['Volume'].sum(),
+            'StartTime': group['TickTime'].iloc[0],
+            'StartPrice': group['Price'].iloc[0],
+            'EndTime': group['TickTime'].iloc[-1],
+            'EndPrice': group['Price'].iloc[-1]
+        })
+
+    chunk_index, chunk_data = args[0]
+    csv_path = args[1]
+
+    df = chunk_data.copy()
+    index = chunk_index + 1
+    children_dir = csv_path.replace(".csv", "")
+    if not os.path.exists(children_dir):
+        os.makedirs(children_dir)
+    child_path = os.path.join(children_dir, f"{index}.csv")
+    if os.path.exists(child_path):
+        return
+    print(f"Processing NGTSTick chunk {index}")
+    df = df[df["TickType"] == 'T']
+    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
+    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
df["SecurityID"].apply(BigOrderDealParser.code_format) + df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] + # 璁$畻鎴愪氦閲戦 + df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float) + + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df.groupby(['SecurityID', 'BuyNo']) + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(first_last) + if not df.empty: + chunk_result = chunk_result.reset_index() + chunk_result.to_csv(child_path, index=False) + + +def pre_process_ngtsticks(csv_path, process_count=4): + chunk_size = 200000 + # 鍒涘缓DataFrame + chunks = pd.read_csv(csv_path, chunksize=chunk_size) + indexed_data = list(enumerate(chunks)) + args = [(x, csv_path) for x in indexed_data] + # 鏂板啓娉� + with Pool(processes=process_count) as pool: + pool.map(__pre_process_ngtsticks, args) + + +def __concat_pre_datas(dir_path): + """ + 鎷兼帴鏁版嵁 + @param dir_path: + @return: + """ + combined_path = os.path.join(dir_path, 'combined.csv') + if os.path.exists(combined_path): + print("鍚堝苟鐨勭洰鏍囨枃浠跺凡瀛樺湪") + return + file_list = os.listdir(dir_path) + file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0])) + df_list = [] + for file in file_list: + df = pd.read_csv(os.path.join(dir_path, file)) + if df.empty: + continue + df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) + df_list.append(df) + print("鍑嗗鍚堝苟鐨勬枃浠舵暟閲忥細", len(df_list)) + + combined_df = pd.concat(df_list, ignore_index=True) + + print("鍚堝苟瀹屾垚锛屽噯澶囧啓鍏ユ枃浠讹紒") + # 淇濆瓨缁撴灉 + combined_df.to_csv(combined_path, index=False) + print("鍐欏叆鏂囦欢瀹屾垚锛�") + + +def concat_pre_transactions(dir_path): + __concat_pre_datas(dir_path) + + +def concat_pre_ngtsticks(dir_path): + __concat_pre_datas(dir_path) + + +def process_combined_transaction(dir_path): + """ + 澶勭悊鎷兼帴璧锋潵鐨勬暟鎹� + @param dir_path: + @return: + """ + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + df = pd.read_csv(combined_path) + df_copy = df.copy() + grouped = df_copy.groupby(["SecurityID"]) + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(pd.Series({})) + # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format) + print(chunk_result.to_string( + index=False, # 涓嶆樉绀虹储寮� + justify='left', # 宸﹀榻� + float_format='%.3f' # 娴偣鏁版牸寮� + )) + + + + +if __name__ == "__main__": + # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") + # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv") + # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test") + process_combined_transaction("E:/娴嬭瘯鏁版嵁/Transaction_Test") -- Gitblit v1.8.0