Administrator
2024-11-18 e240fb424005fd880be8cb3a5f91a7c1e163e3b5
third_data/code_plate_key_manager.py
@@ -10,18 +10,15 @@
import constant
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util, kpl_data_constant, huaxin_l1_data_manager
from third_data import kpl_block_util, kpl_api, kpl_util
from settings.trade_setting import MarketSituationManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpDataConstant
from third_data.third_blocks_manager import BlockMapManager, CodeThirdBlocksManager
from trade.radical_buy_data_manager import RedicalBuyDataManager
from utils import global_util, tool, buy_condition_util, init_data_util
from log_module import log, async_log_util
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data.third_blocks_manager import BlockMapManager
from utils import global_util, tool, buy_condition_util
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_block_can_buy, logger_debug
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
@@ -126,6 +123,8 @@
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块-{blocks}  耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                    else:
                        # 还没涨停的需要更新精选板块 更新精选板块
                        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
@@ -146,6 +145,8 @@
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                                # 跟随精选板块一起更新
                                self.load_jx_blocks_radical(code)
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
@@ -154,19 +155,24 @@
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块(备用)-{blocks}  耗时:{int(time.time() - start_time)}s")
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                if price_rate > 0.03:
                    if not self.__code_blocks_for_radical_buy.get(code):
                        start_time = time.time()
                        blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
                        blocks = set([b[1] for b in blocks])
                        # fblocks = BlockMapManager().filter_blocks(blocks)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks_for_radical_buy(code, list(blocks))
                        self.load_jx_blocks_radical(code)
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
    def load_jx_blocks_radical(self, code):
        start_time = time.time()
        blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
        blocks = set([b[1] for b in blocks])
        # fblocks = BlockMapManager().filter_blocks(blocks)
        async_log_util.info(logger_kpl_block_can_buy,
                            f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s")
        self.save_jx_blocks_for_radical_buy(code, list(blocks))
# 禁止下单的板块
@@ -284,12 +290,6 @@
    def get_today_limit_up_reason(cls, code):
        return cls.__today_total_limit_up_reason_dict.get(code)
    # 今日涨停原因变化
    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire())
        self.__set_total_keys(code)
    # 设置代码的今日涨停原因
    def __set_total_keys(self, code):
        keys = set()
@@ -341,31 +341,115 @@
    # Collaborator instances, created once when the class body is executed.
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()
    # Top featured (JingXuan) net-inflow blocks
    __top_jx_blocks = set()
    # Top featured (JingXuan) net-outflow blocks
    __top_jx_out_blocks = set()
    @classmethod
    def set_top_5_reasons(cls, datas):
        temp_list = []
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        # 排序
        for i in range(0, len(datas)):
            if datas[i][1] not in constant.KPL_INVALID_BLOCKS:
                # (名称,净流入金额,排名)
                temp_list.append((datas[i][1], datas[i][3], len(temp_list)))
                # 只获取前10个
                if len(temp_list) > 10:
                    break
                if datas[i][3] < 3 * 10000 * 10000:
                    break
    def set_market_jingxuan_blocks(cls, datas):
        """
        Set the featured (JingXuan) net-inflow blocks.

        Currently disabled: the unconditional early return below makes the call a
        no-op, so the selection code after it never runs.
        @param datas: rows read as [1] = block name and [3] = amount compared with 5e7
                      (layout taken from the unreachable code below)
        @return: None
        """
        # Legacy implementation, kept for reference:
        # blocks = set()
        # for data in datas:
        #     if data[3] <= 0:
        #         break
        #     blocks.add(data[1])
        # cls.__top_jx_blocks = blocks
        if True:
            # Deliberate kill switch; everything below is unreachable.
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] < 5e7:
                continue
            blocks.add(data[1])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[1]))
        # Log the first 20 source rows together with the selected blocks.
        # NOTE(review): the previous comment said "outflow", but this writes to logger_kpl_jx_in.
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
        for temp in temp_list:
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0])
            for name in names:
                if name == temp[1]:
                    continue
                temp_list.append((name, temp[1], temp[2]))
        cls.top_5_reason_list = temp_list
        cls.__reset_top_5_dict()
    @classmethod
    def set_market_jingxuan_blocks_from_custom(cls, datas):
        """
        设置自定义精选流入数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] < 5e7:
                continue
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_out_blocks(cls, datas):
        """
        Set the featured (JingXuan) net-outflow blocks.

        Currently disabled: the unconditional early return below makes the call a
        no-op, so the selection code after it never runs.
        @param datas: rows read as [1] = block name and [3] = amount compared with -5e7
                      (layout taken from the unreachable code below)
        @return: None
        """
        if True:
            # Deliberate kill switch; everything below is unreachable.
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] > -5e7:
                # Stop at the first row whose amount is above -5e7 (outflow under 50 million)
                break
            blocks.add(data[1])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[1]))
        # Log the first 10 source rows together with the selected outflow blocks
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_out_blocks_from_custom(cls, datas):
        """
        设置自定义精选流出数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] > -5e7:
                # 过滤5千万以上的
                break
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        return cls.__top_jx_blocks
    @classmethod
    def get_top_market_jingxuan_out_blocks(cls):
        return cls.__top_jx_out_blocks
    @classmethod
    def set_top_5_industry(cls, datas):
@@ -398,35 +482,6 @@
        temp_set = cls.top_5_key_dict.keys()
        return temp_set
    # 通过关键字判断能买的代码数量
    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        # 判断行业涨停票数量,除开自己必须大于1个
        temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key)
        if temp_codes is None:
            temp_codes = set()
        else:
            temp_codes = set(temp_codes)
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            # 后排才能挂单
            return 0, "身位不为后排"
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        if key in forbidden_plates:
            return 0, "不买该板块"
        # 10:30以前可以挂2个单
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # 10:30以后
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        else:
            return 1, "净流入在前5"
    @classmethod
    def is_in_top(cls, keys):
        reasons = cls.get_can_buy_key_set()
@@ -446,6 +501,14 @@
    __history_limit_up_reason_dict = {}
    # Blocks
    __blocks_dict = {}
    # Shared instance handed out by __new__; None until the first construction
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesHisReasonAndBlocksManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
@@ -529,6 +592,14 @@
        except:
            pass
        return set()
    def get_history_blocks_cache(self, code):
        """
        获取180天的历史涨停原因缓存
        @param code:
        @return:
        """
        return self.__history_blocks_dict_cache.get(code)
# 目标代码板块关键词管理
@@ -908,7 +979,13 @@
            current_limit_up_datas = []
        # 获取目标代码板块
        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        # keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        keys = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not keys:
            keys = set()
        keys = BlockMapManager().filter_blocks(keys)
        if keys:
            keys -= constant.KPL_INVALID_BLOCKS
        # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
@@ -917,6 +994,7 @@
        fresults = []
        if not keys:
            return fresults, set()
        code_limit_up_reasons_dict = {}
        load_code_block()
        for block in keys:
@@ -1085,366 +1163,6 @@
        # 保存板块计算结果
        cls.__can_buy_compute_result_dict[code] = (
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
class RadicalBuyBlockManager:
    """
    Block management for radical (aggressive) buying.
    """
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    # Limit-up codes seen on the previous update
    __last_limit_up_codes = set()
    # Time at which each code was recorded as limit-up
    __limit_up_time_dict = {}
    # Accumulated limit-up duration of codes whose limit-up has broken
    __total_limit_up_space_dict = {}
    # Codes that are at limit-up right now
    __current_limit_up_codes = set()
    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        # 查询当前的涨停代码集合
        codes = set([d[0] for d in current_limit_up_datas])
        cls.__current_limit_up_codes = codes
        try:
            # 炸板代码
            break_limit_up_codes = cls.__last_limit_up_codes - codes
            # 新涨停的代码
            new_limit_up_codes = codes - cls.__last_limit_up_codes
            if new_limit_up_codes:
                for code in new_limit_up_codes:
                    if code not in cls.__limit_up_time_dict:
                        cls.__limit_up_time_dict[code] = time.time()
            if break_limit_up_codes:
                # 记录总涨停时间
                for bc in break_limit_up_codes:
                    if bc in cls.__limit_up_time_dict:
                        space = tool.trade_time_sub(tool.get_now_time_str(),
                                                    tool.to_time_str(cls.__limit_up_time_dict[bc]))
                        if bc not in cls.__total_limit_up_space_dict:
                            cls.__total_limit_up_space_dict[bc] = 0
                        cls.__total_limit_up_space_dict[bc] = cls.__total_limit_up_space_dict[bc] + space
                        logger_debug.info(f"炸板代码涨停时间:{bc}-{cls.__total_limit_up_space_dict[bc]}")
                        cls.__limit_up_time_dict.pop(bc)
        except Exception as e:
            logger_debug.exception(e)
        finally:
            cls.__last_limit_up_codes = codes
        cls.compute_open_limit_up_code_dict_for_radical_buy(current_limit_up_datas)
    @classmethod
    def compute_open_limit_up_code_dict_for_radical_buy(cls, current_limit_up_datas):
        """
        计算开1的代码信息,不包含5板以上的
        @param current_limit_up_datas:
        @return:
        """
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        temp_dict = {}
        for d in current_limit_up_datas:
            code = d[0]
            # d: (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
            # 计算是否开1
            if int(d[2]) >= timestamp_end or int(d[2]) < timestamp_start:
                continue
            buy1_money = huaxin_l1_data_manager.get_buy1_money(code)
            # 买1是否大于5000w
            if not constant.TEST:
                if not buy1_money or buy1_money < 5e7:
                    continue
            if not tool.is_can_buy_code(code):
                continue
            blocks = {d[5]}
            if d[6]:
                blocks |= set(d[6].split("、"))
            blocks -= constant.KPL_INVALID_BLOCKS
            # 过滤
            blocks = BlockMapManager().filter_blocks(blocks)
            temp_dict[code] = (kpl_util.get_high_level_count(d[4]), blocks)
        kpl_data_constant.open_limit_up_code_dict_for_radical_buy = temp_dict
    @classmethod
    def __get_current_index(cls, code, block, yesterday_limit_up_codes, exclude_codes=None):
        """
        获取当前涨停身位
        @param code:
        @param block:
        @param yesterday_limit_up_codes:
        @return: 索引,前排代码信息([(代码, 涨停时间)])
        """
        if exclude_codes is None:
            exclude_codes = set()
        current_index = 0
        block_codes_infos = []
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        limit_up_time = time.time()
        for k in LimitUpDataConstant.current_limit_up_datas:
            _code = k[0]
            if _code in exclude_codes:
                continue
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            if not blocks:
                blocks = set()
            blocks = BlockMapManager().filter_blocks(blocks)
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[2])
                continue
            # 不是这个板块
            if block not in blocks:
                continue
            if not tool.is_can_buy_code(_code):
                continue
            # 剔除开1的数据
            if timestamp_start <= int(k[2]) < timestamp_end:
                continue
            # 剔除高位板
            if _code in yesterday_limit_up_codes:
                continue
            # 代码.涨停时间
            block_codes_infos.append((_code, int(k[2])))
        block_codes_infos.append((code, limit_up_time))
        block_codes_infos.sort(key=lambda x: x[1])
        before_codes_info = []
        for i in range(0, len(block_codes_infos)):
            if block_codes_infos[i][0] == code:
                current_index = i
                break
            else:
                before_codes_info.append(block_codes_infos[i])
        return current_index, before_codes_info
    @classmethod
    def __get_history_index(cls, code, block, yesterday_limit_up_codes, exclude_codes=None):
        """
        获取历史涨停身位
        @param code:
        @param block:
        @param current_limit_up_datas: 昨日涨停代码
        @param current_limit_up_codes: 目前的涨停代码
        @return:
        """
        if exclude_codes is None:
            exclude_codes = set()
        history_index = 0
        block_codes_infos = []
        # 开1时间范围
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        limit_up_time = time.time()
        limit_up_space_ge_60s_codes = set()
        for k in LimitUpDataConstant.history_limit_up_datas:
            _code = k[3]
            if _code in exclude_codes:
                continue
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            blocks = BlockMapManager().filter_blocks(blocks)
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[5])
                continue
            # 不是这个板块
            if block not in blocks:
                continue
            if not tool.is_can_buy_code(_code):
                continue
            # 剔除开1的数据
            if timestamp_start <= int(k[5]) < timestamp_end:
                continue
            # 剔除高位板
            if _code in yesterday_limit_up_codes:
                continue
            # 剔除炸板代码持续涨停时间小于1分钟的代码 且 只能用于不排除前2条数据
            if _code not in cls.__current_limit_up_codes and _code in cls.__total_limit_up_space_dict and \
                    cls.__total_limit_up_space_dict[_code] < 60 and not exclude_codes and len(
                limit_up_space_ge_60s_codes) < 3:
                limit_up_space_ge_60s_codes.add(_code)
                continue
            # 代码,涨停时间
            block_codes_infos.append((_code, int(k[5])))
        block_codes_infos.append((code, limit_up_time))
        block_codes_infos.sort(key=lambda x: x[1])
        before_codes_info = []
        for i in range(0, len(block_codes_infos)):
            if block_codes_infos[i][0] == code:
                history_index = i
                break
            else:
                before_codes_info.append(block_codes_infos[i])
        return history_index, before_codes_info
    @classmethod
    def __is_radical_buy_with_open_limitup(cls, code, block, yesterday_limit_up_codes):
        """
        Decide whether to buy radically because the block has codes that limited up at the open.

        Steps, in order:
        1. Reject when the code's first limit-up time (or "now" if unknown) is later than 09:45:00.
        2. Reject when no open-limit-up code is recorded for the block.
        3. With >= 2 open-limit-up codes, or exactly one whose stored level equals 2,
           nothing is excluded. Otherwise the first code ahead must be the same in the
           history ranking and the current ranking (both computed without the
           yesterday filter); that code is then excluded, else the request is rejected.
        4. The history rank is recomputed with the yesterday filter and the exclusions.
           Rank 0 is accepted; rank 1 is accepted only when the code ahead cannot be
           bought; anything further back is rejected.
        @param code: target code
        @param block: block being evaluated
        @param yesterday_limit_up_codes: codes that limited up yesterday
        @return: (whether to buy, explanation message)
        """
        # Only codes that limited up before 09:45 qualify
        # Get the limit-up time of the target code
        limit_up_timestamp = LimitUpDataConstant.get_first_limit_up_time(code)
        if not limit_up_timestamp:
            limit_up_timestamp = time.time()
        if int(tool.timestamp_format(limit_up_timestamp, "%H%M%S")) > 94500:
            return False, "超过生效时间"
        # Group the open-limit-up codes by block
        open_limit_up_block_codes_dict = {}
        for c in kpl_data_constant.open_limit_up_code_dict_for_radical_buy:
            blocks = kpl_data_constant.open_limit_up_code_dict_for_radical_buy[c][1]
            for b in blocks:
                if b not in open_limit_up_block_codes_dict:
                    open_limit_up_block_codes_dict[b] = set()
                open_limit_up_block_codes_dict[b].add(c)
        if block not in open_limit_up_block_codes_dict:
            return False, "板块未开1"
        open_limit_up_block_codes = list(open_limit_up_block_codes_dict.get(block))
        count = len(open_limit_up_block_codes)
        # ---- rank in the history data ----
        history_index, history_before_codes_info = cls.__get_history_index(code, block, yesterday_limit_up_codes)
        # ---- rank in the current data ----
        current_index, current_before_codes_info = cls.__get_current_index(code, block, yesterday_limit_up_codes)
        exclude_codes = set()
        if count >= 2 or (
                count == 1 and kpl_data_constant.open_limit_up_code_dict_for_radical_buy[open_limit_up_block_codes[0]][
            0] == 2):
            # At least 2 open-limit-up codes, or a single one whose stored level is 2
            exclude_codes.clear()
        else:
            # Rank again with yesterday's limit-up codes included
            # ---- rank in the history data ----
            history_index, history_before_codes_info = cls.__get_history_index(code, block, set())
            # ---- rank in the current data ----
            current_index, current_before_codes_info = cls.__get_current_index(code, block, set())
            if history_before_codes_info and current_before_codes_info and history_before_codes_info[0][0] == \
                    current_before_codes_info[0][0]:
                # The first code ahead is the same in both rankings (its limit-up did not break)
                exclude_codes = {history_before_codes_info[0][0]}
            else:
                return False, f"开1数量:{count},历史-{history_index + 1} 实时-{current_index + 1}"
        # Rank once more with the yesterday filter and the exclusions applied
        history_index, history_before_codes_info = cls.__get_history_index(code, block,
                                                                           yesterday_limit_up_codes,
                                                                           exclude_codes=exclude_codes)
        # Buy the first or the second code of the ranking;
        # the second is bought only when the first cannot be bought
        if history_index > 1:
            return False, f"开1数量:{count},非开1首板身位不匹配:历史-{history_index + 1} 实时-{current_index + 1}"
        if history_index == 1:
            # The target is second: check whether the code ahead can be bought
            if RedicalBuyDataManager.can_buy(history_before_codes_info[0][0])[0]:
                return False, f"开1数量:{count},前排代码可买:{history_before_codes_info[0]}"
            return True, f"开1数量:{count},前排代码不可买:{history_before_codes_info[0]}"
        return True, f"开1数量:{count},历史-{history_index + 1} 实时-{current_index + 1}"
    @classmethod
    def __is_radical_buy_with_block_up(cls, code, block, yesterday_limit_up_codes):
        """
        Decide whether to buy radically because the block suddenly took off.

        Checks, in order:
        1. at least two codes are ranked ahead of the target in the current ranking;
        2. the first of them did not limit up before the end of the opening window;
        3. the first and second limited up less than 5 minutes apart;
        4. the target limited up less than 15 * 60 seconds after the first;
        5. the first two codes of the history ranking are also among the first two of
           the current ranking (no broken limit-up among them);
        6. with those two excluded, the target is first or second in the history
           ranking, and when it is second the code ahead must not be buyable.
        @param code: target code
        @param block: block being evaluated
        @param yesterday_limit_up_codes: codes that limited up yesterday
        @return: (whether to buy, explanation message)
        """
        # Rank within the current limit-up data
        current_index, current_before_codes_info = cls.__get_current_index(code, block, set())
        current_before_codes = [x[0] for x in current_before_codes_info]
        if len(current_before_codes_info) < 2:
            return False, f"前排代码小于2个:{current_before_codes_info}"
        if current_before_codes_info[0][1] < kpl_block_util.open_limit_up_time_range[1]:
            return False, f"有开1:{current_before_codes_info}"
        # The first and second codes must have limited up within 5 minutes of each other
        if current_before_codes_info[1][1] - current_before_codes_info[0][1] >= 5 * 60:
            return False, f"老大老二涨停时间必须间隔5分钟内"
        # Get the limit-up time of the target code
        limit_up_timestamp = LimitUpDataConstant.get_first_limit_up_time(code)
        if not limit_up_timestamp:
            limit_up_timestamp = time.time()
        # NOTE(review): the threshold is 15 minutes while the message (and the previous
        # docstring) say 10 minutes - confirm which one is intended.
        if limit_up_timestamp - current_before_codes_info[0][1] >= 15 * 60:
            return False, f"距离老大涨停已过去10分钟({current_before_codes_info[0]})"
        history_index, history_before_codes_info = cls.__get_history_index(code, block, set())
        history_before_codes = [x[0] for x in history_before_codes_info]
        # Has either of the first two codes had its limit-up broken?
        dif_codes = set(history_before_codes[:2]) - set(current_before_codes[:2])
        if dif_codes:
            return False, f"前2代码有炸板:{dif_codes}"
        # Leave the first two codes out of the ranking
        exclude_codes = set(current_before_codes[:2])
        history_index, history_before_codes_info = cls.__get_history_index(code, block, yesterday_limit_up_codes,
                                                                           exclude_codes)
        if history_index > 1:
            return False, f"排除前2,目标代码位于历史身位-{history_index + 1},前排代码:{history_before_codes_info}"
        if history_index == 1:
            # The target is second: check whether the code ahead falls under "cannot be bought"
            pre_code = history_before_codes_info[0][0]
            # Buying is allowed only when pre_code cannot be bought
            if RedicalBuyDataManager.can_buy(pre_code)[0]:
                return False, f"前排代码可买:{pre_code}"
        return True, "满足买入需求"
    @classmethod
    def is_radical_buy(cls, code, yesterday_limit_up_codes):
        """
        是否是激进买
        @param code:
        @return: {激进买的板块}, 原因
        """
        # 计算
        # 获取开1的板块
        open_limit_up_code_dict = kpl_data_constant.open_limit_up_code_dict_for_radical_buy
        open_limit_up_blocks = set()
        if open_limit_up_code_dict:
            for c in open_limit_up_code_dict:
                open_limit_up_blocks |= open_limit_up_code_dict[c][1]
        # 获取代码的板块
        keys_ = KPLCodeJXBlockManager().get_jx_blocks_radical(code)
        if not keys_:
            return set(), "没获取到板块"
        keys_ = BlockMapManager().filter_blocks(keys_)
        if not keys_:
            return set(), "过滤后没获取到板块"
        # 获取交集
        keys_ = CodeThirdBlocksManager().get_intersection_blocks_info(code, keys_)
        if not keys_:
            return set(), "没获取到板块交集"
        match_blocks = open_limit_up_blocks & keys_
        can_buy_blocks = set()
        fmsges = []
        msges = []
        for b in match_blocks:
            # 判断板块是否该激进买
            result = cls.__is_radical_buy_with_open_limitup(code, b, yesterday_limit_up_codes)
            if result[0]:
                can_buy_blocks.add(b)
            msges.append(f"【{b}】:{result[1]}")
        fmsges.append("开1判断##" + ",".join(msges))
        if not can_buy_blocks:
            msges.clear()
            for b in keys_:
                # 板块快速启动
                result = cls.__is_radical_buy_with_block_up(code, b, yesterday_limit_up_codes)
                if result[0]:
                    can_buy_blocks.add(b)
                msges.append(f"【{b}】:{result[1]}")
            fmsges.append("板块快速启动判断##" + ",".join(msges))
        return can_buy_blocks, " **** ".join(fmsges)
if __name__ == "__main__":