""" 开盘啦板块工具 """ # 是否是强势板块 # current_limit_up_datas:实时涨停数据 (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量) import datetime import time import constant from utils import tool # 是否主板开1 # limit_up_record_datas 今日历史涨停 def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict): # 获取今日9:30的时间戳 time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) limit_up_codes = set() if limit_up_record_datas: for k in limit_up_record_datas: if code_block_dict.get(k[3]) == block: if int(k[5]) < timestamp: limit_up_codes.add(k[3]) return limit_up_codes # 获取主板开1且目前是涨停的代码 def get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas): # 获取今日9:30的时间戳 time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) limit_up_codes = set() for k in current_limit_up_datas: if k[5] == block: if int(k[2]) < timestamp: limit_up_codes.add(k[0]) return limit_up_codes # 代码是否是后排 def is_back_row(code, block, current_limit_up_datas): codes = set() for k in current_limit_up_datas: if k[5] == block: codes.add(k[0]) codes.discard(code) if len(codes) == 0: return False else: return True # 是否是前几的板块 # 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序) def __is_top_block(block, block_codes_infos, topn): block_limit_up_dict = {} for b in block_codes_infos: if b[1] not in block_limit_up_dict: block_limit_up_dict[b[1]] = [] block_limit_up_dict[b[1]].append(b) # 剔除只有非主板涨停的板块 invalid_blocks = [] for k in block_limit_up_dict: has_shsz = False for b in block_limit_up_dict[k]: if b[0].find('00') == 0 or b[0].find('60') == 0: has_shsz = True break if not has_shsz: invalid_blocks.append(k) for k in invalid_blocks: block_limit_up_dict.pop(k) # 每个板块涨停时间排序 invalid_blocks = [] for k in block_limit_up_dict: # 删除宽泛概念 if k in constant.KPL_INVALID_BLOCKS: invalid_blocks.append(k) continue block_limit_up_dict[k].sort(key=lambda x: x[2]) for k in invalid_blocks: block_limit_up_dict.pop(k) block_codes_infos = [block_limit_up_dict[k][0] for k in block_limit_up_dict] block_codes_infos.sort(key=lambda x: x[2]) # 去除通用涨停原因 index = 0 for d in block_codes_infos: if d[1] == block: if index + 1 <= topn: return True, block_codes_infos[:topn] else: return False, block_codes_infos[:topn] index += 1 if index <= topn: return True, block_codes_infos[:topn] return False, block_codes_infos[:topn] def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: # 判断是否是首板 if k[0] in yesterday_current_limit_up_codes: continue if k[3] != code: block_codes_infos.append((k[3], k[2], int(k[5]))) else: limit_up_time = int(k[5]) block_codes_infos.append((code, block, limit_up_time)) # 排序 return __is_top_block(block, block_codes_infos, topn) def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn): block_codes_infos = [] limit_up_time = time.time() for k in current_limit_up_datas: # 判断是否是首板 if k[0] in yesterday_current_limit_up_codes: continue if k[0] != code: block_codes_infos.append((k[0], k[5], int(k[2]))) else: limit_up_time = int(k[2]) # 排序 block_codes_infos.append((code, block, limit_up_time)) # 排序 return __is_top_block(block, block_codes_infos, topn) # 获取当日历史身位 # shsz:是否主板 def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict, yesterday_current_limit_up_codes, shsz=True): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: if k[3] == code: # 获取当前代码涨停时间 limit_up_time = int(k[5]) if shsz and not tool.is_shsz_code(k[3]): continue # 剔除高位板 if k[3] in yesterday_current_limit_up_codes: continue if code_limit_up_reason_dict.get(k[3]) == block: if k[3] != code: block_codes_infos.append((k[3], int(k[5]))) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) front_codes = [] for i in range(0, len(block_codes_infos)): if block_codes_infos[i][0] == code: return i, front_codes else: front_codes.append(block_codes_infos[i][0]) return 0, [] # 获取当日实时身位 # before_blocks_dict格式位{"代码":set("板块")} def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict, yesterday_current_limit_up_codes, exclude_codes, open_limit_up_count, shsz=False, limit_up_time=time.time()): block_codes_infos = [] for k in current_limit_up_datas: if k[0] == code: # 获取当前代码涨停时间 limit_up_time = int(k[2]) if shsz and not tool.is_shsz_code(k[0]): continue # 剔除高位板 if k[0] in yesterday_current_limit_up_codes: continue if code_limit_up_reasons_dict.get(k[0]) and block in code_limit_up_reasons_dict.get(k[0]): if k[0] != code: # 代码.涨停时间 block_codes_infos.append((k[0], int(k[2]))) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) front_codes = [] first_count = 0 for i in range(0, len(block_codes_infos)): if i == open_limit_up_count and exclude_codes and block_codes_infos[i][0] in exclude_codes: # 非开1老大被排除 first_count += 1 continue if block_codes_infos[i][0] == code: return i - first_count, front_codes else: front_codes.append(block_codes_infos[i][0]) return 0, [] if __name__ == "__main__": pass