""" L2数据解析器 """ import csv import os import sys import time from multiprocessing import Pool import pandas as pd from data_parser import transaction_big_order_parser from db import mysql_data_delegate as mysql_data from huaxin_client.l2_client_test import L2TransactionDataManager from log_module import log_export from utils import tool def __get_target_codes(day): special_codes_dict = log_export.load_special_codes(day) fcodes = set() for plate in special_codes_dict: codes = special_codes_dict[plate] fcodes |= codes return fcodes def parse_order_detail(day, code): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp'] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/OrderDetail_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[1] not in target_codes: continue if code and code != row[1]: continue # 将文件写入到文本 writer.writerow(row) def parse_transaction(day, save=True, parse_all=False): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" fdatas = [] df = pd.read_csv(f"{base_path}/Transaction.csv") category_revenue = df.groupby('BuyNo')['TradeVolume'].sum() with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) total_lines = csv_reader.line_num print("总行数:", total_lines) # 获取表头:: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/Transaction_filter.csv" percent = 0 with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 count = 0 for row in csv_reader: count += 1 p = count * 100 // total_lines if p != percent: percent = p print(f"**进度:{percent}%") if row[1] not in target_codes and not parse_all: continue # 将文件写入到文本 if save: writer.writerow(row) fdatas.append(row) return fdatas def parse_ngtstick(day, save=True): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" fdatas = [] with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/NGTSTick_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[1] not in target_codes: continue if save: # 将文件写入到文本 writer.writerow(row) fdatas.append(row) return fdatas def parse_deal_big_orders(day, big_order_path, target_code=None): """ 提取所有成交大单 @param day: @return: """ print("*******开始处理成交大单数据", day, big_order_path, target_code) l2_data_manager_dict = {} # 解析数据 __start_time = time.time() transaction_data = parse_transaction(day, parse_all=True) print("*******Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time)) __start_time = time.time() ngtstick_data = parse_ngtstick(day) print("*******NGTSTick 读取完毕", len(ngtstick_data), "耗时", int(time.time() - __start_time)) __start_time = time.time() big_order_list = [] for index, row in enumerate(transaction_data): code = row[1] if target_code and code != target_code: continue item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2), "TradeVolume": int(row[4]), "OrderTime": int(row[2]), "MainSeq": int(row[6]), "SubSeq": int(row[7]), "BuyNo": int(row[8]), "SellNo": int(row[9]), "ExecType": int(row[5])} if item["TradePrice"] <= 0: continue if code not in l2_data_manager_dict: l2_data_manager_dict[code] = L2TransactionDataManager(code, True) l2_data_manager_dict[code].add_transaction_data(item, big_order_money_threshold=100e4) if index % 100 == 0: # 读取队列中的数据 l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] while not l2_data_manager.big_accurate_buy_order_queue.empty(): data = l2_data_manager.big_accurate_buy_order_queue.get() big_order_list.append((code, 0, data)) while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) print("********Transaction 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time)) __start_time = time.time() for index, row in enumerate(ngtstick_data): code = row[1] if target_code and code != target_code: continue if row[5].strip() != 'T': # 过滤非成交数据 continue item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2), "TradeVolume": int(row[9]), "OrderTime": row[4], "MainSeq": row[2], "SubSeq": row[3], "BuyNo": row[6], "SellNo": row[7], "ExecType": '1'} if item["TradePrice"] <= 0: continue if code not in l2_data_manager_dict: l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True) l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item) if index % 100 == 0: # 读取队列中的数据 l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] while not l2_data_manager.big_accurate_buy_order_queue.empty(): data = l2_data_manager.big_accurate_buy_order_queue.get() big_order_list.append((code, 0, data)) while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) print("********NGTSTick 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time)) __start_time = time.time() # 读取剩余的未读数据的代码 for code in l2_data_manager_dict: l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] while not l2_data_manager.big_accurate_buy_order_queue.empty(): data = l2_data_manager.big_accurate_buy_order_queue.get() big_order_list.append((code, 0, data)) while not l2_data_manager.big_accurate_sell_order_queue.empty(): data = l2_data_manager.big_accurate_sell_order_queue.get() big_order_list.append((code, 1, data)) print("********开始写入本地文件:", len(big_order_list), "耗时", int(time.time() - __start_time)) __start_time = time.time() # 开始写入本地文件 with open(big_order_path, mode='w', encoding='utf-8') as f: for order in big_order_list: f.write(f"{order}\n") print("*******写入本地文件结束:", "耗时", int(time.time() - __start_time)) __start_time = time.time() def parse_market_data(day): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/MarketData.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/MarketData_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[0] not in target_codes: continue # 将文件写入到文本 writer.writerow(row) def test1(args): index, df = args print(index) def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"): def str_to_float(s): try: # 移除单位并转换 return round(float(s.split("@")[0]), 2) except: return float("nan") def code_format(s): try: code = "{0:0>6}".format(s) return code except: return '' # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2, # Info3, TradeBSFlag, BizIndex, LocalTimeStamp] # transaction_data = { # "SecurityID": ['300920', '300920', '300920', '300920'], # "TradeTime": [91500040, 91500041, 91500042, 92000040], # "TradePrice": [15.0, 16.2, 15.2, 16.3], # "TradeVolume": [100, 100, 200, 300], # "BuyNo": [0, 1, 1, 1] # } # 定义聚合函数 def first_last(group): return pd.Series({ 'TotalAmount': group['TradeAmount'].sum(), 'TotalVolume': group['TradeVolume'].sum(), 'StartTime': group['TradeTime'].iloc[0], 'StartPrice': group['TradePrice'].iloc[0], 'EndTime': group['TradeTime'].iloc[-1], 'EndPrice': group['TradePrice'].iloc[-1] }) dtype = { 'SecurityID': 'category', # 低基数分类数据 } chunk_size = 10000 # 创建DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) indexed_data = list(enumerate(chunks)) # 新写法 with Pool(processes=4) as pool: pool.map(test1, indexed_data) result_list = [] for chunk_index, chunk in enumerate(chunks): df = chunk.copy() index = chunk_index + 1 child_path = csv_path.replace(".csv", f"_{index}.csv") if os.path.exists(child_path): continue print(f"处理第{index}批次") df["TradePrice"] = df["TradePrice"].apply(str_to_float) df["SecurityID"] = df["SecurityID"].apply(code_format) df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] # 计算成交金额 df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] # 按SecurityID和BuyNo分组 grouped = df.groupby(['SecurityID', 'BuyNo']) # 应用聚合函数 chunk_result = grouped.apply(first_last).reset_index() chunk_result.to_csv(child_path, index=False) print(f"处理完毕,总共{index}批") def pre_process_ngtstick(csv_path="E:/测试数据/NGTSTick_Test.csv"): def str_to_float(s): try: # 移除单位并转换 return round(float(s.split("@")[0]), 2) except: return float("nan") def code_format(s): try: code = "{0:0>6}".format(s) return code except: return '' # 定义聚合函数 def first_last(group): return pd.Series({ 'TotalAmount': group['TradeMoney'].sum(), 'TotalVolume': group['Volume'].sum(), 'StartTime': group['TickTime'].iloc[0], 'StartPrice': group['Price'].iloc[0], 'EndTime': group['TickTime'].iloc[-1], 'EndPrice': group['Price'].iloc[-1] }) # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] chunk_size = 10000 # 创建DataFrame chunks = pd.read_csv(csv_path, chunksize=chunk_size) result_list = [] index = 0 for df in chunks: index += 1 child_path = csv_path.replace(".csv", f"_{index}.csv") if os.path.exists(child_path): continue print(f"处理第{index}批次") df = df[df["TickType"] == 'T'] df["Price"] = df["Price"].apply(str_to_float) df["SecurityID"] = df["SecurityID"].apply(code_format) df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] # 计算成交金额 df['TradeMoney'] = df["TradeMoney"].apply(str_to_float) # 按SecurityID和BuyNo分组 grouped = df.groupby(['SecurityID', 'BuyNo']) # 应用聚合函数 chunk_result = grouped.apply(first_last).reset_index() chunk_result.to_csv(child_path, index=False) print(f"处理完毕,总共{index}批") if __name__ == '__main__1': # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv") pre_process_transactions() # 命令模式 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 # 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555 if __name__ == '__main__': if len(sys.argv) > 1: params = sys.argv[1:] print("接收的参数", params) _type = params[0].strip() day = params[1].strip() if len(params) > 2: code = params[2].strip() else: code = None if _type == 'OrderDetail': parse_order_detail(day, code) elif _type == 'Transaction': parse_transaction(day) elif _type == 'NGTSTick': parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day) elif _type == 'Transaction_New': if len(params) > 2: process_count = int(params[2].strip()) else: process_count = 4 transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv", process_count=process_count) transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") elif _type == 'NGTSTick_New': if len(params) > 2: process_count = int(params[2].strip()) else: process_count = 4 transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv", process_count=process_count) transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") elif _type == 'Transaction_Concat': transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction") elif _type == 'NGTSTick_Concat': transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") elif _type == 'ExtractDealBigOrder': # 命令模式 /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 # 根据code提取大单 if not code: print("没有传入代码") else: if tool.is_sh_code(code): transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code) else: transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction", code)