""" 该模块下进行指数行情策略的编写 """ import copy # import decimal import datetime import json import time import constant from log_module import async_log_util from log_module.log import logger_common, logger_Overall_market_strength_score, logger_debug from strategy import basic_methods, kpl_api from strategy import data_cache from utils import tool, juejin_api # 获取logger实例 logger = logger_common # 指数行情策略函数 def instant_trend_strategy(current_info): len_current_info = len(current_info) # 初始化 index_K_line 字典,后期加上日期属性,每日分行写入本地JSONL文件中 index_K_line = { 'Shanghai': {}, 'Shenzhen': {}, 'TSXV': {} } if current_info is not None and len_current_info > 0: now_time_str = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time_str < data_cache.CLOSING_TIME: # print(f"current_info------------{current_info}") # 上证指数数据 Shanghai_index_data = current_info.get('000001') Shanghai_index = Shanghai_index_data[0] # 上证指数 Shanghai_index_volume = round(Shanghai_index_data[1] / 100000000, 2) # 上证指数 当日当时成交量 Shanghai_index_turnover = round(Shanghai_index_data[2] / 100000000, 2) # 上证指数 当日当时成交额度 Shanghai_Yesterday_closing_index = round(Shanghai_index_data[3], 2) # 上证指数 昨日收盘指数 logger.info( f"上证 指数:{Shanghai_index} 昨日收盘指数:{Shanghai_Yesterday_closing_index} 成交量:{Shanghai_index_volume}亿 手 成交额:{Shanghai_index_turnover}亿 元") # 深证指数数据 Shenzhen_index_data = current_info.get('399001') Shenzhen_index = Shenzhen_index_data[0] # 深证指数 Shenzhen_index_volume = round(Shenzhen_index_data[1] / 100000000, 2) # 深证指数 当日当时成交量 Shenzhen_index_turnover = round(Shenzhen_index_data[2] / 100000000, 2) # 深证指数 当日当时成交额度 Shenzhen_Yesterday_closing_index = round(Shenzhen_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info( f"深证 指数:{Shenzhen_index} 昨日收盘指数:{Shenzhen_Yesterday_closing_index} 成交量:{Shenzhen_index_volume}亿 手 成交额:{Shenzhen_index_turnover}亿 元") # 创业板指数数据 TSXV_index_data = current_info.get('399006') TSXV_index = TSXV_index_data[0] # 创业板指 TSXV_index_volume = round(TSXV_index_data[1] / 100000000, 2) # 创业板指 当日当时成交量 TSXV_index_turnover = round(TSXV_index_data[2] / 100000000, 2) # 创业板指 当日当时成交额度 TSXV_Yesterday_closing_index = round(TSXV_index_data[3], 2) # 深证指数 昨日收盘指数 logger.info( f"创业板 指数:{TSXV_index} 昨日收盘指数:{TSXV_Yesterday_closing_index} 成交量:{TSXV_index_volume}亿 手 成交额:{TSXV_index_turnover}亿 元") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shanghai_tick_growth = basic_methods.tick_growth('000001', Shanghai_index) data_cache.Shanghai_today_growth = basic_methods.intraday_growth(Shanghai_index, Shanghai_Yesterday_closing_index) logger.info(f"上证指数 瞬时涨幅 ---- {round(data_cache.Shanghai_tick_growth, 4)}%") logger.info(f"上证指数 当日涨幅 ==== {round(data_cache.Shanghai_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.Shenzhen_tick_growth = basic_methods.tick_growth('399001', Shanghai_index) data_cache.Shenzhen_today_growth = basic_methods.intraday_growth(Shenzhen_index, Shenzhen_Yesterday_closing_index) logger.info(f"深证指数 瞬时涨幅 ---- {round(data_cache.Shenzhen_tick_growth, 4)}%") logger.info(f"深证指数 当日涨幅 ==== {round(data_cache.Shenzhen_today_growth, 4)}%") # 调用涨幅公式计算对应的股票tick瞬时涨幅 data_cache.TSXV_tick_growth = basic_methods.tick_growth('399006', Shanghai_index) data_cache.TSXV_today_growth = basic_methods.intraday_growth(TSXV_index, TSXV_Yesterday_closing_index) logger.info(f"创业板指 瞬时涨幅 ---- {round(data_cache.TSXV_tick_growth, 4)}%") logger.info(f"创业板指 当日涨幅 ==== {round(data_cache.TSXV_today_growth, 4)}%") # 在集合竞价时更新一下 各个指数的开盘涨幅 now_time = tool.get_now_time_str() # 9:25:06 < now_time < 9:25:12 记录开盘指数 及 开盘涨幅 if data_cache.LATER_OPEN_BIDDING_TIME < now_time < data_cache.AFTER_OPEN_BIDDING_TIME: index_K_line['Shanghai']['Shanghai_open_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_open_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_open_index'] = TSXV_index data_cache.Shanghai_open_growth = data_cache.Shanghai_today_growth data_cache.Shenzhen_open_growth = data_cache.Shenzhen_today_growth data_cache.TSXV_open_growth = data_cache.TSXV_today_growth # 15:00:00 < now_time < 15:01:00 记录收盘指数 if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME: index_K_line['Shanghai']['Shanghai_close_index'] = Shanghai_index index_K_line['Shenzhen']['Shenzhen_close_index'] = Shenzhen_index index_K_line['TSXV']['TSXV_close_index'] = TSXV_index logger.info(f"上证指数 开盘涨幅 ==== {round(data_cache.Shanghai_open_growth, 4)}%") logger.info(f"深证指数 开盘涨幅 ==== {round(data_cache.Shenzhen_open_growth, 4)}%") logger.info(f"创业板指 开盘涨幅 ==== {round(data_cache.TSXV_open_growth, 4)}%") # 大盘指数趋势预期函数(用于对大盘开盘后短期内的趋势预测) def index_trend_expectation(): index_trend_expectation_score = 100 # 判断三大指数开盘涨幅 预期 if data_cache.Shanghai_open_growth > 0 and data_cache.Shenzhen_open_growth > 0 and data_cache.TSXV_open_growth > 0: index_composite_open_increase = '三大指数:高开' index_trend_expectation_score += 5 if data_cache.Shanghai_open_growth >= 1 and data_cache.Shenzhen_open_growth >= 1 and data_cache.TSXV_open_growth >= 1: index_composite_open_increase = '三大指数:高开1% 以上' index_trend_expectation_score += 5 elif data_cache.Shanghai_open_growth < 0 and data_cache.Shenzhen_open_growth < 0 and data_cache.TSXV_open_growth < 0: index_composite_open_increase = '三大指数:低开' index_trend_expectation_score -= 5 if data_cache.Shanghai_open_growth <= 1 and data_cache.Shenzhen_open_growth <= 1 and data_cache.TSXV_open_growth <= 1: index_composite_open_increase = '三大指数:低开1% 以下' index_trend_expectation_score -= 5 else: index_composite_open_increase = '三大指数:涨跌不一' logger.info(f"开盘指数开盘涨幅播报:{index_composite_open_increase}") # 判断三大指数开盘后当日涨幅 预期 if data_cache.Shanghai_today_growth > data_cache.Shanghai_open_growth and data_cache.Shenzhen_today_growth > data_cache.Shenzhen_open_growth and data_cache.TSXV_today_growth > data_cache.TSXV_open_growth: index_composite_today_increase = '三大指数:短期高走' index_trend_expectation_score += 10 elif data_cache.Shanghai_today_growth < data_cache.Shanghai_open_growth and data_cache.Shenzhen_today_growth < data_cache.Shenzhen_open_growth and data_cache.TSXV_today_growth < data_cache.TSXV_open_growth: index_composite_today_increase = '三大指数:短期低走' index_trend_expectation_score -= 10 else: index_composite_today_increase = '三大指数:短期走势不一' logger.info(f"开盘指数开盘短期走势播报:{index_composite_today_increase}") # 判断三大指数开盘涨幅 预期 if data_cache.Shanghai_tick_growth > 0 and data_cache.Shenzhen_tick_growth > 0 and data_cache.TSXV_tick_growth > 0: index_composite_tick_increase = '三大指数:瞬时高走' index_trend_expectation_score += 2 if data_cache.Shanghai_tick_growth >= 0.01 and data_cache.Shenzhen_tick_growth >= 0.01 and data_cache.TSXV_tick_growth >= 0.01: index_composite_tick_increase = '三大指数:高走0.01% 以上' index_trend_expectation_score += 4 elif data_cache.Shanghai_tick_growth < 0 and data_cache.Shenzhen_tick_growth < 0 and data_cache.TSXV_tick_growth < 0: index_composite_tick_increase = '三大指数:瞬时低走' index_trend_expectation_score -= 2 if data_cache.Shanghai_tick_growth <= 0.01 and data_cache.Shenzhen_tick_growth <= 0.01 and data_cache.TSXV_tick_growth <= 0.01: index_composite_tick_increase = '三大指数:低走0.01% 以下' index_trend_expectation_score -= 4 else: index_composite_tick_increase = '三大指数:涨跌不一' logger.info(f"开盘指数开盘瞬时走势播报:{index_composite_tick_increase}") if index_trend_expectation_score > 100: index_trend_expectation_score = 100 return index_trend_expectation_score # 获取指数K线并写入本地的函数 def get_index_K_line(): all_index_K_line_dict = {} for i in data_cache.INDEX_SYMBOL_LIST: index_K_line = juejin_api.JueJinApi.history_n(i, "1d", 90, 1, "open,close,high,low,amount,volume,bob") # print(f"index_K_line=={index_K_line}") all_index_K_line_dict[i] = index_K_line # print(f"all_index_K_line=={all_index_K_line_dict}") return all_index_K_line_dict # 给指数K线加入属性值 def get_property_mark(it_K_line, symbol): historical_high_price = 0.01 # 初始化历史最高价 historical_low_price = 0.01 # 初始化历史最低价 historical_low_price_index = 0 # 初始化历史最高价序号 historical_high_price_index = 0 # 初始化历史最低价序号 historical_average_price = 0.01 # 初始化历史均价 large_amplitude_day = [] # 初始化大幅震荡日期 current_count = 0 # 当前连续满足条件的天数 start = None results = [] dates_list = [] for i in range(0, len(it_K_line) - 1): previous_close = it_K_line[i + 1]['close'] # 昨日收盘价 previous_high = it_K_line[i + 1]['high'] # 昨日最高价 current_open = round(it_K_line[i]['open'], 2) # 当日开盘价 current_close = round(it_K_line[i]['close'], 2) # 当日收盘价 current_high = round(it_K_line[i]['high'], 2) # 当日最高价 current_low = round(it_K_line[i]['low'], 2) # 当日最低价 limit_up_price = basic_methods.limit_up_price(previous_close) # 计算出的当天涨停价 limit_up_price = float(limit_up_price) # 将涨停价转化为浮点 limit_down_price = basic_methods.limit_down_price(previous_close) # 计算出的当天跌停价 limit_down_price = float(limit_down_price) # 将跌停价转化为浮点 current_today_volume = it_K_line[i]['volume'] # 当日总交易量 current_yesterday_volume = it_K_line[i + 1]['volume'] # 昨日总交易量 # current_today_amount = it_K_line[i]['amount'] #当日总交易额 current_today_growth = basic_methods.intraday_growth(current_close, previous_close) # 计算出当日涨幅 it_K_line[i]['today_growth'] = current_today_growth # 将当日涨幅数据添加到字典中对应的日期中 current_today_amplitude = round((current_high - current_low) / previous_close, 2) # 计算出当日振幅 it_K_line[i]['today_amplitude'] = current_today_amplitude # 将当日振幅数据添加到字典中对应的日期中 historical_high_price = round(max(i['high'] for i in it_K_line), 2) # 计算出历史最高价 historical_high_price_index = max(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['high']) # 找到最高价的索引 historical_low_price = round(min(i['low'] for i in it_K_line), 2) # 计算出历史最低价 historical_low_price_index = min(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['low']) # 找到最低价的索引 historical_average_price = round((historical_high_price + historical_low_price) / 2, 2) # 计算出历史最均价 # 把最高价、最高价序号、最低价、最低价序号,添加到K线的昨日字典中 new_properties = { 'historical_high_price': historical_high_price, 'historical_high_price_index': historical_high_price_index, 'historical_low_price': historical_low_price, 'historical_low_price_index': historical_low_price_index, 'historical_average_price': historical_average_price } it_K_line[0].update(new_properties) # 计算大振幅的日期并添加到大振幅日期列表中 if not (-1 < it_K_line[i]['today_growth'] < 1): large_amplitude_day.append(it_K_line[i]['bob']) # 判断K线图形风险位置 及 高低时间点 if historical_low_price_index > historical_high_price_index: # 序列小于1是历史最高价 历史长度90,其实还可以根据更长来判断更显程度,历史长度很短的一般认为具有更高风险,而更长的之前有可能有更高的历史高位还比较远 # 最低价在前 it_K_line[0]['risk_position'] = 'low_price_ahead' ''' 以下部分条件分支由于都是对 it_K_line[0]['risk_position'] 数据进行修改, 条件判断逻辑有部分重叠,所以应按照逻辑的覆盖面进行先后判断,避免先把大部分过滤掉了。 所以当前条件分支语句的顺序不要轻易更换。 # [(historical_high_price / round(it_K_line[0]['close'], 2) < 1.05)的预设值取值范围选择其一: # {"均价以上近前高的开盘价":1.05,"接近前高 的跌停价":1.11,"接近前高 的一个炸板加一个跌停的收盘价":1.17,"均价与最高价之间的涨停价":1.21,"均价与最高价之间的价":1.33} ''' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 且 最高位 与 最低位为三倍涨幅的妖股【当前二度试探风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 2) and ( historical_high_price > historical_low_price * 3): it_K_line[0]['risk_position'] = 'recent_monster_second_break_risk' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度突破风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.17) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_second_break_risk' # 最高位据昨日较远 且 昨收价距历史最高价差距较小(小于5%,涨幅5%是左右是最容易把低吸骗进去的日内涨幅) 且 整体涨跌幅度较大 【当前二度突破近前高风险】 # 【双成药业 2024-11-08 这样的二次近前高,实际数值为1.4646...,为安全起见还是设置为1.5】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( historical_high_price / round(it_K_line[0]['close'], 2) < 1.5) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_second_break_near_high_position_risk' # 昨日就是最高位 且 整体涨跌幅度较大 【当前高位风险】 if (historical_high_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_high_position_risk' else: # 最高价在前 it_K_line[0]['risk_position'] = 'high_price_ahead' # 最低位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度崩溃风险】 if (2 < historical_low_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) > 1.1): it_K_line[0]['risk_position'] = 'recent_second_crash_risk' # 昨日就是最低位 且 整体涨跌幅度较大 【当前低位风险】 if (historical_low_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.5): it_K_line[0]['risk_position'] = 'recent_low_position_risk' # 判断K线图形中连续低涨幅天数及具体序列号并把共识价格均价写入文件【阶段性价格共识位置】 if 0 < it_K_line[i]['today_amplitude'] < 0.025 and i < 60 and ( historical_high_price > historical_low_price * 1.5): if current_count == 0: start = i # 如果当前涨幅小于阈值,增加连续天数计数器 current_count += 1 # 如果循环复合条件的连续天数达到5天,记录起始索引 if current_count > 5: # 检查 results 是否为空或当前序列不与最后一个序列重叠 if not results or results[-1][1] < start: if 0.98 < it_K_line[i]['high'] / it_K_line[start]['low'] < 1.02: # 没有重叠,直接添加【其中start为循环到起始序号,i为循环到的结束序号。在实际的日期中要反过来】 results.append((start, i)) # 算出价格共识区间的均价 average_consensus_price = round((it_K_line[i]['high'] + it_K_line[start]['low']) / 2, 2) dates_list.append(( it_K_line[start]['bob'].strftime('%Y-%m-%d %H:%M:%S'), it_K_line[i]['bob'].strftime('%Y-%m-%d %H:%M:%S'), average_consensus_price)) # if len(dates_list) > 1: # 如果只需要检测包含多段价格共识区 it_K_line[0]['rise_and_fall_is_small_start_and_end_date'] = dates_list else: # 重置计数器 current_count = 0 start = None # 确保不会出现除以零的报错 if current_yesterday_volume != 0: if str(it_K_line[i]['bob']).find('2025-03-21') >= 0: print(f"") print( f"{it_K_line[i]['bob']} 的 current_today_volume=={current_today_volume},, current_yesterday_volume=={current_yesterday_volume}") if round(current_today_volume / current_yesterday_volume, 2) > 1.1: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'increases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【放量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'increases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【放量下跌】") else: it_K_line[i]['today_volume_shape'] = 'increases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【放量平收】") elif round(current_today_volume / current_yesterday_volume, 2) < 0.9: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'decreases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'decreases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量下跌】") else: it_K_line[i]['today_volume_shape'] = 'decreases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量平收】") else: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'remained_up' # print(f"i=={i} {it_K_line[i]['bob']} 【平量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'remained_down' # print(f"i=={i} {it_K_line[i]['bob']} 【平量下跌】") else: it_K_line[i]['today_volume_shape'] = 'remained_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【平量平收】") else: logger.info(f"{symbol} 的 昨日成交量 为 0,报错!!") if current_open - previous_close > 0: # print(f"i=={i} {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}") # difference = current_close - previous_close if current_close - current_open > 0: it_K_line[i]['attribute'] = 'up_up' # print(f"i=={i} {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'up_down' # print(f"i=={i} {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close} 收盘价:{current_close}】") else: if current_close - current_open > 0: it_K_line[i]['attribute'] = 'down_up' # print(f"i=={i} {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'down_down' # print(f"i=={i} {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close} 收盘价:{current_close}】") # 将高涨跌幅日期打印出来 # it_K_line[0]['large_amplitude_day'] = large_amplitude_day # 判断K线图形长期振幅 及 当前走势 【根据大振幅天数 反推算出 长期低幅度震荡持平、上涨、下跌的个股写入字典】 if len(large_amplitude_day) < 1: # 长期低幅度震荡 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation' # it_K_line[0]['large_amplitude_day_len'] = len(large_amplitude_day) if (historical_high_price / historical_low_price) < 1.15: if 0.99 < (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.09: # 长期低幅度震荡持平 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_remains_stable' else: if (historical_low_price_index > historical_high_price_index) and ( historical_high_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡上涨 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_rise' elif (historical_low_price_index < historical_high_price_index) and ( historical_low_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡下跌 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_decline' # 将加入属性值的指数K线下入到本地文件中 def all_index_k_line_dict_write(): all_index_K_line_dict = get_index_K_line() # 初始化所有个股的指标K线列表 all_index_k_line_property_dict = {} for i in data_cache.INDEX_SYMBOL_LIST: # print(f"i==========={i}") i_k_line = all_index_K_line_dict[i] # 获取i的K线 i_k_line_copy = copy.deepcopy(i_k_line) # 深拷贝i的K线 it_K_line_reversed = list(reversed(i_k_line_copy)) # 开盘啦获取的数据需要反转i的K线 if not it_K_line_reversed: continue get_property_mark(it_K_line_reversed, i) # 给标的的K线更新指标属性 把股票代码同时传给要调用的函数 symbol_K_line_property_dict = {i: it_K_line_reversed} # 添加 更新极限指标属性的K线 字典 # print(f"symbol_K_line_property_dict===={symbol_K_line_property_dict}") all_index_k_line_property_dict.update(symbol_K_line_property_dict) # print(f"all_index_k_line_property_dict===={all_index_k_line_property_dict}") # 构造时间格式datetime转化为字符串,以便将K线属性指标转化为json格式写入本地文件 def convert_datetime(obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') # 转换为字符串 elif isinstance(obj, dict): return {k: convert_datetime(v) for k, v in obj.items()} # 递归处理字典 elif isinstance(obj, list): return [convert_datetime(element) for element in obj] # 递归处理列表 # 可以添加其他类型的处理逻辑 else: # 对于未知类型,你可以选择保留原样、跳过或引发异常 # 这里我们选择保留原样 return obj try: json_data = json.dumps(convert_datetime(all_index_k_line_property_dict), ensure_ascii=False, indent=4) # 将转换后的JSON字符串写入文件 with open(constant.INDEX_K_BARS_PATH, 'w', encoding='utf-8') as f: f.write(json_data) # print(f"json_data:{json_data}") except Exception as error: print(f"An error occurred while converting the data to JSON: {error}") logger.info(f"加属性的指数k线写完了!{tool.get_now_time_str()}") # 计算市场分布形态因子 函数 # ====================== 输入数据 ====================== data = {'-1': '2676', '-10': '4', '-2': '769', '-3': '181', '-4': '59', '-5': '43', '-6': '18', '-7': '11', '-8': '5', '-9': '4', '0': '219', '1': '714', '10': '6', '2': '214', '3': '69', '4': '45', '5': '23', '6': '9', '7': '9', '8': '7', '9': '6', 'DT': 17, 'SJDT': '8', 'SJZT': '17', 'STDT': '9', 'STZT': '4', 'SZJS': 1123, 'XDJS': 3787, 'ZSZDFB': '480,1639,112,123,351,26,76,202,15,17,32,1,15,30,5,78,204,18,', 'ZT': 21, 'sign': '市场人气一般', 'szln': 2591290, 'qscln': 6629612, 's_zrcs': 2604433, 'q_zrcs': 6660830, 's_zrtj': 38031347, 'q_zrtj': 91465597} # ====================== 总手涨跌分布数据预处理 ====================== # 总手涨跌分布解析 def parse_zszdfb(zszdfb_str): """ 解析ZSZDFB数据为易读的字典格式 参数: zszdfb_str: ZSZDFB原始字符串(如'1929,247,56,...') 返回: { "区间分布": [ {"区间": "[-10%, -9%)", "成交量(手)": 1929}, {"区间": "[-9%, -8%)", "成交量(手)": 247}, ... ], "总成交量(手)": int, "说明": "本数据包含从-10%到+8%的成交量分布,共18个1%间隔区间" } """ # 1. 清洗并转换原始数据为整数列表 counts = [int(x) for x in zszdfb_str.strip(',').split(',') if x] # 2. 定义区间划分(假设从-9%到+9%,1%间隔) bins = [ "[+9%, +8%)", "[+8%, +7%)", "[+7%, +6%)", "[+6%, +5%)", "[+5%, +4%)", "[+4%, +3%)", "[+3%, +2%)", "[+2%, +1%)", "[+1%, 0%)", "[0%, -1%)", "[-1%, -2%)", "[-2%, -3%)", "[-3%, -4%)", "[-4%, -5%)", "[-5%, -6%)", "[-6%, -7%)", "[-7%, -8%)", "[ -8%, -9%)" ] # 3. 验证数据长度 if len(counts) != len(bins): raise ValueError(f"数据长度不匹配!预期{len(bins)}个区间,实际{len(counts)}个值") # 4. 构建结构化输出 result = { "区间分布": [ {"区间": bin_name, "成交量(手)": count} for bin_name, count in zip(bins, counts) ], "总成交量(手)": sum(counts), "说明": "本数据包含从-10%到+8%的成交量分布,共18个1%间隔区间" } return result # ====================== 核心因子计算 ====================== def calculate_factors(data): # data = {'-1': '284', '-10': '2', '-2': '80', '-3': '32', '-4': '11', '-5': '6', '-6': '6', '-7': '2', '-8': '0', # '-9': '1', '0': '101', '1': '1376', '10': '8', '2': '1760', '3': '964', '4': '285', '5': '108', '6': '49', # '7': '17', '8': '9', '9': '2', 'DT': 3, 'SJDT': '2', 'SJZT': '15', 'STDT': '1', 'STZT': '7', 'SZJS': 4600, # 'XDJS': 427, 'ZSZDFB': '1939,238,57,446,45,8,271,19,2,42,7,1,26,19,5,217,71,12,', 'ZT': 22, # 'sign': '市场人气较好', 'szln': 1113353, 'qscln': 3725698, 's_zrcs': 2185592, 'q_zrcs': 5573160, # 's_zrtj': 58079140, 'q_zrtj': 134866542} if data is not None and len(data) > 0: zero = int(data.get('0')) rise_one = int(data.get('1')) rise_two = int(data.get('2')) rise_three = int(data.get('3')) rise_four = int(data.get('4')) rise_five = int(data.get('5')) rise_six = int(data.get('6')) rise_seven = int(data.get('7')) rise_eight = int(data.get('8')) rise_nine = int(data.get('9')) rise_ten = int(data.get('10')) fall_one = int(data.get('-1')) fall_two = int(data.get('-2')) fall_three = int(data.get('-3')) fall_four = int(data.get('-4')) fall_five = int(data.get('-5')) fall_six = int(data.get('-6')) fall_seven = int(data.get('-7')) fall_eight = int(data.get('-8')) fall_nine = int(data.get('-9')) fall_ten = int(data.get('-10')) ZT = int(data.get('ZT')) # 跌停 DT = int(data.get('DT')) # 跌停 SJZT = int(data.get('SJZT')) # 实际跌停 SJDT = int(data.get('SJDT')) # 实际跌停 STZT = int(data.get('STZT')) # ST跌停 STZT = int(data.get('STDT')) # ST跌停 SZJS = int(data.get('SZJS')) # 上涨家数 XDJS = int(data.get('XDJS')) # 上涨家数 ZSZDFB = data.get('ZSZDFB') # 总手涨跌分布 '1929,247,56,435,61,3,259,29,4,44,5,1,38,9,3,239,51,10,' szln = int(data.get('szln')) # - szln(卖总量):内盘成交量,即主动卖出量,单位为手 qscln = int(data.get('qscln')) # - qscln(买总量):外盘成交量,即主动买入量,单位为手 s_zrcs = int(data.get('s_zrcs')) # - s_zrcs(卖方昨日持仓数):卖方在昨日的持仓数量 q_zrcs = int(data.get('q_zrcs')) # - q_zrcs(买方昨日持仓数):买方在昨日的持仓数量 s_zrtj = int(data.get('s_zrtj')) # - s_zrtj(卖方资金今日统计):卖方当日的资金流出量 q_zrtj = int(data.get('q_zrtj')) # - q_zrtj(买方资金今日统计):买方当日的资金流入量 # 执行总手涨跌分布解析 parsed_data = parse_zszdfb(ZSZDFB) """计算市场关键指标因子""" factors = {} # 涨跌统计 calm_dirt = { 'zero': zero } rise_dirt = { 'rise_one': rise_one, 'rise_two': rise_two, 'rise_three': rise_three, 'rise_four': rise_four, 'rise_five': rise_five, 'rise_six': rise_six, 'rise_seven': rise_seven, 'rise_eight': rise_eight, 'rise_nine': rise_nine, 'rise_ten': rise_ten, } fall_dirt = { 'fall_one': fall_one, 'fall_two': fall_two, 'fall_three': fall_three, 'fall_four': fall_four, 'fall_five': fall_five, 'fall_six': fall_six, 'fall_seven': fall_seven, 'fall_eight': fall_eight, 'fall_nine': fall_nine, 'fall_ten': fall_ten, } rise_and_fall_dirt = {} rise_and_fall_dirt.update(calm_dirt) rise_and_fall_dirt.update(rise_dirt) rise_and_fall_dirt.update(fall_dirt) # 计算涨跌统计所有股票数量总和 rise_and_fall_sum = sum(rise_and_fall_dirt.values()) # 总票数 factors['total_stocks'] = rise_and_fall_sum # 计算上涨股票数量的和 rise_sum = sum(rise_dirt.values()) # 计算上涨股票数量的和 fall_sum = sum(fall_dirt.values()) if rise_and_fall_sum != 0 and rise_sum != 0 and fall_sum != 0: # 计算每个键(每个幅段)的值占总和的百分比 percentages = {key: round((value / rise_and_fall_sum) * 100, 2) for key, value in rise_and_fall_dirt.items()} # # 计算每个键(每个涨幅段)的值占总和的百分比 rise_percentages = {key: round((value / rise_sum) * 100, 2) for key, value in rise_dirt.items()} # # 计算每个键(每个涨幅段)的值占总和的百分比 fall_percentages = {key: round((value / fall_sum) * 100, 2) for key, value in fall_dirt.items()} else: percentages = {} rise_percentages = {} fall_percentages = {} # 找到全幅段最大值对应的键 max_key = max(rise_and_fall_dirt, key=rise_and_fall_dirt.get) # 找到全幅段最小值对应的键 min_key = min(rise_and_fall_dirt, key=rise_and_fall_dirt.get) # 找到上涨幅段最大值对应的键 rise_max_key = max(rise_dirt, key=rise_dirt.get) # 找到下跌幅段最大值对应的键 fall_max_key = max(fall_dirt, key=fall_dirt.get) # 涨跌比因子 --------------------------------------------------- factors['rise_vs_fall'] = { 'rise_vs_fall_ratio': round(rise_sum / fall_sum, 2) if fall_sum > 0 else 0, # 涨跌比 'gather_area': max_key, # 找到全幅段最大值 'scattered_area': min_key, # 找到全幅段最小值 'percentages': percentages, # 全幅段的股票分布比例 'rise_percentages': rise_percentages, # 涨幅段的股票分布比例 'fall_percentages': fall_percentages, # 跌幅段的股票分布比例 'rise_and_fall_sum': rise_and_fall_sum, 'rise_max_key': rise_max_key, 'fall_max_key': fall_max_key, 'parsed_data': parsed_data } # 按值排序并提取前三个键值对 sorted_items = sorted(factors['rise_vs_fall']['percentages'].items(), key=lambda item: item[1], reverse=True) top_three_items = sorted_items[:3] # 分布数量前三的的振幅区间 factors['top_three'] = { 'top1': {'key': top_three_items[0][0], 'value': top_three_items[0][1]}, 'top2': {'key': top_three_items[1][0], 'value': top_three_items[1][1]}, 'top3': {'key': top_three_items[2][0], 'value': top_three_items[2][1]}, } # 资金流向因子 -------------------------------------------------- factors['capital_flow'] = { 'buy': q_zrtj, # 买方资金净流入 'sell': s_zrtj, # 卖方资金净流出 'net': q_zrtj - s_zrtj # 资金净流入 } # 市场情绪因子 -------------------------------------------------- factors['sentiment'] = { 'zt_ratio': ZT / rise_and_fall_sum, # 涨停占总比 'zt_vs_dt_ratio': (SJZT + 1) / (SJDT + 1), # 涨停跌停比 'rise_vs_fall_ratio': (SZJS + 1) / (XDJS + 1), # 涨跌比 'rise_vs_all_stocks_ratio': SZJS / rise_and_fall_sum, # 活跃股占比 'sign': 1 if '较好' in data['sign'] else 0, # 情绪标签量化 'extreme_ratio': (ZT + DT) / rise_and_fall_sum, # 极端波动(涨停+跌停占总比) } return factors # # ====================== 策略信号生成 ====================== def generate_signals(factors): """生成多维度交易信号""" signals = [] # 信号1:趋势强度判断 if factors['sentiment']['rise_vs_fall_ratio'] > 1.5: signals.append("多头强势:涨跌比高于阈值1.5") elif factors['sentiment']['rise_vs_fall_ratio'] < 0.67: signals.append("空头强势:涨跌比低于阈值0.67") # 信号2:资金动能判断 if factors['capital_flow']['net'] > 1e8: # 假设1亿为阈值 signals.append("资金大幅净流入:关注主力进场") elif factors['capital_flow']['net'] < -5e7: signals.append("资金大幅净流出:警惕风险") # 信号3:极端波动预警 if factors['sentiment']['extreme_ratio'] > 0.1: signals.append("极端波动:涨停跌停股超10%") # 信号4:市场情绪综合 if factors['sentiment']['zt_ratio'] > 0.05 and factors['sentiment']['sign'] == 1: signals.append("高情绪热度:涨停股多且人气向好") # 信号5:涨跌分布综合 if factors['top_three']['top1']['value'] + factors['top_three']['top2']['value'] + factors['top_three']['top3'][ 'value'] > 50: value = factors['top_three']['top1']['value'] + factors['top_three']['top2']['value'] + \ factors['top_three']['top3']['value'] signals.append(f"涨跌分布:长尾分布{value}") if factors['top_three']['top1']['value'] / factors['top_three']['top2']['value'] > 1.25: signals.append( f"涨跌分布:强势聚集{factors['top_three']['top1']['key']}:{factors['top_three']['top1']['value']}%") else: signals.append( f"涨跌分布:中度聚集{factors['top_three']['top1']['key']}:{factors['top_three']['top1']['value']}%") else: value = factors['top_three']['top1']['value'] + factors['top_three']['top2']['value'] + \ factors['top_three']['top3']['value'] signals.append(f"涨跌分布:均匀分布{value}") return signals if signals else ["无显著信号:维持当前策略"] # 实时设置计划持仓数量 函数 def set_plan_position_quantity(): while True: try: if data_cache.position_automatic_management_switch is True: now_time = tool.get_now_time_str() if data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: # 获取大盘综合强度分数 data_cache.real_time_market_strong = kpl_api.get_market_strong() # 获取市场情绪字典【完整】,并整理 data_cache.real_time_market_sentiment_dirt = kpl_api.changeStatistics() # 市场情绪字典字典 date_today = data_cache.real_time_market_sentiment_dirt.get('Day', None) # 情绪数据日期 significant_drawdown = data_cache.real_time_market_sentiment_dirt.get('df_num', None) # 大幅回撤 sentiment_indicators = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 情绪指标 limit_up_amount = data_cache.real_time_market_sentiment_dirt.get('ztjs', None) # 涨停家数 connecting_board_height = data_cache.real_time_market_sentiment_dirt.get('lbgd', None) # 连板高度 # 获取市场情绪-涨跌统计 data_cache.rise_and_fall_statistics_dirt = kpl_api.getMarketFelling() # 涨跌统计字典 if data_cache.rise_and_fall_statistics_dirt is not None: # 涨跌统计因子计算 factors = calculate_factors(data_cache.rise_and_fall_statistics_dirt) # 涨跌统计生成信号 signals = generate_signals(factors) limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('ZT', None) # 涨停家数 actual_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJZT', None) # 实际涨停家数 ST_limit_up_numbers = data_cache.rise_and_fall_statistics_dirt.get('STZT', None) # ST涨停家数 limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('DT', None) # 跌停家数 actual_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('SJDT', None) # 实际跌停家数 ST_limit_down_numbers = data_cache.rise_and_fall_statistics_dirt.get('STDT', None) # ST跌停家数 rise_numbers = data_cache.rise_and_fall_statistics_dirt.get('SZJS', None) fall_numbers = data_cache.rise_and_fall_statistics_dirt.get('XDJS', None) # 控制打印日志的时间段 if data_cache.MORN_MARKET_CLOSING_TIME < now_time < data_cache.NOON_MARKET_OPENING_TIME: pass logger.info(f"午间休市时间内 不打印大盘综合强度分数") else: # 大盘综合强度分数 的 异步日志 # logger_Overall_market_strength_score.info(data_cache.real_time_market_strong) async_log_util.info(logger_Overall_market_strength_score, f"{data_cache.real_time_market_strong}") logger.info( f"日期:{date_today},情绪指标:{sentiment_indicators}分,大幅回撤:{significant_drawdown},涨停家数:{limit_up_amount},连板高度:{connecting_board_height}") logger.info( f"上涨家数:{rise_numbers},下跌家数:{fall_numbers},实际涨停家数:{actual_limit_up_numbers},实际跌停家数:{actual_limit_down_numbers}") logger.info(f"涨跌统计字典{data_cache.rise_and_fall_statistics_dirt}") logger.info(f"涨跌统计因子的计算={factors}") logger.info(f"涨跌统计生成信号={signals}") logger.info("========== 关键涨跌统计数据 ==========") logger.info(f"总股票数: {factors['total_stocks']}\n" f"涨跌比(BDR): {factors['rise_vs_fall']['rise_vs_fall_ratio']:.2f}\n" f"极端波动比例: {factors['sentiment']['extreme_ratio']:.2%}\n" f"买方资金净流入{factors['capital_flow']['buy']},卖方资金净流出{factors['capital_flow']['sell']}" f"资金净流入(元): {round(factors['capital_flow']['net'] / 10000, 2)}万\n" f"涨停股占比: {factors['sentiment']['zt_ratio']:.2%}\n" f"市场情绪量化: {'积极' if factors['sentiment']['sign'] else '谨慎'}\n" f"上涨幅段最大:{factors['rise_vs_fall']['rise_max_key']}:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['rise_max_key'])}%\n" f"下跌幅段最大:{factors['rise_vs_fall']['fall_max_key']}:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['fall_max_key'])}%\n" f"聚集区域:{factors['rise_vs_fall']['gather_area']},聚集区域的比例值:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['gather_area'])}%\n" f"零散区域:{factors['rise_vs_fall']['scattered_area']},聚集区域的比例值:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['scattered_area'])}%\n" f"涨跌因子字典={factors['rise_vs_fall']}\n") logger.info("========== 总手涨跌分布 ==========") # 打印结果(美化输出) for item in factors['rise_vs_fall']['parsed_data']["区间分布"]: logger.info(f"{item['区间']}: {item['成交量(手)']}手") logger.info(f"总成交量: {factors['rise_vs_fall']['parsed_data']['总成交量(手)']}手\n") logger.info(f"说明: {factors['rise_vs_fall']['parsed_data']['说明']}\n") logger.info("========== 策略信号 ==========") for i, signal in enumerate(signals, 1): logger.info(f"信号{i}: {signal}") # 只有在开盘时具体计算 计划下单金额 if data_cache.OPENING_TIME < now_time < data_cache.NOON_MARKET_TIME: # 理想交易行情分数【评估当前行情是否有利于低吸策略取得更高抓板率的分数(是否是理想的交易行情)】 ideal_trading_market_score = 1 # 33分是个两级分化阶梯不好,目前不好拿捏,暂时不用 # if data_cache.real_time_market_strong <= 33: 实际跌停大于10个 if data_cache.real_time_market_strong < 30 and int(actual_limit_down_numbers) > 10: # 如果大盘综合强度分数小于30,将低迷情绪分数比例设置为0.01,可用资金缩小一百倍 ideal_trading_market_score = 0.01 logger.info(f"理想交易行情分数===={ideal_trading_market_score * 100}%") data_cache.index_trend_expectation_score = index_trend_expectation() logger.info(f"大盘指数情绪预期分数==={data_cache.index_trend_expectation_score}分") # print(f"大盘指数情绪预期分数==={data_cache.index_trend_expectation_score}分") # # 目前大盘指数情绪预期分数 尚不科学 强制设置为初始0值 # index_trend_expectation_score = 0 # 获取计算今天新增的持仓数量 addition_position_number = len(data_cache.addition_position_symbols_set) # 定义一个今日的剩余新增持仓数量的变量 【目前策略默认一日三仓 所以设置常量3】 Unfinished_opening_plan_number = 3 - addition_position_number logger.info(f"今日的剩余新增持仓数量==={Unfinished_opening_plan_number}") usefulMoney = data_cache.account_finance_dict[0].get('usefulMoney', 0) logger.info(f"账户可用资金==={usefulMoney}元") # 开仓策略计算结果 # 根据账户可用金额 计算今日计划下单金额 # 账户可用金额 默认乘以0.95,永远留一点钱,一方面也冗余一些计算误差 # ((大盘综合强度分数 + 大盘指数情绪预期分数) * 0.01) * (账户可用金额 * 0.9 * 极端低迷情绪比例 / 今日最大新增持仓票数(常量:3)) # data_cache.today_planned_order_amount = ((data_cache.real_time_market_strong + data_cache.index_trend_expectation_score) * 0.01) * ( # usefulMoney * 0.9 * low_emotion_mood_ratio / Unfinished_opening_plan_number) # 除以3应该是一个常量,如果以Unfinished_opening_plan_number,会出现float division by zero # 如果今日计划下单数量不为空 或 计划金额为默认值,那么就实时计算第计划金额,记录下来 if not data_cache.today_planned_order_amount or data_cache.today_first_planned_order_amount <= 0: data_cache.today_planned_order_amount = usefulMoney * 0.95 * ideal_trading_market_score / 3 logger.info(f"实时计算计划下单金额==={data_cache.today_planned_order_amount}元") # 计算并固定首次下单时的计划金额(有延迟误差) # # 如果一日三仓的计划还未完成 且 首次开仓金额为0 if Unfinished_opening_plan_number != 0 and data_cache.today_first_planned_order_amount <= 0: if addition_position_number > 0: # 下过一次单过后再计算动态下单金额 data_cache.today_first_planned_order_amount = float(data_cache.today_planned_order_amount) logger.info( f"采用开仓策略计算方式--》今日(首个)新增持仓产生,将实时计算金额赋值给首笔持仓金额=》今日计划下单金额:{data_cache.today_planned_order_amount}") except Exception as error: logger_debug.exception(error) logger.error(f"实时设置计划持仓数量 函数报错: {error}") print(f"实时设置计划持仓数量 函数报错: {error}") finally: time.sleep(3) if __name__ == '__main__': # market_strong = kpl_api.get_market_strong() # print(f"{market_strong}") # get_real_time_market_strong() # all_index_K_line_dict = get_index_K_line() # all_index_k_line_dict_write() # print(f"指数K线{data_cache.all_index_k_line_property_dict}") # all_index_k_line_dict_write() # ====================== 结果输出 ====================== print(f"data=={data}") # 因子计算 factors = calculate_factors(data) # 生成信号 signals = generate_signals(factors) # 结果输出 print("========== 市场关键指标 ==========") print(f"总股票数: {factors['total_stocks']}") print(f"涨跌比(BDR): {factors['rise_vs_fall']['rise_vs_fall_ratio']:.2f}") print(f"极端波动比例: {factors['sentiment']['extreme_ratio']:.2%}") print(f"资金净流入(元): {round(factors['capital_flow']['net'] / 10000, 2)}万") print(f"涨停股占比: {factors['sentiment']['zt_ratio']:.2%}") print(f"聚集前三名情况字典=={factors['top_three']}") print( f"上涨幅段最大:{factors['rise_vs_fall']['rise_max_key']}:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['rise_max_key'])}%") print( f"下跌幅段最大:{factors['rise_vs_fall']['fall_max_key']}:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['fall_max_key'])}%") print( f"聚集区域:{factors['rise_vs_fall']['gather_area']},聚集区域的比例值:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['gather_area'])}%") print( f"零散区域:{factors['rise_vs_fall']['scattered_area']},聚集区域的比例值:{factors['rise_vs_fall']['percentages'].get(factors['rise_vs_fall']['scattered_area'])}%") print(f"市场情绪量化: {'积极' if factors['sentiment']['sign'] else '谨慎'}") print(f"涨跌因子字典={factors['rise_vs_fall']}") print("\n========== 策略信号 ==========") for i, signal in enumerate(signals, 1): print(f"信号{i}: {signal}") print("\n========== 总手涨跌分布 ==========") # 打印结果(美化输出) for item in factors['rise_vs_fall']['parsed_data']["区间分布"]: print(f"{item['区间']}: {item['成交量(手)']}手") print(f"\n总成交量: {factors['rise_vs_fall']['parsed_data']['总成交量(手)']}手") print(f"说明: {factors['rise_vs_fall']['parsed_data']['说明']}")