"""
|
三方板块管理
|
"""
|
|
from db.mysql_data_delegate import Mysqldb
|
|
SOURCE_TYPE_KPL = 1 # 开盘啦
|
SOURCE_TYPE_TDX = 2 # 通达信
|
SOURCE_TYPE_THS = 3 # 同花顺
|
SOURCE_TYPE_EASTMONEY = 4 # 东方财富
|
SOURCE_TYPE_KPL_RECORD = 5 # 开盘啦历史数据
|
|
|
class BlockMapManager:
|
"""
|
板块映射管理
|
"""
|
__mysql = Mysqldb()
|
__instance = None
|
__block_map = {}
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(BlockMapManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_data()
|
return cls.__instance
|
|
@classmethod
|
def __load_data(cls):
|
results = cls.__mysql.select_all("select origin_block,blocks from block_map")
|
cls.__block_map.clear()
|
for result in results:
|
cls.__block_map[result[0]] = set(result[1].split("、"))
|
|
def set_block_map(self, origin_block, blocks):
|
if not blocks:
|
blocks = {origin_block}
|
blocks_str = "、".join(blocks)
|
result = self.__mysql.select_one(f"select * from block_map where origin_block='{origin_block}'")
|
if result:
|
# 更新
|
self.__mysql.execute(
|
f"update block_map set blocks='{blocks_str}', update_time=now() where origin_block='{origin_block}'")
|
else:
|
self.__mysql.execute(
|
f"insert into block_map(origin_block, blocks, create_time) values('{origin_block}','{blocks_str}', now())")
|
|
def get_map_blocks_cache(self, block):
|
"""
|
获取映射好的板块
|
@param block:
|
@return:
|
"""
|
return self.__block_map.get(block)
|
|
def filter_blocks(self, blocks):
|
"""
|
批量过滤板块
|
@param blocks:
|
@return:
|
"""
|
if blocks is None or len(blocks) == 0:
|
return set()
|
fbs = set()
|
invalid_blocks = InvalidBlockManager().get_invalid_blocks()
|
for block in blocks:
|
if block.endswith("概念"):
|
block = block[:-2]
|
b = self.get_map_blocks_cache(block)
|
if b:
|
fbs |= b
|
if block in invalid_blocks:
|
continue
|
fbs.add(block)
|
return fbs
|
|
def get_all_blocks(self):
|
return self.__block_map.keys()
|
|
|
class InvalidBlockManager:
|
"""
|
无效板块管理
|
"""
|
__mysql = Mysqldb()
|
__instance = None
|
__block = set()
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(InvalidBlockManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_data()
|
return cls.__instance
|
|
@classmethod
|
def __load_data(cls):
|
results = cls.__mysql.select_all("select _block from invalid_block")
|
cls.__block.clear()
|
for result in results:
|
if result[0]:
|
cls.__block.add(result[0])
|
|
def get_invalid_blocks(self):
|
"""
|
获取无效的板块
|
@return:
|
"""
|
return self.__block
|
|
def set_incalid_blocks(self, blocks):
|
"""
|
设置无效的板块
|
@param blocks:
|
@return:
|
"""
|
# 先删除所有,然后再添加
|
self.__mysql.execute("delete from invalid_block")
|
for b in blocks:
|
self.__mysql.execute(f"insert into invalid_block(_block) values('{b}')")
|
self.__block = set(blocks)
|
|
|
if __name__ == '__main__':
|
pass
|