Administrator
2025-05-27 7c6d06136079b73c5676c716c992fdebb23cca7c
data_parser/transaction_big_order_parser.py
@@ -2,6 +2,7 @@
大单成交数据解析器
"""
import os
import re
from multiprocessing import Pool
import pandas as pd
@@ -141,9 +142,10 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("合并的目标文件已存在")
        return
    file_list = os.listdir(dir_path)
    file_list.sort(key=lambda x: int(x.split(".")[0]))
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
@@ -151,17 +153,132 @@
            continue
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("准备合并的文件数量:", len(df_list))
    combined_df = pd.concat(df_list, ignore_index=True)
    print("合并完成,准备写入文件!")
    # 保存结果
    combined_df.to_csv(combined_path, index=False)
    print("写入文件完成!")
def concat_pre_transactions(dir_path):
    __concat_pre_datas(dir_path)
def concat_pre_ngtsticks(dir_path):
    __concat_pre_datas(dir_path)
def process_combined_transaction(dir_path):
    """
    处理拼接起来的数据
    @param dir_path:
    @return:
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    grouped = df_copy.groupby(["SecurityID"])
    # 应用聚合函数
    chunk_result = grouped.apply(pd.Series({}))
    # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format)
    print(chunk_result.to_string(
        index=False,  # 不显示索引
        justify='left',  # 左对齐
        float_format='%.3f'  # 浮点数格式
    ))
def extract_big_order_of_code(dir_path, code=None):
    """
    提取代码的大单
    @param dir_path: 数据目录
    @param code: 为空表示导出全部
    @return:
    """
    def first_last(group):
        """
            获取第一条数据与最后一条
            @param group:
            @return:
            """
        return pd.Series({
            'SecurityID': group['SecurityID'].iloc[0],
            'BuyNo': group['BuyNo'].iloc[0],
            'TotalVolume': group['TotalVolume'].sum(),
            'TotalAmount': group['TotalAmount'].sum(),
            'EndTime': group['EndTime'].iloc[-1],
            'EndPrice': group['EndPrice'].iloc[-1],
            'StartTime': group['StartTime'].iloc[0],
            'StartPrice': group['StartPrice'].iloc[0]
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if code:
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
    grouped_result = grouped.apply(first_last)
    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
    # print(grouped_result)
    # 遍历内容
    if code:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy_{code}.csv"), index=False)
    else:
        grouped_result.to_csv(os.path.join(dir_path, f"big_buy.csv"), index=False)
    print("保存成功")
def extract_big_order_codes(dir_path):
    """
    导出大单代码
    @param dir_path: 数据目录
    @param code:
    @return:
    """
    def first_last(group):
        """
            获取第一条数据与最后一条
            @param group:
            @return:
            """
        return pd.Series({
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID'])
    return set(grouped.groups.keys())
if __name__ == "__main__":
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    concat_pre_transactions("E:/测试数据/Transaction_Test")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    extract_big_order_of_code("E:/测试数据/Transaction_Test")