Administrator
2025-08-18 c8d828e625fa5b3ae6232666ac2c0e5f51a62c54
third_data/third_blocks_manager.py
@@ -4,12 +4,15 @@
from itertools import combinations
from db.mysql_data_delegate import Mysqldb
from third_data import kpl_api
from utils import middle_api_protocol
from utils.kpl_data_db_util import KPLLimitUpDataUtil
from utils.ths_industry_util import ThsCodeIndustryManager
SOURCE_TYPE_KPL = 1  # 东方财富
SOURCE_TYPE_KPL = 1  # 开盘啦
SOURCE_TYPE_TDX = 2  # 通达信
SOURCE_TYPE_THS = 3  # 同花顺
SOURCE_TYPE_EASTMONEY = 4  # 东方财富
SOURCE_TYPE_KPL_RECORD = 5  # 开盘啦历史数据
class CodeThirdBlocksManager:
@@ -21,6 +24,7 @@
    # 代码板块:{code:{1:{"b1","b2"},2:{"c1","c2"}}}
    __code_source_blocks_dict = {}
    __code_source_blocks_dict_origin = {}
    __ths_industry = ThsCodeIndustryManager()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -37,8 +41,31 @@
                cls.__code_source_blocks_dict[result[0]] = {}
                cls.__code_source_blocks_dict_origin[result[0]] = {}
            blocks = set(result[2].split("、"))
            if result[1] == SOURCE_TYPE_THS:
                # 同花顺加入2级分类
                industry = cls.__ths_industry.get_industry(result[0])
                if industry:
                    blocks.add(industry)
            cls.__code_source_blocks_dict_origin[result[0]][result[1]] = blocks
            cls.__code_source_blocks_dict[result[0]][result[1]] = BlockMapManager().filter_blocks(blocks)
        # 加载开盘啦历史涨停原因
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos()
        code_blocks = {}
        for r in kpl_results:
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(r[2])
            if r[3]:
                code_blocks[r[0]] |= set(r[3].split("、"))
        for code in code_blocks:
            if code not in cls.__code_source_blocks_dict:
                cls.__code_source_blocks_dict[code] = {}
                cls.__code_source_blocks_dict_origin[code] = {}
            blocks = code_blocks[code]
            cls.__code_source_blocks_dict_origin[code][SOURCE_TYPE_KPL_RECORD] = blocks
            cls.__code_source_blocks_dict[code][SOURCE_TYPE_KPL_RECORD] = BlockMapManager().filter_blocks(blocks)
    def get_source_blocks(self, code):
        """
@@ -56,7 +83,7 @@
        """
        return self.__code_source_blocks_dict_origin.get(code)
    def get_intersection_blocks_info(self, code, blocks):
    def get_intersection_blocks_info(self, code, blocks, same_count=2):
        # 获取交集
        bs = []
        b1 = BlockMapManager().filter_blocks(blocks)
@@ -67,13 +94,13 @@
            for s in sb_dict:
                if sb_dict[s]:
                    bs.append(sb_dict[s])
        if len(bs) < 2:
        if len(bs) < same_count:
            return set(), bs
        s_count = len(bs)
        fblocks = set()
        # 求2个平台的交集
        for ces in combinations(bs, 2):
        for ces in combinations(bs, same_count):
            ic = None
            for c in ces:
                if ic is None:
@@ -94,6 +121,9 @@
        # 更新缓存数据
        if code not in self.__code_source_blocks_dict:
            self.__code_source_blocks_dict[code] = {}
        if code not in self.__code_source_blocks_dict_origin:
            self.__code_source_blocks_dict_origin[code] = {}
        if blocks:
            self.__code_source_blocks_dict[code][source_type] = BlockMapManager().filter_blocks(set(blocks))
            self.__code_source_blocks_dict_origin[code][source_type] = set(blocks)
@@ -178,20 +208,100 @@
        if blocks is None or len(blocks) == 0:
            return set()
        fbs = set()
        invalid_blocks = InvalidBlockManager().get_invalid_blocks()
        for block in blocks:
            if block.endswith("概念"):
                block = block[:-2]
            b = self.get_map_blocks_cache(block)
            if b:
                fbs |= b
            if block in invalid_blocks:
                continue
            fbs.add(block)
        return fbs
    def get_all_blocks(self):
        return self.__block_map.keys()
class InvalidBlockManager:
    """
    无效板块管理
    """
    __mysql = Mysqldb()
    __instance = None
    __block = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(InvalidBlockManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __load_data(cls):
        results = cls.__mysql.select_all("select _block from invalid_block")
        cls.__block.clear()
        for result in results:
            if result[0]:
                cls.__block.add(result[0])
    def get_invalid_blocks(self):
        """
        获取无效的板块
        @return:
        """
        return self.__block
    def set_incalid_blocks(self, blocks):
        """
        设置无效的板块
        @param blocks:
        @return:
        """
        # 先删除所有,然后再添加
        self.__mysql.execute("delete from invalid_block")
        for b in blocks:
            self.__mysql.execute(f"insert into invalid_block(_block) values('{b}')")
        self.__block = set(blocks)
def load_if_less(codes):
    """
    加载
    @param codes:
    @return:
    """
    for code in codes:
        source_blocks = CodeThirdBlocksManager().get_source_blocks_origin(code)
        if source_blocks is None:
            source_blocks = {}
        all_source = {SOURCE_TYPE_EASTMONEY, SOURCE_TYPE_TDX, SOURCE_TYPE_THS}
        sources = all_source - source_blocks.keys()
        for source in sources:
            try:
                blocks = middle_api_protocol.request(middle_api_protocol.get_third_blocks(code, source))
                if blocks:
                    CodeThirdBlocksManager().set_blocks(code, blocks, source)
            except:
                pass
def __add_invlaid_blocks():
    blocks_str = """
    昨日连板
    昨日连板_含一字
    昨日涨停
    昨日涨停_含一字
        """
    blocks = set()
    for x in blocks_str.split("\n"):
        if x.strip():
            blocks.add(x.strip())
    print(len(blocks))
    InvalidBlockManager().set_incalid_blocks(blocks)
if __name__ == '__main__':
    code = "000761"
    blocks = kpl_api.getCodeJingXuanBlocks(code, jx=False)
    blocks = set([b[1] for b in blocks])
    print(CodeThirdBlocksManager().get_intersection_blocks_info(code, blocks))
    pass