Administrator
2025-05-23 638865a032eb403bdf61a5808acb579a25ec7be1
L2成交数据解析
1个文件已修改
77 ■■■■ 已修改文件
l2_data_parser.py 77 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py
@@ -10,6 +10,7 @@
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
from utils import tool
def __get_target_codes(day):
@@ -50,13 +51,6 @@
    fdatas = []
    df = pd.read_csv(f"{base_path}/Transaction.csv")
    category_revenue = df.groupby('BuyNo')['TradeVolume'].sum()
    total_count = 0
    for chunk in chunks:
        total_count += len(chunk)
    for chunk in chunks:
        total_count += len(chunk)
    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
@@ -231,9 +225,74 @@
                writer.writerow(row)
def test(csv_path="E:/测试数据/Transaction_Test.csv"):
    """Aggregate a transaction CSV per (SecurityID, BuyNo), one output file per chunk.

    The input is read in chunks of 100000 rows. Within each chunk the rows are
    grouped by SecurityID and BuyNo, and every group is reduced to its total
    amount, total volume and the time/price of its first and last row. The
    result of chunk N is written next to the input as ``<name>_<N><ext>``.

    Aggregation happens per chunk only: a (SecurityID, BuyNo) pair whose rows
    fall into different chunks shows up in more than one output file.

    :param csv_path: path of the CSV to process; it must provide the columns
        SecurityID, BuyNo, TradeTime, TradePrice and TradeVolume.
    """
    import os

    def str_to_float(s):
        """Return the part of *s* before "@" as a float rounded to 2 places, NaN on failure."""
        try:
            # str() lets values that pandas already parsed as numbers through;
            # calling .split on them directly would fail and yield NaN.
            return round(float(str(s).split("@")[0]), 2)
        except (ValueError, TypeError):
            return float("nan")

    def code_format(s):
        """Left-pad *s* with zeros to a width of 6; return '' if it cannot be formatted."""
        try:
            return "{0:0>6}".format(s)
        except (ValueError, TypeError):
            return ''

    # Column layout noted by the original author:
    # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2,
    #  Info3, TradeBSFlag, BizIndex, LocalTimeStamp]
    # Example of the columns this function actually uses:
    #     "SecurityID": ['300920', '300920', '300920', '300920'],
    #     "TradeTime": [91500040, 91500041, 91500042, 92000040],
    #     "TradePrice": [15.0, 16.2, 15.2, 16.3],
    #     "TradeVolume": [100, 100, 200, 300],
    #     "BuyNo": [0, 1, 1, 1]

    def first_last(group):
        """Reduce one (SecurityID, BuyNo) group to its totals and first/last trade."""
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_size = 100000
    # Split the extension off once. Unlike str.replace(".csv", ...), this never
    # yields the input path itself, so an input without a ".csv" suffix is not
    # overwritten while it is still being read.
    root, ext = os.path.splitext(csv_path)
    index = 0
    # The context manager closes the underlying file even if a chunk fails.
    with pd.read_csv(csv_path, chunksize=chunk_size) as chunks:
        for index, df in enumerate(chunks, start=1):
            print(f"处理第{index}批次")
            df["TradePrice"] = df["TradePrice"].apply(str_to_float)
            df["SecurityID"] = df["SecurityID"].apply(code_format)
            # Traded amount of each row.
            df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
            # Group by SecurityID and BuyNo, then aggregate every group.
            grouped = df.groupby(['SecurityID', 'BuyNo'])
            chunk_result = grouped.apply(first_last).reset_index()
            child_path = f"{root}_{index}{ext}"
            chunk_result.to_csv(child_path, index=False)
    print(f"处理完毕,总共{index}批")
if __name__ == '__main__':
    # Script entry point: run test() with its default CSV path.
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    test()
# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
# 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
if __name__ == '__main__':
if __name__ == '__main__1':
    if len(sys.argv) > 1:
        params = sys.argv[1:]
        print("接收的参数", params)
@@ -251,6 +310,8 @@
            parse_ngtstick(day)
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            test(f"/home/userzjj/ftp/{day}/Transaction.csv")
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            if len(params) > 2: