Administrator
2024-11-13 d2d5ca80907183f88a5e78aa28c085a746868d6d
third_data/kpl_data_constant.py
@@ -8,9 +8,12 @@
from utils.kpl_data_db_util import KPLLimitUpDataUtil
# Used to compute the limit-up board count for the "radical buy, open 1" case:
# {"code": (board count, {blocks})}
from utils.tool import singleton
open_limit_up_code_dict_for_radical_buy = None
@singleton
class LimitUpCodesBlockRecordManager:
    """
    历史涨停代码的板块管理
@@ -26,15 +29,16 @@
    # Cached shared instance; stays None until __new__ creates it on first construction.
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LimitUpCodesBlockRecordManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    # Day read by __load_data (as cls.__day) to bound its queries: it is passed as
    # max_day and used as the base for the date_sub look-back. This initial value is
    # computed when the class body is executed.
    __day = tool.get_now_date_str()
    def __init__(self, day=tool.get_now_date_str()):
        self.__day = day
        self.__load_data()
    @classmethod
    def __load_data(cls):
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos()
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(cls.__day, 180),
                                                                max_day=cls.__day)
        limit_up_reasons_dict = {}
        limit_up_recommend_block_dict = {}
        for r in kpl_results:
@@ -55,11 +59,10 @@
            cls.__limit_up_recommend_reasons_dict[code] = set(limit_up_recommend_block_dict[code])
        # 加载为扫入买匹配的代码板块
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(tool.get_now_date_str(), 365))
        kpl_results = KPLLimitUpDataUtil.get_latest_block_infos(min_day=tool.date_sub(cls.__day, 365),
                                                                max_day=cls.__day)
        kpl_block_dict = {}
        for r in kpl_results:
            if r[2] in constant.KPL_INVALID_BLOCKS:
                continue
            code = r[0]
            if code not in kpl_block_dict:
                kpl_block_dict[code] = []
@@ -80,6 +83,7 @@
            if len(temp_list) > 1:
                if temp_list[1][1] >= 2:
                    blocks.add(temp_list[1][0])
            blocks -= constant.KPL_INVALID_BLOCKS
            cls.__radical_buy_reasons_dict[code] = blocks
    def get_limit_up_reasons(self, code):