Administrator
2023-08-16 aa3392d3d5b0b858deedd44abd7c4020eb565f4c
++++++++++++++++
bug修复
4个文件已修改
183 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 163 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -166,13 +166,11 @@
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = process_index_old
        # 下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = self.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                       total_data, place_order_count)
                                                       total_data, volume_rate_index)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
@@ -233,7 +231,6 @@
                                    cancel_num += data["re"] * int(val["num"])
                    # 保存数据
                    if need_cancel:
                        rate__ = round(cancel_num / max(buy_num, 1), 2)
                        if rate__ > cancel_rate_threshold:
@@ -509,7 +506,6 @@
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
        cancel_rate_threshold = self.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
l2/l2_data_manager_new.py
@@ -13,7 +13,7 @@
from db import redis_manager_delegate as redis_manager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager
from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, first_code_score_manager, current_price_process_manager
    trade_result_manager, first_code_score_manager, current_price_process_manager, trade_data_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
@@ -32,7 +32,7 @@
import dask
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager
class L2DataManager:
@@ -215,6 +215,17 @@
    __last_buy_single_dict = {}
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __latest_process_unique_keys = {}
    # 初始化
    __TradePointManager = l2_data_manager.TradePointManager()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
    __HourCancelBigNumComputer = HourCancelBigNumComputer()
    __LCancelBigNumComputer = LCancelBigNumComputer()
    __TradeStateManager = trade_manager.TradeStateManager()
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    __PauseBuyCodesManager = gpcode_manager.PauseBuyCodesManager()
    __Buy1PriceManager = code_price_manager.Buy1PriceManager()
    __AccountAvailableMoneyManager = AccountAvailableMoneyManager()
    __TradeBuyDataManager = trade_data_manager.TradeBuyDataManager()
    # 获取代码评分
    @classmethod
@@ -271,8 +282,8 @@
    @classmethod
    def process_huaxin(cls, code, origin_datas):
        print("process_huaxin", code, len(origin_datas))
        origin_start_time = round(t.time() * 1000)
        datas = None
        origin_start_time = round(t.time() * 1000)
        try:
            # 加载历史的L2数据
            is_normal = l2.l2_data_util.load_l2_data(code, load_latest=False)
@@ -280,6 +291,7 @@
                print("历史数据异常:", code)
                # 数据不正常需要禁止交易
                l2_trade_util.forbidden_trade(code)
            origin_start_time = round(t.time() * 1000)
            # 转换数据格式
            _start_index = 0
            total_datas = local_today_datas.get(code)
@@ -369,7 +381,7 @@
            # 时间差不能太大才能处理
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                # 判断是否已经挂单
                state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                state = cls.__CodesTradeStateManager.get_trade_state_cache(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
@@ -476,18 +488,19 @@
            # return cancel_data, cancel_msg
        # S撤
        @dask.delayed
        # @dask.delayed
        def s_cancel():
            _start_time = round(t.time() * 1000)
            # S撤单计算,看秒级大单撤单
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                        buy_exec_index, start_index,
                                                                                        end_index, total_data,
                                                                                        code_volumn_manager.get_volume_rate_index(
                                                                                            buy_volume_rate),
                                                                                        cls.volume_rate_info[code][1],
                                                                                        is_first_code)
                b_need_cancel, b_cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                            buy_exec_index, start_index,
                                                                                            end_index, total_data,
                                                                                            code_volumn_manager.get_volume_rate_index(
                                                                                                buy_volume_rate),
                                                                                            cls.volume_rate_info[code][
                                                                                                1],
                                                                                            is_first_code)
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
@@ -498,19 +511,19 @@
            return None, ""
        # H撤
        @dask.delayed
        # @dask.delayed
        def h_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      local_today_num_operate_map.get(
                                                                                          code),
                                                                                      code_volumn_manager.get_volume_rate_index(
                                                                                          buy_volume_rate),
                                                                                      cls.volume_rate_info[code][1],
                                                                                      is_first_code)
                b_need_cancel, b_cancel_data = cls.__HourCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                          buy_exec_index, start_index,
                                                                                          end_index, total_data,
                                                                                          local_today_num_operate_map.get(
                                                                                              code),
                                                                                          code_volumn_manager.get_volume_rate_index(
                                                                                              buy_volume_rate),
                                                                                          cls.volume_rate_info[code][1],
                                                                                          is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
@@ -520,15 +533,15 @@
            return None, ""
        # L撤
        @dask.delayed
        # @dask.delayed
        def l_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = LCancelBigNumComputer().need_cancel(code,
                                                                                   buy_exec_index, start_index,
                                                                                   end_index, total_data,
                                                                                   local_today_num_operate_map.get(
                                                                                       code), is_first_code)
                b_need_cancel, b_cancel_data = cls.__LCancelBigNumComputer.need_cancel(code,
                                                                                       buy_exec_index, start_index,
                                                                                       end_index, total_data,
                                                                                       local_today_num_operate_map.get(
                                                                                           code), is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "L撤销比例触发阈值"
            except Exception as e:
@@ -577,19 +590,27 @@
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
            code)
        # 老版本撤单
        # f1 = compute_safe_count()
        # f2 = compute_m_big_num()
        # f3 = s_cancel()
        # f4 = h_cancel()
        # f5 = buy_1_cancel()
        # f6 = sell_cancel()
        # f7 = l_cancel()
        # dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6, f7)
        # if is_first_code:
        #     dask_result = is_need_cancel(f3, f4, f7)
        # l2_log.debug(code, "撤单计算开始")
        # cancel_data, cancel_msg = dask_result.compute()
        # l2_log.debug(code, "撤单计算结束")
        f1 = compute_safe_count()
        f2 = compute_m_big_num()
        f3 = s_cancel()
        f4 = h_cancel()
        f5 = buy_1_cancel()
        f6 = sell_cancel()
        f7 = l_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6, f7)
        if is_first_code:
            dask_result = is_need_cancel(f3, f4, f7)
        l2_log.debug(code, "撤单计算开始")
        cancel_data, cancel_msg = dask_result.compute()
        # 依次处理
        cancel_data, cancel_msg = s_cancel()
        if not cancel_data:
            cancel_data, cancel_msg = h_cancel()
        if not cancel_data:
            cancel_data, cancel_msg = l_cancel()
        l2_log.debug(code, "撤单计算结束")
        _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                          "已下单-撤单 判断是否需要撤单")
@@ -660,7 +681,7 @@
                                        last_data_index)
                l2_log.debug(code, "执行买入成功")
                ################下单成功处理################
                trade_result_manager.real_buy_success(code)
                trade_result_manager.real_buy_success(code, cls.__TradePointManager)
                l2_log.debug(code, "处理买入成功")
                params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc()
                l2_log.debug(code, params_desc)
@@ -714,7 +735,7 @@
    @classmethod
    def __can_buy(cls, code):
        __start_time = t.time()
        if not trade_manager.TradeStateManager().is_can_buy_cache():
        if not cls.__TradeStateManager.is_can_buy_cache():
            return False, True, f"今日已禁止交易"
        # 之前的代码
        # 首板代码且尚未涨停过的不能下单
@@ -841,10 +862,10 @@
    @classmethod
    def __can_buy_first(cls, code):
        if not trade_manager.TradeStateManager().is_can_buy_cache():
        if not cls.__TradeStateManager.is_can_buy_cache():
            return False, True, f"今日已禁止交易"
        if gpcode_manager.PauseBuyCodesManager().is_in_cache(code):
        if cls.__PauseBuyCodesManager.is_in_cache(code):
            return False, True, f"该代码被暂停交易"
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -886,7 +907,7 @@
                zyltgb = global_util.zyltgb_map.get(code)
            if zyltgb >= 200 * 100000000:
                buy1_price = code_price_manager.Buy1PriceManager().get_buy1_price(code)
                buy1_price = cls.__Buy1PriceManager.get_buy1_price(code)
                if buy1_price is None:
                    return False, True, f"尚未获取到买1价"
                dif = float(limit_up_price) - float(buy1_price)
@@ -894,13 +915,13 @@
                if dif > 0.10001:
                    return False, True, f"自由流通200亿以上,买1剩余档数大于10档,买一({buy1_price})涨停({limit_up_price})"
        open_limit_up_lowest_price = code_price_manager.Buy1PriceManager().get_open_limit_up_lowest_price(code)
        open_limit_up_lowest_price = cls.__Buy1PriceManager.get_open_limit_up_lowest_price(code)
        price_pre_close = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if open_limit_up_lowest_price and (
                float(open_limit_up_lowest_price) - price_pre_close) / price_pre_close < 0.05:
            return False, True, f"炸板后最低价跌至5%以下"
        limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code)
        limit_up_info = cls.__Buy1PriceManager.get_limit_up_info(code)
        if limit_up_info[0] is None and False:
            total_data = local_today_datas.get(code)
            buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
@@ -908,13 +929,13 @@
            # 之前没有涨停过
            # 统计买入信号位到当前位置没有撤的大单金额
            min_money_w = l2_data_util.get_big_money_val(float(total_data[buy_single_index]["val"]["price"])) // 10000
            left_big_num = l2.cancel_buy_strategy.SecondCancelBigNumComputer().compute_left_big_num(code,
                                                                                                    buy_single_index,
                                                                                                    buy_exec_index,
                                                                                                    total_data[-1][
                                                                                                        "index"],
                                                                                                    total_data,
                                                                                                    0, min_money_w)
            left_big_num = cls.__SecondCancelBigNumComputer.compute_left_big_num(code,
                                                                                 buy_single_index,
                                                                                 buy_exec_index,
                                                                                 total_data[-1][
                                                                                     "index"],
                                                                                 total_data,
                                                                                 0, min_money_w)
            if left_big_num > 0:
                # 重新获取分数与分数索引
                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
@@ -1141,7 +1162,7 @@
            l2_log.debug(code, "save_limit_up_time")
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            l2_log.debug(code, "__virtual_buy")
            l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
            cls.__TradePointManager.delete_buy_cancel_point(code)
            l2_log.debug(code, "delete_buy_cancel_point")
            # 暂时不需要
            # f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
@@ -1172,13 +1193,13 @@
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondCancelBigNumComputer().need_cancel(code, buy_single_index,
                                                                                    compute_index,
                                                                                    buy_single_index, compute_index,
                                                                                    total_datas, is_first_code,
                                                                                    cls.volume_rate_info[code][1],
                                                                                    cls.volume_rate_info[code][1],
                                                                                    True)
                need_cancel, cancel_data = cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                        compute_index,
                                                                                        buy_single_index, compute_index,
                                                                                        total_datas, is_first_code,
                                                                                        cls.volume_rate_info[code][1],
                                                                                        cls.volume_rate_info[code][1],
                                                                                        True)
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
                                                  "S级大单处理耗时", force=True)
                l2_log.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
@@ -1190,10 +1211,10 @@
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index, is_first_code)
            else:
                SecondCancelBigNumComputer().need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                         compute_index, total_datas, is_first_code,
                                                         cls.volume_rate_info[code][1],
                                                         cls.volume_rate_info[code][1], False)
                cls.__SecondCancelBigNumComputer.need_cancel(code, buy_single_index, compute_index, buy_single_index,
                                                             compute_index, total_datas, is_first_code,
                                                             cls.volume_rate_info[code][1],
                                                             cls.volume_rate_info[code][1], False)
                l2_log.debug(code, "S级大单处理")
                _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time,
@@ -1225,17 +1246,17 @@
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = cls.__TradePointManager.get_buy_compute_start_data_cache(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        TradePointManager().set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num,
                                                       count,
                                                       max_num_set, volume_rate)
        cls.__TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num,
                                                           count,
                                                           max_num_set, volume_rate)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
l2/l2_data_util.py
@@ -152,7 +152,7 @@
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            RedisUtils.setex_async(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
trade/trade_result_manager.py
@@ -15,10 +15,12 @@
def virtual_buy_success(code):
    # 已经不需要了,暂时注释掉
    # 增加下单计算
    trade_data_manager.PlaceOrderCountManager().place_order(code)
    # trade_data_manager.PlaceOrderCountManager().place_order(code)
    # 删除之前的板上卖信息
    L2LimitUpSellStatisticUtil().delete(code)
    # L2LimitUpSellStatisticUtil().delete(code)
    pass
# 虚拟撤成功
@@ -35,7 +37,7 @@
# 真实买成功
def real_buy_success(code):
def real_buy_success(code, tradePointManager):
    # @dask.delayed
    def clear_max_buy1_volume(code):
        # 下单成功,需要删除最大买1
@@ -67,14 +69,14 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = tradePointManager.get_buy_compute_start_data_cache(
        code)
    clear_max_buy1_volume(code)
    safe_count(code, buy_single_index, buy_exec_index)
    h_cancel(code, buy_single_index, buy_exec_index)
    l_cancel(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
    tradePointManager.delete_buy_cancel_point(code)
# 真实撤成功