"""
Management of "special" (highly recognisable) stock codes per block (sector).

``BlockSpecialCodesManager`` serves the persisted and temporary block<->code
indexes; ``AnalysisBlockSpecialCodesManager`` recomputes the list from the
limit-up history stored in ``kpl_limit_up_record``.
"""
import difflib

import constant
from db import mysql_data_delegate as mysql_data
from log_module.log import logger_plate_special_codes
from third_data import kpl_util
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from utils import tool


@tool.singleton
class BlockSpecialCodesManager:
    """In-memory view of the ``block_special_codes`` table plus temporary overrides."""

    # Persisted indexes, loaded from MySQL.
    __block_codes_dict = {}
    __code_blocks_dict = {}
    # Temporary indexes, filled at runtime through set_code_blocks_for_temp().
    __temp_block_codes_dict = {}
    __temp_code_blocks_dict = {}

    def __init__(self):
        self.mysql = mysql_data.Mysqldb()
        self.__load_data()

    def __load_data(self):
        """Load both block->codes and code->blocks indexes from the database."""
        results = self.mysql.select_all("select _block,_code from block_special_codes")
        block_codes = {}
        code_blocks = {}
        # Both indexes are rebuilt from scratch so they always describe the
        # same snapshot of the table.
        for row in results or ():
            block, code = row[0], row[1]
            block_codes.setdefault(block, set()).add(code)
            code_blocks.setdefault(code, set()).add(block)
        self.__block_codes_dict = block_codes
        self.__code_blocks_dict = code_blocks
        # Record the loaded data in the log; logging is best-effort and must
        # not prevent the manager from being created.
        try:
            logger_plate_special_codes.info(f"{block_codes}")
        except Exception:
            pass

    def get_block_codes(self, block):
        """Return the set of codes (persisted and temporary) for ``block``."""
        persisted = self.__block_codes_dict.get(block, set())
        temporary = self.__temp_block_codes_dict.get(block, set())
        return persisted | temporary

    def get_code_blocks(self, code):
        """Return the set of blocks (persisted and temporary) for ``code``."""
        persisted = self.__code_blocks_dict.get(code, set())
        temporary = self.__temp_code_blocks_dict.get(code, set())
        return persisted | temporary

    def get_code_blocks_dict(self):
        """Return a copy of ``{code: blocks}`` merging persisted and temporary data."""
        merged = {code: set(blocks) for code, blocks in self.__code_blocks_dict.items()}
        for code, blocks in self.__temp_code_blocks_dict.items():
            merged.setdefault(code, set()).update(blocks)
        return merged

    def get_temp_code_blocks_dict(self):
        """Return a copy of the temporary ``{code: blocks}`` mapping."""
        return {code: set(blocks) for code, blocks in self.__temp_code_blocks_dict.items()}

    def set_code_blocks_for_temp(self, code, blocks):
        """
        Replace the temporary special blocks of a code.

        @param code: stock code
        @param blocks: iterable of block names (set, list or tuple)
        @return: None
        """
        # Normalise once: the original subtracted the raw argument, which
        # raised TypeError whenever a list or tuple was passed.
        new_blocks = set(blocks or ())
        old_blocks = self.__temp_code_blocks_dict.get(code, set())
        self.__temp_code_blocks_dict[code] = new_blocks

        add_blocks = new_blocks - old_blocks
        del_blocks = old_blocks - new_blocks
        for b in add_blocks:
            self.__temp_block_codes_dict.setdefault(b, set()).add(code)
        for b in del_blocks:
            self.__temp_block_codes_dict.setdefault(b, set()).discard(code)

        if add_blocks or del_blocks:
            # NOTE(review): `trade_record_log_util` is not imported anywhere in
            # this file, so this call raises NameError as written. Add the
            # proper project import once the module path is confirmed.
            trade_record_log_util.add_temp_special_codes(code, f"新题材辨识度设置:{blocks}")

    def set_block_codes_list(self, datas):
        """
        Replace the whole ``block_special_codes`` table with ``datas``.

        @param datas: [(block, code name, code, limit-up count, free-float market value, ...)]
                      Only the first five fields are written; note the code
                      name comes BEFORE the code.
        @return: None
        """
        self.mysql.execute("delete from block_special_codes")
        for d in datas:
            # NOTE(review): the statement is built by string interpolation; a
            # quote inside a block or code name would break it. Switch to a
            # parameterised query if Mysqldb.execute supports arguments.
            sql = (
                "insert into block_special_codes(_block,_code,_code_name,_limit_up_count,_zyltgb,_create_time) "
                f"values('{d[0]}', '{d[2]}', '{d[1]}', {d[3]}, {d[4]}, now())"
            )
            self.mysql.execute(sql)


class AnalysisBlockSpecialCodesManager:
    """Computes the special codes of every block from the limit-up history."""

    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """Return rows (block, code, record count) for min_day < day < max_day, excluding 68* codes."""
        return self.__mysql.select_all(
            "SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record r "
            f"WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' "
            "group by r.`_hot_block_name`, r.`_code`")

    def __list_code_blocks(self, min_day, max_day):
        """Return rows (block, code, day, _open) for min_day < day < max_day, excluding 68* codes."""
        return self.__mysql.select_all(
            "SELECT r.`_hot_block_name`, r.`_code`, r.`_day`, r.`_open` FROM kpl_limit_up_record r "
            f"WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%'")

    def __get_limit_up_info(self, min_day):
        """
        Return ``{code: (record count, code name, free-float value / 1e8)}`` for day > min_day.

        A NULL ``_zylt_val`` is reported as 1.
        """
        sql = (
            "SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,"
            "IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) "
            "FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r "
            f"WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        )
        results = self.__mysql.select_all(sql)
        return {x[0]: (x[1], x[2], x[3]) for x in results}

    def __get_top(self, block, min_day, max_day):
        """
        Return the codes of ``block`` ranked in the top N both by record count and
        by free-float value, where N = min(10, number of codes // 2).
        """
        sql = (
            "SELECT r.`_code`,r.`_code_name`, COUNT(*), "
            "IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) "
            "FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r "
            f"WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' "
            "and r._code not like '68%' GROUP BY r.`_code`"
        )
        results = list(self.__mysql.select_all(sql))
        max_count = min(10, len(results) // 2)
        results.sort(key=lambda x: x[2], reverse=True)
        top_by_count = {x[0] for x in results[:max_count]}
        results.sort(key=lambda x: x[3], reverse=True)
        top_by_value = {x[0] for x in results[:max_count]}
        return top_by_count & top_by_value

    def __get_block_map(self):
        """
        Read the block mapping file and return ``{block: set(parent blocks)}``.

        Each non-empty line is either ``child1,child2,...`` (the first entry is
        used as the parent) or ``parent1,parent2|child1,child2,...``.
        @return: dict mapping every listed block to its parent blocks
        """
        block_map = {}
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                # Normalise full-width punctuation. The original replaced ","
                # with itself, which left full-width commas unsplit.
                line = line.replace("\uff0c", ",").replace("丨", "|")
                parent_blocks = set()
                if line.find("|") >= 0:
                    sts = line.split("|")
                    parent_blocks |= {p.strip() for p in sts[0].split(",") if p.strip()}
                    line = sts[1]
                blocks = [b.strip() for b in line.split(",") if b.strip()]
                if not blocks:
                    continue
                if not parent_blocks:
                    parent_blocks.add(blocks[0])
                for b in blocks:
                    block_map.setdefault(b, set()).update(parent_blocks)
                if len(parent_blocks) > 1:
                    # With several parents, each parent also maps to itself.
                    for b in parent_blocks:
                        block_map.setdefault(b, set()).add(b)
        return block_map

    def test_block_map(self):
        """Print the parsed block mapping (debug helper)."""
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Compute the special codes of every block.

        @return: [(block, code name, code, limit-up count, free-float value in 1e8,
                   limit-up count)]
        """

        def statistic_continue_target_codes(limit_up_records):
            """
            Find codes with a run of at least 4 consecutive trading days of
            limit-ups and group the top 3 of them (by run length) per block.

            @param limit_up_records: rows (block, code, day, _open)
            @return: {block: [code, ...]}
            """
            min_run_length = 4
            dates = HistoryKDatasUtils.get_latest_trading_date_cache(120)
            code_days_map = {}
            for d in limit_up_records:
                if d[3]:
                    # Rows with a truthy `_open` (limit-up was broken) are ignored.
                    continue
                code_days_map.setdefault(d[1], set()).add(d[2])

            target_codes_info = set()
            for code, days in code_days_map.items():
                day_list = sorted(days, reverse=True)
                # The longest block shared with the trading-day list is the
                # longest run of consecutive limit-up days.
                matcher = difflib.SequenceMatcher(None, dates, day_list)
                match = matcher.find_longest_match(0, len(dates), 0, len(day_list))
                if match.size < min_run_length:
                    continue
                target_codes_info.add((code, match.size))
            print(f"大于4板:{target_codes_info}")
            continue_count_dict = {x[0]: x[1] for x in target_codes_info}

            # Group the qualifying codes by block.
            block_codes_2 = {}
            for code, _ in target_codes_info:
                bs = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
                if bs:
                    for bb in bs:
                        block_codes_2.setdefault(bb, set()).add(code)
            for k in block_codes_2:
                ranked = sorted(block_codes_2[k], key=lambda x: continue_count_dict[x], reverse=True)
                block_codes_2[k] = ranked[:3]
            return block_codes_2

        trading_dates = HistoryKDatasUtils.get_latest_trading_date(9)
        if not trading_dates:
            # Without trading dates no query window can be built.
            return []
        max_day = trading_dates[0]
        min_day = tool.date_sub(max_day, 120)

        block_map = self.__get_block_map()
        # Limit-up records of the last 120 days: [(block, code, day, _open)]
        code_block_infos = self.__list_code_blocks(min_day, max_day)

        # Number of distinct limit-up days per code.
        limit_up_days = {}
        for d in code_block_infos:
            limit_up_days.setdefault(d[1], set()).add(d[2])
        limit_up_count_dict = {c: len(days) for c, days in limit_up_days.items()}

        # Codes recognised through their consecutive limit-up run: {block: [code, ...]}
        continue_target_block_codes = statistic_continue_target_codes(code_block_infos)

        # Codes with more than 3 limit-up records since the oldest returned trading date.
        recent_min_day = trading_dates[-1]
        recent_counts = {}
        filter_codes = set()
        for d in self.__get_code_blocks(recent_min_day, max_day):
            recent_counts[d[1]] = recent_counts.get(d[1], 0) + d[2]
            if recent_counts[d[1]] > 3:
                filter_codes.add(d[1])

        # Count records per (block, code), skipping the recent records of the
        # codes selected above.
        recent_min_day_num = int(recent_min_day.replace("-", ""))
        pair_counts = {}
        for d in code_block_infos:
            if d[1] in filter_codes and int(d[2].replace("-", "")) > recent_min_day_num:
                continue
            key = (d[0], d[1])
            pair_counts[key] = pair_counts.get(key, 0) + 1

        # {code: {block: limit-up count}}, with blocks expanded to their parents.
        code_block_dict = {}
        for (block, code), count in pair_counts.items():
            per_block = code_block_dict.setdefault(code, {})
            bs = block_map.get(block)
            if not bs:
                bs = {block}
            for bb in bs:
                per_block[bb] = per_block.get(bb, 0) + count

        # {block: [(code, limit-up count)]}
        block_codes_dict = {}
        for code, per_block in code_block_dict.items():
            for b, count in per_block.items():
                block_codes_dict.setdefault(b, []).append((code, count))

        # {code: (record count, code name, free-float value in 1e8)} for the last 120 days
        limit_up_info_map = self.__get_limit_up_info(tool.date_sub(max_day, 120))

        fdatas = []
        for b, code_info_list in block_codes_dict.items():
            if b in constant.KPL_INVALID_BLOCKS:
                continue
            # Candidates: the top third by free-float value ...
            code_info_list.sort(key=lambda x: float(limit_up_info_map[x[0]][2]), reverse=True)
            zylt_list = code_info_list[:len(code_info_list) // 3]
            top_value_codes = {x[0] for x in zylt_list}
            # ... plus the two codes with the most limit-ups (ties with the second included).
            code_info_list.sort(key=lambda x: x[1], reverse=True)
            limit_up_count_max = code_info_list[:2]
            for info in code_info_list[2:]:
                if info[1] != limit_up_count_max[-1][1]:
                    break
                limit_up_count_max.append(info)

            rank2_codes = set()
            for d in limit_up_count_max:
                if d[1] >= 6:
                    if d[0] not in top_value_codes:
                        zylt_list.append(d)
                    # NOTE(review): the source arrived with its indentation
                    # collapsed; this add is placed under `d[1] >= 6`, not under
                    # the inner `not in` check. Confirm against the original.
                    rank2_codes.add(d[0])

            # Add the codes recognised through their consecutive limit-up run.
            for c in continue_target_block_codes.get(b) or ():
                if int(float(limit_up_info_map[c][2])) < 50:
                    continue
                zylt_list.append((c, limit_up_count_dict.get(c, 0)))

            # Order the candidates by limit-up count.
            zylt_list.sort(key=lambda x: x[1], reverse=True)
            zylt_list = [x[0] for x in zylt_list]
            if not zylt_list:
                continue

            # Fetch security level and upper limit price of the candidates.
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(
                zylt_list, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue
            juejin_result_dict = {
                x['sec_id']: (x['sec_id'], x['sec_level'], x['upper_limit']) for x in juejin_results}

            index = 0
            codes = set()
            for code in zylt_list:
                if code in codes:
                    continue
                if limit_up_count_dict[code] <= 3:
                    # Not enough limit-up days overall.
                    continue
                if code not in juejin_result_dict:
                    # No security information available.
                    continue
                if juejin_result_dict[code][1] != 1:
                    # sec_level other than 1 is excluded.
                    continue
                if juejin_result_dict[code][2] < 3 or juejin_result_dict[code][2] > 50:
                    # upper_limit outside the range [3, 50].
                    continue
                index += 1
                zylt_value = int(float(limit_up_info_map[code][2]))
                if zylt_value < 50 and code not in rank2_codes:
                    continue
                fdatas.append(
                    (kpl_util.filter_block(b), limit_up_info_map[code][1], code, limit_up_count_dict[code],
                     zylt_value, limit_up_count_dict.get(code)))
                codes.add(code)
                if index >= 10:
                    break
        return fdatas


def update_block_special_codes():
    """
    Recompute the special codes and persist them.

    @return: number of rows written
    """
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for d in datas:
        print(d)
    BlockSpecialCodesManager().set_block_codes_list(datas)
    return len(datas)


if __name__ == "__main__":
    # Dry run: compute and print the list without writing it to the database.
    for _row in AnalysisBlockSpecialCodesManager().get_block_special_codes():
        print(_row)