""" 数据导出工具 """ import json import logging import os import time import xlwt import constant from code_attribute import gpcode_manager import l2.l2_data_util from l2.huaxin import l2_huaxin_util from log_module import log, log_export from l2 import l2_data_source_util from log_module.log import logger_debug from trade import deal_big_money_manager from utils import tool # 缓存L2数据,格式:{"日期":{数据}} __l2_data_cache = {} def export_l2_excel(code, date=None): # 获取L2的数据 fdatas = get_l2_datas(code, date=date) __save_l2_datas(code, fdatas) # 获取L2的数据 def get_l2_datas(code, today_datas=None, date=None, max_time=None): __start_time = time.time() if date is None: date = tool.get_now_date_str() datas = today_datas if datas is None: if date in __l2_data_cache: local_today_datas = __l2_data_cache.get(date) else: local_today_datas = log_export.load_l2_from_log(date) __l2_data_cache[date] = local_today_datas datas = local_today_datas.get(code) if not datas: datas = [] if not datas: # 加快没有L2数据的导出速度 process_indexs = [] trade_indexs = [] real_position_indexes = [] deal_list = [] cancel_reasons = {} else: process_indexs = log_export.get_l2_process_position(code, date) trade_indexs = log_export.get_l2_trade_position(code, date) real_position_indexes = log_export.get_real_place_order_positions(code, date) deal_list = log_export.load_huaxin_deal_record(code, date) cancel_reasons = log_export.load_cancel_buy_reasons(code, date) deal_list_dict = {} for d in deal_list: deal_list_dict[str(d[0])] = d sell_no_dict = log_export.load_huaxin_transaction_sell_no(code=code, date=date) sell_nos = sell_no_dict.get(code) active_sell_map = log_export.load_huaxin_active_sell_map(date=date) active_sell_set = active_sell_map.get(code) if not active_sell_set: active_sell_set = set() fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos, active_sell_set, cancel_reasons) return fdatas def get_l2_transaction_datas(code, date=None): if date is None: date = tool.get_now_date_str() sell_no_dict = log_export.load_huaxin_transaction_sell_no(code=code, date=date) sell_nos = sell_no_dict.get(code) fdatas = export_l2_transaction_data(code, sell_nos) return fdatas def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos, active_sell_nos, cancel_reasons): def find_process_index(index): for i in range(0, len(process_indexs)): if process_indexs[i][0] <= index <= process_indexs[i][1]: return i return len(process_indexs) def find_trade_index(index): for i in range(0, len(trade_indexs)): if trade_indexs[i][1] == index: return trade_indexs[i] return None # 数据预处理 num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) buy_no_map = {} sell_no_map = {} l2.l2_data_util.load_buy_no_map(buy_no_map, code, datas) l2.l2_data_util.load_sell_no_map(sell_no_map, code, datas) l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, datas) # num_dict = {} # for data in datas: # if data["val"]["num"] not in num_dict: # num_dict[data["val"]["num"]] = [] # num_dict[data["val"]["num"]].append(data) logger_debug.info(f"循环组装数据开始:长度-{len(datas)} 主动卖长度-{len(active_sell_nos)}") index = 0 fdatas = [] # 数据太多就需要过滤掉小金额 is_data_too_large = len(datas) > 20000 for data in datas: index += 1 if is_data_too_large and data['val']['num'] * float(data['val']['price']) < 5000: if data["index"] not in real_position_indexes: continue # 先移除 if data['val']['orderNo'] in active_sell_nos: # 过滤主动卖 continue trade_info = find_trade_index(data["index"]) if not trade_info: # 获取真实下单位置 if data["index"] in real_position_indexes: trade_info = [3] style_int = None if find_process_index(data["index"]) % 2 == 0: style_int = 0 else: style_int = 1 format_data = [] # 索引 format_data.append(data["index"]) # 时间 format_data.append(data["val"]["time"] + (f".{data['val']['tms']}" if "tms" in data["val"] else '')) cancel_time = data["val"]["cancelTime"] if cancel_time == '0': cancel_time = '' else: cancel_time = "{}".format(cancel_time) if len(cancel_time) > 0: if int(data["val"]["cancelTimeUnit"]) == 0: cancel_time += "s" elif int(data["val"]["cancelTimeUnit"]) == 1: cancel_time += "m" elif int(data["val"]["cancelTimeUnit"]) == 2: cancel_time += "h" # 撤单间隔时间 format_data.append(cancel_time) # 金额 format_data.append("{}万".format(round(int(data["val"]["num"]) * float(data["val"]["price"]) / 100, 1))) # 单价 format_data.append(data["val"]["price"]) if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2: format_data.append(0 - int(data["val"]["num"])) else: format_data.append(int(data["val"]["num"])) limit_price = "" if int(data["val"]["limitPrice"]) == 1: limit_price = "T" elif int(data["val"]["limitPrice"]) == 2: limit_price = "D" operateDesc = "" if int(data["val"]["operateType"]) == 0: if len(limit_price) > 0: operateDesc = '买{}'.format(limit_price) else: operateDesc = '买' elif int(data["val"]["operateType"]) == 1: if len(limit_price) > 0: operateDesc = '买撤{}'.format(limit_price) else: operateDesc = '买撤' elif int(data["val"]["operateType"]) == 2: if len(limit_price) > 0: operateDesc = '卖{}'.format(limit_price) else: operateDesc = '卖' elif int(data["val"]["operateType"]) == 3: if len(limit_price) > 0: operateDesc = '卖撤{}'.format(limit_price) else: operateDesc = '卖撤' format_data.append(operateDesc) format_data.append(data["re"]) # 查询是否撤单 cancel_info = None if int(data["val"]["operateType"]) == 0: cancel_data = l2.l2_data_util.local_today_canceled_buyno_map.get(code).get(str(data["val"]["orderNo"])) # 买 if cancel_data: try: left_num = data["val"]["num"] - cancel_data["val"]["num"] if left_num > 0: cancel_info = f"成交:{left_num} 序号:{cancel_data['index']}" else: cancel_info = "{}-{}".format(cancel_data["index"], f"{cancel_data['val']['time']}") + ( f".{cancel_data['val']['tms']}" if "tms" in cancel_data["val"] else '') except Exception as e: logging.exception(e) else: deal_info = deal_list_dict.get(str(data["val"].get("orderNo"))) if deal_info and len(deal_info) >= 5: cancel_info = l2_huaxin_util.convert_time(deal_info[4], with_ms=True) format_data.append(cancel_info) cancel_order_info = None if trade_info: if trade_info[0] == 0: # font.colour_index = 53 pass elif trade_info[0] == 1: # font.colour_index = 17 pass elif trade_info[0] == 2: # font.colour_index = 10 cancel_order_info = trade_info[2] if not cancel_order_info: if data["index"] in cancel_reasons: cancel_order_info = cancel_reasons[data["index"]] format_data.append(cancel_order_info) format_data.append(data["val"].get("orderNo")) fdatas.append((style_int, trade_info, format_data)) logger_debug.info("循环组装数据完成") return fdatas def export_l2_transaction_data(code, sell_nos): fdatas = [] if sell_nos: index = 0 for sell_info in sell_nos: if sell_info[1] * sell_info[2] < 50 * 10000: continue index += 1 item = [] item.append(index) item.append(l2_huaxin_util.convert_time(sell_info[3][0], with_ms=True)) item.append("") item.append( "{}万".format(round(sell_info[1] * sell_info[2] / 10000, 1))) item.append(sell_info[2]) item.append(sell_info[1] // 100) item.append("主动卖") item.append(1) item.append(l2_huaxin_util.convert_time(sell_info[4][0], with_ms=True)) item.append(None) item.append(sell_info[0]) fdatas.append((0, None, item)) return fdatas def __save_l2_datas(code, fdatas, dest_dir=f"{constant.get_path_prefix()}/export/l2"): local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time())) file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time) wb = xlwt.Workbook(encoding="utf-8") ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '金额') ws.write(0, 4, '价格') ws.write(0, 5, '手数') ws.write(0, 6, '类型') ws.write(0, 7, '重复数量') ws.write(0, 8, '撤单时间') ws.write(0, 9, '备注') ws.write(0, 10, '订单号') index = 0 cancel_style = xlwt.easyxf('pattern: pattern solid, fore_colour gray25') for fdata in fdatas: index += 1 style_int = fdata[0] trade_info = fdata[1] data = fdata[2] style = None if style_int == 0: style = xlwt.easyxf('pattern: pattern solid') else: style = xlwt.easyxf('pattern: pattern solid, fore_colour light_yellow') font = xlwt.Font() style.font = font if trade_info: if trade_info[0] == 0: font.colour_index = 53 elif trade_info[0] == 1: font.colour_index = 17 elif trade_info[0] == 2: font.colour_index = 10 for i in range(len(data)): if data[i] is None: continue if i == 8 and data[i]: ws.write(index, i, data[i], cancel_style) else: ws.write(index, i, data[i], style) wb.save(file_name) def export_l2_data_origin(code, datas, key, dest_dir=f"{constant.get_path_prefix()}/export/l2_origin"): file_dir = "{}/{}".format(dest_dir, code) if not os.path.exists(file_dir): os.makedirs(file_dir) file_name = "{}/{}/{}.xls".format(dest_dir, code, key) wb = xlwt.Workbook() ws = wb.add_sheet('sheet1') ws.write(0, 0, '序号') ws.write(0, 1, '时间') ws.write(0, 2, '买撤间隔') ws.write(0, 3, '价格') ws.write(0, 4, '手数') ws.write(0, 5, '类型') index = 0 for data in datas: index += 1 ws.write(index, 0, index) ws.write(index, 1, data["time"]) cancel_time = data["cancelTime"] if cancel_time > 0: cancel_time = "{}".format(cancel_time) if data["cancelTimeUnit"] == 0: cancel_time += "s" elif data["cancelTimeUnit"] == 1: cancel_time += "m" elif data["cancelTimeUnit"] == 2: cancel_time += "h" ws.write(index, 2, cancel_time) ws.write(index, 3, data["price"]) ws.write(index, 4, data["num"]) if data["operateType"] == 0: ws.write(index, 5, '买') elif data["operateType"] == 1: ws.write(index, 5, '买撤') wb.save(file_name) return file_name def test(code): progresses = log_export.export_trade_progress(code) local_today_datas = log_export.load_l2_from_log("2023-04-04") datas = local_today_datas[code] num_operate_map = {} l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas) if __name__ == "__main__": try: get_l2_datas("600990") except Exception as e: logging.exception(e)