From f3651b3d5b750c26b3f5a46e0a4abfc5401782d0 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 09 五月 2025 17:21:33 +0800 Subject: [PATCH] 大单解析器 --- l2_data_parser.py | 126 ++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 120 insertions(+), 6 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index b93a0e2..0f2b715 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -5,6 +5,7 @@ import os import sys from db import mysql_data_delegate as mysql_data +from huaxin_client.l2_client_test import L2TransactionDataManager from log_module import log_export @@ -14,7 +15,7 @@ for plate in special_codes_dict: codes = special_codes_dict[plate] fcodes |= codes - fcodes + return fcodes def parse_order_detail(day, code): @@ -40,9 +41,10 @@ writer.writerow(row) -def parse_transaction(day): +def parse_transaction(day, save=True): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] @@ -58,12 +60,16 @@ if row[1] not in target_codes: continue # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 - writer.writerow(row) + if save: + writer.writerow(row) + fdatas.append(row) + return fdatas -def parse_ngtstick(day): +def parse_ngtstick(day, save=True): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] @@ -78,8 +84,102 @@ for row in csv_reader: if row[1] not in target_codes: continue - # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 - writer.writerow(row) + if save: + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + writer.writerow(row) + fdatas.append(row) + return fdatas + + +def parse_deal_big_orders(day, big_order_path, target_code=None): + """ + 鎻愬彇鎵�鏈夋垚浜ゅぇ鍗� + @param day: + @return: + """ + print("寮�濮嬪鐞嗘暟鎹�", day, big_order_path, target_code) + l2_data_manager_dict = {} + # 瑙f瀽鏁版嵁 + transaction_data = parse_transaction(day) + print("Transaction 璇诲彇瀹屾瘯", len(transaction_data)) + ngtstick_data = parse_ngtstick(day) + print("NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data)) + big_order_list = [] + for index, row in enumerate(transaction_data): + code = row[1] + if target_code and code != target_code: + continue + item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2), + "TradeVolume": int(row[4]), + "OrderTime": int(row[2]), "MainSeq": int(row[6]), + "SubSeq": int(row[7]), "BuyNo": int(row[8]), + "SellNo": int(row[9]), + "ExecType": int(row[5])} + if item["TradePrice"] <= 0: + continue + if code not in l2_data_manager_dict: + l2_data_manager_dict[code] = L2TransactionDataManager(code, True) + l2_data_manager_dict[code].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("Transaction 澶勭悊瀹屾瘯", len(big_order_list)) + + for index, row in enumerate(ngtstick_data): + code = row[1] + if target_code and code != target_code: + continue + if row[5].strip() != 'T': + # 杩囨护闈炴垚浜ゆ暟鎹� + continue + + item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2), + "TradeVolume": int(row[9]), + "OrderTime": row[4], "MainSeq": row[2], + "SubSeq": row[3], "BuyNo": row[6], + "SellNo": row[7], + "ExecType": '1'} + if item["TradePrice"] <= 0: + continue + + if code not in l2_data_manager_dict: + l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True) + l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("NGTSTick 澶勭悊瀹屾瘯", len(big_order_list)) + # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜 + for code in l2_data_manager_dict: + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list)) + # 寮�濮嬪啓鍏ユ湰鍦版枃浠� + with open(big_order_path, mode='w', encoding='utf-8') as f: + for order in big_order_list: + f.write(f"{order}") + print("鍐欏叆鏈湴鏂囦欢缁撴潫锛�") + def parse_market_data(day): @@ -103,6 +203,7 @@ writer.writerow(row) +# 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 if __name__ == '__main__': if len(sys.argv) > 1: params = sys.argv[1:] @@ -121,3 +222,16 @@ parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day) + elif _type == 'ExtractDealBigOrder': + # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 + # TODO + if len(params) > 2: + save_path = params[2].strip() + else: + save_path = None + + if len(params) > 3: + target_code = params[3].strip() + else: + target_code = None + parse_deal_big_orders(day, save_path, target_code) -- Gitblit v1.8.0