Administrator
2023-11-22 1e64a42737bb6cc7192c68633d3c168ca150da97
l2/cancel_buy_strategy.py
@@ -15,6 +15,7 @@
import l2_data_util
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from l2.code_price_manager import Buy1PriceManager
from l2.l2_data_manager import OrderBeginPosInfo
from l2.l2_sell_manager import L2LimitUpSellManager
from log_module import async_log_util
@@ -486,12 +487,12 @@
        transaction_index = self.__transaction_progress_index_dict.get(code)
        if transaction_index:
            # 不能计算成交进度以前的数据
            start_compute_index = max(transaction_index + 1, start_compute_index)
            start_compute_index = transaction_index + 1  # max(transaction_index + 1, start_compute_index)
        total_datas = local_today_datas.get(code)
        # -----------------计算H上-------------------
        watch_indexes_up = set()
        for i in range(start_compute_index, real_place_order_index):
        for i in range(real_place_order_index - 1, start_compute_index + 1, -1):
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
@@ -505,53 +506,37 @@
                                                                                                         code))
            if left_count > 0:
                watch_indexes_up.add(i)
                if len(watch_indexes_up) >= 3:
                    break
        # ------------------计算H下-----------------------
        # 计算结束位置
        total_num = 0
        # 获取m值数据
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        thresh_hold_money = Buy1PriceManager().get_latest_buy1_money(code)
        thresh_hold_money = thresh_hold_money
        thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
        end_index = real_place_order_index + 1
        watch_indexes = set()
        for i in range(real_place_order_index + 1, total_datas[-1]["index"]):
            # 看是否撤单
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     local_today_canceled_buyno_map.get(
                                                                                                         code))
            if left_count > 0:
                watch_indexes.add(i)
                total_num += left_count * val["num"]
                if total_num > thresh_hold_num:
                    end_index = i
                count = len(watch_indexes)
                # 最小5笔,最大10笔
                if (total_num > thresh_hold_num and count >= 5) or count >= 10:
                    break
        MIN_MONEYS = [300, 200, 100, 50]
        watch_indexes = set()
        for min_money in MIN_MONEYS:
            for i in range(real_place_order_index + 1, end_index + 1):
                # 看是否撤单
                data = total_datas[i]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                # 小金额过滤
                if float(val['price']) * val['num'] < min_money * 100:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                         total_datas,
                                                                                                         local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count > 0:
                    watch_indexes.add(i)
                    if len(watch_indexes) >= 5:
                        break
            if len(watch_indexes) >= 5:
                break
        if watch_indexes or watch_indexes_up:
            watch_indexes |= watch_indexes_up
            self.__save_watch_index_set(code, buy_single_index, watch_indexes)
@@ -905,7 +890,7 @@
    __redis_manager = redis_manager.RedisManager(0)
    __last_trade_progress_dict = {}
    __real_place_order_index_dict = {}
    __cancel_watch_index_cache = {}
    __cancel_watch_index_info_cache = {}
    # 成交位附近临近大单索引
    __near_by_trade_progress_index_cache = {}
@@ -926,11 +911,14 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index-*")
            keys = RedisUtils.keys(__redis, "l_cancel_watch_index_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_cache, code, val)
                val = RedisUtils.get(__redis, k)
                if val:
                    val = json.loads(val)
                    val[2] = set(val[2])
                CodeDataCacheUtil.set_cache(cls.__cancel_watch_index_info_cache, code, val)
            keys = RedisUtils.keys(__redis, "l_cancel_real_place_order_index-*")
            for k in keys:
@@ -955,30 +943,11 @@
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def __add_watch_indexes(self, code, indexes):
        if not indexes:
            return
        if code not in self.__cancel_watch_index_cache:
            self.__cancel_watch_index_cache[code] = set()
        for index in indexes:
            self.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(self.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
    def __del_watch_indexes(self, code, indexes):
        if not indexes:
            return
        for index in indexes:
            if code in self.__cancel_watch_index_cache:
                self.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __set_watch_indexes(self, code, buy_single_index, indexes):
        self.__cancel_watch_index_cache[code] = indexes
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
        for index in indexes:
            RedisUtils.sadd_async(self.__db, f"l_cancel_watch_index-{code}", index)
    def __set_watch_indexes(self, code, buy_single_index, re_compute: int, indexes):
        self.__cancel_watch_index_info_cache[code] = (buy_single_index, re_compute, indexes)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
        RedisUtils.setex_async(self.__db, f"l_cancel_watch_index_info-{code}", tool.get_expire(),
                               (buy_single_index, re_compute, list(indexes)))
        if indexes:
            trade_record_log_util.add_cancel_watch_indexes_log(code,
                                                               trade_record_log_util.CancelWatchIndexesInfo(
@@ -986,14 +955,11 @@
                                                                   buy_single_index,
                                                                   list(indexes)))
    def __get_watch_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"l_cancel_watch_index-{code}")
    def __get_watch_indexes_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_cache, code)
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_watch_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return set()
        return None
    def __set_near_by_trade_progress_indexes(self, code, buy_single_index, indexes):
        if indexes:
@@ -1019,8 +985,8 @@
        return None
    def del_watch_index(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index-{code}")
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_index_info_cache, code)
        RedisUtils.delete_async(self.__db, f"l_cancel_watch_index_info-{code}")
    def clear(self, code=None):
        if code:
@@ -1029,9 +995,9 @@
                self.__real_place_order_index_dict.pop(code)
                RedisUtils.delete_async(self.__db, f"l_cancel_real_place_order_index-{code}")
        else:
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index-*")
            keys = RedisUtils.keys(self.__get_redis(), f"l_cancel_watch_index_info-*")
            for k in keys:
                code = k.replace("l_cancel_watch_index-", "")
                code = k.replace("l_cancel_watch_index_info-", "")
                if code in self.__last_trade_progress_dict:
                    self.__last_trade_progress_dict.pop(code)
                if code in self.__real_place_order_index_dict:
@@ -1041,8 +1007,23 @@
            for k in keys:
                RedisUtils.delete(self.__get_redis(), k)
        # 重新计算L上
    def re_compute_l_down_watch_indexes(self, code):
        watch_index_info = self.__cancel_watch_index_info_cache.get(code)
        if not watch_index_info or watch_index_info[1] > 0:
            return
        # 获取成交进度位与真实下单位置
        real_place_order_index = self.__real_place_order_index_dict.get(code)
        last_trade_progress_index = self.__last_trade_progress_dict.get(code)
        if not real_place_order_index or not last_trade_progress_index:
            return
        self.compute_watch_index(code, watch_index_info[0], last_trade_progress_index + 1, real_place_order_index,
                                 re_compute=1)
    # 计算观察索引,倒序计算
    def compute_watch_index(self, code, buy_single_index, start_index, end_index):
    # re_compute:是否是重新计算的
    def compute_watch_index(self, code, buy_single_index, start_index, end_index, re_compute=0):
        try:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围:{start_index}-{end_index}")
            total_datas = local_today_datas.get(code)
@@ -1145,8 +1126,8 @@
                            if left_count > 0:
                                watch_indexes.add(i)
                                break
                    self.__set_watch_indexes(code, buy_single_index, watch_indexes)
                    l2_log.l_cancel_debug(code, f"设置监听范围, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
                    self.__set_watch_indexes(code, buy_single_index, re_compute, watch_indexes)
                    l2_log.l_cancel_debug(code, f"设置监听范围{ '(重新计算)' if re_compute else ''}, 数据范围:{re_start_index}-{end_index} 监听范围-{watch_indexes}")
        except Exception as e:
            l2_log.l_cancel_debug(code, f"计算L后囊括范围出错:{str(e)}")
            async_log_util.exception(logger_l2_l_cancel, e)
@@ -1244,9 +1225,10 @@
    # 已经成交的索引
    def add_deal_index(self, code, index, buy_single_index):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return
        watch_indexes = watch_indexes_info[2]
        if index not in watch_indexes:
            return
        if buy_single_index is None:
@@ -1256,7 +1238,7 @@
        if real_place_order_index and real_place_order_index > index:
            total_datas = local_today_datas.get(code)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            for j in range(index + 1 , real_place_order_index):
            for j in range(index + 1, real_place_order_index):
                data = total_datas[j]
                val = data['val']
                if data["index"] in watch_indexes:
@@ -1274,13 +1256,13 @@
                if left_count > 0:
                    watch_indexes.add(data["index"])
                    break
        self.__set_watch_indexes(code, buy_single_index, watch_indexes)
        self.__set_watch_indexes(code, watch_indexes_info[0], watch_indexes_info[1], watch_indexes)
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return False, None
        watch_indexes = set([int(i) for i in watch_indexes])
        watch_indexes = set([int(i) for i in watch_indexes_info[2]])
        # 计算监听的总条数
        total_num = 0
        for wi in watch_indexes:
@@ -1338,7 +1320,6 @@
        # 监听范围小于5笔不生效
        if len(watch_indexes) < 5:
            return False, None
        # 计算监听的总条数
        # 权重
@@ -1422,8 +1403,8 @@
    # L后是否还有可能撤单
    def __is_l_down_can_cancel(self, code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
        watch_indexes_info = self.__get_watch_indexes_cache(code)
        if not watch_indexes_info:
            return True
        trade_index = self.__last_trade_progress_dict.get(code)
        if trade_index is None:
@@ -1432,7 +1413,7 @@
        total_datas = local_today_datas.get(code)
        total_deal_nums = 0
        total_nums = 1
        for index in watch_indexes:
        for index in watch_indexes_info[2]:
            data = total_datas[index]
            val = data["val"]
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,