Administrator
2025-05-26 631f0fbfed1f80d5dd9aab1cf93d9a1ddca455c1
大单解析
1个文件已修改
1个文件已添加
174 ■■■■■ 已修改文件
data_parser/transaction_big_order_parser.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_parser/transaction_big_order_parser.py
New file
@@ -0,0 +1,167 @@
"""
大单成交数据解析器
"""
import os
from multiprocessing import Pool
import pandas as pd
from utils import tool
class BigOrderDealParser:
    """Shared field converters used by the big-order deal pre-processors."""

    @classmethod
    def str_to_float(cls, s):
        """
        Parse a price-like value such as "12.34@unit" into a float rounded
        to 2 decimals; anything after "@" (the unit suffix) is dropped.

        @param s: raw field value (string, or an already-parsed number)
        @return: rounded float, or NaN when the value cannot be parsed
        """
        try:
            # str(s) lets already-numeric values (e.g. pandas-parsed floats)
            # convert too, instead of failing on the missing .split attribute
            return round(float(str(s).split("@")[0]), 2)
        except (ValueError, TypeError):
            # Narrow exceptions: a bare except here would also hide real bugs
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """
        Normalize a security code to a 6-character, zero-left-padded string
        (e.g. 600 -> "000600").

        @param s: raw security code (string or int)
        @return: 6-char zero-padded code, or '' when formatting fails
        """
        try:
            return "{0:0>6}".format(s)
        except (ValueError, TypeError):
            return ''
def __pre_process_transactions_detail(args):
    """
    Worker: aggregate one Transaction CSV chunk into per-(SecurityID, BuyNo)
    buy-order summaries and write them to "<csv stem>/<index>.csv".

    @param args: tuple of ((chunk_index, chunk_dataframe), csv_path)
    @return: None; the result is written to disk (skipped if the output
             file already exists, which makes reruns resumable)
    """
    def first_last(group):
        """
        Aggregate one (SecurityID, BuyNo) group: total amount/volume plus
        the first and last trade time/price of the group.
        @param group: DataFrame of trades sharing one buy-order number
        @return: pd.Series of the aggregated fields
        """
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })
    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    df = chunk_data.copy()
    index = chunk_index + 1  # 1-based batch number used for the output file name
    # Output directory: the source csv path with the ".csv" suffix stripped
    children_dir = csv_path.replace(".csv", "")
    if not os.path.exists(children_dir):
        os.makedirs(children_dir)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # This chunk was already processed by a previous run
        return
    print(f"处理Transaction第{index}批次", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep only codes starting with 30/00/60
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Trade amount = price * volume
    # NOTE(review): assumes TradeVolume was parsed as numeric by read_csv —
    # TODO confirm against the raw file
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    # Group by security and buy-order number
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    # Apply the aggregation to every group
    chunk_result = grouped.apply(first_last)
    # Only flatten the (SecurityID, BuyNo) index when there is data;
    # on an empty frame reset_index would produce an inconsistent layout
    if not df.empty:
        chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)
def pre_process_transactions(csv_path, process_count=4):
    """
    Pre-process a raw Transaction CSV into per-chunk big-order summaries.

    The file is read in 200k-row chunks that are streamed to a worker
    pool; each worker writes its own "<index>.csv" next to the source
    file (see __pre_process_transactions_detail).

    @param csv_path: path of the raw Transaction csv file
    @param process_count: number of worker processes
    @return: None
    """
    chunk_size = 200000
    with Pool(processes=process_count) as pool:
        # Stream chunks lazily: imap pulls chunks as workers become free,
        # so the whole file is never held in memory at once (the previous
        # list(enumerate(chunks)) materialized every chunk up front).
        arg_iter = ((indexed_chunk, csv_path)
                    for indexed_chunk in enumerate(pd.read_csv(csv_path, chunksize=chunk_size)))
        # Drain the result iterator to wait for all workers to finish
        for _ in pool.imap(__pre_process_transactions_detail, arg_iter):
            pass
def __pre_process_ngtsticks(args):
    """
    Worker: aggregate one NGTSTick CSV chunk into per-(SecurityID, BuyNo)
    buy-order summaries and write them to "<csv stem>/<index>.csv".

    @param args: tuple of ((chunk_index, chunk_dataframe), csv_path)
    @return: None; the result is written to disk (skipped if the output
             file already exists, which makes reruns resumable)
    """
    def first_last(group):
        # Aggregate one (SecurityID, BuyNo) group: total money/volume plus
        # the first and last tick time/price of the group.
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })
    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    df = chunk_data.copy()
    index = chunk_index + 1  # 1-based batch number used for the output file name
    # Output directory: the source csv path with the ".csv" suffix stripped
    children_dir = csv_path.replace(".csv", "")
    if not os.path.exists(children_dir):
        os.makedirs(children_dir)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        # This chunk was already processed by a previous run
        return
    print(f"处理NGTSTick第{index}批次")
    # Keep rows with TickType 'T' only (presumably trade ticks — confirm)
    df = df[df["TickType"] == 'T']
    # NOTE(review): the assignments below write into a filtered slice and
    # may raise pandas' SettingWithCopyWarning; consider .copy() after the
    # filter. Behavior is otherwise as written here.
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep only codes starting with 30/00/60
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Normalize the trade money field to a plain float
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)
    # Group by security and buy-order number
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    # Apply the aggregation to every group
    chunk_result = grouped.apply(first_last)
    # Only flatten the (SecurityID, BuyNo) index when there is data
    if not df.empty:
        chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)
def pre_process_ngtsticks(csv_path, process_count=4):
    """
    Pre-process a raw NGTSTick CSV into per-chunk big-order summaries.

    The file is read in 200k-row chunks that are streamed to a worker
    pool; each worker writes its own "<index>.csv" next to the source
    file (see __pre_process_ngtsticks).

    @param csv_path: path of the raw NGTSTick csv file
    @param process_count: number of worker processes
    @return: None
    """
    chunk_size = 200000
    with Pool(processes=process_count) as pool:
        # Stream chunks lazily: imap pulls chunks as workers become free,
        # so the whole file is never held in memory at once (the previous
        # list(enumerate(chunks)) materialized every chunk up front).
        arg_iter = ((indexed_chunk, csv_path)
                    for indexed_chunk in enumerate(pd.read_csv(csv_path, chunksize=chunk_size)))
        # Drain the result iterator to wait for all workers to finish
        for _ in pool.imap(__pre_process_ngtsticks, arg_iter):
            pass
def __concat_pre_datas(dir_path):
    """
    Concatenate the per-chunk csv files in dir_path (named "<n>.csv")
    into a single "combined.csv" in the same directory.

    @param dir_path: directory holding the numbered chunk files
    @return: None; writes combined.csv (no-op if it already exists or
             when there is nothing to combine)
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        # Already combined by a previous run
        return
    # Only pick up numbered chunk files; any other file in the directory
    # would previously crash the int(...) sort key below.
    file_list = [f for f in os.listdir(dir_path)
                 if f.endswith(".csv") and f.split(".")[0].isdigit()]
    file_list.sort(key=lambda x: int(x.split(".")[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
        if df.empty:
            continue
        # Re-normalize codes: read_csv parses "000600" back as the int 600
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    if not df_list:
        # Nothing to combine — pd.concat([]) would raise ValueError
        return
    combined_df = pd.concat(df_list, ignore_index=True)
    # Persist the merged result
    combined_df.to_csv(combined_path, index=False)
def concat_pre_transactions(dir_path):
    """Merge the pre-processed Transaction chunk files under dir_path into combined.csv."""
    return __concat_pre_datas(dir_path)
if __name__ == "__main__":
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    concat_pre_transactions("E:/测试数据/Transaction_Test")
l2_data_parser.py
@@ -9,6 +9,7 @@
import pandas as pd
from data_parser import transaction_big_order_parser
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
@@ -383,9 +384,11 @@
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_New':
            pre_process_ngtstick(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            if len(params) > 2: