""" 该模块下进行指数行情策略的编写 """ import copy # import decimal import datetime import json import time import constant from log_module import async_log_util from log_module.log import logger_common, logger_Overall_market_strength_score, logger_debug # import time # 引入掘金API # from gm.api import * from strategy import basic_methods, kpl_api from strategy import data_cache from utils import tool, juejin_api # 获取logger实例 logger = logger_common # ====================== # 模拟数据生成(完整版) # ====================== # data = { # 'total_stocks': 5000, # 全市场股票总数 # 'limit_up': 120, # 涨停股数量 # 'limit_down': 5, # 跌停股数量 # 'advance_count': 2800, # 上涨家数 # 'decline_count': 1500, # 下跌家数 # 'high_retreat_count': 30, # 当日回撤超10%的个股数量 # 'turnover': 1.2e12, # 当日成交额(元) # 'turnover_prev_day': 1.0e12, # 前一日成交额 # 'northbound_inflow': 8e9, # 北向资金净流入(元) # 'max_northbound_30d': 15e9, # 近30日最大北向净流入 # 'margin_buy': 8e10, # 融资买入额 # 'sector_gains': [3.5, 2.8, 1.9, -0.5], # 前4大板块涨幅(%) # 'max_continuous_boards': 7, # 最高连板数(如7连板) # 'continuous_boards': [7, 5, 3, 2], # 连板梯队(各层级连板数量) # } # # # # ====================== # # 因子计算与标准化处理 # # ====================== # def calculate_factors(data): # factors = {} # # # === 新增核心因子 === # # 1. 涨跌家数比(0-100分) # if data['decline_count'] == 0: # factors['advance_ratio'] = 100.0 # else: # factors['advance_ratio'] = min( # (data['advance_count'] / data['decline_count']) * 50, # 比值1:2对应100分 # 100.0 # ) # # # 2. 涨停强度(标准化到0-100) # factors['limit_strength'] = ( # (data['limit_up'] - data['limit_down']) / data['total_stocks'] * 1000 # 放大差异 # ) # # # 3. 大幅回撤比例(0-100分) # factors['high_retreat_ratio'] = ( # data['high_retreat_count'] / data['total_stocks'] * 1000 # 千分比更敏感 # ) # # # 4. 连板高度与梯队(根据历史极值归一化) # factors['max_continuous'] = ( # data['max_continuous_boards'] / 10 * 100 # 假设历史最高10连板 # ) # factors['board_ladder'] = ( # len(data['continuous_boards']) / 5 * 100 # 假设最多5个连板层级 # ) # # # === 原有因子优化 === # # 5. 量能变化(成交额增长率) # turnover_growth = ( # (data['turnover'] - data['turnover_prev_day']) / # data['turnover_prev_day'] * 100 # ) # factors['liquidity'] = turnover_growth # # # 6. 板块强度(前3板块平均涨幅) # top_sectors = sorted(data['sector_gains'], reverse=True)[:3] # sector_avg = sum(top_sectors) / len(top_sectors) # # factors['sector_strength'] = ((sector_avg - (-5.0)) / (10.0 - (-5.0)) * 100) # 历史范围-5%~10% # # 7. 北向资金强度 # factors['northbound_ratio'] = (data['northbound_inflow'] / data['max_northbound_30d'] * 100) # # # 8. 融资买入占比 # factors['margin_ratio'] = (data['margin_buy'] / data['turnover'] * 100) # # # # 强制所有因子在0-100范围内 # for key in factors: # factors[key] = max(0.0, min(factors[key], 100.0)) # # return factors # # # # ====================== # # 权重分配(总权重1.0) # # ====================== # def get_weights(): # return { # # 新增因子权重 # 'advance_ratio': 0.15, # 涨跌家数比 # 'limit_strength': 0.2, # 涨停强度 # 'high_retreat_ratio': 0.1, # 大幅回撤 # 'max_continuous': 0.1, # 连板高度 # 'board_ladder': 0.05, # 连板梯队 # # 原有因子权重 # 'liquidity': 0.15, # 量能变化 # 'sector_strength': 0.1, # 板块强度 # 'northbound_ratio': 0.1, # 北向资金 # 'margin_ratio': 0.05 # 融资买入 # } # # # # ====================== # # 综合强度分数计算(含动态修正) # # ====================== # def composite_strength_score(data): # factors = calculate_factors(data) # weights = get_weights() # # # 基础加权得分 # score = sum(factors[key] * weights[key] for key in factors) # # # === 动态修正规则 === # # 规则1:涨停数量超过100家时额外加分 # if data['limit_up'] > 100: # score += min((data['limit_up'] - 100) * 0.2, 10) # 最多加10分 # # # 规则2:连板梯队断裂时扣分(如最高板与次高板差距≥3) # continuous_boards = sorted(data['continuous_boards'], reverse=True) # if len(continuous_boards) >= 2 and (continuous_boards[0] - continuous_boards[1] >= 3): # score -= 15 # 梯队断裂惩罚 # # # 规则3:北向资金与涨跌家数背离时调整 # if (factors['northbound_ratio'] > 50) and (factors['advance_ratio'] < 40): # score *= 0.9 # 权重股拉升导致的虚假繁荣 # # return max(0.0, min(score, 100.0)) # ====================== # 执行计算与结果输出 # ====================== # final_score = composite_strength_score(data) # print("=== 综合强度分数 ===") # print(f"当前得分: {final_score:.1f}/100") # # # 输出因子贡献度分析 # factors = calculate_factors(data) # weights = get_weights() # print("\n=== 因子贡献度明细 ===") # for key in factors: # print(f"{key:20s}: {factors[key]:5.1f} × {weights[key]:.0%} = {factors[key] * weights[key]:5.1f}") ''' 代码输出示例: === 综合强度分数 === 当前得分: 78.4/100 === 因子贡献度明细 === advance_ratio : 93.3 × 15% = 14.0 limit_strength : 23.0 × 20% = 4.6 high_retreat_ratio : 6.0 × 10% = 0.6 max_continuous : 70.0 × 10% = 7.0 board_ladder : 80.0 × 5% = 4.0 liquidity : 20.0 × 15% = 3.0 sector_strength : 60.0 × 10% = 6.0 northbound_ratio : 53.3 × 10% = 5.3 margin_ratio : 10.0 × 5% = 0.5 ''' # 指数行情策略函数 def instant_trend_strategy(current_info): len_current_info = len(current_info) # 初始化 index_K_line 字典,后期加上日期属性,每日分行写入本地JSONL文件中 index_K_line = { 'Shanghai': {}, 'Shenzhen': {}, 'TSXV': {} } if current_info is not None and len_current_info > 0: # print(f"current_info------------{current_info}") # 上证指数数据 Shanghai_index_data = current_info.get('000001') Shanghai_index = Shanghai_index_data[0] # 上证指数 Shanghai_index_volume = round(Shanghai_index_data[1]/100000000, 2) # 上证指数 当日当时成交量 Shanghai_index_turnover = round(Shanghai_index_data[2]/100000000, 2) # 上证指数 当日当时成交额度 Shanghai_Yesterday_closing_index = round(Shanghai_index_data[3], 2) # 上证指数 昨日收盘指数 logger.info(f"上证 指数:{Shanghai_index} 昨日收盘指数:{Shanghai_Yesterday_closing_index} 成交量:{Shanghai_index_volume}亿 手 成交额:{Shanghai_index_turnover}亿 元") # 深证指数数据 Shenzhen_index_data = current_info.get('399001') Shenzhen_index = Shenzhen_index_data[0] # 深证指数 Shenzhen_index_volume = round(Shenzhen_index_data[1]/100000000, 2) # 深证指数 当日当时成交量 Shenzhen_index_turnover = round(Shenzhen_index_data[2]/100000000, 2) # 深证指数 当日当时成交额度 Shenzhen_Yesterday_closing_index = round(Shenzhen_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"深证 指数:{Shenzhen_index} 昨日收盘指数:{Shenzhen_Yesterday_closing_index} 成交量:{Shenzhen_index_volume}亿 手 成交额:{Shenzhen_index_turnover}亿 元") # 创业板指数数据 TSXV_index_data = current_info.get('399006') TSXV_index = TSXV_index_data[0] # 创业板指 TSXV_index_volume = round(TSXV_index_data[1]/100000000, 2) # 创业板指 当日当时成交量 TSXV_index_turnover = round(TSXV_index_data[2]/100000000, 2) # 创业板指 当日当时成交额度 TSXV_Yesterday_closing_index = round(TSXV_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info(f"创业板 指数:{TSXV_index} 昨日收盘指数:{TSXV_Yesterday_closing_index} 成交量:{TSXV_index_volume}亿 手 成交额:{TSXV_index_turnover}亿 元") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shanghai_tick_growth = basic_methods.tick_growth('000001', Shanghai_index) data_cache.Shanghai_today_growth = basic_methods.intraday_growth(Shanghai_index, Shanghai_Yesterday_closing_index) logger.info(f"上证指数 瞬时涨幅 ---- {round(data_cache.Shanghai_tick_growth, 4)}%") logger.info(f"上证指数 当日涨幅 ==== {round(data_cache.Shanghai_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shenzhen_tick_growth = basic_methods.tick_growth('399001', Shanghai_index) data_cache.Shenzhen_today_growth = basic_methods.intraday_growth(Shenzhen_index, Shenzhen_Yesterday_closing_index) logger.info(f"深证指数 瞬时涨幅 ---- {round(data_cache.Shenzhen_tick_growth, 4)}%") logger.info(f"深证指数 当日涨幅 ==== {round(data_cache.Shenzhen_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.TSXV_tick_growth = basic_methods.tick_growth('399006', Shanghai_index) data_cache.TSXV_today_growth = basic_methods.intraday_growth(TSXV_index, TSXV_Yesterday_closing_index) logger.info(f"创业板指 瞬时涨幅 ---- {round(data_cache.TSXV_tick_growth, 4)}%") logger.info(f"创业板指 当日涨幅 ==== {round(data_cache.TSXV_today_growth, 4)}%") # 在集合竞价时更新一下 各个指数的开盘涨幅 now_time = tool.get_now_time_str() # 9:25:06 < now_time < 9:25:12 记录开盘指数 及 开盘涨幅 if data_cache.LATER_OPEN_BIDDING_TIME < now_time < data_cache.AFTER_OPEN_BIDDING_TIME: index_K_line['Shanghai']['Shanghai_open_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_open_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_open_index'] = TSXV_index data_cache.Shanghai_open_growth = data_cache.Shanghai_today_growth data_cache.Shenzhen_open_growth = data_cache.Shenzhen_today_growth data_cache.TSXV_open_growth = data_cache.TSXV_today_growth # 15:00:00 < now_time < 15:01:00 记录收盘指数 if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME: index_K_line['Shanghai']['Shanghai_close_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_close_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_close_index'] = TSXV_index logger.info(f"上证指数 开盘涨幅 ==== {round(data_cache.Shanghai_open_growth, 4)}%") logger.info(f"深证指数 开盘涨幅 ==== {round(data_cache.Shenzhen_open_growth, 4)}%") logger.info(f"创业板指 开盘涨幅 ==== {round(data_cache.TSXV_open_growth, 4)}%") # 大盘指数趋势预期函数(用于对大盘开盘后短期内的趋势预测) def index_trend_expectation(): index_trend_expectation_score = 100 # 判断三大指数开盘涨幅 预期 if data_cache.Shanghai_open_growth > 0 and data_cache.Shenzhen_open_growth > 0 and data_cache.TSXV_open_growth > 0: index_composite_open_increase = '三大指数:高开' index_trend_expectation_score += 5 if data_cache.Shanghai_open_growth >= 1 and data_cache.Shenzhen_open_growth >= 1 and data_cache.TSXV_open_growth >= 1: index_composite_open_increase = '三大指数:高开1% 以上' index_trend_expectation_score += 5 elif data_cache.Shanghai_open_growth < 0 and data_cache.Shenzhen_open_growth < 0 and data_cache.TSXV_open_growth < 0: index_composite_open_increase = '三大指数:低开' index_trend_expectation_score -= 5 if data_cache.Shanghai_open_growth <= 1 and data_cache.Shenzhen_open_growth <= 1 and data_cache.TSXV_open_growth <= 1: index_composite_open_increase = '三大指数:低开1% 以下' index_trend_expectation_score -= 5 else: index_composite_open_increase = '三大指数:涨跌不一' logger.info(f"开盘指数开盘涨幅播报:{index_composite_open_increase}") # 判断三大指数开盘后当日涨幅 预期 if data_cache.Shanghai_today_growth > data_cache.Shanghai_open_growth and data_cache.Shenzhen_today_growth > data_cache.Shenzhen_open_growth and data_cache.TSXV_today_growth > data_cache.TSXV_open_growth: index_composite_today_increase = '三大指数:短期高走' index_trend_expectation_score += 10 elif data_cache.Shanghai_today_growth < data_cache.Shanghai_open_growth and data_cache.Shenzhen_today_growth < data_cache.Shenzhen_open_growth and data_cache.TSXV_today_growth < data_cache.TSXV_open_growth: index_composite_today_increase = '三大指数:短期低走' index_trend_expectation_score -= 10 else: index_composite_today_increase = '三大指数:短期走势不一' logger.info(f"开盘指数开盘短期走势播报:{index_composite_today_increase}") # 判断三大指数开盘涨幅 预期 if data_cache.Shanghai_tick_growth > 0 and data_cache.Shenzhen_tick_growth > 0 and data_cache.TSXV_tick_growth > 0: index_composite_tick_increase = '三大指数:瞬时高走' index_trend_expectation_score += 2 if data_cache.Shanghai_tick_growth >= 0.01 and data_cache.Shenzhen_tick_growth >= 0.01 and data_cache.TSXV_tick_growth >= 0.01: index_composite_tick_increase = '三大指数:高走0.01% 以上' index_trend_expectation_score += 4 elif data_cache.Shanghai_tick_growth < 0 and data_cache.Shenzhen_tick_growth < 0 and data_cache.TSXV_tick_growth < 0: index_composite_tick_increase = '三大指数:瞬时低走' index_trend_expectation_score -= 2 if data_cache.Shanghai_tick_growth <= 0.01 and data_cache.Shenzhen_tick_growth <= 0.01 and data_cache.TSXV_tick_growth <= 0.01: index_composite_tick_increase = '三大指数:低走0.01% 以下' index_trend_expectation_score -= 4 else: index_composite_tick_increase = '三大指数:涨跌不一' logger.info(f"开盘指数开盘瞬时走势播报:{index_composite_tick_increase}") if index_trend_expectation_score > 100: index_trend_expectation_score = 100 return index_trend_expectation_score # 获取指数K线并写入本地的函数 def get_index_K_line(): all_index_K_line_dict = {} for i in data_cache.INDEX_SYMBOL_LIST: index_K_line = juejin_api.JueJinApi.history_n(i, "1d", 90, 1, "open,close,high,low,amount,volume,bob") # print(f"index_K_line=={index_K_line}") all_index_K_line_dict[i] = index_K_line # print(f"all_index_K_line=={all_index_K_line_dict}") return all_index_K_line_dict # 给指数K线加入属性值 def get_property_mark(it_K_line, symbol): historical_high_price = 0.01 # 初始化历史最高价 historical_low_price = 0.01 # 初始化历史最低价 historical_low_price_index = 0 # 初始化历史最高价序号 historical_high_price_index = 0 # 初始化历史最低价序号 historical_average_price = 0.01 # 初始化历史均价 large_amplitude_day = [] # 初始化大幅震荡日期 current_count = 0 # 当前连续满足条件的天数 start = None results = [] dates_list = [] for i in range(0, len(it_K_line) - 1): previous_close = it_K_line[i + 1]['close'] # 昨日收盘价 previous_high = it_K_line[i + 1]['high'] # 昨日最高价 current_open = round(it_K_line[i]['open'], 2) # 当日开盘价 current_close = round(it_K_line[i]['close'], 2) # 当日收盘价 current_high = round(it_K_line[i]['high'], 2) # 当日最高价 current_low = round(it_K_line[i]['low'], 2) # 当日最低价 limit_up_price = basic_methods.limit_up_price(previous_close) # 计算出的当天涨停价 limit_up_price = float(limit_up_price) # 将涨停价转化为浮点 limit_down_price = basic_methods.limit_down_price(previous_close) # 计算出的当天跌停价 limit_down_price = float(limit_down_price) # 将跌停价转化为浮点 current_today_volume = it_K_line[i]['volume'] # 当日总交易量 current_yesterday_volume = it_K_line[i + 1]['volume'] # 昨日总交易量 # current_today_amount = it_K_line[i]['amount'] #当日总交易额 current_today_growth = basic_methods.intraday_growth(current_close, previous_close) # 计算出当日涨幅 it_K_line[i]['today_growth'] = current_today_growth # 将当日涨幅数据添加到字典中对应的日期中 current_today_amplitude = round((current_high - current_low) / previous_close, 2) # 计算出当日振幅 it_K_line[i]['today_amplitude'] = current_today_amplitude # 将当日振幅数据添加到字典中对应的日期中 historical_high_price = round(max(i['high'] for i in it_K_line), 2) # 计算出历史最高价 historical_high_price_index = max(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['high']) # 找到最高价的索引 historical_low_price = round(min(i['low'] for i in it_K_line), 2) # 计算出历史最低价 historical_low_price_index = min(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['low']) # 找到最低价的索引 historical_average_price = round((historical_high_price + historical_low_price) / 2, 2) # 计算出历史最均价 # 把最高价、最高价序号、最低价、最低价序号,添加到K线的昨日字典中 new_properties = { 'historical_high_price': historical_high_price, 'historical_high_price_index': historical_high_price_index, 'historical_low_price': historical_low_price, 'historical_low_price_index': historical_low_price_index, 'historical_average_price': historical_average_price } it_K_line[0].update(new_properties) # 计算大振幅的日期并添加到大振幅日期列表中 if not (-1 < it_K_line[i]['today_growth'] < 1): large_amplitude_day.append(it_K_line[i]['bob']) # 判断K线图形风险位置 及 高低时间点 if historical_low_price_index > historical_high_price_index: # 序列小于1是历史最高价 历史长度90,其实还可以根据更长来判断更显程度,历史长度很短的一般认为具有更高风险,而更长的之前有可能有更高的历史高位还比较远 # 最低价在前 it_K_line[0]['risk_position'] = 'low_price_ahead' ''' 以下部分条件分支由于都是对 it_K_line[0]['risk_position'] 数据进行修改, 条件判断逻辑有部分重叠,所以应按照逻辑的覆盖面进行先后判断,避免先把大部分过滤掉了。 所以当前条件分支语句的顺序不要轻易更换。 # [(historical_high_price / round(it_K_line[0]['close'], 2) < 1.05)的预设值取值范围选择其一: # {"均价以上近前高的开盘价":1.05,"接近前高 的跌停价":1.11,"接近前高 的一个炸板加一个跌停的收盘价":1.17,"均价与最高价之间的涨停价":1.21,"均价与最高价之间的价":1.33} ''' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 且 最高位 与 最低位为三倍涨幅的妖股【当前二度试探风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 2) and ( historical_high_price > historical_low_price * 3): it_K_line[0]['risk_position'] = 'recent_monster_second_break_risk' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度突破风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.17) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_second_break_risk' # 最高位据昨日较远 且 昨收价距历史最高价差距较小(小于5%,涨幅5%是左右是最容易把低吸骗进去的日内涨幅) 且 整体涨跌幅度较大 【当前二度突破近前高风险】 # 【双成药业 2024-11-08 这样的二次近前高,实际数值为1.4646...,为安全起见还是设置为1.5】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( historical_high_price / round(it_K_line[0]['close'], 2) < 1.5) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_second_break_near_high_position_risk' # 昨日就是最高位 且 整体涨跌幅度较大 【当前高位风险】 if (historical_high_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_high_position_risk' else: # 最高价在前 it_K_line[0]['risk_position'] = 'high_price_ahead' # 最低位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度崩溃风险】 if (2 < historical_low_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) > 1.1): it_K_line[0]['risk_position'] = 'recent_second_crash_risk' # 昨日就是最低位 且 整体涨跌幅度较大 【当前低位风险】 if (historical_low_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.5): it_K_line[0]['risk_position'] = 'recent_low_position_risk' # 判断K线图形中连续低涨幅天数及具体序列号并把共识价格均价写入文件【阶段性价格共识位置】 if 0 < it_K_line[i]['today_amplitude'] < 0.025 and i < 60 and ( historical_high_price > historical_low_price * 1.5): if current_count == 0: start = i # 如果当前涨幅小于阈值,增加连续天数计数器 current_count += 1 # 如果循环复合条件的连续天数达到5天,记录起始索引 if current_count > 5: # 检查 results 是否为空或当前序列不与最后一个序列重叠 if not results or results[-1][1] < start: if 0.98 < it_K_line[i]['high'] / it_K_line[start]['low'] < 1.02: # 没有重叠,直接添加【其中start为循环到起始序号,i为循环到的结束序号。在实际的日期中要反过来】 results.append((start, i)) # 算出价格共识区间的均价 average_consensus_price = round((it_K_line[i]['high'] + it_K_line[start]['low']) / 2, 2) dates_list.append(( it_K_line[start]['bob'].strftime('%Y-%m-%d %H:%M:%S'), it_K_line[i]['bob'].strftime('%Y-%m-%d %H:%M:%S'), average_consensus_price)) # if len(dates_list) > 1: # 如果只需要检测包含多段价格共识区 it_K_line[0]['rise_and_fall_is_small_start_and_end_date'] = dates_list else: # 重置计数器 current_count = 0 start = None # 确保不会出现除以零的报错 if current_yesterday_volume != 0: if str(it_K_line[i]['bob']).find('2025-03-21')>=0: print(f"") print(f"{it_K_line[i]['bob']} 的 current_today_volume=={current_today_volume},, current_yesterday_volume=={current_yesterday_volume}") if round(current_today_volume / current_yesterday_volume, 2) > 1.1: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'increases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【放量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'increases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【放量下跌】") else: it_K_line[i]['today_volume_shape'] = 'increases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【放量平收】") elif round(current_today_volume / current_yesterday_volume, 2) < 0.9: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'decreases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'decreases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量下跌】") else: it_K_line[i]['today_volume_shape'] = 'decreases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量平收】") else: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'remained_up' # print(f"i=={i} {it_K_line[i]['bob']} 【平量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'remained_down' # print(f"i=={i} {it_K_line[i]['bob']} 【平量下跌】") else: it_K_line[i]['today_volume_shape'] = 'remained_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【平量平收】") else: logger.info(f"{symbol} 的 昨日成交量 为 0,报错!!") if current_open - previous_close > 0: # print(f"i=={i} {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}") # difference = current_close - previous_close if current_close - current_open > 0: it_K_line[i]['attribute'] = 'up_up' # print(f"i=={i} {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'up_down' # print(f"i=={i} {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close} 收盘价:{current_close}】") else: if current_close - current_open > 0: it_K_line[i]['attribute'] = 'down_up' # print(f"i=={i} {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'down_down' # print(f"i=={i} {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close} 收盘价:{current_close}】") # 将高涨跌幅日期打印出来 # it_K_line[0]['large_amplitude_day'] = large_amplitude_day # 判断K线图形长期振幅 及 当前走势 【根据大振幅天数 反推算出 长期低幅度震荡持平、上涨、下跌的个股写入字典】 if len(large_amplitude_day) < 1: # 长期低幅度震荡 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation' # it_K_line[0]['large_amplitude_day_len'] = len(large_amplitude_day) if (historical_high_price / historical_low_price) < 1.15: if 0.99 < (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.09: # 长期低幅度震荡持平 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_remains_stable' else: if (historical_low_price_index > historical_high_price_index) and ( historical_high_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡上涨 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_rise' elif (historical_low_price_index < historical_high_price_index) and ( historical_low_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡下跌 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_decline' # 将加入属性值的指数K线下入到本地文件中 def all_index_k_line_dict_write(): all_index_K_line_dict = get_index_K_line() # 初始化所有个股的指标K线列表 all_index_k_line_property_dict = {} for i in data_cache.INDEX_SYMBOL_LIST: # print(f"i==========={i}") i_k_line = all_index_K_line_dict[i] # 获取i的K线 i_k_line_copy = copy.deepcopy(i_k_line) # 深拷贝i的K线 it_K_line_reversed = list(reversed(i_k_line_copy)) # 开盘啦获取的数据需要反转i的K线 if not it_K_line_reversed: continue get_property_mark(it_K_line_reversed, i) # 给标的的K线更新指标属性 把股票代码同时传给要调用的函数 symbol_K_line_property_dict = {i: it_K_line_reversed} # 添加 更新极限指标属性的K线 字典 # print(f"symbol_K_line_property_dict===={symbol_K_line_property_dict}") all_index_k_line_property_dict.update(symbol_K_line_property_dict) # print(f"all_index_k_line_property_dict===={all_index_k_line_property_dict}") # 构造时间格式datetime转化为字符串,以便将K线属性指标转化为json格式写入本地文件 def convert_datetime(obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') # 转换为字符串 elif isinstance(obj, dict): return {k: convert_datetime(v) for k, v in obj.items()} # 递归处理字典 elif isinstance(obj, list): return [convert_datetime(element) for element in obj] # 递归处理列表 # 可以添加其他类型的处理逻辑 else: # 对于未知类型,你可以选择保留原样、跳过或引发异常 # 这里我们选择保留原样 return obj try: json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4) # 将转换后的JSON字符串写入文件 with open(constant.INDEX_K_BARS_PATH, 'w', encoding='utf-8') as f: f.write(json_data) # print(f"json_data:{json_data}") except Exception as error: print(f"An error occurred while converting the data to JSON: {error}") logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}") # 获取实时大盘行情情绪综合强度 [分数] 函数 并 计算当日计划持仓数量 def get_real_time_market_strong(): while True: try: if data_cache.position_automatic_management_switch is True: now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: # 获取大盘综合强度分数 data_cache.real_time_market_strong = kpl_api.get_market_strong() # 获取市场情绪字典【完整】,并整理 data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) # 获取市场情绪-涨跌统计 data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None) fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None) # 该logger.info的的日志不再需要打印,后续将转入到GUI客户端上直接显示,该数据的打印交由下方的打印机制异步执行单独存储,以便后续可视化呈现后进行更高效的数据分析 # logger.info(f"大盘行情情绪综合强度 [分数]==={data_cache.real_time_market_strong}分") if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME: pass logger.info(f"午间休市时间内 不打印大盘综合强度分数") else: # 大盘综合强度分数 的 异步日志 # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong) async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}") logger.info(f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}") logger.info(f"上涨家数:{rise_numbers}分,下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}") usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0) logger.info(f"账户可用资金==={usefulMoney}元") # 低迷情绪比例 low_emotion_mood_ratio = 1 # 33分是个两级分化阶梯不好,目前不好拿捏,暂时不用 # if data_cache.real_time_market_strong <= 33: if data_cache.real_time_market_strong < 30: # 如果大盘综合强度分数小于30,将低迷情绪分数比例设置为0.01,可用资金缩小一百倍 low_emotion_mood_ratio = 0.01 if data_cache.real_time_market_strong <= 10: low_emotion_mood_ratio = 0 logger.info(f"极端低迷情绪比例===={low_emotion_mood_ratio * 100}%") data_cache.index_trend_expectation_score = index_trend_expectation() logger.info(f"大盘指数情绪预期分数==={data_cache.index_trend_expectation_score}分") # print(f"大盘指数情绪预期分数==={data_cache.index_trend_expectation_score}分") # # 目前大盘指数情绪预期分数 尚不科学 强制设置为初始0值 # index_trend_expectation_score = 0 # 获取计算今天新增的持仓数量 addition_position_number = len(data_cache.addition_position_symbols_set) # 定义一个今日的剩余新增持仓数量的变量 Unfinished_opening_plan_number = 3 - addition_position_number logger.info(f"今日的剩余新增持仓数量==={Unfinished_opening_plan_number}") if Unfinished_opening_plan_number != 0: # 如果GUI看盘上没有手动设置具体的下单金额,就按照评分策略的金额下单,否则就按照GUI设置的金额下单。 if data_cache.BUY_MONEY_PER_CODE < 0: # 根据账户可用金额 计算今日计划下单金额 # 账户可用金额 默认乘以0.9,永远留一点钱,一方面也冗余一些计算误差 # ((大盘综合强度分数 + 大盘指数情绪预期分数) * 0.01) * (账户可用金额 * 0.9 * 极端低迷情绪比例 / 今日最大新增持仓票数) # data_cache.today_planned_order_amount = ((data_cache.real_time_market_strong + data_cache.index_trend_expectation_score) * 0.01) * ( # usefulMoney * 0.9 * low_emotion_mood_ratio / Unfinished_opening_plan_number) data_cache.today_planned_order_amount = (usefulMoney * 0.95 * low_emotion_mood_ratio / Unfinished_opening_plan_number) logger.info(f"采用开仓策略计算方式=》今日计划下单金额:{data_cache.today_planned_order_amount},") else: data_cache.today_planned_order_amount = data_cache.BUY_MONEY_PER_CODE logger.info(f"采用GUI设置方式=》今日计划下单金额:{data_cache.today_planned_order_amount}") except Exception as error: logger_debug.exception(error) logger.error(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") print(f"获取实时大盘行情情绪综合强度[分数] 函数报错: {error}") finally: time.sleep(3) if __name__ == '__main__': # market_strong = kpl_api.get_market_strong() # print(f"{market_strong}") get_real_time_market_strong() # all_index_K_line_dict = get_index_K_line() # all_index_k_line_dict_write() # print(f"指数K线{data_cache.all_index_k_line_property_dict}") # all_index_k_line_dict_write()