# Administrator
# 6 天以前 abd510d66074ac640555c241b6343a53cca8f070
# l2_data_parser.py
# @@ -2,28 +2,414 @@
"""
L2数据解析器
"""
import csv
import os
import sys
import time
from multiprocessing import Pool
import pandas as pd
from data_parser import transaction_big_order_parser
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
from utils import tool
# NOTE(review): a truncated duplicate of parse_order_detail() was left here by
# a bad merge — a bare `def` plus an opening `with` and no body, which is a
# SyntaxError. The working implementation is defined further down in this
# file; the broken stub has been removed.
def __get_target_codes(day):
    """Return the flat set of "special" security codes for the given day.

    log_export.load_special_codes(day) yields a mapping of plate -> set of
    codes; the per-plate sets are unioned into a single set.
    """
    plate_to_codes = log_export.load_special_codes(day)
    merged = set()
    for code_set in plate_to_codes.values():
        merged |= code_set
    return merged
def parse_order_detail(day, code):
    """
    Filter the day's OrderDetail.csv down to the day's target codes.

    Reads /home/userzjj/ftp/{day}/OrderDetail.csv and writes rows whose
    SecurityID (column 1) is in the special-code set — and equals ``code``
    when one is given — to OrderDetail_filter.csv in the same directory.

    @param day: trading day (FTP directory name), also used to load the
                special-code sets
    @param code: optional single security code; when truthy, only that
                 code's rows are kept
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # Header: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp']
        headers = next(csv_reader)
        print("表头:", headers)
        # BUGFIX: the old debug preview loop here read (and therefore
        # consumed) the first 11 data rows before filtering started, so those
        # rows never reached the filtered output file. It has been removed so
        # every data row is considered.
        _path = f"{base_path}/OrderDetail_filter.csv"
        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            for row in csv_reader:
                # row[1] is SecurityID; keep only the day's target codes
                if row[1] not in target_codes:
                    continue
                if code and code != row[1]:
                    continue
                writer.writerow(row)
def parse_transaction(day, save=True, parse_all=False):
    """
    Filter the day's Transaction.csv and return the kept rows.

    @param day: trading day (FTP directory name)
    @param save: when True, matching rows are also written to
                 Transaction_filter.csv next to the source file
    @param parse_all: when True, keep every row regardless of the
                      special-code filter
    @return: list of kept csv rows (each a list of strings)
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    fdatas = []
    # BUGFIX: removed an unused full-file pd.read_csv + groupby('BuyNo') whose
    # result was discarded — it only cost time and memory on a very large file.
    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # Header: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
        headers = next(csv_reader)
        print("表头:", headers)
        _path = f"{base_path}/Transaction_filter.csv"
        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            count = 0
            for row in csv_reader:
                count += 1
                # BUGFIX: the old progress code read csv_reader.line_num
                # *before* iteration (always 0) and divided by it, raising
                # ZeroDivisionError on the first data row. Report progress by
                # absolute row count instead, since the total is unknown here.
                if count % 1000000 == 0:
                    print(f"**进度:{count}行")
                # row[1] is SecurityID
                if row[1] not in target_codes and not parse_all:
                    continue
                if save:
                    writer.writerow(row)
                fdatas.append(row)
    return fdatas
def parse_ngtstick(day, save=True):
    """
    Filter the day's NGTSTick.csv for the target codes and return the rows.

    @param day: trading day (FTP directory name)
    @param save: when True, matching rows are also written to
                 NGTSTick_filter.csv next to the source file
    @return: list of kept csv rows (each a list of strings)
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    filtered_rows = []
    out_path = f"{base_path}/NGTSTick_filter.csv"
    with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as src:
        reader = csv.reader(src)
        # Header: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
        headers = next(reader)
        print("表头:", headers)
        with open(out_path, 'w', newline='', encoding='utf-8') as dst:
            writer = csv.writer(dst)
            for row in reader:
                # row[1] is SecurityID; skip codes outside the target set
                if row[1] not in target_codes:
                    continue
                if save:
                    writer.writerow(row)
                filtered_rows.append(row)
    return filtered_rows
def parse_deal_big_orders(day, big_order_path, target_code=None):
    """
    Extract all big deal orders for one trading day.

    Replays the day's Transaction rows (all codes) and NGTSTick rows (special
    codes only) through one L2TransactionDataManager per security, draining
    the managers' accurate big-buy/big-sell queues as it goes, and finally
    writes every collected order to ``big_order_path`` — one
    ``(code, direction, data)`` tuple per line, direction 0 = buy, 1 = sell.

    @param day: trading day used to locate the FTP csv files
    @param big_order_path: output text file for the collected big orders
    @param target_code: optional single security code to restrict processing
    @return: None (results go to ``big_order_path``)
    """
    print("*******开始处理成交大单数据", day, big_order_path, target_code)
    # one L2TransactionDataManager per security code
    l2_data_manager_dict = {}
    # Parse the raw csv data (note: parse_transaction/parse_ngtstick also
    # rewrite their *_filter.csv files as a side effect of save=True).
    __start_time = time.time()
    transaction_data = parse_transaction(day, parse_all=True)
    print("*******Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    ngtstick_data = parse_ngtstick(day)
    print("*******NGTSTick 读取完毕", len(ngtstick_data), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    big_order_list = []
    for index, row in enumerate(transaction_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        # TradePrice arrives as "price@extra"; keep the numeric part
        item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]), "MainSeq": int(row[6]),
                "SubSeq": int(row[7]), "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5])}
        if item["TradePrice"] <= 0:
            continue
        if code not in l2_data_manager_dict:
            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
        l2_data_manager_dict[code].add_transaction_data(item, big_order_money_threshold=100e4)
        if index % 100 == 0:
            # Periodically drain the queues of the *current* row's manager to
            # bound queue growth; any remainder is drained in the final loop.
            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
            while not l2_data_manager.big_accurate_buy_order_queue.empty():
                data = l2_data_manager.big_accurate_buy_order_queue.get()
                big_order_list.append((code, 0, data))
            while not l2_data_manager.big_accurate_sell_order_queue.empty():
                data = l2_data_manager.big_accurate_sell_order_queue.get()
                big_order_list.append((code, 1, data))
    print("********Transaction 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    for index, row in enumerate(ngtstick_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        if row[5].strip() != 'T':
            # skip non-trade ticks (only TickType 'T' is a deal)
            continue
        # NOTE(review): unlike the Transaction loop above, OrderTime/MainSeq/
        # SubSeq/BuyNo/SellNo stay raw strings here and ExecType is the
        # string '1' — confirm the manager accepts both shapes.
        item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
                "TradeVolume": int(row[9]),
                "OrderTime": row[4], "MainSeq": row[2],
                "SubSeq": row[3], "BuyNo": row[6],
                "SellNo": row[7],
                "ExecType": '1'}
        if item["TradePrice"] <= 0:
            continue
        if code not in l2_data_manager_dict:
            # item["SecurityID"] == code (both are row[1]), so this is the
            # same key as the membership test above
            l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True)
        # NOTE(review): no big_order_money_threshold is passed here, unlike
        # the Transaction loop (100e4) — the manager's default applies;
        # confirm that is intended.
        l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item)
        if index % 100 == 0:
            # periodic drain, same pattern as the Transaction loop
            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
            while not l2_data_manager.big_accurate_buy_order_queue.empty():
                data = l2_data_manager.big_accurate_buy_order_queue.get()
                big_order_list.append((code, 0, data))
            while not l2_data_manager.big_accurate_sell_order_queue.empty():
                data = l2_data_manager.big_accurate_sell_order_queue.get()
                big_order_list.append((code, 1, data))
    print("********NGTSTick 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    # Final drain: collect whatever is still queued for every manager.
    for code in l2_data_manager_dict:
        l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
        while not l2_data_manager.big_accurate_buy_order_queue.empty():
            data = l2_data_manager.big_accurate_buy_order_queue.get()
            big_order_list.append((code, 0, data))
        while not l2_data_manager.big_accurate_sell_order_queue.empty():
            data = l2_data_manager.big_accurate_sell_order_queue.get()
            big_order_list.append((code, 1, data))
    print("********开始写入本地文件:", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    # Write the collected orders: one repr'd (code, direction, data) per line.
    with open(big_order_path, mode='w', encoding='utf-8') as f:
        for order in big_order_list:
            f.write(f"{order}\n")
    print("*******写入本地文件结束:", "耗时", int(time.time() - __start_time))
    __start_time = time.time()
def parse_market_data(day):
    """
    Write the target-code subset of MarketData.csv to MarketData_filter.csv.

    Unlike the other L2 files in this module, SecurityID is column 0 here.

    @param day: trading day (FTP directory name)
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    src_path = f"{base_path}/MarketData.csv"
    dst_path = f"{base_path}/MarketData_filter.csv"
    with open(src_path, 'r', encoding='utf-8') as src:
        reader = csv.reader(src)
        # Header: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....]
        headers = next(reader)
        print("表头:", headers)
        with open(dst_path, 'w', newline='', encoding='utf-8') as dst:
            writer = csv.writer(dst)
            # keep only quotes for the day's special codes
            for row in reader:
                if row[0] in target_codes:
                    writer.writerow(row)
def test1(args):
    index, df = args
    print(index)
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    """
    Aggregate raw Transaction rows into per-(SecurityID, BuyNo) summaries.

    The CSV is read in chunks of 10,000 rows; chunk ``i`` (1-based) is written
    to ``<csv_path minus .csv>_<i>.csv`` with columns TotalAmount, TotalVolume,
    StartTime/StartPrice and EndTime/EndPrice per buy order. Chunk files that
    already exist are skipped, so an interrupted run can be resumed.

    @param csv_path: path of the raw Transaction CSV
    """

    def str_to_float(s):
        # prices arrive as "12.34@..." strings; keep the numeric part
        try:
            return round(float(s.split("@")[0]), 2)
        except:
            return float("nan")

    def code_format(s):
        # left-pad to the canonical 6-digit security code
        try:
            return "{0:0>6}".format(s)
        except:
            return ''

    def first_last(group):
        # summary of one (SecurityID, BuyNo) order: totals plus first/last trade
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_size = 10000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    # BUGFIX: the old code first exhausted the ``chunks`` iterator with a
    # leftover multiprocessing experiment (list(enumerate(chunks)) fed to
    # Pool.map(test1, ...)), so the processing loop below never saw a single
    # chunk and the final print crashed on an unbound ``index``. The iterator
    # is now consumed exactly once, and ``index`` is pre-initialised so an
    # empty file is handled too.
    index = 0
    for chunk_index, chunk in enumerate(chunks):
        df = chunk.copy()
        index = chunk_index + 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue  # resume support: this chunk was already written
        print(f"处理第{index}批次")
        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        # keep only codes starting with 30 / 00 / 60
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # trade amount = price * volume
        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
        grouped = df.groupby(['SecurityID', 'BuyNo'])
        chunk_result = grouped.apply(first_last).reset_index()
        chunk_result.to_csv(child_path, index=False)
    print(f"处理完毕,总共{index}批")
def pre_process_ngtstick(csv_path="E:/测试数据/NGTSTick_Test.csv"):
    """
    Aggregate NGTSTick trade ticks into per-(SecurityID, BuyNo) summaries.

    Only TickType 'T' (trade) rows are kept. The CSV is read in chunks of
    10,000 rows; chunk ``i`` (1-based) is written to
    ``<csv_path minus .csv>_<i>.csv`` with columns TotalAmount, TotalVolume,
    StartTime/StartPrice and EndTime/EndPrice per buy order. Existing chunk
    files are skipped so the job can be resumed.

    @param csv_path: path of the raw NGTSTick CSV
    """

    def str_to_float(s):
        # values arrive as "12.34@..." strings; keep the numeric part
        try:
            return round(float(s.split("@")[0]), 2)
        except:
            return float("nan")

    def code_format(s):
        # left-pad to the canonical 6-digit security code
        try:
            return "{0:0>6}".format(s)
        except:
            return ''

    def first_last(group):
        # summary of one (SecurityID, BuyNo) order: totals plus first/last tick
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    # Header: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
    chunk_size = 10000
    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
    index = 0
    for chunk in chunks:
        index += 1
        child_path = csv_path.replace(".csv", f"_{index}.csv")
        if os.path.exists(child_path):
            continue  # resume support: this chunk was already written
        print(f"处理第{index}批次")
        # .copy() so the column assignments below act on an owned frame
        # rather than a slice view (avoids SettingWithCopyWarning); also
        # dropped an unused ``result_list`` from the old version.
        df = chunk[chunk["TickType"] == 'T'].copy()
        df["Price"] = df["Price"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        # keep only codes starting with 30 / 00 / 60
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
        grouped = df.groupby(['SecurityID', 'BuyNo'])
        chunk_result = grouped.apply(first_last).reset_index()
        chunk_result.to_csv(child_path, index=False)
    print(f"处理完毕,总共{index}批")
# Deliberately dead branch: '__main__1' never equals __name__, so this manual
# test entry point is disabled without deleting it.
if __name__ == '__main__1':
    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
    pre_process_transactions()
# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
# 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
if __name__ == '__main__':
    # CLI dispatch: argv[1] is the parse type, argv[2] the trading day, and
    # the optional argv[3] is either a security code or a process count,
    # depending on the type (see the usage comments above).
    if len(sys.argv) > 1:
        params = sys.argv[1:]
        print("接收的参数")
        print("接收的参数", params)
        _type = params[0].strip()
        day = params[1].strip()
        if len(params) > 2:
            code = params[2].strip()
        else:
            code = None
        if _type == 'OrderDetail':
            parse_order_detail(day, code)
        elif _type == 'Transaction':
            parse_transaction(day)
        elif _type == 'NGTSTick':
            parse_ngtstick(day)
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            # here the optional 3rd argument is a process count
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            # BUGFIX: a stray, mis-indented `parse_order_detail()` call left
            # over from a bad merge sat between these lines and broke the
            # file's syntax; it has been removed.
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
                                                                  process_count=process_count)
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_New':
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
                                                               process_count=process_count)
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'Transaction_Concat':
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_Concat':
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'ExtractDealBigOrder':
            # e.g. /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 10
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick",
                                                                  process_count=process_count)
            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction",
                                                                  process_count=process_count)