Administrator
2025-04-11 f09823c8aa1ac777d634d37ab29c0ad85386a06e
third_data/kpl_data_constant.py
@@ -2,11 +2,9 @@
import constant
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_debug
from third_data import kpl_util
from third_data.third_blocks_manager import BlockMapManager
from trade import trade_record_log_util
from utils import tool, global_util
from utils.kpl_data_db_util import KPLLimitUpDataUtil
@@ -15,6 +13,49 @@
from utils.tool import singleton
open_limit_up_code_dict_for_radical_buy = None
# Set of new themes (blocks); is_new_block() tests membership against it.
new_blocks = set()
# New themes taken from the limit-up list: {"code": "theme"} (one theme per code).
limit_up_code_new_block_dict = {}
# New themes of constituent stocks that have not hit limit-up: {"code": {"theme", ...}}
limit_up_component_code_new_blocks_dict = {}
def get_new_blocks(code):
    """
    Collect every new theme (block) recorded for a stock code.

    The result merges the themes stored for the code in
    limit_up_component_code_new_blocks_dict with the single theme stored
    for it in limit_up_code_new_block_dict. A fresh set is returned, so the
    module-level containers are never modified by the caller's changes.
    @param code: stock code to look up
    @return: set of new themes for the code (empty when none are recorded)
    """
    merged = set()
    component_themes = limit_up_component_code_new_blocks_dict
    if code in component_themes:
        # In-place union into the local set only; the stored set is untouched.
        merged |= component_themes[code]
    limit_up_themes = limit_up_code_new_block_dict
    if code in limit_up_themes:
        merged.add(limit_up_themes[code])
    return merged
def is_new_block(block):
    """
    Tell whether a theme (block) is one of the new themes.
    @param block: theme name to check
    @return: True when the theme is contained in the module-level new_blocks,
             otherwise False
    """
    return block in new_blocks
def has_new_block(code):
    """
    Tell whether a stock code has at least one new theme (block).
    @param code: stock code to check
    @return: True when get_new_blocks(code) yields a non-empty result,
             otherwise False
    """
    return bool(get_new_blocks(code))
@tool.singleton
@@ -70,12 +111,6 @@
    # Per-code blocks; add_new_blocks() stores a set of blocks under each code.
    __radical_buy_reasons_dict = {}
    # Original (raw) data
    __radical_buy_reasons_origin_data_dict = {}
    # Today's new concepts (blocks)
    __new_blocks = set()
    # New themes per code: add_new_blocks() keeps a set of blocks for each code.
    __code_new_blocks = {}
    __instance = None
@@ -204,64 +239,11 @@
        @param code:
        @return:
        """
        if code in self.__radical_buy_reasons_dict:
            return set(self.__radical_buy_reasons_dict.get(code))
        return None
    def add_new_blocks(self, code, block):
        """
        添加新题材的板块
        @param code:
        @param block:
        @return: 返回增加新题材是否成功
        """
        # 自由流通股本要大于50亿
        zyltgb = global_util.zyltgb_map.get(code)
        if not zyltgb or zyltgb < 10e8:
            return False
        if block in constant.KPL_INVALID_BLOCKS:
            return False
        old_blocks = self.__radical_buy_reasons_dict.get(code)
        if old_blocks is None:
            old_blocks = set()
        if block in old_blocks:
            return False
        self.__new_blocks.add(block)
        if code not in self.__code_new_blocks:
            self.__code_new_blocks[code] = set()
        self.__code_new_blocks[code].add(block)
        old_blocks.add(block)
        async_log_util.info(logger_debug, f"今日新增概念:{code} - {block}")
        self.__radical_buy_reasons_dict[code] = old_blocks
        return True
    def get_new_blocks(self):
        """
        获取新概念
        @return:
        """
        return self.__new_blocks
    def is_new_block(self, block):
        """
        是否是新题材
        @param block:
        @return:
        """
        if self.__new_blocks and block in self.__new_blocks:
            return True
        return False
    def has_new_block(self, code):
        """
        是否有新题材
        @param code:
        @return:
        """
        if self.__code_new_blocks.get(code):
            return True
        return False
        blocks = set(self.__radical_buy_reasons_dict.get(code, set()))
        # 获取代码的板块
        # 获取代码的新题材
        new_blocks = get_new_blocks(code)
        return blocks | new_blocks
class TodayLimitUpReasonChangeManager: