# coding=utf-8
|
from __future__ import print_function, absolute_import, unicode_literals
|
|
import copy
|
import time
|
import datetime
|
import json
|
from gm.api import *
|
|
# 引入基础算法模块
|
import basic_methods
|
|
now = time.time()
|
print(f"K_line开始运行--{now}")
|
|
# 赋值账户ID
|
account_id = 'aaee2221-839c-11ee-a7cd-00163e022aa6'
|
# 设置token
|
set_token("6c1dbe95191fb77cced9d805cb9c853805551ddb")
|
def init_data():
|
global today_date,Pre_trading_day,Double_Pre_trading_day,Next_trading_day,all_stocks,filtered_stocks
|
|
# 今日时间初始化
|
now = datetime.datetime.now() # 获取本机时间
|
# print(f"本机时间::{now}")
|
today_date = now.strftime('%Y-%m-%d') # 获取今日日期参数
|
# print(f"今日日期{today_date}")
|
Pre_trading_day = get_previous_trading_date(exchange='SHSE', date=today_date) # 获取前一个交易日
|
Double_Pre_trading_day = get_previous_trading_date(exchange='SHSE', date=Pre_trading_day) # 获取上上一个交易日
|
Next_trading_day = get_next_trading_date(exchange='SHSE', date=today_date) # 获取下一个交易日
|
# print(f"前一个交易日{Pre_trading_day}")
|
# print(f"上上一个交易日{Double_Pre_trading_day}")
|
# print(f"下一个交易日{Next_trading_day}")
|
|
# 获取A股市场(包含沪深两市)的股票列表排除ST,停牌 上交所 SHSE.600000 深交所 SZSE.000000 biaodi = ['SHSE.603839', 'SZSE.002855']
|
all_stocks = get_instruments(exchanges='SZSE,SHSE', sec_types=1, fields='symbol', df=1, skip_suspended=True, skip_st=True)['symbol'].tolist()
|
# print("A股所有代码数量:", len(all_stocks))
|
|
# 只要上证A股和深证A股
|
filtered_stocks = [stock for stock in all_stocks if stock.startswith('SHSE.60') or (stock.startswith('SZSE.00'))]
|
# print(f"过滤后上证A股和深证A股数量:{len(filtered_stocks)}")
|
|
init_data()
|
|
class K_Property:
|
def __init__(self, max_continue_count=0):
|
self.max_continue_count = max_continue_count
|
|
|
# 90个交易日历史K线类对象
|
class K_line_history:
|
history_k_dict = {}
|
property_k_dict = {}
|
|
def __init__(self, today_date,Next_trading_day, symbols):
|
self.today_date = today_date # 定义属性today_date
|
self.Next_trading_day = Next_trading_day
|
self.symbols = symbols
|
|
# 2.获取90个交易日期或者获取90个交易日期前的具体日期,为历史K线的start_time获取数据
|
def K_line_history_90day(self):
|
# 为了解决步长老是差一个,所以设定为92
|
start_time_data = basic_methods.pre_num_trading_day(self.today_date, 90)
|
# print(f"90个交易日前日期=========={start_time_data}")
|
# logger.info("获取历史K线===开始")
|
for symbol in self.symbols:
|
# try:
|
# now = time.time()
|
# logger.info("历史K线开始:{}", symbol)
|
K_line_history_90day_data = history(symbol=symbol, frequency='1d', start_time=start_time_data,
|
end_time=self.Next_trading_day,
|
fields='open,high,close,low,cum_volume,cum_amount,bob',
|
adjust=ADJUST_PREV, df=False)
|
# logger.info("历史K线耗时:{}-{}", symbol,round(time.time() - now,2))
|
self.history_k_dict[symbol] = K_line_history_90day_data
|
# except:
|
# logging.error("获取历史K线错误")
|
# logger.info("获取历史K线===结束")
|
|
|
|
# 3.获取90个交易日内标的股票的K线指标 按照需求应该在current里面调用,所以这里正常情况会注释掉,直接在这里用也用不了,只是演示思维过程!!!!
|
def get_property_Limit_mark(self,it_K_line):
|
for i in range(0, len(it_K_line) - 1):
|
previous_close = it_K_line[i + 1]['close'] # 昨日收盘价
|
previous_high = it_K_line[i + 1]['high'] # 昨日最高价
|
top_price = basic_methods.limit_up_price(previous_close) # 计算出的当天涨停价
|
top_price = float(top_price) # 将涨停价转化为浮点
|
down_price = basic_methods.limit_down_price(previous_close) # 计算出的当天跌停价
|
down_price = float(down_price) # 将跌停价转化为浮点
|
current_open = round(it_K_line[i]['open'], 2) # 当日开盘价
|
current_high = round(it_K_line[i]['high'], 2) # 当日最高价
|
current_close = round(it_K_line[i]['close'], 2) # 当日收盘价
|
current_low = round(it_K_line[i]['low'], 2) # 当日最低价
|
current_today_volume = it_K_line[i]['volume']
|
current_yesterday_volume = it_K_line[i + 1]['volume']
|
current_today_amount = it_K_line[i]['amount']
|
current_today_growth = basic_methods.intraday_growth(current_close, previous_close)
|
|
|
if current_today_growth != None:
|
new_item = {'today_growth': current_today_growth}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 当日涨幅 {current_today_growth}") # 打印出当前行的'bob'
|
|
if round(current_today_volume/current_yesterday_volume,2) > 1.25:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】")
|
if current_today_growth > 0 :
|
new_item = {'today_volume_shape': 'increases_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量上涨】")
|
elif current_today_growth < 0 :
|
new_item = {'today_volume_shape': 'increases_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量下跌】")
|
else:
|
new_item = {'today_volume_shape': 'increases_balance'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【放量平收】")
|
|
elif round(current_today_volume/current_yesterday_volume,2) < 1.25:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】")
|
if current_today_growth > 0 :
|
new_item = {'today_volume_shape': 'decreases_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量上涨】")
|
elif current_today_growth < 0 :
|
new_item = {'today_volume_shape': 'decreases_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量下跌】")
|
else:
|
new_item = {'today_volume_shape': 'decreases_balance'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【缩量平收】")
|
else:
|
# print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】")
|
if current_today_growth > 0 :
|
new_item = {'today_volume_shape': 'remained_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量上涨】")
|
elif current_today_growth < 0 :
|
new_item = {'today_volume_shape': 'remained_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量下跌】")
|
else:
|
new_item = {'today_volume_shape': 'remained_balance'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 【平量平收】")
|
|
if current_open - previous_close > 0:
|
# print(f"i=={i} {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}")
|
chazhi = current_close - previous_close
|
if current_close - current_open > 0:
|
new_item = {'attribute': 'up_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close} 收盘价:{current_close}】") # 打印出当前行的'bob'
|
else:
|
new_item = {'attribute': 'up_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close} 收盘价:{current_close}】") # 打印出当前行的'bob'
|
else:
|
if current_close - current_open > 0:
|
new_item = {'attribute': 'up_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close} 收盘价:{current_close}】") # 打印出当前行的'bob'
|
else:
|
new_item = {'attribute': 'up_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close} 收盘价:{current_close}】") # 打印出当前行的'bob'
|
|
if abs(current_high - top_price) < 0.001 and abs(current_close - top_price) < 0.001:
|
# print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
|
if abs(current_open - top_price) < 0.001 and abs(top_price - current_low) < 0.01:
|
new_item = {'attribute': 'one_line_limit_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字涨停板 ") # 打印出当前行的'bob'
|
elif abs(current_low - down_price) < 0.01:
|
new_item = {'attribute': 'limit_down_then_limit_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} [准]地天板") # 打印出当前行的'bob'
|
elif abs(current_open - top_price) < 0.001 and abs(current_low - down_price) < 0.01:
|
new_item = {'attribute': 'limit_up_then_limit_down_then_limit_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字涨停天地天板") # 打印出当前行的'bob'
|
else:
|
new_item = {'attribute': 'limit_up'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 涨停板") # 打印出当前行的'bob'
|
|
if abs(current_high - top_price) < 0.001 and abs(current_close - top_price) >= 0.001 and abs(current_close - down_price) >= 0.001:
|
new_item = {'attribute': 'frying_plate'}
|
it_K_line[i].update(new_item)
|
# print(f'在炸板日的列表中更新了对应的字典的键对值吗 ? it_K_line[i]==={it_K_line[i]}')
|
# print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
|
if abs(previous_close - previous_high) >= 0.001:
|
new_item = {'attribute': 'first_frying_plate'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 首板炸板") # 打印出当前行的'bob'
|
elif abs(current_open - top_price) < 0.01:
|
new_item = {'attribute': 'one_line_limit_up_then_frying_plate'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字天炸") # 打印出当前行的'bob'
|
elif abs(down_price - top_price) < 0.01 and abs(current_low - down_price) < 0.01:
|
new_item = {'attribute': 'one_line_limit_up_then_frying_plate_then_limit_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字天地") # 打印出当前行的'bob'
|
else:
|
new_item = {'attribute': 'not_first_frying_plate'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 非首板炸板") # 打印出当前行的'bob'
|
|
if abs(current_low - down_price) < 0.001:
|
if abs(current_close - down_price) < 0.001:
|
if abs(current_high - top_price) < 0.001:
|
new_item = {'attribute': 'limit_up_then_limit_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} [准]天地板") # 打印出当前行的'bob'
|
elif abs(current_open - down_price) < 0.001 and abs(current_high - down_price) < 0.01:
|
new_item = {'attribute': 'one_line_limit_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 一字跌停板") # 打印出当前行的'bob'
|
else:
|
new_item = {'attribute': 'limit_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 跌停板") # 打印出当前行的'bob'
|
elif abs(current_close - top_price) >= 0.001:
|
new_item = {'attribute': 'touch_limit_down'}
|
it_K_line[i].update(new_item)
|
# print(f"i=={i} {it_K_line[i]['bob']} 触及跌停") # 打印出当前行的'bob'
|
|
def __get_property(self, symbol):
|
if symbol in self.property_k_dict:
|
return self.property_k_dict[symbol]
|
self.property_k_dict[symbol] = K_Property()
|
return self.property_k_dict[symbol]
|
|
|
def get_continue_count(self, symbol):
|
p = self.__get_property(symbol)
|
ks = self.history_k_dict[symbol]
|
if not ks:
|
return 0
|
p.max_continue_count = 10
|
ks_new = copy.deepcopy(ks)
|
ks_new.reverse()
|
print(f"ks_new==={ks_new}")
|
|
|
|
# history_data = history(symbol='SHSE.600000', frequency='1d', start_time='2024-03-28', end_time='2024-04-09', fields='open, close, low, high, eob', adjust=ADJUST_PREV, df= True)
|
# print(f"history_data==={history_data}")
|
# history_n_data = history_n(symbol='SHSE.600988', frequency='1d', count=1, end_time='2024-04-04', fields='symbol, open, close, low, high, eob', adjust=ADJUST_PREV, df=True)
|
# print(f"{history_n_data}")
|
# print(f"Next_trading_day={Next_trading_day}")
|
|
|
|
|
# 在初始化函数里面就实例化上证A股和深证A股的历史K线方法
|
k_line_history = K_line_history(today_date,Next_trading_day, filtered_stocks)
|
k_line_history.K_line_history_90day()
|
# 写入全股票90天K线
|
def all_stocks_all_K_line_list_write():
|
# 初始化所有个股的K线列表
|
all_stocks_all_K_line_list = []
|
|
for i in filtered_stocks:
|
its_k_line = k_line_history.history_k_dict[i] # 获取标的K线
|
its_k_line_copy = copy.deepcopy(its_k_line) # 浅拷贝标的K线
|
it_K_line = list(reversed(its_k_line_copy)) # 反转标的K线
|
k_line_history.get_property_Limit_mark(it_K_line) # 给标的的K线更新极限指标属性
|
it_90day_K_line_dict = {
|
'symbol': i, # 添加股票代码
|
# 'sec_name': sec_name, #添加公司名称
|
'it_K_line_property': it_K_line, # 添加更新极限指标属性的K线
|
}
|
all_stocks_all_K_line_list.append(it_90day_K_line_dict)
|
# print(f'it_90day_K_line_dict==={it_90day_K_line_dict}')
|
|
from datetime import datetime
|
|
def convert_datetime(obj):
|
if isinstance(obj, datetime):
|
return obj.strftime('%Y-%m-%d %H:%M:%S') # 转换为字符串
|
elif isinstance(obj, dict):
|
return {k: convert_datetime(v) for k, v in obj.items()} # 递归处理字典
|
elif isinstance(obj, list):
|
return [convert_datetime(element) for element in obj] # 递归处理列表
|
# 可以添加其他类型的处理逻辑
|
else:
|
# 对于未知类型,你可以选择保留原样、跳过或引发异常
|
# 这里我们选择保留原样
|
return obj
|
try:
|
json_data = json.dumps(convert_datetime(all_stocks_all_K_line_list), ensure_ascii=False, indent=4)
|
# 将转换后的JSON字符串写入文件
|
with open('local_storage_data/all_stocks_all_K_line_list.json', 'w', encoding='utf-8') as f:
|
f.write(json_data)
|
except Exception as e:
|
print(f"An error occurred while converting the data to JSON: {e}")
|
# now = datetime.datetime.now() # 获取本机时间
|
print(f"历史k线写完了!{now}")
|
|
# # 预设(提前一日获取K线)K线获取时间定时执行函数 (默认设定为15:15)
|
# def check_time():
|
# # now = global_context.now # 获取当前时间
|
# now = datetime.datetime.now()
|
# hour = now.hour
|
# minute = now.minute
|
# #
|
# if hour > 10 or (hour == 10 and minute >= 15):
|
# print(f"K线写入时间到了:{now}")
|
# all_stocks_all_K_line_list_write()
|
# return False # 返回False,表示结束循环
|
# else:
|
# return True # 返回True,表示继续循环
|
#
|
# # 使用while循环和check_time函数检查时间,并设置固定的时间间隔
|
# while check_time():
|
# time.sleep(1)
|
|
|
|
# 为定时写入预设K线单独开一个进程
|
# def all_stocks_all_K_line_list_write_process():
|
# while True:
|
# try:
|
# now = time.time()
|
# print(f"预设写入K线进程开始{now}")
|
# all_stocks_all_K_line_list_write()
|
# print(f"预设写入K线进程完成一次{now}")
|
# except Exception as e:
|
# print(f"An error occurred: {e}")
|
# finally:
|
# time.sleep(600)
|
|
|
# # 进行最大17次五日内涨幅计算,并暂存结果,已完成90日k线涨幅的分段切片
|
# five_day_growth_list = []
|
# for i in range(0, len(it_K_line) - 5, 5): # 减去4以确保不会超出索引范围
|
# five_day_growth = sum(it_K_line[j]['today_growth'] for j in range(i, i + 5))
|
# five_day_growth_list.append(round(five_day_growth, 2))
|
# print(f"five_day_growth_list == {five_day_growth_list}")
|
# 进行最大17次五日内涨幅计算,并暂存结果,已完成90日k线涨幅的分段切片
|
# five_day_growth_average_list = []
|
# for i in range(0, len(it_K_line) - 5, 5): # 减去4以确保不会超出索引范围
|
# five_day_growth_average = sum(it_K_line[j]['today_growth'] for j in range(i, i + 5))/5
|
# five_day_growth_average_list.append(round(five_day_growth_average, 2))
|
# print(f"five_day_growth_average_list == {five_day_growth_average_list}")
|
|
# it_K_line = k_line_history.get_continue_count(current_data['symbol'])
|
# print(f'it_K_line===={it_K_line}')
|
|
|
all_stocks_all_K_line_list_write()
|
|
# all_stocks_all_K_line_list_write_process()
|