| | |
L2 data parser
| | | """ |
| | | import csv |
| | | def parse_order_detail(): |
| | | with open('/home/userzjj/ftp/20250123/OrderDetail.csv', 'r', encoding='utf-8') as file: |
| | | import os |
| | | import sys |
| | | import time |
| | | |
| | | from db import mysql_data_delegate as mysql_data |
| | | from huaxin_client.l2_client_test import L2TransactionDataManager |
| | | from log_module import log_export |
| | | |
| | | |
def __get_target_codes(day):
    """
    Build the set of all codes listed for ``day``, across every plate.

    @param day: passed unchanged to ``log_export.load_special_codes``
    @return: one set that is the union of the per-plate code sets
    """
    codes_by_plate = log_export.load_special_codes(day)
    merged = set()
    # Each value is merged with |=, so it has to be a set (or frozenset) itself.
    for plate_codes in codes_by_plate.values():
        merged |= plate_codes
    return merged
| | | |
| | | |
def parse_order_detail(day, code):
    """
    Filter the day's OrderDetail.csv down to the target codes and save the result.

    Reads ``/home/userzjj/ftp/{day}/OrderDetail.csv``, prints the header and the first
    ``max_count`` data rows as a preview, and writes every row whose SecurityID
    (column 1) is a target code to ``OrderDetail_filter.csv`` in the same directory.
    The header row is not written to the output file.

    @param day: trading day; used as the directory name and to load the target codes
    @param code: optional single code; when truthy, only rows of this code are kept
    @return: None
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    # Number of leading data rows echoed to stdout as a preview
    max_count = 10
    with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # Header: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp']
        headers = next(csv_reader)
        print("表头:", headers)
        _path = f"{base_path}/OrderDetail_filter.csv"
        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # Preview and filter in a single pass: csv.reader is a one-shot iterator,
            # so rows consumed by a separate preview loop would never reach the
            # filter and would be silently missing from the output file.
            for index, row in enumerate(csv_reader):
                if index < max_count:
                    print(row)
                if row[1] not in target_codes:
                    continue
                if code and code != row[1]:
                    continue
                writer.writerow(row)
| | | |
| | | |
def parse_transaction(day, save=True):
    """
    Load the target-code rows of the day's Transaction.csv.

    Reads ``/home/userzjj/ftp/{day}/Transaction.csv``, prints the header, and keeps
    every row whose SecurityID (column 1) is a target code. The header row is neither
    returned nor written.

    @param day: trading day; used as the directory name and to load the target codes
    @param save: when truthy, the kept rows are also written to
                 ``Transaction_filter.csv`` in the same directory; when falsy the
                 output file is not opened at all, so an existing file is left intact
    @return: list of the kept rows, each a list of strings as produced by csv.reader
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # Header: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
        headers = next(csv_reader)
        print("表头:", headers)
        fdatas = [row for row in csv_reader if row[1] in target_codes]
    if save:
        # Open the output only when saving: opening with 'w' truncates the file,
        # which would wipe a previously saved result on a save=False call.
        _path = f"{base_path}/Transaction_filter.csv"
        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv.writer(csvfile).writerows(fdatas)
    return fdatas
| | | |
| | | |
def parse_ngtstick(day, save=True):
    """
    Load the target-code rows of the day's NGTSTick.csv.

    Reads ``/home/userzjj/ftp/{day}/NGTSTick.csv``, prints the header, and keeps
    every row whose SecurityID (column 1) is a target code. The header row is neither
    returned nor written.

    @param day: trading day; used as the directory name and to load the target codes
    @param save: when truthy, the kept rows are also written to
                 ``NGTSTick_filter.csv`` in the same directory; when falsy the
                 output file is not opened at all, so an existing file is left intact
    @return: list of the kept rows, each a list of strings as produced by csv.reader
    """
    target_codes = __get_target_codes(day)
    base_path = f"/home/userzjj/ftp/{day}"
    with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        # Header: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
        headers = next(csv_reader)
        print("表头:", headers)
        fdatas = [row for row in csv_reader if row[1] in target_codes]
    if save:
        # Open the output only when saving: opening with 'w' truncates the file,
        # which would wipe a previously saved result on a save=False call.
        _path = f"{base_path}/NGTSTick_filter.csv"
        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
            csv.writer(csvfile).writerows(fdatas)
    return fdatas
| | | |
| | | |
def __drain_big_order_queues(code, l2_data_manager, big_order_list):
    """
    Move everything currently queued on one manager into ``big_order_list``.

    Entries from the buy queue are appended first as ``(code, 0, data)``, then entries
    from the sell queue as ``(code, 1, data)``.
    """
    while not l2_data_manager.big_accurate_buy_order_queue.empty():
        data = l2_data_manager.big_accurate_buy_order_queue.get()
        big_order_list.append((code, 0, data))

    while not l2_data_manager.big_accurate_sell_order_queue.empty():
        data = l2_data_manager.big_accurate_sell_order_queue.get()
        big_order_list.append((code, 1, data))


def parse_deal_big_orders(day, big_order_path, target_code=None):
    """
    Extract all dealt big orders of a day and write them to a text file.

    Loads the filtered Transaction and NGTSTick rows through ``parse_transaction(day)``
    and ``parse_ngtstick(day)`` (both called with their default ``save=True``), feeds
    each usable trade into a per-code ``L2TransactionDataManager``, and collects what
    the managers put on their big buy/sell order queues. Progress and timings are
    printed along the way.

    @param day: trading day handed to the two parsers
    @param big_order_path: path of the output text file; it is overwritten, one
                           ``(code, side, data)`` tuple per line, where side is 0 for
                           the buy queue and 1 for the sell queue
    @param target_code: optional single code; when truthy, rows of other codes are skipped
    @return: None
    """
    print("*******开始处理成交大单数据", day, big_order_path, target_code)
    l2_data_manager_dict = {}
    # Parse the raw data
    __start_time = time.time()
    transaction_data = parse_transaction(day)
    print("*******Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    ngtstick_data = parse_ngtstick(day)
    print("*******NGTSTick 读取完毕", len(ngtstick_data), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    big_order_list = []
    for index, row in enumerate(transaction_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        # Only the part of the price field before any '@' is used.
        item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]), "MainSeq": int(row[6]),
                "SubSeq": int(row[7]), "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5])}
        if item["TradePrice"] <= 0:
            continue
        if code not in l2_data_manager_dict:
            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
        l2_data_manager_dict[code].add_transaction_data(item)
        if index % 100 == 0:
            # Periodically read out what the current code's manager has queued so far
            __drain_big_order_queues(code, l2_data_manager_dict[code], big_order_list)
    print("********Transaction 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()

    for index, row in enumerate(ngtstick_data):
        code = row[1]
        if target_code and code != target_code:
            continue
        if row[5].strip() != 'T':
            # Skip ticks that are not trades
            continue

        # NOTE(review): unlike the Transaction branch above, the sequence/order-number
        # fields stay strings here and ExecType is the string '1' - confirm that
        # L2TransactionDataManager accepts both forms.
        item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
                "TradeVolume": int(row[9]),
                "OrderTime": row[4], "MainSeq": row[2],
                "SubSeq": row[3], "BuyNo": row[6],
                "SellNo": row[7],
                "ExecType": '1'}
        if item["TradePrice"] <= 0:
            continue

        if code not in l2_data_manager_dict:
            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
        l2_data_manager_dict[code].add_transaction_data(item)
        if index % 100 == 0:
            # Periodically read out what the current code's manager has queued so far
            __drain_big_order_queues(code, l2_data_manager_dict[code], big_order_list)
    print("********NGTSTick 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    # Collect whatever is still queued for every code seen
    for code, l2_data_manager in l2_data_manager_dict.items():
        __drain_big_order_queues(code, l2_data_manager, big_order_list)
    print("********开始写入本地文件:", len(big_order_list), "耗时", int(time.time() - __start_time))
    __start_time = time.time()
    # Write the collected orders to the local file, one tuple per line
    with open(big_order_path, mode='w', encoding='utf-8') as f:
        for order in big_order_list:
            f.write(f"{order}\n")
    print("*******写入本地文件结束:", "耗时", int(time.time() - __start_time))
| | | |
| | | |
def parse_market_data(day):
    """
    Filter the day's MarketData.csv down to the target codes.

    Reads ``/home/userzjj/ftp/{day}/MarketData.csv``, prints the header, and writes
    every row whose SecurityID (column 0) is a target code to
    ``MarketData_filter.csv`` in the same directory. The header row is not written.

    @param day: trading day; used as the directory name and to load the target codes
    @return: None
    """
    wanted_codes = __get_target_codes(day)
    day_dir = f"/home/userzjj/ftp/{day}"
    with open(f"{day_dir}/MarketData.csv", 'r', encoding='utf-8') as source:
        rows = csv.reader(source)
        # Header: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....]
        header_row = next(rows)
        print("表头:", header_row)
        filtered_path = f"{day_dir}/MarketData_filter.csv"
        with open(filtered_path, 'w', newline='', encoding='utf-8') as sink:
            # Stream the matching rows straight from the reader into the writer
            csv.writer(sink).writerows(row for row in rows if row[0] in wanted_codes)
| | | |
| | | |
# Command mode: /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08
# Extract big orders: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
if __name__ == '__main__':
    # Arguments: <type> <day> [code]                      for OrderDetail
    #            <type> <day>                             for Transaction / NGTSTick / MarketData
    #            <type> <day> <save_path> [target_code]   for ExtractDealBigOrder
    if len(sys.argv) > 1:
        params = sys.argv[1:]
        print("接收的参数", params)
        if len(params) < 2:
            # Every command needs at least the type and the day
            sys.exit("usage: l2_data_parser <type> <day> [code | save_path [target_code]]")
        _type = params[0].strip()
        day = params[1].strip()
        if _type == 'OrderDetail':
            # The optional third argument restricts the output to one code
            code = params[2].strip() if len(params) > 2 else None
            parse_order_detail(day, code)
        elif _type == 'Transaction':
            parse_transaction(day)
        elif _type == 'NGTSTick':
            parse_ngtstick(day)
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'ExtractDealBigOrder':
            # Extract all dealt big orders
            if len(params) < 3:
                # Fail before the long parsing run instead of at the final open()
                sys.exit("ExtractDealBigOrder requires a save path: l2_data_parser ExtractDealBigOrder <day> <save_path> [target_code]")
            save_path = params[2].strip()
            target_code = params[3].strip() if len(params) > 3 else None
            parse_deal_big_orders(day, save_path, target_code)