Administrator
2024-04-24 9b7a928ead75ab099f8cc87a0f86f6870df7c9f7
utils/data_export_util.py
@@ -11,41 +11,78 @@
import constant
from code_attribute import gpcode_manager
import l2.l2_data_util
from l2.huaxin import l2_huaxin_util
from log_module import log, log_export
from l2 import l2_data_source_util
from log_module.log import logger_debug
from trade import deal_big_money_manager
from utils import tool
# 缓存L2数据,格式:{"日期":{数据}}
__l2_data_cache = {}
def export_l2_excel(code, date=None):
    # 获取L2的数据
    local_today_datas = log_export.load_l2_from_log(date)
    datas = local_today_datas[code]
    datas = datas[-2000:]
    # 获取L2处理位置信息
    process_indexs = log_export.get_l2_process_position(code, date)
    trade_indexs = log_export.get_l2_trade_position(code, date)
    real_position_indexes = log_export.get_real_place_order_positions(code, date)
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes)
    fdatas = get_l2_datas(code, date=date)
    __save_l2_datas(code, fdatas)
# 获取L2的数据
def get_l2_datas(code, date=None):
    local_today_datas = log_export.load_l2_from_log(date)
    datas = local_today_datas[code]
def get_l2_datas(code, today_datas=None, date=None, max_time=None):
    __start_time = time.time()
    if date is None:
        date = tool.get_now_date_str()
    datas = today_datas
    if datas is None:
        if date in __l2_data_cache:
            local_today_datas = __l2_data_cache.get(date)
        else:
            local_today_datas = log_export.load_l2_from_log(date)
            __l2_data_cache[date] = local_today_datas
        datas = local_today_datas.get(code)
    if not datas:
        datas = []
    logger_debug.info("加载L2逐笔委托耗时:{}", f"{(time.time() - __start_time)}")
    __start_time = time.time()
    process_indexs = log_export.get_l2_process_position(code, date)
    logger_debug.info("加载L2逐笔委托批次耗时:{}", f"{(time.time() - __start_time)}")
    __start_time = time.time()
    trade_indexs = log_export.get_l2_trade_position(code, date)
    real_position_indexes = log_export.get_real_place_order_positions(code, date)
    deal_list = log_export.load_huaxin_deal_record(code)
    deal_list = log_export.load_huaxin_deal_record(code, date)
    logger_debug.info("加载其他交易参数耗时:{}", f"{(time.time() - __start_time)}")
    __start_time = time.time()
    deal_list_dict = {}
    for d in deal_list:
        deal_list_dict[d[0]] = d
        deal_list_dict[str(d[0])] = d
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict)
    sell_no_dict = log_export.load_huaxin_transaction_sell_no(code=code, date=date)
    sell_nos = sell_no_dict.get(code)
    active_sell_map = log_export.load_huaxin_active_sell_map(date=date)
    active_sell_set = active_sell_map.get(code)
    logger_debug.info("加载卖单耗时:{}", f"{(time.time() - __start_time)}")
    __start_time = time.time()
    if not active_sell_set:
        active_sell_set = set()
    fdatas = export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos,
                            active_sell_set)
    logger_debug.info("组装数据耗时:{}", f"{(time.time() - __start_time)}")
    __start_time = time.time()
    return fdatas
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict):
def get_l2_transaction_datas(code, date=None):
    if date is None:
        date = tool.get_now_date_str()
    sell_no_dict = log_export.load_huaxin_transaction_sell_no(code=code, date=date)
    sell_nos = sell_no_dict.get(code)
    fdatas = export_l2_transaction_data(code, sell_nos)
    return fdatas
def export_l2_data(code, datas, process_indexs, trade_indexs, real_position_indexes, deal_list_dict, sell_nos,
                   active_sell_nos):
    def find_process_index(index):
        for i in range(0, len(process_indexs)):
            if process_indexs[i][0] <= index <= process_indexs[i][1]:
@@ -58,24 +95,41 @@
                return trade_indexs[i]
        return None
    logger_debug.info("准备加载L2Map数据-----")
    # 数据预处理
    num_operate_map = {}
    l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas)
    logger_debug.info("load_num_operate_map")
    buy_no_map = {}
    sell_no_map = {}
    l2.l2_data_util.load_buy_no_map(buy_no_map, code, datas)
    logger_debug.info("load_buy_no_map")
    l2.l2_data_util.load_sell_no_map(sell_no_map, code, datas)
    logger_debug.info("load_sell_no_map")
    l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, datas)
    logger_debug.info("load_canceled_buy_no_map")
    # num_dict = {}
    # for data in datas:
    #     if data["val"]["num"] not in num_dict:
    #         num_dict[data["val"]["num"]] = []
    #     num_dict[data["val"]["num"]].append(data)
    logger_debug.info(f"循环组装数据开始:长度-{len(datas)} 主动卖长度-{len(active_sell_nos)}")
    index = 0
    fdatas = []
    # 数据太多就需要过滤掉小金额
    is_data_too_large = len(datas) > 20000
    for data in datas:
        index += 1
        if is_data_too_large and data['val']['num'] * float(data['val']['price']) < 5000:
            if data["index"] not in real_position_indexes:
                continue
        # 先移除
        if data['val']['orderNo'] in active_sell_nos:
            # 过滤主动卖
            continue
        trade_info = find_trade_index(data["index"])
        if not trade_info:
            # 获取真实下单位置
@@ -155,12 +209,19 @@
            # 买
            if cancel_data:
                try:
                    cancel_info = "{}-{}".format(cancel_data["index"], cancel_data["val"]["time"])
                    left_num = data["val"]["num"] - cancel_data["val"]["num"]
                    if left_num > 0:
                        cancel_info = f"成交:{left_num} 序号:{cancel_data['index']}"
                    else:
                        cancel_info = "{}-{}".format(cancel_data["index"], f"{cancel_data['val']['time']}") + (
                            f".{cancel_data['val']['tms']}" if "tms" in cancel_data["val"] else '')
                except Exception as e:
                    logging.exception(e)
            else:
                if int(data["val"].get("orderNo")) in deal_list_dict:
                    cancel_info = deal_list_dict[int(data["val"].get("orderNo"))][3]
                deal_info = deal_list_dict.get(str(data["val"].get("orderNo")))
                if deal_info and len(deal_info) >= 5:
                    cancel_info = l2_huaxin_util.convert_time(deal_info[4],
                                                              with_ms=True)
        format_data.append(cancel_info)
        cancel_order_info = None
        if trade_info:
@@ -176,6 +237,32 @@
        format_data.append(cancel_order_info)
        format_data.append(data["val"].get("orderNo"))
        fdatas.append((style_int, trade_info, format_data))
    logger_debug.info("循环组装数据完成")
    return fdatas
def export_l2_transaction_data(code, sell_nos):
    fdatas = []
    if sell_nos:
        index = 0
        for sell_info in sell_nos:
            if sell_info[1] * sell_info[2] < 50 * 10000:
                continue
            index += 1
            item = []
            item.append(index)
            item.append(l2_huaxin_util.convert_time(sell_info[3][0], with_ms=True))
            item.append("")
            item.append(
                "{}万".format(round(sell_info[1] * sell_info[2] / 10000, 1)))
            item.append(sell_info[2])
            item.append(sell_info[1] // 100)
            item.append("主动卖")
            item.append(1)
            item.append(l2_huaxin_util.convert_time(sell_info[4][0], with_ms=True))
            item.append(None)
            item.append(sell_info[0])
            fdatas.append((0, None, item))
    return fdatas
@@ -279,6 +366,6 @@
if __name__ == "__main__":
    try:
        export_l2_excel("605218")
        get_l2_datas("600990")
    except Exception as e:
        logging.exception(e)