Administrator
2025-05-26 b51b2ae184fad5aaf37a78903987e064f192d430
third_data/code_plate_key_manager.py
@@ -5,23 +5,27 @@
# 涨停代码关键词板块管理
import copy
import datetime
import itertools
import json
import time
import constant
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util, kpl_data_constant, huaxin_l1_data_manager
from third_data import kpl_block_util, kpl_api, kpl_util
from settings.trade_setting import MarketSituationManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpDataConstant
from utils import global_util, tool, buy_condition_util, init_data_util
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, ContainsLimitupCodesBlocksManager
from third_data.third_blocks_manager import BlockMapManager
from utils import global_util, tool, buy_condition_util
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from log_module.log import logger_kpl_block_can_buy, logger_debug
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug, \
    logger_kpl_latest_gaobiao
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
from trade import l2_trade_util, trade_constant
# 代码精选板块管理
from utils.kpl_data_db_util import KPLLimitUpDataUtil
@@ -33,16 +37,44 @@
    # Selected ("jingxuan") blocks cached per code; writers in this class store (blocks, timestamp) tuples
    __code_blocks = {}
    # Backup block cache per code
    __code_by_blocks = {}
    # Blocks per code used for radical buying
    __code_blocks_for_radical_buy = {}
    # Holds the single shared instance created by __new__
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs)
            try:
                cls.__load_data()
            except Exception as e:
                logger_debug.exception(e)
        return cls.__instance
    def __get_redis(self):
        """Return the result of ``getRedis()`` on ``__redisManager`` (presumably a Redis client — confirm)."""
        return self.__redisManager.getRedis()
    @classmethod
    def __load_data(cls):
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_by-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_by_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_radical-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks_for_radical_buy[k.split("-")[1]] = (val, time.time())
    @classmethod
    def __get_redis(cls):
        """Return the result of ``getRedis()`` on the class-level ``__redisManager`` (presumably a Redis client — confirm)."""
        return cls.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False):
        if not blocks:
@@ -63,28 +95,18 @@
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())
    # 获取精选板块
    def get_jx_blocks(self, code, by=False):
        if by:
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    def save_jx_blocks_for_radical_buy(self, code, blocks: list):
        if not blocks:
            return
        RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_radical-{code}", tool.get_expire(), json.dumps(blocks))
        self.__code_blocks_for_radical_buy[code] = (blocks, time.time())
    # 获取精选板块(激进买)
    def get_jx_blocks_radical(self, code):
        blocks_info = self.__code_blocks_for_radical_buy.get(code)
        if blocks_info:
            return set(blocks_info[0])
        return None
    def get_jx_blocks_cache(self, code, by=False):
        if by:
@@ -109,6 +131,8 @@
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块-{blocks}  耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                    else:
                        # 还没涨停的需要更新精选板块 更新精选板块
                        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
@@ -129,8 +153,8 @@
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                                # 跟随精选板块一起更新
                                self.load_jx_blocks_radical(code)
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
@@ -139,9 +163,24 @@
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块(备用)-{blocks}  耗时:{int(time.time() - start_time)}s")
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                if price_rate > 0.03:
                    if not self.__code_blocks_for_radical_buy.get(code):
                        self.load_jx_blocks_radical(code)
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
    def load_jx_blocks_radical(self, code):
        start_time = time.time()
        blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
        blocks = set([b[1] for b in blocks])
        # fblocks = BlockMapManager().filter_blocks(blocks)
        async_log_util.info(logger_kpl_block_can_buy,
                            f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s")
        self.save_jx_blocks_for_radical_buy(code, list(blocks))
# 禁止下单的板块
@@ -188,8 +227,18 @@
# 开盘啦禁止交易板块管理
class KPLPlateForbiddenManager:
    # NOTE(review): two Redis manager attributes, both RedisManager(3), are declared in this class body
    __redisManager = redis_manager.RedisManager(3)
    """
    不能买的板块管理
    """
    # NOTE(review): the string above is not the first statement of the class body, so it is a plain
    # expression rather than the class docstring; it reads "management of plates that must not be bought"
    __redis_manager = redis_manager.RedisManager(3)
    # In-memory copy of the forbidden plates
    __kpl_forbidden_plates_cache = set()
    # Plates that have been removed from the forbidden list
    __deleted_kpl_forbidden_plates_cache = set()
    # Watched high-flyer codes grouped by block: {"block": {"code1", "code2"}}
    __watch_block_high_codes = {}
    # All watched high-flyer codes
    __watch_high_codes = set()
    # Singleton instance holder
    __instance = None
@@ -203,24 +252,164 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            __kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__deleted_kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "deleted_kpl_forbidden_plates")
        finally:
            RedisUtils.realse(__redis)
        cls.__load_latest_gb()
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
        return cls.__redis_manager.getRedis()
    def save_plate(self, plate):
        self.__kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def delete_plate(self, plate):
        self.__kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def list_all(self):
        """Return the members of the "kpl_forbidden_plates" Redis set (read from Redis, not from the cache)."""
        return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates")
    def list_all_cache(self):
        """Return the in-memory forbidden-plate cache itself (not a copy)."""
        return self.__kpl_forbidden_plates_cache
    def list_all_deleted_cache(self):
        """Return the in-memory cache of deleted forbidden plates itself (not a copy)."""
        return self.__deleted_kpl_forbidden_plates_cache
    def is_in_cache(self, plate):
        if self.__kpl_forbidden_plates_cache and plate in self.__kpl_forbidden_plates_cache:
            return True
        return False
    @classmethod
    def __load_latest_gb(cls):
        """
        加载最近的市场高标
        @return:
        """
        # 获取最近10个交易日涨停的涨停数据
        dates = HistoryKDatasUtils.get_latest_trading_date_cache(10)
        if not dates:
            return
        min_date = dates[-1]
        sql = f"SELECT r.`_code`, r.`_hot_block_name`, r.`_day`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{min_date}'"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        code_days_map = {}
        # 每炸板
        f_code_days_map = {}
        for r in results:
            if r[0] not in code_days_map:
                code_days_map[r[0]] = set()
            code_days_map[r[0]].add(r[2])
            if not r[3]:
                if r[0] not in f_code_days_map:
                    f_code_days_map[r[0]] = set()
                f_code_days_map[r[0]].add(r[2])
        # 过滤涨停次数>=3次的数据
        target_codes = set()
        for code in code_days_map:
            if f_code_days_map.get(code) and (len(f_code_days_map.get(code)) >= 4 or (
                    tool.is_ge_code(code) and len(f_code_days_map.get(code)) >= 2)):
                # 且有3天属于连续涨停
                day_list = list(code_days_map[code])
                day_list.sort(reverse=True)
                step = 3
                has_continue = False
                for i in range(0, len(day_list) - step + 1):
                    item_list = day_list[i:i + step]
                    # 是否属于连续涨停
                    is_sub = False
                    for j in range(0, len(dates) - step):
                        if f"{dates[j:j + step]}" == f"{item_list}":
                            is_sub = True
                            break
                    if is_sub:
                        has_continue = True
                        break
                if not has_continue:
                    continue
                has_big_deal = False
                # 最近10个交易日的成交额要大于10亿
                volumes_data = HistoryKDataManager().get_history_bars(code, dates[0])
                if volumes_data:
                    for d in volumes_data[:10]:
                        if d["amount"] > 10e8:
                            has_big_deal = True
                            break
                if not has_big_deal:
                    continue
                target_codes.add(code)
        # 代码对应的板块
        code_blocks = {}
        for r in results:
            if r[0] not in target_codes:
                continue
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(kpl_util.filter_block(r[1]))
        # 所有板块对应的代码集合
        block_codes = {}
        for code in code_blocks:
            for b in code_blocks[code]:
                if b in constant.KPL_INVALID_BLOCKS:
                    continue
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        print(block_codes)
        cls.__watch_block_high_codes = block_codes
        logger_kpl_latest_gaobiao.info(f"{block_codes}")
        cls.__watch_high_codes.clear()
        for b in block_codes:
            cls.__watch_high_codes |= block_codes[b]
        for k in block_codes:
            print(k, [(x, gpcode_manager.get_code_name(x)) for x in block_codes[k]])
    def get_watch_high_codes(self):
        """Return the set of all watched high-flyer codes (the stored set itself, not a copy)."""
        return self.__watch_high_codes
    def get_watch_high_codes_by_block(self, b):
        """Return the watched high-flyer codes of block ``b``, or None when the block is not watched."""
        return self.__watch_block_high_codes.get(b)
    def compute(self, code_rate_dict: dict):
        """
        根据比例计算需要拉黑的代码
        @param code_rate_dict: 涨幅百分数
        @return:
        """
        try:
            if self.__watch_block_high_codes:
                forbidden_blocks = set()
                for b in self.__watch_block_high_codes:
                    total_rate = 0
                    for code in self.__watch_block_high_codes[b]:
                        if code in code_rate_dict:
                            total_rate += code_rate_dict.get(code)
                    average_rate = total_rate / len(self.__watch_block_high_codes[b])
                    if average_rate < 1:
                        forbidden_blocks.add(b)
                    # async_log_util.info(logger_debug, f"板块平均涨幅 {b}-{average_rate}")
                self.__kpl_forbidden_plates_cache = forbidden_blocks
                async_log_util.info(logger_debug, f"拉黑板块:{forbidden_blocks}")
        except Exception as e:
            logger_debug.exception(e)
class LimitUpCodesPlateKeyManager:
@@ -258,12 +447,6 @@
    @classmethod
    def get_today_limit_up_reason(cls, code):
        """Return the entry stored for ``code`` in today's limit-up reason dict, or None when absent."""
        return cls.__today_total_limit_up_reason_dict.get(code)
    # 今日涨停原因变化
    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire())
        self.__set_total_keys(code)
    # 设置代码的今日涨停原因
    def __set_total_keys(self, code):
@@ -305,8 +488,10 @@
# 实时开盘啦市场数据
class RealTimeKplMarketData:
    # Top 5 selected reasons
    top_5_reason_list = []
    # Inflow cache; rows are [ID, block name, block rise rate, inflow amount]
    top_in_list_cache = []
    # Outflow cache
    top_out_list_cache = []
    # Top 5 industries
    top_5_industry_list = []
    #
@@ -316,31 +501,156 @@
    # Collaborating managers, instantiated once when the class body is executed
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()
    # Top selected blocks by inflow
    __top_jx_blocks = []
    # Top selected blocks by outflow
    __top_jx_out_blocks = []
    # Inflow amount per selected block
    __jx_blocks_in_money_dict = {}
    # Market strength, 60 by default
    __market_strong = 60
    @classmethod
    def set_top_5_reasons(cls, datas):
        """
        Index the given reason rows by name and collect the leading valid ones.

        NOTE(review): within the visible body ``temp_list`` is built but never stored or returned.
        @param datas: rows where index 1 is the name and index 3 the net inflow amount
        """
        temp_list = []
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        # Walk the rows in order
        for i in range(0, len(datas)):
            if datas[i][1] not in constant.KPL_INVALID_BLOCKS:
                # (name, net inflow amount, rank)
                temp_list.append((datas[i][1], datas[i][3], len(temp_list)))
                # Stop once more than 10 entries were collected
                # NOTE(review): "> 10" lets an 11th entry in before breaking — confirm intent
                if len(temp_list) > 10:
                    break
                # Stop at the first row whose net inflow is below 3 * 10000 * 10000
                if datas[i][3] < 3 * 10000 * 10000:
                    break
    # NOTE(review): no @classmethod decorator is visible directly above this def although it takes ``cls``
    def get_jingxuan_in_block_threshold_count(cls):
        """
        Get how many top selected-inflow blocks are considered for buying.

        The market-strength score (60 when ``__market_strong`` is None) is matched against the
        ``[low, high)`` ranges in ``constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG``.
        @return: the count attached to the first matching range, or 10 when none matches
        """
        score = 60
        if cls.__market_strong is not None:
            score = int(cls.__market_strong)
        for info in constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG:
            if info[0] <= score < info[1]:
                return info[2]
        return 10
        # NOTE(review): everything below follows an unconditional return and can never run;
        # it also reads ``temp_list``, which is not defined in this function.
        for temp in temp_list:
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0])
            for name in names:
                if name == temp[1]:
                    continue
                temp_list.append((name, temp[1], temp[2]))
        cls.top_5_reason_list = temp_list
        cls.__reset_top_5_dict()
    @classmethod
    def set_market_jingxuan_blocks(cls, datas):
        """
        设置精选流入数据
        @param datas:[(板块编号,板块名称,涨幅, 板块流入金额)]
        @return:
        """
        # 流入阈值
        # THRESHOLD_MONEY = 50 * (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") // 60) + 1000
        # THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        # THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        # THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        THRESHOLD_MONEY = 0
        # 最大数量
        # MAX_COUNT = cls.get_jingxuan_in_block_threshold_count()
        cls.top_in_list_cache = datas
        blocks = set()
        count = 0
        fblock_money = {}
        for data in datas:
            cls.__jx_blocks_in_money_dict[data[1]] = data[3]
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] < THRESHOLD_MONEY:
                continue
            # 过滤出来为同一个板块就只算1个数量
            fb = BlockMapManager().filter_blocks({data[1]})
            if blocks & fb:
                continue
            for b in fb:
                fblock_money[b] = data[3]
            blocks |= fb
            # 如果该原因没有涨停票要往后移一位
            has_code = False
            for b in fb:
                if ContainsLimitupCodesBlocksManager().get_block_codes(b):
                    has_code = True
                    break
            if has_code:
                count += 1
                if count == 10:
                    strong = cls.get_market_strong()
                    if strong is None:
                        strong = 60
                    if data[3] > 3e7:
                        # 大于3千万
                        THRESHOLD_MONEY = int((1 - strong / 200) * data[3])
                    else:
                        THRESHOLD_MONEY = data[3]
            # if count >= MAX_COUNT:
            #     break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:50]} 板块:{blocks}")
        blocks = list(blocks)
        blocks.sort(key=lambda x: fblock_money.get(x), reverse=True)
        cls.__top_jx_blocks = blocks
    @classmethod
    def set_market_jingxuan_out_blocks(cls, datas):
        """
        设置精选流出数据
        @param datas:
        @return:
        """
        cls.top_out_list_cache = datas
        count = 0
        blocks = set()
        for data in datas:
            cls.__jx_blocks_in_money_dict[data[1]] = data[3]
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] > -5e7:
                # 过滤5千万以上的
                break
            # 过滤出来为同一个板块就只算1个数量
            fb = BlockMapManager().filter_blocks({data[1]})
            if blocks & fb:
                continue
            blocks |= fb
            count += 1
            if count >= 10:
                break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = list(blocks)
    @classmethod
    def set_market_strong(cls, strong):
        """
        Set the market strength.
        @param strong: the new market strength value, stored as given
        @return: None
        """
        cls.__market_strong = strong
    @classmethod
    def is_ignore_block_in_money(cls):
        if cls.__market_strong and cls.__market_strong >= constant.IGNORE_BLOCK_IN_MONEY_MARKET_STRONG:
            return True
        return False
    @classmethod
    def get_market_strong(cls):
        """Return the stored market strength value."""
        return cls.__market_strong
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        """Return the stored list of top selected inflow blocks (the list itself, not a copy)."""
        return cls.__top_jx_blocks
    @classmethod
    def get_top_market_jingxuan_out_blocks(cls):
        """Return the stored list of top selected outflow blocks (the list itself, not a copy)."""
        return cls.__top_jx_out_blocks
    @classmethod
    def get_block_info_at_block_in(cls, b):
        """
        获取板块的净流入情况
        @param b:
        @return: (板块名称,身位,流入金额)
        """
        for i in range(0, len(cls.top_in_list_cache)):
            if cls.top_in_list_cache[i][1] == b:
                return b, i, cls.top_in_list_cache[i][3]
        return b, -1, 0
    @classmethod
    def set_top_5_industry(cls, datas):
@@ -373,35 +683,6 @@
        temp_set = cls.top_5_key_dict.keys()
        return temp_set
    # 通过关键字判断能买的代码数量
    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        # 判断行业涨停票数量,除开自己必须大于1个
        temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key)
        if temp_codes is None:
            temp_codes = set()
        else:
            temp_codes = set(temp_codes)
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            # 后排才能挂单
            return 0, "身位不为后排"
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        if key in forbidden_plates:
            return 0, "不买该板块"
        # 10:30以前可以挂2个单
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # 10:30以后
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        else:
            return 1, "净流入在前5"
    @classmethod
    def is_in_top(cls, keys):
        reasons = cls.get_can_buy_key_set()
@@ -413,6 +694,10 @@
        else:
            return False, None
    @classmethod
    def get_jx_block_in_money(cls, block):
        """Return the amount recorded for ``block`` in the selected-block money dict, or None when absent."""
        return cls.__jx_blocks_in_money_dict.get(block)
# 代码历史涨停原因与板块管理
class CodesHisReasonAndBlocksManager:
@@ -421,6 +706,14 @@
    # Historical limit-up reasons (presumably keyed by code — confirm against the writers)
    __history_limit_up_reason_dict = {}
    # Blocks
    __blocks_dict = {}
    # Holds the single shared instance created by __new__
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesHisReasonAndBlocksManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        """Return the result of ``getRedis()`` on ``__redisManager`` (presumably a Redis client — confirm)."""
        return self.__redisManager.getRedis()
@@ -505,6 +798,14 @@
            pass
        return set()
    def get_history_blocks_cache(self, code):
        """
        Get the cached historical limit-up reasons of a code
        (a 180-day window according to the original docstring).
        @param code: the code to look up
        @return: the cached entry, or None when nothing is cached for the code
        """
        return self.__history_blocks_dict_cache.get(code)
# 目标代码板块关键词管理
class TargetCodePlateKeyManager:
@@ -558,6 +859,13 @@
        keys = keys - set(constant.KPL_INVALID_BLOCKS)
        return keys, k1, k11, k2, k3, k4
    def get_plate_keys_for_radical_buy(self, code):
        """
        Blocks for radical buying.

        NOTE(review): the body consists of this docstring only, so the method returns None.
        @param code: the code to look up
        @return: None
        """
class CodePlateKeyBuyManager:
    # 无板块
@@ -574,19 +882,7 @@
    # Collaborating managers, instantiated once when the class body is executed
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    # Block computation results stored per code
    __can_buy_compute_result_dict = {}
    @classmethod
    def __remove_from_l2(cls, code, msg):
        # 根据身位移除代码
        # return
        # 下过单的代码不移除
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) != trade_constant.TRADE_STATE_NOT_TRADE:
            # 只要下过单的就不移除
            return
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)
    # 是否需要积极买
    @classmethod
@@ -876,7 +1172,13 @@
            current_limit_up_datas = []
        # 获取目标代码板块
        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        # keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        keys = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not keys:
            keys = set()
        keys = BlockMapManager().filter_blocks(keys)
        if keys:
            keys -= constant.KPL_INVALID_BLOCKS
        # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
@@ -885,6 +1187,7 @@
        fresults = []
        if not keys:
            return fresults, set()
        code_limit_up_reasons_dict = {}
        load_code_block()
        for block in keys:
@@ -917,7 +1220,8 @@
    @classmethod
    def __compute_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                                 yesterday_current_limit_up_codes, before_blocks_dict,
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks):
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks, codes_delegate,
                                 codes_success):
        # 根据代码泛化板块获取泛化板块的代码集合
        high_level_general_block_codes = {}
        for c in high_level_general_code_blocks:
@@ -933,10 +1237,7 @@
                                                             high_level_general_block_codes)
        if not blocks_compute_results:
            return False, True, f"没有找到板块", [], keys, []
        codes_delegate = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_DELEGATED, trade_constant.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_SUCCESS}))
        codes = codes_delegate | codes_success
        # 统计成交代码的板块
        trade_codes_blocks_dict = {}
@@ -1015,7 +1316,7 @@
    @classmethod
    def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                              latest_current_limit_up_records,
                              before_blocks_dict, current_limit_up_block_codes_dict):
                              before_blocks_dict, current_limit_up_block_codes_dict, delegate_codes, deal_codes):
        yesterday_current_limit_up_codes = set()
        yesterday_current_limit_up_records_dict = {}
        yesterday_current_limit_up_records = latest_current_limit_up_records[0][1]
@@ -1049,303 +1350,13 @@
                                                                                                                   yesterday_current_limit_up_codes,
                                                                                                                   before_blocks_dict,
                                                                                                                   current_limit_up_block_codes_dict,
                                                                                                                   high_level_general_code_blocks)
                                                                                                                   high_level_general_code_blocks,
                                                                                                                   delegate_codes,
                                                                                                                   deal_codes)
        # 保存板块计算结果
        cls.__can_buy_compute_result_dict[code] = (
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
class RadicalBuyBlockManager:
    """
    Plate (block) management for radical (aggressive) buys.

    All state is kept in class-level attributes and is read and written through
    classmethods, so it is shared by every caller.
    """
    # Instance used by is_radical_buy() to look up the plate keys of a code
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    # Limit-up codes seen in the previous call to set_current_limit_up_datas()
    __last_limit_up_codes = set()
    # code -> time.time() value recorded when the code first appeared in the limit-up list
    __limit_up_time_dict = {}
    # code -> accumulated limit-up duration (results of tool.trade_time_sub) for codes
    # that dropped out of the limit-up list
    __total_limit_up_space_dict = {}
    # Limit-up codes from the most recent call to set_current_limit_up_datas()
    __current_limit_up_codes = set()
    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        # 查询当前的涨停代码集合
        codes = set([d[0] for d in current_limit_up_datas])
        cls.__current_limit_up_codes = codes
        try:
            # 炸板代码
            break_limit_up_codes = cls.__last_limit_up_codes - codes
            # 新涨停的代码
            new_limit_up_codes = codes - cls.__last_limit_up_codes
            if new_limit_up_codes:
                for code in new_limit_up_codes:
                    if code not in cls.__limit_up_time_dict:
                        cls.__limit_up_time_dict[code] = time.time()
            if break_limit_up_codes:
                # 记录总涨停时间
                for bc in break_limit_up_codes:
                    if bc in cls.__limit_up_time_dict:
                        space = tool.trade_time_sub(tool.get_now_time_str(),
                                                    tool.to_time_str(cls.__limit_up_time_dict[bc]))
                        if bc not in cls.__total_limit_up_space_dict:
                            cls.__total_limit_up_space_dict[bc] = 0
                        cls.__total_limit_up_space_dict[bc] = cls.__total_limit_up_space_dict[bc] + space
                        logger_debug.info(f"炸板代码涨停时间:{bc}-{cls.__total_limit_up_space_dict[bc]}")
                        cls.__limit_up_time_dict.pop(bc)
        except Exception as e:
            logger_debug.exception(e)
        finally:
            cls.__last_limit_up_codes = codes
        cls.compute_open_limit_up_code_dict_for_radical_buy(current_limit_up_datas)
    @classmethod
    def compute_open_limit_up_code_dict_for_radical_buy(cls, current_limit_up_datas):
        """
        计算开1的代码信息,不包含5板以上的
        @param current_limit_up_datas:
        @return:
        """
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        temp_dict = {}
        for d in current_limit_up_datas:
            code = d[0]
            # d: (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
            # 计算是否开1
            if int(d[2]) >= timestamp_end or int(d[2]) < timestamp_start:
                continue
            buy1_money = huaxin_l1_data_manager.get_buy1_money(code)
            # 买1是否大于5000w
            if not constant.TEST:
                if not buy1_money or buy1_money < 5e7:
                    continue
            if not tool.is_can_buy_code(code):
                continue
            blocks = {d[5]}
            if d[6]:
                blocks |= set(d[6].split("、"))
            blocks -= constant.KPL_INVALID_BLOCKS
            temp_dict[code] = (kpl_util.get_high_level_count(d[4]), blocks)
        kpl_data_constant.open_limit_up_code_dict_for_radical_buy = temp_dict
    @classmethod
    def __get_current_index(cls, code, block, yesterday_limit_up_codes):
        """
        获取当前涨停身位
        @param code:
        @param block:
        @param yesterday_limit_up_codes:
        @return: 索引,前排代码信息([(代码, 涨停时间)])
        """
        current_index = 0
        block_codes_infos = []
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        limit_up_time = time.time()
        for k in LimitUpDataConstant.current_limit_up_datas:
            _code = k[0]
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            if not blocks:
                blocks = set()
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[2])
                continue
            # 不是这个板块
            if block not in blocks:
                continue
            if not tool.is_can_buy_code(_code):
                continue
            # 剔除开1的数据
            if timestamp_start <= int(k[2]) < timestamp_end:
                continue
            # 剔除高位板
            if _code in yesterday_limit_up_codes:
                continue
            # 代码.涨停时间
            block_codes_infos.append((_code, int(k[2])))
        block_codes_infos.append((code, limit_up_time))
        block_codes_infos.sort(key=lambda x: x[1])
        before_codes_info = []
        for i in range(0, len(block_codes_infos)):
            if block_codes_infos[i][0] == code:
                current_index = i
                break
            else:
                before_codes_info.append(block_codes_infos[i])
        return current_index, before_codes_info
    @classmethod
    def __get_history_index(cls, code, block, yesterday_limit_up_codes):
        """
        获取历史涨停身位
        @param code:
        @param block:
        @param current_limit_up_datas: 昨日涨停代码
        @param current_limit_up_codes: 目前的涨停代码
        @return:
        """
        history_index = 0
        block_codes_infos = []
        # 开1时间范围
        timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range
        limit_up_time = time.time()
        for k in LimitUpDataConstant.history_limit_up_datas:
            _code = k[3]
            blocks = LimitUpDataConstant.get_blocks_with_history(_code)
            if _code == code:
                # 获取当前代码涨停时间
                limit_up_time = int(k[5])
                continue
            # 不是这个板块
            if block not in blocks:
                continue
            if not tool.is_can_buy_code(_code):
                continue
            # 剔除开1的数据
            if timestamp_start <= int(k[5]) < timestamp_end:
                continue
            # 剔除高位板
            if _code in yesterday_limit_up_codes:
                continue
            # 剔除炸板代码持续涨停时间小于1分钟的代码
            if _code not in cls.__current_limit_up_codes and _code in cls.__total_limit_up_space_dict and \
                    cls.__total_limit_up_space_dict[_code] < 60:
                continue
            # 代码,涨停时间
            block_codes_infos.append((_code, int(k[5])))
        block_codes_infos.append((code, limit_up_time))
        block_codes_infos.sort(key=lambda x: x[1])
        before_codes_info = []
        for i in range(0, len(block_codes_infos)):
            if block_codes_infos[i][0] == code:
                history_index = i
                break
            else:
                before_codes_info.append(block_codes_infos[i])
        return history_index, before_codes_info
    @classmethod
    def __is_radical_buy_with_open_limitup(cls, code, block, yesterday_limit_up_codes):
        """
        是否需要激进买(某个板块开1)
        1.有>=2个开1买老2
        2.有1个开1的买老3
        @param code:
        @param block:
        @param yesterday_limit_up_codes 昨日涨停代码
        @return:
        """
        # 9:45点之前才能买入
        if not constant.TEST:
            if int(tool.get_now_time_str().replace(":", "")) > 94500:
                return False, "超过生效时间"
        # 根据板块聚合数据
        open_limit_up_block_codes_dict = {}
        for c in kpl_data_constant.open_limit_up_code_dict_for_radical_buy:
            blocks = kpl_data_constant.open_limit_up_code_dict_for_radical_buy[c][1]
            for b in blocks:
                if b not in open_limit_up_block_codes_dict:
                    open_limit_up_block_codes_dict[b] = set()
                open_limit_up_block_codes_dict[b].add(c)
        if block not in open_limit_up_block_codes_dict:
            return False, "板块未开1"
        open_limit_up_block_codes = list(open_limit_up_block_codes_dict.get(block))
        count = len(open_limit_up_block_codes)
        # ----获取历史身位----
        history_index, history_before_codes_info = cls.__get_history_index(code, block, yesterday_limit_up_codes)
        # ----获取实时身位----
        current_index, current_before_codes_info = cls.__get_current_index(code, block, yesterday_limit_up_codes)
        if count >= 2 or (
                count == 1 and kpl_data_constant.open_limit_up_code_dict_for_radical_buy[open_limit_up_block_codes[0]][
            0] == 2):
            # 开始数量大于2个或者只有一个2板开1
            if history_index == 0 and current_index == 0:
                return True, f"开1数量:{count}"
            else:
                return False, f"开1数量:{count},非开1首板身位不匹配:历史-{history_index + 1} 实时-{current_index + 1}"
        else:
            # 买老3
            if history_index == 1 and current_index == 1:
                return True, f"开1数量:{count}"
            else:
                return False, f"开1数量:{count},非开1首板身位不匹配:历史-{history_index + 1} 实时-{current_index + 1}"
    @classmethod
    def __is_radical_buy_with_block_up(cls, code, block, yesterday_limit_up_codes):
        """
        是否激进买(板块突然涨起来)
        老大和老二的涨停时间相差5分钟内
        老三的涨停时间距离老大涨停在10分钟内就买
        @param code:
        @param block:
        @param yesterday_limit_up_codes:
        @return:
        """
        # 获取当前的板块
        current_index, current_before_codes_info = cls.__get_current_index(code, block, set())
        if len(current_before_codes_info) < 2:
            return False, f"前排代码小于2个"
        if current_before_codes_info[0][1] >= kpl_block_util.open_limit_up_time_range[1]:
            return False, f"有开1"
        if current_index != 2:
            return False, f"只能买老3,当前身位-{current_index + 1}"
        history_index, history_before_codes_info = cls.__get_history_index(code, block, set())
        if history_index != current_index or len(current_before_codes_info) != len(history_before_codes_info):
            return False, f"前排代码有炸板"
        # 老大,老二必须相隔5分钟内
        if current_before_codes_info[1][1] - current_before_codes_info[0][1] < 5 * 60:
            # 获取当前代码的涨停时间
            limit_up_timestamp = LimitUpDataConstant.get_first_limit_up_time(code)
            if not limit_up_timestamp:
                limit_up_timestamp = time.time()
            if limit_up_timestamp - current_before_codes_info[0][1] < 10 * 60:
                return True, f"前排涨停:{current_before_codes_info}"
        return False, "前排涨停时间间隔不满足条件"
    @classmethod
    def is_radical_buy(cls, code, yesterday_limit_up_codes):
        """
        是否是激进买
        @param code:
        @return: {激进买的板块}, 原因
        """
        # 计算
        # 获取开1的板块
        open_limit_up_code_dict = kpl_data_constant.open_limit_up_code_dict_for_radical_buy
        open_limit_up_blocks = set()
        if open_limit_up_code_dict:
            for c in open_limit_up_code_dict:
                open_limit_up_blocks |= open_limit_up_code_dict[c][1]
        # 获取代码的板块
        keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(code, contains_today=False)
        match_blocks = open_limit_up_blocks & keys_
        can_buy_blocks = set()
        fmsges = []
        msges = []
        for b in match_blocks:
            # 判断板块是否该激进买
            result = cls.__is_radical_buy_with_open_limitup(code, b, yesterday_limit_up_codes)
            if result[0]:
                can_buy_blocks.add(b)
            msges.append(f"【{b}】:{result}")
        fmsges.append("开1判断##" + ",".join(msges))
        if not can_buy_blocks:
            msges.clear()
            for b in match_blocks:
                # 板块快速启动
                result = cls.__is_radical_buy_with_block_up(code, b, yesterday_limit_up_codes)
                if result[0]:
                    can_buy_blocks.add(b)
                msges.append(f"【{b}】:{result}")
            fmsges.append("板块快速启动判断##" + ",".join(msges))
        return can_buy_blocks, " **** ".join(fmsges)
if __name__ == "__main__":
    # Manual entry point: run the forbidden-plate computation once
    KPLPlateForbiddenManager().compute()