Administrator
2025-05-26 b51b2ae184fad5aaf37a78903987e064f192d430
third_data/code_plate_key_manager.py
@@ -4,42 +4,77 @@
# 涨停代码关键词板块管理
import copy
import datetime
import itertools
import json
import time
import constant
from code_attribute import code_nature_analyse
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util, kpl_limit_up_data_manager
from third_data import kpl_block_util, kpl_api, kpl_util
from settings.trade_setting import MarketSituationManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import global_util, tool, buy_condition_util, init_data_util
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, ContainsLimitupCodesBlocksManager
from third_data.third_blocks_manager import BlockMapManager
from utils import global_util, tool, buy_condition_util
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from log_module.log import logger_kpl_block_can_buy
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug, \
    logger_kpl_latest_gaobiao
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager, l2_trade_util, trade_constant
from trade import l2_trade_util, trade_constant
# 代码精选板块管理
from utils.kpl_data_db_util import KPLLimitUpDataUtil
class KPLCodeJXBlockManager:
    __db = 3
    __redisManager = redis_manager.RedisManager(3)
    __code_blocks = {}
    # 备用
    __code_by_blocks = {}
    # 激进买的代码板块
    __code_blocks_for_radical_buy = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs)
            try:
                cls.__load_data()
            except Exception as e:
                logger_debug.exception(e)
        return cls.__instance
    def __get_redis(self):
        """Return the result of ``getRedis()`` on the class-level Redis manager.

        NOTE(review): a ``@classmethod`` with the same name is defined further
        down in this class body; that later definition rebinds the name, so this
        instance-method version looks superseded -- confirm and remove.
        """
        return self.__redisManager.getRedis()
    @classmethod
    def __load_data(cls):
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_by-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_by_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks[k.split("-")[1]] = (val, time.time())
        keys = RedisUtils.keys(cls.__get_redis(), "kpl_jx_blocks_radical-*")
        if keys:
            for k in keys:
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__code_blocks_for_radical_buy[k.split("-")[1]] = (val, time.time())
    @classmethod
    def __get_redis(cls):
        """Return the result of ``getRedis()`` on the class-level Redis manager."""
        return cls.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False):
        if not blocks:
@@ -60,28 +95,18 @@
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())
    # 获取精选板块
    def get_jx_blocks(self, code, by=False):
        if by:
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    def save_jx_blocks_for_radical_buy(self, code, blocks: list):
        if not blocks:
            return
        RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_radical-{code}", tool.get_expire(), json.dumps(blocks))
        self.__code_blocks_for_radical_buy[code] = (blocks, time.time())
    # 获取精选板块(激进买)
    def get_jx_blocks_radical(self, code):
        blocks_info = self.__code_blocks_for_radical_buy.get(code)
        if blocks_info:
            return set(blocks_info[0])
        return None
    def get_jx_blocks_cache(self, code, by=False):
        if by:
@@ -106,6 +131,8 @@
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块-{blocks}  耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                    else:
                        # 还没涨停的需要更新精选板块 更新精选板块
                        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
@@ -126,8 +153,8 @@
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                                # 跟随精选板块一起更新
                                self.load_jx_blocks_radical(code)
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
@@ -136,9 +163,24 @@
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块(备用)-{blocks}  耗时:{int(time.time() - start_time)}s")
                        # 跟随精选板块一起更新
                        self.load_jx_blocks_radical(code)
                if price_rate > 0.03:
                    if not self.__code_blocks_for_radical_buy.get(code):
                        self.load_jx_blocks_radical(code)
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
    def load_jx_blocks_radical(self, code):
        start_time = time.time()
        blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
        blocks = set([b[1] for b in blocks])
        # fblocks = BlockMapManager().filter_blocks(blocks)
        async_log_util.info(logger_kpl_block_can_buy,
                            f"{code}:获取到板块(激进买) 过滤前-{blocks} 耗时:{int(time.time() - start_time)}s")
        self.save_jx_blocks_for_radical_buy(code, list(blocks))
# 禁止下单的板块
@@ -185,8 +227,18 @@
# 开盘啦禁止交易板块管理
class KPLPlateForbiddenManager:
    __redisManager = redis_manager.RedisManager(3)
    """
    不能买的板块管理
    """
    __redis_manager = redis_manager.RedisManager(3)
    __kpl_forbidden_plates_cache = set()
    # 已经删除了的板块
    __deleted_kpl_forbidden_plates_cache = set()
    # 监控的高标板块代码字典:{"板块":{"代码1","代码2"}}
    __watch_block_high_codes = {}
    # 高标代码
    __watch_high_codes = set()
    __instance = None
@@ -200,24 +252,164 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            __kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            cls.__deleted_kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "deleted_kpl_forbidden_plates")
        finally:
            RedisUtils.realse(__redis)
        cls.__load_latest_gb()
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
        return cls.__redis_manager.getRedis()
    def save_plate(self, plate):
        self.__kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def delete_plate(self, plate):
        self.__kpl_forbidden_plates_cache.discard(plate)
        RedisUtils.srem(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())
        self.__deleted_kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "deleted_kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "deleted_kpl_forbidden_plates", tool.get_expire())
    def list_all(self):
        """Return the members of the ``kpl_forbidden_plates`` Redis set (read from Redis, not the cache)."""
        return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates")
    def list_all_cache(self):
        """Return the in-memory forbidden-plate collection itself (not a copy)."""
        return self.__kpl_forbidden_plates_cache
    def list_all_deleted_cache(self):
        """Return the in-memory collection of deleted forbidden plates itself (not a copy)."""
        return self.__deleted_kpl_forbidden_plates_cache
    def is_in_cache(self, plate):
        if self.__kpl_forbidden_plates_cache and plate in self.__kpl_forbidden_plates_cache:
            return True
        return False
    @classmethod
    def __load_latest_gb(cls):
        """
        Load the recent market high-flyers ("gaobiao") and group them by block.

        Reads limit-up records for the latest 10 trading dates, selects target
        codes (see the filters below) and stores the result in
        ``__watch_block_high_codes`` (block -> codes) and ``__watch_high_codes``
        (union of all selected codes).
        @return:
        """
        # Latest 10 trading dates; limit-up records from the last entry onwards are queried.
        dates = HistoryKDatasUtils.get_latest_trading_date_cache(10)
        if not dates:
            return
        # presumably `dates` is ordered newest-first, making dates[-1] the oldest -- TODO confirm
        min_date = dates[-1]
        sql = f"SELECT r.`_code`, r.`_hot_block_name`, r.`_day`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{min_date}'"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        # code -> set of days on which the code has a record
        code_days_map = {}
        # code -> set of days whose `_open` column is falsy
        # (presumably: limit-up that was not broken -- TODO confirm)
        f_code_days_map = {}
        for r in results:
            if r[0] not in code_days_map:
                code_days_map[r[0]] = set()
            code_days_map[r[0]].add(r[2])
            if not r[3]:
                if r[0] not in f_code_days_map:
                    f_code_days_map[r[0]] = set()
                f_code_days_map[r[0]].add(r[2])
        # Keep codes with at least 4 such days, or at least 2 when tool.is_ge_code(code) is true.
        # NOTE(review): the original comment said ">= 3 limit-ups", which does not match the code.
        target_codes = set()
        for code in code_days_map:
            if f_code_days_map.get(code) and (len(f_code_days_map.get(code)) >= 4 or (
                    tool.is_ge_code(code) and len(f_code_days_map.get(code)) >= 2)):
                # ... and 3 of its record days must be consecutive trading dates
                day_list = list(code_days_map[code])
                day_list.sort(reverse=True)
                step = 3
                has_continue = False
                for i in range(0, len(day_list) - step + 1):
                    item_list = day_list[i:i + step]
                    # Does this 3-day window equal some 3-date window of `dates`?
                    # The lists are compared through their string representations.
                    is_sub = False
                    # NOTE(review): this range ends at len(dates) - step - 1, so the final
                    # window dates[-step:] is never compared, unlike the outer loop which
                    # uses "+ 1" -- confirm whether that is intended.
                    for j in range(0, len(dates) - step):
                        if f"{dates[j:j + step]}" == f"{item_list}":
                            is_sub = True
                            break
                    if is_sub:
                        has_continue = True
                        break
                if not has_continue:
                    continue
                has_big_deal = False
                # At least one of the first 10 history bars must have amount > 10e8 (i.e. 1e9)
                volumes_data = HistoryKDataManager().get_history_bars(code, dates[0])
                if volumes_data:
                    for d in volumes_data[:10]:
                        if d["amount"] > 10e8:
                            has_big_deal = True
                            break
                if not has_big_deal:
                    continue
                target_codes.add(code)
        # code -> filtered block names taken from the records of the selected codes
        code_blocks = {}
        for r in results:
            if r[0] not in target_codes:
                continue
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(kpl_util.filter_block(r[1]))
        # block -> set of selected codes, skipping invalid blocks
        block_codes = {}
        for code in code_blocks:
            for b in code_blocks[code]:
                if b in constant.KPL_INVALID_BLOCKS:
                    continue
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        # NOTE(review): print() output below duplicates the logger call -- looks like leftover debugging.
        print(block_codes)
        cls.__watch_block_high_codes = block_codes
        logger_kpl_latest_gaobiao.info(f"{block_codes}")
        cls.__watch_high_codes.clear()
        for b in block_codes:
            cls.__watch_high_codes |= block_codes[b]
        for k in block_codes:
            print(k, [(x, gpcode_manager.get_code_name(x)) for x in block_codes[k]])
    def get_watch_high_codes(self):
        """Return the set of watched high-flyer codes (the stored set itself, not a copy)."""
        return self.__watch_high_codes
    def get_watch_high_codes_by_block(self, b):
        """Return the watched high-flyer codes of block ``b``, or None when the block is not watched."""
        return self.__watch_block_high_codes.get(b)
    def compute(self, code_rate_dict: dict):
        """
        根据比例计算需要拉黑的代码
        @param code_rate_dict: 涨幅百分数
        @return:
        """
        try:
            if self.__watch_block_high_codes:
                forbidden_blocks = set()
                for b in self.__watch_block_high_codes:
                    total_rate = 0
                    for code in self.__watch_block_high_codes[b]:
                        if code in code_rate_dict:
                            total_rate += code_rate_dict.get(code)
                    average_rate = total_rate / len(self.__watch_block_high_codes[b])
                    if average_rate < 1:
                        forbidden_blocks.add(b)
                    # async_log_util.info(logger_debug, f"板块平均涨幅 {b}-{average_rate}")
                self.__kpl_forbidden_plates_cache = forbidden_blocks
                async_log_util.info(logger_debug, f"拉黑板块:{forbidden_blocks}")
        except Exception as e:
            logger_debug.exception(e)
class LimitUpCodesPlateKeyManager:
@@ -255,12 +447,6 @@
    @classmethod
    def get_today_limit_up_reason(cls, code):
        """Return the entry stored for ``code`` in the today's-limit-up-reason dict, or None if absent."""
        return cls.__today_total_limit_up_reason_dict.get(code)
    # 今日涨停原因变化
    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire())
        self.__set_total_keys(code)
    # 设置代码的今日涨停原因
    def __set_total_keys(self, code):
@@ -302,8 +488,10 @@
# 实时开盘啦市场数据
class RealTimeKplMarketData:
    # 精选前5
    top_5_reason_list = []
    # 流入缓存 [ID, 板块名称, 板块涨幅, 流入金额]
    top_in_list_cache = []
    # 流出缓存
    top_out_list_cache = []
    # 行业前5
    top_5_industry_list = []
    #
@@ -313,31 +501,156 @@
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()
    # 精选流入前几
    __top_jx_blocks = []
    # 精选流出前几
    __top_jx_out_blocks = []
    # 精选板块流入金额
    __jx_blocks_in_money_dict = {}
    # 市场行情热度,默认为60
    __market_strong = 60
    @classmethod
    def set_top_5_reasons(cls, datas):
        temp_list = []
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        # 排序
        for i in range(0, len(datas)):
            if datas[i][1] not in constant.KPL_INVALID_BLOCKS:
                # (名称,净流入金额,排名)
                temp_list.append((datas[i][1], datas[i][3], len(temp_list)))
                # 只获取前10个
                if len(temp_list) > 10:
                    break
                if datas[i][3] < 3 * 10000 * 10000:
                    break
    def get_jingxuan_in_block_threshold_count(cls):
        """
        获取买精选流入前几
        @return:
        """
        score = 60
        if cls.__market_strong is not None:
            score = int(cls.__market_strong)
        for info in constant.RADICAL_BUY_TOP_IN_COUNT_BY_MARKET_STRONG:
            if info[0] <= score < info[1]:
                return info[2]
        return 10
        for temp in temp_list:
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0])
            for name in names:
                if name == temp[1]:
                    continue
                temp_list.append((name, temp[1], temp[2]))
        cls.top_5_reason_list = temp_list
        cls.__reset_top_5_dict()
    @classmethod
    def set_market_jingxuan_blocks(cls, datas):
        """
        设置精选流入数据
        @param datas:[(板块编号,板块名称,涨幅, 板块流入金额)]
        @return:
        """
        # 流入阈值
        # THRESHOLD_MONEY = 50 * (tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") // 60) + 1000
        # THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        # THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        # THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        THRESHOLD_MONEY = 0
        # 最大数量
        # MAX_COUNT = cls.get_jingxuan_in_block_threshold_count()
        cls.top_in_list_cache = datas
        blocks = set()
        count = 0
        fblock_money = {}
        for data in datas:
            cls.__jx_blocks_in_money_dict[data[1]] = data[3]
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] < THRESHOLD_MONEY:
                continue
            # 过滤出来为同一个板块就只算1个数量
            fb = BlockMapManager().filter_blocks({data[1]})
            if blocks & fb:
                continue
            for b in fb:
                fblock_money[b] = data[3]
            blocks |= fb
            # 如果该原因没有涨停票要往后移一位
            has_code = False
            for b in fb:
                if ContainsLimitupCodesBlocksManager().get_block_codes(b):
                    has_code = True
                    break
            if has_code:
                count += 1
                if count == 10:
                    strong = cls.get_market_strong()
                    if strong is None:
                        strong = 60
                    if data[3] > 3e7:
                        # 大于3千万
                        THRESHOLD_MONEY = int((1 - strong / 200) * data[3])
                    else:
                        THRESHOLD_MONEY = data[3]
            # if count >= MAX_COUNT:
            #     break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:50]} 板块:{blocks}")
        blocks = list(blocks)
        blocks.sort(key=lambda x: fblock_money.get(x), reverse=True)
        cls.__top_jx_blocks = blocks
    @classmethod
    def set_market_jingxuan_out_blocks(cls, datas):
        """
        设置精选流出数据
        @param datas:
        @return:
        """
        cls.top_out_list_cache = datas
        count = 0
        blocks = set()
        for data in datas:
            cls.__jx_blocks_in_money_dict[data[1]] = data[3]
            if data[1] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[3] > -5e7:
                # 过滤5千万以上的
                break
            # 过滤出来为同一个板块就只算1个数量
            fb = BlockMapManager().filter_blocks({data[1]})
            if blocks & fb:
                continue
            blocks |= fb
            count += 1
            if count >= 10:
                break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = list(blocks)
    @classmethod
    def set_market_strong(cls, strong):
        """
        Set the market strength score.
        @param strong: new score; stored as given
        @return:
        """
        cls.__market_strong = strong
    @classmethod
    def is_ignore_block_in_money(cls):
        if cls.__market_strong and cls.__market_strong >= constant.IGNORE_BLOCK_IN_MONEY_MARKET_STRONG:
            return True
        return False
    @classmethod
    def get_market_strong(cls):
        """Return the stored market strength score."""
        return cls.__market_strong
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
        """Return the stored list of top featured-inflow blocks."""
        return cls.__top_jx_blocks
    @classmethod
    def get_top_market_jingxuan_out_blocks(cls):
        """Return the stored list of top featured-outflow blocks."""
        return cls.__top_jx_out_blocks
    @classmethod
    def get_block_info_at_block_in(cls, b):
        """
        获取板块的净流入情况
        @param b:
        @return: (板块名称,身位,流入金额)
        """
        for i in range(0, len(cls.top_in_list_cache)):
            if cls.top_in_list_cache[i][1] == b:
                return b, i, cls.top_in_list_cache[i][3]
        return b, -1, 0
    @classmethod
    def set_top_5_industry(cls, datas):
@@ -370,35 +683,6 @@
        temp_set = cls.top_5_key_dict.keys()
        return temp_set
    # 通过关键字判断能买的代码数量
    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        # 判断行业涨停票数量,除开自己必须大于1个
        temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key)
        if temp_codes is None:
            temp_codes = set()
        else:
            temp_codes = set(temp_codes)
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            # 后排才能挂单
            return 0, "身位不为后排"
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        if key in forbidden_plates:
            return 0, "不买该板块"
        # 10:30以前可以挂2个单
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # 10:30以后
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        else:
            return 1, "净流入在前5"
    @classmethod
    def is_in_top(cls, keys):
        reasons = cls.get_can_buy_key_set()
@@ -410,6 +694,10 @@
        else:
            return False, None
    @classmethod
    def get_jx_block_in_money(cls, block):
        """Return the amount last recorded for ``block`` in the featured-block money dict, or None."""
        return cls.__jx_blocks_in_money_dict.get(block)
# 代码历史涨停原因与板块管理
class CodesHisReasonAndBlocksManager:
@@ -418,6 +706,14 @@
    __history_limit_up_reason_dict = {}
    # 板块
    __blocks_dict = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesHisReasonAndBlocksManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        """Return the result of ``getRedis()`` on this class's Redis manager."""
        return self.__redisManager.getRedis()
@@ -475,6 +771,41 @@
            blocks = set()
        return reasons | blocks
    __history_blocks_dict_cache = {}
    def get_history_blocks(self, code):
        """
        获取180天的历史涨停原因
        @param code:
        @return:
        """
        if code in self.__history_blocks_dict_cache:
            return self.__history_blocks_dict_cache.get(code)
        try:
            kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(code=code)
            # 取最近2条数据
            if kpl_results and len(kpl_results) > 2:
                kpl_results = kpl_results[-2:]
            keys = set()
            if kpl_results:
                keys |= set([x[2] for x in kpl_results])
            for r in kpl_results:
                if r[3]:
                    keys |= set(r[3].split("、"))
            self.__history_blocks_dict_cache[code] = keys
            return keys
        except:
            pass
        return set()
    def get_history_blocks_cache(self, code):
        """
        Return the memoised historical limit-up block keywords of ``code``.

        Only the in-memory cache is read; nothing is loaded on a miss.
        @param code:
        @return: the cached set, or None when the code has not been cached
        """
        return self.__history_blocks_dict_cache.get(code)
# 目标代码板块关键词管理
class TargetCodePlateKeyManager:
@@ -486,9 +817,9 @@
        return self.__redisManager.getRedis()
    # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,精选板块
    def get_plate_keys(self, code):
    def get_plate_keys(self, code, contains_today=True):
        """
        获取代码的板块
        获取代码的板块: (180天的涨停原因+推荐原因)+今日涨停原因+今日涨停推荐原因+今日推荐原因
        @param code:
        @return: (板块关键词集合,今日涨停原因+涨停推荐原因,今日历史涨停原因,历史涨停原因,精选板块)
        """
@@ -503,10 +834,13 @@
        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason_cache(code)
        if k2 is None:
            k2 = set()
        k3 = set()
        industry = global_util.code_industry_map.get(code)
        if industry:
            k3 = {industry}
        k3 = self.__CodesPlateKeysManager.get_history_blocks(code)
        if k3:
            keys |= k3
        # industry = global_util.code_industry_map.get(code)
        # if industry:
        #     k3 = {industry}
        k4 = set()
        jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
@@ -515,15 +849,22 @@
        if jingxuan_block_info:
            jingxuan_blocks = jingxuan_block_info[0]
            k4 |= set(jingxuan_blocks)  # set([x[1] for x in jingxuan_blocks])
        if k1:
        if k1 and contains_today:
            # 涨停过
            keys |= k1
            keys = keys - set(constant.KPL_INVALID_BLOCKS)
        if not keys:
            # 获取不到涨停原因
        # 获取不到涨停原因
        if contains_today:
            keys |= k4
            keys = keys - set(constant.KPL_INVALID_BLOCKS)
        keys = keys - set(constant.KPL_INVALID_BLOCKS)
        return keys, k1, k11, k2, k3, k4
    def get_plate_keys_for_radical_buy(self, code):
        """
        Blocks used for radical buying.

        NOTE(review): the body is only this docstring, so the method currently
        returns None -- looks like an unimplemented stub.
        @param code:
        @return:
        """
class CodePlateKeyBuyManager:
@@ -541,26 +882,14 @@
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    __can_buy_compute_result_dict = {}
    @classmethod
    def __remove_from_l2(cls, code, msg):
        # 根据身位移除代码
        # return
        # 下过单的代码不移除
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) != trade_constant.TRADE_STATE_NOT_TRADE:
            # 只要下过单的就不移除
            return
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)
    # 是否需要激进买
    # 是否需要积极买
    @classmethod
    def __is_need_active_buy(cls, code, block, current_rank, open_limit_up_count):
        """
        板块是否需要激进买入
        规则:根据身位判断是否需要激进买,根据时间划分
        板块是否需要积极买入
        规则:根据身位判断是否需要积极买,根据时间划分
        @param code: 代码
        @param block: 板块名称
        @param current_rank: 目前在板块中的身位,从0开始
@@ -582,7 +911,7 @@
                break
        return False
    # 返回内容(是否可买, 是否为独苗, 描述信息, 是否为强势主线, 是否需要激进买)
    # 返回内容(是否可买, 是否为独苗, 描述信息, 是否为强势主线, 是否需要积极买)
    @classmethod
    def __is_block_can_buy(cls, code, block, current_limit_up_datas, code_limit_up_reasons_dict,
                           yesterday_current_limit_up_codes, limit_up_record_datas, current_limit_up_block_codes_dict,
@@ -599,7 +928,7 @@
        if not block_codes:
            # 高位板泛化板块中无板块
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块无涨停", False, False
                return False, True, f"【{block}】:板块无涨停", False, False
        elif len(block_codes) == 1 and code in block_codes:
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块只有当前代码涨停", False, False
@@ -665,7 +994,7 @@
                                                                                                    current_open_limit_up_codes),
                                                                                                shsz=True,
                                                                                                limit_up_time=first_limit_up_time)
        # 计算是否需要激进买入
        # 计算是否需要积极买入
        is_active_buy = cls.__is_need_active_buy(code, block, current_shsz_rank, len(current_open_limit_up_codes))
        # record_shsz_rank, record_shsz_rank_codes = kpl_block_util.get_code_record_rank(code, block,
@@ -749,7 +1078,7 @@
        if not block_codes:
            # 高位板泛化板块中无板块
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块无涨停", False, False, 0, 0, 0
                return False, True, f"【{block}】:板块无涨停", False, False, 0, 0, 0
        elif len(block_codes) == 1 and code in block_codes:
            if not high_level_block_codes.get(block):
                return False, True, f"{block}:板块只有当前代码涨停", False, False, 0, 0, 0
@@ -798,7 +1127,7 @@
                                                                                                shsz=True,
                                                                                                limit_up_time=first_limit_up_time)
        # 计算是否需要激进买入
        # 计算是否需要积极买入
        is_active_buy = cls.__is_need_active_buy(code, block, current_shsz_rank, len(current_open_limit_up_codes))
        if current_shsz_rank < len(current_open_limit_up_codes) + max_rank:
@@ -825,21 +1154,31 @@
        # 加载涨停代码的目标板块
        def load_code_block():
            """Fill and return the enclosing scope's ``code_limit_up_reasons_dict``.

            For every limit-up record ``d`` the entry keyed by ``d[3]`` is set to a
            one-element set: the first block from ``before_blocks_dict`` when
            ``d[2]`` is an invalid block and such an entry exists, otherwise
            ``d[2]``. In the latter case the "、"-separated names in ``d[6]`` are
            added when ``int(d[5])`` is earlier than today's 09:30:00 timestamp.
            """
            if limit_up_record_datas:
                # Local-time timestamp of today 09:30:00
                time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
                timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
                for d in limit_up_record_datas:
                    if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict:
                        code_limit_up_reasons_dict[d[3]] = {list(before_blocks_dict.get(d[3]))[0]}
                    else:
                        code_limit_up_reasons_dict[d[3]] = {d[2]}
                        # Previously the recommended reasons were added unconditionally:
                        # if d[6]:
                        #     code_limit_up_reasons_dict[d[3]] |= set(d[6].split("、"))
                        # Now they are included only when d[5] is before 09:30
                        # (presumably d[5] is the limit-up time as a unix timestamp -- TODO confirm)
                        if d[6] and int(d[5]) < timestamp:
                            code_limit_up_reasons_dict[d[3]] |= set(d[6].split("、"))
            return code_limit_up_reasons_dict
        if current_limit_up_datas is None:
            current_limit_up_datas = []
        # 获取目标代码板块
        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        # keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        keys = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not keys:
            keys = set()
        keys = BlockMapManager().filter_blocks(keys)
        if keys:
            keys -= constant.KPL_INVALID_BLOCKS
        # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
@@ -848,6 +1187,7 @@
        fresults = []
        if not keys:
            return fresults, set()
        code_limit_up_reasons_dict = {}
        load_code_block()
        for block in keys:
@@ -866,7 +1206,7 @@
    # 是否可以下单
    # 返回:可以买的板块,是否独苗,消息
    #  可买的板块, 是否独苗, 消息, 可买的强势板块, 关键词, 激进买的板块
    #  可买的板块, 是否独苗, 消息, 可买的强势板块, 关键词, 积极买的板块
    @classmethod
    def can_buy(cls, code):
        if constant.TEST:
@@ -876,11 +1216,12 @@
        #     return True, "不判断板块身位"
        return cls.__can_buy_compute_result_dict.get(code)
    # 返回:(可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 激进买入板块列表)
    # 返回:(可以买的板块列表, 是否是独苗, 消息简介,可买的强势主线, 积极买入板块列表)
    @classmethod
    def __compute_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                                 yesterday_current_limit_up_codes, before_blocks_dict,
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks):
                                 current_limit_up_block_codes_dict, high_level_general_code_blocks, codes_delegate,
                                 codes_success):
        # 根据代码泛化板块获取泛化板块的代码集合
        high_level_general_block_codes = {}
        for c in high_level_general_code_blocks:
@@ -896,10 +1237,7 @@
                                                             high_level_general_block_codes)
        if not blocks_compute_results:
            return False, True, f"没有找到板块", [], keys, []
        codes_delegate = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_DELEGATED, trade_constant.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_constant.TRADE_STATE_BUY_SUCCESS}))
        codes = codes_delegate | codes_success
        # 统计成交代码的板块
        trade_codes_blocks_dict = {}
@@ -939,7 +1277,7 @@
        msg_list = []
        active_buy_blocks = []
        for r in blocks_compute_results:
            # r的数据结构(板块,是否可以买,是否独苗,消息,是否是强势板块, 激进买入信息)
            # r的数据结构(板块,是否可以买,是否独苗,消息,是否是强势板块, 积极买入信息)
            if r[2]:
                # 独苗
                unique_count += 1
@@ -964,7 +1302,7 @@
                    msg_list.append(r[3])
                if r[5]:
                    active_buy_blocks.append(r[0])
                    msg_list.append(f"【{r[0]}】激进买入({r[5]})")
                    msg_list.append(f"【{r[0]}】积极买入({r[5]})")
            else:
                if r[3]:
                    msg_list.append(r[3])
@@ -978,7 +1316,7 @@
    @classmethod
    def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                              latest_current_limit_up_records,
                              before_blocks_dict, current_limit_up_block_codes_dict):
                              before_blocks_dict, current_limit_up_block_codes_dict, delegate_codes, deal_codes):
        yesterday_current_limit_up_codes = set()
        yesterday_current_limit_up_records_dict = {}
        yesterday_current_limit_up_records = latest_current_limit_up_records[0][1]
@@ -1012,212 +1350,13 @@
                                                                                                                   yesterday_current_limit_up_codes,
                                                                                                                   before_blocks_dict,
                                                                                                                   current_limit_up_block_codes_dict,
                                                                                                                   high_level_general_code_blocks)
                                                                                                                   high_level_general_code_blocks,
                                                                                                                   delegate_codes,
                                                                                                                   deal_codes)
        # 保存板块计算结果
        cls.__can_buy_compute_result_dict[code] = (
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
class LatestLimitUpBlockManager:
    """
    Manager for the blocks (plates) that appeared in limit-up data recently.

    The class is a singleton: the first construction loads the limit-up
    records and K-line bars of the previous trading days, later constructions
    return the same instance.  All state lives in class-level dictionaries.

    Record layout as used by this class (index meaning inferred from usage,
    verify against the data producers):
      * "current" limit-up records: index 0 is used as the code
      * "history" limit-up records: index 2 is used as the block, index 3 as
        the code, index 4 as the code name, index 12 is compared to '首板'
    """
    # Number of days looked at (today plus the previous trading days)
    __LATEST_DAY_COUNT = 7
    # Days covered by the statistics, today first
    __days = []
    # Currently limit-up records: {day: [record, ...]}
    __current_limit_up_day_datas = {}
    # Ever limit-up (history) records: {day: [record, ...]}
    __history_limit_up_day_datas = {}
    # K-line bars: {code: [bar, ...]}
    __k_datas = {}
    # Maximum rise rate (percent, int) of each code over the loaded bars
    __k_max_rate = {}
    # {code: name}
    __code_name_dict = {}
    # Block statistics: {day: {block: (limit_up_count, open_limit_up_count, codes)}}
    __block_day_datas = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """
        Load the limit-up records, K-line bars and block statistics of the
        days before today.
        @return:
        """
        # Copy the result so that inserting today does not modify a list that
        # may be owned by the cache of HistoryKDatasUtils
        days = list(HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1) or [])
        now_day = tool.get_now_date_str()
        if not days or days[0] != now_day:
            days.insert(0, now_day)
        cls.__days = days
        previous_days = [day for day in days if day != now_day]

        # Limit-up / ever-limit-up records of the previous days
        for day in previous_days:
            cls.__history_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_history_limit_up_datas(day)
            cls.__current_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_current_limit_up_datas(day)

        # K-line bars of every code that appears in the history records
        total_codes = set()
        for day in cls.__current_limit_up_day_datas:
            total_codes |= {record[3] for record in cls.__history_limit_up_day_datas.get(day) or []}
        for code in total_codes:
            cls.__get_bars(code)

        # Block statistics of the previous days
        for day in previous_days:
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)

    def set_current_limit_up_data(self, day, datas):
        """
        Store the currently limit-up records of a day, refresh that day's
        history records from the "today" cache and make sure the K-line bars
        of all involved codes are loaded.
        @param day: day the records belong to
        @param datas: currently limit-up records
        @return:
        """
        self.__current_limit_up_day_datas[day] = datas
        self.__history_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_today_history_limit_up_datas_cache()
        total_codes = {d[0] for d in datas or []}
        total_codes |= {d[3] for d in self.__history_limit_up_day_datas[day] or []}
        for code in total_codes:
            self.__get_bars(code)

    @classmethod
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        Compute the block statistics of one day.
        @param day: day to compute
        @return: {block: (limit_up_count, open_limit_up_count, codes)}; codes
                 of the block that are still in the current limit-up records
                 count as limit-up, the others as opened.  A day without any
                 loaded record yields an empty dict.
        """
        current_codes = {d[0] for d in cls.__current_limit_up_day_datas.get(day) or []}
        block_codes_dict = {}
        for h in cls.__history_limit_up_day_datas.get(day) or []:
            cls.__code_name_dict[h[3]] = h[4]
            block_codes_dict.setdefault(h[2], set()).add(h[3])

        fdata = {}
        for block, codes in block_codes_dict.items():
            limit_up_count = len(codes & current_codes)
            fdata[block] = (limit_up_count, len(codes) - limit_up_count, codes)
        return fdata

    @staticmethod
    def __rank_blocks_by_day_count(block_days_dict, limit=20):
        """
        Rank blocks by the number of days they appeared on.
        @param block_days_dict: {block: set of days}
        @param limit: maximum number of blocks returned
        @return: [(block, days)] with the most frequent blocks first; blocks
                 with the same day count keep their input order
        """
        # Sort on the SIZE of the day set: comparing the sets themselves is a
        # subset test and does not define a usable ordering
        ranked = sorted(block_days_dict.items(), key=lambda item: len(item[1]), reverse=True)
        return ranked[:limit]

    def statistics_limit_up_block_infos(self):
        """
        Compute the statistics of the most frequent limit-up blocks.

        Only today's per-day statistics are recomputed; the previous days use
        the values computed when the data was loaded.
        @return: list of
                 [block, day_count, max_continuous_day_count,
                  (code, max_rate, name) of the code with the highest rate or None,
                  count of today's codes in the block whose record index 12 is not '首板',
                  [(limit_up_count, open_limit_up_count) for every managed day]]
        """
        now_day = tool.get_now_date_str()
        self.__block_day_datas[now_day] = self.__statistics_limit_up_block_infos_by_day(now_day)

        # Days on which each block appeared and all codes seen in each block
        block_days_dict = {}
        block_codes_dict = {}
        for day, day_blocks in self.__block_day_datas.items():
            for block, info in day_blocks.items():
                block_days_dict.setdefault(block, set()).add(day)
                block_codes_dict.setdefault(block, set()).update(info[2])

        today_records_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas.get(now_day) or []}

        fdatas = []
        for block, days in self.__rank_blocks_by_day_count(block_days_dict):
            codes = block_codes_dict[block]

            # Longest run of consecutive managed days that contain the block
            max_continue_count = 0
            run_length = 0
            for day in self.__days:
                if block in self.__block_day_datas.get(day, {}):
                    run_length += 1
                    if run_length > max_continue_count:
                        max_continue_count = run_length
                else:
                    run_length = 0

            # Code with the highest rate; a code without a known rate never
            # replaces one that has a rate
            max_rate_info = None
            for code in codes:
                rate = self.__k_max_rate.get(code)
                if max_rate_info is None or (
                        rate is not None and (max_rate_info[1] is None or max_rate_info[1] < rate)):
                    max_rate_info = (code, rate, self.__code_name_dict.get(code))

            # Today's codes of the block that are not on their first board
            limit_up_counts = 0
            for code in codes:
                if code in today_records_code_dict and today_records_code_dict[code][12] != '首板':
                    limit_up_counts += 1

            # (limit_up_count, open_limit_up_count) of every managed day
            days_datas = []
            for day in self.__days:
                binfo = self.__block_day_datas.get(day, {}).get(block)
                if binfo:
                    days_datas.append((binfo[0], binfo[1]))
                else:
                    days_datas.append((0, 0))

            fdatas.append([block, len(days), max_continue_count, max_rate_info, limit_up_counts, days_datas])
        return fdatas

    @classmethod
    def __get_bars(cls, code):
        """
        Get the K-line bars of a code, loading and caching them on first use,
        and record the maximum rise rate of the code.
        @param code:
        @return: the bars, or None when no bar could be loaded
        """
        if code in cls.__k_datas:
            return cls.__k_datas[code]

        volumes_data = None
        # days[1] is the entry right after the first one (today); it only
        # exists when at least two days are managed
        if len(cls.__days) > 1:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
        if not volumes_data:
            # Nothing available: leave the caches untouched instead of failing
            return None
        cls.__k_datas[code] = volumes_data

        # Maximum rise: first bar's close against the lowest low of all bars
        min_price = min(d["low"] for d in volumes_data)
        if min_price:
            cls.__k_max_rate[code] = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        return volumes_data
# Script entry point: runs only when the module is executed directly, not on import.
if __name__ == "__main__":
    pass
    # NOTE(review): KPLPlateForbiddenManager is not visible in this section; presumably a manual
    # trigger of its computation for debugging — confirm it is still meant to run here.
    KPLPlateForbiddenManager().compute()