| | |
| | | # 对于未知类型,你可以选择保留原样、跳过或引发异常 |
| | | # 这里我们选择保留原样 |
| | | return obj |
| | | |
| | | try: |
| | | json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4) |
| | | # 将转换后的JSON字符串写入文件 |
| | |
| | | logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}") |
| | | |
| | | |
| | | # 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量 |
| | | def get_real_time_market_strong(): |
| | | # 计算市场分布形态因子 函数 |
| | | def calculate_market_distribution_pattern_factor(data): |
| | | """ |
| | | 原生Python数据预处理 |
| | | 返回: |
| | | sorted_bins : 排序后的涨跌幅区间(整数列表) |
| | | sorted_counts : 对应区间的股票数量 |
| | | total : 总股票数 |
| | | pos_counts : 正涨跌区间的数量列表 |
| | | neg_counts : 负涨跌区间的数量列表 |
| | | """ |
| | | # 将字符串键转为整数并排序 |
| | | sorted_items = sorted(data.items(), key=lambda x: int(x[0])) |
| | | sorted_bins = [int(k) for k, v in sorted_items] |
| | | sorted_counts = [v for k, v in sorted_items] |
| | | |
| | | # 计算总股票数 |
| | | total = sum(sorted_counts) |
| | | |
| | | # 分离正负区间(排除0%) |
| | | pos_counts = [v for k, v in sorted_items if int(k) > 0] |
| | | neg_counts = [v for k, v in sorted_items if int(k) < 0] |
| | | |
| | | return sorted_bins, sorted_counts, total, pos_counts, neg_counts |
| | | |
| | | |
| | | # 执行预处理 |
| | | bins, counts, total, pos_counts, neg_counts = calculate_market_distribution_pattern_factor(data_cache.rise_and_fall_statistics_dirt) |
| | | |
| | | |
| | | # ====================== 因子计算模块 ====================== |
| | | class NativeFactorCalculator: |
| | | def __init__(self, bins, counts, total, pos_counts, neg_counts): |
| | | self.bins = bins |
| | | self.counts = counts |
| | | # todo total 会默认为0的情况 |
| | | if total != 0: |
| | | self.total = total |
| | | else: |
| | | self.total = 1 |
| | | self.pos_counts = pos_counts |
| | | self.neg_counts = neg_counts |
| | | |
| | | def calculate_bdr(self): |
| | | """原生涨跌比计算""" |
| | | sum_pos = sum(self.pos_counts) |
| | | sum_neg = sum(self.neg_counts) |
| | | return sum_pos / sum_neg if sum_neg != 0 else float('nan') |
| | | |
| | | def calculate_extreme_ratio(self): |
| | | """极端波动比例计算""" |
| | | extreme_total = 0 |
| | | for b, c in zip(self.bins, self.counts): |
| | | if b >= 5 or b <= -5: |
| | | extreme_total += c |
| | | return extreme_total / self.total |
| | | |
| | | def calculate_market_breadth(self): |
| | | """市场宽度因子计算""" |
| | | diff = sum(self.pos_counts) - sum(self.neg_counts) |
| | | return diff / self.total |
| | | |
| | | def _expand_distribution(self): |
| | | """将分箱数据展开为原始数据点(用于计算统计指标)""" |
| | | expanded = [] |
| | | for value, count in zip(self.bins, self.counts): |
| | | expanded.extend([value] * count) |
| | | return expanded |
| | | |
| | | def calculate_distribution_metrics(self): |
| | | """原生偏度与峰度计算""" |
| | | data = self._expand_distribution() |
| | | n = len(data) |
| | | if n < 2: |
| | | return (float('nan'), float('nan')) |
| | | |
| | | # 计算均值 |
| | | mean = sum(data) / n |
| | | |
| | | # 计算标准差 |
| | | variance = sum((x - mean) ** 2 for x in data) / (n - 1) |
| | | std_dev = variance ** 0.5 |
| | | |
| | | # 计算偏度 |
| | | skewness = (sum((x - mean) ** 3 for x in data) / n) / (std_dev ** 3) if std_dev != 0 else 0 |
| | | |
| | | # 计算峰度(Fisher定义,即正态分布峰度为0) |
| | | kurtosis = (sum((x - mean) ** 4 for x in data) / n) / (std_dev ** 4) - 3 if std_dev != 0 else 0 |
| | | |
| | | return (skewness, kurtosis) |
| | | |
| | | |
| | | # 实例化并计算因子 |
| | | calculator = NativeFactorCalculator(bins, counts, total, pos_counts, neg_counts) |
| | | bdr = calculator.calculate_bdr() |
| | | extreme_ratio = calculator.calculate_extreme_ratio() |
| | | market_breadth = calculator.calculate_market_breadth() |
| | | skewness, kurtosis = calculator.calculate_distribution_metrics() |
| | | |
| | | |
| | | # ====================== 策略信号生成模块 ====================== |
| | | class NativeTradingStrategy: |
| | | def __init__(self, bdr_threshold=1.5, extreme_threshold=0.15): |
| | | self.bdr_threshold = bdr_threshold |
| | | self.extreme_threshold = extreme_threshold |
| | | |
| | | def generate_signals(self, bdr, extreme_ratio, market_breadth, skewness): |
| | | """生成交易信号(与之前相同,保持策略逻辑)""" |
| | | signals = [] |
| | | |
| | | # 信号1:基于涨跌比 |
| | | if bdr > self.bdr_threshold: |
| | | signals.append("多头趋势增强 - 加仓指数ETF") |
| | | elif bdr < 1 / self.bdr_threshold: |
| | | signals.append("空头趋势增强 - 减仓或反向操作") |
| | | else: |
| | | signals.append("趋势中性 - 保持当前仓位") |
| | | |
| | | # 信号2:极端波动 |
| | | if extreme_ratio > self.extreme_threshold: |
| | | signals.append("波动风险升高 - 启动对冲或降低杠杆") |
| | | else: |
| | | signals.append("波动正常 - 维持现有风险敞口") |
| | | |
| | | # 信号3:市场宽度与偏度 |
| | | if market_breadth > 0.2 and skewness > 0.5: |
| | | signals.append("市场普涨且偏度正向 - 超配行业龙头股") |
| | | elif market_breadth < -0.2 and skewness < -0.5: |
| | | signals.append("市场普跌且偏度负向 - 低估值防御型配置") |
| | | return signals |
| | | |
| | | |
| | | # 实例化策略并生成信号 |
| | | strategy = NativeTradingStrategy() |
| | | signals = strategy.generate_signals(bdr, extreme_ratio, market_breadth, skewness) |
| | | |
| | | |
| | | # 实时设置计划持仓数量 函数 |
| | | def set_plan_position_quantity(): |
| | | while True: |
| | | try: |
| | | if data_cache.position_automatic_management_switch is True: |
| | |
| | | # 获取大盘综合强度分数 |
| | | data_cache.real_time_market_strong = kpl_api.get_market_strong() |
| | | # 获取市场情绪字典【完整】,并整理 |
| | | data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() |
| | | date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) |
| | | significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) |
| | | sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) |
| | | limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) |
| | | connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) |
| | | data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() # 市场情绪字典字典 |
| | | date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) # 情绪数据日期 |
| | | significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) # 大幅回撤 |
| | | sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 情绪指标 |
| | | limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 涨停家数 |
| | | connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) # 连板高度 |
| | | # 获取市场情绪-涨跌统计 |
| | | data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() |
| | | limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) |
| | | actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) |
| | | ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) |
| | | limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) |
| | | actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) |
| | | ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) |
| | | data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() # 涨跌统计字典 |
| | | limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) # 涨停家数 |
| | | actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) # 实际涨停家数 |
| | | ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) # ST涨停家数 |
| | | limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) # 跌停家数 |
| | | actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) # 实际跌停家数 |
| | | ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) # ST跌停家数 |
| | | |
| | | rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None) |
| | | fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None) |
| | | |
| | | # 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析 |
| | | # logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分") |
| | | # 控制打印日志的时间段 |
| | | if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME: |
| | | pass |
| | | logger.info(f"午间休市时间内 不打印大盘综合强度分数") |
| | |
| | | # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong) |
| | | async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}") |
| | | logger.info(f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}") |
| | | logger.info(f"上涨家数:{rise_numbers}分,下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") |
| | | logger.info(f"上涨家数:{rise_numbers},下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") |
| | | logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}") |
| | | |
| | | usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0) |
| | |
| | | |
| | | except Exception as error: |
| | | logger_debug.exception(error) |
| | | logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") |
| | | print(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") |
| | | logger.error(f"实时设置计划持仓数量 函数报错: {error}") |
| | | print(f"实时设置计划持仓数量 函数报错: {error}") |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | |
| | | if __name__ == '__main__': |
| | | # market_strong = kpl_api.get_market_strong() |
| | | # print(f"{market_strong}") |
| | | get_real_time_market_strong() |
| | | # get_real_time_market_strong() |
| | | # all_index_K_line_dict = get_index_K_line() |
| | | |
| | | # all_index_k_line_dict_write() |
| | | # print(f"指数K线{data_cache.all_index_k_line_property_dict}") |
| | | # all_index_k_line_dict_write() |
| | | # all_index_k_line_dict_write() |
| | | # ====================== 结果输出 ====================== |
| | | print("========== 因子数值(原生计算) ==========") |
| | | print(f"涨跌比(BDR): {bdr:.2f}") |
| | | print(f"极端波动比例: {extreme_ratio:.2%}") |
| | | print(f"市场宽度因子: {market_breadth:.2f}") |
| | | print(f"分布偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}") |
| | | |
| | | print("\n========== 策略信号 ==========") |
| | | for i, signal in enumerate(signals, 1): |
| | | print(f"信号{i}: {signal}") |