Administrator
2025-05-28 e3f4cf3bacac6eda31ddea6aaf70ebb883788817
data_parser/transaction_big_order_parser.py
@@ -10,6 +10,10 @@
from utils import tool
def print_log(*args):
    """
    Print the given values to stdout behind a bracketed timestamp prefix.
    @param args: values handed to print() unchanged, after the prefix
    @return:
    """
    # The prefix is whatever string tool.get_now_time_str() returns, wrapped in [].
    timestamp_prefix = f"[{tool.get_now_time_str()}]"
    print(timestamp_prefix, *args)
class BigOrderDealParser:
    @classmethod
@@ -55,7 +59,7 @@
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"处理Transaction第{index}批次", os.getpid())
    print_log(f"处理Transaction第{index}批次", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
@@ -63,7 +67,7 @@
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    # 判断是否为空
    # print(df.empty)
    # print_log(df.empty)
    # 按SecurityID和BuyNo分组
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    # 应用聚合函数
@@ -106,7 +110,7 @@
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    print(f"处理NGTSTick第{index}批次")
    log(f"处理NGTSTick第{index}批次")
    df = df[df["TickType"] == 'T']
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
@@ -142,7 +146,7 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("合并的目标文件已存在")
        print_log("合并的目标文件已存在")
        return
    file_list = os.listdir(dir_path)
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
@@ -153,14 +157,14 @@
            continue
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("准备合并的文件数量:", len(df_list))
    print_log("准备合并的文件数量:", len(df_list))
    combined_df = pd.concat(df_list, ignore_index=True)
    print("合并完成,准备写入文件!")
    print_log("合并完成,准备写入文件!")
    # 保存结果
    combined_df.to_csv(combined_path, index=False)
    print("写入文件完成!")
    print_log("写入文件完成!")
def concat_pre_transactions(dir_path):
@@ -179,7 +183,7 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
@@ -187,7 +191,7 @@
    # 应用聚合函数
    chunk_result = grouped.apply(pd.Series({}))
    # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format)
    print(chunk_result.to_string(
    print_log(chunk_result.to_string(
        index=False,  # 不显示索引
        justify='left',  # 左对齐
        float_format='%.3f'  # 浮点数格式
@@ -197,14 +201,35 @@
# Cache of DataFrames read from combined.csv, keyed by the CSV path,
# so repeated extraction calls can reuse an already loaded frame.
__combined_df_cache = {}
def extract_big_order_of_code(dir_path, code=None):
    """
    Extract the big orders of a code
    @param dir_path: data directory
    @param code: when empty, everything is exported
    @return:

    NOTE(review): no statements follow this docstring in the visible text, and a
    function with the same name is defined again further down the file (that later
    definition is the one Python keeps) - confirm this stub is still wanted.
    """
def extract_big_order_of_all(dir_path, process_count=4):
    """
    Extract the big orders of every code found in the combined data, using a process pool.
    @param dir_path: data directory; must contain combined.csv
    @param process_count: number of worker processes in the pool
    @return:
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    codes = extract_big_order_codes(dir_path)
    # extract_big_order_codes can come back with None (it has bare returns), so guard
    # before len(); an empty collection also means there is nothing to dispatch.
    if codes is None or len(codes) == 0:
        print_log("总代码数量:", 0)
        return
    print_log("总代码数量:", len(codes))
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    # The worker unpacks (dir_path, code, df) from each task, so all three are required.
    # It also skips codes whose output file already exists, so no per-code check is needed here.
    # NOTE(review): every task tuple carries the full DataFrame - confirm the pickling cost is acceptable.
    args = [(dir_path, code, df) for code in codes]
    with Pool(processes=process_count) as pool:
        pool.map(__extract_big_order_of_code, args)
def __extract_big_order_of_code(args):
    def first_last(group):
        """
            获取第一条数据与最后一条
@@ -222,43 +247,46 @@
            'StartPrice': group['StartPrice'].iloc[0]
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        return
    if not code:
        codes = extract_big_order_codes(dir_path)
        print("总代码数量:", len(codes))
        for code in codes:
            extract_big_order_of_code(dir_path, code)
    if code:
        output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    else:
        output_path = os.path.join(dir_path, f"big_buy.csv")
    dir_path, code, df = args[0], args[1], args[2]
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print("路径已存在:", output_path)
        print_log("路径已存在:", output_path)
        return
    df = __combined_df_cache.get(combined_path)
    if not df:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    df_copy = df.copy()
    if code:
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        print_log("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
    grouped = df_copy.groupby(['SecurityID', 'BuyNo'])
    grouped_result = grouped.apply(first_last)
    grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000]
    # print(grouped_result)
    # print_log(grouped_result)
    # 遍历内容
    grouped_result.to_csv(output_path, index=False)
    print(f"保存成功,路径:{output_path}")
    print_log(f"[{tool.get_now_time_str()}]保存成功,路径:{output_path}")
def extract_big_order_of_code(dir_path, code):
    """
    Run the big-order extraction for one code against the combined data.
    @param dir_path: data directory; combined.csv is looked up inside it
    @param code: code forwarded to the extraction worker together with the data
    @return:
    """
    csv_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(csv_path):
        # Reuse the frame loaded by an earlier call; read and remember it otherwise.
        frame = __combined_df_cache.get(csv_path, None)
        if frame is None:
            frame = pd.read_csv(csv_path)
            __combined_df_cache[csv_path] = frame
        # The worker takes one tuple: (data directory, code, combined frame).
        task = (dir_path, code, frame)
        __extract_big_order_of_code(task)
    else:
        print_log("拼接数据不存在")
def extract_big_order_codes(dir_path):
@@ -270,12 +298,12 @@
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("拼接数据不存在")
        print_log("拼接数据不存在")
        return
    df = pd.read_csv(combined_path)
    df_copy = df.copy()
    if df_copy.empty:
        print("目标代码对应成交数据为空")
        print_log("目标代码对应成交数据为空")
        return
    df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format)
    # 按SecurityID和BuyNo分组
@@ -284,8 +312,9 @@
if __name__ == "__main__":
    # Quick check of the timestamped logger output.
    print_log(1, 2, 3)
    # Earlier pipeline stages, kept for manual runs:
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    # The former one-argument extract_big_order_of_code(...) call is removed: that function
    # requires a code, so the call raised TypeError before the batch extraction could start.
    extract_big_order_of_all("E:/测试数据/Transaction_Test")