Administrator
2023-08-16 965bd2fea182851382f61d9c7c1492d04546bf4d
++++++++++++++++
取消不必要的并行计算框架
3个文件已修改
64 ■■■■ 已修改文件
l2/l2_data_manager_new.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py
@@ -1130,20 +1130,19 @@
                         buy_nums,
                         buy_count, total_datas[compute_index], cls.volume_rate_info[code])
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new,
                                                           cls.volume_rate_info[code][0])
            f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code,
            limit_up_time_manager.LimitUpTimeManager().save_limit_up_time(code,
                                                                                             total_datas[compute_index][
                                                                                                 "val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
            # 暂时不需要
            # f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
            #                                                             compute_index,
            #                                                             buy_single_index,
            #                                                             buy_exec_index, False)
            dask.compute(f1, f2, f3, f4)
            # 已被并行处理
            # # 记录买入信号位置
trade/trade_manager.py
@@ -386,20 +386,20 @@
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    @dask.delayed
    # @dask.delayed
    def is_forbidden(code):
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            return Exception("禁止交易")
        return None, None
    @dask.delayed
    # @dask.delayed
    def is_state_right(code):
        trade_state = CodesTradeStateManager().get_trade_state_cache(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state
    @dask.delayed
    # @dask.delayed
    def is_money_enough(code):
        money = AccountAvailableMoneyManager().get_available_money_cache()
        if money is None:
@@ -412,7 +412,7 @@
            return Exception("账户可用资金不足"), price
        return None, price
    @dask.delayed
    # @dask.delayed
    def can_trade(*args):
        for arg in args:
            if arg[0] is not None:
@@ -421,12 +421,14 @@
    _start_time = tool.get_now_timestamp()
    f1 = is_forbidden(code)
    f2 = is_state_right(code)
    f3 = is_money_enough(code)
    dask_result = can_trade(f1, f2, f3)
    ex, trade_state, price = dask_result.compute()
    if ex is not None:
    ex = is_forbidden(code)[0]
    if ex:
        raise ex
    ex, trade_state = is_state_right(code)
    if ex:
        raise ex
    ex, price = is_money_enough(code)
    if ex:
        raise ex
    # 并行改造
trade/trade_result_manager.py
@@ -1,8 +1,6 @@
# 虚拟买成功
import logging
import dask
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil, \
    LCancelBigNumComputer, DCancelBigNumComputer
@@ -25,25 +23,25 @@
# 虚拟撤成功
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    f1 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_point)(code)
    f2 = dask.delayed(l2_data_manager.TradePointManager().delete_buy_cancel_point)(code)
    l2_data_manager.TradePointManager().delete_buy_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
    # 安全笔数计算
    f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                                     total_datas[-1]["index"])
    f6 = dask.delayed(SecondCancelBigNumComputer().cancel_success)(code)
    f7 = dask.delayed(DCancelBigNumComputer().cancel_success)(code)
    f8 = dask.delayed(LCancelBigNumComputer().cancel_success)(code)
    dask.compute(f1, f2, f5, f6, f7, f8)
    SecondCancelBigNumComputer().cancel_success(code)
    DCancelBigNumComputer().cancel_success(code)
    LCancelBigNumComputer().cancel_success(code)
    # dask.compute(f1, f2, f5, f6, f7, f8)
# 真实买成功
def real_buy_success(code):
    @dask.delayed
    # @dask.delayed
    def clear_max_buy1_volume(code):
        # 下单成功,需要删除最大买1
        __thsBuy1VolumnManager.clear_max_buy1_volume(code)
    @dask.delayed
    # @dask.delayed
    def safe_count(code, buy_single_index, buy_exec_index):
        try:
            __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
@@ -51,7 +49,7 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    @dask.delayed
    # @dask.delayed
    def h_cancel(code, buy_single_index, buy_exec_index):
        try:
            HourCancelBigNumComputer().place_order_success(code, buy_single_index, buy_exec_index,
@@ -61,7 +59,7 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    @dask.delayed
    # @dask.delayed
    def l_cancel(code):
        try:
            LCancelBigNumComputer().del_watch_index(code)
@@ -72,11 +70,10 @@
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
        code)
    f1 = clear_max_buy1_volume(code)
    f2 = safe_count(code, buy_single_index, buy_exec_index)
    f3 = h_cancel(code, buy_single_index, buy_exec_index)
    f4 = l_cancel(code)
    dask.compute(f1, f2, f3, f4)
    clear_max_buy1_volume(code)
    safe_count(code, buy_single_index, buy_exec_index)
    h_cancel(code, buy_single_index, buy_exec_index)
    l_cancel(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)