""" 开盘啦板块工具 """ # 是否是强势板块 # current_limit_up_datas:实时涨停数据 (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量) import datetime import time from utils import tool # latest_2_day_limit_up_datas:最近2天涨停数据 def is_strong_block(block, current_limit_up_datas, latest_2_day_limit_up_datas): # 是否开1,且尚未炸板 if current_limit_up_datas: # 获取今日9:30的时间戳 time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) for k in current_limit_up_datas: if k[5] == block: if int(k[2]) < timestamp: return True, "开一" elif k[0].find("30") == 0 or k[0].find("68") == 0: return True, "20cm涨停" # 板块有二板且(3天内板块内无高于二板的票) has_continue_limit_up = False for k in current_limit_up_datas: if k[5] == block: if k[4] == "2连板": has_continue_limit_up = True break total_limit_up_datas = list(latest_2_day_limit_up_datas) total_limit_up_datas.extend(current_limit_up_datas) for k in total_limit_up_datas: if k[5] == block: if k[4].find("连板") > 0 and int(k[4][:1]) > 2: has_continue_limit_up = False break if has_continue_limit_up: return True, "板块有二板且3天内板块内无高于二板" return False, "" # 是否是猛拉板块 # limit_up_record_datas 今日历史涨停 def is_soon_limit_up(code, block, limit_up_record_datas): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: if k[2] == block: if k[3] != code: block_codes_infos.append((k[3], int(k[5]))) else: limit_up_time = int(k[5]) # 排序 block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) if len(block_codes_infos) < 2: return False, "" if block_codes_infos[1][1] - block_codes_infos[0][1] < 30 * 60: # 首次涨停时间间隔30分钟内 return True, f"板块:{block} 龙1:{block_codes_infos[0][0]} 龙2:{block_codes_infos[1][0]}" return False, "" # 代码是否是后排 def is_back_row(code, block, current_limit_up_datas): codes = set() for k in current_limit_up_datas: if k[5] == block: codes.add(k[0]) codes.discard(code) if len(codes) == 0: return False else: return True # 获取主板身位 def get_sh_sz_code_rank(code, block, limit_up_record_datas): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: if k[3].find("00") != 0 and k[3].find("60") != 0: continue if k[2] == block: if k[3] != code: block_codes_infos.append((k[3], int(k[5]))) else: limit_up_time = int(k[5]) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) for i in range(0, len(block_codes_infos)): if block_codes_infos[i][0] == code: return i return 0 # 获取身位 def get_code_rank(code, block, limit_up_record_datas): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: if k[2] == block: if k[3] != code: block_codes_infos.append((k[3], int(k[5]))) else: limit_up_time = int(k[5]) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) for i in range(0, len(block_codes_infos)): if block_codes_infos[i][0] == code: return i return 0 if __name__ == "__main__": pass