# 分时量分析,即情绪面(针对瞬时行情信息进行获取与分析) # 计划将 current 和 subscribe(行情订阅) 线程 放在这里 import json import logging import time import datetime import dask import utils from log_module import async_log_util from log_module.log import logger_common, logger_debug from strategy.trade_setting import TradeSetting from utils import huaxin_util, tool # 引入华鑫API(小辉整理) from strategy import l1_data_api from strategy import data_cache from strategy import basic_methods from strategy import buying_strategy, selling_strategy, market_sentiment_analysis # from low_suction.shared_memory_util import SharedMemoryObj # 获取logger实例 logger = logger_common ''' 创建一个函数来对主要指数的实时行情作处理 ''' # 获取实时指数行情函数 def index_market_current(): logging.info(f"index_market_trend进入") while True: if not TradeSetting().get_running(): # 已经暂停 time.sleep(1) continue try: # 在data_cache中获取到推送过来的实时指数行情数据 stock_index_dict = data_cache.stock_index_dict now_time = tool.get_now_time_str() if len(stock_index_dict) == 0 and data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME: print(f"9:15--15:00 实时指数数据为空===={stock_index_dict}") index_judge_thread_manager(stock_index_dict) except Exception as error: logging.exception(error) finally: time.sleep(1) """ 创建一个线程来驱动拉取华鑫l1数据 给各个策略模块 """ # 调用所有以current信息为核心策略的函数 def strategic_thread_manager(current_info): if current_info is not None: # 调用交易策略模块中的涨幅视界策略 # 买入策略调用 buying_strategy.growth_view_strategy(current_info) # 卖出策略调用 selling_strategy.instantaneous_change_strategy(current_info) # pass # 调用以指数行情信息为核心策略的函数 def index_judge_thread_manager(index_market_info): if index_market_info is not None: # 调用交易策略模块中的涨幅视界策略 # 指数行情调用 market_sentiment_analysis.instant_trend_strategy(index_market_info) # 生成所有个股的开盘价字典 def get_all_stocks_current_open(current_infos): pass # 获取当前时间 now_time = tool.get_now_time_str() # 如果当前时间大于09:25:06才运行最高价和最低价的运算 if data_cache.LATER_OPEN_BIDDING_TIME < now_time: # if data_cache.AFTER_CLOSING_TIME < now_time: if now_time < data_cache.OPENING_TIME and data_cache.record_current_open_execution is False: # if now_time < data_cache.AFTER_CLOSING_TIME: logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价") data_cache.record_current_open_execution = True # print(f"current_info=={current_infos}") for current_info in current_infos: # 检查股票是否已经在data_cache中 # if current_info[0] not in data_cache.all_stocks_current_open: symbol = basic_methods.format_stock_symbol(current_info[0]) data_cache.all_stocks_current_open[symbol] = {'current_open': current_info[2]} # print(f"data_cache.all_stocks_current_open=={data_cache.all_stocks_current_open}") # json_data = data_cache.all_stocks_current_open # 将转换后的JSON字符串写入文件(目前考虑取消数据存储本地) # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f: # # 将字典转换为 JSON 格式的字符串 # json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4) # f.write(json_data) else: # 如果没有在规定时间内运行成功,采用掘金数据(只判断一次) if data_cache.record_current_open_execution is False: # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录") print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价") data_cache.record_current_open_execution = True current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, fields='symbol,open') # print(f"current_datas=={current_datas}") for current_data in current_datas: # print(f"current_data=={current_data}") # 检查股票是否已经在data_cache中 # if current_data[0] not in data_cache.all_stocks_current_open: data_cache.all_stocks_current_open[current_data['symbol']] = {'current_open': current_data['open']} # 将转换后的JSON字符串写入文件(目前取消数据存储本地,如需存储本地也要放在D:盘路径) # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f: # # 将字典转换为 JSON 格式的字符串 # json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4) # f.write(json_data) # 构建一个跟踪最高价和最低价的对象(价格跟踪器) class PriceTracker: # 初始化 当前最新价、最高价、最低价 def __init__(self, initial_price): self.current_high = initial_price self.current_low = initial_price def set_high_and_low_price(self, high, low): self.current_high = high self.current_low = low # 将最新价分别传给 最高价和最低价函数 def set_current_price(self, new_price): self.update_and_get_high(new_price) self.update_and_get_low(new_price) # 构建计算最高价函数 def update_and_get_high(self, new_price): if new_price > self.current_high: self.current_high = new_price return self.current_high # 返回新高 # 如果价格没有变化,则不返回任何值(或可以选择返回None) # 构建计算最低价函数 def update_and_get_low(self, new_price): if new_price < self.current_low: self.current_low = new_price return self.current_low # 返回新低 # 如果价格没有变化,则不返回任何值(或可以选择返回None) # 构建从缓存中查询获取开盘价的函数 def get_symbol_current_open(symbol): if data_cache.all_stocks_current_open.get(symbol) is not None: return data_cache.all_stocks_current_open[symbol]['current_open'] else: return None # 对一个计算出的最高价或最低价 进行初始化 __current_high_or_low_dict = {} # 对一个计算出的最高价和最低价 进行初始化 __current_high_and_low_dict = {} # def get_all_stocks_current_high_and_low(current_infos): """ 生成所有个股的最高价、最低价 字典 :param current_infos: :return: """ # 获取当前时间 now_time = tool.get_now_time_str() # 如果当前时间大于09:25:06才运行最高价和最低价的运算 if data_cache.LATER_OPEN_BIDDING_TIME < now_time: # if data_cache.AFTER_CLOSING_TIME < now_time: if data_cache.now_time < data_cache.OPENING_TIME: logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价") # # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价 # while True: # print(f"current_info=={current_infos}") for current_info in current_infos: symbol = basic_methods.format_stock_symbol(current_info[0]) #股票代码 pre_close = current_info[1] # 昨日收盘价 current_price = current_info[2] # 最新价 current_quotes_buy_1_price = current_info[5][0][0] #买一价格 price_tracker = __current_high_or_low_dict.get(symbol) if not price_tracker: # 赋初值 price_tracker = PriceTracker(current_price) __current_high_or_low_dict[symbol] = price_tracker get_current_high_or_low = price_tracker # print(f"current_price>>>>>>==={current_price}") if current_price is not None: # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录 if current_price > 0: # logger.info( # f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》") get_current_high_or_low.set_current_price(current_price) data_cache.all_stocks_current_high_and_low[symbol] = { 'current_high': get_current_high_or_low.current_high, 'current_low': get_current_high_or_low.current_low} elif current_quotes_buy_1_price > 0: # logger.info( # f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ") get_current_high_or_low.set_current_price(current_quotes_buy_1_price) data_cache.all_stocks_current_high_and_low[symbol] = { 'current_high': get_current_high_or_low.current_high, 'current_low': get_current_high_or_low.current_low} elif pre_close > 0: logger.info( f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}") get_current_high_or_low.set_current_price(pre_close) data_cache.all_stocks_current_high_and_low[symbol] = { 'current_high': get_current_high_or_low.current_high, 'current_low': get_current_high_or_low.current_low} else: logger.info(f"【没有】开盘前启动,采用【掘金数据】初始化 最高价最低价,采用【华鑫数据】更新 最高最低价") # print(f"当前时间更新时间:{now_time}") if not __current_high_or_low_dict: # 还没初始化 # current_datas = current(symbols=data_cache.DataCache().min_stocks, fields='symbol,high,low') current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low( data_cache.DataCache().min_stocks, fields='symbol,high,low') # print(f"current_datas=={current_datas}") for current_data in current_datas: symbol, high, low = current_data['symbol'], current_data['high'], current_data['low'] __current_high_or_low_dict[symbol] = PriceTracker(0) __current_high_or_low_dict[symbol].set_high_and_low_price(high, low) # print(f"完成掘金初始化:{now_time}") for current_info in current_infos: # print(f"开始循环current_infos") symbol = basic_methods.format_stock_symbol(current_info[0]) if symbol not in data_cache.DataCache().min_stocks: continue # if symbol.find("300810") > 0: # print(f"开始循环current_infos:"+symbol) current_price = current_info[2] price_track_manage = __current_high_or_low_dict.get(symbol) if not price_track_manage: # 初始化 # current_datas = current(symbols=[symbol], fields='symbol,high,low') current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low( data_cache.DataCache().min_stocks, fields='symbol,high,low') current_data = current_datas[0] # print(f"开始实例化对象") symbol, high, low = current_data['symbol'], current_data['high'], current_data['low'] price_track_manage = PriceTracker(0) __current_high_or_low_dict[symbol] = price_track_manage __current_high_or_low_dict[symbol].set_high_and_low_price(high, low) if current_price is not None: price_track_manage.set_current_price(current_price) # print(f"开始更新最低价和最高价") data_cache.all_stocks_current_high_and_low[symbol] = { 'current_high': price_track_manage.current_high, 'current_low': price_track_manage.current_low} # print(f"data_cache.all_stocks_current_high_and_low[symbol]==={data_cache.all_stocks_current_high_and_low[symbol]}") # 构建获取个股记录下来的实时当日最高价函数 def get_symbol_current_high(symbol): if data_cache.all_stocks_current_high_and_low.get(symbol) is not None: return data_cache.all_stocks_current_high_and_low[symbol]['current_high'] else: return None # 构建获取个股记录下来的实时当日最低价函数 def get_symbol_current_low(symbol): if data_cache.all_stocks_current_high_and_low.get(symbol) is not None: return data_cache.all_stocks_current_high_and_low[symbol]['current_low'] else: return None # 获取当前L1行情数据 def get_current_info(): logging.info(f"get_current_info进入") # shm = SharedMemoryObj(name="l1_data_shared_memory", size=5 * 1024 * 1024) while True: try: now_start = time.time() current_infos = l1_data_api.get_current_info() now_time = tool.get_now_time_str() if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME: print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}") # for i in current_infos: # if i[0] == '000001': # print(f"i===={i}") get_all_stocks_current_open(current_infos) get_all_stocks_current_high_and_low(current_infos) for current_info in current_infos: try: if current_info is not None: strategic_thread_manager(current_info) except Exception as error: logging.exception(error) # print("异常:", current_info) now_end: float = time.time() start_to_end = now_end - now_start print(f"运行中=={round(start_to_end, 2)} 秒") # logger.info(f"运行中=={round(start_to_end, 2)}秒") except Exception as error: logging.exception(error) finally: time.sleep(0.5) # 把current_infos灌入相应的线程 def set_current_info(current_infos): @dask.delayed def process_current_infos(current_info_list): for current_info in current_info_list: try: if current_info is not None: strategic_thread_manager(current_info) except Exception as error: logging.exception(error) # print("异常:", current_info) logger_debug.exception(error) logger_debug.error(f"L1处理出错:{current_info}") @dask.delayed def batch_process_current_infos(fs): return fs logging.info(f"set_current_info进入") now_start = time.time() try: now_time = tool.get_now_time_str() if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME: print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}") # for i in current_infos: # if i[0] == '000001': # print(f"i===={i}") get_all_stocks_current_open(current_infos) get_all_stocks_current_high_and_low(current_infos) if current_infos: # 分批处理数据 ds = [] total_count = len(current_infos) page = 15 page_size = total_count // page + 1 for p in range(page): temp_list = current_infos[p * page_size:(p + 1) * page_size] ds.append(process_current_infos(temp_list)) dask_result = batch_process_current_infos(ds) dask_result.compute() now_end: float = time.time() start_to_end = now_end - now_start print(f"运行中=={round(start_to_end, 2)} 秒") # logger.info(f"运行中=={round(start_to_end, 2)}秒") except Exception as error: logging.exception(error) finally: async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}") # 仅仅用于测试数据进入策略后的数据情况 # get_current_info()