""" L2数据解析器 """ import csv import os import sys from db import mysql_data_delegate as mysql_data def __get_target_codes(day): results = mysql_data.Mysqldb().select_all(f"select _code from kpl_limit_up_record where _day='{day}'") codes = set([x[0] for x in results]) return codes def parse_order_detail(day): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp'] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/OrderDetail_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[1] not in target_codes: continue # 将文件写入到文本 writer.writerow(row) def parse_transaction(day): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/Transaction_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[1] not in target_codes: continue # 将文件写入到文本 writer.writerow(row) def parse_ngtstick(day): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/NGTSTick_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[1] not in target_codes: continue # 将文件写入到文本 writer.writerow(row) def parse_market_data(day): target_codes = __get_target_codes(day) base_path = f"/home/userzjj/ftp/{day}" with open(f"{base_path}/MarketData.csv", 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) # 获取表头:: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....] headers = next(csv_reader) print("表头:", headers) # 遍历数据行 _path = f"{base_path}/MarketData_filter.csv" with open(_path, 'w', newline='', encoding='utf-8') as csvfile: # 创建一个 CSV 写入器对象 writer = csv.writer(csvfile) # 逐行写入数据 for row in csv_reader: if row[0] not in target_codes: continue # 将文件写入到文本 writer.writerow(row) if __name__ == '__main__': if len(sys.argv) > 1: params = sys.argv[1:] print("接收的参数", params) _type = params[0].strip() day = params[1].strip() if _type == 'OrderDetail': parse_order_detail(day) elif _type == 'Transaction': parse_transaction(day) elif _type == 'NGTSTick': parse_ngtstick(day) elif _type == 'MarketData': parse_market_data(day)