Administrator
2023-02-10 a7a394e1525cfb85aff1ba02f0961dbb07748bc8
日志优化,部分大单并行化处理
14个文件已修改
438 ■■■■■ 已修改文件
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 225 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py
@@ -193,6 +193,6 @@
if __name__ == "__main__":
    codes = ["601890"]
    codes = ["603083"]
    for code in codes:
        export_l2_excel(code)
juejin.py
@@ -166,8 +166,8 @@
# 获取最新的信息
def get_current_info():
    data = gpcode_manager.get_gp_list();
    results = JueJinManager.get_gp_current_info(data);
    data = gpcode_manager.get_gp_list()
    results = JueJinManager.get_gp_current_info(data)
    logger_juejin_tick.debug("定时获取:{}", results)
    for result in results:
        price = result["price"]
@@ -184,7 +184,7 @@
def re_set_price_pres(codes):
    result = JueJinManager.get_gp_latest_info(codes);
    result = JueJinManager.get_gp_latest_info(codes)
    for item in result:
        symbol = item['symbol']
        symbol = symbol.split(".")[1]
l2/cancel_buy_strategy.py
@@ -114,7 +114,7 @@
        # 只守护30s
        if tool.trade_time_sub(total_data[start_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
            return False, None
        l2_log.cancel_debug(threadId, code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug(code, "S级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        logger_l2_s_cancel.debug(f"code-{code} S级是否需要撤单,数据范围:{start_index}-{end_index}")
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) > 30:
@@ -202,7 +202,7 @@
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
            l2_log.cancel_debug( code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / max(buy_num,1), 2))
            # 保存处理进度与数据
@@ -288,12 +288,12 @@
            total_nums += total_data[indexs[0]]["val"]["num"] * indexs[2]
        if watch_indexs is None:
            l2_log.cancel_debug(threadId, code, "H撤没获取到监听范围数据")
            l2_log.cancel_debug(code, "H撤没获取到监听范围数据")
            return False, None
        processed_index, cancel_num = cls.__get_compute_data(code)
        l2_log.cancel_debug(threadId, code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        l2_log.cancel_debug( code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = constant.H_CANCEL_FIRST_RATE
@@ -323,7 +323,7 @@
                        if cancel_num / total_nums > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
            l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{}", start_index, end_index,
                                process_index, cancel_num,
                                total_nums)
            logger_l2_h_cancel.info(f"code-{code} H级撤单计算结果 范围:{start_index}-{end_index} 处理进度:{process_index} 目标比例:{cancel_rate_threshold} 取消计算结果:{cancel_num}/{total_nums}")
@@ -428,7 +428,7 @@
                if total_count >= safe_count:  # and total_num >= threshold_num
                    finished = True
                    # 最小8笔
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                    l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"])
                    break
        # 计算TOP N大单
@@ -439,7 +439,7 @@
        final_watch_set = set.union(watch_set, top_n_watch_set)
        final_watch_list = list(final_watch_set)
        final_watch_list.sort(key=lambda x: x[0])
        logger_l2_h_cancel.info(f"code-{code}  H撤最终监控大单:{final_watch_list}")
        logger_l2_h_cancel.info(f"code-{code} 安全笔数:{safe_count}  H撤最终监控大单:{final_watch_list}")
        # 保存计算范围
        cls.__save_watch_index_set(code, final_watch_set, process_index, finished)
        # 删除原来的计算数据
@@ -522,6 +522,7 @@
        key = None
        # 获取矫正时间之前最近的一条数据(最多向前回溯3600秒,找到即停止)
        keys = []
        if not constant.TEST:
        for i in range(0, 3600):
            temp_time = tool.trade_time_add_second(time_str, 0 - i)
            # 只处理9:30后的数据
@@ -532,6 +533,20 @@
                keys.append(keys_[0])
            if len(keys) >= 1:
                break
        else:
            keys_ = cls.__get_l2_second_money_record_keys(code, "*")
            key_list=[]
            for k in keys_:
                time__ = k.split("-")[-1]
                key_list.append((int(time__),k))
            key_list.sort(key=lambda tup: tup[0])
            for t in key_list:
                if t[0] <= int(time_):
                    keys.append(t[1])
                    break
        keys.sort(key=lambda tup: int(tup.split("-")[-1]))
        if len(keys) > 0:
            key = keys[0]
@@ -775,11 +790,11 @@
            process_end_index = cancel_index
        # 保存最新累计金额
        # cls.__set_l2_latest_money_record(code, process_end_index, total_num)
        l2_data_log.l2_time(code, random_key, round(time.time() * 1000) - start_time,
        l2_data_log.l2_time(code, round(time.time() * 1000) - start_time,
                            "l2数据封单额计算时间",
                            False)
        if cancel_index:
            l2_log.cancel_debug(random_key, code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
            l2_log.cancel_debug(code, "数据处理位置:{}-{},{},最终买1为:{}", start_index, end_index, record_msg,
                                total_num)
            return total_datas[cancel_index], cancel_msg
        return None, None
@@ -864,7 +879,7 @@
            process_index = cancel_index
        else:
            process_index = end_index
        l2_log.cancel_debug(random_key, code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
        l2_log.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                            threshold_num)
        cls.__save_process_index(code, process_index)
l2/l2_data_log.py
@@ -2,13 +2,14 @@
import time
import log
from l2 import l2_log
def l2_time(code, do_id, time_, description, new_line=False, force=False):
def l2_time(code, time_, description, new_line=False, force=False):
    timestamp = int(time.time() * 1000)
    # 只记录耗时较长的信息
    if time_ > 1 or force:
        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", do_id, timestamp, description, code, time_,
        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", l2_log.threadIds.get(code), timestamp, description, code, time_,
                                        "\n" if new_line else "")
    return timestamp
l2/l2_data_manager_new.py
@@ -16,7 +16,7 @@
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
@@ -161,20 +161,6 @@
    __buyL2SafeCountManager = safe_count_manager.BuyL2SafeCountManager()
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
@@ -204,7 +190,7 @@
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                    __start_time = l2_data_log.l2_time(code,
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
@@ -231,7 +217,7 @@
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                           "l2数据预处理时间")
        if len(add_datas) > 0:
@@ -254,7 +240,7 @@
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - __start_time,
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "l2数据处理时间")
    # 处理未挂单
@@ -264,7 +250,7 @@
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
@@ -285,7 +271,7 @@
            cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                          local_today_num_operate_map.get(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-获取买入信息耗时")
            return None, ""
@@ -296,7 +282,7 @@
            # 计算m值大单
            cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                              gpcode_manager.get_limit_up_price(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-m值大单计算")
            return None, ""
@@ -309,7 +295,7 @@
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
            l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            return cancel_data, cancel_msg
@@ -328,7 +314,7 @@
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time,
                                    "已下单-s级大单估算")
            return None, ""
@@ -345,7 +331,7 @@
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
            return None, ""
        # 板上卖撤
@@ -361,7 +347,7 @@
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-板上卖耗时")
                l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-板上卖耗时")
            return None, ""
        # 是否需要撤销
@@ -384,7 +370,7 @@
        if end_index < start_index:
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        _start_time = tool.get_now_timestamp()
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code)
@@ -397,18 +383,18 @@
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        cancel_data, cancel_msg = dask_result.compute()
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                          "已下单-撤单 判断是否需要撤单")
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            if cls.cancel_buy(code, cancel_msg):
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已下单-撤单 耗时")
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "处理剩余数据 耗时")
            else:
                # 撤单尚未成功
@@ -417,41 +403,43 @@
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                l2_log.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
                # unreal_buy_info 的内容格式为:(触发买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "已下单-真实下单 耗时")
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "已虚拟下单-执行真实下单 外部耗时")
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        __start_time = tool.get_now_timestamp()
        can, reason = cls.__can_buy(code)
        __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            if not reason.startswith("买1价不为涨停价"):
                # 中断买入
                trade_manager.break_buy(code, reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
            l2_log.debug(code, "可以下单,原因:{}", reason)
            try:
                cls.debug(code, "开始执行买入")
                l2_log.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                ################下单成功处理################
                trade_result_manager.real_buy_success(code)
                cls.debug(code, "执行买入成功")
                l2_log.debug(code, "执行买入成功")
            except Exception as e:
                cls.debug(code, "执行买入异常:{}", str(e))
                l2_log.debug(code, "执行买入异常:{}", str(e))
                pass
            finally:
                cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
                l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以取消
    @classmethod
@@ -502,7 +490,7 @@
            total_datas = local_today_datas[code]
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
                cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
                l2_log.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
                if sell1_time is not None and sell1_volumn > 0:
                    # 获取执行位信息
@@ -591,18 +579,18 @@
            # 可以下单
            return True, None
        finally:
            l2_data_log.l2_time(code, cls.random_key[code], round((t.time() - __start_time) * 1000), "是否可以下单计算")
            l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "是否可以下单计算")
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            l2_log.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            cls.debug(code, "执行撤单成功")
            l2_log.debug(code, "执行撤单成功")
            return True
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
            l2_log.debug(code, "执行撤单异常:{}", str(e))
            return False
    @classmethod
@@ -626,13 +614,13 @@
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
                # 不能取消
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                l2_log.cancel_debug(code, "撤单中断,原因:{}", reason)
                l2_log.debug(code, "撤单中断,原因:{}", reason)
                return False
            cancel_result = cls.__cancel_buy(code)
            if cancel_result:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        cls.debug(code, "执行撤单结束,原因:{}", msg)
        l2_log.debug(code, "执行撤单结束,原因:{}", msg)
        return True
    # 虚拟下单
@@ -646,7 +634,7 @@
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        _start_time = round(t.time() * 1000)
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
@@ -657,6 +645,7 @@
            code)
        # 是否为新获取到的位置
        new_get_single = False
        if buy_single_index is None:
            place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
            continue_count = 3
@@ -669,33 +658,46 @@
                                                               compute_end_index)
            buy_single_index = _index
            if has_single:
                new_get_single = True
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,数据:{}", buy_single_index, compute_start_index,
                          compute_end_index, total_datas[buy_single_index])
                # 如果是今天第一次有下单开始信号,需要设置大单起始点
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "下单信号计算时间")
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间")
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 开始计算的位置
        start_process_index = min(buy_single_index, compute_start_index) if new_get_single else max(buy_single_index,
                                                                                                    compute_start_index)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
        cls.l2BigNumForMProcessor.process(code, start_process_index,
                                          compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "计算m值大单")
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "计算m值大单")
        threshold_money, msg = cls.__get_threshmoney(code)
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code, max(
            buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money, buy_single_index,
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "纯买额统计时间")
        cls.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m值阈值计算")
        # 买入纯买额统计
        compute_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code,
                                                                                                             start_process_index,
                                                                                                             compute_end_index,
                                                                                                             num, count,
                                                                                                             threshold_money,
                                                                                                             buy_single_index,
                                                                                                             max_num_set)
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "纯买额统计时间")
        l2_log.debug(code, "m值-{} m值因子-{}", threshold_money, msg)
        # 买入信号位与计算位置间隔2s及以上了
        if rebegin_buy_pos is not None:
@@ -704,25 +706,37 @@
            return
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      buy_count,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
                                        max_num_set_new)
            # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # 虚拟下单
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            # 删除之前的所有撤单信号
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                         buy_count, total_datas[compute_index])
            # 涨停封单额计算
            L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index,
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new)
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(cls.random_key[code], code, buy_single_index,
                                                                        compute_index,
                                                     buy_single_index,
                                                     buy_exec_index, False)
            dask.compute(f1, f2, f3, f4, f5)
            _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
            # 已被并行处理
            # # 记录买入信号位置
            # cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count,
            #                             max_num_set_new)
            # # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            # limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # # 虚拟下单
            # cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            # # 删除之前的所有撤单信号
            # l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            #
            # # 涨停封单额计算
            # L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, buy_single_index, compute_index,
            #                                          buy_single_index,
            #                                          buy_exec_index, False)
            _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                              "记录执行买入数据", force=True)
            # 数据是否处理完毕
@@ -732,9 +746,9 @@
                                                                                  buy_single_index, compute_index,
                                                                                  total_datas, cls.random_key[code],
                                                                                  True)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                l2_log.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                if need_cancel:
                    if cls.cancel_buy(code, "S级大单撤销"):
@@ -746,13 +760,13 @@
                SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                       compute_index, total_datas, cls.random_key[code], False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                l2_log.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  f"处理撤单步骤耗时,范围:{compute_index + 1}-{compute_end_index}", force=True)
        else:
@@ -841,23 +855,11 @@
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, max_num_set):
        def get_threshold_count():
            count = threshold_count  # - sub_threshold_count
            # if count < 3:
            #     count = 3
            # count = round(count * buy1_factor)
            # # 最高30笔,最低8笔
            # if count > 21:
            #     count = 21
            # if count < 8:
            #     count = 8
            count = threshold_count
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # 计算从买入信号开始到计算开始位置的大单数量
        sub_threshold_count = cls.__compute_big_money_count(total_datas, buy_single_index, compute_start_index - 1)
        if sub_threshold_count < 0:
            sub_threshold_count = 0
        buy_nums = origin_num
        buy_count = origin_count
@@ -868,15 +870,6 @@
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        buy1_factor = 1
        # 获取买1是否为涨停价
        if buy1_price is None:
            buy1_factor = 1.3
        elif limit_up_price is None:
            buy1_factor = 1.3
        elif abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
            print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
            buy1_factor = 1.3
        # 目标订单数量
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code)
@@ -899,6 +892,9 @@
            # 第一次下单需要大单最少2笔,以后只需要1笔
            big_num_count = 1
        # 较大单的手数
        bigger_num = round(5900 / limit_up_price)
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
@@ -917,22 +913,17 @@
            # 涨停买
            if L2DataUtil.is_limit_up_price_buy(_val):
                if l2_data_util.is_big_money(_val):
                    # sub_threshold_count += int(total_datas[i]["re"])
                    max_buy_num_set.add(i)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                if _val["num"] >= bigger_num:
                    trigger_buy = True
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count, )
                        logger_l2_trade_buy.info(
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{get_threshold_count()}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                if _val["num"] >= bigger_num:
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
@@ -944,20 +935,24 @@
                        if buy_index >= buy_single_index:
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                            # 大单撤销
                            max_buy_num_set.discard(buy_index)
                            l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
                                cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                                # 大单撤销
                                max_buy_num_set.discard(buy_index)
                                l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                    else:
                        # 未找到买撤数据的买入点
                        cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
            l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
@@ -965,7 +960,7 @@
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
        l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
l2/l2_data_util.py
@@ -122,7 +122,7 @@
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, randomKey, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
l2/l2_log.py
@@ -1,15 +1,17 @@
from log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_trade
# Per-code id shown as "thread-id" in every log line emitted by this module.
# Looked up with .get(), so a code with no entry is logged as "thread-id=None".
threadIds = {}


def debug(code, content, *args):
    """Write a debug line for ``code`` to ``logger_l2_trade``.

    The line is prefixed with the id stored in ``threadIds`` for ``code`` and
    with the code itself; the prefixed text is then formatted with ``args``
    using ``str.format``.

    :param code: stock code the message belongs to
    :param content: ``str.format`` template of the message
    :param args: positional values substituted into the template
    """
    header = "thread-id={} code={}  ".format(threadIds.get(code), code)
    message = (header + content).format(*args)
    logger_l2_trade.debug(message)
def buy_debug(code, content, *args):
    """Write a debug line for ``code`` to ``logger_l2_trade_buy``.

    The line is prefixed with the id stored in ``threadIds`` for ``code`` and
    with the code itself; the prefixed text is then formatted with ``args``
    using ``str.format``.

    :param code: stock code the message belongs to
    :param content: ``str.format`` template of the message
    :param args: positional values substituted into the template
    """
    header = "thread-id={} code={}  ".format(threadIds.get(code), code)
    message = (header + content).format(*args)
    logger_l2_trade_buy.debug(message)
def cancel_debug(code, content, *args):
    """Write a debug line for ``code`` to ``logger_l2_trade_cancel``.

    The line is prefixed with the id stored in ``threadIds`` for ``code`` and
    with the code itself; the prefixed text is then formatted with ``args``
    using ``str.format``.

    :param code: stock code the message belongs to
    :param content: ``str.format`` template of the message
    :param args: positional values substituted into the template
    """
    header = "thread-id={} code={}  ".format(threadIds.get(code), code)
    message = (header + content).format(*args)
    logger_l2_trade_cancel.debug(message)
l2_data_util.py
@@ -28,7 +28,7 @@
# 是否为大单
def is_big_money(val):
    price = float(val["price"])
    money = price * int(val["num"])
    money = price * val["num"]
    if price > 3.0:
        if money >= 30000:
            return True
l2_trade_test.py
@@ -71,9 +71,9 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002328"
        code = "002131"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -85,7 +85,7 @@
                total_datas.insert(i, data)
        pos_list = log.get_l2_process_position(code)
        pos_list.insert(108,(375,448))
        # pos_list.insert(108,(375,448))
        if pos_list[0][0] > 0:
            pos_list.insert(0, (0, pos_list[0][0] - 1))
        del pos_list[-1]
log.py
@@ -350,7 +350,7 @@
if __name__ == '__main__':
    # logger_l2_h_cancel.info("test")
    # logger_l2_process_time.info("test123")
    codes = ["002328"]
    codes = ["002131", "003035", "002131"]
    for code in codes:
        export_logs(code)
server.py
@@ -19,7 +19,7 @@
import gpcode_manager
import authority
import juejin
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
import l2.l2_data_util
@@ -96,12 +96,14 @@
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                            _str)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
@@ -120,12 +122,12 @@
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -134,19 +136,19 @@
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                         do_id)
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
@@ -173,7 +175,7 @@
                                    __end_time = round(time.time() * 1000)
                                    # 只记录大于40ms的数据
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
tool.py
@@ -44,6 +44,10 @@
    return time_str
def get_now_timestamp():
    """Return the current ``time.time()`` value in milliseconds, rounded to an int."""
    seconds = time.time()
    return round(seconds * 1000)
def to_price(_decimal):
    """Convert a ``decimal.Decimal`` to a price with exactly two decimal places.

    Ties are rounded half-up (``decimal.ROUND_HALF_UP``).

    :param _decimal: the ``decimal.Decimal`` value to round
    :return: a new ``decimal.Decimal`` quantized to ``0.00``
    """
    two_places = decimal.Decimal("0.00")
    return _decimal.quantize(two_places, rounding=decimal.ROUND_HALF_UP)
trade/trade_manager.py
@@ -5,11 +5,13 @@
# 交易管理器
import time
import dask
from db import mysql_data, redis_manager
from trade import trade_data_manager, l2_trade_util
from trade.trade_gui import THSBuyWinManagerNew, THSGuiTrade
import time as t
from l2 import l2_data_manager
from l2 import l2_data_manager, l2_data_log
from log import *
@@ -165,26 +167,72 @@
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    """Run the pre-trade checks for ``code`` and, if they all pass, place the buy.

    The three checks (forbidden list, trade state, available money) are built
    as ``dask.delayed`` tasks and evaluated by a single ``compute()`` call.
    Each check returns an ``(error, value)`` pair; ``can_trade`` reports the
    first error in check order, or the trade state and limit-up price when
    every check passed.

    :param code: stock code to buy
    :param capture_timestamp: forwarded unchanged to ``__buy``
    :param last_data: forwarded unchanged to ``__buy``
    :param last_data_index: forwarded unchanged to ``__buy``
    :raises Exception: when the code is forbidden, is in a non-tradable state,
        the available money or limit-up price is unknown, or the money does
        not cover one lot (100 shares) at the limit-up price
    """

    @dask.delayed
    def is_forbidden(code):
        # Is trading forbidden for this code?
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            # Must be a pair like every other check: can_trade reads arg[0],
            # and a bare Exception is not subscriptable (TypeError).
            return Exception("禁止交易"), None
        return None, None

    @dask.delayed
    def is_state_right(code):
        trade_state = get_trade_state(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state

    @dask.delayed
    def is_money_enough(code):
        money = get_available_money()
        if money is None:
            return Exception("未获取到账户可用资金"), None
        price = gpcode_manager.get_limit_up_price(code)
        if price is None:
            return Exception("尚未获取到涨停价"), None
        # Is there enough money to buy one lot (100 shares)?
        if price * 100 > money:
            return Exception("账户可用资金不足"), price
        return None, price

    @dask.delayed
    def can_trade(*args):
        # Report the first failing check, in the order the checks were passed.
        for arg in args:
            if arg[0] is not None:
                return arg[0], None, None
        # args[1] is the trade-state check, args[2] is the money/price check.
        return None, args[1][1], args[2][1]

    _start_time = tool.get_now_timestamp()

    f1 = is_forbidden(code)
    f2 = is_state_right(code)
    f3 = is_money_enough(code)
    dask_result = can_trade(f1, f2, f3)
    ex, trade_state, price = dask_result.compute()
    if ex is not None:
        raise ex

    print("开始买入")
    logger_trade.info("{}开始买入".format(code))
    set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    # l2_time's return value is used as the start point of the next measurement.
    _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True)
# 中断买入
@@ -272,6 +320,7 @@
            time.sleep(0.1+0.05*i)
            pass
# 取消委托成功
def __cancel_success(code):
    trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
trade/trade_result_manager.py
@@ -1,6 +1,8 @@
# 虚拟买成功
import logging
import dask
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil
from l2.l2_data_util import local_today_datas, local_today_num_operate_map
@@ -22,41 +24,61 @@
# 虚拟撤成功
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    """Clean up after a virtual buy-cancel for ``code`` has succeeded.

    Deletes the stored buy point, buy-cancel point and the cancel-buy compute
    and count info, records the order position for the safe-count manager
    (using the ``"index"`` of the last entry of ``total_datas``), and notifies
    ``SecondCancelBigNumComputer``. All steps are wrapped in ``dask.delayed``
    and executed by one ``dask.compute`` call.

    :param code: stock code whose cancel succeeded
    :param buy_single_index: index of the buy signal
    :param buy_exec_index: index of the buy execution point
    :param total_datas: non-empty list of L2 records; only the last one is read
    """
    point_manager = l2_data_manager.TradePointManager
    tasks = [
        dask.delayed(point_manager.delete_buy_point)(code),
        dask.delayed(point_manager.delete_buy_cancel_point)(code),
        dask.delayed(point_manager.delete_compute_info_for_cancel_buy)(code),
        dask.delayed(point_manager.delete_count_info_for_cancel_buy)(code),
        # Safe-count bookkeeping
        dask.delayed(__buyL2SafeCountManager.save_place_order_info)(
            code, buy_single_index, buy_exec_index, total_datas[-1]["index"]),
        dask.delayed(SecondCancelBigNumComputer.cancel_success)(code),
    ]
    dask.compute(*tasks)
# 真实买成功
def real_buy_success(code):
    """Run the bookkeeping that follows a real, successful buy order for ``code``.

    Reads the stored buy compute start data, then runs three steps through one
    ``dask.compute`` call: clearing the max buy-1 volume, saving the order
    position for the safe-count manager, and notifying
    ``HourCancelBigNumComputer``. Errors raised by the last two steps are
    logged and swallowed. Finally the stored buy-cancel point is deleted.

    :param code: stock code whose order was placed
    """

    def clear_max_buy1_volume(code):
        # The order went through, so the recorded max buy-1 volume is dropped.
        __thsBuy1VolumnManager.clear_max_buy1_volume(code)

    def safe_count(code, buy_single_index, buy_exec_index):
        try:
            __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
        except Exception as err:
            logging.exception(err)
            logger_l2_error.exception(err)

    def h_cancel(code, buy_single_index, buy_exec_index):
        try:
            HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                         local_today_datas.get(code),
                                                         local_today_num_operate_map.get(code))
        except Exception as err:
            logging.exception(err)
            logger_l2_error.exception(err)

    # Fetch the buy position info; only the first two fields are used below.
    (buy_single_index, buy_exec_index, buy_compute_index,
     num, count, max_num_set) = l2_data_manager.TradePointManager.get_buy_compute_start_data(code)

    tasks = (
        dask.delayed(clear_max_buy1_volume)(code),
        dask.delayed(safe_count)(code, buy_single_index, buy_exec_index),
        dask.delayed(h_cancel)(code, buy_single_index, buy_exec_index),
    )
    dask.compute(*tasks)
    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
# 真实撤成功
def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    """Clean up after a real buy-cancel for ``code`` has succeeded.

    Records the order position for the safe-count manager (using the
    ``"index"`` of the last entry of ``total_datas``), deletes the stored buy
    point, buy-cancel point and the cancel-buy compute and count info, and
    notifies ``SecondCancelBigNumComputer``. All steps are wrapped in
    ``dask.delayed`` and executed by one ``dask.compute`` call.

    :param code: stock code whose cancel succeeded
    :param buy_single_index: index of the buy signal
    :param buy_exec_index: index of the buy execution point
    :param total_datas: non-empty list of L2 records; only the last one is read
    """
    point_manager = l2_data_manager.TradePointManager
    tasks = [
        # Safe-count bookkeeping
        dask.delayed(__buyL2SafeCountManager.save_place_order_info)(
            code, buy_single_index, buy_exec_index, total_datas[-1]["index"]),
        # Clear the buy markers
        dask.delayed(point_manager.delete_buy_point)(code),
        dask.delayed(point_manager.delete_buy_cancel_point)(code),
        dask.delayed(point_manager.delete_compute_info_for_cancel_buy)(code),
        dask.delayed(point_manager.delete_count_info_for_cancel_buy)(code),
        dask.delayed(SecondCancelBigNumComputer.cancel_success)(code),
    ]
    dask.compute(*tasks)