Administrator
2025-05-23 3b7e932024a2e4771ce7303576931f52ba39f1a9
l2_data_parser.py
@@ -225,7 +225,7 @@
                writer.writerow(row)
def test(csv_path="E:/测试数据/Transaction_Test.csv"):
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
        try:
            # 移除单位并转换
@@ -270,6 +270,9 @@
    index = 0
    for df in chunks:
        index += 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue
        print(f"处理第{index}批次")
        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
@@ -281,14 +284,66 @@
        # 应用聚合函数
        chunk_result = grouped.apply(first_last).reset_index()
        chunk_result.to_csv(child_path, index=False)
    print(f"处理完毕,总共{index}批")
def pre_process_ngtstick(csv_path="E:/测试数据/NGTSTick_Test.csv", chunk_size=200000):
    """Aggregate the trade ticks of an NGTSTick CSV per (SecurityID, BuyNo).

    The input is read ``chunk_size`` rows at a time. In every chunk only rows
    whose ``TickType`` is ``'T'`` are kept, ``Price`` and ``TradeMoney`` are
    converted to floats rounded to 2 decimals, and ``SecurityID`` is
    left-padded with zeros to 6 characters. The rows are then grouped by
    ``(SecurityID, BuyNo)`` and each group becomes one output row with the
    columns ``SecurityID, BuyNo, TotalAmount, TotalVolume, StartTime,
    StartPrice, EndTime, EndPrice``. These are the sums of TradeMoney and
    Volume, plus TickTime/Price of the group's first and last row in file
    order.

    Each chunk's result is written to ``<input stem>_<n><ext>`` next to the
    input, where ``n`` is the 1-based chunk number. Chunk files that already
    exist are skipped, so an interrupted run can be resumed. A result is first
    written to a ``.tmp`` file and then renamed into place, so a crash cannot
    leave a truncated file that a later run would mistake for a finished chunk.

    Aggregation is per chunk: a (SecurityID, BuyNo) group whose rows span two
    chunks appears once in each of those chunk files.

    :param csv_path: path of the NGTSTick CSV file.
    :param chunk_size: number of input rows processed per chunk.
    """
    def str_to_float(s):
        # Drop the unit suffix after '@' and round to 2 decimals. The str()
        # call lets cells that pandas already parsed as numbers through too;
        # calling .split() on those raised AttributeError and yielded NaN.
        try:
            return round(float(str(s).split("@")[0]), 2)
        except (TypeError, ValueError):
            return float("nan")

    def code_format(s):
        # Left-pad the security code with zeros to 6 characters ("555" -> "000555").
        try:
            return "{0:0>6}".format(s)
        except (TypeError, ValueError):
            return ''

    def first_last(group):
        # Summarise one (SecurityID, BuyNo) group; groupby keeps rows in file order.
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    # Input columns: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
    key_columns = ['SecurityID', 'BuyNo']
    value_columns = ['TradeMoney', 'Volume', 'TickTime', 'Price']
    output_columns = key_columns + ['TotalAmount', 'TotalVolume', 'StartTime',
                                    'StartPrice', 'EndTime', 'EndPrice']
    # splitext only touches the file extension; str.replace(".csv", ...)
    # would also rewrite any directory whose name contains ".csv".
    stem, ext = os.path.splitext(csv_path)
    index = 0
    # SecurityID is read as text because dtypes are inferred per chunk. A chunk
    # with a missing code would otherwise parse the column as float and format
    # 555 as "0555.0".
    with pd.read_csv(csv_path, chunksize=chunk_size,
                     dtype={'SecurityID': str}) as chunks:
        for index, df in enumerate(chunks, start=1):
            child_path = f"{stem}_{index}{ext}"
            if os.path.exists(child_path):
                continue
            print(f"处理第{index}批次")
            # .copy() so the column assignments below act on an independent
            # frame instead of a filtered slice (avoids SettingWithCopyWarning).
            trades = df[df["TickType"] == 'T'].copy()
            if trades.empty:
                # Keep the output schema stable even when a chunk has no trades.
                chunk_result = pd.DataFrame(columns=output_columns)
            else:
                trades["Price"] = trades["Price"].apply(str_to_float)
                trades["SecurityID"] = trades["SecurityID"].apply(code_format)
                trades["TradeMoney"] = trades["TradeMoney"].apply(str_to_float)
                # Selecting the value columns keeps the grouping keys out of
                # first_last (passing them to apply is deprecated in pandas 2.2).
                chunk_result = (trades.groupby(key_columns)[value_columns]
                                .apply(first_last)
                                .reset_index())
            # Write then rename, so an existing child file is always complete.
            tmp_path = child_path + ".tmp"
            chunk_result.to_csv(tmp_path, index=False)
            os.replace(tmp_path, child_path)
    print(f"处理完毕,总共{index}批")
# Manual test hook. The guard compares __name__ against '__main__1', a value it
# never has, so this block is intentionally disabled and never runs, not even
# when the module is executed as a script. The functions it calls default to
# local test-data paths under E:/测试数据/.
# NOTE(review): this change appears to rename `test` to `pre_process_transactions`;
# update the call below before re-enabling the guard.
if __name__ == '__main__1':
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    test()
    pre_process_ngtstick()
# Command mode (example):  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
# Parse large orders (example): /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
@@ -311,7 +366,9 @@
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            test(f"/home/userzjj/ftp/{day}/Transaction.csv")
            pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
        elif _type == 'NGTSTick_New':
            pre_process_transactions(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            if len(params) > 2: