""" 三方板块管理 """ from db.mysql_data_delegate import Mysqldb SOURCE_TYPE_KPL = 1 # 开盘啦 SOURCE_TYPE_TDX = 2 # 通达信 SOURCE_TYPE_THS = 3 # 同花顺 SOURCE_TYPE_EASTMONEY = 4 # 东方财富 SOURCE_TYPE_KPL_RECORD = 5 # 开盘啦历史数据 class BlockMapManager: """ 板块映射管理 """ __mysql = Mysqldb() __instance = None __block_map = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(BlockMapManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): results = cls.__mysql.select_all("select origin_block,blocks from block_map") cls.__block_map.clear() for result in results: cls.__block_map[result[0]] = set(result[1].split("、")) def set_block_map(self, origin_block, blocks): if not blocks: blocks = {origin_block} blocks_str = "、".join(blocks) result = self.__mysql.select_one(f"select * from block_map where origin_block='{origin_block}'") if result: # 更新 self.__mysql.execute( f"update block_map set blocks='{blocks_str}', update_time=now() where origin_block='{origin_block}'") else: self.__mysql.execute( f"insert into block_map(origin_block, blocks, create_time) values('{origin_block}','{blocks_str}', now())") def get_map_blocks_cache(self, block): """ 获取映射好的板块 @param block: @return: """ return self.__block_map.get(block) def filter_blocks(self, blocks): """ 批量过滤板块 @param blocks: @return: """ if blocks is None or len(blocks) == 0: return set() fbs = set() invalid_blocks = InvalidBlockManager().get_invalid_blocks() for block in blocks: if block.endswith("概念"): block = block[:-2] b = self.get_map_blocks_cache(block) if b: fbs |= b if block in invalid_blocks: continue fbs.add(block) return fbs def get_all_blocks(self): return self.__block_map.keys() class InvalidBlockManager: """ 无效板块管理 """ __mysql = Mysqldb() __instance = None __block = set() def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(InvalidBlockManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): results = cls.__mysql.select_all("select _block from invalid_block") cls.__block.clear() for result in results: if result[0]: cls.__block.add(result[0]) def get_invalid_blocks(self): """ 获取无效的板块 @return: """ return self.__block def set_incalid_blocks(self, blocks): """ 设置无效的板块 @param blocks: @return: """ # 先删除所有,然后再添加 self.__mysql.execute("delete from invalid_block") for b in blocks: self.__mysql.execute(f"insert into invalid_block(_block) values('{b}')") self.__block = set(blocks) if __name__ == '__main__': pass