""" 开盘啦涨停数据管理 """ import copy from third_data import kpl_util, kpl_data_manager from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager from utils import tool, init_data_util def get_current_limit_up_datas(day): """ 获取当时涨停的数据 @param day: @return: """ datas = kpl_data_manager.KPLDataManager.get_from_file_cache(kpl_util.KPLDataType.LIMIT_UP, day) return datas def get_history_limit_up_datas(day): """ 获取曾涨停的代码数据 @param day: @return: """ limit_up_records = kpl_data_manager.KPLLimitUpDataRecordManager.list_all(day) return limit_up_records def get_today_history_limit_up_datas_cache(): """ 获取今天的历史涨停数据 @return: """ return kpl_data_manager.KPLLimitUpDataRecordManager.total_datas class CodeLimitUpSequenceManager: """ 代码身位管理 """ # 首板身位 __first_block_sequence_dict = {} @classmethod def __get_code_blocks(cls, code): blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if not blocks: blocks = set() return blocks @classmethod def set_current_limit_up_datas(cls, current_limit_up_datas): """ 设置目前的涨停代码 @param current_limit_up_datas: @return: """ records = get_today_history_limit_up_datas_cache() # 按代码排序 # {"代码":(代码,涨停原因, 涨停时间, 几版)} current_code_block_dict = {x[0]: (x[0], x[5], x[2], x[4]) for x in current_limit_up_datas} record_code_block_dict = {x[3]: (x[3], x[2], x[5], x[12]) for x in records} # 根据涨停原因统计 # {"板块":{代码}} block_codes = {} limit_up_codes = set() for code in current_code_block_dict: bs = cls.__get_code_blocks(code) for b in bs: if b not in block_codes: block_codes[b] = set() block_codes[b].add(code) limit_up_codes.add(code) for code in record_code_block_dict: bs = cls.__get_code_blocks(code) for b in bs: if b not in block_codes: block_codes[b] = set() block_codes[b].add(code) # 获取上个交易日涨停的代码 yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() if yesterday_codes is None: yesterday_codes = set() temp_block_sequence_dict = {} for code in limit_up_codes: bs = cls.__get_code_blocks(code) for b in bs: # 计算身位 codes = block_codes[b] total_count = len(codes) # 统计真正涨停数 limit_up_count = 0 limit_up_codes_list = [] for c in codes: if c in limit_up_codes: limit_up_count += 1 if c not in yesterday_codes: limit_up_codes_list.append((c, current_code_block_dict[c][2])) # 获取首板代码的排位 limit_up_codes_list.sort(key=lambda x: x[1]) index = 1 for i in range(0, len(limit_up_codes_list)): if limit_up_codes_list[i][0] == code: index = i + 1 break if code not in temp_block_sequence_dict: temp_block_sequence_dict[code] = [] temp_block_sequence_dict[code].append((b, index, total_count, limit_up_count)) cls.__first_block_sequence_dict = temp_block_sequence_dict @classmethod def get_current_limit_up_sequence(cls, code): """ 获取代码当前的板块身位 @param code: @return:[(板块名称,身位,总涨停数量,目前涨停数量)] """ return cls.__first_block_sequence_dict.get(code) class LatestLimitUpBlockManager: """ 最近涨停的板块管理 """ # 看最近2天,不包含今天 __LATEST_DAY_COUNT = 2 __days = [] # 目前涨停 __current_limit_up_day_datas = {} # 曾涨停 __history_limit_up_day_datas = {} # K线数据 __k_datas = {} # 代码的最高涨幅 __k_max_rate = {} __code_name_dict = {} # 统计板块数据:{"day":{"板块":[(涨停数,破板数, 代码集合)]}} __block_day_datas = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __load_datas(cls): # 加载最近几天的数据 __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT) __days = copy.deepcopy(__days) # 不能包含今天 if __days[0] == tool.get_now_date_str(): __days.pop(0) cls.__days = __days # 加载之前6天的涨停,曾涨停,曾涨停代码的最近6天的K线 for day in __days: limit_up_records = get_history_limit_up_datas(day) cls.__history_limit_up_day_datas[day] = limit_up_records current_limit_up_datas = get_current_limit_up_datas(day) cls.__current_limit_up_day_datas[day] = current_limit_up_datas # 获取代码的k线 __total_codes = set() for d in cls.__current_limit_up_day_datas: __total_codes |= set([dd[3] for dd in cls.__history_limit_up_day_datas[d]]) # 获取最近7天的k线情况 for code in __total_codes: cls.__get_bars(code) # 统计前6天的板块信息 for day in __days: cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day) def set_current_limit_up_data(self, day, datas): self.__current_limit_up_day_datas[day] = datas self.__history_limit_up_day_datas[day] = get_today_history_limit_up_datas_cache() # 加载代码K线数据 __total_codes = set([d[0] for d in datas]) __total_codes |= set([d[3] for d in self.__history_limit_up_day_datas[day]]) for code in __total_codes: self.__get_bars(code) @classmethod def __statistics_limit_up_block_infos_by_day(cls, day): """ 统计涨停代码信息 @return:{"板块":(涨停数, 炸板数, {包含的代码})} """ # 统计板块的 current_code_dict = {d[0]: d for d in cls.__current_limit_up_day_datas[day]} block_codes_dict = {} for h in cls.__history_limit_up_day_datas[day]: # 将板块所包含的代码归类 code = h[3] cls.__code_name_dict[code] = h[4] blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code) if blocks: for b in blocks: if b not in block_codes_dict: block_codes_dict[b] = set() block_codes_dict[b].add(code) fdata = {} for b in block_codes_dict: limit_up_count = 0 open_limit_up_count = 0 for code in block_codes_dict[b]: if code in current_code_dict: limit_up_count += 1 else: open_limit_up_count += 1 fdata[b] = (limit_up_count, open_limit_up_count, block_codes_dict[b]) return fdata def statistics_limit_up_block_infos(self): """ 统计涨停板块数据 @return: """ # 板块出现的天数 block_count_dict = {} # 板块出现的代码 block_codes_dict = {} for day in self.__block_day_datas: for b in self.__block_day_datas[day]: finally_limit_up_count = self.__block_day_datas[day][b][0] if finally_limit_up_count < 3: continue # 板块涨停个数 if b not in block_count_dict: block_count_dict[b] = set() if b not in block_codes_dict: block_codes_dict[b] = set() block_count_dict[b].add(day) block_codes_dict[b] |= self.__block_day_datas[day][b][2] # [(板块,涨停日期集合)] block_count_list = [(k, block_count_dict[k]) for k in block_count_dict] block_count_list.sort(key=lambda x: len(x[1]), reverse=True) block_count_list = block_count_list[:50] # [(涨停原因,累计涨停次数,连续次数)] fdatas = [] for d in block_count_list: b = d[0] fdata = [b, len(d[1])] temp = [] max_continue_count = 0 for day in self.__days: if day not in self.__block_day_datas: continue if b in self.__block_day_datas[day]: finally_limit_up_count = self.__block_day_datas[day][b][0] else: finally_limit_up_count = 0 if b in self.__block_day_datas[day] and finally_limit_up_count >= 3: # 板块代码数量>=3个 temp.append(day) else: c = len(temp) if c > max_continue_count: max_continue_count = c temp.clear() c = len(temp) if c > max_continue_count: max_continue_count = c temp.clear() # 最大连续次数 fdata.append(max_continue_count) # 最高板 max_rate_info = None for code in block_codes_dict[d[0]]: if max_rate_info is None: max_rate_info = (code, self.__k_max_rate.get(code), self.__code_name_dict.get(code)) if max_rate_info[1] < self.__k_max_rate.get(code): max_rate_info = (code, self.__k_max_rate.get(code), self.__code_name_dict.get(code)) fdata.append(max_rate_info) # 统计今天这个板块中大于二板的代码数量 limit_up_counts = 0 fdata.append(limit_up_counts) # 获取每天的数量 days_datas = [] for day in self.__days: if day not in self.__block_day_datas: continue binfo = self.__block_day_datas[day].get(b) if not binfo: days_datas.append((0, 0)) else: days_datas.append((binfo[0], binfo[1])) fdata.append(days_datas) fdatas.append(fdata) return fdatas @classmethod def __get_bars(cls, code): """ 获取K线 @param code: @return: """ if code in cls.__k_datas: return cls.__k_datas[code] volumes_data = None if cls.__days: volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1]) if volumes_data: volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT] cls.__k_datas[code] = volumes_data if not volumes_data: volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT) if volumes_data: cls.__k_datas[code] = volumes_data # 获取最大涨幅 min_price = volumes_data[-1]["low"] for d in volumes_data: if min_price > d["low"]: min_price = d["low"] rate = int((volumes_data[0]["close"] - min_price) * 100 / min_price) cls.__k_max_rate[code] = rate return cls.__k_datas.get(code) if __name__ == "__main__": datas = LatestLimitUpBlockManager().statistics_limit_up_block_infos() print(datas)