"""
Block (sector) information.
"""
import datetime

import constant
from utils import tool
from third_data import kpl_util
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager
from third_data import kpl_data_manager

# Cache: code -> blocks returned by KPLLimitUpDataRecordManager.get_latest_blocks_set
__before_block_dict = {}

__kplDataManager = KPLDataManager()


def __load_before_block(code):
    """Return the previously recorded blocks of ``code``, loading them on first use.

    A non-empty result of ``KPLLimitUpDataRecordManager.get_latest_blocks_set``
    is stored in ``__before_block_dict``. An empty result is NOT stored, so the
    lookup is attempted again on the next call.

    :param code: stock code
    :return: the cached blocks, or None when none are cached
    """
    if code not in __before_block_dict:
        latest_blocks = KPLLimitUpDataRecordManager.get_latest_blocks_set(code)
        if latest_blocks:
            __before_block_dict[code] = latest_blocks
    return __before_block_dict.get(code)


# Get the previously recorded blocks
def get_before_blocks(code):
    """Return the cached blocks of ``code`` without loading; None when not cached."""
    return __before_block_dict.get(code)


# Get the code -> blocks dictionary of previously recorded blocks
def get_before_blocks_dict():
    """Return the cache dictionary itself (not a copy)."""
    return __before_block_dict


def __get_code_from_code_info(code_info):
    """Return the part of ``code_info[0][1]`` that precedes the first ".".

    NOTE(review): not called anywhere in this module's visible code.
    """
    return code_info[0][1].split(".")[0]


# Limit-up record layout:
# (code, name, first limit-up time, latest limit-up time, board count,
#  limit-up reason, block, actual circulating value, main-force net amount)
def __get_blocks(code, limit_up_datas, filter=True):
    """Collect the blocks of ``code`` from the limit-up records.

    The value at index 5 of each record whose index 0 equals ``code`` is taken
    as the block.

    :param code: stock code
    :param limit_up_datas: iterable of limit-up records, may be None or empty
    :param filter: when true, blocks listed in ``constant.KPL_INVALID_BLOCKS``
        are skipped
    :return: list of blocks in record order (may contain duplicates)
    """
    blocks = []
    for data in limit_up_datas or ():
        if data[0] != code:
            continue
        block = data[5]
        if filter and block in constant.KPL_INVALID_BLOCKS:
            continue
        blocks.append(block)
    return blocks


def __is_has_high_code(block, latest_datas):
    """Look for a record of ``block`` whose board count is not "首板".

    :param block: block name, compared with index 5 of each record
    :param latest_datas: iterable of limit-up records
    :return: ``(True, record)`` for the first matching record, else
        ``(False, None)``
    """
    for data in latest_datas:
        if data[5] == block and data[4] != "首板":
            return True, data
    return False, None


def __pick_target_block(blocks, skip_invalid):
    """Return the first usable block of ``blocks``, or None.

    :param blocks: iterable of blocks, may be None or empty
    :param skip_invalid: when true, blocks listed in
        ``constant.KPL_INVALID_BLOCKS`` are passed over
    """
    if not blocks:
        return None
    for block in blocks:
        if skip_invalid and block in constant.KPL_INVALID_BLOCKS:
            continue
        return block
    return None


# Cache: (date string, day_count) -> result of list_blocks_with_day
__blocks_dict = {}


def get_latest_block(day_count=15):
    """Return the block records of the most recent ``day_count`` trading days.

    The trading days are looked up in a window that starts
    ``day_count * 2 + 10`` calendar days before now and ends at the previous
    trading date; the last ``day_count`` of them are passed to
    ``KPLLimitUpDataRecordManager.list_blocks_with_day``.

    The result is cached per (current date, ``day_count``). The key includes
    ``day_count`` so that calls with different windows on the same day do not
    return each other's results.

    :param day_count: number of trading days to cover
    :return: whatever ``list_blocks_with_day`` returns for those days
    """
    now_day = tool.get_now_date_str()
    cache_key = (now_day, day_count)
    if cache_key in __blocks_dict:
        return __blocks_dict[cache_key]

    end_date = HistoryKDatasUtils.get_previous_trading_date_cache(now_day)
    # Calendar window wide enough to contain day_count trading days
    window_start = datetime.datetime.now() - datetime.timedelta(days=(day_count * 2 + 10))
    start_date = window_start.strftime("%Y-%m-%d")
    days = HistoryKDatasUtils.get_trading_dates(start_date, end_date)
    days = days[0 - day_count:]
    results = KPLLimitUpDataRecordManager.list_blocks_with_day(days)
    __blocks_dict[cache_key] = results
    return results


# Get the blocks the code belongs to
def get_code_blocks(code):
    """Return the valid blocks of ``code`` found in the current limit-up data."""
    latest_datas = __kplDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    return __get_blocks(code, latest_datas)


def get_target_block_info(code, filter=False):
    """Return the target block of ``code``.

    Blocks come from the current limit-up data; when there are none, the
    previously recorded blocks are used instead.

    :param code: stock code
    :param filter: when true, blocks listed in ``constant.KPL_INVALID_BLOCKS``
        are ignored
    :return: the first usable block, or None when there is none
    """
    latest_datas = __kplDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    blocks = __get_blocks(code, latest_datas, filter)
    if not blocks:
        blocks = __load_before_block(code)
    # Get the target block
    target_block = __pick_target_block(blocks, filter)
    if not target_block:
        return None
    return target_block


# Get information about the block the code belongs to
def get_info(code):
    """Return statistics about the target block of ``code``.

    :param code: stock code
    :return: None when no valid target block is found, otherwise a dict with
        the keys ``target_block_info``, ``limit_up_index``,
        ``limit_up_codes_count``, ``block_codes_rates_info``, ``break_size``,
        ``re_limit_up_size`` and ``high_block_infos``
    """
    latest_datas = __kplDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    blocks = __get_blocks(code, latest_datas)
    if not blocks:
        blocks = __load_before_block(code)
    # Fetched before the early return below, as the original did
    latest_blocks_info = get_latest_block(15)

    # Get the target block
    target_block = __pick_target_block(blocks, True)
    if not target_block:
        return None

    # NOTE: the average-rate statistics of the block are not computed at the
    # moment, so these three values are always 0 in the result.
    total_rates = 0
    total_count = 0
    target_block_rate = 0

    # Count how many recent block records belong to the target block
    target_block_histry_count = sum(
        1 for block_info in latest_blocks_info or () if block_info[0] == target_block
    )

    # Whether a high-board stock appeared in any of the code's blocks
    high_block_infos = []
    if latest_datas:
        for block in blocks:
            if block in constant.KPL_INVALID_BLOCKS:
                continue
            has_high, high_code_info = __is_has_high_code(block, latest_datas)
            if has_high:
                high_block_infos.append((high_code_info[1], high_code_info[4]))

    limit_up_codes_info_set = set()
    # Codes under the target block
    block_codes_set = set()
    if KPLLimitUpDataRecordManager.total_datas is None:
        KPLLimitUpDataRecordManager.load_total_datas()
    if KPLLimitUpDataRecordManager.total_datas:
        # Record fields used here: d[2] is compared with the block, d[3] with
        # the code, d[5] is the sort key (presumably the limit-up time — TODO confirm)
        for d in KPLLimitUpDataRecordManager.total_datas:
            if d[2] == target_block:
                code_ = d[3]
                limit_up_codes_info_set.add((code_, d[5]))
                block_codes_set.add(code_)
            elif d[3] == code:
                limit_up_codes_info_set.add((d[3], d[5]))

    # Position of the code in the limit-up order; when the code occurs more
    # than once the last occurrence wins, and when it is absent the index is
    # the list length
    limit_up_codes_info_list = sorted(limit_up_codes_info_set, key=lambda x: x[1])
    limit_up_index = -1
    for i, info in enumerate(limit_up_codes_info_list):
        if info[0] == code:
            limit_up_index = i
    if limit_up_index < 0:
        limit_up_index = len(limit_up_codes_info_list)

    # Set of limit-up codes, excluding the code itself
    limit_up_codes_set = {info[0] for info in limit_up_codes_info_list}
    limit_up_codes_set.discard(code)
    block_codes_set.discard(code)
    limit_up_codes_count = len(block_codes_set)

    # Get the opened-limit-up (broken board) data
    open_limit_up_datas = __kplDataManager.get_data(kpl_util.KPLDataType.OPEN_LIMIT_UP)
    break_codes = set()
    re_limit_codes = set()
    # Broken-board record layout:
    # (code, name, change rate, blocks, actual circulating value)
    if open_limit_up_datas:
        for data in open_limit_up_datas:
            # Index 3 holds the block names joined by "、"
            if target_block in data[3].split("、"):
                break_codes.add(data[0])

    # Count re-sealed codes: records of the target block whose index 2 and
    # index 3 (first and latest limit-up time per the layout) differ
    if latest_datas:
        for data in latest_datas:
            if data[5] != target_block:
                continue
            if data[2] != data[3]:
                re_limit_codes.add(data[0])

    # Exclude the code itself
    break_codes.discard(code)
    # Exclude codes that are already at limit-up
    break_codes = break_codes.difference(limit_up_codes_set)
    # Number of broken boards
    break_size = len(break_codes)
    # Number of re-sealed boards
    re_limit_up_size = len(re_limit_codes)

    fresult = {
        # Target block info (block name, block rate, historical occurrence count)
        "target_block_info": (target_block, target_block_rate, target_block_histry_count),
        # Limit-up order
        "limit_up_index": limit_up_index,
        # Number of limit-up codes
        "limit_up_codes_count": limit_up_codes_count,
        # Rate information of the block's codes
        "block_codes_rates_info": (total_rates, total_count),
        # Number of broken-board codes
        "break_size": break_size,
        # Number of re-sealed codes
        "re_limit_up_size": re_limit_up_size,
        # High-board information
        "high_block_infos": high_block_infos,
    }
    return fresult


# Initialise the block data
def init():
    """Load the limit-up records and the latest limit-up data into memory."""
    # Load the records into memory
    kpl_data_manager.get_current_limit_up_data_records(10)
    # Load the latest data into memory
    limit_up_datas = kpl_data_manager.KPLDataManager().get_data(kpl_util.KPLDataType.LIMIT_UP)
    if limit_up_datas:
        kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas = limit_up_datas


def init_code(code):
    """Preload the previously recorded blocks of ``code`` into the cache."""
    # Load the historical limit-up reasons
    __load_before_block(code)


if __name__ == "__main__":
    get_info("603133")