From 771db1d7737e55e36a5dd8da00183a6a7afc8785 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 14 五月 2025 14:13:00 +0800 Subject: [PATCH] bug修复 --- l2_data_parser.py | 143 ++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 134 insertions(+), 9 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index 00496e8..69be377 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -4,13 +4,20 @@ import csv import os import sys +import time + from db import mysql_data_delegate as mysql_data +from huaxin_client.l2_client_test import L2TransactionDataManager +from log_module import log_export def __get_target_codes(day): - results = mysql_data.Mysqldb().select_all(f"select _code from kpl_limit_up_record where _day='{day}'") - codes = set([x[0] for x in results]) - return codes + special_codes_dict = log_export.load_special_codes(day) + fcodes = set() + for plate in special_codes_dict: + codes = special_codes_dict[plate] + fcodes |= codes + return fcodes def parse_order_detail(day, code): @@ -36,9 +43,10 @@ writer.writerow(row) -def parse_transaction(day): +def parse_transaction(day, save=True): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] @@ -54,12 +62,16 @@ if row[1] not in target_codes: continue # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 - writer.writerow(row) + if save: + writer.writerow(row) + fdatas.append(row) + return fdatas -def parse_ngtstick(day): +def parse_ngtstick(day, save=True): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] @@ -74,8 +86,108 @@ for row in csv_reader: if row[1] not in target_codes: continue - # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 - writer.writerow(row) + if save: + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + writer.writerow(row) + fdatas.append(row) + return fdatas + + +def parse_deal_big_orders(day, big_order_path, target_code=None): + """ + 鎻愬彇鎵�鏈夋垚浜ゅぇ鍗� + @param day: + @return: + """ + print("*******寮�濮嬪鐞嗘垚浜ゅぇ鍗曟暟鎹�", day, big_order_path, target_code) + l2_data_manager_dict = {} + # 瑙f瀽鏁版嵁 + __start_time = time.time() + transaction_data = parse_transaction(day) + print("*******Transaction 璇诲彇瀹屾瘯", len(transaction_data), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + ngtstick_data = parse_ngtstick(day) + print("*******NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + big_order_list = [] + for index, row in enumerate(transaction_data): + code = row[1] + if target_code and code != target_code: + continue + item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2), + "TradeVolume": int(row[4]), + "OrderTime": int(row[2]), "MainSeq": int(row[6]), + "SubSeq": int(row[7]), "BuyNo": int(row[8]), + "SellNo": int(row[9]), + "ExecType": int(row[5])} + if item["TradePrice"] <= 0: + continue + if code not in l2_data_manager_dict: + l2_data_manager_dict[code] = L2TransactionDataManager(code, True) + l2_data_manager_dict[code].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********Transaction 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + + for index, row in enumerate(ngtstick_data): + code = row[1] + if target_code and code != target_code: + continue + if row[5].strip() != 'T': + # 杩囨护闈炴垚浜ゆ暟鎹� + continue + + item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2), + "TradeVolume": int(row[9]), + "OrderTime": row[4], "MainSeq": row[2], + "SubSeq": row[3], "BuyNo": row[6], + "SellNo": row[7], + "ExecType": '1'} + if item["TradePrice"] <= 0: + continue + + if code not in l2_data_manager_dict: + l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True) + l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********NGTSTick 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜 + for code in l2_data_manager_dict: + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + # 寮�濮嬪啓鍏ユ湰鍦版枃浠� + with open(big_order_path, mode='w', encoding='utf-8') as f: + for order in big_order_list: + f.write(f"{order}\n") + print("*******鍐欏叆鏈湴鏂囦欢缁撴潫锛�", "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() def parse_market_data(day): @@ -99,13 +211,15 @@ writer.writerow(row) +# 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 +# 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 if __name__ == '__main__': if len(sys.argv) > 1: params = sys.argv[1:] print("鎺ユ敹鐨勫弬鏁�", params) _type = params[0].strip() day = params[1].strip() - if len(params)>2: + if len(params) > 2: code = params[2].strip() else: code = None @@ -117,4 +231,15 @@ parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day) + elif _type == 'ExtractDealBigOrder': + # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 + if len(params) > 2: + save_path = params[2].strip() + else: + save_path = None + if len(params) > 3: + target_code = params[3].strip() + else: + target_code = None + parse_deal_big_orders(day, save_path, target_code) -- Gitblit v1.8.0