Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
utils/data_export_util.py
@@ -1,6 +1,7 @@
"""
数据导出工具
"""
import copy
import json
import logging
import os
@@ -12,10 +13,16 @@
from code_attribute import gpcode_manager
import l2.l2_data_util
from l2.huaxin import l2_huaxin_util
from l2.l2_transaction_data_manager import BigOrderDealManager
from log_module import log, log_export
from l2 import l2_data_source_util
from l2 import l2_data_source_util, l2_data_util
from log_module.log import logger_debug
from trade import deal_big_money_manager
from trade.buy_money_count_setting import BuyMoneyUtil
from utils import tool
# 缓存L2数据,格式:{"日期":{数据}}
__l2_data_cache = {}
def export_l2_excel(code, date=None):
@@ -25,19 +32,33 @@
# 获取L2的数据
def get_l2_datas(code, today_datas=None, date=None):
def get_l2_datas(code, today_datas=None, date=None, end_index=None):
    __start_time = time.time()
    if date is None:
        date = tool.get_now_date_str()
    datas = today_datas
    if datas is None:
        local_today_datas = log_export.load_l2_from_log(date)
        if date in __l2_data_cache:
            local_today_datas = __l2_data_cache.get(date)
        else:
            local_today_datas = log_export.load_l2_from_log(date)
            __l2_data_cache[date] = local_today_datas
        datas = local_today_datas.get(code)
    if not datas:
        datas = []
    process_indexs = log_export.get_l2_process_position(code, date)
    trade_indexs = log_export.get_l2_trade_position(code, date)
    real_position_indexes = log_export.get_real_place_order_positions(code, date)
    deal_list = log_export.load_huaxin_deal_record(code, date)
    if not datas:
        # 加快没有L2数据的导出速度
        process_indexs = []
        trade_indexs = []
        real_position_indexes = []
        deal_list = []
        cancel_reasons = {}
    else:
        process_indexs = log_export.get_l2_process_position(code, date)
        trade_indexs = log_export.get_l2_trade_position(code, date)
        real_position_indexes = log_export.get_real_place_order_positions(code, date)
        deal_list = log_export.load_huaxin_deal_record(code, date)
        cancel_reasons = log_export.load_cancel_buy_reasons(code, date)
    deal_list_dict = {}
    for d in deal_list:
        deal_list_dict[str(d[0])] = d
@@ -48,7 +69,22 @@
    active_sell_set = active_sell_map.get(code)
    if not active_sell_set:
        active_sell_set = set()
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos, active_sell_set)
    # If an end position (end_index) was supplied, keep only the records before the one whose "index" equals it
    if end_index:
        datas = copy.deepcopy(datas)
        for i in range(len(datas)):
            if datas[i]["index"] == end_index:
                datas = datas[:i]
                break
    # 成交的大单信息[(买单号,总股数,总成交额,成交开始时间,成交结束时间)]
    deal_big_buy_order_list = copy.deepcopy(BigOrderDealManager().get_total_buy_data_list(code))
    deal_big_buy_order_no_dict = {}
    if deal_big_buy_order_list:
        for d in deal_big_buy_order_list:
            deal_big_buy_order_no_dict[d[0]] = d
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos,
                            active_sell_set, cancel_reasons, deal_big_buy_order_no_dict)
    return fdatas
@@ -61,7 +97,8 @@
    return fdatas
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos,active_sell_nos):
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos,
                   active_sell_nos, cancel_reasons, deal_big_buy_order_no_dict):
    def find_process_index(index):
        for i in range(0, len(process_indexs)):
            if process_indexs[i][0] <= index <= process_indexs[i][1]:
@@ -74,6 +111,10 @@
                return trade_indexs[i]
        return None
    # 最新的一条数据
    latest_data = datas[-1]
    latest_time_str_with_ms = l2.l2_data_util.L2DataUtil.get_time_with_ms(latest_data["val"])
    # 数据预处理
    num_operate_map = {}
    l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas)
@@ -81,23 +122,42 @@
    sell_no_map = {}
    l2.l2_data_util.load_buy_no_map(buy_no_map, code, datas)
    l2.l2_data_util.load_sell_no_map(sell_no_map, code, datas)
    l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, datas)
    # num_dict = {}
    # for data in datas:
    #     if data["val"]["num"] not in num_dict:
    #         num_dict[data["val"]["num"]] = []
    #     num_dict[data["val"]["num"]].append(data)
    logger_debug.info(f"循环组装数据开始:长度-{len(datas)} 主动卖长度-{len(active_sell_nos)}")
    index = 0
    fdatas = []
    # 数据太多就需要过滤掉小金额
    is_data_too_large = len(datas) > 20000
    limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
    if limit_up_price:
        # 需要订阅的特殊的量
        special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price)
        special_volumes = [x//100 for x in special_volumes]
    else:
        special_volumes = []
    for data in datas:
        index += 1
        if is_data_too_large and data['val']['num'] * float(data['val']['price']) < 5000:
            # 小单
            if data["index"] not in real_position_indexes and data['val']['num'] not in special_volumes:
                continue
        # 先移除
        if data['val']['orderNo'] in active_sell_nos:
            # 过滤主动卖
            continue
        # 移除成交大单在L2中已有的订单
        if l2.l2_data_util.L2DataUtil.is_limit_up_price_buy(data["val"]):
            if int(data['val']['orderNo']) in deal_big_buy_order_no_dict:
                deal_big_buy_order_no_dict.pop(int(data['val']['orderNo']))
        trade_info = find_trade_index(data["index"])
        if not trade_info:
@@ -176,7 +236,8 @@
        if int(data["val"]["operateType"]) == 0:
            cancel_data = l2.l2_data_util.local_today_canceled_buyno_map.get(code).get(str(data["val"]["orderNo"]))
            # 买
            if cancel_data:
            if cancel_data and latest_data["index"] >= cancel_data["index"]:
                # Only honour the cancel record when its index does not exceed the latest record's index
                try:
                    left_num = data["val"]["num"] - cancel_data["val"]["num"]
                    if left_num > 0:
@@ -188,9 +249,12 @@
                    logging.exception(e)
            else:
                deal_info = deal_list_dict.get(str(data["val"].get("orderNo")))
                if deal_info:
                    cancel_info = l2_huaxin_util.convert_time(deal_info[3],
                if deal_info and len(deal_info) >= 5:
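                    # A deal time later than the latest record's time is dropped
                    # (presumably trade_time_sub_with_ms(a, b) is a - b; verify)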
                    cancel_info = l2_huaxin_util.convert_time(deal_info[4],
                                                              with_ms=True)
                    if tool.trade_time_sub_with_ms(cancel_info, latest_time_str_with_ms) > 0:
                        cancel_info = None
        format_data.append(cancel_info)
        cancel_order_info = None
        if trade_info:
@@ -203,42 +267,33 @@
            elif trade_info[0] == 2:
                # font.colour_index = 10
                cancel_order_info = trade_info[2]
        if not cancel_order_info:
            if data["index"] in cancel_reasons:
                cancel_order_info = cancel_reasons[data["index"]]
        format_data.append(cancel_order_info)
        format_data.append(data["val"].get("orderNo"))
        fdatas.append((style_int, trade_info, format_data))
    # Map each order number to its row position in fdatas, skipping rows whose column 6 contains '撤'
    order_no_index_map = {}
    for i in range(len(fdatas)):
        d = fdatas[i][2]
        if d[6].find('撤') >= 0:
            continue
        order_no_index_map[int(d[10])] = i
    order_no_indexes = [(k, order_no_index_map[k]) for k in order_no_index_map]
    order_no_indexes.sort(key=lambda x: x[0])
    if sell_nos:
        for sell_info in sell_nos:
            if sell_info[1] * sell_info[2] < 50 * 10000:
                continue
            for i in range(len(order_no_indexes) - 1):
                if order_no_indexes[i][0] < sell_info[0] < order_no_indexes[i + 1][0]:
                    item = []
                    item.append(order_no_indexes[i + 1][1])
                    item.append(l2_huaxin_util.convert_time(sell_info[3][0], with_ms=True))
                    item.append("")
                    item.append(
                        "{}万".format(round(sell_info[1] * sell_info[2] / 10000, 1)))
                    item.append(sell_info[2])
                    item.append(sell_info[1] // 100)
                    item.append("主动卖")
                    item.append(1)
                    item.append(l2_huaxin_util.convert_time(sell_info[4][0], with_ms=True))
                    item.append(None)
                    item.append(sell_info[0])
                    fdatas.insert(order_no_indexes[i + 1][1], (0, None, item))
                    break
    if tool.is_sh_code(code) and deal_big_buy_order_no_dict:
        # 上证,有主动成交的大单
        active_deal_list = [deal_big_buy_order_no_dict[order_no] for order_no in deal_big_buy_order_no_dict]
        active_deal_list.sort(key=lambda x: x[3])
        for i in range(len(active_deal_list)):
            d = active_deal_list[i]
            format_data = [0]
            format_data.append(l2_huaxin_util.convert_time(d[3], with_ms=True))
            format_data.append('')
            format_data.append(f"{round(d[2] / 10000, 1)}万")
            format_data.append(round(d[2] / d[1], 2))
            format_data.append(d[1] // 100)
            format_data.append('主动买')
            format_data.append(1)
            format_data.append(l2_huaxin_util.convert_time(d[4], with_ms=True))
            format_data.append('')
            format_data.append(d[0])
            fdatas.insert(i, (0, None, format_data))
    logger_debug.info("循环组装数据完成")
    return fdatas