admin
66 分钟以前 bc1913abfdc364d97facb3edcd2d54130c8e1ce2
strategy/instant_time_market.py
@@ -5,14 +5,19 @@
import logging
import time
import datetime
import dask
import utils
from log_module.log import logger_common
from utils import huaxin_util
from log_module import async_log_util
from log_module.log import logger_common, logger_debug
from strategy.trade_setting import TradeSetting
from utils import huaxin_util, tool
# 引入华鑫API(小辉整理)
from strategy import l1_data_api
from strategy import data_cache
from strategy import basic_methods
from strategy import buying_strategy, selling_strategy, index_market_trend_strategy
from strategy import buying_strategy, selling_strategy, market_sentiment_analysis
# from low_suction.shared_memory_util import SharedMemoryObj
@@ -28,11 +33,15 @@
def index_market_current():
    logging.info(f"index_market_trend进入")
    while True:
        if not TradeSetting().get_running():
            # 已经暂停
            time.sleep(1)
            continue
        try:
            # 在data_cache中获取到推送过来的实时指数行情数据
            stock_index_dict = data_cache.stock_index_dict
            now_time = datetime.datetime.now().strftime("%H:%M:%S")
            if len(stock_index_dict) == 0 and data_cache.L1_data_start_time < now_time < data_cache.closing_time:
            now_time = tool.get_now_time_str()
            if len(stock_index_dict) == 0 and data_cache.L1_DATA_START_TIME < now_time < data_cache.CLOSING_TIME:
                print(f"9:15--15:00 实时指数数据为空===={stock_index_dict}")
            index_judge_thread_manager(stock_index_dict)
        except Exception as error:
@@ -62,19 +71,19 @@
    if index_market_info is not None:
        # 调用交易策略模块中的涨幅视界策略
        # 指数行情调用
        index_market_trend_strategy.instant_trend_strategy(index_market_info)
        market_sentiment_analysis.instant_trend_strategy(index_market_info)
# 生成所有个股的开盘价字典
def get_all_stocks_current_open(current_infos):
    pass
    # 获取当前时间
    now_time = datetime.datetime.now().strftime("%H:%M:%S")
    now_time = tool.get_now_time_str()
    # 如果当前时间大于09:25:06才运行最高价和最低价的运算
    if data_cache.later_open_bidding_time < now_time:
        # if data_cache.after_closing_time < now_time:
        if now_time < data_cache.opening_time and data_cache.record_current_open_execution is False:
            # if now_time < data_cache.after_closing_time:
    if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
        # if data_cache.AFTER_CLOSING_TIME < now_time:
        if now_time < data_cache.OPENING_TIME and data_cache.record_current_open_execution is False:
            # if now_time < data_cache.AFTER_CLOSING_TIME:
            logger.info(f"在开盘前启动,采用【华鑫数据】记录 开盘价")
            data_cache.record_current_open_execution = True
            # print(f"current_info=={current_infos}")
@@ -96,7 +105,8 @@
                # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录")
                print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价")
                data_cache.record_current_open_execution = True
                current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().filtered_stocks, fields='symbol,open')
                current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks,
                                                                          fields='symbol,open')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    # print(f"current_data=={current_data}")
@@ -155,23 +165,28 @@
__current_high_and_low_dict = {}
# 生成所有个股的最高价、最低价 字典
#
def get_all_stocks_current_high_and_low(current_infos):
    """
    生成所有个股的最高价、最低价 字典
    :param current_infos:
    :return:
    """
    # 获取当前时间
    now_time = datetime.datetime.now().strftime("%H:%M:%S")
    now_time = tool.get_now_time_str()
    # 如果当前时间大于09:25:06才运行最高价和最低价的运算
    if data_cache.later_open_bidding_time < now_time:
        # if data_cache.after_closing_time < now_time:
        if data_cache.Local_startup_time < data_cache.opening_time:
    if data_cache.LATER_OPEN_BIDDING_TIME < now_time:
        # if data_cache.AFTER_CLOSING_TIME < now_time:
        if data_cache.now_time < data_cache.OPENING_TIME:
            logger.info(f"【在】开盘前启动,采用【华鑫数据】记录 最高最低价")
            # # 如果在9:30前启动,则只采用【华鑫数据】记录 最高最低价
            # while True:
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                symbol = basic_methods.format_stock_symbol(current_info[0])  #股票代码
                pre_close = current_info[1]        # 昨日收盘价
                current_price = current_info[2]   # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]   #买一价格
                pre_close = current_info[1]  # 昨日收盘价
                current_price = current_info[2]  # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]  #买一价格
                price_tracker = __current_high_or_low_dict.get(symbol)
                if not price_tracker:
                    # 赋初值
@@ -181,21 +196,21 @@
                # print(f"current_price>>>>>>==={current_price}")
                if current_price is not None:
                    # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录
                    if current_price != 0 or current_price != 0.0:
                    if current_price > 0:
                        # logger.info(
                        #     f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》")
                        get_current_high_or_low.set_current_price(current_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif current_quotes_buy_1_price != 0 or current_quotes_buy_1_price != 0.0:
                    elif current_quotes_buy_1_price > 0:
                        # logger.info(
                        #     f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 零食采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ")
                        get_current_high_or_low.set_current_price(current_quotes_buy_1_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif pre_close != 0 or pre_close != 0.0:
                    elif pre_close > 0:
                        logger.info(
                            f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}")
                        get_current_high_or_low.set_current_price(pre_close)
@@ -207,9 +222,9 @@
            # print(f"当前时间更新时间:{now_time}")
            if not __current_high_or_low_dict:
                # 还没初始化
                # current_datas = current(symbols=data_cache.DataCache().filtered_stocks, fields='symbol,high,low')
                # current_datas = current(symbols=data_cache.DataCache().min_stocks, fields='symbol,high,low')
                current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
                    data_cache.DataCache().filtered_stocks, fields='symbol,high,low')
                    data_cache.DataCache().min_stocks, fields='symbol,high,low')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
@@ -219,7 +234,7 @@
            for current_info in current_infos:
                # print(f"开始循环current_infos")
                symbol = basic_methods.format_stock_symbol(current_info[0])
                if symbol not in data_cache.DataCache().filtered_stocks:
                if symbol not in data_cache.DataCache().min_stocks:
                    continue
                # if symbol.find("300810") > 0:
                #     print(f"开始循环current_infos:"+symbol)
@@ -230,7 +245,7 @@
                    # 初始化
                    # current_datas = current(symbols=[symbol], fields='symbol,high,low')
                    current_datas = utils.juejin_api.JueJinApi.get_codes_high_and_low(
                        data_cache.DataCache().filtered_stocks, fields='symbol,high,low')
                        data_cache.DataCache().min_stocks, fields='symbol,high,low')
                    current_data = current_datas[0]
                    # print(f"开始实例化对象")
                    symbol, high, low = current_data['symbol'], current_data['high'], current_data['low']
@@ -262,6 +277,7 @@
    else:
        return None
# 获取当前L1行情数据
def get_current_info():
    logging.info(f"get_current_info进入")
@@ -270,14 +286,19 @@
        try:
            now_start = time.time()
            current_infos = l1_data_api.get_current_info()
            now_time = datetime.datetime.now().strftime("%H:%M:%S")
            if len(current_infos) == 0 and now_time > data_cache.L1_data_start_time:
            now_time = tool.get_now_time_str()
            if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
                print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
            # for i in current_infos:
            #     if i[0] == '000001':
            #         print(f"i===={i}")
            get_all_stocks_current_open(current_infos)
            get_all_stocks_current_high_and_low(current_infos)
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
            for current_info in current_infos:
                try:
                    if current_info is not None:
@@ -295,27 +316,61 @@
            time.sleep(0.5)
# 把current_infos灌入相应的线程
def set_current_info(current_infos):
    # @dask.delayed
    def process_current_infos(current_info_list):
        __start_time = time.time()
        use_time_list = []
        for current_info in current_info_list:
            try:
                if current_info is not None:
                    _start_time = time.time()
                    strategic_thread_manager(current_info)
                    use_time_list.append((time.time() - _start_time, current_info[0]))
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
                logger_debug.exception(error)
                logger_debug.error(f"L1处理出错:{current_info}")
        use_time = time.time() - __start_time
        if use_time > 0.5:
            # 记录超过1s的数据
            async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}",
                                tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list),
                                max(use_time_list, key=lambda e: e[0]))
    # @dask.delayed
    # def batch_process_current_infos(fs):
    #     return fs
    logging.info(f"set_current_info进入")
    now_start = time.time()
    try:
        now_start = time.time()
        now_time = datetime.datetime.now().strftime("%H:%M:%S")
        if len(current_infos) == 0 and now_time > data_cache.L1_data_start_time:
        now_time = tool.get_now_time_str()
        if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
            print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
        # for i in current_infos:
        #     if i[0] == '000001':
        #         print(f"i===={i}")
        get_all_stocks_current_open(current_infos)
        get_all_stocks_current_high_and_low(current_infos)
        for current_info in current_infos:
            try:
                if current_info is not None:
                    strategic_thread_manager(current_info)
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
        if current_infos:
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
            # 分批处理数据
            # ds = []
            # total_count = len(current_infos)
            # page = 2
            # page_size = total_count // page + 1
            # for p in range(page):
            #     temp_list = current_infos[p * page_size:(p + 1) * page_size]
            #     ds.append(process_current_infos(temp_list))
            # dask_result = batch_process_current_infos(ds)
            # dask_result.compute()
            process_current_infos(current_infos)
        now_end: float = time.time()
        start_to_end = now_end - now_start
        print(f"运行中=={round(start_to_end, 2)} 秒")
@@ -323,8 +378,7 @@
    except Exception as error:
        logging.exception(error)
    finally:
        pass
        async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}")
# 仅仅用于测试数据进入策略后的数据情况
# get_current_info()