From 771db1d7737e55e36a5dd8da00183a6a7afc8785 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 14 五月 2025 14:13:00 +0800 Subject: [PATCH] bug修复 --- l2_data_parser.py | 245 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 234 insertions(+), 11 deletions(-) diff --git a/l2_data_parser.py b/l2_data_parser.py index 2578aae..69be377 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -2,21 +2,244 @@ L2鏁版嵁瑙f瀽鍣� """ import csv -def parse_order_detail(): - with open('/home/userzjj/ftp/20250123/OrderDetail.csv', 'r', encoding='utf-8') as file: +import os +import sys +import time + +from db import mysql_data_delegate as mysql_data +from huaxin_client.l2_client_test import L2TransactionDataManager +from log_module import log_export + + +def __get_target_codes(day): + special_codes_dict = log_export.load_special_codes(day) + fcodes = set() + for plate in special_codes_dict: + codes = special_codes_dict[plate] + fcodes |= codes + return fcodes + + +def parse_order_detail(day, code): + target_codes = __get_target_codes(day) + base_path = f"/home/userzjj/ftp/{day}" + with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) - # 鑾峰彇琛ㄥご + # 鑾峰彇琛ㄥご锛�: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp'] headers = next(csv_reader) print("琛ㄥご:", headers) # 閬嶅巻鏁版嵁琛� - max_count = 10 - count = 0 - for row in csv_reader: - print(row) - count += 1 - if count > max_count: - break + _path = f"{base_path}/OrderDetail_filter.csv" + with open(_path, 'w', newline='', encoding='utf-8') as csvfile: + # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞� + writer = csv.writer(csvfile) + # 閫愯鍐欏叆鏁版嵁 + for row in csv_reader: + if row[1] not in target_codes: + continue + if code and code != row[1]: + continue + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + writer.writerow(row) +def parse_transaction(day, save=True): + target_codes = __get_target_codes(day) + base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] + with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: + csv_reader = csv.reader(file) + # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] + headers = next(csv_reader) + print("琛ㄥご:", headers) + # 閬嶅巻鏁版嵁琛� + _path = f"{base_path}/Transaction_filter.csv" + with open(_path, 'w', newline='', encoding='utf-8') as csvfile: + # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞� + writer = csv.writer(csvfile) + # 閫愯鍐欏叆鏁版嵁 + for row in csv_reader: + if row[1] not in target_codes: + continue + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + if save: + writer.writerow(row) + fdatas.append(row) + return fdatas + + +def parse_ngtstick(day, save=True): + target_codes = __get_target_codes(day) + base_path = f"/home/userzjj/ftp/{day}" + fdatas = [] + with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file: + csv_reader = csv.reader(file) + # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] + headers = next(csv_reader) + print("琛ㄥご:", headers) + # 閬嶅巻鏁版嵁琛� + _path = f"{base_path}/NGTSTick_filter.csv" + with open(_path, 'w', newline='', encoding='utf-8') as csvfile: + # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞� + writer = csv.writer(csvfile) + # 閫愯鍐欏叆鏁版嵁 + for row in csv_reader: + if row[1] not in target_codes: + continue + if save: + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + writer.writerow(row) + fdatas.append(row) + return fdatas + + +def parse_deal_big_orders(day, big_order_path, target_code=None): + """ + 鎻愬彇鎵�鏈夋垚浜ゅぇ鍗� + @param day: + @return: + """ + print("*******寮�濮嬪鐞嗘垚浜ゅぇ鍗曟暟鎹�", day, big_order_path, target_code) + l2_data_manager_dict = {} + # 瑙f瀽鏁版嵁 + __start_time = time.time() + transaction_data = parse_transaction(day) + print("*******Transaction 璇诲彇瀹屾瘯", len(transaction_data), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + ngtstick_data = parse_ngtstick(day) + print("*******NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + big_order_list = [] + for index, row in enumerate(transaction_data): + code = row[1] + if target_code and code != target_code: + continue + item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2), + "TradeVolume": int(row[4]), + "OrderTime": int(row[2]), "MainSeq": int(row[6]), + "SubSeq": int(row[7]), "BuyNo": int(row[8]), + "SellNo": int(row[9]), + "ExecType": int(row[5])} + if item["TradePrice"] <= 0: + continue + if code not in l2_data_manager_dict: + l2_data_manager_dict[code] = L2TransactionDataManager(code, True) + l2_data_manager_dict[code].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********Transaction 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + + for index, row in enumerate(ngtstick_data): + code = row[1] + if target_code and code != target_code: + continue + if row[5].strip() != 'T': + # 杩囨护闈炴垚浜ゆ暟鎹� + continue + + item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2), + "TradeVolume": int(row[9]), + "OrderTime": row[4], "MainSeq": row[2], + "SubSeq": row[3], "BuyNo": row[6], + "SellNo": row[7], + "ExecType": '1'} + if item["TradePrice"] <= 0: + continue + + if code not in l2_data_manager_dict: + l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True) + l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item) + if index % 100 == 0: + # 璇诲彇闃熷垪涓殑鏁版嵁 + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********NGTSTick 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜 + for code in l2_data_manager_dict: + l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code] + while not l2_data_manager.big_accurate_buy_order_queue.empty(): + data = l2_data_manager.big_accurate_buy_order_queue.get() + big_order_list.append((code, 0, data)) + + while not l2_data_manager.big_accurate_sell_order_queue.empty(): + data = l2_data_manager.big_accurate_sell_order_queue.get() + big_order_list.append((code, 1, data)) + print("********寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list), "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + # 寮�濮嬪啓鍏ユ湰鍦版枃浠� + with open(big_order_path, mode='w', encoding='utf-8') as f: + for order in big_order_list: + f.write(f"{order}\n") + print("*******鍐欏叆鏈湴鏂囦欢缁撴潫锛�", "鑰楁椂", int(time.time() - __start_time)) + __start_time = time.time() + + +def parse_market_data(day): + target_codes = __get_target_codes(day) + base_path = f"/home/userzjj/ftp/{day}" + with open(f"{base_path}/MarketData.csv", 'r', encoding='utf-8') as file: + csv_reader = csv.reader(file) + # 鑾峰彇琛ㄥご锛�: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....] + headers = next(csv_reader) + print("琛ㄥご:", headers) + # 閬嶅巻鏁版嵁琛� + _path = f"{base_path}/MarketData_filter.csv" + with open(_path, 'w', newline='', encoding='utf-8') as csvfile: + # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞� + writer = csv.writer(csvfile) + # 閫愯鍐欏叆鏁版嵁 + for row in csv_reader: + if row[0] not in target_codes: + continue + # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰 + writer.writerow(row) + + +# 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08 +# 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555 if __name__ == '__main__': - parse_order_detail() + if len(sys.argv) > 1: + params = sys.argv[1:] + print("鎺ユ敹鐨勫弬鏁�", params) + _type = params[0].strip() + day = params[1].strip() + if len(params) > 2: + code = params[2].strip() + else: + code = None + if _type == 'OrderDetail': + parse_order_detail(day, code) + elif _type == 'Transaction': + parse_transaction(day) + elif _type == 'NGTSTick': + parse_ngtstick(day) + elif _type == 'MarketData': + parse_market_data(day) + elif _type == 'ExtractDealBigOrder': + # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟 + if len(params) > 2: + save_path = params[2].strip() + else: + save_path = None + + if len(params) > 3: + target_code = params[3].strip() + else: + target_code = None + parse_deal_big_orders(day, save_path, target_code) -- Gitblit v1.8.0