Administrator
2025-08-18 c8d828e625fa5b3ae6232666ac2c0e5f51a62c54
third_data/third_blocks_manager.py
@@ -5,12 +5,14 @@
from db.mysql_data_delegate import Mysqldb
from utils import middle_api_protocol
from utils.kpl_data_db_util import KPLLimitUpDataUtil
from utils.ths_industry_util import ThsCodeIndustryManager
# Identifiers of the third-party data sources that block data can come from.
SOURCE_TYPE_KPL = 1  # Kaipanla (KPL)
SOURCE_TYPE_TDX = 2  # Tongdaxin (TDX)
SOURCE_TYPE_THS = 3  # Tonghuashun (THS)
SOURCE_TYPE_EASTMONEY = 4  # East Money
SOURCE_TYPE_KPL_RECORD = 5  # Kaipanla historical data
class CodeThirdBlocksManager:
@@ -40,12 +42,30 @@
                cls.__code_source_blocks_dict_origin[result[0]] = {}
            blocks = set(result[2].split("、"))
            if result[1] == SOURCE_TYPE_THS:
                # 同花顺加入2级分类
                industry = cls.__ths_industry.get_industry(result[0])
                if industry:
                    blocks.add(industry)
            cls.__code_source_blocks_dict_origin[result[0]][result[1]] = blocks
            cls.__code_source_blocks_dict[result[0]][result[1]] = BlockMapManager().filter_blocks(blocks)
        # 加载开盘啦历史涨停原因
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos()
        code_blocks = {}
        for r in kpl_results:
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(r[2])
            if r[3]:
                code_blocks[r[0]] |= set(r[3].split("、"))
        for code in code_blocks:
            if code not in cls.__code_source_blocks_dict:
                cls.__code_source_blocks_dict[code] = {}
                cls.__code_source_blocks_dict_origin[code] = {}
            blocks = code_blocks[code]
            cls.__code_source_blocks_dict_origin[code][SOURCE_TYPE_KPL_RECORD] = blocks
            cls.__code_source_blocks_dict[code][SOURCE_TYPE_KPL_RECORD] = BlockMapManager().filter_blocks(blocks)
    def get_source_blocks(self, code):
        """
@@ -63,7 +83,7 @@
        """
        return self.__code_source_blocks_dict_origin.get(code)
    def get_intersection_blocks_info(self, code, blocks):
    def get_intersection_blocks_info(self, code, blocks, same_count=2):
        # 获取交集
        bs = []
        b1 = BlockMapManager().filter_blocks(blocks)
@@ -74,13 +94,13 @@
            for s in sb_dict:
                if sb_dict[s]:
                    bs.append(sb_dict[s])
        if len(bs) < 2:
        if len(bs) < same_count:
            return set(), bs
        s_count = len(bs)
        fblocks = set()
        # 求2个平台的交集
        for ces in combinations(bs, 2):
        for ces in combinations(bs, same_count):
            ic = None
            for c in ces:
                if ic is None:
@@ -188,16 +208,62 @@
        if blocks is None or len(blocks) == 0:
            return set()
        fbs = set()
        invalid_blocks = InvalidBlockManager().get_invalid_blocks()
        for block in blocks:
            if block.endswith("概念"):
                block = block[:-2]
            b = self.get_map_blocks_cache(block)
            if b:
                fbs |= b
            if block in invalid_blocks:
                continue
            fbs.add(block)
        return fbs
    def get_all_blocks(self):
        return self.__block_map.keys()
class InvalidBlockManager:
    """
    Singleton manager of the "invalid" blocks.

    The block names live in the ``invalid_block`` table (column ``_block``)
    and are cached in a class-level set. The table is read once, when the
    singleton instance is first created.
    """
    __mysql = Mysqldb()
    __instance = None
    # Cached invalid block names; kept on the class so there is a single cache.
    __block = set()

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are accepted but deliberately not forwarded:
        # object.__new__() raises TypeError when it receives extra arguments.
        if cls.__instance is None:
            cls.__instance = super(InvalidBlockManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """
        Refill the cache from the ``invalid_block`` table.

        Rows whose first column is empty are ignored.
        """
        results = cls.__mysql.select_all("select _block from invalid_block")
        cls.__block.clear()
        for result in results:
            if result[0]:
                cls.__block.add(result[0])

    def get_invalid_blocks(self):
        """
        Get the invalid blocks.

        @return: the cached set itself (not a copy)
        """
        return self.__block

    def set_incalid_blocks(self, blocks):
        """
        Replace the stored invalid blocks with ``blocks``.

        Every row of ``invalid_block`` is deleted, one row is inserted per
        distinct block name, and the cache is switched to the new set. The
        misspelled method name is kept because existing callers use it.

        @param blocks: iterable of block names
        @return: None
        """
        # Materialise the argument before touching the table: a non-iterable
        # argument now fails before the delete, and a one-shot iterable is not
        # exhausted by the insert loop before the cache is built from it.
        new_blocks = set(blocks)
        # Delete everything first, then insert the new names.
        self.__mysql.execute("delete from invalid_block")
        for b in new_blocks:
            # NOTE(review): the statement is built by string interpolation, so a
            # name containing a quote breaks it. Switch to a parameterised query
            # if Mysqldb.execute supports arguments - TODO confirm.
            self.__mysql.execute(f"insert into invalid_block(_block) values('{b}')")
        # Rebind the class-level cache; assigning through ``self`` would create
        # an instance attribute that shadows the set __load_data maintains.
        InvalidBlockManager.__block = new_blocks
def load_if_less(codes):
@@ -221,10 +287,21 @@
                pass
def __add_invlaid_blocks():
    """
    Store the hard-coded block names below as the invalid blocks.

    Blank lines are dropped and surrounding whitespace is stripped; the number
    of names is printed before they are handed to InvalidBlockManager.
    """
    raw_names = """
    昨日连板
    昨日连板_含一字
    昨日涨停
    昨日涨停_含一字
        """
    # One block name per non-empty line.
    names = {line.strip() for line in raw_names.split("\n") if line.strip()}
    print(len(names))
    InvalidBlockManager().set_incalid_blocks(names)
if __name__ == '__main__':
    # Manual check: request the third-party blocks of one code and print the
    # result, or print the error if the request fails.
    code = "000761"
    try:
        # NOTE(review): 4 equals SOURCE_TYPE_EASTMONEY; presumably the source type - confirm.
        request_data = middle_api_protocol.get_third_blocks(code, 4)
        blocks = middle_api_protocol.request(request_data)
        print(blocks)
    except Exception as err:
        print(err)