Administrator
6 天以前 1c04204fcbc958a7bdef2394ff939063e56b6404
third_data/kpl_block_util.py
@@ -7,74 +7,36 @@
import datetime
import time
import constant
from utils import tool
# latest_2_day_limit_up_datas:最近2天涨停数据
def is_strong_block(block, current_limit_up_datas, latest_2_day_limit_up_datas):
    """Decide whether ``block`` currently qualifies as a strong block.

    Rows are read positionally: ``k[0]`` stock code, ``k[2]`` limit-up
    timestamp, ``k[4]`` board-count label (e.g. ``"2连板"``) and ``k[5]``
    block name.  The timestamp is compared against today's 09:30 local time
    (presumably epoch seconds -- confirm against the data source).

    Args:
        block: Block name to evaluate.
        current_limit_up_datas: Rows that are at limit-up right now.
        latest_2_day_limit_up_datas: Limit-up rows of the previous two days;
            ``None`` is treated as empty.

    Returns:
        ``(is_strong, reason)``; ``reason`` is ``""`` when not strong.
    """

    def _board_count(label):
        # Leading integer of an "N连板" label, 0 for any other label.
        # Reading every leading digit (not just the first character) keeps
        # "10连板" from being mistaken for a 1-board stock.
        if "连板" not in label:
            return 0
        digits = ""
        for ch in label:
            if ch not in "0123456789":
                break
            digits += ch
        return int(digits) if digits else 0

    if not current_limit_up_datas:
        return False, ""

    # Timestamp of today's 09:30:00 in local time.
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    open_timestamp = time.mktime(time.strptime(today + " 09:30:00", '%Y-%m-%d %H:%M:%S'))

    block_rows = [k for k in current_limit_up_datas if k[5] == block]

    # Rule 1: a stock of the block hit limit-up before 09:30, or a code
    # starting with 30/68 is at limit-up.
    for k in block_rows:
        if int(k[2]) < open_timestamp:
            return True, "开一"
        if k[0].startswith(("30", "68")):
            return True, "20cm涨停"

    # Rule 2: the block has a 2-board stock today and nothing above two
    # boards in the previous two days plus today.
    recent_rows = list(latest_2_day_limit_up_datas or [])
    recent_rows.extend(current_limit_up_datas)
    has_second_board = any(k[4] == "2连板" for k in block_rows)
    has_higher_recently = any(k[5] == block and _board_count(k[4]) > 2 for k in recent_rows)
    if has_second_board and not has_higher_recently:
        return True, "板块有二板且3天内板块内无高于二板"

    # Rule 3: no 2-board stock today, but at least one stock with 3+ boards.
    counts = [_board_count(k[4]) for k in block_rows]
    has_2 = any(count == 2 for count in counts)
    has_3 = any(count > 2 for count in counts)
    if has_3 and not has_2:
        return True, "板块中无2板,但有3板及以上"
    return False, ""
# 是否是猛拉板块
# 是否主板开1
# limit_up_record_datas 今日历史涨停
def is_soon_limit_up(code, block, limit_up_record_datas):
    """Check whether the two earliest limit-ups of ``block`` are under 30 minutes apart.

    Record rows are read positionally: ``row[2]`` block name, ``row[3]``
    stock code, ``row[5]`` limit-up time (converted with ``int``).  ``code``
    always takes part in the ranking; when it has no record of its own in the
    block, the current time is used as its limit-up time.

    Returns:
        ``(True, description)`` naming the first two codes when the gap is
        below ``30 * 60``; otherwise ``(False, "")``.
    """
    own_time = time.time()
    peers = []
    for row in limit_up_record_datas:
        if row[2] != block:
            continue
        if row[3] == code:
            own_time = int(row[5])
        else:
            peers.append((row[3], int(row[5])))

    # ``code`` goes last before the stable sort so ties keep the peers first.
    ranked = sorted(peers + [(code, own_time)], key=lambda item: item[1])
    if len(ranked) < 2:
        return False, ""

    leader, runner_up = ranked[0], ranked[1]
    if runner_up[1] - leader[1] < 30 * 60:
        # The first two limit-ups happened within 30 minutes of each other.
        return True, f"板块:{block}  龙1:{leader[0]} 龙2:{runner_up[0]}"
    return False, ""
def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict):
    """Collect codes of ``block`` whose recorded limit-up time is before today's 09:30.

    A record row belongs to ``block`` when ``code_block_dict`` maps its code
    (``row[3]``) to ``block``; ``row[5]`` is the limit-up time and is
    converted with ``int``.  ``code`` is accepted but not used.

    Returns:
        A set of matching codes; empty when ``limit_up_record_datas`` is
        empty or ``None``.
    """
    # Timestamp of today's 09:30:00 in local time.
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    open_timestamp = time.mktime(time.strptime(today + " 09:30:00", '%Y-%m-%d %H:%M:%S'))
    if not limit_up_record_datas:
        return set()
    return {
        row[3]
        for row in limit_up_record_datas
        if code_block_dict.get(row[3]) == block and int(row[5]) < open_timestamp
    }
# 获取主板开1且目前是涨停的代码
def get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas):
    """Collect codes of ``block`` that are at limit-up now and hit it before today's 09:30.

    Rows are read positionally: ``k[0]`` stock code, ``k[2]`` limit-up time
    (converted with ``int``), ``k[5]`` block name.  ``code`` is accepted but
    not used.  No board-type filter is applied here.

    Args:
        code: Unused; kept for interface compatibility.
        block: Block name to match.
        current_limit_up_datas: Rows currently at limit-up; ``None`` or an
            empty sequence yields an empty result instead of raising.

    Returns:
        A set of matching stock codes.
    """
    if not current_limit_up_datas:
        return set()
    # Timestamp of today's 09:30:00 in local time.
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    open_timestamp = time.mktime(time.strptime(today + " 09:30:00", '%Y-%m-%d %H:%M:%S'))
    return {
        k[0]
        for k in current_limit_up_datas
        if k[5] == block and int(k[2]) < open_timestamp
    }
# 代码是否是后排
@@ -90,43 +52,159 @@
        return True
# 获取主板身位
def get_sh_sz_code_rank(code, block, limit_up_record_datas):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        if k[3].find("00") != 0 and k[3].find("60") != 0:
# 是否是前几的板块
# 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序)
def __is_top_block(block, block_codes_infos, topn):
    """Tell whether ``block`` ranks within the first ``topn`` blocks by earliest limit-up.

    NOTE(review): the source this was taken from had lines of another
    function spliced into this body (references to undefined ``code`` /
    ``limit_up_time`` and an unreachable ``return``); they were removed.
    Confirm against the repository version.

    Args:
        block: Block name being checked.
        block_codes_infos: Iterable of ``(code, block_name, limit_up_time)``
            tuples.
        topn: Number of leading blocks that count as "top".

    Returns:
        ``(is_top, leaders)`` where ``leaders`` holds the earliest tuple of
        each of the first ``topn`` ranked blocks.
    """
    # Group the rows by block name, keeping first-seen order.
    rows_by_block = {}
    for info in block_codes_infos:
        rows_by_block.setdefault(info[1], []).append(info)

    leaders = []
    for name, rows in rows_by_block.items():
        # Only blocks containing at least one code accepted by
        # tool.is_can_buy_code take part in the ranking.
        if not any(tool.is_can_buy_code(row[0]) for row in rows):
            continue
        # Skip blocks listed in constant.KPL_INVALID_BLOCKS.
        if name in constant.KPL_INVALID_BLOCKS:
            continue
        # Earliest limit-up of the block (first one wins on ties).
        leaders.append(min(rows, key=lambda row: row[2]))

    # Rank blocks by the time of their earliest limit-up.
    leaders.sort(key=lambda row: row[2])

    for position, row in enumerate(leaders):
        if row[1] == block:
            return position + 1 <= topn, leaders[:topn]
    # ``block`` is not ranked: it counts as top only while fewer than or
    # exactly ``topn`` blocks are ranked.
    return len(leaders) <= topn, leaders[:topn]
# 获取身位
def get_code_rank(code, block, limit_up_record_datas):
def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn):
    """Rank ``block`` among today's recorded limit-up blocks via ``__is_top_block``.

    Record rows are read positionally: ``k[2]`` block name, ``k[3]`` stock
    code, ``k[5]`` limit-up time (converted with ``int``).  ``code`` is always
    added under ``block``; when it has no record of its own, the current time
    is used as its limit-up time.

    NOTE(review): the source had a dangling ``if k[2] == block:`` with no
    body, which was removed.  The first-board filter also tested ``k[0]``;
    it now tests ``k[3]``, the index used for the code everywhere else in
    record rows.  Confirm against the repository version.

    Args:
        code: Stock code being evaluated.
        block: Block name assigned to ``code``.
        limit_up_record_datas: Today's limit-up history rows.
        yesterday_current_limit_up_codes: Codes that were at limit-up
            yesterday; their rows are skipped.
        topn: Number of leading blocks that count as "top".

    Returns:
        Whatever ``__is_top_block`` returns for the collected tuples.
    """
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        # Keep first-board stocks only.
        if k[3] in yesterday_current_limit_up_codes:
            continue
        if k[3] != code:
            block_codes_infos.append((k[3], k[2], int(k[5])))
        else:
            limit_up_time = int(k[5])
    block_codes_infos.append((code, block, limit_up_time))
    return __is_top_block(block, block_codes_infos, topn)
def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn):
    """Rank ``block`` among the blocks currently at limit-up via ``__is_top_block``.

    Rows are read positionally: ``row[0]`` stock code, ``row[2]`` limit-up
    time (converted with ``int``), ``row[5]`` block name.  Rows whose code is
    in ``yesterday_current_limit_up_codes`` are skipped.  ``code`` is always
    added under ``block``; when no remaining row carries it, the current time
    is used as its limit-up time.

    Returns:
        Whatever ``__is_top_block`` returns for the collected
        ``(code, block_name, limit_up_time)`` tuples.
    """
    own_time = time.time()
    candidates = []
    for row in current_limit_up_datas:
        # Keep first-board stocks only.
        if row[0] in yesterday_current_limit_up_codes:
            continue
        if row[0] == code:
            own_time = int(row[2])
        else:
            candidates.append((row[0], row[5], int(row[2])))
    candidates.append((code, block, own_time))
    return __is_top_block(block, candidates, topn)
# 获取当日历史身位
# shsz:是否主板
def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict,
                         yesterday_current_limit_up_codes, shsz=True):
    """Return the position of ``code`` within ``block`` by today's recorded limit-up time.

    Record rows are read positionally: ``k[3]`` stock code and ``k[5]``
    limit-up time (converted with ``int``).  A row counts for ``block`` when
    ``code_limit_up_reason_dict`` maps its code to ``block``.

    NOTE(review): the source contained leftover ``return i`` / ``return 0``
    lines in front of the tuple-returning branch; they were removed so the
    function consistently returns a 2-tuple.

    Args:
        code: Stock code to rank.
        block: Block name.
        limit_up_record_datas: Today's limit-up history rows.
        code_limit_up_reason_dict: Mapping of code to its block.
        yesterday_current_limit_up_codes: Codes at limit-up yesterday; their
            rows are excluded.
        shsz: When true, rows rejected by ``tool.is_can_buy_code`` are
            excluded.

    Returns:
        ``(rank, front_codes)``: zero-based rank of ``code`` and the codes
        ranked ahead of it.
    """
    # Falls back to "now" when ``code`` has no record of its own.
    limit_up_time = time.time()
    block_codes_infos = []
    for k in limit_up_record_datas:
        if k[3] == code:
            # Limit-up time of the code being ranked; it is appended once below.
            limit_up_time = int(k[5])
            continue
        if shsz and not tool.is_can_buy_code(k[3]):
            continue
        # Drop stocks that were already at limit-up yesterday.
        if k[3] in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(k[3]) == block:
            block_codes_infos.append((k[3], int(k[5])))

    # ``code`` goes last before the stable sort so ties rank the others first.
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])

    front_codes = []
    for i, info in enumerate(block_codes_infos):
        if info[0] == code:
            return i, front_codes
        front_codes.append(info[0])
    return 0, []
# 获取当日实时身位
# code_limit_up_reasons_dict format: {"code": set("block")}
def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict,
                          yesterday_current_limit_up_codes, exclude_codes, open_limit_up_count, shsz=False,
                          limit_up_time=None):
    """Return the live position of ``code`` within ``block`` by limit-up time.

    Rows are read positionally: ``k[0]`` stock code and ``k[2]`` limit-up time
    (converted with ``int``).  A row counts for ``block`` when ``block`` is
    contained in ``code_limit_up_reasons_dict[k[0]]``.

    Args:
        code: Stock code to rank.
        block: Block name.
        current_limit_up_datas: Rows currently at limit-up.
        code_limit_up_reasons_dict: Mapping of code to a container of blocks.
        yesterday_current_limit_up_codes: Codes at limit-up yesterday; their
            rows are excluded.
        exclude_codes: Codes that may be skipped at the position right after
            the open-limit-up stocks; may be empty or ``None``.
        open_limit_up_count: Index at which an excluded code is skipped.
        shsz: When true, rows rejected by ``tool.is_can_buy_code`` are
            excluded.
        limit_up_time: Limit-up time assumed for ``code`` when it has no row
            in ``current_limit_up_datas``.  Defaults to the time of the call
            (previously the default was frozen at module import).

    Returns:
        ``(rank, front_codes)``: zero-based rank of ``code`` (not counting a
        skipped excluded code) and the codes ranked ahead of it.
    """
    if limit_up_time is None:
        # Resolve "now" per call; a ``time.time()`` default in the signature
        # is evaluated only once, when the module is imported.
        limit_up_time = time.time()

    block_codes_infos = []
    for k in current_limit_up_datas:
        if k[0] == code:
            # Limit-up time of the code being ranked; it is appended once below.
            limit_up_time = int(k[2])
            continue
        if shsz and not tool.is_can_buy_code(k[0]):
            continue
        # Drop stocks that were already at limit-up yesterday.
        if k[0] in yesterday_current_limit_up_codes:
            continue
        reasons = code_limit_up_reasons_dict.get(k[0])
        if reasons and block in reasons:
            # (code, limit-up time)
            block_codes_infos.append((k[0], int(k[2])))

    # ``code`` goes last before the stable sort so ties rank the others first.
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])

    front_codes = []
    first_count = 0
    for i, info in enumerate(block_codes_infos):
        if i == open_limit_up_count and exclude_codes and info[0] in exclude_codes:
            # The leader that did not open at limit-up is excluded from the ranking.
            first_count += 1
            continue
        if info[0] == code:
            return i - first_count, front_codes
        front_codes.append(info[0])
    return 0, []
# 开1时间范围
# (start, end) timestamps of today's 09:25:00-09:30:00 window in local time,
# computed once when the module is imported.
open_limit_up_time_range = tuple(
    time.mktime(time.strptime(tool.get_now_date_str() + clock, '%Y-%m-%d %H:%M:%S'))
    for clock in (" 09:25:00", " 09:30:00")
)

if __name__ == "__main__":
    # Manual check: show the computed window.
    print(open_limit_up_time_range)