# 分时量分析,即情绪面(针对瞬时行情信息进行获取与分析)
|
# 计划将 current 和 subscribe(行情订阅) 线程 放在这里
|
|
import json
|
import logging
|
import time
|
import datetime
|
|
import dask
|
|
import utils
|
from log_module import async_log_util
|
from log_module.log import logger_common, logger_debug
|
from strategy.trade_setting import TradeSetting
|
from utils import huaxin_util, tool
|
# 引入华鑫API(小辉整理)
|
from strategy import l1_data_api
|
from strategy import data_cache
|
from strategy import basic_methods
|
from strategy import buying_strategy, selling_strategy, market_sentiment_analysis
|
|
# from low_suction.shared_memory_util import SharedMemoryObj
|
|
# 获取logger实例
|
logger = logger_common
|
|
'''
|
创建一个函数来对主要指数的实时行情作处理
|
'''
|
|
|
# 获取实时指数行情函数
|
def index_market_current():
|
logging.info(f"index_market_trend进入")
|
while True:
|
if not TradeSetting().get_running():
|
# 已经暂停
|
time.sleep(1)
|
continue
|
try:
|
# 在data_cache中获取到推送过来的实时指数行情数据
|
stock_index_dict = data_cache.stock_index_dict
|
now_time = tool.get_now_time_str()
|
if len(stock_index_dict) == 0 and data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
|
print(f"9:15--15:00 实时指数数据为空===={stock_index_dict}")
|
index_judge_thread_manager(stock_index_dict)
|
except Exception as error:
|
logging.exception(error)
|
finally:
|
time.sleep(1)
|
|
|
"""
|
创建一个线程来驱动拉取华鑫l1数据 给各个策略模块
|
"""
|
|
|
# 调用所有以current信息为核心策略的函数
|
def strategic_thread_manager(current_info):
|
if current_info is not None:
|
# 调用交易策略模块中的涨幅视界策略
|
# 买入策略调用
|
buying_strategy.growth_view_strategy(current_info)
|
# 卖出策略调用
|
selling_strategy.instantaneous_change_strategy(current_info)
|
# pass
|
|
|
# 调用以指数行情信息为核心策略的函数
|
def index_judge_thread_manager(index_market_info):
|
if index_market_info is not None:
|
# 调用交易策略模块中的涨幅视界策略
|
# 指数行情调用
|
market_sentiment_analysis.instant_trend_strategy(index_market_info)
|
|
|
# 生成所有个股的开盘价字典
|
def get_all_stocks_current_open(current_infos):
|
pass
|
# 获取当前时间
|
now_time = tool.get_now_time_str()
|
# 如果当前时间大于09:25:06才运行最高价和最低价的运算
|
if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
|
# if data_cache.AFTER_CLOSING_TIME < now_time:
|
if now_time < data_cache.OPENING_TIME and data_cache.record_current_open_execution is False:
|
# if now_time < data_cache.AFTER_CLOSING_TIME:
|
logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价")
|
data_cache.record_current_open_execution = True
|
# print(f"current_info=={current_infos}")
|
for current_info in current_infos:
|
# 检查股票是否已经在data_cache中
|
# if current_info[0] not in data_cache.all_stocks_current_open:
|
symbol = basic_methods.format_stock_symbol(current_info[0])
|
data_cache.all_stocks_current_open[symbol] = {'current_open': current_info[2]}
|
# print(f"data_cache.all_stocks_current_open=={data_cache.all_stocks_current_open}")
|
# json_data = data_cache.all_stocks_current_open
|
# 将转换后的JSON字符串写入文件(目前考虑取消数据存储本地)
|
# with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
|
# # 将字典转换为 JSON 格式的字符串
|
# json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
|
# f.write(json_data)
|
else:
|
# 如果没有在规定时间内运行成功,采用掘金数据(只判断一次)
|
if data_cache.record_current_open_execution is False:
|
# logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录")
|
print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价")
|
data_cache.record_current_open_execution = True
|
current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks,
|
fields='symbol,open')
|
# print(f"current_datas=={current_datas}")
|
for current_data in current_datas:
|
# print(f"current_data=={current_data}")
|
# 检查股票是否已经在data_cache中
|
# if current_data[0] not in data_cache.all_stocks_current_open:
|
data_cache.all_stocks_current_open[current_data['symbol']] = {'current_open': current_data['open']}
|
# 将转换后的JSON字符串写入文件(目前取消数据存储本地,如需存储本地也要放在D:盘路径)
|
# with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f:
|
# # 将字典转换为 JSON 格式的字符串
|
# json_data = json.dumps(data_cache.all_stocks_current_open, ensure_ascii=False, indent=4)
|
# f.write(json_data)
|
|
|
# 构建一个跟踪最高价和最低价的对象(价格跟踪器)
|
class PriceTracker:
|
# 初始化 当前最新价、最高价、最低价
|
def __init__(self, initial_price):
|
self.current_high = initial_price
|
self.current_low = initial_price
|
|
def set_high_and_low_price(self, high, low):
|
self.current_high = high
|
self.current_low = low
|
|
# 将最新价分别传给 最高价和最低价函数
|
def set_current_price(self, new_price):
|
self.update_and_get_high(new_price)
|
self.update_and_get_low(new_price)
|
|
# 构建计算最高价函数
|
def update_and_get_high(self, new_price):
|
if new_price > self.current_high:
|
self.current_high = new_price
|
return self.current_high # 返回新高
|
# 如果价格没有变化,则不返回任何值(或可以选择返回None)
|
|
# 构建计算最低价函数
|
def update_and_get_low(self, new_price):
|
if new_price < self.current_low:
|
self.current_low = new_price
|
return self.current_low # 返回新低
|
# 如果价格没有变化,则不返回任何值(或可以选择返回None)
|
|
|
# 构建从缓存中查询获取开盘价的函数
|
def get_symbol_current_open(symbol):
|
if data_cache.all_stocks_current_open.get(symbol) is not None:
|
return data_cache.all_stocks_current_open[symbol]['current_open']
|
else:
|
return None
|
|
|
# 对一个计算出的最高价或最低价 进行初始化
|
__current_high_or_low_dict = {}
|
# 对一个计算出的最高价和最低价 进行初始化
|
__current_high_and_low_dict = {}
|
|
|
#
|
def get_all_stocks_current_high_and_low(current_infos):
|
"""
|
生成所有个股的最高价、最低价 字典
|
:param current_infos:
|
:return:
|
"""
|
# 获取当前时间
|
now_time = tool.get_now_time_str()
|
# 如果当前时间大于09:25:06才运行最高价和最低价的运算
|
if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
|
# if data_cache.AFTER_CLOSING_TIME < now_time:
|
if data_cache.now_time < data_cache.OPENING_TIME:
|
logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价")
|
# # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价
|
# while True:
|
# print(f"current_info=={current_infos}")
|
for current_info in current_infos:
|
symbol = basic_methods.format_stock_symbol(current_info[0]) #股票代码
|
pre_close = current_info[1] # 昨日收盘价
|
current_price = current_info[2] # 最新价
|
current_quotes_buy_1_price = current_info[5][0][0] #买一价格
|
price_tracker = __current_high_or_low_dict.get(symbol)
|
if not price_tracker:
|
# 赋初值
|
price_tracker = PriceTracker(current_price)
|
__current_high_or_low_dict[symbol] = price_tracker
|
get_current_high_or_low = price_tracker
|
# print(f"current_price>>>>>>==={current_price}")
|
if current_price is not None:
|
# 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录
|
if current_price > 0:
|
# logger.info(
|
# f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》")
|
get_current_high_or_low.set_current_price(current_price)
|
data_cache.all_stocks_current_high_and_low[symbol] = {
|
'current_high': get_current_high_or_low.current_high,
|
'current_low': get_current_high_or_low.current_low}
|
elif current_quotes_buy_1_price > 0:
|
# logger.info(
|
# f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ")
|
get_current_high_or_low.set_current_price(current_quotes_buy_1_price)
|
data_cache.all_stocks_current_high_and_low[symbol] = {
|
'current_high': get_current_high_or_low.current_high,
|
'current_low': get_current_high_or_low.current_low}
|
elif pre_close > 0:
|
logger.info(
|
f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}")
|
get_current_high_or_low.set_current_price(pre_close)
|
data_cache.all_stocks_current_high_and_low[symbol] = {
|
'current_high': get_current_high_or_low.current_high,
|
'current_low': get_current_high_or_low.current_low}
|
else:
|
logger.info(f"【没有】开盘前启动,采用【掘金数据】初始化 最高价最低价,采用【华鑫数据】更新 最高最低价")
|
# print(f"当前时间更新时间:{now_time}")
|
if not __current_high_or_low_dict:
|
# 还没初始化
|
# current_datas = current(symbols=data_cache.DataCache().min_stocks, fields='symbol,high,low')
|
current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
|
data_cache.DataCache().min_stocks, fields='symbol,high,low')
|
# print(f"current_datas=={current_datas}")
|
for current_data in current_datas:
|
symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
|
__current_high_or_low_dict[symbol] = PriceTracker(0)
|
__current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
|
# print(f"完成掘金初始化:{now_time}")
|
for current_info in current_infos:
|
# print(f"开始循环current_infos")
|
symbol = basic_methods.format_stock_symbol(current_info[0])
|
if symbol not in data_cache.DataCache().min_stocks:
|
continue
|
# if symbol.find("300810") > 0:
|
# print(f"开始循环current_infos:"+symbol)
|
current_price = current_info[2]
|
|
price_track_manage = __current_high_or_low_dict.get(symbol)
|
if not price_track_manage:
|
# 初始化
|
# current_datas = current(symbols=[symbol], fields='symbol,high,low')
|
current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
|
data_cache.DataCache().min_stocks, fields='symbol,high,low')
|
current_data = current_datas[0]
|
# print(f"开始实例化对象")
|
symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
|
price_track_manage = PriceTracker(0)
|
__current_high_or_low_dict[symbol] = price_track_manage
|
__current_high_or_low_dict[symbol].set_high_and_low_price(high, low)
|
|
if current_price is not None:
|
price_track_manage.set_current_price(current_price)
|
# print(f"开始更新最低价和最高价")
|
data_cache.all_stocks_current_high_and_low[symbol] = {
|
'current_high': price_track_manage.current_high,
|
'current_low': price_track_manage.current_low}
|
# print(f"data_cache.all_stocks_current_high_and_low[symbol]==={data_cache.all_stocks_current_high_and_low[symbol]}")
|
|
|
# 构建获取个股记录下来的实时当日最高价函数
|
def get_symbol_current_high(symbol):
|
if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
|
return data_cache.all_stocks_current_high_and_low[symbol]['current_high']
|
else:
|
return None
|
|
|
# 构建获取个股记录下来的实时当日最低价函数
|
def get_symbol_current_low(symbol):
|
if data_cache.all_stocks_current_high_and_low.get(symbol) is not None:
|
return data_cache.all_stocks_current_high_and_low[symbol]['current_low']
|
else:
|
return None
|
|
|
# 获取当前L1行情数据
|
def get_current_info():
|
logging.info(f"get_current_info进入")
|
# shm = SharedMemoryObj(name="l1_data_shared_memory", size=5 * 1024 * 1024)
|
while True:
|
try:
|
now_start = time.time()
|
current_infos = l1_data_api.get_current_info()
|
now_time = tool.get_now_time_str()
|
if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
|
print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
|
# for i in current_infos:
|
# if i[0] == '000001':
|
# print(f"i===={i}")
|
get_all_stocks_current_open(current_infos)
|
get_all_stocks_current_high_and_low(current_infos)
|
|
# 保存现价
|
for current_info in current_infos:
|
data_cache.current_l1_dict[current_info[0]] = current_info
|
|
for current_info in current_infos:
|
try:
|
if current_info is not None:
|
strategic_thread_manager(current_info)
|
except Exception as error:
|
logging.exception(error)
|
# print("异常:", current_info)
|
now_end: float = time.time()
|
start_to_end = now_end - now_start
|
print(f"运行中=={round(start_to_end, 2)} 秒")
|
# logger.info(f"运行中=={round(start_to_end, 2)}秒")
|
except Exception as error:
|
logging.exception(error)
|
finally:
|
time.sleep(0.5)
|
|
|
# 把current_infos灌入相应的线程
|
def set_current_info(current_infos):
|
# @dask.delayed
|
def process_current_infos(current_info_list):
|
__start_time = time.time()
|
use_time_list = []
|
for current_info in current_info_list:
|
try:
|
if current_info is not None:
|
_start_time = time.time()
|
strategic_thread_manager(current_info)
|
use_time_list.append((time.time() - _start_time, current_info[0]))
|
except Exception as error:
|
logging.exception(error)
|
# print("异常:", current_info)
|
logger_debug.exception(error)
|
logger_debug.error(f"L1处理出错:{current_info}")
|
use_time = time.time() - __start_time
|
if use_time > 0.5:
|
# 记录超过1s的数据
|
async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}",
|
tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list),
|
max(use_time_list, key=lambda e: e[0]))
|
|
# @dask.delayed
|
# def batch_process_current_infos(fs):
|
# return fs
|
|
logging.info(f"set_current_info进入")
|
now_start = time.time()
|
try:
|
now_time = tool.get_now_time_str()
|
if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
|
print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
|
# for i in current_infos:
|
# if i[0] == '000001':
|
# print(f"i===={i}")
|
get_all_stocks_current_open(current_infos)
|
get_all_stocks_current_high_and_low(current_infos)
|
if current_infos:
|
# 保存现价
|
for current_info in current_infos:
|
data_cache.current_l1_dict[current_info[0]] = current_info
|
|
# 分批处理数据
|
# ds = []
|
# total_count = len(current_infos)
|
# page = 2
|
# page_size = total_count // page + 1
|
# for p in range(page):
|
# temp_list = current_infos[p * page_size:(p + 1) * page_size]
|
# ds.append(process_current_infos(temp_list))
|
# dask_result = batch_process_current_infos(ds)
|
# dask_result.compute()
|
process_current_infos(current_infos)
|
now_end: float = time.time()
|
start_to_end = now_end - now_start
|
print(f"运行中=={round(start_to_end, 2)} 秒")
|
# logger.info(f"运行中=={round(start_to_end, 2)}秒")
|
except Exception as error:
|
logging.exception(error)
|
finally:
|
async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}")
|
|
# 仅仅用于测试数据进入策略后的数据情况
|
# get_current_info()
|