Administrator
2025-02-07 7eb1a8ed1a007d80de41d131071ee38f5872700c
third_data/kpl_limit_up_data_manager.py
@@ -1,9 +1,12 @@
"""
开盘啦涨停数据管理
"""
import copy
from third_data import kpl_util, kpl_data_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from utils import tool, init_data_util
@@ -43,6 +46,13 @@
    __first_block_sequence_dict = {}
    @classmethod
    def __get_code_blocks(cls, code):
        blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
        if not blocks:
            blocks = set()
        return blocks
    @classmethod
    def set_current_limit_up_datas(cls, current_limit_up_datas):
        """
        设置目前的涨停代码
@@ -52,56 +62,62 @@
        records = get_today_history_limit_up_datas_cache()
        # 按代码排序
        # {"代码":(代码,涨停原因, 涨停时间, 几版)}
        current_code_block_dict = {x[0]: (x[0], x[2], x[5], x[4]) for x in current_limit_up_datas}
        current_code_block_dict = {x[0]: (x[0], x[5], x[2], x[4]) for x in current_limit_up_datas}
        record_code_block_dict = {x[3]: (x[3], x[2], x[5], x[12]) for x in records}
        # 根据涨停原因统计
        # {"板块":{代码}}
        block_codes = {}
        limit_up_codes = set()
        for code in current_code_block_dict:
            b = current_code_block_dict[code][1]
            if b not in block_codes:
                block_codes[b] = set()
            block_codes[b].add(code)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
            limit_up_codes.add(code)
        for code in record_code_block_dict:
            b = record_code_block_dict[code][1]
            if b not in block_codes:
                block_codes[b] = set()
            block_codes[b].add(code)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        # 获取上个交易日涨停的代码
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes is None:
            yesterday_codes = set()
        temp_block_sequence_dict = {}
        for code in limit_up_codes:
            # 计算身位
            b = current_code_block_dict[code][1]
            codes = block_codes[b]
            total_count = len(codes)
            # 统计真正涨停数
            limit_up_count = 0
            limit_up_codes_list = []
            for c in codes:
                if c in limit_up_codes:
                    limit_up_count += 1
                    if c not in yesterday_codes:
                        limit_up_codes_list.append((c, current_code_block_dict[c][2]))
            # 获取首板代码的排位
            limit_up_codes_list.sort(key=lambda x: x[1])
            index = 1
            for i in range(0, len(limit_up_codes_list)):
                if limit_up_codes_list[i][0] == code:
                    index = i + 1
                    break
            cls.__first_block_sequence_dict[code] = (b, index, total_count, limit_up_count)
            bs = cls.__get_code_blocks(code)
            for b in bs:
                # 计算身位
                codes = block_codes[b]
                total_count = len(codes)
                # 统计真正涨停数
                limit_up_count = 0
                limit_up_codes_list = []
                for c in codes:
                    if c in limit_up_codes:
                        limit_up_count += 1
                        if c not in yesterday_codes:
                            limit_up_codes_list.append((c, current_code_block_dict[c][2]))
                # 获取首板代码的排位
                limit_up_codes_list.sort(key=lambda x: x[1])
                index = 1
                for i in range(0, len(limit_up_codes_list)):
                    if limit_up_codes_list[i][0] == code:
                        index = i + 1
                        break
                if code not in temp_block_sequence_dict:
                    temp_block_sequence_dict[code] = []
                temp_block_sequence_dict[code].append((b, index, total_count, limit_up_count))
        cls.__first_block_sequence_dict = temp_block_sequence_dict
    @classmethod
    def get_current_limit_up_sequence(cls, code):
        """
        获取代码当前的板块身位
        @param code:
        @return:(板块名称,身位,总涨停数量,目前涨停数量)
        @return:[(板块名称,身位,总涨停数量,目前涨停数量)]
        """
        return cls.__first_block_sequence_dict.get(code)
@@ -110,8 +126,8 @@
    """
    最近涨停的板块管理
    """
    # 看最近7天
    __LATEST_DAY_COUNT = 7
    # 看最近2天,不包含今天
    __LATEST_DAY_COUNT = 2
    __days = []
    # 目前涨停
@@ -140,16 +156,15 @@
    @classmethod
    def __load_datas(cls):
        # 加载最近7天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        now_day = tool.get_now_date_str()
        if __days[0] != now_day:
            __days.insert(0, now_day)
        # 加载最近几天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT)
        __days = copy.deepcopy(__days)
        # 不能包含今天
        if __days[0] == tool.get_now_date_str():
            __days.pop(0)
        cls.__days = __days
        # 加载之前6天的涨停,曾涨停,曾涨停代码的最近6天的K线
        for day in __days:
            if day == now_day:
                continue
            limit_up_records = get_history_limit_up_datas(day)
            cls.__history_limit_up_day_datas[day] = limit_up_records
            current_limit_up_datas = get_current_limit_up_datas(day)
@@ -163,8 +178,6 @@
            cls.__get_bars(code)
        # 统计前6天的板块信息
        for day in __days:
            if day == now_day:
                continue
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)
    def set_current_limit_up_data(self, day, datas):
@@ -180,17 +193,21 @@
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        统计涨停代码信息
        @return:
        @return:{"板块":(涨停数, 炸板数, {包含的代码})}
        """
        # 统计板块的
        current_code_dict = {d[0]: d for d in cls.__current_limit_up_day_datas[day]}
        # history_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas[day]}
        block_codes_dict = {}
        for h in cls.__history_limit_up_day_datas[day]:
            cls.__code_name_dict[h[3]] = h[4]
            if h[2] not in block_codes_dict:
                block_codes_dict[h[2]] = set()
            block_codes_dict[h[2]].add(h[3])
            # 将板块所包含的代码归类
            code = h[3]
            cls.__code_name_dict[code] = h[4]
            blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
            if blocks:
                for b in blocks:
                    if b not in block_codes_dict:
                        block_codes_dict[b] = set()
                    block_codes_dict[b].add(code)
        fdata = {}
        for b in block_codes_dict:
            limit_up_count = 0
@@ -209,36 +226,42 @@
        统计涨停板块数据
        @return:
        """
        # 只统计今天的数据
        now_day = tool.get_now_date_str()
        block_dict = self.__statistics_limit_up_block_infos_by_day(now_day)
        self.__block_day_datas[now_day] = block_dict
        # 板块出现的天数
        block_count_dict = {}
        # 板块出现的代码
        block_codes_dict = {}
        for day in self.__block_day_datas:
            for b in self.__block_day_datas[day]:
                finally_limit_up_count = self.__block_day_datas[day][b][0]
                if finally_limit_up_count < 3:
                    continue
                # 板块涨停个数
                if b not in block_count_dict:
                    block_count_dict[b] = set()
                if b not in block_codes_dict:
                    block_codes_dict[b] = set()
                block_count_dict[b].add(day)
                block_codes_dict[b] |= self.__block_day_datas[day][b][2]
        # [(板块,涨停日期集合)]
        block_count_list = [(k, block_count_dict[k]) for k in block_count_dict]
        block_count_list.sort(key=lambda x: x[1], reverse=True)
        block_count_list = block_count_list[:20]
        block_count_list.sort(key=lambda x: len(x[1]), reverse=True)
        block_count_list = block_count_list[:50]
        # [(涨停原因,累计涨停次数,连续次数)]
        fdatas = []
        today_records_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas.get(now_day)}
        for d in block_count_list:
            b = d[0]
            fdata = [d[0], len(d[1])]
            fdata = [b, len(d[1])]
            temp = []
            max_continue_count = 0
            for day in self.__days:
                if d[0] in self.__block_day_datas[day]:
                if day not in self.__block_day_datas:
                    continue
                if b in self.__block_day_datas[day]:
                    finally_limit_up_count = self.__block_day_datas[day][b][0]
                else:
                    finally_limit_up_count = 0
                if b in self.__block_day_datas[day] and finally_limit_up_count >= 3:
                    # 板块代码数量>=3个
                    temp.append(day)
                else:
                    c = len(temp)
@@ -262,13 +285,12 @@
            # 统计今天这个板块中大于二板的代码数量
            limit_up_counts = 0
            for code in block_codes_dict[d[0]]:
                if code in today_records_code_dict and today_records_code_dict[code][12] != '首板':
                    limit_up_counts += 1
            fdata.append(limit_up_counts)
            # 获取每天的数量
            days_datas = []
            for day in self.__days:
                if day not in self.__block_day_datas:
                    continue
                binfo = self.__block_day_datas[day].get(b)
                if not binfo:
                    days_datas.append((0, 0))
@@ -291,10 +313,10 @@
        if cls.__days:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT]
                cls.__k_datas[code] = volumes_data
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT)
            if volumes_data:
                cls.__k_datas[code] = volumes_data
        # 获取最大涨幅
@@ -305,3 +327,11 @@
        rate = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        cls.__k_max_rate[code] = rate
        return cls.__k_datas.get(code)
if __name__ == "__main__":
    # datas = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
    # print(datas)
    code = "600126"
    blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
    print(blocks)