Administrator
4 天以前 245979e3907d34bcd88ac0c4547f399bf33a44de
strategy/data_analyzer.py
@@ -60,6 +60,24 @@
        return k_data[0]['amount']
    @classmethod
    def get_yesterday_low_price(cls, k_data):
        """
        获取昨日最低价
        @param k_data: K线数据列表
        @return: 昨日最高价
        """
        return k_data[0]['low']
    @classmethod
    def get_yesterday_open_price(cls, k_data):
        """
        获取昨日开盘价
        @param k_data: K线数据列表
        @return: 昨日最高价
        """
        return k_data[0]['open']
    @classmethod
    def get_recent_days_high(cls, k_data, days):
        """
        获取近几个交易日的最高价
@@ -151,6 +169,22 @@
            1 for d in k_data[:days] if d['close'] <= cls.calculate_lower_limit_price(d["sec_id"], d["pre_close"]))
    @classmethod
    def get_recent_days_double_volume_date(cls, k_data, days):
        """
        获取最近几个交易日的倍量日期
        @param k_data: K线数据列表
        @param days: 交易日数量
        @return: 倍量的日期
        """
        k_datas: list = k_data[:days]
        k_datas.reverse()
        for i in range(1, len(k_datas)):
            latest_volume = k_datas[i - 1]["volume"]
            if k_datas[i]["volume"] > 2 * latest_volume:
                return k_datas[i]["bob"]
        return None
    @classmethod
    def get_first_limit_up_avg_premium(cls, k_data, days):
        """
        获取近几个交易日的首板涨停平均溢价率
@@ -238,6 +272,18 @@
        return count
    @classmethod
    def __is_limit_up(cls, code, close, pre_close):
        """
        是否涨停
        @param code:
        @param close:
        @param pre_close:
        @return:
        """
        return abs(close - cls.calculate_upper_limit_price(code,
                                                           pre_close)) < 0.01
    @classmethod
    def get_third_limit_up_days(cls, k_data, days):
        """
        获取近几个交易日的三板天数
@@ -246,18 +292,24 @@
        @return: 三板天数
        """
        count = 0
        k_data = k_data[:days]
        k_data = k_data[::-1]
        for i in range(days):
            if i + 3 >= len(k_data):
                continue
            # 判断连续三日涨停且第四日非涨停
            if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
                    (k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
                                                                               k_data[i + 1]["pre_close"])) and \
                    (k_data[i + 2]['close'] >= cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"],
                                                                               k_data[i + 2]["pre_close"])) and \
                    (k_data[i + 3]['close'] < cls.calculate_upper_limit_price(k_data[i + 3]["sec_id"],
                                                                              k_data[i + 3]["pre_close"])):
                count += 1
            day_count = 3
            for n in range(day_count + 1):
                if n < day_count:
                    if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'],
                                             k_data[i + n]["pre_close"]):
                        # 非涨停
                        break
                else:
                    if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'],
                                             k_data[i + n]["pre_close"]):
                        count += 1
                        break
        return count
    @classmethod
@@ -369,6 +421,59 @@
                count += 1
        return count
    @classmethod
    def is_too_high_and_not_relase_volume(cls, k_data):
        """
        长得太高且没放量:30个交易日内,出现过最低价(最高价之前的交易日)到最高价之间的涨幅≥35%的票,且今日距离最高价那日无涨停/无炸板且>=3板且必须有2连板
        @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
        @return: 四跌停及以上天数
        """
        k_data = k_data[:30]
        code = k_data[0]["sec_id"]
        # 获取最高价信息
        max_high_price_data = max(k_data, key=lambda x: x["high"])
        before_datas = [d for d in k_data if d['bob'] < max_high_price_data['bob']]
        after_datas = [d for d in k_data if d['bob'] >= max_high_price_data['bob']]
        if not before_datas:
            return False
        if len(before_datas) > 15:
            # 从最高价日期向前最多看15个交易日
            before_datas = before_datas[:15]
        min_close_price_data = min(before_datas, key=lambda x: x["close"])
        if (max_high_price_data['high'] - min_close_price_data['close']) / min_close_price_data['close'] < 0.35:
            # 涨幅小于35%
            return False
        before_k_datas = [d for d in k_data if min_close_price_data['bob'] <= d['bob'] <= max_high_price_data['bob']]
        before_k_datas.sort(key=lambda x: x['bob'])
        # [最低价-最高价]日期内有3个板且有两连扳
        continue_2_limit_up_date = None
        for i in range(len(before_k_datas) - 1):
            if cls.__is_limit_up(code, before_k_datas[i]["close"],
                                 before_k_datas[i]["pre_close"]) and cls.__is_limit_up(code,
                                                                                       before_k_datas[i + 1]["close"],
                                                                                       before_k_datas[i + 1][
                                                                                           "pre_close"]):
                continue_2_limit_up_date = before_k_datas[i + 1]['bob'][:10]
                break
        if not continue_2_limit_up_date:
            # 无两连板
            return False
        # 两连板之后是否有炸板/涨停
        # 取2连板之后的3个交易日
        temp_k_datas = [d for d in before_k_datas if d['bob'][:10] > continue_2_limit_up_date][:3]
        if len([d for d in temp_k_datas if cls.__is_limit_up(code, d["high"], d["pre_close"])]) < 1:
            # 两连板之后有个涨停/炸板且时间在2连板之后的3个交易日内
            return False
        k_data = [d for d in k_data if d['bob'] > max_high_price_data['bob']]
        # 判断是否涨停过
        if len([d for d in k_data if cls.__is_limit_up(code, d["high"], d["pre_close"])]) > 0 or len(after_datas) >= 10:
            # 最高价之后有过涨停或者是最高价后10个交易日
            return False
        return True, f"高价日期:{max_high_price_data['bob'][:10]},低价日期:{min_close_price_data['bob'][:10]},两连扳日期:{continue_2_limit_up_date}"
class K60SLineAnalyzer:
    """
@@ -403,10 +508,96 @@
        reason_counts = {}
        special_reasons = constant.KPL_INVALID_BLOCKS
        if limit_up_data:
            for _, date, reason in limit_up_data:
            for _, date, reason, is_open, _blocks in limit_up_data:
                if is_open:
                    continue
                if min_day <= date <= max_day and reason not in special_reasons:
                    reason_counts[reason] = reason_counts.get(reason, 0) + 1
        if not reason_counts:
            return []
        max_count = max(reason_counts.values())
        return [(reason, count) for reason, count in reason_counts.items() if count == max_count]
    @classmethod
    def get_limit_up_reasons(cls, limit_up_data_list, min_day, max_day, include_recommend_reason=False):
        """
        获取最近一段时间的涨停原因
        @param include_recommend_reason: 是否包含推荐原因
        @param max_day:
        @param limit_up_data_list:
        @param min_day:
        @return:
        """
        special_reasons = constant.KPL_INVALID_BLOCKS
        day_block_codes = {}
        if limit_up_data_list:
            for _, date, reason, is_open, _blocks in limit_up_data_list:
                if reason in special_reasons:
                    continue
                if date > max_day or date < min_day:
                    continue
                # 每天的板块涨停数量
                if date not in day_block_codes:
                    day_block_codes[date] = {}
                reasons = {reason}
                if include_recommend_reason and _blocks:
                    reasons |= set(_blocks.split("、"))
                for r in reasons:
                    if r not in day_block_codes[date]:
                        # {日期:{板块:[{真正涨停集合}, {炸板集合}]}}
                        day_block_codes[date][r] = [set(), set()]
                    if not is_open:
                        # 真正涨停
                        day_block_codes[date][r][0].add(_)
                    else:
                        # 炸板
                        day_block_codes[date][r][1].add(_)
            blocks = set()
            for date in day_block_codes:
                for reason in day_block_codes[date]:
                    if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4:
                        # 最后涨停数>=2 炸板数>=4
                        blocks.add(reason)
            return blocks
        return set()
    @classmethod
    def get_continuous_limit_up_reasons(cls, limit_up_data_list, days_list):
        """
        连续老题材:days_list交易日都在走的题材
        @param limit_up_data_list:
        @param days_list: ["2025-01-01"]
        @return:
        """
        special_reasons = constant.KPL_INVALID_BLOCKS
        day_block_codes = {}
        if limit_up_data_list:
            for _, date, reason, is_open, _blocks in limit_up_data_list:
                if reason in special_reasons:
                    continue
                if date not in days_list:
                    continue
                # 每天的板块涨停数量
                if date not in day_block_codes:
                    day_block_codes[date] = {}
                reasons = {reason}
                for r in reasons:
                    if r not in day_block_codes[date]:
                        # {日期:{板块:[{真正涨停集合}, {炸板集合}]}}
                        day_block_codes[date][r] = [set(), set()]
                    if not is_open:
                        # 真正涨停
                        day_block_codes[date][r][0].add(_)
                    else:
                        # 炸板
                        day_block_codes[date][r][1].add(_)
            block_days = {}
            for date in day_block_codes:
                for reason in day_block_codes[date]:
                    if len(day_block_codes[date][reason][0]) >= 2 or len(day_block_codes[date][reason][0]) >= 4:
                        # 最后涨停数>=2 炸板数>=4
                        if reason not in block_days:
                            block_days[reason] = set()
                        block_days[reason].add(date)
            return set([b for b in block_days if len(block_days[b]) == len(days_list)])
        return set()