New file |
| | |
| | | """ |
| | | 大单成交数据解析器 |
| | | """ |
| | | import os |
| | | from multiprocessing import Pool |
| | | |
| | | import pandas as pd |
| | | |
| | | from utils import tool |
| | | |
| | | |
class BigOrderDealParser:
    """Shared field-parsing helpers for big-order deal CSV data."""

    @classmethod
    def str_to_float(cls, s):
        """Parse a price/amount field such as ``"12.5@CNY"`` into a float.

        Any unit suffix after ``@`` is dropped and the numeric part is
        rounded to 2 decimal places.  Returns ``nan`` instead of raising
        when the value is missing or malformed.
        """
        try:
            # Strip the unit suffix, then convert.
            return round(float(s.split("@")[0]), 2)
        except (AttributeError, ValueError, TypeError):
            # AttributeError: s has no .split (e.g. a NaN float from pandas);
            # ValueError/TypeError: the leading part is not a valid number.
            # Previously a bare `except:` also swallowed KeyboardInterrupt etc.
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """Normalize a security code to a 6-character zero-padded string.

        e.g. ``1`` -> ``"000001"``.  Returns ``''`` when formatting fails.
        """
        try:
            return "{0:0>6}".format(s)
        except (ValueError, TypeError):
            # Objects whose __format__ rejects the spec end up here.
            return ''
| | | |
| | | |
def __pre_process_transactions_detail(args):
    """Worker: aggregate one Transaction CSV chunk by (SecurityID, BuyNo).

    Writes the per-chunk aggregate to ``<csv_dir>/<index>.csv`` and returns
    early when that file already exists, so an interrupted run can resume.

    @param args: ((chunk_index, chunk_dataframe), csv_path)
    @return: None
    """

    def first_last(group):
        """Summarize one (SecurityID, BuyNo) group: turnover/volume totals
        plus the first and last trade's time and price."""
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    (chunk_index, chunk_data), csv_path = args
    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok avoids the check-then-create race between worker processes.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # Chunk already processed by a previous run.
        return
    print(f"处理Transaction第{index}批次", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep main-board (00/60) and ChiNext (30) codes only; .copy() prevents
    # SettingWithCopyWarning on the assignment below.
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)].copy()
    # Turnover = price * volume; drop rows with no effective turnover.
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    if df.empty:
        # Bail out before groupby/apply (previously the apply ran even on an
        # empty frame and its result was silently discarded).
        return
    # Aggregate per (SecurityID, BuyNo) buy order.
    chunk_result = df.groupby(['SecurityID', 'BuyNo']).apply(first_last)
    chunk_result.reset_index().to_csv(child_path, index=False)
| | | |
| | | |
def pre_process_transactions(csv_path, process_count=4):
    """Split a Transaction CSV into 200k-row chunks and aggregate in parallel.

    Each chunk is handled by ``__pre_process_transactions_detail`` in a
    worker process; results land in a directory named after the CSV.

    @param csv_path: path to the raw Transaction CSV
    @param process_count: number of worker processes
    @return: None
    """
    chunk_size = 200000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    # Build the argument stream lazily: the previous list(enumerate(chunks))
    # materialized the whole CSV in memory, defeating chunked reading.
    args = ((indexed_chunk, csv_path) for indexed_chunk in enumerate(chunks))
    with Pool(processes=process_count) as pool:
        # imap pulls chunks from the generator on demand, so only the
        # in-flight chunks are alive at any time.
        for _ in pool.imap(__pre_process_transactions_detail, args):
            pass
| | | |
| | | |
def __pre_process_ngtsticks(args):
    """Worker: aggregate one NGTSTick CSV chunk by (SecurityID, BuyNo).

    Only rows with TickType ``'T'`` (actual trades) are kept.  Writes the
    aggregate to ``<csv_dir>/<index>.csv``; returns early when that file
    already exists, so an interrupted run can resume.

    @param args: ((chunk_index, chunk_dataframe), csv_path)
    @return: None
    """

    def first_last(group):
        """Summarize one (SecurityID, BuyNo) group: money/volume totals
        plus the first and last tick's time and price."""
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    (chunk_index, chunk_data), csv_path = args
    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok avoids the check-then-create race between worker processes.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # Chunk already processed by a previous run.
        return
    # pid added for consistency with the Transaction worker's log line.
    print(f"处理NGTSTick第{index}批次", os.getpid())
    # Keep trade ticks only; .copy() prevents SettingWithCopyWarning on the
    # column assignments below.
    df = df[df["TickType"] == 'T'].copy()
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep main-board (00/60) and ChiNext (30) codes only.
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)].copy()
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)
    if df.empty:
        # Bail out before groupby/apply (previously the apply ran even on an
        # empty frame and its result was silently discarded).
        return
    # Aggregate per (SecurityID, BuyNo) buy order.
    chunk_result = df.groupby(['SecurityID', 'BuyNo']).apply(first_last)
    chunk_result.reset_index().to_csv(child_path, index=False)
| | | |
| | | |
def pre_process_ngtsticks(csv_path, process_count=4):
    """Split an NGTSTick CSV into 200k-row chunks and aggregate in parallel.

    Each chunk is handled by ``__pre_process_ngtsticks`` in a worker
    process; results land in a directory named after the CSV.

    @param csv_path: path to the raw NGTSTick CSV
    @param process_count: number of worker processes
    @return: None
    """
    chunk_size = 200000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    # Build the argument stream lazily: the previous list(enumerate(chunks))
    # materialized the whole CSV in memory, defeating chunked reading.
    args = ((indexed_chunk, csv_path) for indexed_chunk in enumerate(chunks))
    with Pool(processes=process_count) as pool:
        # imap pulls chunks from the generator on demand, so only the
        # in-flight chunks are alive at any time.
        for _ in pool.imap(__pre_process_ngtsticks, args):
            pass
| | | |
| | | |
def __concat_pre_datas(dir_path):
    """Concatenate the numbered per-chunk CSVs in *dir_path* into combined.csv.

    Idempotent: returns immediately when combined.csv already exists.

    @param dir_path: directory holding ``1.csv``, ``2.csv``, ... chunk results
    @return: None
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        return
    # Keep only the numbered chunk files: any stray file (editor backup,
    # partial combined.csv, ...) previously crashed the int(...) sort key.
    file_list = [f for f in os.listdir(dir_path) if f.split(".")[0].isdigit()]
    file_list.sort(key=lambda x: int(x.split(".")[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
        if df.empty:
            continue
        # Re-pad codes: read_csv parses "000001" back into the int 1.
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    if not df_list:
        # pd.concat raises ValueError on an empty list; nothing to combine.
        return
    combined_df = pd.concat(df_list, ignore_index=True)
    combined_df.to_csv(combined_path, index=False)
| | | |
| | | |
def concat_pre_transactions(dir_path):
    """Merge the per-chunk Transaction aggregates in *dir_path* into combined.csv."""
    __concat_pre_datas(dir_path)
| | | |
| | | |
if __name__ == "__main__":
    # Step 1 (currently disabled): chunk + aggregate the raw CSV exports.
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # Step 2: merge the per-chunk results into a single combined.csv.
    concat_pre_transactions("E:/测试数据/Transaction_Test")