# 历史走势分析,及价值面中的历史行情部分(针对历史行情走势进行获取与分析)
|
# coding=utf-8
|
from __future__ import print_function, absolute_import, unicode_literals
|
|
import copy
|
# import logging
|
import os
|
import time
|
import datetime
|
# from datetime import datetime
|
import json
|
|
import constant
|
from log_module.log import logger_common
|
# 引入掘金API
|
# import utils.juejin_api
|
# import kpl_api
|
from strategy import data_cache
|
|
# 引入基础算法模块
|
from strategy import basic_methods
|
# 引入本地日志模块
|
from utils import hx_qc_value_util, tool
|
|
# 获取logger实例
|
logger = logger_common
|
|
start_time = time.time()
|
print(f"all_K_line开始运行--{start_time}")
|
|
|
def init_data():
|
pass
|
|
|
init_data()
|
|
|
# 90个交易日历史K线类对象
|
class KLineHistory:
|
all_stocks_base_K_line_dict = {}
|
|
# 1.类的构造方法,定义类的各个属性
|
def __init__(self):
|
pass
|
|
def init(self, today_date, Next_trading_day, symbols):
|
self.today_date = today_date # 定义属性today_date 当前交易日期
|
self.Next_trading_day = Next_trading_day # 定义属性Next_trading_day 下个交易日期
|
self.symbols = symbols
|
|
# 1.获取90个交易日期或者获取90个交易日期前的具体日期,为历史K线的start_time获取数据
|
|
def k_line_history_90day(self):
|
# 为了解决步长老是差一个,所以设定为92
|
start_time_data = basic_methods.pre_num_trading_day(self.today_date, 90)
|
# print(f"90个交易日前日期=========={start_time_data}")
|
# logger.info("获取历史K线===开始")
|
|
# 如果是16:00之前,截止日期为:前一天,如果在16:00之后截止日期为后一天
|
if int(tool.get_now_time_str().replace(":", "")) < int("160000"):
|
end_date = data_cache.DataCache().pre_trading_day
|
else:
|
end_date = data_cache.DataCache().today_date
|
|
all_symbol_K_line_history_90day_dict = {}
|
for symbol in self.symbols:
|
code = symbol[-6:]
|
k_bars = HistoryKDataManager().get_history_bars(code, end_date)
|
if not k_bars:
|
# 通过掘金接口迭代获取个股的90天基础K线,得到一个日期排列结果为増序的列表
|
k_bars = hx_qc_value_util.get_history_k_bars(symbol[-6:], 90,
|
end_date=end_date)
|
if k_bars:
|
# 将K线保存到文件
|
HistoryKDataManager().save_history_bars(symbol[-6:], k_bars[0]['bob'].strftime("%Y-%m-%d"),
|
k_bars)
|
# print(f"symbol==={symbol}")
|
# print(f"K_line_history_90day_data ====={K_line_history_90day_data} ")
|
# 将获取到的个股90天基础K线赋值给迭代个股的一个新建字典的symbol对应的value
|
all_symbol_K_line_history_90day_dict[symbol] = k_bars
|
# print(f"all_symbol_K_line_history_90day_dict==={len(all_symbol_K_line_history_90day_dict)}")
|
|
return all_symbol_K_line_history_90day_dict
|
|
# 3.获取90个交易日内标的股票的K线指标
|
@staticmethod
|
def get_property_limit_mark(it_K_line, symbol):
|
historical_high_price = 0.01 # 初始化历史最高价
|
historical_low_price = 0.01 # 初始化历史最低价
|
historical_low_price_index = 0 # 初始化历史最高价序号
|
historical_high_price_index = 0 # 初始化历史最低价序号
|
historical_average_price = 0.01 # 初始化历史均价
|
large_amplitude_day = [] # 初始化大幅震荡日期
|
current_count = 0 # 当前连续满足条件的天数
|
start = None
|
results = []
|
dates_list = []
|
# 查询交易标的基本信息并赋值
|
sec_name = data_cache.DataCache().code_name_dict.get(symbol, "default_name")
|
# print(f"K_line_sec_name=={sec_name}")
|
# 将获取到的公司名称直接 生成字典放到 K线的第一个序列中
|
it_K_line[0]['sec_name'] = sec_name
|
for i in range(0, len(it_K_line) - 1):
|
previous_close = it_K_line[i + 1]['close'] # 昨日收盘价
|
previous_high = it_K_line[i + 1]['high'] # 昨日最高价
|
current_open = round(it_K_line[i]['open'], 2) # 当日开盘价
|
current_close = round(it_K_line[i]['close'], 2) # 当日收盘价
|
current_high = round(it_K_line[i]['high'], 2) # 当日最高价
|
current_low = round(it_K_line[i]['low'], 2) # 当日最低价
|
limit_up_price = basic_methods.limit_up_price(previous_close) # 计算出的当天涨停价
|
limit_up_price = float(limit_up_price) # 将涨停价转化为浮点
|
limit_down_price = basic_methods.limit_down_price(previous_close) # 计算出的当天跌停价
|
limit_down_price = float(limit_down_price) # 将跌停价转化为浮点
|
current_today_volume = it_K_line[i]['volume'] # 当日总交易量
|
current_yesterday_volume = it_K_line[i + 1]['volume'] # 昨日总交易量
|
# current_today_amount = it_K_line[i]['amount'] #当日总交易额
|
current_today_growth = basic_methods.intraday_growth(current_close, previous_close) # 计算出当日涨幅
|
it_K_line[i]['today_growth'] = current_today_growth # 将当日涨幅数据添加到字典中对应的日期中
|
current_today_amplitude = round((current_high - current_low) / previous_close, 2) # 计算出当日振幅
|
it_K_line[i]['today_amplitude'] = current_today_amplitude # 将当日振幅数据添加到字典中对应的日期中
|
historical_high_price = round(max(i['high'] for i in it_K_line), 2) # 计算出历史最高价
|
historical_high_price_index = max(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['high']) # 找到最高价的索引
|
historical_low_price = round(min(i['low'] for i in it_K_line), 2) # 计算出历史最低价
|
historical_low_price_index = min(range(len(it_K_line)), key=lambda ii: it_K_line[ii]['low']) # 找到最低价的索引
|
historical_average_price = round((historical_high_price + historical_low_price) / 2, 2) # 计算出历史最均价
|
# 把最高价、最高价序号、最低价、最低价序号,添加到K线的昨日字典中
|
new_properties = {
|
'historical_high_price': historical_high_price,
|
'historical_high_price_index': historical_high_price_index,
|
'historical_low_price': historical_low_price,
|
'historical_low_price_index': historical_low_price_index,
|
'historical_average_price': historical_average_price
|
}
|
it_K_line[0].update(new_properties)
|
|
# 计算大振幅的日期并添加到大振幅日期列表中
|
if not (-4 < it_K_line[i]['today_growth'] < 4):
|
large_amplitude_day.append(it_K_line[i]['bob'])
|
|
# 判断K线图形风险位置 及 高低时间点
|
if historical_low_price_index > historical_high_price_index:
|
# 序列小于1是历史最高价 历史长度90,其实还可以根据更长来判断更显程度,历史长度很短的一般认为具有更高风险,而更长的之前有可能有更高的历史高位还比较远
|
# 最低价在前
|
it_K_line[0]['risk_position'] = 'low_price_ahead'
|
'''
|
以下部分条件分支由于都是对 it_K_line[0]['risk_position'] 数据进行修改,
|
条件判断逻辑有部分重叠,所以应按照逻辑的覆盖面进行先后判断,避免先把大部分过滤掉了。
|
所以当前条件分支语句的顺序不要轻易更换。
|
# [(historical_high_price / round(it_K_line[0]['close'], 2) < 1.05)的预设值取值范围选择其一:
|
# {"均价以上近前高的开盘价":1.05,"接近前高 的跌停价":1.11,"接近前高 的一个炸板加一个跌停的收盘价":1.17,"均价与最高价之间的涨停价":1.21,"均价与最高价之间的价":1.33}
|
'''
|
# 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 且 最高位 与 最低位为三倍涨幅的妖股【当前二度试探风险】
|
if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and (
|
(historical_average_price / round(it_K_line[0]['close'], 2)) < 2) and (
|
historical_high_price > historical_low_price * 3):
|
it_K_line[0]['risk_position'] = 'recent_monster_second_break_risk'
|
# 最高位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度突破风险】
|
if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and (
|
(historical_average_price / round(it_K_line[0]['close'], 2)) < 1.17) and (
|
historical_high_price > historical_low_price * 1.8):
|
it_K_line[0]['risk_position'] = 'recent_second_break_risk'
|
# 最高位据昨日较远 且 昨收价距历史最高价差距较小(小于5%,涨幅5%是左右是最容易把低吸骗进去的日内涨幅) 且 整体涨跌幅度较大 【当前二度突破近前高风险】
|
# 【双成药业 2024-11-08 这样的二次近前高,实际数值为1.4646...,为安全起见还是设置为1.5】
|
if (2 < historical_high_price_index < 15 and len(it_K_line) >= 90) and (
|
historical_high_price / round(it_K_line[0]['close'], 2) < 1.5) and (
|
historical_high_price > historical_low_price * 1.8):
|
it_K_line[0]['risk_position'] = 'recent_second_break_near_high_position_risk'
|
# 昨日就是最高位 且 整体涨跌幅度较大 【当前高位风险】
|
if (historical_high_price_index < 2 and len(it_K_line) >= 90) and (
|
historical_high_price > historical_low_price * 1.8):
|
it_K_line[0]['risk_position'] = 'recent_high_position_risk'
|
else:
|
# 最高价在前
|
it_K_line[0]['risk_position'] = 'high_price_ahead'
|
# 最低位据昨日较远 且 昨收价距历史均价差距较小 且 整体涨跌幅度较大 【当前二度崩溃风险】
|
if (2 < historical_low_price_index < 15 and len(it_K_line) >= 90) and (
|
(historical_average_price / round(it_K_line[0]['close'], 2)) > 1.1):
|
it_K_line[0]['risk_position'] = 'recent_second_crash_risk'
|
# 昨日就是最低位 且 整体涨跌幅度较大 【当前低位风险】
|
if (historical_low_price_index < 2 and len(it_K_line) >= 90) and (
|
historical_high_price > historical_low_price * 1.5):
|
it_K_line[0]['risk_position'] = 'recent_low_position_risk'
|
|
# 判断K线图形中连续低涨幅天数及具体序列号并把共识价格均价写入文件【阶段性价格共识位置】
|
if 0 < it_K_line[i]['today_amplitude'] < 0.025 and i < 60 and (
|
historical_high_price > historical_low_price * 1.5):
|
if current_count == 0:
|
start = i
|
# 如果当前涨幅小于阈值,增加连续天数计数器
|
current_count += 1
|
# 如果循环复合条件的连续天数达到5天,记录起始索引
|
if current_count > 5:
|
# 检查 results 是否为空或当前序列不与最后一个序列重叠
|
if not results or results[-1][1] < start:
|
if 0.98 < it_K_line[i]['high'] / it_K_line[start]['low'] < 1.02:
|
# 没有重叠,直接添加【其中start为循环到起始序号,i为循环到的结束序号。在实际的日期中要反过来】
|
results.append((start, i))
|
# 算出价格共识区间的均价
|
average_consensus_price = round((it_K_line[i]['high'] + it_K_line[start]['low']) / 2, 2)
|
dates_list.append((
|
it_K_line[start]['bob'].strftime('%Y-%m-%d %H:%M:%S'),
|
it_K_line[i]['bob'].strftime('%Y-%m-%d %H:%M:%S'),
|
average_consensus_price))
|
# if len(dates_list) > 1: # 如果只需要检测包含多段价格共识区
|
it_K_line[0]['rise_and_fall_is_small_start_and_end_date'] = dates_list
|
else:
|
# 重置计数器
|
current_count = 0
|
start = None
|
# 确保不会出现除以零的报错
|
if current_yesterday_volume != 0:
|
if round(current_today_volume / current_yesterday_volume, 2) > 1.1:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】")
|
if current_today_growth > 0:
|
it_K_line[i]['today_volume_shape'] = 'increases_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量上涨】")
|
elif current_today_growth < 0:
|
it_K_line[i]['today_volume_shape'] = 'increases_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量下跌】")
|
else:
|
it_K_line[i]['today_volume_shape'] = 'increases_balance'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量平收】")
|
|
elif round(current_today_volume / current_yesterday_volume, 2) < 0.9:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】")
|
if current_today_growth > 0:
|
it_K_line[i]['today_volume_shape'] = 'decreases_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量上涨】")
|
elif current_today_growth < 0:
|
it_K_line[i]['today_volume_shape'] = 'decreases_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量下跌】")
|
else:
|
it_K_line[i]['today_volume_shape'] = 'decreases_balance'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量平收】")
|
else:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】")
|
if current_today_growth > 0:
|
it_K_line[i]['today_volume_shape'] = 'remained_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量上涨】")
|
elif current_today_growth < 0:
|
it_K_line[i]['today_volume_shape'] = 'remained_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量下跌】")
|
else:
|
it_K_line[i]['today_volume_shape'] = 'remained_balance'
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量平收】")
|
else:
|
logger.info(f"{symbol} 的 昨日成交量 为 0,报错!!")
|
|
if current_open - previous_close > 0:
|
# print(f"i=={i} {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}")
|
# difference = current_close - previous_close
|
if current_close - current_open > 0:
|
it_K_line[i]['attribute'] = 'up_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close} 收盘价:{current_close}】")
|
else:
|
it_K_line[i]['attribute'] = 'up_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close} 收盘价:{current_close}】")
|
else:
|
if current_close - current_open > 0:
|
it_K_line[i]['attribute'] = 'down_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close} 收盘价:{current_close}】")
|
else:
|
it_K_line[i]['attribute'] = 'down_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close} 收盘价:{current_close}】")
|
|
if abs(current_high - limit_up_price) < 0.001 and abs(current_close - limit_up_price) < 0.001:
|
# print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
|
if abs(current_open - limit_up_price) < 0.001 and abs(limit_up_price - current_low) < 0.01:
|
it_K_line[i]['attribute'] = 'one_line_limit_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字涨停板 ") # 打印出当前行的'bob'
|
elif abs(current_low - limit_down_price) < 0.01:
|
it_K_line[i]['attribute'] = 'limit_down_then_limit_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} [准]地天板") # 打印出当前行的'bob'
|
elif abs(current_open - limit_up_price) < 0.001 and abs(current_low - limit_down_price) < 0.01:
|
it_K_line[i]['attribute'] = 'limit_up_then_limit_down_then_limit_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字涨停天地天板") # 打印出当前行的'bob'
|
else:
|
it_K_line[i]['attribute'] = 'limit_up'
|
# print(f"i=={i} {it_K_line[i]['bob']} 涨停板") # 打印出当前行的'bob'
|
|
if abs(current_high - limit_up_price) < 0.001 <= abs(current_close - limit_up_price) and abs(
|
current_close - limit_down_price) >= 0.001:
|
it_K_line[i]['attribute'] = 'frying_plate'
|
# print(f'在炸板日的列表中更新了对应的字典的键对值吗 ? it_K_line[i]==={it_K_line[i]}')
|
# print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
|
if abs(previous_close - previous_high) >= 0.001:
|
it_K_line[i]['attribute'] = 'first_frying_plate'
|
# print(f"i=={i} {it_K_line[i]['bob']} 首板炸板") # 打印出当前行的'bob'
|
elif abs(current_open - limit_up_price) < 0.01:
|
it_K_line[i]['attribute'] = 'one_line_limit_up_then_frying_plate'
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字天炸") # 打印出当前行的'bob'
|
elif abs(limit_down_price - limit_up_price) < 0.01 and abs(current_low - limit_down_price) < 0.01:
|
it_K_line[i]['attribute'] = 'one_line_limit_up_then_frying_plate_then_limit_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字天地") # 打印出当前行的'bob'
|
else:
|
it_K_line[i]['attribute'] = 'not_first_frying_plate'
|
# print(f"i=={i} {it_K_line[i]['bob']} 非首板炸板") # 打印出当前行的'bob'
|
|
if abs(current_low - limit_down_price) < 0.001:
|
if abs(current_close - limit_down_price) < 0.001:
|
if abs(current_high - limit_up_price) < 0.001:
|
it_K_line[i]['attribute'] = 'limit_up_then_limit_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} [准]天地板") # 打印出当前行的'bob'
|
elif abs(current_open - limit_down_price) < 0.001 and abs(current_high - limit_down_price) < 0.01:
|
it_K_line[i]['attribute'] = 'one_line_limit_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字跌停板") # 打印出当前行的'bob'
|
else:
|
it_K_line[i]['attribute'] = 'limit_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 跌停板") # 打印出当前行的'bob'
|
elif abs(current_close - limit_up_price) >= 0.001:
|
it_K_line[i]['attribute'] = 'touch_limit_down'
|
# print(f"i=={i} {it_K_line[i]['bob']} 触及跌停") # 打印出当前行的'bob'
|
|
# 将高涨跌幅日期打印出来
|
# it_K_line[0]['large_amplitude_day'] = large_amplitude_day
|
|
# 判断K线图形长期振幅 及 当前走势 【根据大振幅天数 反推算出 长期低幅度震荡持平、上涨、下跌的个股写入字典】
|
if len(large_amplitude_day) < 1:
|
# 长期低幅度震荡
|
it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation'
|
# it_K_line[0]['large_amplitude_day_len'] = len(large_amplitude_day)
|
if (historical_high_price / historical_low_price) < 1.15:
|
if 0.99 < (historical_average_price / round(it_K_line[0]['close'], 2)) < 1.09:
|
# 长期低幅度震荡持平
|
it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_remains_stable'
|
else:
|
if (historical_low_price_index > historical_high_price_index) and (
|
historical_high_price_index < 2 and len(it_K_line) > 88):
|
# 长期低幅度震荡上涨
|
it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_rise'
|
elif (historical_low_price_index < historical_high_price_index) and (
|
historical_low_price_index < 2 and len(it_K_line) > 88):
|
# 长期低幅度震荡下跌
|
it_K_line[0]['long_term_amplitude'] = 'low_amplitude_oscillation_and_decline'
|
|
|
# 实例化K线对象
|
k_line_history = KLineHistory()
|
# 实例化指数K线对象
|
main_index_k_line_history = KLineHistory()
|
|
|
# 在main.py中初始化函数里面就实例化上证A股和深证A股的历史K线方法【在本文件中调用且不写入本地文件时才需要在本文件内实例化】
|
# k_line_history.k_line_history_90day()
|
|
# 写入全目标标的股票90天K线
|
def all_stocks_all_k_line_dict_write():
|
all_stocks_base_K_line_dict = k_line_history.k_line_history_90day()
|
# 初始化所有个股的指标K线列表
|
all_stocks_all_K_line_property_dict = {}
|
for i in data_cache.DataCache().filtered_stocks:
|
# print(f"i==========={i}")
|
i_k_line = all_stocks_base_K_line_dict[i] # 获取i的K线
|
i_k_line_copy = copy.deepcopy(i_k_line) # 深拷贝i的K线
|
# it_K_line_reversed = list(reversed(i_k_line_copy)) # 开盘啦获取的数据需要反转i的K线
|
it_K_line_reversed = list(i_k_line_copy) # 小辉端的数据不需要反转i的K线
|
if not it_K_line_reversed:
|
continue
|
k_line_history.get_property_limit_mark(it_K_line_reversed, i) # 给标的的K线更新指标属性 把股票代码同时传给要调用的函数
|
symbol_K_line_property_dict = {i: it_K_line_reversed} # 添加 更新极限指标属性的K线 字典
|
# print(f"symbol_K_line_property_dict===={symbol_K_line_property_dict}")
|
all_stocks_all_K_line_property_dict.update(symbol_K_line_property_dict)
|
|
# 构造时间格式datetime转化为字符串,以便将K线属性指标转化为json格式写入本地文件
|
def convert_datetime(obj):
|
if isinstance(obj, datetime.datetime):
|
return obj.strftime('%Y-%m-%d %H:%M:%S') # 转换为字符串
|
elif isinstance(obj, dict):
|
return {k: convert_datetime(v) for k, v in obj.items()} # 递归处理字典
|
elif isinstance(obj, list):
|
return [convert_datetime(element) for element in obj] # 递归处理列表
|
# 可以添加其他类型的处理逻辑
|
else:
|
# 对于未知类型,你可以选择保留原样、跳过或引发异常
|
# 这里我们选择保留原样
|
return obj
|
|
try:
|
json_data = json.dumps(convert_datetime(all_stocks_all_K_line_property_dict), ensure_ascii=False, indent=4)
|
# 将转换后的JSON字符串写入文件
|
with open(constant.K_BARS_PATH, 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
except Exception as error:
|
print(f"An error occurred while converting the data to JSON: {error}")
|
logger.info(f"标的个股历史k线写完了!{tool.get_now_time_str()}")
|
|
|
# 用开盘啦数据检测昨日的K线中涨停属性是否有误(盘尾 集合竞价 炸开一个卖一档,但涨幅未变的)
|
def check_limit_up_attribute():
|
for key, values in data_cache.all_stocks_all_K_line_property_dict.items():
|
# print(f"data_cache.limit_up_code_list==={data_cache.limit_up_code_list}")
|
if 'attribute' in values[0]:
|
if key not in data_cache.yesterday_limit_up_code_list and values[0]['attribute'] in data_cache.limit_up_type:
|
logger.info(f"在K线中昨日涨停,但是在开盘啦中没有,key==={key} 公司名称==={values[0]['sec_name']} ")
|
data_cache.yesterday_frying_plate_last_minute_list.append(key)
|
else:
|
logger.info(f"K线中缺失昨日属性键值 {values[0]['sec_name']}:values[0]==={values[0]}")
|
|
|
# check_limit_up_attribute()
|
|
|
# 实时检测是否拉取K线线程的函数
|
def check_time_and_data_date():
|
while True:
|
try:
|
# 声明赋值实时时间
|
now_time = tool.get_now_time_str()
|
|
# print(f"now_time==={now_time}")
|
# if now_time > data_cache.AFTER_CLOSING_TIME and data_cache.execution is False:
|
# # if now_time > data_cache.AFTER_CLOSING_TIME:
|
# data_cache.execution = True
|
# # 整理当日涨停信息并写入本地管理好本地数据
|
# kpl_api.get_arrange_limit_up_info()
|
# logger.info(f"整理当日涨停信息 已经运行完成")
|
# # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
|
# kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
|
# # logger.info(f"获取所有个股的板块概念 已经运行完成")
|
|
# 构造一个循环检测K线子带你中所有个股代码下的日期是不是和某日一致的,如果不一致则返回False
|
def check_data_date(target_date):
|
# 再判断一下全局化缓存文件中的抽样日期是不是目标日期
|
if data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10] == target_date:
|
data_date_and_target_date = True
|
return data_date_and_target_date
|
# for key,value in data_cache.all_stocks_all_K_line_property_dict.items():
|
# print(f"key===={key}")
|
# print(f"value[0]['bob'][:10]===={value[0]['bob'][:10]}")
|
# if value[0]['bob'][:10] != target_date and len(data_cache.all_stocks_all_K_line_property_dict) < 1 and key not in data_cache.no_k_line_stocks:
|
# data_date_and_target_date = False
|
# return data_date_and_target_date
|
|
# 盘中不进行时间与数据的判断(也不能早于服务器重启时间,因为次日凌晨拉取的K线会有错误),一方面判断应该在开盘前完成,另一方面收盘后离最新拉取时间都还有很长时间
|
# 在这个时间段内运行 9:00--9:30 或 15:00--23:00
|
if data_cache.SERVER_RESTART_TIME < now_time < data_cache.OPENING_TIME or data_cache.CLOSING_TIME < now_time < data_cache.PROGRAM_SLEEP_TIME:
|
# 然后判断一下,当K线字典中的任意一只个股的的第一个K线表中的第一个日期 等于 上一个交易日日期 或 今日日期 且 运行时时间未到 18:30 那么不需要新拉取直接读取已有的就行,
|
# 否者还是得调用K线对象方法拉取,并重新读取赋值全局化
|
# if hour < 18 or (hour == 18 and minute < 31):
|
if now_time < data_cache.UPDATE_DATA_TIME:
|
# if now_time < data_cache.closing_time:
|
check_pre_trading_day = check_data_date(data_cache.DataCache().pre_trading_day)
|
if check_pre_trading_day is True:
|
# if hour >= 17:
|
if now_time > data_cache.CHECKING_DATA_TIME:
|
print(
|
f"未到【18:31】 但 既有数据 【是】 上个交易日数据 【不执行拉取K线】 ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε(/’-‘)/")
|
else:
|
print(
|
f"未到【18:31】 且 既有数据 【不是】 上个交易日数据 执行拉取K线 【既有数据日期:{data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10]} 上个交易日:{data_cache.DataCache().pre_trading_day}】")
|
# 调用指标K线写入本地文件
|
all_stocks_all_k_line_dict_write()
|
# 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存
|
if os.path.exists(constant.K_BARS_PATH):
|
with open(constant.K_BARS_PATH, 'r',
|
encoding='utf-8') as f:
|
data_cache.all_stocks_all_K_line_property_dict = json.load(f)
|
# print(
|
# f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}")
|
|
else:
|
check_today_date = check_data_date(data_cache.DataCache().today_date)
|
if check_today_date is True:
|
print(f"到了【18:31】 但 既有数据 【是】 本交易日数据 【不执行拉取K线】 ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε=ε(/’-‘)/")
|
else:
|
print(
|
f"到了【18:31】 且 既有数据 【不是】 本交易日数据 执行拉取K线 【既有数据日期:{data_cache.all_stocks_all_K_line_property_dict['SZSE.000034'][0]['bob'][:10]} 本交易日:{data_cache.DataCache().today_date}】")
|
# 调用指标K线写入本地文件
|
all_stocks_all_k_line_dict_write()
|
# 读取已经获取到并存储在本地的所有票的指标K线字典,并赋值给data_cache全局缓存
|
if os.path.exists(constant.K_BARS_PATH):
|
with open(constant.K_BARS_PATH, 'r',
|
encoding='utf-8') as f:
|
data_cache.all_stocks_all_K_line_property_dict = json.load(f)
|
# print(
|
# f"data_cache.all_stocks_all_K_line_property_dict的个数{len(data_cache.all_stocks_all_K_line_property_dict)}")
|
except Exception as error:
|
logger.exception(error)
|
print(f"实时检测是否拉取K线线程报错An error occurred: {error}")
|
finally:
|
current_time = datetime.datetime.now()
|
# 使用strf time来格式化时间为字符串,只显示时分秒
|
formatted_time = current_time.strftime("%H:%M:%S")
|
# print(f"当前-时间:{formatted_time}")
|
time.sleep(1)
|
|
|
# 历史K线累计涨停天数函数
|
def count_limit_up_day(k_line_data):
|
limit_up_day = 0 # 初始化涨停天数
|
# 计算90天内涨停天数
|
for i in k_line_data:
|
if 'attribute' in i and (
|
i['attribute'] == 'one_line_limit_up' or i['attribute'] == 'limit_down_then_limit_up' or i['attribute'] == 'limit_up_then_limit_down_then_limit_up' or i['attribute'] == 'limit_up'):
|
limit_up_day += 1
|
# print(f"涨停提日期:{i['bob']}")
|
# print(f"涨停天数----limit_up_day======{limit_up_day}")
|
return limit_up_day
|
|
|
# 只在本文件内测试时开启调用
|
# all_stocks_all_k_line_dict_write()
|
|
|
# 原始K线管理器
|
class HistoryKDataManager:
|
__instance = None
|
__db = 0
|
__history_k_day_datas = {}
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(HistoryKDataManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_data()
|
return cls.__instance
|
|
@classmethod
|
def __load_data(cls):
|
pass
|
|
def __get_cache_dir(self):
|
"""
|
获取缓存路径
|
@return:
|
"""
|
dir_path = f"{constant.get_path_prefix()}/datas/k_bars"
|
if not os.path.exists(dir_path):
|
os.makedirs(dir_path)
|
return dir_path
|
|
def __del_outdate_datas(self, code):
|
dir_path = self.__get_cache_dir()
|
datas = []
|
for root, dirs, files in os.walk(dir_path):
|
for file in files:
|
# 输出文件的绝对路径
|
if file.find(code) < 0:
|
continue
|
path_ = os.path.join(root, file)
|
day = file.split("_")[0]
|
datas.append((day, path_))
|
# 保存最新的一条数据
|
if datas:
|
datas.sort(key=lambda x: int(x[0].replace("-", "")), reverse=True)
|
datas = datas[1:]
|
for d in datas:
|
os.remove(d[1])
|
|
def save_history_bars(self, code, day, datas, force=False):
|
"""
|
保存历史K线
|
@param code: 代码
|
@param day: K线最新的日期(取datas最新一条数据的日期)
|
@param datas: 数据
|
@return:
|
"""
|
cache_dir = self.__get_cache_dir()
|
file_name = f"{day}_{code}.txt"
|
path_str = f"{cache_dir}/{file_name}"
|
if os.path.exists(path_str) and not force:
|
return
|
if day not in self.__history_k_day_datas:
|
self.__history_k_day_datas[day] = {}
|
if datas:
|
self.__history_k_day_datas[day][code] = datas
|
# 将日期格式化
|
fdatas = []
|
for d in datas:
|
dd = copy.deepcopy(d)
|
for k in dd:
|
if type(dd[k]) == datetime.datetime:
|
dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
|
fdatas.append(dd)
|
with open(path_str, encoding="utf-8", mode='w') as f:
|
f.write(f"{fdatas}")
|
self.__del_outdate_datas(code)
|
|
def get_history_bars(self, code, day):
|
"""
|
获取历史K线
|
@param code:
|
@param day:
|
@return:
|
"""
|
if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
|
return self.__history_k_day_datas[day][code]
|
cache_dir = self.__get_cache_dir()
|
file_name = f"{day}_{code}.txt"
|
path_str = f"{cache_dir}/{file_name}"
|
if not os.path.exists(path_str):
|
return None
|
with open(path_str, encoding="utf-8", mode='r') as f:
|
line = f.readline()
|
if line:
|
datas = eval(line)
|
# 将日期格式转为datetime
|
for d in datas:
|
for k in d:
|
if type(d[k]) == str and d[k].find("-") > 0 and d[k].find(":") > 0 and d[k].find(" ") > 0:
|
d[k] = datetime.datetime.strptime(d[k], "%Y-%m-%d %H:%M:%S")
|
return datas
|
return None
|
|
def get_pre_close(self, code, day):
|
"""
|
获取之前的收盘价
|
@param code:
|
@param day:
|
@return:
|
"""
|
if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
|
return self.__history_k_day_datas[day][code][0]["close"]
|
return None
|
|
def get_history_bars_codes(self, day):
|
"""
|
获取某一天的历史K线的代码数据
|
@param day:
|
@return: 代码集合
|
"""
|
|
dir_path = self.__get_cache_dir()
|
codes = set()
|
for root, dirs, files in os.walk(dir_path):
|
for file in files:
|
# 输出文件的绝对路径
|
if file.find(day) >= 0:
|
codes.add(file.split("_")[1][:6])
|
return codes
|