"""
|
L2数据解析器
|
"""
|
import csv
|
import os
|
import sys
|
import time
|
|
from db import mysql_data_delegate as mysql_data
|
from huaxin_client.l2_client_test import L2TransactionDataManager
|
from log_module import log_export
|
|
|
def __get_target_codes(day):
|
special_codes_dict = log_export.load_special_codes(day)
|
fcodes = set()
|
for plate in special_codes_dict:
|
codes = special_codes_dict[plate]
|
fcodes |= codes
|
return fcodes
|
|
|
def parse_order_detail(day, code):
|
target_codes = __get_target_codes(day)
|
base_path = f"/home/userzjj/ftp/{day}"
|
with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file:
|
csv_reader = csv.reader(file)
|
# 获取表头:: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp']
|
headers = next(csv_reader)
|
print("表头:", headers)
|
# 遍历数据行
|
_path = f"{base_path}/OrderDetail_filter.csv"
|
with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
|
# 创建一个 CSV 写入器对象
|
writer = csv.writer(csvfile)
|
# 逐行写入数据
|
for row in csv_reader:
|
if row[1] not in target_codes:
|
continue
|
if code and code != row[1]:
|
continue
|
# 将文件写入到文本
|
writer.writerow(row)
|
|
|
def parse_transaction(day, save=True):
|
target_codes = __get_target_codes(day)
|
base_path = f"/home/userzjj/ftp/{day}"
|
fdatas = []
|
with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
|
csv_reader = csv.reader(file)
|
# 获取表头:: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
|
headers = next(csv_reader)
|
print("表头:", headers)
|
# 遍历数据行
|
_path = f"{base_path}/Transaction_filter.csv"
|
with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
|
# 创建一个 CSV 写入器对象
|
writer = csv.writer(csvfile)
|
# 逐行写入数据
|
for row in csv_reader:
|
if row[1] not in target_codes:
|
continue
|
# 将文件写入到文本
|
if save:
|
writer.writerow(row)
|
fdatas.append(row)
|
return fdatas
|
|
|
def parse_ngtstick(day, save=True):
|
target_codes = __get_target_codes(day)
|
base_path = f"/home/userzjj/ftp/{day}"
|
fdatas = []
|
with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
|
csv_reader = csv.reader(file)
|
# 获取表头:: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
|
headers = next(csv_reader)
|
print("表头:", headers)
|
# 遍历数据行
|
_path = f"{base_path}/NGTSTick_filter.csv"
|
with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
|
# 创建一个 CSV 写入器对象
|
writer = csv.writer(csvfile)
|
# 逐行写入数据
|
for row in csv_reader:
|
if row[1] not in target_codes:
|
continue
|
if save:
|
# 将文件写入到文本
|
writer.writerow(row)
|
fdatas.append(row)
|
return fdatas
|
|
|
def parse_deal_big_orders(day, big_order_path, target_code=None):
|
"""
|
提取所有成交大单
|
@param day:
|
@return:
|
"""
|
print("开始处理成交大单数据", day, big_order_path, target_code)
|
l2_data_manager_dict = {}
|
# 解析数据
|
__start_time = time.time()
|
transaction_data = parse_transaction(day)
|
print("Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
ngtstick_data = parse_ngtstick(day)
|
print("NGTSTick 读取完毕", len(ngtstick_data), "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
big_order_list = []
|
for index, row in enumerate(transaction_data):
|
code = row[1]
|
if target_code and code != target_code:
|
continue
|
item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
|
"TradeVolume": int(row[4]),
|
"OrderTime": int(row[2]), "MainSeq": int(row[6]),
|
"SubSeq": int(row[7]), "BuyNo": int(row[8]),
|
"SellNo": int(row[9]),
|
"ExecType": int(row[5])}
|
if item["TradePrice"] <= 0:
|
continue
|
if code not in l2_data_manager_dict:
|
l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
|
l2_data_manager_dict[code].add_transaction_data(item)
|
if index % 100 == 0:
|
# 读取队列中的数据
|
l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
|
while not l2_data_manager.big_accurate_buy_order_queue.empty():
|
data = l2_data_manager.big_accurate_buy_order_queue.get()
|
big_order_list.append((code, 0, data))
|
|
while not l2_data_manager.big_accurate_sell_order_queue.empty():
|
data = l2_data_manager.big_accurate_sell_order_queue.get()
|
big_order_list.append((code, 1, data))
|
print("Transaction 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
|
for index, row in enumerate(ngtstick_data):
|
code = row[1]
|
if target_code and code != target_code:
|
continue
|
if row[5].strip() != 'T':
|
# 过滤非成交数据
|
continue
|
|
item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
|
"TradeVolume": int(row[9]),
|
"OrderTime": row[4], "MainSeq": row[2],
|
"SubSeq": row[3], "BuyNo": row[6],
|
"SellNo": row[7],
|
"ExecType": '1'}
|
if item["TradePrice"] <= 0:
|
continue
|
|
if code not in l2_data_manager_dict:
|
l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True)
|
l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item)
|
if index % 100 == 0:
|
# 读取队列中的数据
|
l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
|
while not l2_data_manager.big_accurate_buy_order_queue.empty():
|
data = l2_data_manager.big_accurate_buy_order_queue.get()
|
big_order_list.append((code, 0, data))
|
|
while not l2_data_manager.big_accurate_sell_order_queue.empty():
|
data = l2_data_manager.big_accurate_sell_order_queue.get()
|
big_order_list.append((code, 1, data))
|
print("NGTSTick 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
# 读取剩余的未读数据的代码
|
for code in l2_data_manager_dict:
|
l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
|
while not l2_data_manager.big_accurate_buy_order_queue.empty():
|
data = l2_data_manager.big_accurate_buy_order_queue.get()
|
big_order_list.append((code, 0, data))
|
|
while not l2_data_manager.big_accurate_sell_order_queue.empty():
|
data = l2_data_manager.big_accurate_sell_order_queue.get()
|
big_order_list.append((code, 1, data))
|
print("开始写入本地文件:", len(big_order_list), "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
# 开始写入本地文件
|
with open(big_order_path, mode='w', encoding='utf-8') as f:
|
for order in big_order_list:
|
f.write(f"{order}")
|
print("写入本地文件结束:", "耗时", int(time.time() - __start_time))
|
__start_time = time.time()
|
|
|
def parse_market_data(day):
|
target_codes = __get_target_codes(day)
|
base_path = f"/home/userzjj/ftp/{day}"
|
with open(f"{base_path}/MarketData.csv", 'r', encoding='utf-8') as file:
|
csv_reader = csv.reader(file)
|
# 获取表头:: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....]
|
headers = next(csv_reader)
|
print("表头:", headers)
|
# 遍历数据行
|
_path = f"{base_path}/MarketData_filter.csv"
|
with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
|
# 创建一个 CSV 写入器对象
|
writer = csv.writer(csvfile)
|
# 逐行写入数据
|
for row in csv_reader:
|
if row[0] not in target_codes:
|
continue
|
# 将文件写入到文本
|
writer.writerow(row)
|
|
|
# 命令模式 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08
|
# 解析大单: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-08 /home/userzjj/temp.txt 000555
|
if __name__ == '__main__':
|
if len(sys.argv) > 1:
|
params = sys.argv[1:]
|
print("接收的参数", params)
|
_type = params[0].strip()
|
day = params[1].strip()
|
if len(params) > 2:
|
code = params[2].strip()
|
else:
|
code = None
|
if _type == 'OrderDetail':
|
parse_order_detail(day, code)
|
elif _type == 'Transaction':
|
parse_transaction(day)
|
elif _type == 'NGTSTick':
|
parse_ngtstick(day)
|
elif _type == 'MarketData':
|
parse_market_data(day)
|
elif _type == 'ExtractDealBigOrder':
|
# 提取所有成交的大单
|
if len(params) > 2:
|
save_path = params[2].strip()
|
else:
|
save_path = None
|
|
if len(params) > 3:
|
target_code = params[3].strip()
|
else:
|
target_code = None
|
parse_deal_big_orders(day, save_path, target_code)
|