Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
l2_data_parser.py
@@ -5,8 +5,11 @@
import os
import sys
import time
from multiprocessing import Pool
import pandas as pd
from data_parser import transaction_big_order_parser
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
@@ -225,6 +228,11 @@
                writer.writerow(row)
def test1(args):
    index, df = args
    print(index)
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
        try:
@@ -263,19 +271,25 @@
    dtype = {
        'SecurityID': 'category',  # 低基数分类数据
    }
    chunk_size = 100000
    chunk_size = 10000
    # 创建DataFrame
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    indexed_data = list(enumerate(chunks))
    # 新写法
    with Pool(processes=4) as pool:
        pool.map(test1, indexed_data)
    result_list = []
    index = 0
    for df in chunks:
        index += 1
    for chunk_index, chunk in enumerate(chunks):
        df = chunk.copy()
        index = chunk_index + 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue
        print(f"处理第{index}批次")
        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # 计算成交金额
        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
@@ -317,7 +331,7 @@
    # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
    chunk_size = 200000
    chunk_size = 10000
    # 创建DataFrame
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    result_list = []
@@ -331,6 +345,9 @@
        df = df[df["TickType"] == 'T']
        df["Price"] = df["Price"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # 计算成交金额
        df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
        # 按SecurityID和BuyNo分组
@@ -343,7 +360,7 @@
# NOTE(review): the guard compares against '__main__1' rather than '__main__',
# so this block is skipped when the file is run as a script. Presumably a
# deliberate way to park a local debugging entry point; confirm before relying
# on it.
if __name__ == '__main__1':
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    # Both calls use the functions' default arguments.
    pre_process_ngtstick()
    pre_process_transactions()
# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
# 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
@@ -366,18 +383,33 @@
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
        elif _type == 'NGTSTick_New':
            pre_process_ngtstick(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            if len(params) > 2:
                save_path = params[2].strip()
                process_count = int(params[2].strip())
            else:
                save_path = None
                process_count = 4
            if len(params) > 3:
                target_code = params[3].strip()
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
                                                                  process_count=process_count)
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_New':
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                target_code = None
            parse_deal_big_orders(day, save_path, target_code)
                process_count = 4
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
                                                               process_count=process_count)
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'Transaction_Concat':
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_Concat':
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'ExtractDealBigOrder':
            # 命令模式  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 10
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick",
                                                                  process_count=process_count)
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction",
                                                                  process_count=process_count)