Administrator
2023-10-30 fb47d36048e94b9a506d5c153e3dd19a01e37df1
third_data/kpl_block_util.py
@@ -13,15 +13,16 @@
# Collect the codes of a block that hit limit-up before today's 09:30
# limit_up_record_datas: today's historical limit-up records
def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict):
    """Return the codes mapped to ``block`` whose limit-up time is before 09:30 today.

    Args:
        code: Accepted for interface compatibility; not read by this function.
        block: Block name that ``code_block_dict`` must map a code to.
        limit_up_record_datas: Iterable of records; this function reads
            ``record[3]`` as the stock code and ``record[5]`` as a value
            convertible with ``int()`` that is compared against the
            09:30 timestamp.
        code_block_dict: Mapping of stock code -> block name.

    Returns:
        A set of stock codes (possibly empty).

    NOTE(review): despite "shsz" in the name, no board filtering happens
    here; presumably the caller restricts the input — confirm.
    """
    # Timestamp of 09:30:00 today, in local time
    open_time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
    open_timestamp = time.mktime(time.strptime(open_time_str, '%Y-%m-%d %H:%M:%S'))
    return {
        k[3]
        for k in limit_up_record_datas
        if code_block_dict.get(k[3]) == block and int(k[5]) < open_timestamp
    }
# 代码是否是后排
@@ -87,10 +88,14 @@
    return False, block_codes_infos[:topn]
def is_record_top_block(code, block, limit_up_record_datas, topn):
def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        # 判断是否是首板
        if k[0] in yesterday_current_limit_up_codes:
            continue
        if k[3] != code:
            block_codes_infos.append((k[3], k[2], int(k[5])))
        else:
@@ -100,10 +105,13 @@
    return __is_top_block(block, block_codes_infos, topn)
def is_current_top_block(code, block, current_limit_up_datas, topn):
def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in current_limit_up_datas:
        # 判断是否是首板
        if k[0] in yesterday_current_limit_up_codes:
            continue
        if k[0] != code:
            block_codes_infos.append((k[0], k[5], int(k[2])))
        else:
@@ -116,45 +124,62 @@
# 获取当日历史身位
# shsz:是否主板
def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict,
                         yesterday_current_limit_up_codes, shsz=True):
    """Rank ``code`` by limit-up time among today's recorded limit-ups of ``block``.

    Args:
        code: Stock code to rank.
        block: Block name; only records whose code maps to this value in
            ``code_limit_up_reason_dict`` compete with ``code``.
        limit_up_record_datas: Iterable of records; ``record[3]`` is read as
            the stock code and ``record[5]`` as its limit-up time
            (converted with ``int()``).
        code_limit_up_reason_dict: Mapping of stock code -> block name.
        yesterday_current_limit_up_codes: Container of codes to exclude
            from the competitors (high-position boards).
        shsz: When true, competitors whose code does not start with "00"
            or "60" are skipped.

    Returns:
        ``(rank, front_codes)``: the zero-based position of ``code`` after
        sorting by limit-up time, and the codes sorted ahead of it.
        Competitors with the same time as ``code`` are placed ahead of it.
    """
    block_codes_infos = []
    # Falls back to "now" when `code` has no record, which puts it behind
    # every competitor with an earlier limit-up time.
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        record_code = k[3]
        if record_code == code:
            # Limit-up time of the code being ranked; captured before any
            # filter so it is kept even if the record is filtered below.
            limit_up_time = int(k[5])
            continue
        if shsz and not record_code.startswith(("00", "60")):
            continue
        # Exclude high-position boards
        if record_code in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(record_code) == block:
            block_codes_infos.append((record_code, int(k[5])))
    # Appended last so the stable sort keeps equal-time competitors ahead.
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])
    front_codes = []
    for rank, (member_code, _) in enumerate(block_codes_infos):
        if member_code == code:
            return rank, front_codes
        front_codes.append(member_code)
    # Defensive fallback; `code` is always present in the list above.
    return 0, []
# 获取当日实时身位
# before_blocks_dict格式位{"代码":set("板块")}
def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reason_dict,
                          yesterday_current_limit_up_codes, shsz=False):
    """Rank ``code`` by limit-up time among the current limit-ups of ``block``.

    Args:
        code: Stock code to rank.
        block: Block name; only records whose code maps to this value in
            ``code_limit_up_reason_dict`` compete with ``code``.
        current_limit_up_datas: Iterable of records; ``record[0]`` is read
            as the stock code and ``record[2]`` as its limit-up time
            (converted with ``int()``).
        code_limit_up_reason_dict: Mapping of stock code -> block name.
        yesterday_current_limit_up_codes: Container of codes to exclude
            from the competitors (high-position boards).
        shsz: When true, competitors whose code does not start with "00"
            or "60" are skipped.

    Returns:
        ``(rank, front_codes)``: the zero-based position of ``code`` after
        sorting by limit-up time, and the codes sorted ahead of it.
        Competitors with the same time as ``code`` are placed ahead of it.
    """
    block_codes_infos = []
    # Falls back to "now" when `code` has no record, which puts it behind
    # every competitor with an earlier limit-up time.
    limit_up_time = time.time()
    for k in current_limit_up_datas:
        record_code = k[0]
        if record_code == code:
            # Limit-up time of the code being ranked; captured before any
            # filter so it is kept even if the record is filtered below.
            limit_up_time = int(k[2])
            continue
        if shsz and not record_code.startswith(("00", "60")):
            continue
        # Exclude high-position boards
        if record_code in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(record_code) == block:
            # (code, limit-up time)
            block_codes_infos.append((record_code, int(k[2])))
    # Appended last so the stable sort keeps equal-time competitors ahead.
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])
    front_codes = []
    for rank, (member_code, _) in enumerate(block_codes_infos):
        if member_code == code:
            return rank, front_codes
        front_codes.append(member_code)
    # Defensive fallback; `code` is always present in the list above.
    return 0, []
if __name__ == "__main__":