From 3b7e932024a2e4771ce7303576931f52ba39f1a9 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 23 五月 2025 01:32:43 +0800 Subject: [PATCH] bug修复 Rename test() to pre_process_transactions(); skip chunks whose per-chunk output CSV already exists; add pre_process_ngtstick(), which aggregates the TickType == 'T' rows of an NGTSTick CSV by SecurityID and BuyNo; add the NGTSTick_New command, dispatched to pre_process_ngtstick() because the NGTSTick columns (Price, TickTime, TradeMoney) are not the ones pre_process_transactions() reads. --- l2_data_parser.py | 63 ++++++++++++++++++++++++++++++- 1 files changed, 60 insertions(+), 3 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index c5b75ca..700dc21 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -225,7 +225,7 @@ writer.writerow(row) -def test(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): +def pre_process_transactions(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): def str_to_float(s): try: # 绉婚櫎鍗曚綅骞惰浆鎹� @@ -270,6 +270,9 @@ index = 0 for df in chunks: index += 1 + child_path = csv_path.replace(".csv", f"_{index}.csv") + if os.path.exists(child_path): + continue print(f"澶勭悊绗瑊index}鎵规") df["TradePrice"] = df["TradePrice"].apply(str_to_float) df["SecurityID"] = df["SecurityID"].apply(code_format) @@ -281,14 +284,66 @@ # 搴旂敤鑱氬悎鍑芥暟 chunk_result = grouped.apply(first_last).reset_index() + + chunk_result.to_csv(child_path, index=False) + print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") + + +def pre_process_ngtstick(csv_path="E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv"): + def str_to_float(s): + try: + # 绉婚櫎鍗曚綅骞惰浆鎹� + return round(float(s.split("@")[0]), 2) + except: + return float("nan") + + def code_format(s): + try: + code = "{0:0>6}".format(s) + return code + except: + return '' + + # 瀹氫箟鑱氬悎鍑芥暟 + def first_last(group): + return pd.Series({ + 'TotalAmount': group['TradeMoney'].sum(), + 'TotalVolume': group['Volume'].sum(), + 'StartTime': group['TickTime'].iloc[0], + 'StartPrice': group['Price'].iloc[0], + 'EndTime': group['TickTime'].iloc[-1], + 'EndPrice': group['Price'].iloc[-1] + }) + + # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] + + chunk_size = 200000 + # 鍒涘缓DataFrame + chunks = pd.read_csv(csv_path, chunksize=chunk_size) + result_list = [] + index = 0 + for df in chunks: + 
index += 1 child_path = csv_path.replace(".csv", f"_{index}.csv") + if os.path.exists(child_path): + continue + print(f"澶勭悊绗瑊index}鎵规") + df = df[df["TickType"] == 'T'] + df["Price"] = df["Price"].apply(str_to_float) + df["SecurityID"] = df["SecurityID"].apply(code_format) + # 璁$畻鎴愪氦閲戦 + df['TradeMoney'] = df["TradeMoney"].apply(str_to_float) + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df.groupby(['SecurityID', 'BuyNo']) + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(first_last).reset_index() chunk_result.to_csv(child_path, index=False) print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") if __name__ == '__main__1': # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") - test() + pre_process_ngtstick() # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 # 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 @@ -311,7 +366,9 @@ elif _type == 'MarketData': parse_market_data(day) elif _type == 'Transaction_New': - test(f"/home/userzjj/ftp/{day}/Transaction.csv") + pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv") + elif _type == 'NGTSTick_New': + pre_process_ngtstick(f"/home/userzjj/ftp/{day}/NGTSTick.csv") elif _type == 'ExtractDealBigOrder': # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 if len(params) > 2: -- Gitblit v1.8.0