From 638865a032eb403bdf61a5808acb579a25ec7be1 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 23 五月 2025 00:30:55 +0800 Subject: [PATCH] L2成交数据解析 --- l2_data_parser.py | 77 ++++++++++++++++++++++++++++++++++---- 1 files changed, 69 insertions(+), 8 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index abdbe66..8018fc5 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -10,6 +10,7 @@ from db import mysql_data_delegate as mysql_data from huaxin_client.l2_client_test import L2TransactionDataManager from log_module import log_export +from utils import tool def __get_target_codes(day): @@ -50,13 +51,6 @@ fdatas = [] df = pd.read_csv(f"{base_path}/Transaction.csv") category_revenue = df.groupby('BuyNo')['TradeVolume'].sum() - total_count = 0 - for chunk in chunks: - total_count += len(chunk) - for chunk in chunks: - total_count += len(chunk) - - with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) @@ -231,9 +225,74 @@ writer.writerow(row) +def test(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): + def str_to_float(s): + try: + # 绉婚櫎鍗曚綅骞惰浆鎹� + return round(float(s.split("@")[0]), 2) + except: + return float("nan") + + def code_format(s): + try: + code = "{0:0>6}".format(s) + return code + except: + return '' + + # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2, + # Info3, TradeBSFlag, BizIndex, LocalTimeStamp] + # transaction_data = { + # "SecurityID": ['300920', '300920', '300920', '300920'], + # "TradeTime": [91500040, 91500041, 91500042, 92000040], + # "TradePrice": [15.0, 16.2, 15.2, 16.3], + # "TradeVolume": [100, 100, 200, 300], + # "BuyNo": [0, 1, 1, 1] + # } + # 瀹氫箟鑱氬悎鍑芥暟 + def first_last(group): + return pd.Series({ + 'TotalAmount': group['TradeAmount'].sum(), + 'TotalVolume': group['TradeVolume'].sum(), + 'StartTime': group['TradeTime'].iloc[0], + 'StartPrice': group['TradePrice'].iloc[0], + 'EndTime': group['TradeTime'].iloc[-1], + 'EndPrice': group['TradePrice'].iloc[-1] + }) + + dtype = { + 'SecurityID': 'category', # 浣庡熀鏁板垎绫绘暟鎹� + } + chunk_size = 100000 + # 鍒涘缓DataFrame + chunks = pd.read_csv(csv_path, chunksize=chunk_size) + result_list = [] + index = 0 + for df in chunks: + index += 1 + print(f"澶勭悊绗瑊index}鎵规") + df["TradePrice"] = df["TradePrice"].apply(str_to_float) + df["SecurityID"] = df["SecurityID"].apply(code_format) + # 璁$畻鎴愪氦閲戦 + df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] + + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df.groupby(['SecurityID', 'BuyNo']) + + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(first_last).reset_index() + child_path = csv_path.replace(".csv", f"_{index}.csv") + chunk_result.to_csv(child_path, index=False) + print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") + + +if __name__ == '__main__': + # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") + test() + # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 # 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 -if __name__ == '__main__': +if __name__ == '__main__1': if len(sys.argv) > 1: params = sys.argv[1:] print("鎺ユ敹鐨勫弬鏁�", params) @@ -251,6 +310,8 @@ parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day) + elif _type == 'Transaction_New': + test(f"/home/userzjj/ftp/{day}/Transaction.csv") elif _type == 'ExtractDealBigOrder': # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 if len(params) > 2: -- Gitblit v1.8.0