| | |
| | | |
| | | # 是否主板开1 |
| | | # limit_up_record_datas 今日历史涨停 |
| | | def is_shsz_open_limit_up(code, block, limit_up_record_datas, code_block_dict): |
| | | def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict): |
| | | # 获取今日9:30的时间戳 |
| | | time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" |
| | | timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) |
| | | for k in limit_up_record_datas: |
| | | if code_block_dict.get(k[3]) == block: |
| | | if int(k[5]) < timestamp: |
| | | return True, f"{k[3]}开一" |
| | | return False, "" |
| | | limit_up_codes = set() |
| | | if limit_up_record_datas: |
| | | for k in limit_up_record_datas: |
| | | if code_block_dict.get(k[3]) == block: |
| | | if int(k[5]) < timestamp: |
| | | limit_up_codes.add(k[3]) |
| | | return limit_up_codes |
| | | |
| | | |
| | | # 获取主板开1且目前是涨停的代码 |
| | | def get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas): |
| | | # 获取今日9:30的时间戳 |
| | | time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" |
| | | timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) |
| | | limit_up_codes = set() |
| | | for k in current_limit_up_datas: |
| | | if k[5] == block: |
| | | if int(k[2]) < timestamp: |
| | | limit_up_codes.add(k[0]) |
| | | return limit_up_codes |
| | | |
| | | |
| | | # 代码是否是后排 |
| | |
| | | for k in block_limit_up_dict: |
| | | has_shsz = False |
| | | for b in block_limit_up_dict[k]: |
| | | if b[0].find('00') == 0 or b[0].find('60') == 0: |
| | | if tool.is_can_buy_code(b[0]): |
| | | has_shsz = True |
| | | break |
| | | if not has_shsz: |
| | |
| | | return False, block_codes_infos[:topn] |
| | | |
| | | |
| | | def is_record_top_block(code, block, limit_up_record_datas,yesterday_current_limit_up_codes, topn): |
| | | def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn): |
| | | block_codes_infos = [] |
| | | limit_up_time = time.time() |
| | | for k in limit_up_record_datas: |
| | |
| | | |
| | | # 获取当日历史身位 |
| | | # shsz:是否主板 |
| | | def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict, shsz=True): |
| | | def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict, |
| | | yesterday_current_limit_up_codes, shsz=True): |
| | | block_codes_infos = [] |
| | | limit_up_time = time.time() |
| | | for k in limit_up_record_datas: |
| | | if shsz and k[3].find("00") != 0 and k[3].find("60") != 0: |
| | | if k[3] == code: |
| | | # 获取当前代码涨停时间 |
| | | limit_up_time = int(k[5]) |
| | | if shsz and not tool.is_can_buy_code(k[3]): |
| | | continue |
| | | # 剔除高位板 |
| | | if k[3] in yesterday_current_limit_up_codes: |
| | | continue |
| | | if code_limit_up_reason_dict.get(k[3]) == block: |
| | | if k[3] != code: |
| | | block_codes_infos.append((k[3], int(k[5]))) |
| | | else: |
| | | limit_up_time = int(k[5]) |
| | | |
| | | block_codes_infos.append((code, limit_up_time)) |
| | | block_codes_infos.sort(key=lambda x: x[1]) |
| | | front_codes = [] |
| | | for i in range(0, len(block_codes_infos)): |
| | | if block_codes_infos[i][0] == code: |
| | | return i |
| | | return 0 |
| | | return i, front_codes |
| | | else: |
| | | front_codes.append(block_codes_infos[i][0]) |
| | | return 0, [] |
| | | |
| | | |
| | | # 获取当日实时身位 |
| | | # before_blocks_dict格式为{"代码":set("板块")} |
| | | def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reason_dict, shsz=False): |
| | | def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict, |
| | | yesterday_current_limit_up_codes, exclude_codes, open_limit_up_count, shsz=False, |
| | | limit_up_time=time.time()): |
| | | block_codes_infos = [] |
| | | limit_up_time = time.time() |
| | | for k in current_limit_up_datas: |
| | | if shsz and k[0].find("00") != 0 and k[0].find("60") != 0: |
| | | if k[0] == code: |
| | | # 获取当前代码涨停时间 |
| | | limit_up_time = int(k[2]) |
| | | if shsz and not tool.is_can_buy_code(k[0]): |
| | | continue |
| | | if code_limit_up_reason_dict.get(k[0]) == block: |
| | | # 剔除高位板 |
| | | if k[0] in yesterday_current_limit_up_codes: |
| | | continue |
| | | if code_limit_up_reasons_dict.get(k[0]) and block in code_limit_up_reasons_dict.get(k[0]): |
| | | if k[0] != code: |
| | | # 代码.涨停时间 |
| | | block_codes_infos.append((k[0], int(k[2]))) |
| | | else: |
| | | limit_up_time = int(k[2]) |
| | | block_codes_infos.append((code, limit_up_time)) |
| | | block_codes_infos.sort(key=lambda x: x[1]) |
| | | front_codes = [] |
| | | first_count = 0 |
| | | for i in range(0, len(block_codes_infos)): |
| | | if i == open_limit_up_count and exclude_codes and block_codes_infos[i][0] in exclude_codes: |
| | | # 非开1老大被排除 |
| | | first_count += 1 |
| | | continue |
| | | if block_codes_infos[i][0] == code: |
| | | return i |
| | | return 0 |
| | | return i - first_count, front_codes |
| | | else: |
| | | front_codes.append(block_codes_infos[i][0]) |
| | | return 0, [] |
| | | |
| | | |
| | | if __name__ == "__main__": |