""" 开盘啦板块管理 """ import json import constant from db.mysql_data_delegate import Mysqldb from third_data import kpl_api, kpl_util from utils import tool class KPLCodeJXBlocksManager: """ 开盘啦精选板块管理 """ def __init__(self, day, target_codes): self.day = day self.mysql_db = Mysqldb() self.target_codes = target_codes def __download_blocks(self, code): datas = kpl_api.getCodeJingXuanBlocks(code) blocks = json.dumps([kpl_util.filter_block(x[1]) for x in datas], ensure_ascii=False) block_ids = json.dumps([x[0] for x in datas], ensure_ascii=False) block_details = json.dumps(datas, ensure_ascii=False) id = f"{self.day}_{code}" self.mysql_db.execute( f"insert into kpl_code_blocks(id,code, day,jx_blocks, jx_block_ids, jx_blocks_detail, create_time) values('{id}','{code}','{self.day}','{blocks}', '{block_ids}','{block_details}', now())") def start_download_blocks(self): codes = self.mysql_db.select_all(f"select code from kpl_code_blocks where day='{self.day}'") codes = set([x[0] for x in codes]) need_update_codes = set(self.target_codes) - codes for code in need_update_codes: self.__download_blocks(code) def get_all_code_blocks(self): """ 获取所有代码的板块 @return: """ sql = f"select code, jx_blocks from kpl_code_blocks where day = '{self.day}'" results = self.mysql_db.select_all(sql) return {x[0]: set(json.loads(x[1])) - constant.KPL_INVALID_BLOCKS for x in results} def get_all_code_blocks_count(self): """ 获取所有代码的板块的数量 @return: """ sql = f"select count(*) from kpl_code_blocks where day = '{self.day}'" results = self.mysql_db.select_one(sql) return int(results[0])