From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- l2_data_parser.py | 66 ++++++++++++++++++++++++-------- 1 files changed, 49 insertions(+), 17 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index 254c8c6..3cab7b0 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -5,8 +5,11 @@ import os import sys import time +from multiprocessing import Pool + import pandas as pd +from data_parser import transaction_big_order_parser from db import mysql_data_delegate as mysql_data from huaxin_client.l2_client_test import L2TransactionDataManager from log_module import log_export @@ -225,6 +228,11 @@ writer.writerow(row) +def test1(args): + index, df = args + print(index) + + def pre_process_transactions(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): def str_to_float(s): try: @@ -263,19 +271,25 @@ dtype = { 'SecurityID': 'category', # 浣庡熀鏁板垎绫绘暟鎹� } - chunk_size = 100000 + chunk_size = 10000 # 鍒涘缓DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) + indexed_data = list(enumerate(chunks)) + # 鏂板啓娉� + with Pool(processes=4) as pool: + pool.map(test1, indexed_data) + result_list = [] - index = 0 - for df in chunks: - index += 1 + for chunk_index, chunk in enumerate(chunks): + df = chunk.copy() + index = chunk_index + 1 child_path = csv_path.replace(".csv", f"_{index}.csv") if os.path.exists(child_path): continue print(f"澶勭悊绗瑊index}鎵规") df["TradePrice"] = df["TradePrice"].apply(str_to_float) df["SecurityID"] = df["SecurityID"].apply(code_format) + df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] # 璁$畻鎴愪氦閲戦 df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] @@ -317,7 +331,7 @@ # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] - chunk_size = 200000 + chunk_size = 10000 # 鍒涘缓DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) result_list = [] @@ -331,6 +345,9 @@ df = df[df["TickType"] == 'T'] df["Price"] = df["Price"].apply(str_to_float) df["SecurityID"] = df["SecurityID"].apply(code_format) + + df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] + # 璁$畻鎴愪氦閲戦 df['TradeMoney'] = df["TradeMoney"].apply(str_to_float) # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 @@ -343,7 +360,7 @@ if __name__ == '__main__1': # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") - pre_process_ngtstick() + pre_process_transactions() # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 # 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 @@ -366,18 +383,33 @@ elif _type == 'MarketData': parse_market_data(day) elif _type == 'Transaction_New': - pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv") - elif _type == 'NGTSTick_New': - pre_process_ngtstick(f"/home/userzjj/ftp/{day}/NGTSTick.csv") - elif _type == 'ExtractDealBigOrder': - # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 if len(params) > 2: - save_path = params[2].strip() + process_count = int(params[2].strip()) else: - save_path = None + process_count = 4 - if len(params) > 3: - target_code = params[3].strip() + transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv", + process_count=process_count) + transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") + elif _type == 'NGTSTick_New': + if len(params) > 2: + process_count = int(params[2].strip()) else: - target_code = None - parse_deal_big_orders(day, save_path, target_code) + process_count = 4 + transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv", + process_count=process_count) + transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") + elif _type == 'Transaction_Concat': + transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") + elif _type == 'NGTSTick_Concat': + transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") + elif _type == 'ExtractDealBigOrder': + # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 + if len(params) > 2: + process_count = int(params[2].strip()) + else: + process_count = 10 + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", + process_count=process_count) + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", + process_count=process_count) -- Gitblit v1.8.0