From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- l2_data_parser.py | 212 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 191 insertions(+), 21 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index 32f7851..3cab7b0 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -5,10 +5,15 @@ import os import sys import time +from multiprocessing import Pool +import pandas as pd + +from data_parser import transaction_big_order_parser from db import mysql_data_delegate as mysql_data from huaxin_client.l2_client_test import L2TransactionDataManager from log_module import log_export +from utils import tool def __get_target_codes(day): @@ -43,23 +48,35 @@ writer.writerow(row) -def parse_transaction(day, save=True): +def parse_transaction(day, save=True, parse_all=False): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" fdatas = [] + df = pd.read_csv(f"{base_path}/Transaction.csv") + category_revenue = df.groupby('BuyNo')['TradeVolume'].sum() + with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) + total_lines = csv_reader.line_num + print("鎬昏鏁帮細", total_lines) # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] headers = next(csv_reader) print("琛ㄥご:", headers) # 閬嶅巻鏁版嵁琛� _path = f"{base_path}/Transaction_filter.csv" + percent = 0 with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞� writer = csv.writer(csvfile) # 閫愯鍐欏叆鏁版嵁 + count = 0 for row in csv_reader: - if row[1] not in target_codes: + count += 1 + p = count * 100 // total_lines + if p != percent: + percent = p + print(f"**杩涘害锛歿percent}%") + if row[1] not in target_codes and not parse_all: continue # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 if save: @@ -99,15 +116,15 @@ @param day: @return: """ - print("寮�濮嬪鐞嗘垚浜ゅぇ鍗曟暟鎹�", day, big_order_path, target_code) + print("*******寮�濮嬪鐞嗘垚浜ゅぇ鍗曟暟鎹�", day, big_order_path, target_code) l2_data_manager_dict = {} # 瑙f瀽鏁版嵁 __start_time = time.time() - transaction_data = parse_transaction(day) - print("Transaction 璇诲彇瀹屾瘯", len(transaction_data), "鑰楁椂", int(time.time() - __start_time)) + transaction_data = parse_transaction(day, parse_all=True) + print("*******Transaction 璇诲彇瀹屾瘯", len(transaction_data), "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() ngtstick_data = parse_ngtstick(day) - print("NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data), "鑰楁椂", int(time.time() - __start_time)) + print("*******NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data), "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() big_order_list = [] for index, row in enumerate(transaction_data): @@ -124,7 +141,7 @@ continue if code not in l2_data_manager_dict: l2_data_manager_dict[code] = L2TransactionDataManager(code, True) - l2_data_manager_dict[code].add_transaction_data(item) + l2_data_manager_dict[code].add_transaction_data(item, big_order_money_threshold=100e4) if index % 100 == 0: # 璇诲彇闃熷垪涓殑鏁版嵁 l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] @@ -135,7 +152,7 @@ while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) - print("Transaction 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + print("********Transaction 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() for index, row in enumerate(ngtstick_data): @@ -168,7 +185,7 @@ while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) - print("NGTSTick 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + print("********NGTSTick 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜 for code in l2_data_manager_dict: @@ -180,13 +197,13 @@ while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) - print("寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + print("********寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() # 寮�濮嬪啓鍏ユ湰鍦版枃浠� with open(big_order_path, mode='w', encoding='utf-8') as f: for order in big_order_list: - f.write(f"{order}") - print("鍐欏叆鏈湴鏂囦欢缁撴潫锛�", "鑰楁椂", int(time.time() - __start_time)) + f.write(f"{order}\n") + print("*******鍐欏叆鏈湴鏂囦欢缁撴潫锛�", "鑰楁椂", int(time.time() - __start_time)) __start_time = time.time() @@ -211,8 +228,142 @@ writer.writerow(row) +def test1(args): + index, df = args + print(index) + + +def pre_process_transactions(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"): + def str_to_float(s): + try: + # 绉婚櫎鍗曚綅骞惰浆鎹� + return round(float(s.split("@")[0]), 2) + except: + return float("nan") + + def code_format(s): + try: + code = "{0:0>6}".format(s) + return code + except: + return '' + + # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2, + # Info3, TradeBSFlag, BizIndex, LocalTimeStamp] + # transaction_data = { + # "SecurityID": ['300920', '300920', '300920', '300920'], + # "TradeTime": [91500040, 91500041, 91500042, 92000040], + # "TradePrice": [15.0, 16.2, 15.2, 16.3], + # "TradeVolume": [100, 100, 200, 300], + # "BuyNo": [0, 1, 1, 1] + # } + # 瀹氫箟鑱氬悎鍑芥暟 + def first_last(group): + return pd.Series({ + 'TotalAmount': group['TradeAmount'].sum(), + 'TotalVolume': group['TradeVolume'].sum(), + 'StartTime': group['TradeTime'].iloc[0], + 'StartPrice': group['TradePrice'].iloc[0], + 'EndTime': group['TradeTime'].iloc[-1], + 'EndPrice': group['TradePrice'].iloc[-1] + }) + + dtype = { + 'SecurityID': 'category', # 浣庡熀鏁板垎绫绘暟鎹� + } + chunk_size = 10000 + # 鍒涘缓DataFrame + chunks = pd.read_csv(csv_path, chunksize=chunk_size) + indexed_data = list(enumerate(chunks)) + # 鏂板啓娉� + with Pool(processes=4) as pool: + pool.map(test1, indexed_data) + + result_list = [] + for chunk_index, chunk in enumerate(chunks): + df = chunk.copy() + index = chunk_index + 1 + child_path = csv_path.replace(".csv", f"_{index}.csv") + if os.path.exists(child_path): + continue + print(f"澶勭悊绗瑊index}鎵规") + df["TradePrice"] = df["TradePrice"].apply(str_to_float) + df["SecurityID"] = df["SecurityID"].apply(code_format) + df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] + # 璁$畻鎴愪氦閲戦 + df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] + + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df.groupby(['SecurityID', 'BuyNo']) + + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(first_last).reset_index() + + chunk_result.to_csv(child_path, index=False) + print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") + + +def pre_process_ngtstick(csv_path="E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv"): + def str_to_float(s): + try: + # 绉婚櫎鍗曚綅骞惰浆鎹� + return round(float(s.split("@")[0]), 2) + except: + return float("nan") + + def code_format(s): + try: + code = "{0:0>6}".format(s) + return code + except: + return '' + + # 瀹氫箟鑱氬悎鍑芥暟 + def first_last(group): + return pd.Series({ + 'TotalAmount': group['TradeMoney'].sum(), + 'TotalVolume': group['Volume'].sum(), + 'StartTime': group['TickTime'].iloc[0], + 'StartPrice': group['Price'].iloc[0], + 'EndTime': group['TickTime'].iloc[-1], + 'EndPrice': group['Price'].iloc[-1] + }) + + # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] + + chunk_size = 10000 + # 鍒涘缓DataFrame + chunks = pd.read_csv(csv_path, chunksize=chunk_size) + result_list = [] + index = 0 + for df in chunks: + index += 1 + child_path = csv_path.replace(".csv", f"_{index}.csv") + if os.path.exists(child_path): + continue + print(f"澶勭悊绗瑊index}鎵规") + df = df[df["TickType"] == 'T'] + df["Price"] = df["Price"].apply(str_to_float) + df["SecurityID"] = df["SecurityID"].apply(code_format) + + df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] + + # 璁$畻鎴愪氦閲戦 + df['TradeMoney'] = df["TradeMoney"].apply(str_to_float) + # 鎸塖ecurityID鍜孊uyNo鍒嗙粍 + grouped = df.groupby(['SecurityID', 'BuyNo']) + # 搴旂敤鑱氬悎鍑芥暟 + chunk_result = grouped.apply(first_last).reset_index() + chunk_result.to_csv(child_path, index=False) + print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�") + + +if __name__ == '__main__1': + # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") + pre_process_transactions() + # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 -# 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-08 /home/userzjj/temp.txt 000555 +# 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 if __name__ == '__main__': if len(sys.argv) > 1: params = sys.argv[1:] @@ -231,15 +382,34 @@ parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day) - elif _type == 'ExtractDealBigOrder': - # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 + elif _type == 'Transaction_New': if len(params) > 2: - save_path = params[2].strip() + process_count = int(params[2].strip()) else: - save_path = None + process_count = 4 - if len(params) > 3: - target_code = params[3].strip() + transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv", + process_count=process_count) + transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") + elif _type == 'NGTSTick_New': + if len(params) > 2: + process_count = int(params[2].strip()) else: - target_code = None - parse_deal_big_orders(day, save_path, target_code) + process_count = 4 + transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv", + process_count=process_count) + transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") + elif _type == 'Transaction_Concat': + transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") + elif _type == 'NGTSTick_Concat': + transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") + elif _type == 'ExtractDealBigOrder': + # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 + if len(params) > 2: + process_count = int(params[2].strip()) + else: + process_count = 10 + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", + process_count=process_count) + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", + process_count=process_count) -- Gitblit v1.8.0