admin
2025-01-15 f84dcf456dbfa318f490d6cf878be5d5d5262718
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# 分时量分析,即情绪面(针对瞬时行情信息进行获取与分析)
# 计划将 current 和 subscribe(行情订阅) 线程 放在这里
 
# import json
import logging
import time
import datetime
# 引入掘金API
# 引入华鑫API(小辉整理)
from strategy import l1_data_api
from strategy import data_cache
from strategy import basic_methods
from strategy import buying_strategy
from strategy import selling_strategy
from strategy.logging_config import get_logger
# from low_suction.shared_memory_util import SharedMemoryObj
 
# 获取logger实例
logger = get_logger()
 
 
# 获取tick数据
def on_tick(context, tick):
    pass
    __process(context, tick)
    print(f"启动on_tick")
    print(f"tick-===={tick}")
 
 
# 驱动调用所有以current数据为核心策略的函数
def __process(context, current_data):
    pass
    # print(f"current_data===={current_data}")
    # 调用交易策略模块中的涨幅视界策略
    # buying_strategy.growth_horizon_strategy(context, current_data)
    # selling_strategy.instantaneous_increase_strategy(context, current_data)
 
 
# 获取掘金current数据
def get_current_data():
    logging.info(f"get_current_data进入")
    while True:
        try:
            now_start = time.time()
            current_datas = current(symbols=data_cache.DataCache.min_stocks,
                                    fields='open,high,low,symbol,price,created_at,cum_volume,cum_amount,last_volume,quotes')
            # current_datas = l1_data_api.get_current_info()
            # print(f"l1_data_current_datas===={current_datas}")
            # print(f"current_datas====={current_datas}")
            now_end = time.time()
            now_start_to_end = now_end - now_start
            # index = 0
            for current_data in current_datas:
                # index+=1
                try:
                    __process(data_cache.context, current_data)
                    # if index == 100:
                    #     print(current_data)
                    # print(f"第几个{index}")
                except Exception as error:
                    logging.exception(error)
                    print("异常:", current_data)
            print(f"运行中=={round(now_start_to_end, 2)}秒")
            # logger.info(f"运行中=={round(now_start_to_end, 2)}秒")
        except Exception as error:
            logging.exception(error)
        finally:
            time.sleep(1)
 
 
"""
另外创建一个线程来驱动拉取华鑫l1数据 给各个策略模块
"""
 
 
# 调用所有以current信息为核心策略的函数
def strategic_thread_manager(context, current_info):
    if current_info is not None:
        # 调用交易策略模块中的涨幅视界策略
        buying_strategy.growth_view_strategy(context, current_info)
        selling_strategy.instantaneous_change_strategy(context, current_info)
        # pass
 
 
# 生成所有个股的开盘价字典
def get_all_stocks_current_open(current_infos):
    pass
    # 获取当前时间
    now_time = datetime.datetime.now().strftime("%H:%M:%S")
    # 如果当前时间大于09:25:30才运行最高价和最低价的运算
    if data_cache.later_open_bidding_time < now_time:
        # if data_cache.after_closing_time < now_time:
        if now_time < data_cache.opening_time and data_cache.record_current_open_execution is False:
            # if now_time < data_cache.after_closing_time:
            logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价")
            data_cache.record_current_open_execution = True
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                # 检查股票是否已经在data_cache中
                # if current_info[0] not in data_cache.all_stocks_current_open:
                symbol = basic_methods.format_stock_symbol(current_info[0])
                data_cache.all_stocks_current_open[symbol] = {'current_open': current_info[2]}
            # print(f"data_cache.all_stocks_current_open=={data_cache.all_stocks_current_open}")
            # json_data = data_cache.all_stocks_current_open
            # 将转换后的JSON字符串写入文件(目前考虑取消数据存储本地)
            # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
            #     # 将字典转换为 JSON 格式的字符串
            #     json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
            #     f.write(json_data)
        else:
            # 如果没有在规定时间内运行成功,采用掘金数据(只判断一次)
            if data_cache.record_current_open_execution is False:
                # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录")
                print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价")
                data_cache.record_current_open_execution = True
                current_datas = current(symbols=data_cache.DataCache.filtered_stocks, fields='symbol,open')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    # print(f"current_data=={current_data}")
                    # 检查股票是否已经在data_cache中
                    # if current_data[0] not in data_cache.all_stocks_current_open:
                    data_cache.all_stocks_current_open[current_data['symbol']] = {'current_open': current_data['open']}
                # 将转换后的JSON字符串写入文件(目前考虑取消数据存储本地)
                # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
                #     # 将字典转换为 JSON 格式的字符串
                #     json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
                #     f.write(json_data)
 
 
# 构建一个跟踪最高价和最低价的对象(价格跟踪器)
class PriceTracker:
    # 初始化 当前最新价、最高价、最低价
    def __init__(self, initial_price):
        self.current_high = initial_price
        self.current_low = initial_price
 
    def set_high_and_low_price(self, high, low):
        self.current_high = high
        self.current_low = low
 
    # 将最新价分别传给 最高价和最低价函数
    def set_current_price(self, new_price):
        self.update_and_get_high(new_price)
        self.update_and_get_low(new_price)
 
    # 构建计算最高价函数
    def update_and_get_high(self, new_price):
        if new_price > self.current_high:
            self.current_high = new_price
        return self.current_high  # 返回新高
        # 如果价格没有变化,则不返回任何值(或可以选择返回None)
 
    # 构建计算最低价函数
    def update_and_get_low(self, new_price):
        if new_price < self.current_low:
            self.current_low = new_price
        return self.current_low  # 返回新低
        # 如果价格没有变化,则不返回任何值(或可以选择返回None)
 
 
# 构建从缓存中查询获取开盘价的函数
def get_symbol_current_open(symbol):
    if data_cache.all_stocks_current_open.get(symbol) is not None:
        return data_cache.all_stocks_current_open[symbol]['current_open']
    else:
        return None
 
 
# 对一个计算出的最高价或最低价 进行初始化
__current_high_or_low_dict = {}
# 对一个计算出的最高价和最低价 进行初始化
__current_high_and_low_dict = {}
 
 
# 生成所有个股的最高价、最低价 字典
def get_all_stocks_current_high_and_low(current_infos):
    # 获取当前时间
    now_time = datetime.datetime.now().strftime("%H:%M:%S")
    # 如果当前时间大于09:25:30才运行最高价和最低价的运算
    if data_cache.later_open_bidding_time < now_time:
        # if data_cache.after_closing_time < now_time:
        if data_cache.Local_startup_time < data_cache.opening_time:
            logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价")
            # # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价
            # while True:
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                symbol = basic_methods.format_stock_symbol(current_info[0])  #股票代码
                pre_close = current_info[1]        # 昨日收盘价
                current_price = current_info[2]   # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]   #买一价格
                price_tracker = __current_high_or_low_dict.get(symbol)
                if not price_tracker:
                    # 赋初值
                    price_tracker = PriceTracker(current_price)
                    __current_high_or_low_dict[symbol] = price_tracker
                get_current_high_or_low = price_tracker
                # print(f"current_price>>>>>>==={current_price}")
                if current_price is not None:
                    # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录
                    if current_price != 0 or current_price != 0.0:
                        # logger.info(
                        #     f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》")
                        get_current_high_or_low.set_current_price(current_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif current_quotes_buy_1_price != 0 or current_quotes_buy_1_price != 0.0:
                        # logger.info(
                        #     f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ")
                        get_current_high_or_low.set_current_price(current_quotes_buy_1_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif pre_close != 0 or pre_close != 0.0:
                        logger.info(
                            f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}")
                        get_current_high_or_low.set_current_price(pre_close)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
        else:
            logger.info(f"【没有】开盘前启动,采用【掘金数据】初始化 最高价最低价,采用【华鑫数据】更新 最高最低价")
            # print(f"当前时间更新时间:{now_time}")
            if not __current_high_or_low_dict:
                # 还没初始化
                current_datas = current(symbols=data_cache.DataCache.filtered_stocks, fields='symbol,high,low')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
                    __current_high_or_low_dict[symbol] = PriceTracker(0)
                    __current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
            # print(f"完成掘金初始化:{now_time}")
            for current_info in current_infos:
                # print(f"开始循环current_infos")
                symbol = basic_methods.format_stock_symbol(current_info[0])
                if symbol not in data_cache.DataCache.filtered_stocks:
                    continue
                # if symbol.find("300810") > 0:
                #     print(f"开始循环current_infos:"+symbol)
                current_price = current_info[2]
 
                price_track_manage = __current_high_or_low_dict.get(symbol)
                if not price_track_manage:
                    # 初始化
                    current_datas = current(symbols=[symbol], fields='symbol,high,low')
                    current_data = current_datas[0]
                    # print(f"开始实例化对象")
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
                    price_track_manage = PriceTracker(0)
                    __current_high_or_low_dict[symbol] = price_track_manage
                    __current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
 
                if current_price is not None:
                    price_track_manage.set_current_price(current_price)
                    # print(f"开始更新最低价和最高价")
                    data_cache.all_stocks_current_high_and_low[symbol] = {
                        'current_high': price_track_manage.current_high,
                        'current_low': price_track_manage.current_low}
                    # print(f"data_cache.all_stocks_current_high_and_low[symbol]==={data_cache.all_stocks_current_high_and_low[symbol]}")
            # print(f"完成华鑫初始化:{now_time}")
 
 
# 构建获取个股记录下来的实时当日最高价函数
def get_symbol_current_high(symbol):
    if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
        return data_cache.all_stocks_current_high_and_low[symbol]['current_high']
    else:
        return None
 
 
# 构建获取个股记录下来的实时当日最低价函数
def get_symbol_current_low(symbol):
    if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
        return data_cache.all_stocks_current_high_and_low[symbol]['current_low']
    else:
        return None
 
 
def process_current_infos(current_infos):
    """
    处理现价
    :param current_infos:
    :return:
    """
    now_start = time.time()
    get_all_stocks_current_open(current_infos)
    get_all_stocks_current_high_and_low(current_infos)
    for current_info in current_infos:
        try:
            if current_info is not None:
                strategic_thread_manager(None, current_info)
        except Exception as error:
            logging.exception(error)
            print("异常:", current_info)
    now_end: float = time.time()
    start_to_end = now_end - now_start
    print(f"运行中=={round(start_to_end, 2)} 秒")
 
 
 
# 仅仅用于测试数据进入策略后的数据情况
# get_current_info()