Administrator
2023-07-24 4d443587c7f6539f08eca5d7b1f83dbd13cc8873
选票板块即将更改
3个文件已修改
154 ■■■■ 已修改文件
third_data/block_test.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_block_util.py 134 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/block_test.py
@@ -1,17 +1,20 @@
from third_data import kpl_data_manager, block_info
from third_data import kpl_data_manager, block_info, kpl_block_util
from third_data.code_plate_key_manager import CodePlateKeyBuyManager, LimitUpCodesPlateKeyManager
from third_data.kpl_util import KPLDataType
from utils import tool
if __name__ == "__main__":
    # Manual smoke script: exercises the block/plate helpers for hard-coded
    # stock codes using the data stored for today's date string.
    code = "002655"
    block_info.init()
    # Current limit-up rows for today, read through KPLDataManager.get_from_file.
    current_limit_up_datas = kpl_data_manager.KPLDataManager().get_from_file(KPLDataType.LIMIT_UP,
                                                                             tool.get_now_date_str())
    # Registers (d[0], d[5]) pairs — presumably (stock code, block name), which
    # is how kpl_block_util indexes the same rows; confirm against the KPL schema.
    LimitUpCodesPlateKeyManager().set_today_limit_up([(d[0], d[5]) for d in current_limit_up_datas])
    # LimitUpCodesPlateKeyManager().set_today_limit_up([(d[0], d[5]) for d in current_limit_up_datas])
    # Today's limit-up records; d[3] is handed to block_info.init_code
    # (presumably the stock code of each record — TODO confirm).
    limit_up_record_datas = kpl_data_manager.KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
    for d in limit_up_record_datas:
        block_info.init_code(d[3])
    results = CodePlateKeyBuyManager.get_can_buy_block(code, current_limit_up_datas,
                                                       limit_up_record_datas, block_info.get_before_blocks_dict())
    print(results)
    # NOTE(review): the return values of the two calls below are discarded, so
    # they only prove that the helpers run without raising.
    kpl_block_util.is_record_top_block("002845", "芯片", limit_up_record_datas, 4)
    kpl_block_util.is_current_top_block("002845", "芯片", current_limit_up_datas, 4)
    # results = CodePlateKeyBuyManager.get_can_buy_block(code, current_limit_up_datas,
    #                                                    limit_up_record_datas, block_info.get_before_blocks_dict())
    # print(results)
third_data/code_plate_key_manager.py
@@ -377,7 +377,10 @@
        load_code_block()
        msg_list = []
        for block in keys:
            is_top_4 = kpl_block_util.is_top_block(code, block, limit_up_record_datas, 4)
            is_top_8_record, top_8_record = kpl_block_util.is_record_top_block(code, block, limit_up_record_datas, 8)
            is_top_4_current, top_4_current = kpl_block_util.is_record_top_block(code, block, current_limit_up_datas, 4)
            is_top_4 = is_top_8_record and is_top_4_current
            # 获取主板实时身位
            current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas,
                                                                     code_limit_up_reason_dict, shsz=True)
@@ -423,7 +426,7 @@
    # 是否可以下单
    # 返回:是否可以下单,消息,板块类型
    @classmethod
    def can_buy(cls, code, current_limit_up_datas, limit_up_record_datas,before_blocks_dict):
    def can_buy(cls, code, current_limit_up_datas, limit_up_record_datas, before_blocks_dict):
        if constant.TEST:
            return True, "", cls.BLOCK_TYPE_NONE
        block, block_msg = cls.get_can_buy_block(code, current_limit_up_datas,
third_data/kpl_block_util.py
@@ -11,51 +11,6 @@
from utils import tool
def _parse_limit_up_count(desc):
    """Return the streak length encoded in an "<N>连板" description, else None.

    The leading decimal digits of ``desc`` are read as a whole, so "10连板"
    yields 10 rather than 1. Descriptions that do not contain "连板", or that do
    not start with a digit, yield None instead of raising.
    """
    if "连板" not in desc:
        return None
    digits = ""
    for ch in desc:
        if not ch.isdecimal():
            break
        digits += ch
    return int(digits) if digits else None


def is_strong_block(block, current_limit_up_datas, latest_2_day_limit_up_datas):
    """Decide whether ``block`` counts as a strong block right now.

    Rows are indexed positionally: ``k[0]`` stock code, ``k[2]`` limit-up
    timestamp (seconds), ``k[4]`` streak description such as "2连板", ``k[5]``
    block name.

    Args:
        block: Name of the block to evaluate.
        current_limit_up_datas: Rows of the stocks currently at limit-up.
        latest_2_day_limit_up_datas: Limit-up rows of the most recent 2 days;
            ``None`` is treated as empty.

    Returns:
        ``(is_strong, reason)``; ``reason`` is an empty string when the block
        is not strong.
    """
    if not current_limit_up_datas:
        return False, ""

    block_rows = [k for k in current_limit_up_datas if k[5] == block]

    # Rule 1: a stock of the block hit limit-up before today's 09:30, or the
    # row belongs to a code starting with 30/68 (reported as a 20cm limit-up).
    open_time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
    open_timestamp = time.mktime(time.strptime(open_time_str, '%Y-%m-%d %H:%M:%S'))
    for k in block_rows:
        if int(k[2]) < open_timestamp:
            return True, "开一"
        if k[0].startswith(("30", "68")):
            return True, "20cm涨停"

    # Rule 2: the block has a 2-streak stock today and nothing in the block went
    # beyond a 2-streak across the recent days plus today.
    has_second_board = any(k[4] == "2连板" for k in block_rows)
    recent_rows = list(latest_2_day_limit_up_datas or [])
    recent_rows.extend(current_limit_up_datas)
    recent_counts = [_parse_limit_up_count(k[4]) for k in recent_rows if k[5] == block]
    has_higher_recently = any(c is not None and c > 2 for c in recent_counts)
    if has_second_board and not has_higher_recently:
        return True, "板块有二板且3天内板块内无高于二板"

    # Rule 3: today the block has a 3+ streak stock but no 2-streak stock.
    today_counts = [_parse_limit_up_count(k[4]) for k in block_rows]
    has_2 = any(c == 2 for c in today_counts)
    has_3 = any(c is not None and c > 2 for c in today_counts)
    if not has_2 and has_3:
        return True, "板块中无2板,但有3板及以上"
    return False, ""
# 是否主板开1
# limit_up_record_datas 今日历史涨停
def is_shsz_open_limit_up(code, block, limit_up_record_datas, code_block_dict):
@@ -83,7 +38,56 @@
# 是否是前几的板块
def is_top_block(code, block, limit_up_record_datas, topn):
# 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序)
def __is_top_block(block, block_codes_infos, topn):
    """Tell whether ``block`` ranks within the first ``topn`` blocks.

    ``block_codes_infos`` holds ``(code, block_name, limit_up_time)`` tuples.
    Each block is represented by its earliest entry, and the blocks are ranked
    by that entry's time.

    Returns:
        ``(is_top, leaders[:topn])`` where ``leaders`` are the ranked
        representative entries.
    """
    # Bucket the entries by block name, keeping first-seen block order.
    members_by_block = {}
    for info in block_codes_infos:
        members_by_block.setdefault(info[1], []).append(info)

    # A block is ranked only if at least one of its codes starts with 00/60
    # (main board) and its name is not listed in constant.KPL_INVALID_BLOCKS
    # (overly broad concepts). Its representative is the earliest entry.
    leaders = [
        min(members, key=lambda item: item[2])
        for name, members in members_by_block.items()
        if any(item[0].startswith(('00', '60')) for item in members)
        and name not in constant.KPL_INVALID_BLOCKS
    ]
    leaders.sort(key=lambda item: item[2])
    head = leaders[:topn]

    for position, leader in enumerate(leaders, start=1):
        if leader[1] == block:
            return position <= topn, head
    # The block was filtered out or absent: the result then depends only on
    # how many blocks were ranked.
    return len(leaders) <= topn, head
def is_record_top_block(code, block, limit_up_record_datas, topn):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
@@ -91,31 +95,23 @@
            block_codes_infos.append((k[3], k[2], int(k[5])))
        else:
            limit_up_time = int(k[5])
    block_codes_infos.append((code, block, limit_up_time))
    # 排序
    return __is_top_block(block, block_codes_infos, topn)
def is_current_top_block(code, block, current_limit_up_datas, topn):
    """Check whether ``block`` ranks within the first ``topn`` blocks of the current limit-ups.

    Rows of ``current_limit_up_datas`` are read positionally: ``k[0]`` is
    compared with ``code``, ``k[5]`` is used as the block name and ``k[2]`` as
    the limit-up time.

    NOTE(review): as written, every path returns a plain bool from the inline
    ranking below, so the final ``return __is_top_block(...)`` is unreachable.
    This looks like old and new versions of the body interleaved — confirm
    which one is intended before relying on the return shape.
    """
    block_codes_infos = []
    # Used as the target's time when ``code`` has no row in the current data.
    limit_up_time = time.time()
    for k in current_limit_up_datas:
        if k[0] != code:
            block_codes_infos.append((k[0], k[5], int(k[2])))
        else:
            # The target's own row only contributes its time; its block comes
            # from the ``block`` argument instead of k[5].
            limit_up_time = int(k[2])
    # Sort
    block_codes_infos.append((code, block, limit_up_time))
    # Keep, for every block, the entry with the earliest time.
    block_first_limit_up_dict = {}
    for b in block_codes_infos:
        if b[1] not in block_first_limit_up_dict:
            block_first_limit_up_dict[b[1]] = b
        else:
            if b[2] < block_first_limit_up_dict[b[1]][2]:
                block_first_limit_up_dict[b[1]] = b
    block_codes_infos = [block_first_limit_up_dict[k] for k in block_first_limit_up_dict]
    block_codes_infos.sort(key=lambda x: x[2])
    # Skip the generic limit-up reasons
    index = 0
    for d in block_codes_infos:
        # Blocks listed in constant.KPL_INVALID_BLOCKS do not take up a rank.
        if d[1] in constant.KPL_INVALID_BLOCKS:
            continue
        if d[1] == block:
            if index + 1 <= topn:
                return True
            else:
                return False
        index += 1
    # Reached only when ``block`` itself was skipped as invalid; ``index`` is
    # then the number of ranked blocks.
    if index <= topn:
        return True
    return False
    # Sort
    # NOTE(review): unreachable — both branches above already returned.
    return __is_top_block(block, block_codes_infos, topn)
# 获取当日历史身位