lhr
2024-06-21 09414b22094c35993f331f25ec104b5f74a6d91f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
 
import copy
import time
import datetime
import json
from gm.api import *
 
# 引入基础算法模块
import basic_methods
 
now = time.time()
print(f"K_line开始运行--{now}")
 
# 赋值账户ID
account_id = 'aaee2221-839c-11ee-a7cd-00163e022aa6'
# 设置token
set_token("6c1dbe95191fb77cced9d805cb9c853805551ddb")
def init_data():
    global today_date,Pre_trading_day,Double_Pre_trading_day,Next_trading_day,all_stocks,filtered_stocks
 
    # 今日时间初始化
    now = datetime.datetime.now()  # 获取本机时间
    # print(f"本机时间::{now}")
    today_date = now.strftime('%Y-%m-%d')  # 获取今日日期参数
    # print(f"今日日期{today_date}")
    Pre_trading_day = get_previous_trading_date(exchange='SHSE', date=today_date)  # 获取前一个交易日
    Double_Pre_trading_day = get_previous_trading_date(exchange='SHSE', date=Pre_trading_day)  # 获取上上一个交易日
    Next_trading_day = get_next_trading_date(exchange='SHSE', date=today_date)  # 获取下一个交易日
    # print(f"前一个交易日{Pre_trading_day}")
    # print(f"上上一个交易日{Double_Pre_trading_day}")
    # print(f"下一个交易日{Next_trading_day}")
 
    # 获取A股市场(包含沪深两市)的股票列表排除ST,停牌    上交所 SHSE.600000   深交所 SZSE.000000    biaodi = ['SHSE.603839', 'SZSE.002855']
    all_stocks = get_instruments(exchanges='SZSE,SHSE', sec_types=1, fields='symbol', df=1, skip_suspended=True, skip_st=True)['symbol'].tolist()
    # print("A股所有代码数量:", len(all_stocks))
 
    # 只要上证A股和深证A股
    filtered_stocks = [stock for stock in all_stocks if stock.startswith('SHSE.60') or (stock.startswith('SZSE.00'))]
    # print(f"过滤后上证A股和深证A股数量:{len(filtered_stocks)}")
 
init_data()
 
class K_Property:
    def __init__(self, max_continue_count=0):
        self.max_continue_count = max_continue_count
 
 
# 90个交易日历史K线类对象
class K_line_history:
    history_k_dict = {}
    property_k_dict = {}
 
    def __init__(self, today_date,Next_trading_day, symbols):
        self.today_date = today_date  # 定义属性today_date
        self.Next_trading_day = Next_trading_day
        self.symbols = symbols
 
    # 2.获取90个交易日期或者获取90个交易日期前的具体日期,为历史K线的start_time获取数据
    def K_line_history_90day(self):
        # 为了解决步长老是差一个,所以设定为92
        start_time_data = basic_methods.pre_num_trading_day(self.today_date, 90)
        # print(f"90个交易日前日期=========={start_time_data}")
        # logger.info("获取历史K线===开始")
        for symbol in self.symbols:
            # try:
            #     now = time.time()
            #     logger.info("历史K线开始:{}", symbol)
                K_line_history_90day_data = history(symbol=symbol, frequency='1d', start_time=start_time_data,
                                                    end_time=self.Next_trading_day,
                                                    fields='open,high,close,low,cum_volume,cum_amount,bob',
                                                    adjust=ADJUST_PREV, df=False)
                # logger.info("历史K线耗时:{}-{}", symbol,round(time.time() - now,2))
                self.history_k_dict[symbol] = K_line_history_90day_data
            # except:
            #     logging.error("获取历史K线错误")
        # logger.info("获取历史K线===结束")
 
 
 
    # 3.获取90个交易日内标的股票的K线指标  按照需求应该在current里面调用,所以这里正常情况会注释掉,直接在这里用也用不了,只是演示思维过程!!!!
    def get_property_Limit_mark(self,it_K_line):
        for i in range(0, len(it_K_line) - 1):
            previous_close = it_K_line[i + 1]['close']  # 昨日收盘价
            previous_high = it_K_line[i + 1]['high']  # 昨日最高价
            top_price = basic_methods.limit_up_price(previous_close)  # 计算出的当天涨停价
            top_price = float(top_price)  # 将涨停价转化为浮点
            down_price = basic_methods.limit_down_price(previous_close)  # 计算出的当天跌停价
            down_price = float(down_price)  # 将跌停价转化为浮点
            current_open = round(it_K_line[i]['open'], 2)  # 当日开盘价
            current_high = round(it_K_line[i]['high'], 2)  # 当日最高价
            current_close = round(it_K_line[i]['close'], 2)  # 当日收盘价
            current_low = round(it_K_line[i]['low'], 2)  # 当日最低价
            current_today_volume = it_K_line[i]['volume']
            current_yesterday_volume = it_K_line[i + 1]['volume']
            current_today_amount = it_K_line[i]['amount']
            current_today_growth = basic_methods.intraday_growth(current_close, previous_close)
 
 
            if current_today_growth != None:
                new_item = {'today_growth': current_today_growth}
                it_K_line[i].update(new_item)
                # print(f"i=={i}  {it_K_line[i]['bob']} 当日涨幅 {current_today_growth}")  # 打印出当前行的'bob'
 
            if round(current_today_volume/current_yesterday_volume,2) > 1.25:
                # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【放量】")
                if current_today_growth > 0 :
                    new_item = {'today_volume_shape': 'increases_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【放量上涨】")
                elif current_today_growth < 0 :
                    new_item = {'today_volume_shape': 'increases_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【放量下跌】")
                else:
                    new_item = {'today_volume_shape': 'increases_balance'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【放量平收】")
 
            elif round(current_today_volume/current_yesterday_volume,2) < 1.25:
                # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【缩量】")
                if current_today_growth > 0 :
                    new_item = {'today_volume_shape': 'decreases_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【缩量上涨】")
                elif current_today_growth < 0 :
                    new_item = {'today_volume_shape': 'decreases_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【缩量下跌】")
                else:
                    new_item = {'today_volume_shape': 'decreases_balance'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【缩量平收】")
            else:
                # print(f"i=={i} {it_K_line[i]['bob']} {round(current_today_volume/current_yesterday_volume,2)} 【平量】")
                if current_today_growth > 0 :
                    new_item = {'today_volume_shape': 'remained_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【平量上涨】")
                elif current_today_growth < 0 :
                    new_item = {'today_volume_shape': 'remained_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【平量下跌】")
                else:
                    new_item = {'today_volume_shape': 'remained_balance'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']}   【平量平收】")
 
            if current_open - previous_close > 0:
                # print(f"i=={i}  {it_K_line[i]['bob']} 成交总量:{today_volume},,,成交总金额:{today_amount}")
                chazhi = current_close - previous_close
                if current_close - current_open > 0:
                    new_item = {'attribute': 'up_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 高开高走 【昨收价:{previous_close}  收盘价:{current_close}】")  # 打印出当前行的'bob'
                else:
                    new_item = {'attribute': 'up_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 高开低走 【昨收价:{previous_close}  收盘价:{current_close}】")  # 打印出当前行的'bob'
            else:
                if current_close - current_open > 0:
                    new_item = {'attribute': 'up_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 低开高走 【昨收价:{previous_close}  收盘价:{current_close}】")  # 打印出当前行的'bob'
                else:
                    new_item = {'attribute': 'up_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 低开低走 【昨收价:{previous_close}  收盘价:{current_close}】")  # 打印出当前行的'bob'
 
            if abs(current_high - top_price) < 0.001 and abs(current_close - top_price) < 0.001:
                # print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
                if abs(current_open - top_price) < 0.001 and abs(top_price - current_low) < 0.01:
                    new_item = {'attribute': 'one_line_limit_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 一字涨停板 ")  # 打印出当前行的'bob'
                elif abs(current_low - down_price) < 0.01:
                    new_item = {'attribute': 'limit_down_then_limit_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} [准]地天板")  # 打印出当前行的'bob'
                elif abs(current_open - top_price) < 0.001 and abs(current_low - down_price) < 0.01:
                    new_item = {'attribute': 'limit_up_then_limit_down_then_limit_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 一字涨停天地天板")  # 打印出当前行的'bob'
                else:
                    new_item = {'attribute': 'limit_up'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 涨停板")  # 打印出当前行的'bob'
 
            if abs(current_high - top_price) < 0.001 and abs(current_close - top_price) >= 0.001 and abs(current_close - down_price) >= 0.001:
                new_item = {'attribute': 'frying_plate'}
                it_K_line[i].update(new_item)
                # print(f'在炸板日的列表中更新了对应的字典的键对值吗 ? it_K_line[i]==={it_K_line[i]}')
                # print(f'整个列表更新了吗 ? it_K_line==={it_K_line}')
                if abs(previous_close - previous_high) >= 0.001:
                    new_item = {'attribute': 'first_frying_plate'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 首板炸板")  # 打印出当前行的'bob'
                elif abs(current_open - top_price) < 0.01:
                    new_item = {'attribute': 'one_line_limit_up_then_frying_plate'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 一字天炸")  # 打印出当前行的'bob'
                elif abs(down_price - top_price) < 0.01 and abs(current_low - down_price) < 0.01:
                    new_item = {'attribute': 'one_line_limit_up_then_frying_plate_then_limit_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 一字天地")  # 打印出当前行的'bob'
                else:
                    new_item = {'attribute': 'not_first_frying_plate'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 非首板炸板")  # 打印出当前行的'bob'
 
            if abs(current_low - down_price) < 0.001:
                if abs(current_close - down_price) < 0.001:
                    if abs(current_high - top_price) < 0.001:
                        new_item = {'attribute': 'limit_up_then_limit_down'}
                        it_K_line[i].update(new_item)
                        # print(f"i=={i}  {it_K_line[i]['bob']} [准]天地板")  # 打印出当前行的'bob'
                    elif abs(current_open - down_price) < 0.001 and abs(current_high - down_price) < 0.01:
                        new_item = {'attribute': 'one_line_limit_down'}
                        it_K_line[i].update(new_item)
                        # print(f"i=={i}  {it_K_line[i]['bob']} 一字跌停板")  # 打印出当前行的'bob'
                    else:
                        new_item = {'attribute': 'limit_down'}
                        it_K_line[i].update(new_item)
                        # print(f"i=={i}  {it_K_line[i]['bob']} 跌停板")  # 打印出当前行的'bob'
                elif abs(current_close - top_price) >= 0.001:
                    new_item = {'attribute': 'touch_limit_down'}
                    it_K_line[i].update(new_item)
                    # print(f"i=={i}  {it_K_line[i]['bob']} 触及跌停")  # 打印出当前行的'bob'
 
    def __get_property(self, symbol):
        if symbol in self.property_k_dict:
            return self.property_k_dict[symbol]
        self.property_k_dict[symbol] = K_Property()
        return self.property_k_dict[symbol]
 
 
    def get_continue_count(self, symbol):
        p = self.__get_property(symbol)
        ks = self.history_k_dict[symbol]
        if not ks:
            return 0
        p.max_continue_count = 10
        ks_new = copy.deepcopy(ks)
        ks_new.reverse()
        print(f"ks_new==={ks_new}")
 
 
 
# history_data = history(symbol='SHSE.600000', frequency='1d', start_time='2024-03-28',  end_time='2024-04-09', fields='open, close, low, high, eob', adjust=ADJUST_PREV, df= True)
# print(f"history_data==={history_data}")
# history_n_data = history_n(symbol='SHSE.600988', frequency='1d', count=1, end_time='2024-04-04', fields='symbol, open, close, low, high, eob', adjust=ADJUST_PREV, df=True)
# print(f"{history_n_data}")
# print(f"Next_trading_day={Next_trading_day}")
 
 
 
 
# 在初始化函数里面就实例化上证A股和深证A股的历史K线方法
k_line_history = K_line_history(today_date,Next_trading_day, filtered_stocks)
k_line_history.K_line_history_90day()
# 写入全股票90天K线
def all_stocks_all_K_line_list_write():
    # 初始化所有个股的K线列表
    all_stocks_all_K_line_list = []
 
    for i in filtered_stocks:
        its_k_line = k_line_history.history_k_dict[i]  # 获取标的K线
        its_k_line_copy = copy.deepcopy(its_k_line)  # 浅拷贝标的K线
        it_K_line = list(reversed(its_k_line_copy))  # 反转标的K线
        k_line_history.get_property_Limit_mark(it_K_line)  # 给标的的K线更新极限指标属性
        it_90day_K_line_dict = {
            'symbol': i,  # 添加股票代码
            # 'sec_name': sec_name,  #添加公司名称
            'it_K_line_property': it_K_line,  # 添加更新极限指标属性的K线
        }
        all_stocks_all_K_line_list.append(it_90day_K_line_dict)
        # print(f'it_90day_K_line_dict==={it_90day_K_line_dict}')
 
    from datetime import datetime
 
    def convert_datetime(obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')  # 转换为字符串
        elif isinstance(obj, dict):
            return {k: convert_datetime(v) for k, v in obj.items()}  # 递归处理字典
        elif isinstance(obj, list):
            return [convert_datetime(element) for element in obj]  # 递归处理列表
        # 可以添加其他类型的处理逻辑
        else:
            # 对于未知类型,你可以选择保留原样、跳过或引发异常
            # 这里我们选择保留原样
            return obj
    try:
        json_data = json.dumps(convert_datetime(all_stocks_all_K_line_list), ensure_ascii=False, indent=4)
        # 将转换后的JSON字符串写入文件
        with open('local_storage_data/all_stocks_all_K_line_list.json', 'w', encoding='utf-8') as f:
            f.write(json_data)
    except Exception as e:
        print(f"An error occurred while converting the data to JSON: {e}")
    # now = datetime.datetime.now()  # 获取本机时间
    print(f"历史k线写完了!{now}")
 
    # # 预设(提前一日获取K线)K线获取时间定时执行函数  (默认设定为15:15)
    # def check_time():
    #     # now = global_context.now  # 获取当前时间
    #     now = datetime.datetime.now()
    #     hour = now.hour
    #     minute = now.minute
    #     #
    #     if hour > 10 or (hour == 10 and minute >= 15):
    #         print(f"K线写入时间到了:{now}")
    #         all_stocks_all_K_line_list_write()
    #         return False  # 返回False,表示结束循环
    #     else:
    #         return True  # 返回True,表示继续循环
    #
    # # 使用while循环和check_time函数检查时间,并设置固定的时间间隔
    # while check_time():
    #     time.sleep(1)
 
 
 
# 为定时写入预设K线单独开一个进程
# def all_stocks_all_K_line_list_write_process():
#     while True:
#         try:
#             now = time.time()
#             print(f"预设写入K线进程开始{now}")
#             all_stocks_all_K_line_list_write()
#             print(f"预设写入K线进程完成一次{now}")
#         except Exception as e:
#             print(f"An error occurred: {e}")
#         finally:
#             time.sleep(600)
 
 
# # 进行最大17次五日内涨幅计算,并暂存结果,已完成90日k线涨幅的分段切片
# five_day_growth_list = []
# for i in range(0, len(it_K_line) - 5, 5):  # 减去4以确保不会超出索引范围
#     five_day_growth = sum(it_K_line[j]['today_growth'] for j in range(i, i + 5))
#     five_day_growth_list.append(round(five_day_growth, 2))
# print(f"five_day_growth_list == {five_day_growth_list}")
# 进行最大17次五日内涨幅计算,并暂存结果,已完成90日k线涨幅的分段切片
# five_day_growth_average_list = []
# for i in range(0, len(it_K_line) - 5, 5):  # 减去4以确保不会超出索引范围
#     five_day_growth_average = sum(it_K_line[j]['today_growth'] for j in range(i, i + 5))/5
#     five_day_growth_average_list.append(round(five_day_growth_average, 2))
# print(f"five_day_growth_average_list == {five_day_growth_average_list}")
 
# it_K_line = k_line_history.get_continue_count(current_data['symbol'])
# print(f'it_K_line===={it_K_line}')
 
 
all_stocks_all_K_line_list_write()
 
# all_stocks_all_K_line_list_write_process()