Administrator
2024-10-12 0bca60accd9c3bb4d2fe6123d41db6be6ab2273d
third_data/third_blocks_manager.py
@@ -208,17 +208,62 @@
        if blocks is None or len(blocks) == 0:
            return set()
        fbs = set()
        invalid_blocks = InvalidBlockManager().get_invalid_blocks()
        for block in blocks:
            if block.endswith("概念"):
                block = block[:-2]
            # b = self.get_map_blocks_cache(block)
            # if b:
            #     fbs |= b
            if block in invalid_blocks:
                continue
            fbs.add(block)
        return fbs
    def get_all_blocks(self):
        """
        Return the keys of the internal block map.
        @return: the result of ``self.__block_map.keys()`` (presumably a live dict
                 keys view rather than a copy — confirm the type of ``__block_map``)
        """
        return self.__block_map.keys()
class InvalidBlockManager:
    """
    Singleton manager for the set of invalid blocks.

    The block names are read from the ``invalid_block`` table (column
    ``_block``) when the first instance is created and are cached on the class.
    """
    __mysql = Mysqldb()
    __instance = None
    __block = set()

    def __new__(cls, *args, **kwargs):
        """
        Return the shared instance, creating it and loading the cache on first use.
        @param args: accepted for backward compatibility, ignored
        @param kwargs: accepted for backward compatibility, ignored
        @return: the single InvalidBlockManager instance
        """
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            instance = super().__new__(cls)
            # Publish the singleton only after the load succeeded; otherwise a
            # failed query would leave an empty cache that is never reloaded.
            cls.__load_data()
            cls.__instance = instance
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """
        Reload the cached set of invalid blocks from the ``invalid_block`` table.
        @return:
        """
        results = cls.__mysql.select_all("select _block from invalid_block")
        # Build the new set completely and rebind it in a single assignment,
        # skipping rows whose first column is empty.
        # NOTE(review): ``or ()`` tolerates a falsy result such as None —
        # confirm what select_all returns for an empty table.
        cls.__block = {row[0] for row in results or () if row[0]}

    def get_invalid_blocks(self):
        """
        Get the invalid blocks.
        @return: the cached set itself (not a copy); callers should treat it as read-only
        """
        return self.__block

    def set_invalid_blocks(self, blocks):
        """
        Replace the invalid blocks, both in the database and in the cache.
        @param blocks: iterable of block names; duplicates are stored once
        @return:
        """
        # Materialise (and de-duplicate, keeping order) BEFORE touching the
        # database: a bad argument such as None now fails before the delete,
        # and a one-shot iterable is not exhausted before the cache is updated.
        unique_blocks = list(dict.fromkeys(blocks))
        # Delete everything first, then add the new rows.
        self.__mysql.execute("delete from invalid_block")
        for b in unique_blocks:
            # NOTE(review): the value is interpolated into the SQL text, so a
            # name containing a quote breaks the statement. Switch to a
            # parameterized query if Mysqldb.execute supports arguments.
            self.__mysql.execute(f"insert into invalid_block(_block) values('{b}')")
        # Update the class-level cache (the one __load_data maintains) instead
        # of shadowing it with an instance attribute.
        type(self).__block = set(unique_blocks)

    # Backward-compatible alias for the original, misspelled method name.
    set_incalid_blocks = set_invalid_blocks
def load_if_less(codes):
@@ -243,9 +288,16 @@
if __name__ == '__main__':
    # Manual check 1: request the third-party blocks of one sample code and
    # print either the response or the error that occurred.
    code = "000761"
    try:
        request_data = middle_api_protocol.get_third_blocks(code, 4)
        blocks = middle_api_protocol.request(request_data)
        print(blocks)
    except Exception as e:
        print(e)

    # Manual check 2: count the distinct non-blank lines of a hand-written
    # block list, then the number of invalid blocks currently cached.
    blocks_str = """
昨日连板
昨日连板_含一字
昨日涨停
昨日涨停_含一字
    """
    blocks = {line.strip() for line in blocks_str.split("\n") if line.strip()}
    print(len(blocks))
    manager = InvalidBlockManager()
    print(len(manager.get_invalid_blocks()))