strategy/market_sentiment_analysis.py
@@ -548,7 +548,6 @@
            # 对于未知类型,你可以选择保留原样、跳过或引发异常
            # 这里我们选择保留原样
            return obj
    try:
        json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4)
        # 将转换后的JSON字符串写入文件
@@ -561,8 +560,145 @@
    logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}")
# 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量
def get_real_time_market_strong():
# 计算市场分布形态因子 函数
def calculate_market_distribution_pattern_factor(data):
    """
        原生Python数据预处理
        返回:
            sorted_bins   : 排序后的涨跌幅区间(整数列表)
            sorted_counts : 对应区间的股票数量
            total         : 总股票数
            pos_counts    : 正涨跌区间的数量列表
            neg_counts    : 负涨跌区间的数量列表
        """
    # 将字符串键转为整数并排序
    sorted_items = sorted(data.items(), key=lambda x: int(x[0]))
    sorted_bins = [int(k) for k, v in sorted_items]
    sorted_counts = [v for k, v in sorted_items]
    # 计算总股票数
    total = sum(sorted_counts)
    # 分离正负区间(排除0%)
    pos_counts = [v for k, v in sorted_items if int(k) > 0]
    neg_counts = [v for k, v in sorted_items if int(k) < 0]
    return sorted_bins, sorted_counts, total, pos_counts, neg_counts
# 执行预处理
bins, counts, total, pos_counts, neg_counts = calculate_market_distribution_pattern_factor(data_cache.rise_and_fall_statistics_dirt)
# ====================== 因子计算模块 ======================
class NativeFactorCalculator:
    def __init__(self, bins, counts, total, pos_counts, neg_counts):
        self.bins = bins
        self.counts = counts
        # todo total 会默认为0的情况
        if total != 0:
            self.total = total
        else:
            self.total = 1
        self.pos_counts = pos_counts
        self.neg_counts = neg_counts
    def calculate_bdr(self):
        """原生涨跌比计算"""
        sum_pos = sum(self.pos_counts)
        sum_neg = sum(self.neg_counts)
        return sum_pos / sum_neg if sum_neg != 0 else float('nan')
    def calculate_extreme_ratio(self):
        """极端波动比例计算"""
        extreme_total = 0
        for b, c in zip(self.bins, self.counts):
            if b >= 5 or b <= -5:
                extreme_total += c
        return extreme_total / self.total
    def calculate_market_breadth(self):
        """市场宽度因子计算"""
        diff = sum(self.pos_counts) - sum(self.neg_counts)
        return diff / self.total
    def _expand_distribution(self):
        """将分箱数据展开为原始数据点(用于计算统计指标)"""
        expanded = []
        for value, count in zip(self.bins, self.counts):
            expanded.extend([value] * count)
        return expanded
    def calculate_distribution_metrics(self):
        """原生偏度与峰度计算"""
        data = self._expand_distribution()
        n = len(data)
        if n < 2:
            return (float('nan'), float('nan'))
        # 计算均值
        mean = sum(data) / n
        # 计算标准差
        variance = sum((x - mean) ** 2 for x in data) / (n - 1)
        std_dev = variance ** 0.5
        # 计算偏度
        skewness = (sum((x - mean) ** 3 for x in data) / n) / (std_dev ** 3) if std_dev != 0 else 0
        # 计算峰度(Fisher定义,即正态分布峰度为0)
        kurtosis = (sum((x - mean) ** 4 for x in data) / n) / (std_dev ** 4) - 3 if std_dev != 0 else 0
        return (skewness, kurtosis)
# 实例化并计算因子
calculator = NativeFactorCalculator(bins, counts, total, pos_counts, neg_counts)
bdr = calculator.calculate_bdr()
extreme_ratio = calculator.calculate_extreme_ratio()
market_breadth = calculator.calculate_market_breadth()
skewness, kurtosis = calculator.calculate_distribution_metrics()
# ====================== 策略信号生成模块 ======================
class NativeTradingStrategy:
    def __init__(self, bdr_threshold=1.5, extreme_threshold=0.15):
        self.bdr_threshold = bdr_threshold
        self.extreme_threshold = extreme_threshold
    def generate_signals(self, bdr, extreme_ratio, market_breadth, skewness):
        """生成交易信号(与之前相同,保持策略逻辑)"""
        signals = []
        # 信号1:基于涨跌比
        if bdr > self.bdr_threshold:
            signals.append("多头趋势增强 - 加仓指数ETF")
        elif bdr < 1 / self.bdr_threshold:
            signals.append("空头趋势增强 - 减仓或反向操作")
        else:
            signals.append("趋势中性 - 保持当前仓位")
        # 信号2:极端波动
        if extreme_ratio > self.extreme_threshold:
            signals.append("波动风险升高 - 启动对冲或降低杠杆")
        else:
            signals.append("波动正常 - 维持现有风险敞口")
        # 信号3:市场宽度与偏度
        if market_breadth > 0.2 and skewness > 0.5:
            signals.append("市场普涨且偏度正向 - 超配行业龙头股")
        elif market_breadth < -0.2 and skewness < -0.5:
            signals.append("市场普跌且偏度负向 - 低估值防御型配置")
        return signals
# 实例化策略并生成信号
strategy = NativeTradingStrategy()
signals = strategy.generate_signals(bdr, extreme_ratio, market_breadth, skewness)
# 实时设置计划持仓数量 函数
def set_plan_position_quantity():
    while True:
        try:
            if data_cache.position_automatic_management_switch is True:
@@ -571,26 +707,24 @@
                    # 获取大盘综合强度分数
                    data_cache.real_time_market_strong = kpl_api.get_market_strong()
                    # 获取市场情绪字典【完整】,并整理
                    data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics()
                    date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None)
                    significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None)
                    sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)
                    limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)
                    connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None)
                    data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics()  # 市场情绪字典字典
                    date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None)  # 情绪数据日期
                    significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None)  # 大幅回撤
                    sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)  # 情绪指标
                    limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None)  # 涨停家数
                    connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None)  # 连板高度
                    # 获取市场情绪-涨跌统计
                    data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling()
                    limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None)
                    actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None)
                    ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None)
                    limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None)
                    actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None)
                    ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None)
                    data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling()  # 涨跌统计字典
                    limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None)  # 涨停家数
                    actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None)  # 实际涨停家数
                    ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None)  # ST涨停家数
                    limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None)  # 跌停家数
                    actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None)  # 实际跌停家数
                    ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None)  # ST跌停家数
                    rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None)
                    fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None)
                    # 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析
                    # logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分")
                    # 控制打印日志的时间段
                    if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME:
                        pass
                        logger.info(f"午间休市时间内 不打印大盘综合强度分数")
@@ -599,7 +733,7 @@
                        # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong)
                        async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}")
                        logger.info(f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}")
                        logger.info(f"上涨家数:{rise_numbers}分,下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}")
                        logger.info(f"上涨家数:{rise_numbers},下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}")
                        logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}")
                    usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0)
@@ -642,8 +776,8 @@
        except Exception as error:
            logger_debug.exception(error)
            logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}")
            print(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}")
            logger.error(f"实时设置计划持仓数量 函数报错: {error}")
            print(f"实时设置计划持仓数量 函数报错: {error}")
        finally:
            time.sleep(3)
@@ -651,9 +785,19 @@
if __name__ == '__main__':
    # market_strong = kpl_api.get_market_strong()
    # print(f"{market_strong}")
    get_real_time_market_strong()
    # get_real_time_market_strong()
    # all_index_K_line_dict = get_index_K_line()
    # all_index_k_line_dict_write()
    # print(f"指数K线{data_cache.all_index_k_line_property_dict}")
    # all_index_k_line_dict_write()
    # all_index_k_line_dict_write()
    # ====================== 结果输出 ======================
    print("========== 因子数值(原生计算) ==========")
    print(f"涨跌比(BDR): {bdr:.2f}")
    print(f"极端波动比例: {extreme_ratio:.2%}")
    print(f"市场宽度因子: {market_breadth:.2f}")
    print(f"分布偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}")
    print("\n========== 策略信号 ==========")
    for i, signal in enumerate(signals, 1):
        print(f"信号{i}: {signal}")