Administrator
2025-05-09 f3651b3d5b750c26b3f5a46e0a4abfc5401782d0
大单解析器
2个文件已修改
128 ■■■■■ 已修改文件
constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 126 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -202,7 +202,7 @@
MIN_CODE_RADICAL_BUY_PRICE = 2
# 扫入的自由流通市值区间:[[(自由流通最小值,自由流通最大值),(股价最小值,股价最大值)]]
RADICAL_BUY_ZYLTGB_AS_YI_RANGES = [[(5, 1000), (3, 50)], [(50, 1000), (2, 3)]]
RADICAL_BUY_ZYLTGB_AS_YI_RANGES = [[(50, 1000), (3, 50)], [(50, 1000), (2, 3)]]
# L2数据是否载入完成
L2_DATA_IS_LOADED = False
l2_data_parser.py
@@ -5,6 +5,7 @@
import os
import sys
from db import mysql_data_delegate as mysql_data
from huaxin_client.l2_client_test import L2TransactionDataManager
from log_module import log_export
@@ -14,7 +15,7 @@
    for plate in special_codes_dict:
        codes = special_codes_dict[plate]
        fcodes |= codes
    fcodes
    return fcodes
def parse_order_detail(day, code):
@@ -40,9 +41,10 @@
                writer.writerow(row)
def parse_transaction(day):
def parse_transaction(day, save=True):
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    fdatas = []
    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # 获取表头:: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
@@ -58,12 +60,16 @@
                if row[1] not in target_codes:
                    continue
                # 将文件写入到文本
                writer.writerow(row)
                if save:
                    writer.writerow(row)
                fdatas.append(row)
    return fdatas
def parse_ngtstick(day):
def parse_ngtstick(day, save=True):
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    fdatas = []
    with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # 获取表头:: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
@@ -78,8 +84,102 @@
            for row in csv_reader:
                if row[1] not in target_codes:
                    continue
                # 将文件写入到文本
                writer.writerow(row)
                if save:
                    # 将文件写入到文本
                    writer.writerow(row)
                fdatas.append(row)
    return fdatas
def _drain_big_order_queues(code, l2_data_manager, big_order_list):
    """
    Move every queued big order of one manager into ``big_order_list``.

    The buy queue is emptied first, then the sell queue.
    @param code: security code stored as the first element of each record
    @param l2_data_manager: object exposing ``big_accurate_buy_order_queue``
        and ``big_accurate_sell_order_queue`` (queues with ``empty()``/``get()``)
    @param big_order_list: list extended in place with ``(code, side, data)``
        tuples, where side is 0 for the buy queue and 1 for the sell queue
    @return: None
    """
    queues_by_side = (
        (0, l2_data_manager.big_accurate_buy_order_queue),
        (1, l2_data_manager.big_accurate_sell_order_queue),
    )
    for side, order_queue in queues_by_side:
        while not order_queue.empty():
            big_order_list.append((code, side, order_queue.get()))


def parse_deal_big_orders(day, big_order_path, target_code=None):
    """
    Extract all executed big orders of one day and write them to a local file.

    Rows from Transaction.csv and NGTSTick.csv are fed, per security code,
    into an L2TransactionDataManager; whatever the managers put on their
    big buy/sell order queues is collected and written to ``big_order_path``,
    one ``(code, side, data)`` tuple per line (side 0 = buy queue, 1 = sell queue).
    @param day: day string forwarded to parse_transaction / parse_ngtstick
    @param big_order_path: path of the output text file (overwritten)
    @param target_code: when set, only rows of this security code are processed
    @return: None
    @raise TypeError: if big_order_path is None
    """
    print("开始处理数据", day, big_order_path, target_code)
    if big_order_path is None:
        # open(None) would raise this same TypeError, but only after both CSV
        # files had been parsed; fail before doing the expensive work.
        raise TypeError("big_order_path must be a file path, not None")
    l2_data_manager_dict = {}
    # Parse the raw data.
    # NOTE(review): both parsers are called with their default `save` value, so
    # they may also rewrite their filtered CSV output - confirm this is intended.
    transaction_data = parse_transaction(day)
    print("Transaction 读取完毕", len(transaction_data))
    ngtstick_data = parse_ngtstick(day)
    print("NGTSTick 读取完毕", len(ngtstick_data))

    big_order_list = []
    for index, row in enumerate(transaction_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        # Only the part of the price field before "@" is parsed as the price.
        item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]), "MainSeq": int(row[6]),
                "SubSeq": int(row[7]), "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5])}
        if item["TradePrice"] <= 0:
            continue
        if code not in l2_data_manager_dict:
            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
        l2_data_manager_dict[code].add_transaction_data(item)
        if index % 100 == 0:
            # Periodically collect what the current code's manager has queued;
            # anything left over is collected after both loops.
            _drain_big_order_queues(code, l2_data_manager_dict[code], big_order_list)
    print("Transaction 处理完毕", len(big_order_list))

    for index, row in enumerate(ngtstick_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        if row[5].strip() != 'T':
            # Skip rows that are not trades.
            continue
        # NOTE(review): unlike the Transaction rows above, the sequence/order
        # number fields are passed through as strings here - confirm that
        # L2TransactionDataManager accepts both forms.
        item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
                "TradeVolume": int(row[9]),
                "OrderTime": row[4], "MainSeq": row[2],
                "SubSeq": row[3], "BuyNo": row[6],
                "SellNo": row[7],
                "ExecType": '1'}
        if item["TradePrice"] <= 0:
            continue
        if code not in l2_data_manager_dict:
            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
        l2_data_manager_dict[code].add_transaction_data(item)
        if index % 100 == 0:
            # Periodically collect what the current code's manager has queued.
            _drain_big_order_queues(code, l2_data_manager_dict[code], big_order_list)
    print("NGTSTick 处理完毕", len(big_order_list))

    # Collect whatever is still queued for every code seen.
    for code in l2_data_manager_dict:
        _drain_big_order_queues(code, l2_data_manager_dict[code], big_order_list)

    print("开始写入本地文件:", len(big_order_list))
    # Write the result file: one record per line so it can be read back
    # line by line (previously records were concatenated without a separator).
    with open(big_order_path, mode='w', encoding='utf-8') as f:
        for order in big_order_list:
            f.write(f"{order}\n")
    print("写入本地文件结束:")
def parse_market_data(day):
@@ -103,6 +203,7 @@
                writer.writerow(row)
# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
if __name__ == '__main__':
    if len(sys.argv) > 1:
        params = sys.argv[1:]
@@ -121,3 +222,16 @@
            parse_ngtstick(day)
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'ExtractDealBigOrder':
            # 提取所有成交的大单
            # TODO
            if len(params) > 2:
                save_path = params[2].strip()
            else:
                save_path = None
            if len(params) > 3:
                target_code = params[3].strip()
            else:
                target_code = None
            parse_deal_big_orders(day, save_path, target_code)