""" 数据导出工具 """ import json import logging import os import time import xlwt import constant from code_attribute import gpcode_manager import l2.l2_data_util from l2.huaxin import l2_huaxin_util from log_module import log, log_export from l2 import l2_data_source_util from trade import deal_big_money_manager def export_l2_excel(code, date=None): # 获取L2的数据 local_today_datas = log_export.load_l2_from_log(date) datas = local_today_datas[code] datas = datas[-2000:] # 获取L2处理位置信息 process_indexs = log_export.get_l2_process_position(code, date) trade_indexs = log_export.get_l2_trade_position(code, date) real_position_indexes = log_export.get_real_place_order_positions(code, date) fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes) __save_l2_datas(code, fdatas) # 获取L2的数据 def get_l2_datas(code, date=None): local_today_datas = log_export.load_l2_from_log(date) datas = local_today_datas[code] process_indexs = log_export.get_l2_process_position(code, date) trade_indexs = log_export.get_l2_trade_position(code, date) real_position_indexes = log_export.get_real_place_order_positions(code, date) deal_list = log_export.load_huaxin_deal_record(code) deal_list_dict = {} for d in deal_list: deal_list_dict[d[0]] = d fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict) return fdatas def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict): def find_process_index(index): for i in range(0, len(process_indexs)): if process_indexs[i][0] <= index <= process_indexs[i][1]: return i return len(process_indexs) def find_trade_index(index): for i in range(0, len(trade_indexs)): if trade_indexs[i][1] == index: return trade_indexs[i] return None # 数据预处理 num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) buy_no_map = {} l2.l2_data_util.load_buy_no_map(buy_no_map, code, datas) l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, datas) # num_dict = {} # for data in datas: # if data["val"]["num"] not in num_dict: # num_dict[data["val"]["num"]] = [] # num_dict[data["val"]["num"]].append(data) index = 0 fdatas = [] for data in datas: index += 1 trade_info = find_trade_index(data["index"]) if not trade_info: # 获取真实下单位置 if data["index"] in real_position_indexes: trade_info = [3] style_int = None if find_process_index(data["index"]) % 2 == 0: style_int = 0 else: style_int = 1 format_data = [] # 索引 format_data.append(data["index"]) # 时间 format_data.append(data["val"]["time"] + (f".{data['val']['tms']}" if "tms" in data["val"] else '')) cancel_time = data["val"]["cancelTime"] if cancel_time == '0': cancel_time = '' else: cancel_time = "{}".format(cancel_time) if len(cancel_time) > 0: if int(data["val"]["cancelTimeUnit"]) == 0: cancel_time += "s" elif int(data["val"]["cancelTimeUnit"]) == 1: cancel_time += "m" elif int(data["val"]["cancelTimeUnit"]) == 2: cancel_time += "h" # 撤单间隔时间 format_data.append(cancel_time) # 金额 format_data.append("{}万".format(round(int(data["val"]["num"]) * float(data["val"]["price"]) / 100, 1))) # 单价 format_data.append(data["val"]["price"]) if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2: format_data.append(0 - int(data["val"]["num"])) else: format_data.append(int(data["val"]["num"])) limit_price = "" if int(data["val"]["limitPrice"]) == 1: limit_price = "T" elif int(data["val"]["limitPrice"]) == 2: limit_price = "D" operateDesc = "" if int(data["val"]["operateType"]) == 0: if len(limit_price) > 0: operateDesc = '买{}'.format(limit_price) else: operateDesc = '买' elif int(data["val"]["operateType"]) == 1: if len(limit_price) > 0: operateDesc = '买撤{}'.format(limit_price) else: operateDesc = '买撤' elif int(data["val"]["operateType"]) == 2: if len(limit_price) > 0: operateDesc = '卖{}'.format(limit_price) else: operateDesc = '卖' elif int(data["val"]["operateType"]) == 3: if len(limit_price) > 0: operateDesc = '卖撤{}'.format(limit_price) else: operateDesc = '卖撤' format_data.append(operateDesc) format_data.append(data["re"]) # 查询是否撤单 cancel_info = None if int(data["val"]["operateType"]) == 0: cancel_data = l2.l2_data_util.local_today_canceled_buyno_map.get(code).get(str(data["val"]["orderNo"])) # 买 if cancel_data: try: cancel_info = "{}-{}".format(cancel_data["index"], f"{cancel_data['val']['time']}.{cancel_data['val']['tms']}") except Exception as e: logging.exception(e) else: if int(data["val"].get("orderNo")) in deal_list_dict: cancel_info = l2_huaxin_util.convert_time(deal_list_dict[int(data["val"].get("orderNo"))][3]) format_data.append(cancel_info) cancel_order_info = None if trade_info: if trade_info[0] == 0: # font.colour_index = 53 pass elif trade_info[0] == 1: # font.colour_index = 17 pass elif trade_info[0] == 2: # font.colour_index = 10 cancel_order_info = trade_info[2] format_data.append(cancel_order_info) format_data.append(data["val"].get("orderNo")) fdatas.append((style_int, trade_info, format_data)) return fdatas def __save_l2_datas(code, fdatas, dest_dir=f"{constant.get_path_prefix()}/export/l2"): local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time())) file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time) wb = xlwt.Workbook(encoding="utf-8") ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '金额') ws.write(0, 4, '价格') ws.write(0, 5, '手数') ws.write(0, 6, '类型') ws.write(0, 7, '重复数量') ws.write(0, 8, '撤单时间') ws.write(0, 9, '备注') ws.write(0, 10, '订单号') index = 0 cancel_style = xlwt.easyxf('pattern: pattern solid, fore_colour gray25') for fdata in fdatas: index += 1 style_int = fdata[0] trade_info = fdata[1] data = fdata[2] style = None if style_int == 0: style = xlwt.easyxf('pattern: pattern solid') else: style = xlwt.easyxf('pattern: pattern solid, fore_colour light_yellow') font = xlwt.Font() style.font = font if trade_info: if trade_info[0] == 0: font.colour_index = 53 elif trade_info[0] == 1: font.colour_index = 17 elif trade_info[0] == 2: font.colour_index = 10 for i in range(len(data)): if data[i] is None: continue if i == 8 and data[i]: ws.write(index, i, data[i], cancel_style) else: ws.write(index, i, data[i], style) wb.save(file_name) def export_l2_data_origin(code, datas, key, dest_dir=f"{constant.get_path_prefix()}/export/l2_origin"): file_dir = "{}/{}".format(dest_dir, code) if not os.path.exists(file_dir): os.makedirs(file_dir) file_name = "{}/{}/{}.xls".format(dest_dir, code, key) wb = xlwt.Workbook() ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '价格') ws.write(0, 4, '手数') ws.write(0, 5, '类型') index = 0 for data in datas: index += 1 ws.write(index, 0, index) ws.write(index, 1, data["time"]) cancel_time = data["cancelTime"] if cancel_time > 0: cancel_time = "{}".format(cancel_time) if data["cancelTimeUnit"] == 0: cancel_time += "s" elif data["cancelTimeUnit"] == 1: cancel_time += "m" elif data["cancelTimeUnit"] == 2: cancel_time += "h" ws.write(index, 2, cancel_time) ws.write(index, 3, data["price"]) ws.write(index, 4, data["num"]) if data["operateType"] == 0: ws.write(index, 5, '买') elif data["operateType"] == 1: ws.write(index, 5, '买撤') wb.save(file_name) return file_name def test(code): progresses = log_export.export_trade_progress(code) local_today_datas = log_export.load_l2_from_log("2023-04-04") datas = local_today_datas[code] num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) if __name__ == "__main__": try: export_l2_excel("605218") except Exception as e: logging.exception(e)