admin
3 天以前 0b0d0e790fec8c7edfdbcab5c31d625e0c2eadd6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# 分时量分析,即情绪面(针对瞬时行情信息进行获取与分析)
# 计划将 current 和 subscribe(行情订阅) 线程 放在这里
 
import json
import logging
import time
import datetime
 
import dask
 
import utils
from log_module import async_log_util
from log_module.log import logger_common, logger_debug
from strategy.trade_setting import TradeSetting
from utils import huaxin_util, tool
# 引入华鑫API(小辉整理)
from strategy import l1_data_api
from strategy import data_cache
from strategy import basic_methods
from strategy import buying_strategy, selling_strategy, market_sentiment_analysis
 
# from low_suction.shared_memory_util import SharedMemoryObj
 
# 获取logger实例
logger = logger_common
 
'''
创建一个函数来对主要指数的实时行情作处理
'''
 
 
# 获取实时指数行情函数
def index_market_current():
    logging.info(f"index_market_trend进入")
    while True:
        if not TradeSetting().get_running():
            # 已经暂停
            time.sleep(1)
            continue
        try:
            # 在data_cache中获取到推送过来的实时指数行情数据
            stock_index_dict = data_cache.stock_index_dict
            now_time = tool.get_now_time_str()
            if len(stock_index_dict) == 0 and data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
                print(f"9:15--15:00 实时指数数据为空===={stock_index_dict}")
            index_judge_thread_manager(stock_index_dict)
        except Exception as error:
            logging.exception(error)
        finally:
            time.sleep(1)
 
 
"""
创建一个线程来驱动拉取华鑫l1数据 给各个策略模块
"""
 
 
# 调用所有以current信息为核心策略的函数
def strategic_thread_manager(current_info):
    if current_info is not None:
        # 调用交易策略模块中的涨幅视界策略
        # 买入策略调用
        buying_strategy.growth_view_strategy(current_info)
        # 卖出策略调用
        selling_strategy.instantaneous_change_strategy(current_info)
        # pass
 
 
# 调用以指数行情信息为核心策略的函数
def index_judge_thread_manager(index_market_info):
    if index_market_info is not None:
        # 调用交易策略模块中的涨幅视界策略
        # 指数行情调用
        market_sentiment_analysis.instant_trend_strategy(index_market_info)
 
 
# 生成所有个股的开盘价字典
def get_all_stocks_current_open(current_infos):
    pass
    # 获取当前时间
    now_time = tool.get_now_time_str()
    # 如果当前时间大于09:25:06才运行最高价和最低价的运算
    if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
        # if data_cache.AFTER_CLOSING_TIME < now_time:
        if now_time < data_cache.OPENING_TIME and data_cache.record_current_open_execution is False:
            # if now_time < data_cache.AFTER_CLOSING_TIME:
            logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价")
            data_cache.record_current_open_execution = True
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                # 检查股票是否已经在data_cache中
                # if current_info[0] not in data_cache.all_stocks_current_open:
                symbol = basic_methods.format_stock_symbol(current_info[0])
                data_cache.all_stocks_current_open[symbol] = {'current_open': current_info[2]}
            # print(f"data_cache.all_stocks_current_open=={data_cache.all_stocks_current_open}")
            # json_data = data_cache.all_stocks_current_open
            # 将转换后的JSON字符串写入文件(目前考虑取消数据存储本地)
            # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
            #     # 将字典转换为 JSON 格式的字符串
            #     json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
            #     f.write(json_data)
        else:
            # 如果没有在规定时间内运行成功,采用掘金数据(只判断一次)
            if data_cache.record_current_open_execution is False:
                # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录")
                print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价")
                data_cache.record_current_open_execution = True
                current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks,
                                                                          fields='symbol,open')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    # print(f"current_data=={current_data}")
                    # 检查股票是否已经在data_cache中
                    # if current_data[0] not in data_cache.all_stocks_current_open:
                    data_cache.all_stocks_current_open[current_data['symbol']] = {'current_open': current_data['open']}
                # 将转换后的JSON字符串写入文件(目前取消数据存储本地,如需存储本地也要放在D:盘路径)
                # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
                #     # 将字典转换为 JSON 格式的字符串
                #     json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
                #     f.write(json_data)
 
 
# 构建一个跟踪最高价和最低价的对象(价格跟踪器)
class PriceTracker:
    # 初始化 当前最新价、最高价、最低价
    def __init__(self, initial_price):
        self.current_high = initial_price
        self.current_low = initial_price
 
    def set_high_and_low_price(self, high, low):
        self.current_high = high
        self.current_low = low
 
    # 将最新价分别传给 最高价和最低价函数
    def set_current_price(self, new_price):
        self.update_and_get_high(new_price)
        self.update_and_get_low(new_price)
 
    # 构建计算最高价函数
    def update_and_get_high(self, new_price):
        if new_price > self.current_high:
            self.current_high = new_price
        return self.current_high  # 返回新高
        # 如果价格没有变化,则不返回任何值(或可以选择返回None)
 
    # 构建计算最低价函数
    def update_and_get_low(self, new_price):
        if new_price < self.current_low:
            self.current_low = new_price
        return self.current_low  # 返回新低
        # 如果价格没有变化,则不返回任何值(或可以选择返回None)
 
 
# 构建从缓存中查询获取开盘价的函数
def get_symbol_current_open(symbol):
    if data_cache.all_stocks_current_open.get(symbol) is not None:
        return data_cache.all_stocks_current_open[symbol]['current_open']
    else:
        return None
 
 
# 对一个计算出的最高价或最低价 进行初始化
__current_high_or_low_dict = {}
# 对一个计算出的最高价和最低价 进行初始化
__current_high_and_low_dict = {}
 
 
#
def get_all_stocks_current_high_and_low(current_infos):
    """
    生成所有个股的最高价、最低价 字典
    :param current_infos:
    :return:
    """
    # 获取当前时间
    now_time = tool.get_now_time_str()
    # 如果当前时间大于09:25:06才运行最高价和最低价的运算
    if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
        # if data_cache.AFTER_CLOSING_TIME < now_time:
        if data_cache.now_time < data_cache.OPENING_TIME:
            logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价")
            # # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价
            # while True:
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                symbol = basic_methods.format_stock_symbol(current_info[0])  #股票代码
                pre_close = current_info[1]  # 昨日收盘价
                current_price = current_info[2]  # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]  #买一价格
                price_tracker = __current_high_or_low_dict.get(symbol)
                if not price_tracker:
                    # 赋初值
                    price_tracker = PriceTracker(current_price)
                    __current_high_or_low_dict[symbol] = price_tracker
                get_current_high_or_low = price_tracker
                # print(f"current_price>>>>>>==={current_price}")
                if current_price is not None:
                    # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录
                    if current_price > 0:
                        # logger.info(
                        #     f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》")
                        get_current_high_or_low.set_current_price(current_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif current_quotes_buy_1_price > 0:
                        # logger.info(
                        #     f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ")
                        get_current_high_or_low.set_current_price(current_quotes_buy_1_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif pre_close > 0:
                        logger.info(
                            f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}")
                        get_current_high_or_low.set_current_price(pre_close)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
        else:
            logger.info(f"【没有】开盘前启动,采用【掘金数据】初始化 最高价最低价,采用【华鑫数据】更新 最高最低价")
            # print(f"当前时间更新时间:{now_time}")
            if not __current_high_or_low_dict:
                # 还没初始化
                # current_datas = current(symbols=data_cache.DataCache().min_stocks, fields='symbol,high,low')
                current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
                    data_cache.DataCache().min_stocks, fields='symbol,high,low')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
                    __current_high_or_low_dict[symbol] = PriceTracker(0)
                    __current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
            # print(f"完成掘金初始化:{now_time}")
            for current_info in current_infos:
                # print(f"开始循环current_infos")
                symbol = basic_methods.format_stock_symbol(current_info[0])
                if symbol not in data_cache.DataCache().min_stocks:
                    continue
                # if symbol.find("300810") > 0:
                #     print(f"开始循环current_infos:"+symbol)
                current_price = current_info[2]
 
                price_track_manage = __current_high_or_low_dict.get(symbol)
                if not price_track_manage:
                    # 初始化
                    # current_datas = current(symbols=[symbol], fields='symbol,high,low')
                    current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
                        data_cache.DataCache().min_stocks, fields='symbol,high,low')
                    current_data = current_datas[0]
                    # print(f"开始实例化对象")
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
                    price_track_manage = PriceTracker(0)
                    __current_high_or_low_dict[symbol] = price_track_manage
                    __current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
 
                if current_price is not None:
                    price_track_manage.set_current_price(current_price)
                    # print(f"开始更新最低价和最高价")
                    data_cache.all_stocks_current_high_and_low[symbol] = {
                        'current_high': price_track_manage.current_high,
                        'current_low': price_track_manage.current_low}
                    # print(f"data_cache.all_stocks_current_high_and_low[symbol]==={data_cache.all_stocks_current_high_and_low[symbol]}")
 
 
# 构建获取个股记录下来的实时当日最高价函数
def get_symbol_current_high(symbol):
    if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
        return data_cache.all_stocks_current_high_and_low[symbol]['current_high']
    else:
        return None
 
 
# 构建获取个股记录下来的实时当日最低价函数
def get_symbol_current_low(symbol):
    if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
        return data_cache.all_stocks_current_high_and_low[symbol]['current_low']
    else:
        return None
 
 
# 获取当前L1行情数据
def get_current_info():
    logging.info(f"get_current_info进入")
    # shm = SharedMemoryObj(name="l1_data_shared_memory", size=5 * 1024 * 1024)
    while True:
        try:
            now_start = time.time()
            current_infos = l1_data_api.get_current_info()
            now_time = tool.get_now_time_str()
            if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
                print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
            # for i in current_infos:
            #     if i[0] == '000001':
            #         print(f"i===={i}")
            get_all_stocks_current_open(current_infos)
            get_all_stocks_current_high_and_low(current_infos)
 
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
 
            for current_info in current_infos:
                try:
                    if current_info is not None:
                        strategic_thread_manager(current_info)
                except Exception as error:
                    logging.exception(error)
                    # print("异常:", current_info)
            now_end: float = time.time()
            start_to_end = now_end - now_start
            print(f"运行中=={round(start_to_end, 2)} 秒")
            # logger.info(f"运行中=={round(start_to_end, 2)}秒")
        except Exception as error:
            logging.exception(error)
        finally:
            time.sleep(0.5)
 
 
# 把current_infos灌入相应的线程
def set_current_info(current_infos):
    # @dask.delayed
    def process_current_infos(current_info_list):
        __start_time = time.time()
        use_time_list = []
        for current_info in current_info_list:
            try:
                if current_info is not None:
                    _start_time = time.time()
                    strategic_thread_manager(current_info)
                    use_time_list.append((time.time() - _start_time, current_info[0]))
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
                logger_debug.exception(error)
                logger_debug.error(f"L1处理出错:{current_info}")
        use_time = time.time() - __start_time
        if use_time > 0.5:
            # 记录超过1s的数据
            async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}",
                                tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list),
                                max(use_time_list, key=lambda e: e[0]))
 
    # @dask.delayed
    # def batch_process_current_infos(fs):
    #     return fs
 
    logging.info(f"set_current_info进入")
    now_start = time.time()
    try:
        now_time = tool.get_now_time_str()
        if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
            print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
        # for i in current_infos:
        #     if i[0] == '000001':
        #         print(f"i===={i}")
        get_all_stocks_current_open(current_infos)
        get_all_stocks_current_high_and_low(current_infos)
        if current_infos:
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
 
            # 分批处理数据
            # ds = []
            # total_count = len(current_infos)
            # page = 2
            # page_size = total_count // page + 1
            # for p in range(page):
            #     temp_list = current_infos[p * page_size:(p + 1) * page_size]
            #     ds.append(process_current_infos(temp_list))
            # dask_result = batch_process_current_infos(ds)
            # dask_result.compute()
            process_current_infos(current_infos)
        now_end: float = time.time()
        start_to_end = now_end - now_start
        print(f"运行中=={round(start_to_end, 2)} 秒")
        # logger.info(f"运行中=={round(start_to_end, 2)}秒")
    except Exception as error:
        logging.exception(error)
    finally:
        async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}")
 
# 仅仅用于测试数据进入策略后的数据情况
# get_current_info()