| | |
| | | from utils import tool |
| | | |
| | | |
| | | def print_log(*args): |
| | | print(f"[{tool.get_now_time_str()}]", *args) |
| | | |
| | | |
| | | class BigOrderDealParser: |
| | | |
| | | @classmethod |
| | |
| | | child_path = os.path.join(children_dir, f"{index}.csv") |
| | | if os.path.exists(child_path): |
| | | return |
| | | print(f"处理Transaction第{index}批次", os.getpid()) |
| | | print_log(f"处理Transaction第{index}批次", os.getpid()) |
| | | df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float) |
| | | df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) |
| | | df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] |
| | |
| | | df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] |
| | | df = df[df["TradeAmount"] > 0] |
| | | # 判断是否为空 |
| | | # print(df.empty) |
| | | # print_log(df.empty) |
| | | # 按SecurityID和BuyNo分组 |
| | | grouped = df.groupby(['SecurityID', 'BuyNo']) |
| | | # 应用聚合函数 |
| | |
| | | child_path = os.path.join(children_dir, f"{index}.csv") |
| | | if os.path.exists(child_path): |
| | | return |
| | | print(f"处理NGTSTick第{index}批次") |
| | | log(f"处理NGTSTick第{index}批次") |
| | | df = df[df["TickType"] == 'T'] |
| | | df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float) |
| | | df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) |
| | |
| | | """ |
| | | combined_path = os.path.join(dir_path, 'combined.csv') |
| | | if os.path.exists(combined_path): |
| | | print("合并的目标文件已存在") |
| | | print_log("合并的目标文件已存在") |
| | | return |
| | | file_list = os.listdir(dir_path) |
| | | file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0])) |
| | |
| | | continue |
| | | df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) |
| | | df_list.append(df) |
| | | print("准备合并的文件数量:", len(df_list)) |
| | | print_log("准备合并的文件数量:", len(df_list)) |
| | | |
| | | combined_df = pd.concat(df_list, ignore_index=True) |
| | | |
| | | print("合并完成,准备写入文件!") |
| | | print_log("合并完成,准备写入文件!") |
| | | # 保存结果 |
| | | combined_df.to_csv(combined_path, index=False) |
| | | print("写入文件完成!") |
| | | print_log("写入文件完成!") |
| | | |
| | | |
| | | def concat_pre_transactions(dir_path): |
| | |
| | | """ |
| | | combined_path = os.path.join(dir_path, 'combined.csv') |
| | | if not os.path.exists(combined_path): |
| | | print("拼接数据不存在") |
| | | print_log("拼接数据不存在") |
| | | return |
| | | df = pd.read_csv(combined_path) |
| | | df_copy = df.copy() |
| | |
| | | # 应用聚合函数 |
| | | chunk_result = grouped.apply(pd.Series({})) |
| | | # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format) |
| | | print(chunk_result.to_string( |
| | | print_log(chunk_result.to_string( |
| | | index=False, # 不显示索引 |
| | | justify='left', # 左对齐 |
| | | float_format='%.3f' # 浮点数格式 |
| | |
| | | def extract_big_order_of_all(dir_path): |
| | | combined_path = os.path.join(dir_path, 'combined.csv') |
| | | if not os.path.exists(combined_path): |
| | | print("拼接数据不存在") |
| | | print_log("拼接数据不存在") |
| | | return |
| | | codes = extract_big_order_codes(dir_path) |
| | | print("总代码数量:", len(codes)) |
| | | print_log("总代码数量:", len(codes)) |
| | | for code in codes: |
| | | extract_big_order_of_code(dir_path, code) |
| | | |
| | |
| | | |
| | | combined_path = os.path.join(dir_path, 'combined.csv') |
| | | if not os.path.exists(combined_path): |
| | | print("拼接数据不存在") |
| | | print_log("拼接数据不存在") |
| | | return |
| | | output_path = os.path.join(dir_path, f"big_buy_{code}.csv") |
| | | if os.path.exists(output_path): |
| | | print("路径已存在:", output_path) |
| | | print_log("路径已存在:", output_path) |
| | | return |
| | | df = __combined_df_cache.get(combined_path) |
| | | if not df: |
| | | df = __combined_df_cache.get(combined_path, None) |
| | | if df is None: |
| | | df = pd.read_csv(combined_path) |
| | | __combined_df_cache[combined_path] = df |
| | | df_copy = df.copy() |
| | | if code: |
| | | df_copy = df_copy[df_copy["SecurityID"] == int(code)] |
| | | if df_copy.empty: |
| | | print("目标代码对应成交数据为空") |
| | | print_log("目标代码对应成交数据为空") |
| | | return |
| | | df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) |
| | | # 按SecurityID和BuyNo分组 |
| | | grouped = df_copy.groupby(['SecurityID', 'BuyNo']) |
| | | grouped_result = grouped.apply(first_last) |
| | | grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000] |
| | | # print(grouped_result) |
| | | # print_log(grouped_result) |
| | | # 遍历内容 |
| | | grouped_result.to_csv(output_path, index=False) |
| | | print(f"保存成功,路径:{output_path}") |
| | | print_log(f"[{tool.get_now_time_str()}]保存成功,路径:{output_path}") |
| | | |
| | | |
| | | def extract_big_order_codes(dir_path): |
| | |
| | | """ |
| | | combined_path = os.path.join(dir_path, 'combined.csv') |
| | | if not os.path.exists(combined_path): |
| | | print("拼接数据不存在") |
| | | print_log("拼接数据不存在") |
| | | return |
| | | df = pd.read_csv(combined_path) |
| | | df_copy = df.copy() |
| | | if df_copy.empty: |
| | | print("目标代码对应成交数据为空") |
| | | print_log("目标代码对应成交数据为空") |
| | | return |
| | | df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) |
| | | # 按SecurityID和BuyNo分组 |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | log(1,2,3) |
| | | # pre_process_transactions("E:/测试数据/Transaction_Test.csv") |
| | | # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv") |
| | | # concat_pre_transactions("E:/测试数据/Transaction_Test") |