"""Parser for big-order (large trade) deal data.

Pipeline implemented by this module:

1. ``pre_process_transactions`` / ``pre_process_ngtsticks`` read a raw CSV in
   chunks and, per chunk, aggregate the rows by ``(SecurityID, BuyNo)`` into
   ``<csv name without .csv>/<chunk number>.csv``.
2. ``concat_pre_transactions`` / ``concat_pre_ngtsticks`` merge the per-chunk
   files into ``combined.csv`` inside that directory.
3. ``extract_big_order_of_code`` / ``extract_big_order_of_all`` re-aggregate
   ``combined.csv`` per buy order and export the orders whose total amount
   exceeds the big-order threshold to ``big_buy_<code>.csv``.
"""
import os
import re
from multiprocessing import Pool

import pandas as pd

from utils import tool

# Number of CSV rows handed to one worker task.
_CHUNK_SIZE = 200000
# Only securities whose zero-padded code starts with one of these are kept.
_SECURITY_ID_PREFIXES = ("30", "00", "60")
# An order is exported as "big" when its total amount is strictly above this.
_BIG_ORDER_MIN_AMOUNT = 500000
# Name of the merged file written next to the per-chunk files.
_COMBINED_FILE_NAME = "combined.csv"
# Per-chunk files are named "<number>.csv"; everything else is ignored on merge.
_CHUNK_FILE_PATTERN = re.compile(r"^(\d+)\.csv$")


def print_log(*args):
    """Print ``args`` prefixed with the current timestamp from ``tool``."""
    print(f"[{tool.get_now_time_str()}]", *args)


class BigOrderDealParser:
    """Value-normalisation helpers applied to individual CSV cells."""

    @classmethod
    def str_to_float(cls, s):
        """Convert a cell such as ``"12.345@unit"`` to a float with 2 decimals.

        Everything after the first ``"@"`` is discarded. Values that are
        already numeric are accepted as well (they are stringified first).

        @param s: raw cell value
        @return: the rounded float, or NaN when the value cannot be parsed
        """
        try:
            # Strip the unit suffix, then convert.
            return round(float(str(s).split("@")[0]), 2)
        except (TypeError, ValueError):
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """Left-pad a security code with zeros to a width of 6.

        @param s: raw code (int or str)
        @return: the padded code, or ``''`` when the value cannot be formatted
        """
        try:
            return "{0:0>6}".format(s)
        except (TypeError, ValueError):
            return ''


def _chunk_output_path(csv_path, chunk_index):
    """Return ``(1-based chunk number, output path)`` for one chunk.

    The output directory is ``csv_path`` with ``".csv"`` removed; it is
    created if missing. ``exist_ok=True`` is required because several worker
    processes may try to create it at the same time.
    """
    children_dir = csv_path.replace(".csv", "")
    os.makedirs(children_dir, exist_ok=True)
    index = chunk_index + 1
    return index, os.path.join(children_dir, f"{index}.csv")


def _write_chunk_summary(df, aggregate, child_path):
    """Group ``df`` by ``(SecurityID, BuyNo)``, aggregate and write the CSV.

    A file is written even when ``df`` is empty so that a re-run sees the
    chunk as already processed.
    """
    chunk_result = df.groupby(['SecurityID', 'BuyNo']).apply(aggregate)
    if not df.empty:
        chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)


def _process_csv_in_chunks(csv_path, worker, process_count):
    """Feed ``csv_path`` chunk by chunk to ``worker`` in a process pool.

    Chunks are produced lazily by a generator instead of being collected into
    a list first, so the whole file is not materialised up front.

    @param csv_path: path of the raw CSV file
    @param worker: module-level function taking ``((index, chunk), csv_path)``
    @param process_count: number of worker processes
    """
    reader = pd.read_csv(csv_path, chunksize=_CHUNK_SIZE)
    try:
        tasks = ((indexed_chunk, csv_path) for indexed_chunk in enumerate(reader))
        with Pool(processes=process_count) as pool:
            # Workers only write files; draining the iterator waits for all of
            # them and re-raises any worker exception.
            for _ in pool.imap_unordered(worker, tasks):
                pass
    finally:
        reader.close()


def __pre_process_transactions_detail(args):
    """Worker: aggregate one chunk of the Transaction CSV per buy order.

    @param args: ``((chunk_index, chunk_dataframe), csv_path)``
    @return: None; the result is written to ``<dir>/<chunk number>.csv``
    """

    def first_last(group):
        """Totals plus the first and last time/price of one group."""
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    index, child_path = _chunk_output_path(csv_path, chunk_index)
    if os.path.exists(child_path):
        # Already produced by an earlier run.
        return
    print_log(f"处理Transaction第{index}批次", os.getpid())

    df = chunk_data.copy()
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(_SECURITY_ID_PREFIXES, na=False)].copy()
    # Trade amount = price * volume; rows with no positive amount are dropped.
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    _write_chunk_summary(df, first_last, child_path)


def pre_process_transactions(csv_path, process_count=4):
    """Pre-aggregate a Transaction CSV into per-chunk files.

    @param csv_path: path of the raw Transaction CSV
    @param process_count: number of worker processes
    """
    _process_csv_in_chunks(csv_path, __pre_process_transactions_detail, process_count)


def __pre_process_ngtsticks(args):
    """Worker: aggregate one chunk of the NGTSTick CSV per buy order.

    Only rows whose ``TickType`` is ``'T'`` are used.

    @param args: ``((chunk_index, chunk_dataframe), csv_path)``
    @return: None; the result is written to ``<dir>/<chunk number>.csv``
    """

    def first_last(group):
        """Totals plus the first and last time/price of one group."""
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    index, child_path = _chunk_output_path(csv_path, chunk_index)
    if os.path.exists(child_path):
        # Already produced by an earlier run.
        return
    print_log(f"处理NGTSTick第{index}批次")

    df = chunk_data[chunk_data["TickType"] == 'T'].copy()
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(_SECURITY_ID_PREFIXES, na=False)].copy()
    # The trade amount is provided by the feed; only normalise it.
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)
    _write_chunk_summary(df, first_last, child_path)


def pre_process_ngtsticks(csv_path, process_count=4):
    """Pre-aggregate an NGTSTick CSV into per-chunk files.

    @param csv_path: path of the raw NGTSTick CSV
    @param process_count: number of worker processes
    """
    _process_csv_in_chunks(csv_path, __pre_process_ngtsticks, process_count)


def __concat_pre_datas(dir_path):
    """Merge the per-chunk files of ``dir_path`` into ``combined.csv``.

    Only files named ``<number>.csv`` are merged, in numeric order; empty
    chunk files are skipped. Nothing is done when ``combined.csv`` already
    exists or when no chunk contains data.

    @param dir_path: directory holding the per-chunk files
    @return: None
    """
    combined_path = os.path.join(dir_path, _COMBINED_FILE_NAME)
    if os.path.exists(combined_path):
        print_log("合并的目标文件已存在")
        return

    numbered_files = []
    for file_name in os.listdir(dir_path):
        matched = _CHUNK_FILE_PATTERN.match(file_name)
        if matched:
            numbered_files.append((int(matched.group(1)), file_name))
    numbered_files.sort()

    df_list = []
    for _, file_name in numbered_files:
        df = pd.read_csv(os.path.join(dir_path, file_name))
        if df.empty:
            continue
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print_log("准备合并的文件数量:", len(df_list))
    if not df_list:
        # pd.concat raises on an empty list; there is nothing to write.
        print_log("没有可合并的数据")
        return

    combined_df = pd.concat(df_list, ignore_index=True)
    print_log("合并完成,准备写入文件!")
    combined_df.to_csv(combined_path, index=False)
    print_log("写入文件完成!")


def concat_pre_transactions(dir_path):
    """Merge pre-processed Transaction chunks; see ``__concat_pre_datas``."""
    __concat_pre_datas(dir_path)


def concat_pre_ngtsticks(dir_path):
    """Merge pre-processed NGTSTick chunks; see ``__concat_pre_datas``."""
    __concat_pre_datas(dir_path)


def process_combined_transaction(dir_path):
    """Print per-security totals computed from ``combined.csv``.

    NOTE(review): the previous implementation passed an empty ``pd.Series``
    as the aggregation function, which is not callable, so the intended
    summary is unknown. Summing ``TotalAmount`` and ``TotalVolume`` per
    security is an assumption - confirm with the author.

    @param dir_path: directory containing ``combined.csv``
    @return: None
    """
    combined_path = os.path.join(dir_path, _COMBINED_FILE_NAME)
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    chunk_result = df.groupby("SecurityID", as_index=False)[
        ["TotalAmount", "TotalVolume"]
    ].sum()
    chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(
        BigOrderDealParser.code_format
    )
    print_log(chunk_result.to_string(
        index=False,  # hide the index
        justify='left',  # left-align the headers
        float_format='%.3f'  # float formatting
    ))


# combined.csv path -> loaded DataFrame, so repeated per-code extraction does
# not re-read the file. Entries are never invalidated.
__combined_df_cache = {}


def extract_big_order_of_all(dir_path):
    """Export the big orders of every security found in ``combined.csv``.

    @param dir_path: directory containing ``combined.csv``
    @return: None
    """
    combined_path = os.path.join(dir_path, _COMBINED_FILE_NAME)
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    codes = extract_big_order_codes(dir_path)
    if not codes:
        # extract_big_order_codes already logged why there is nothing to do.
        return
    print_log("总代码数量:", len(codes))
    for code in sorted(codes):
        extract_big_order_of_code(dir_path, code)


def extract_big_order_of_code(dir_path, code=""):
    """Export the big buy orders of one security to ``big_buy_<code>.csv``.

    Rows of ``combined.csv`` are re-aggregated by ``(SecurityID, BuyNo)``
    (an order may span several chunks) and orders whose total amount is above
    the big-order threshold are written. Nothing is written when the output
    file already exists.

    @param dir_path: data directory containing ``combined.csv``
    @param code: security code; empty means export all securities
    @return: None
    """

    def first_last(group):
        """Merge the partial aggregates of one order into a single row."""
        return pd.Series({
            'SecurityID': group['SecurityID'].iloc[0],
            'BuyNo': group['BuyNo'].iloc[0],
            'TotalVolume': group['TotalVolume'].sum(),
            'TotalAmount': group['TotalAmount'].sum(),
            'EndTime': group['EndTime'].iloc[-1],
            'EndPrice': group['EndPrice'].iloc[-1],
            'StartTime': group['StartTime'].iloc[0],
            'StartPrice': group['StartPrice'].iloc[0]
        })

    combined_path = os.path.join(dir_path, _COMBINED_FILE_NAME)
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print_log("路径已存在:", output_path)
        return

    df = __combined_df_cache.get(combined_path)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df

    if code:
        # Filter first so only the matching rows are copied. SecurityID is
        # compared as an integer, hence the int() on the requested code.
        df_copy = df[df["SecurityID"] == int(code)].copy()
    else:
        df_copy = df.copy()
    if df_copy.empty:
        print_log("目标代码对应成交数据为空")
        return

    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    grouped_result = df_copy.groupby(['SecurityID', 'BuyNo']).apply(first_last)
    grouped_result = grouped_result[grouped_result["TotalAmount"] > _BIG_ORDER_MIN_AMOUNT]
    grouped_result.to_csv(output_path, index=False)
    # print_log adds the timestamp itself.
    print_log(f"保存成功,路径:{output_path}")


def extract_big_order_codes(dir_path):
    """Return the set of zero-padded security codes present in ``combined.csv``.

    @param dir_path: data directory containing ``combined.csv``
    @return: set of codes, or None when the file is missing or has no rows
    """
    combined_path = os.path.join(dir_path, _COMBINED_FILE_NAME)
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    if df.empty:
        print_log("目标代码对应成交数据为空")
        return
    return {
        BigOrderDealParser.code_format(value)
        for value in df["SecurityID"].unique()
    }


if __name__ == "__main__":
    print_log(1, 2, 3)
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    extract_big_order_of_code("E:/测试数据/Transaction_Test")