import csv
import os
import sys
import time
from multiprocessing import Pool

import pandas as pd

from data_parser import transaction_big_order_parser
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
from utils import tool


def __get_target_codes(day):

                writer.writerow(row)


def parse_transaction(day, save=True, parse_all=False):
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    fdatas = []
    df = pd.read_csv(f"{base_path}/Transaction.csv")
    category_revenue = df.groupby('BuyNo')['TradeVolume'].sum()

    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
        # csv.reader's line_num is still 0 at this point, so count the rows up front for the
        # progress display, then rewind before handing the file to the reader
        total_lines = sum(1 for _ in file)
        file.seek(0)
        csv_reader = csv.reader(file)
| | | print("总行数:", total_lines) |
| | | # 获取表头:: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] |
| | | headers = next(csv_reader) |
| | | print("表头:", headers) |
| | | # 遍历数据行 |
| | | _path = f"{base_path}/Transaction_filter.csv" |
| | | percent = 0 |
| | | with open(_path, 'w', newline='', encoding='utf-8') as csvfile: |
| | | # 创建一个 CSV 写入器对象 |
| | | writer = csv.writer(csvfile) |
| | | # 逐行写入数据 |
| | | count = 0 |
            for row in csv_reader:
                count += 1
                p = count * 100 // total_lines
                if p != percent:
                    percent = p
                    print(f"**进度:{percent}%")
                # Skip codes outside the target set unless parse_all is requested
                if row[1] not in target_codes and not parse_all:
                    continue
                # Write the row to the filtered file
                if save:

    l2_data_manager_dict = {}
    # Parse the data files
    __start_time = time.time()
    transaction_data = parse_transaction(day, parse_all=True)
    print("*******Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    ngtstick_data = parse_ngtstick(day)

                writer.writerow(row)


def test1(args):
    # Pool worker used below to smoke-test multiprocessing over enumerated CSV chunks
    index, df = args
    print(index)


def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
        try:
            # Strip the "@..." suffix and convert to a float rounded to 2 decimals
            return round(float(s.split("@")[0]), 2)
        except:
            return float("nan")

    def code_format(s):
        try:
            # Zero-pad the security code to 6 digits
            code = "{0:0>6}".format(s)
            return code
        except:
            return ''
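
    # Example behaviour of the helpers above, assuming the raw dump serializes prices
    # with an "@" suffix (e.g. "16.30@0"):
    #   str_to_float("16.30@0") -> 16.3
    #   code_format(555)        -> "000555"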

    # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2,
    #  Info3, TradeBSFlag, BizIndex, LocalTimeStamp]
    # transaction_data = {
    #     "SecurityID": ['300920', '300920', '300920', '300920'],
    #     "TradeTime": [91500040, 91500041, 91500042, 92000040],
    #     "TradePrice": [15.0, 16.2, 15.2, 16.3],
    #     "TradeVolume": [100, 100, 200, 300],
    #     "BuyNo": [0, 1, 1, 1]
    # }
    # Aggregation: per group, total amount/volume plus the first/last trade time and price
    def first_last(group):
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })
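
    # Worked example on the sample transaction_data sketched above: for SecurityID '300920',
    # BuyNo 1 groups three trades, so its aggregated row would be
    #   TotalAmount = 16.2*100 + 15.2*200 + 16.3*300 = 9550.0
    #   TotalVolume = 600, StartTime = 91500041, StartPrice = 16.2,
    #   EndTime = 92000040, EndPrice = 16.3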

    dtype = {
        'SecurityID': 'category',  # low-cardinality categorical column
    }
    chunk_size = 10000
    # Read the CSV lazily in chunks
    chunks = pd.read_csv(csv_path, dtype=dtype, chunksize=chunk_size)
    indexed_data = list(enumerate(chunks))
    # New approach: fan the enumerated chunks out to a process pool
    with Pool(processes=4) as pool:
        pool.map(test1, indexed_data)

    # list() above consumed the chunk iterator, so re-open the CSV for the sequential pass
    chunks = pd.read_csv(csv_path, dtype=dtype, chunksize=chunk_size)
    result_list = []
    for chunk_index, chunk in enumerate(chunks):
        df = chunk.copy()
        index = chunk_index + 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue
        print(f"处理第{index}批次")
| | | df["TradePrice"] = df["TradePrice"].apply(str_to_float) |
| | | df["SecurityID"] = df["SecurityID"].apply(code_format) |
| | | df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] |
| | | # 计算成交金额 |
| | | df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] |
| | | |
| | | # 按SecurityID和BuyNo分组 |
| | | grouped = df.groupby(['SecurityID', 'BuyNo']) |
| | | |
| | | # 应用聚合函数 |
| | | chunk_result = grouped.apply(first_last).reset_index() |
| | | |
| | | chunk_result.to_csv(child_path, index=False) |
| | | print(f"处理完毕,总共{index}批") |


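# A minimal merge sketch for the per-chunk outputs written above (an illustration only, not
# the project's concat_pre_transactions): a BuyNo can straddle two chunks, so the chunk-level
# aggregates are re-aggregated after concatenation. Assumes the chunks are in time order, as
# they are when the dump is read sequentially.
def _concat_chunk_results_sketch(csv_path="E:/测试数据/Transaction_Test.csv"):
    import glob
    parts = sorted(glob.glob(csv_path.replace(".csv", "_*.csv")))
    merged = pd.concat((pd.read_csv(p) for p in parts), ignore_index=True)
    merged = merged.sort_values(['SecurityID', 'BuyNo', 'StartTime'])
    return merged.groupby(['SecurityID', 'BuyNo'], as_index=False).agg(
        TotalAmount=('TotalAmount', 'sum'),
        TotalVolume=('TotalVolume', 'sum'),
        StartTime=('StartTime', 'first'),
        StartPrice=('StartPrice', 'first'),
        EndTime=('EndTime', 'last'),
        EndPrice=('EndPrice', 'last'),
    )

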
def pre_process_ngtstick(csv_path="E:/测试数据/NGTSTick_Test.csv"):
    def str_to_float(s):
        try:
            # Strip the "@..." suffix and convert to a float rounded to 2 decimals
            return round(float(s.split("@")[0]), 2)
        except:
            return float("nan")

    def code_format(s):
        try:
            # Zero-pad the security code to 6 digits
            code = "{0:0>6}".format(s)
            return code
        except:
            return ''

    # Aggregation: per group, total money/volume plus the first/last tick time and price
    def first_last(group):
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]

    chunk_size = 10000
    # Read the CSV lazily in chunks
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    result_list = []
    index = 0
    for df in chunks:
        index += 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue
        print(f"处理第{index}批次")
        # Keep only trade records (TickType 'T')
        df = df[df["TickType"] == 'T']
        df["Price"] = df["Price"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)

        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]

        # Normalize the trade money column
        df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
        # Group by SecurityID and BuyNo
        grouped = df.groupby(['SecurityID', 'BuyNo'])
        # Apply the aggregation function
        chunk_result = grouped.apply(first_last).reset_index()
        chunk_result.to_csv(child_path, index=False)
    print(f"处理完毕,总共{index}批")


if __name__ == '__main__1':
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    pre_process_transactions()

# Command usage: /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08
# Extract big orders: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
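# Assumed invocations for the newer subcommands handled below (the optional third argument is
# the worker process count, defaulting to 4):
#   /home/userzjj/app/gp-server/l2_data_parser Transaction_New 2025-05-09 8
#   /home/userzjj/app/gp-server/l2_data_parser NGTSTick_New 2025-05-09 8
#   /home/userzjj/app/gp-server/l2_data_parser Transaction_Concat 2025-05-09
#   /home/userzjj/app/gp-server/l2_data_parser NGTSTick_Concat 2025-05-09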
if __name__ == '__main__':

        parse_ngtstick(day)
    elif _type == 'MarketData':
        parse_market_data(day)
    elif _type == 'Transaction_New':
        if len(params) > 2:
            process_count = int(params[2].strip())
        else:
            process_count = 4
        transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
                                                              process_count=process_count)
        transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
    elif _type == 'NGTSTick_New':
        if len(params) > 2:
            process_count = int(params[2].strip())
        else:
            process_count = 4
        transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
                                                           process_count=process_count)
        transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
    elif _type == 'Transaction_Concat':
        transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
    elif _type == 'NGTSTick_Concat':
        transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
    elif _type == 'ExtractDealBigOrder':
        # Command usage: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
        # Extract big orders for the given code (or for all codes when no code is given)
        if not code:
            transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick")
            transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction")
        else:
            if tool.is_sh_code(code):
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code)
            else:
                transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction", code)