From 326e0b138c00fe14ab860441b2e05f8c3c37576f Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期日, 25 五月 2025 12:01:56 +0800 Subject: [PATCH] 大单足够下单方式改变 --- l2_data_parser.py | 30 +++++++++++++++++++++--------- 1 files changed, 21 insertions(+), 9 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index e96b4ed..423ee49 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -5,6 +5,8 @@ import os import sys import time +from multiprocessing import Pool + import pandas as pd from db import mysql_data_delegate as mysql_data @@ -224,6 +226,9 @@ # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 writer.writerow(row) +def test1(args): + index, df = args + print(index) def pre_process_transactions(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): def str_to_float(s): @@ -260,16 +265,24 @@ 'EndPrice': group['TradePrice'].iloc[-1] }) + + + dtype = { 'SecurityID': 'category', # 浣庡熀鏁板垎绫绘暟鎹� } - chunk_size = 100000 + chunk_size = 10000 # 鍒涘缓DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) + indexed_data = list(enumerate(chunks)) + # 鏂板啓娉� + with Pool(processes=4) as pool: + pool.map(test1, indexed_data) + result_list = [] - index = 0 - for df in chunks: - index += 1 + for chunk_index, chunk in enumerate(chunks): + df = chunk.copy() + index = chunk_index + 1 child_path = csv_path.replace(".csv", f"_{index}.csv") if os.path.exists(child_path): continue @@ -279,7 +292,6 @@ df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] # 璁$畻鎴愪氦閲戦 df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] - # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 grouped = df.groupby(['SecurityID', 'BuyNo']) @@ -319,7 +331,7 @@ # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] - chunk_size = 200000 + chunk_size = 10000 # 鍒涘缓DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) result_list = [] @@ -346,13 +358,13 @@ print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") -if __name__ == '__main__1': +if __name__ == '__main__': # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") - pre_process_ngtstick() + pre_process_transactions() # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 # 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 -if __name__ == '__main__': +if __name__ == '__main__1': if len(sys.argv) > 1: params = sys.argv[1:] print("鎺ユ敹鐨勫弬鏁�", params) -- Gitblit v1.8.0