"""
|
大单成交数据解析器
|
"""
|
import os
|
import re
|
from multiprocessing import Pool
|
|
import pandas as pd
|
|
from utils import tool


class BigOrderDealParser:

    @classmethod
    def str_to_float(cls, s):
        """Strip the unit suffix after '@' and convert the price to a float rounded to 2 decimals."""
        try:
            return round(float(s.split("@")[0]), 2)
        except (AttributeError, TypeError, ValueError):
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """Left-pad the security code with zeros to 6 digits."""
        try:
            return "{0:0>6}".format(s)
        except (TypeError, ValueError):
            return ''
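
# Usage sketch for the two helpers above (illustrative values; assumes prices arrive as
# strings like "12.34@<unit>" and that codes may have lost their leading zeros):
#   BigOrderDealParser.str_to_float("12.34@CNY")  -> 12.34
#   BigOrderDealParser.str_to_float("bad value")  -> nan
#   BigOrderDealParser.code_format(1)             -> "000001"
#   BigOrderDealParser.code_format(600519)        -> "600519"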


def __pre_process_transactions_detail(args):
    def first_last(group):
        """
        Aggregate one (SecurityID, BuyNo) group: total amount and volume plus the
        first and last trade.
        @param group: DataFrame holding all trades of a single buy order
        @return: pd.Series with the aggregated fields
        """
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"Processing Transaction chunk {index}", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep only A-share codes on the main boards (00, 60) and ChiNext (30)
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Compute the trade amount and drop zero or invalid rows
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    # Nothing left in this chunk: skip grouping and write no output file
    if df.empty:
        return
    # Group by SecurityID and BuyNo and aggregate each buy order
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last)
    chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)
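

# Alternative sketch (not wired in anywhere): the same per-(SecurityID, BuyNo) aggregation
# as first_last above, expressed with pandas named aggregation, which avoids the slower
# groupby().apply() call. Assumes TradeAmount has already been computed; note that
# 'first'/'last' skip NaNs, unlike iloc[0]/iloc[-1].
def _aggregate_buy_orders_sketch(df):
    return (
        df.groupby(['SecurityID', 'BuyNo'])
          .agg(
              TotalAmount=('TradeAmount', 'sum'),
              TotalVolume=('TradeVolume', 'sum'),
              StartTime=('TradeTime', 'first'),
              StartPrice=('TradePrice', 'first'),
              EndTime=('TradeTime', 'last'),
              EndPrice=('TradePrice', 'last'),
          )
          .reset_index()
    )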


def pre_process_transactions(csv_path, process_count=4):
    chunk_size = 200000
    # Read the CSV in chunks and tag each chunk with its index
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    indexed_data = list(enumerate(chunks))
    args = [(x, csv_path) for x in indexed_data]
    # Fan the chunks out to a worker pool
    with Pool(processes=process_count) as pool:
        pool.map(__pre_process_transactions_detail, args)
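

# Note: list(enumerate(chunks)) above reads every chunk into memory before any worker
# starts. A lazier variant (sketch; the function name is hypothetical) dispatches chunks
# as they are read instead of building the full argument list first:
def _pre_process_transactions_streaming(csv_path, process_count=4, chunk_size=200000):
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    args = ((indexed, csv_path) for indexed in enumerate(chunks))
    with Pool(processes=process_count) as pool:
        # imap consumes the generator incrementally instead of materialising it
        for _ in pool.imap(__pre_process_transactions_detail, args):
            pass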


def __pre_process_ngtsticks(args):
    def first_last(group):
        """Aggregate one (SecurityID, BuyNo) group: total amount and volume plus the first and last tick."""
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]

    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"Processing NGTSTick chunk {index}")
    # Keep trade ticks only
    df = df[df["TickType"] == 'T']
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Parse the trade amount
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)
    # Nothing left in this chunk: skip grouping and write no output file
    if df.empty:
        return
    # Group by SecurityID and BuyNo and aggregate each buy order
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last)
    chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)


def pre_process_ngtsticks(csv_path, process_count=4):
    chunk_size = 200000
    # Read the CSV in chunks and tag each chunk with its index
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    indexed_data = list(enumerate(chunks))
    args = [(x, csv_path) for x in indexed_data]
    # Fan the chunks out to a worker pool
    with Pool(processes=process_count) as pool:
        pool.map(__pre_process_ngtsticks, args)


def __concat_pre_datas(dir_path):
    """
    Concatenate the per-chunk result files in dir_path into a single combined.csv.
    @param dir_path: directory holding the numbered chunk files
    @return:
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("Combined target file already exists")
        return
    # Merge the chunk CSV files in numeric order
    file_list = [f for f in os.listdir(dir_path) if f.endswith(".csv")]
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
        if df.empty:
            continue
        # Re-pad the codes: reading the CSV back parses them as ints and drops leading zeros
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("Number of files to merge:", len(df_list))
    if not df_list:
        print("No non-empty chunk files to merge")
        return
    combined_df = pd.concat(df_list, ignore_index=True)
    print("Merge finished, writing to file!")
    # Save the result
    combined_df.to_csv(combined_path, index=False)
    print("File written!")


def concat_pre_transactions(dir_path):
    __concat_pre_datas(dir_path)


def concat_pre_ngtsticks(dir_path):
    __concat_pre_datas(dir_path)
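

# The calls below must stay under the __main__ guard: each Pool worker re-imports this
# module under the spawn start method (the default on Windows), and unguarded top-level
# calls would be re-executed in every child process.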


if __name__ == "__main__":
    pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    concat_pre_transactions("E:/测试数据/Transaction_Test")