"""
|
大单成交数据解析器
|
"""
|
import os
|
import re
|
from multiprocessing import Pool
|
|
import pandas as pd
|
|
from utils import tool
|
|
|
class BigOrderDealParser:
    """Static helpers for normalising raw tick/transaction CSV fields."""

    @classmethod
    def str_to_float(cls, s):
        """Parse a price-like string such as ``"12.34@unit"`` to a float.

        The part after the first ``@`` (a unit suffix) is discarded and the
        value is rounded to 2 decimals.

        @param s: raw field value, expected to be a string
        @return: rounded float, or NaN when *s* is missing or not parseable
        """
        try:
            # Drop the unit suffix after '@', then convert.
            return round(float(s.split("@")[0]), 2)
        except (AttributeError, TypeError, ValueError):
            # AttributeError/TypeError: s is not a str (e.g. NaN read from
            # CSV); ValueError: not numeric.  Bare except would also swallow
            # KeyboardInterrupt etc.
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """Left-pad a security code to 6 characters with zeros.

        e.g. ``1`` -> ``"000001"``.

        @param s: raw code (int or str)
        @return: 6-char zero-padded string, or '' when formatting fails
        """
        try:
            return "{0:0>6}".format(s)
        except Exception:
            # str.format on arbitrary scalars is very unlikely to fail;
            # keep the original fail-safe of an empty code.
            return ''
|
|
|
def __pre_process_transactions_detail(args):
    """Pool worker: clean one chunk of transaction rows and persist the
    per-(SecurityID, BuyNo) aggregation next to the source file.

    @param args: ((chunk_index, chunk_DataFrame), csv_path)
    @return: None; writes <csv_dir_without_ext>/<index>.csv as a side effect
    """

    def first_last(group):
        """Aggregate one (SecurityID, BuyNo) group: summed amount/volume
        plus the first and last fill's time and price."""
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok avoids the exists()/makedirs() race when several pool
    # workers create the output directory at the same time.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # Chunk already processed in a previous run; keep it (resumable).
        return
    print(f"处理Transaction第{index}批次", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep only codes starting with 30/00/60 (presumably A-share main
    # board + ChiNext — confirm).
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Trade amount = price * volume; drop rows without a positive amount
    # (NaN prices propagate to NaN amounts and are dropped here too).
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    if df.empty:
        # Nothing survived filtering: the original also wrote no file in
        # this case, but still paid for an empty groupby/apply.
        return
    # Aggregate each buy order (SecurityID, BuyNo) within this chunk.
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last).reset_index()
    chunk_result.to_csv(child_path, index=False)
|
|
|
def pre_process_transactions(csv_path, process_count=4):
    """Split a large transaction CSV into 200k-row chunks and pre-process
    them in parallel worker processes.

    @param csv_path: path of the raw Transaction CSV
    @param process_count: number of pool workers (default 4)
    @return: None; chunk results are written by the workers
    """
    chunk_size = 200000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    # Feed chunks to the pool lazily: the original list(enumerate(chunks))
    # materialised the whole CSV in memory, defeating chunked reading.
    args = ((item, csv_path) for item in enumerate(chunks))
    with Pool(processes=process_count) as pool:
        # imap streams tasks; iterate to drive all of them to completion.
        for _ in pool.imap(__pre_process_transactions_detail, args):
            pass
|
|
|
def __pre_process_ngtsticks(args):
    """Pool worker: clean one chunk of NGTS tick rows and persist the
    per-(SecurityID, BuyNo) aggregation next to the source file.

    @param args: ((chunk_index, chunk_DataFrame), csv_path)
    @return: None; writes <csv_dir_without_ext>/<index>.csv as a side effect
    """

    def first_last(group):
        """Aggregate one (SecurityID, BuyNo) group: summed money/volume
        plus the first and last tick's time and price."""
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]

    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok avoids the exists()/makedirs() race when several pool
    # workers create the output directory at the same time.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # Chunk already processed in a previous run; keep it (resumable).
        return
    print(f"处理NGTSTick第{index}批次")
    # Keep only 'T' rows (trade ticks); other tick types are ignored.
    df = df[df["TickType"] == 'T']
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Normalise the traded money field ("<num>@<unit>" -> float).
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)
    if df.empty:
        # Nothing survived filtering: the original also wrote no file in
        # this case, but still paid for an empty groupby/apply.
        return
    # Aggregate each buy order (SecurityID, BuyNo) within this chunk.
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last).reset_index()
    chunk_result.to_csv(child_path, index=False)
|
|
|
def pre_process_ngtsticks(csv_path, process_count=4):
    """Split a large NGTS tick CSV into 200k-row chunks and pre-process
    them in parallel worker processes.

    @param csv_path: path of the raw NGTSTick CSV
    @param process_count: number of pool workers (default 4)
    @return: None; chunk results are written by the workers
    """
    chunk_size = 200000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    # Feed chunks to the pool lazily: the original list(enumerate(chunks))
    # materialised the whole CSV in memory, defeating chunked reading.
    args = ((item, csv_path) for item in enumerate(chunks))
    with Pool(processes=process_count) as pool:
        # imap streams tasks; iterate to drive all of them to completion.
        for _ in pool.imap(__pre_process_ngtsticks, args):
            pass
|
|
|
def __concat_pre_datas(dir_path):
    """Concatenate the numbered chunk CSVs in *dir_path* into combined.csv.

    @param dir_path: directory holding per-chunk outputs (1.csv, 2.csv, ...)
    @return: None; writes combined.csv as a side effect
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("合并的目标文件已存在")
        return
    # Only keep files whose names contain a number: the original sort key
    # raised IndexError on any digit-free filename in the directory.
    file_list = [f for f in os.listdir(dir_path) if re.findall(r'\d+', f)]
    # Replay chunks in their original (numeric) order.
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
        if df.empty:
            continue
        # Re-pad codes: read_csv parses them back as plain integers.
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("准备合并的文件数量:", len(df_list))
    if not df_list:
        # pd.concat raises ValueError on an empty list; nothing to merge.
        return

    combined_df = pd.concat(df_list, ignore_index=True)

    print("合并完成,准备写入文件!")
    # 保存结果
    combined_df.to_csv(combined_path, index=False)
    print("写入文件完成!")
|
|
|
def concat_pre_transactions(dir_path):
    """Merge the pre-processed transaction chunk files into one combined.csv."""
    __concat_pre_datas(dir_path)
|
|
|
def concat_pre_ngtsticks(dir_path):
    """Merge the pre-processed NGTS tick chunk files into one combined.csv."""
    __concat_pre_datas(dir_path)
|
|
|
def process_combined_transaction(dir_path):
    """Summarise the combined transaction data per security and print it.

    NOTE(review): the original passed ``pd.Series({})`` — an object, not a
    callable — to ``GroupBy.apply``, which raises ``TypeError`` at runtime.
    The presumed intent (confirm against callers) is a per-SecurityID total,
    implemented here with ``GroupBy.agg``.

    @param dir_path: directory containing combined.csv
    @return: None; prints the aggregated table
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    grouped = df_copy.groupby(["SecurityID"])
    # Per-security totals over the pre-aggregated chunk rows.
    chunk_result = grouped.agg(
        TotalAmount=("TotalAmount", "sum"),
        TotalVolume=("TotalVolume", "sum"),
    ).reset_index()
    print(chunk_result.to_string(
        index=False,          # hide the index column
        justify='left',       # left-align headers
        float_format='%.3f'   # fixed 3-decimal floats
    ))
|
|
|
# Module-level cache: combined.csv path -> loaded DataFrame, so repeated
# per-code extractions avoid re-reading the same large file.
__combined_df_cache = {}
|
|
|
def extract_big_order_of_all(dir_path):
    """Extract big buy orders for every security found in combined.csv.

    @param dir_path: directory containing combined.csv
    @return: None; one big_buy_<code>.csv per security as a side effect
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    codes = extract_big_order_codes(dir_path)
    # extract_big_order_codes returns None when the combined data is empty;
    # the original then crashed on len(None).
    if not codes:
        return
    print("总代码数量:", len(codes))
    for code in codes:
        extract_big_order_of_code(dir_path, code)
|
|
|
def extract_big_order_of_code(dir_path, code=''):
    """Extract big buy orders (total amount > 500,000) for one security.

    @param dir_path: data directory containing combined.csv
    @param code: security code; empty exports all securities (default '',
                 matching the documented "为空表示导出全部" contract — the
                 original had no default, so the __main__ call crashed)
    @return: None; writes big_buy_<code>.csv as a side effect
    """

    def first_last(group):
        """Merge one (SecurityID, BuyNo) group's chunk-level partials:
        summed volume/amount plus the first/last time and price."""
        return pd.Series({
            'SecurityID': group['SecurityID'].iloc[0],
            'BuyNo': group['BuyNo'].iloc[0],
            'TotalVolume': group['TotalVolume'].sum(),
            'TotalAmount': group['TotalAmount'].sum(),
            'EndTime': group['EndTime'].iloc[-1],
            'EndPrice': group['EndPrice'].iloc[-1],
            'StartTime': group['StartTime'].iloc[0],
            'StartPrice': group['StartPrice'].iloc[0]
        })

    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print("路径已存在:", output_path)
        return
    # combined.csv is large and this function runs once per code: reuse the
    # module-level cache instead of re-reading the file each time.
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    # Work on a copy so the cached frame is never mutated.
    df_copy = df.copy()
    if code:
        # read_csv parses SecurityID back as int, so compare numerically.
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # Merge the per-chunk partial aggregates of each buy order.
    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
    grouped_result = grouped.apply(first_last)
    # Big-order threshold: total buy amount above 500k.
    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
    grouped_result.to_csv(output_path, index=False)
    print(f"保存成功,路径:{output_path}")
|
|
|
def extract_big_order_codes(dir_path):
    """Collect the set of security codes present in combined.csv.

    @param dir_path: data directory containing combined.csv
    @return: set of zero-padded 6-char code strings, or None when the
             data file is missing or empty
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # unique() yields plain scalar codes; the original
    # groupby(['SecurityID']).groups can yield 1-tuples on newer pandas
    # (list-of-one grouper), which breaks downstream int(code) calls.
    return set(df_copy["SecurityID"].unique())
|
|
|
if __name__ == "__main__":
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    # Empty code means "export all securities"; the original call omitted
    # the then-required `code` argument and raised TypeError.
    extract_big_order_of_code("E:/测试数据/Transaction_Test", "")
|