""" 数据导出工具 """ import json import os import time import xlwt import constant from code_attribute import gpcode_manager import l2.l2_data_util from logs_ import log from l2 import l2_data_source_util from trade import deal_big_money_manager def export_l2_excel(code, date=None): # 获取L2的数据 local_today_datas = log.load_l2_from_log(date) datas = local_today_datas[code] # 获取L2处理位置信息 process_indexs = log.get_l2_process_position(code, date) trade_indexs = log.get_l2_trade_position(code, date) export_l2_data(code, datas, process_indexs, trade_indexs) def export_l2_data(code, datas, process_indexs, trade_indexs, dest_dir=f"{constant.get_path_prefix()}/export/l2"): def find_process_index(index): for i in range(0, len(process_indexs)): if process_indexs[i][0] <= index <= process_indexs[i][1]: return i return len(process_indexs) def find_trade_index(index): for i in range(0, len(trade_indexs)): if trade_indexs[i][1] == index: return trade_indexs[i] return None # 数据预处理 num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) num_dict = {} for data in datas: if data["val"]["num"] not in num_dict: num_dict[data["val"]["num"]] = [] num_dict[data["val"]["num"]].append(data) local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time())) file_name = "{}/{}_{}_{}.xls".format(dest_dir, code, gpcode_manager.get_code_name(code), local_time) file_name_txt = "{}/{}_{}.txt".format(dest_dir, code, local_time) openfile = open(file_name_txt, 'w') try: for data in datas: openfile.write(json.dumps(data) + "\n") finally: openfile.close() wb = xlwt.Workbook() ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '金额') ws.write(0, 4, '价格') ws.write(0, 5, '手数') ws.write(0, 6, '类型') ws.write(0, 7, '重复数量') ws.write(0, 8, '撤单时间') index = 0 for data in datas: index += 1 trade_info = find_trade_index(data["index"]) font = xlwt.Font() if trade_info: if trade_info[0] == 0: font.colour_index = 53 elif trade_info[0] == 1: font.colour_index = 17 elif trade_info[0] == 2: font.colour_index = 10 ws.write(index, 8, trade_info[2]) style = None if find_process_index(data["index"]) % 2 == 0: style = xlwt.easyxf('pattern: pattern solid') else: style = xlwt.easyxf('pattern: pattern solid, fore_colour light_yellow') style.font = font cancel_style = xlwt.easyxf('pattern: pattern solid, fore_colour gray25') ws.write(index, 0, data["index"], style) ws.write(index, 1, data["val"]["time"], style) cancel_time = data["val"]["cancelTime"] if cancel_time == '0': cancel_time = '' else: cancel_time = "{}".format(cancel_time) if len(cancel_time) > 0: if int(data["val"]["cancelTimeUnit"]) == 0: cancel_time += "s" elif int(data["val"]["cancelTimeUnit"]) == 1: cancel_time += "m" elif int(data["val"]["cancelTimeUnit"]) == 2: cancel_time += "h" ws.write(index, 2, cancel_time, style) ws.write(index, 4, data["val"]["price"], style) if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2: ws.write(index, 5, 0 - int(data["val"]["num"]), style) else: ws.write(index, 5, int(data["val"]["num"]), style) limit_price = "" if int(data["val"]["limitPrice"]) == 1: limit_price = "涨停" elif int(data["val"]["limitPrice"]) == 2: limit_price = "跌停" if int(data["val"]["operateType"]) == 0: if len(limit_price) > 0: ws.write(index, 6, '买 ({})'.format(limit_price), style) else: ws.write(index, 6, '买', style) elif int(data["val"]["operateType"]) == 1: if len(limit_price) > 0: ws.write(index, 6, '买撤 ({})'.format(limit_price), style) else: ws.write(index, 6, '买撤', style) elif int(data["val"]["operateType"]) == 2: if len(limit_price) > 0: ws.write(index, 6, '卖 ({})'.format(limit_price), style) else: ws.write(index, 6, '卖', style) elif int(data["val"]["operateType"]) == 3: if len(limit_price) > 0: ws.write(index, 6, '卖撤 ({})'.format(limit_price), style) else: ws.write(index, 6, '卖撤', style) ws.write(index, 7, data["re"], style) # 查询是否撤单 if int(data["val"]["operateType"]) == 0: cancel = False # 买 for d in num_dict[data["val"]["num"]]: if int(d["val"]["operateType"]) == 1: buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, d, num_operate_map[ code]) if buy_index == data["index"]: ws.write(index, 8, "{}-{}".format(d["index"], d["val"]["time"]), cancel_style) break ws.write(index, 3, "{}万".format(round(int(data["val"]["num"]) * float(data["val"]["price"]) / 100, 2)), style) wb.save(file_name) return file_name def export_l2_data_origin(code, datas, key, dest_dir=f"{constant.get_path_prefix()}/export/l2_origin"): file_dir = "{}/{}".format(dest_dir, code) if not os.path.exists(file_dir): os.makedirs(file_dir) file_name = "{}/{}/{}.xls".format(dest_dir, code, key) wb = xlwt.Workbook() ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '价格') ws.write(0, 4, '手数') ws.write(0, 5, '类型') index = 0 for data in datas: index += 1 ws.write(index, 0, index) ws.write(index, 1, data["time"]) cancel_time = data["cancelTime"] if cancel_time > 0: cancel_time = "{}".format(cancel_time) if data["cancelTimeUnit"] == 0: cancel_time += "s" elif data["cancelTimeUnit"] == 1: cancel_time += "m" elif data["cancelTimeUnit"] == 2: cancel_time += "h" ws.write(index, 2, cancel_time) ws.write(index, 3, data["price"]) ws.write(index, 4, data["num"]) if data["operateType"] == 0: ws.write(index, 5, '买') elif data["operateType"] == 1: ws.write(index, 5, '买撤') wb.save(file_name) return file_name def test(code): progresses = log.export_trade_progress(code) local_today_datas = log.load_l2_from_log("2023-04-04") datas = local_today_datas[code] num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) for progress in progresses: deal_big_money_manager.set_trade_progress(code, progress, datas, num_operate_map[code]) if __name__ == "__main__": export_l2_excel("002765")