# 历史走势分析,及价值面中的历史行情部分(针对历史行情走势进行获取与分析) # coding=utf-8 from __future__ import print_function, absolute_import, unicode_literals import copy # import logging import os import time import datetime # from datetime import datetime import json import constant from log_module import async_log_util from log_module.log import logger_common, logger_k_bars from strategy import data_cache # 引入基础算法模块 from strategy import basic_methods # 引入本地日志模块 from utils import hx_qc_value_util, tool # 获取logger实例 logger = logger_common start_time = time.time() print(f"all_K_line开始运行--{start_time}") def init_data(): pass init_data() # 90个交易日历史K线类对象 class KLineHistory: all_stocks_base_K_line_dict = {} # 1.类的构造方法,定义类的各个属性 def __init__(self): pass def init(self, today_date, Next_trading_day, symbols): self.today_date = today_date # 定义属性today_date 当前交易日期 self.Next_trading_day = Next_trading_day # 定义属性Next_trading_day 下个交易日期 self.symbols = symbols # 1.获取90个交易日期或者获取90个交易日期前的具体日期,为历史K线的start_time获取数据 def k_line_history_90day(self): # 为了解决步长老是差一个,所以设定为92 start_time_data = basic_methods.pre_num_trading_day(self.today_date, 90) # print(f"90个交易日前日期=========={start_time_data}") # logger.info("获取历史K线===开始") # 如果是16:00之前,截止日期为:前一天,如果在16:00之后截止日期为后一天 if int(tool.get_now_time_str().replace(":", "")) < int("160000"): end_date = data_cache.DataCache().pre_trading_day else: end_date = data_cache.DataCache().today_date all_symbol_K_line_history_90day_dict = {} for symbol in self.symbols: code = symbol[-6:] k_bars = HistoryKDataManager().get_history_bars(code, end_date) if not k_bars: # 通过掘金接口迭代获取个股的90天基础K线,得到一个日期排列结果为増序的列表 k_bars = hx_qc_value_util.get_history_k_bars(symbol[-6:], 90, end_date=end_date) if k_bars: # 将K线保存到文件 HistoryKDataManager().save_history_bars(symbol[-6:], k_bars[0]['bob'].strftime("%Y-%m-%d"), k_bars) # print(f"symbol==={symbol}") # print(f"K_line_history_90day_data ====={K_line_history_90day_data} ") # 将获取到的个股90天基础K线赋值给迭代个股的一个新建字典的symbol对应的value all_symbol_K_line_history_90day_dict[symbol] = k_bars # print(f"all_symbol_K_line_history_90day_dict==={len(all_symbol_K_line_history_90day_dict)}") return all_symbol_K_line_history_90day_dict # 3.获取90个交易日内标的股票的K线指标 @staticmethod def get_property_limit_mark(it_K_line, symbol): historical_high_price = 0.01 # 初始化历史最高价 historical_low_price = 0.01 # 初始化历史最低价 historical_low_price_index = 0 # 初始化历史最高价序号 historical_high_price_index = 0 # 初始化历史最低价序号 historical_average_price = 0.01 # 初始化历史均价 large_amplitude_day = [] # 初始化大幅震荡日期 current_count = 0 # 当前连续满足条件的天数 start = None results = [] dates_list = [] # 查询交易标的基本信息并赋值 sec_name = data_cache.DataCache().code_name_dict.get(symbol, "default_name") # print(f"K_line_sec_name=={sec_name}") # 将获取到的公司名称直接 生成字典放到 K线的第一个序列中 it_K_line[0]['sec_name'] = sec_name for i in range(0, len(it_K_line) - 1): previous_close = it_K_line[i + 1]['close'] # 昨日收盘价 previous_high = it_K_line[i + 1]['high'] # 昨日最高价 current_open = round(it_K_line[i]['open'], 2) # 当日开盘价 current_close = round(it_K_line[i]['close'], 2) # 当日收盘价 current_high = round(it_K_line[i]['high'], 2) # 当日最高价 current_low = round(it_K_line[i]['low'], 2) # 当日最低价 limit_up_price = basic_methods.limit_up_price(previous_close) # 计算出的当天涨停价 limit_up_price = float(limit_up_price) # 将涨停价转化为浮点 limit_down_price = basic_methods.limit_down_price(previous_close) # 计算出的当天跌停价 limit_down_price = float(limit_down_price) # 将跌停价转化为浮点 current_today_volume = it_K_line[i]['volume'] # 当日总交易量 current_yesterday_volume = it_K_line[i + 1]['volume'] # 昨日总交易量 # current_today_amount = it_K_line[i]['amount'] #当日总交易额 current_today_growth = basic_methods.intraday_growth(current_close, previous_close) # 计算出当日涨幅 it_K_line[i]['today_growth'] = current_today_growth # 将当日涨幅数据添加到字典中对应的日期中 current_today_amplitude = round((current_high - current_low) / previous_close, 2) # 计算出当日振幅 it_K_line[i]['today_amplitude'] = current_today_amplitude # 将当日振幅数据添加到字典中对应的日期中 historical_high_price = round(max(i['high'] for i in it_K_line), 2) # 计算出历史最高价 historical_high_price_index = max(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['high']) # 找到最高价的索引 historical_low_price = round(min(i['low'] for i in it_K_line), 2) # 计算出历史最低价 historical_low_price_index = min(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['low']) # 找到最低价的索引 historical_average_price = round((historical_high_price + historical_low_price) / 2, 2) # 计算出历史最均价 # 把最高价、最高价序号、最低价、最低价序号,添加到K线的昨日字典中 new_properties = { 'historical_high_price': historical_high_price, 'historical_high_price_index': historical_high_price_index, 'historical_low_price': historical_low_price, 'historical_low_price_index': historical_low_price_index, 'historical_average_price': historical_average_price } it_K_line[0].update(new_properties) # 计算大振幅的日期并添加到大振幅日期列表中 if not (-4 < it_K_line[i]['today_growth'] < 4): large_amplitude_day.append(it_K_line[i]['bob']) # 判断K线图形风险位置 及 高低时间点 if historical_low_price_index > historical_high_price_index: # 序列小于1是历史最高价 历史长度90,其实还可以根据更长来判断更显程度,历史长度很短的一般认为具有更高风险,而更长的之前有可能有更高的历史高位还比较远 # 最低价在前 it_K_line[0]['risk_position'] = 'low_price_ahead' ''' 以下部分条件分支由于都是对 it_K_line[0]['risk_position'] 数据进行修改, 条件判断逻辑有部分重叠,所以应按照逻辑的覆盖面进行先后判断,避免先把大部分过滤掉了。 所以当前条件分支语句的顺序不要轻易更换。 # [(historical_high_price / round(it_K_line[0]['close'], 2) < 1.05)的预设值取值范围选择其一: # {"均价以上近前高的开盘价":1.05,"接近前高 的跌停价":1.11,"接近前高 的一个炸板加一个跌停的收盘价":1.17,"均价与最高价之间的涨停价":1.21,"均价与最高价之间的价":1.33} ''' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 且 最高位 与 最低位为三倍涨幅的妖股【当前二度试探风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 2) and ( historical_high_price > historical_low_price * 3): it_K_line[0]['risk_position'] = 'recent_monster_second_break_risk' # 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度突破风险】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.17) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_second_break_risk' # 最高位据昨日较远 且 昨收价距历史最高价差距较小(小于5%,涨幅5%是左右是最容易把低吸骗进去的日内涨幅) 且 整体涨跌幅度较大 【当前二度突破近前高风险】 # 【双成药业 2024-11-08 这样的二次近前高,实际数值为1.4646...,为安全起见还是设置为1.5】 if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and ( historical_high_price / round(it_K_line[0]['close'], 2) < 1.2) and ( historical_high_price > historical_low_price * 2): it_K_line[0]['risk_position'] = 'recent_second_break_near_high_position_risk' # 昨日就是最高位 且 整体涨跌幅度较大 【当前高位风险】 if (historical_high_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.8): it_K_line[0]['risk_position'] = 'recent_high_position_risk' else: # 最高价在前 it_K_line[0]['risk_position'] = 'high_price_ahead' # 最低位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度崩溃风险】 if (2 < historical_low_price_index < 15 and len(it_K_line) >= 90) and ( (historical_average_price / round(it_K_line[0]['close'], 2)) > 1.1): it_K_line[0]['risk_position'] = 'recent_second_crash_risk' # 昨日就是最低位 且 整体涨跌幅度较大 【当前低位风险】 if (historical_low_price_index < 2 and len(it_K_line) >= 90) and ( historical_high_price > historical_low_price * 1.5): it_K_line[0]['risk_position'] = 'recent_low_position_risk' # 判断K线图形中连续低涨幅天数及具体序列号并把共识价格均价写入文件【阶段性价格共识位置】 if 0 < it_K_line[i]['today_amplitude'] < 0.025 and i < 60 and ( historical_high_price > historical_low_price * 1.5): if current_count == 0: start = i # 如果当前涨幅小于阈值,增加连续天数计数器 current_count += 1 # 如果循环复合条件的连续天数达到5天,记录起始索引 if current_count > 5: # 检查 results 是否为空或当前序列不与最后一个序列重叠 if not results or results[-1][1] < start: if 0.98 < it_K_line[i]['high'] / it_K_line[start]['low'] < 1.02: # 没有重叠,直接添加【其中start为循环到起始序号,i为循环到的结束序号。在实际的日期中要反过来】 results.append((start, i)) # 算出价格共识区间的均价 average_consensus_price = round((it_K_line[i]['high'] + it_K_line[start]['low']) / 2, 2) dates_list.append(( it_K_line[start]['bob'].strftime('%Y-%m-%d %H:%M:%S'), it_K_line[i]['bob'].strftime('%Y-%m-%d %H:%M:%S'), average_consensus_price)) # if len(dates_list) > 1: # 如果只需要检测包含多段价格共识区 it_K_line[0]['rise_and_fall_is_small_start_and_end_date'] = dates_list else: # 重置计数器 current_count = 0 start = None # 确保不会出现除以零的报错 if current_yesterday_volume != 0: if round(current_today_volume / current_yesterday_volume, 2) > 1.1: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'increases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【放量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'increases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【放量下跌】") else: it_K_line[i]['today_volume_shape'] = 'increases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【放量平收】") elif round(current_today_volume / current_yesterday_volume, 2) < 0.9: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'decreases_up' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'decreases_down' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量下跌】") else: it_K_line[i]['today_volume_shape'] = 'decreases_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【缩量平收】") else: # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】") if current_today_growth > 0: it_K_line[i]['today_volume_shape'] = 'remained_up' # print(f"i=={i} {it_K_line[i]['bob']} 【平量上涨】") elif current_today_growth < 0: it_K_line[i]['today_volume_shape'] = 'remained_down' # print(f"i=={i} {it_K_line[i]['bob']} 【平量下跌】") else: it_K_line[i]['today_volume_shape'] = 'remained_balance' # print(f"i=={i} {it_K_line[i]['bob']} 【平量平收】") else: logger.info(f"{symbol} 的 昨日成交量 为 0,报错!!") if current_open - previous_close > 0: # print(f"i=={i} {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}") # difference = current_close - previous_close if current_close - current_open > 0: it_K_line[i]['attribute'] = 'up_up' # print(f"i=={i} {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'up_down' # print(f"i=={i} {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close} 收盘价:{current_close}】") else: if current_close - current_open > 0: it_K_line[i]['attribute'] = 'down_up' # print(f"i=={i} {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close} 收盘价:{current_close}】") else: it_K_line[i]['attribute'] = 'down_down' # print(f"i=={i} {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close} 收盘价:{current_close}】") if abs(current_high - limit_up_price) < 0.001 and abs(current_close - limit_up_price) < 0.001: # print(f'整个列表更新了吗 ? it_K_line==={it_K_line}') if abs(current_open - limit_up_price) < 0.001 and abs(limit_up_price - current_low) < 0.01: it_K_line[i]['attribute'] = 'one_line_limit_up' # print(f"i=={i} {it_K_line[i]['bob']} 一字涨停板 ") # 打印出当前行的'bob' elif abs(current_low - limit_down_price) < 0.01: it_K_line[i]['attribute'] = 'limit_down_then_limit_up' # print(f"i=={i} {it_K_line[i]['bob']} [准]地天板") # 打印出当前行的'bob' elif abs(current_open - limit_up_price) < 0.001 and abs(current_low - limit_down_price) < 0.01: it_K_line[i]['attribute'] = 'limit_up_then_limit_down_then_limit_up' # print(f"i=={i} {it_K_line[i]['bob']} 一字涨停天地天板") # 打印出当前行的'bob' else: it_K_line[i]['attribute'] = 'limit_up' # print(f"i=={i} {it_K_line[i]['bob']} 涨停板") # 打印出当前行的'bob' if abs(current_high - limit_up_price) < 0.001 <= abs(current_close - limit_up_price) and abs( current_close - limit_down_price) >= 0.001: it_K_line[i]['attribute'] = 'frying_plate' # print(f'在炸板日的列表中更新了对应的字典的键对值吗 ? it_K_line[i]==={it_K_line[i]}') # print(f'整个列表更新了吗 ? it_K_line==={it_K_line}') if abs(previous_close - previous_high) >= 0.001: it_K_line[i]['attribute'] = 'first_frying_plate' # print(f"i=={i} {it_K_line[i]['bob']} 首板炸板") # 打印出当前行的'bob' elif abs(current_open - limit_up_price) < 0.01: it_K_line[i]['attribute'] = 'one_line_limit_up_then_frying_plate' # print(f"i=={i} {it_K_line[i]['bob']} 一字天炸") # 打印出当前行的'bob' elif abs(limit_down_price - limit_up_price) < 0.01 and abs(current_low - limit_down_price) < 0.01: it_K_line[i]['attribute'] = 'one_line_limit_up_then_frying_plate_then_limit_down' # print(f"i=={i} {it_K_line[i]['bob']} 一字天地") # 打印出当前行的'bob' else: it_K_line[i]['attribute'] = 'not_first_frying_plate' # print(f"i=={i} {it_K_line[i]['bob']} 非首板炸板") # 打印出当前行的'bob' if abs(current_low - limit_down_price) < 0.001: if abs(current_close - limit_down_price) < 0.001: if abs(current_high - limit_up_price) < 0.001: it_K_line[i]['attribute'] = 'limit_up_then_limit_down' # print(f"i=={i} {it_K_line[i]['bob']} [准]天地板") # 打印出当前行的'bob' elif abs(current_open - limit_down_price) < 0.001 and abs(current_high - limit_down_price) < 0.01: it_K_line[i]['attribute'] = 'one_line_limit_down' # print(f"i=={i} {it_K_line[i]['bob']} 一字跌停板") # 打印出当前行的'bob' else: it_K_line[i]['attribute'] = 'limit_down' # print(f"i=={i} {it_K_line[i]['bob']} 跌停板") # 打印出当前行的'bob' elif abs(current_close - limit_up_price) >= 0.001: it_K_line[i]['attribute'] = 'touch_limit_down' # print(f"i=={i} {it_K_line[i]['bob']} 触及跌停") # 打印出当前行的'bob' # 将高涨跌幅日期打印出来 # it_K_line[0]['large_amplitude_day'] = large_amplitude_day # 判断K线图形长期振幅 及 当前走势 【根据大振幅天数 反推算出 长期低幅度震荡持平、上涨、下跌的个股写入字典】 if len(large_amplitude_day) < 1: # 长期低幅度震荡 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation' # it_K_line[0]['large_amplitude_day_len'] = len(large_amplitude_day) if (historical_high_price / historical_low_price) < 1.15: if 0.99 < (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.09: # 长期低幅度震荡持平 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_remains_stable' else: if (historical_low_price_index > historical_high_price_index) and ( historical_high_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡上涨 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_rise' elif (historical_low_price_index < historical_high_price_index) and ( historical_low_price_index < 2 and len(it_K_line) > 88): # 长期低幅度震荡下跌 it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_decline' # 实例化K线对象 k_line_history = KLineHistory() # 实例化指数K线对象 main_index_k_line_history = KLineHistory() # 在main.py中初始化函数里面就实例化上证A股和深证A股的历史K线方法【在本文件中调用且不写入本地文件时才需要在本文件内实例化】 # k_line_history.k_line_history_90day() # 写入全目标标的股票90天K线 def all_stocks_all_k_line_dict_write(): all_stocks_base_K_line_dict = k_line_history.k_line_history_90day() # 初始化所有个股的指标K线列表 all_stocks_all_K_line_property_dict = {} for i in data_cache.DataCache().filtered_stocks: # print(f"i==========={i}") i_k_line = all_stocks_base_K_line_dict[i] # 获取i的K线 i_k_line_copy = copy.deepcopy(i_k_line) # 深拷贝i的K线 # it_K_line_reversed = list(reversed(i_k_line_copy)) # 开盘啦获取的数据需要反转i的K线 it_K_line_reversed = list(i_k_line_copy) # 小辉端的数据不需要反转i的K线 if not it_K_line_reversed: continue k_line_history.get_property_limit_mark(it_K_line_reversed, i) # 给标的的K线更新指标属性 把股票代码同时传给要调用的函数 symbol_K_line_property_dict = {i: it_K_line_reversed} # 添加 更新极限指标属性的K线 字典 # print(f"symbol_K_line_property_dict===={symbol_K_line_property_dict}") all_stocks_all_K_line_property_dict.update(symbol_K_line_property_dict) # 构造时间格式datetime转化为字符串,以便将K线属性指标转化为json格式写入本地文件 def convert_datetime(obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') # 转换为字符串 elif isinstance(obj, dict): return {k: convert_datetime(v) for k, v in obj.items()} # 递归处理字典 elif isinstance(obj, list): return [convert_datetime(element) for element in obj] # 递归处理列表 # 可以添加其他类型的处理逻辑 else: # 对于未知类型,你可以选择保留原样、跳过或引发异常 # 这里我们选择保留原样 return obj try: json_data = json.dumps(convert_datetime(all_stocks_all_K_line_property_dict), ensure_ascii=False, indent=4) # 将转换后的JSON字符串写入文件 with open(constant.K_BARS_PATH, 'w', encoding='utf-8') as f: f.write(json_data) async_log_util.info(logger_k_bars, json_data) except Exception as error: print(f"An error occurred while converting the data to JSON: {error}") logger.info(f"标的个股历史k线写完了!{tool.get_now_time_str()}") # 用开盘啦数据检测昨日的K线中涨停属性是否有误(盘尾 集合竞价 炸开一个卖一档,但涨幅未变的) def check_limit_up_attribute(): for key, values in data_cache.all_stocks_all_K_line_property_dict.items(): # print(f"data_cache.limit_up_code_list==={data_cache.limit_up_code_list}") if 'attribute' in values[0]: if key not in data_cache.yesterday_limit_up_code_list and values[0]['attribute'] in data_cache.limit_up_type: logger.info(f"在K线中昨日涨停,但是在开盘啦中没有,key==={key} 公司名称==={values[0]['sec_name']} ") data_cache.yesterday_frying_plate_last_minute_list.append(key) else: logger.info(f"K线中缺失昨日属性键值 {values[0]['sec_name']}:values[0]==={values[0]}") # check_limit_up_attribute() # 实时检测是否拉取K线线程的函数 def check_time_and_data_date(): while True: try: # 声明赋值实时时间 now_time = tool.get_now_time_str() # print(f"now_time==={now_time}") # if now_time > data_cache.AFTER_CLOSING_TIME and data_cache.execution is False: # # if now_time > data_cache.AFTER_CLOSING_TIME: # data_cache.execution = True # # 整理当日涨停信息并写入本地管理好本地数据 # plate_strength_analysis.get_arrange_limit_up_info() # logger.info(f"整理当日涨停信息 已经运行完成") # # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】 # plate_strength_analysis.get_all_stocks_plate_dict(data_cache.min_stocks) # # logger.info(f"获取所有个股的板块概念 已经运行完成") # 构造一个循环检测K线子带你中所有个股代码下的日期是不是和某日一致的,如果不一致则返回False def check_data_date(target_date): # 再判断一下全局化缓存文件中的抽样日期是不是目标日期 if data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10] == target_date: data_date_and_target_date = True return data_date_and_target_date # for key,value in data_cache.all_stocks_all_K_line_property_dict.items(): # print(f"key===={key}") # print(f"value[0]['bob'][:10]===={value[0]['bob'][:10]}") # if value[0]['bob'][:10] != target_date and len(data_cache.all_stocks_all_K_line_property_dict) < 1 and key not in data_cache.no_k_line_stocks: # data_date_and_target_date = False # return data_date_and_target_date # 盘中不进行时间与数据的判断(也不能早于服务器重启时间,因为次日凌晨拉取的K线会有错误),一方面判断应该在开盘前完成,另一方面收盘后离最新拉取时间都还有很长时间 # 在这个时间段内运行 9:00--9:30 或 15:00--23:00 if data_cache.SERVER_RESTART_TIME < now_time < data_cache.OPENING_TIME or data_cache.CLOSING_TIME < now_time < data_cache.PROGRAM_SLEEP_TIME: # 然后判断一下,当K线字典中的任意一只个股的的第一个K线表中的第一个日期 等于 上一个交易日日期 或 今日日期 且 运行时时间未到 18:30 那么不需要新拉取直接读取已有的就行, # 否者还是得调用K线对象方法拉取,并重新读取赋值全局化 # if hour < 18 or (hour == 18 and minute < 31): if now_time < data_cache.UPDATE_DATA_TIME: # if now_time < data_cache.closing_time: check_pre_trading_day = check_data_date(data_cache.DataCache().pre_trading_day) if check_pre_trading_day is True: # if hour >= 17: if now_time > data_cache.CHECKING_DATA_TIME: print( f"未到【18:31】 但 既有数据 【是】 上个交易日数据 【不执行拉取K线】 ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε(/’-‘)/") else: print( f"未到【18:31】 且 既有数据 【不是】 上个交易日数据 执行拉取K线 【既有数据日期:{data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10]} 上个交易日:{data_cache.DataCache().pre_trading_day}】") # 调用指标K线写入本地文件 all_stocks_all_k_line_dict_write() # 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存 if os.path.exists(constant.K_BARS_PATH): with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) # print( # f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") else: check_today_date = check_data_date(data_cache.DataCache().today_date) if check_today_date is True: print(f"到了【18:31】 但 既有数据 【是】 本交易日数据 【不执行拉取K线】 ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε(/’-‘)/") else: print( f"到了【18:31】 且 既有数据 【不是】 本交易日数据 执行拉取K线 【既有数据日期:{data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10]} 本交易日:{data_cache.DataCache().today_date}】") # 调用指标K线写入本地文件 all_stocks_all_k_line_dict_write() # 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存 if os.path.exists(constant.K_BARS_PATH): with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: data_cache.all_stocks_all_K_line_property_dict = json.load(f) # print( # f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}") except Exception as error: logger.exception(error) print(f"实时检测是否拉取K线线程报错An error occurred: {error}") finally: current_time = datetime.datetime.now() # 使用strf time来格式化时间为字符串,只显示时分秒 formatted_time = current_time.strftime("%H:%M:%S") # print(f"当前-时间:{formatted_time}") time.sleep(1) # 历史K线累计涨停天数函数 def count_limit_up_day(k_line_data): limit_up_day = 0 # 初始化涨停天数 # 计算90天内涨停天数 for i in k_line_data: if 'attribute' in i and ( i['attribute'] == 'one_line_limit_up' or i['attribute'] == 'limit_down_then_limit_up' or i['attribute'] == 'limit_up_then_limit_down_then_limit_up' or i['attribute'] == 'limit_up'): limit_up_day += 1 # print(f"涨停提日期:{i['bob']}") # print(f"涨停天数----limit_up_day======{limit_up_day}") return limit_up_day # 只在本文件内测试时开启调用 # all_stocks_all_k_line_dict_write() # 原始K线管理器 class HistoryKDataManager: __instance = None __db = 0 __history_k_day_datas = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): pass def __get_cache_dir(self): """ 获取缓存路径 @return: """ dir_path = f"{constant.get_path_prefix()}/datas/k_bars" if not os.path.exists(dir_path): os.makedirs(dir_path) return dir_path def __del_outdate_datas(self, code): dir_path = self.__get_cache_dir() datas = [] for root, dirs, files in os.walk(dir_path): for file in files: # 输出文件的绝对路径 if file.find(code) < 0: continue path_ = os.path.join(root, file) day = file.split("_")[0] datas.append((day, path_)) # 保存最新的一条数据 if datas: datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True) datas = datas[1:] for d in datas: os.remove(d[1]) def save_history_bars(self, code, day, datas, force=False): """ 保存历史K线 @param code: 代码 @param day: K线最新的日期(取datas最新一条数据的日期) @param datas: 数据 @return: """ cache_dir = self.__get_cache_dir() file_name = f"{day}_{code}.txt" path_str = f"{cache_dir}/{file_name}" if os.path.exists(path_str) and not force: return if day not in self.__history_k_day_datas: self.__history_k_day_datas[day] = {} if datas: self.__history_k_day_datas[day][code] = datas # 将日期格式化 fdatas = [] for d in datas: dd = copy.deepcopy(d) for k in dd: if type(dd[k]) == datetime.datetime: dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S") fdatas.append(dd) with open(path_str, encoding="utf-8", mode='w') as f: f.write(f"{fdatas}") self.__del_outdate_datas(code) def get_history_bars(self, code, day): """ 获取历史K线 @param code: @param day: @return: """ if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: return self.__history_k_day_datas[day][code] cache_dir = self.__get_cache_dir() file_name = f"{day}_{code}.txt" path_str = f"{cache_dir}/{file_name}" if not os.path.exists(path_str): return None with open(path_str, encoding="utf-8", mode='r') as f: line = f.readline() if line: datas = eval(line) # 将日期格式转为datetime for d in datas: for k in d: if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0: d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S") return datas return None def get_pre_close(self, code, day): """ 获取之前的收盘价 @param code: @param day: @return: """ if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]: return self.__history_k_day_datas[day][code][0]["close"] return None def get_history_bars_codes(self, day): """ 获取某一天的历史K线的代码数据 @param day: @return: 代码集合 """ dir_path = self.__get_cache_dir() codes = set() for root, dirs, files in os.walk(dir_path): for file in files: # 输出文件的绝对路径 if file.find(day) >= 0: codes.add(file.split("_")[1][:6]) return codes