Administrator
2024-11-29 5dbde33455497c27a0767d6ea8c81b0bf076f607
third_data/kpl_data_manager.py
@@ -10,14 +10,14 @@
import constant
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util, log
from third_data.kpl_data_constant import LimitUpDataConstant
from third_data.kpl_data_constant import LimitUpDataConstant, TodayLimitUpReasonChangeManager
from utils import tool
# 开盘啦历史涨停数据管理
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, logger_kpl_limit_up, \
    logger_kpl_open_limit_up
from third_data import kpl_util, kpl_api, kpl_data_constant
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
# 代码对应的涨停原因保存
@@ -161,8 +161,7 @@
                        # 板块更改过
                        mysqldb.execute(
                            f"update kpl_limit_up_record set _hot_block_change = f'{dd[2]}' where _day='{dd[1]}' and _code='{code}'")
                        cls.__LimitUpCodesPlateKeyManager.set_today_limit_up_reason_change(code, dd[2],
                        TodayLimitUpReasonChangeManager().set_today_limit_up_reason_change(code, dd[2],
                                                                                           code_block_dict[code])
                        if dd[0] in cls.latest_datas:
@@ -179,9 +178,12 @@
            cls.__load_hist_and_blocks(d[3])
    @staticmethod
    def list_all(day):
    def list_all(day, max_limit_up_time=None):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _day='{day}'")
        sql = f"select * from kpl_limit_up_record where _day='{day}'"
        if max_limit_up_time:
            sql += f" and cast(_limit_up_time as unsigned)<={max_limit_up_time}"
        return mysqldb.select_all(sql)
    @classmethod
    def list_all_cache(cls, day):
@@ -289,7 +291,7 @@
    @classmethod
    def get_new_blocks(cls, day):
        """
        获取某一天新出现的板块
        获取某一天新出现的板块(新板块)
        @param day:
        @return:
        """
@@ -347,7 +349,7 @@
    @classmethod
    # 获取最近几天的数据,根据日期倒序返回
    def get_latest_from_file(cls, type, count):
    def get_latest_from_file(cls, type, count, max_day=tool.get_now_date_str()):
        files = os.listdir(constant.CACHE_PATH)
        file_name_list = []
        for f in files:
@@ -355,7 +357,6 @@
                file_name_list.append((f.split("_")[0], f))
        file_name_list.sort(key=lambda x: x[0], reverse=True)
        file_name_list = file_name_list[:count]
        fresults = []
        for file in file_name_list:
            path = f"{constant.CACHE_PATH}/{file[1]}"
@@ -364,7 +365,10 @@
            with open(path, 'r') as f:
                lines = f.readlines()
                if lines:
                    fresults.append((file[0], json.loads(lines[0])))
                    if int(file[0].replace("-", "")) <= int(max_day.replace("-", "")):
                        fresults.append((file[0], json.loads(lines[0])))
                if len(fresults) >=count:
                    break
        return fresults
@@ -409,18 +413,24 @@
# 获取最近几天的实时涨停信息
# 返回格式([日期,数据])
def get_current_limit_up_data_records(count):
def get_current_limit_up_data_records(count, day=tool.get_now_date_str()):
    fresults = []
    day = tool.get_now_date_str()
    datas = []
    if day in __limit_up_list_records_dict:
        datas = __limit_up_list_records_dict[day]
    else:
        logger_debug.info("从文件中获取前几天的实时涨停数据")
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2)
        if datas:
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2, max_day=day)
        # 移除比今天还大的数据
        fdatas = []
        for d in datas:
            if int(d[0].replace("-", "")) > int(day.replace("-", "")):
                continue
            fdatas.append(d)
        if fdatas:
            # 保存数据
            __limit_up_list_records_dict[day] = datas
            __limit_up_list_records_dict[day] = fdatas
    datas = __limit_up_list_records_dict[day]
    for i in range(len(datas)):
        if datas[i][0] == day:
            continue
@@ -445,10 +455,9 @@
__latest_current_limit_up_records = {}
def get_latest_current_limit_up_records(day=None, max_day_count=15):
    """
    Return the recent real-time limit-up records for ``day``, cached per day.

    @param day: date string used as the cache key and forwarded to
                get_current_limit_up_data_records; defaults to
                tool.get_now_date_str(), resolved at call time (a default
                evaluated at import time would go stale in a long-running process)
    @param max_day_count: record count passed to get_current_limit_up_data_records
    @return: whatever get_current_limit_up_data_records produced for that day
    """
    if day is None:
        day = tool.get_now_date_str()
    if day not in __latest_current_limit_up_records:
        # Forward `day` so the data cached under this key really belongs to it.
        # NOTE: the cache key is `day` only, so a later call with a different
        # max_day_count returns the result of the first call for that day.
        __latest_current_limit_up_records[day] = get_current_limit_up_data_records(max_day_count, day=day)
    return __latest_current_limit_up_records.get(day)
@@ -478,6 +487,17 @@
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # key = "jingxuan_rank"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流入列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        #
        # key = "jingxuan_rank_out"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流出列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
@@ -500,6 +520,42 @@
                pass
            finally:
                cls.__latest_update_time_dict["limit_up"] = time.time()
                time.sleep(3)
    @classmethod
    def run_market_jingxuan_in(cls):
        """
        精选流入
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    results = kpl_api.getMarketJingXuanRealRankingInfo()
                    result = json.loads(results)
                    cls.__upload_data("jingxuan_rank", result)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank"] = time.time()
                time.sleep(3)
    @classmethod
    def run_market_jingxuan_out(cls):
        """
        精选流出
        @return:
        """
        while True:
            try:
                if tool.is_trade_time():
                    results = kpl_api.getMarketJingXuanRealRankingInfo(False)
                    result = json.loads(results)
                    cls.__upload_data("jingxuan_rank_out", result)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["jingxuan_rank_out"] = time.time()
                time.sleep(3)
    @classmethod
@@ -530,22 +586,67 @@
        def get_market_jingxuan():
            while True:
                if tool.is_trade_time() or True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketJingXuanRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("jingxuan_rank", result)
                    except:
                        pass
                time.sleep(3)
                    finally:
                        cls.__latest_update_time_dict["jingxuan_rank"] = time.time()
                        time.sleep(3)
                else:
                    time.sleep(3)
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        threading.Thread(target=get_market_jingxuan, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
@tool.singleton
class CodeHighLevel:
    """
    Limit-up height manager: maps a code to its limit-up height.

    The height table is built once, in __init__, from the records returned by
    get_current_limit_up_data_records(15, day=day) and is stored on the class.
    """
    __instance = None
    # Original note: "code records of the plates orders were placed in"
    __code_level_dict = {}
    __codes = set()

    def __init__(self, day=None):
        """
        @param day: date string to compute heights for; defaults to
                    tool.get_now_date_str(), resolved at construction time
                    rather than when the module is imported
        """
        if day is None:
            day = tool.get_now_date_str()
        self.__day = day
        self.__load_data(day)

    @classmethod
    def __load_data(cls, day):
        """
        Rebuild the class-level code -> height table for ``day``.

        Every code in the first record starts at height 2; each following
        record that still contains the code adds 1, and counting for a code
        stops at the first record that does not contain it.
        (Records are presumably ordered newest-first - confirm against
        get_current_limit_up_data_records.)

        @param day: date string forwarded to get_current_limit_up_data_records
        @return: None
        """
        fdatas = get_current_limit_up_data_records(15, day=day)
        if not fdatas:
            # No records available: every lookup falls back to the default height.
            cls.__code_level_dict = {}
            return
        temp_dict = {d[0]: 2 for d in fdatas[0][1]}
        # Codes whose consecutive streak has already been interrupted.
        break_codes = set()
        for record in fdatas[1:]:
            # Set membership keeps the per-code check O(1).
            codes = {d[0] for d in record[1]}
            for k in temp_dict:
                if k in break_codes:
                    continue
                if k in codes:
                    temp_dict[k] += 1
                else:
                    break_codes.add(k)
        cls.__code_level_dict = temp_dict

    def get_high_level(self, code):
        """
        Get the limit-up height of a code; defaults to 1 when the code is unknown.

        @param code: code to look up
        @return: the stored height, or 1 if the code is not in the table
        """
        if code in self.__code_level_dict:
            return self.__code_level_dict[code]
        return 1
if __name__ == "__main__":
    print(get_latest_current_limit_up_records())
    print(get_latest_current_limit_up_records())
    print(CodeHighLevel("2024-11-11").get_high_level("000833"))
    input()