admin
2025-04-07 e7c7efc9d6fc46a2925fa5b2b74358c68b007df2
strategy/instant_time_market.py
@@ -5,8 +5,12 @@
import logging
import time
import datetime
import dask
import utils
from log_module.log import logger_common
from log_module import async_log_util
from log_module.log import logger_common, logger_debug
from strategy.trade_setting import TradeSetting
from utils import huaxin_util, tool
# 引入华鑫API(小辉整理)
@@ -101,7 +105,8 @@
                # logger.info(f"【没有】在集合竞价内启动,采用【掘金数据】记录")
                print(f"【没有】在开盘前内启动,采用【掘金数据】记录 开盘价")
                data_cache.record_current_open_execution = True
                current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, fields='symbol,open')
                current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks,
                                                                          fields='symbol,open')
                # print(f"current_datas=={current_datas}")
                for current_data in current_datas:
                    # print(f"current_data=={current_data}")
@@ -160,8 +165,13 @@
__current_high_and_low_dict = {}
# 生成所有个股的最高价、最低价 字典
#
def get_all_stocks_current_high_and_low(current_infos):
    """
    生成所有个股的最高价、最低价 字典
    :param current_infos:
    :return:
    """
    # 获取当前时间
    now_time = tool.get_now_time_str()
    # 如果当前时间大于09:25:06才运行最高价和最低价的运算
@@ -174,9 +184,9 @@
            # print(f"current_info=={current_infos}")
            for current_info in current_infos:
                symbol = basic_methods.format_stock_symbol(current_info[0])  #股票代码
                pre_close = current_info[1]        # 昨日收盘价
                current_price = current_info[2]   # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]   #买一价格
                pre_close = current_info[1]  # 昨日收盘价
                current_price = current_info[2]  # 最新价
                current_quotes_buy_1_price = current_info[5][0][0]  #买一价格
                price_tracker = __current_high_or_low_dict.get(symbol)
                if not price_tracker:
                    # 赋初值
@@ -186,21 +196,21 @@
                # print(f"current_price>>>>>>==={current_price}")
                if current_price is not None:
                    # 为避免L1数据中最新价偶发为0,在最新价为0时使用买一价记录
                    if current_price != 0 or current_price != 0.0:
                    if current_price > 0:
                        # logger.info(
                        #     f"《current_price 不为空 也不为0.0 也不为0 当日当时最新价:{current_price}》")
                        get_current_high_or_low.set_current_price(current_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif current_quotes_buy_1_price != 0 or current_quotes_buy_1_price != 0.0:
                    elif current_quotes_buy_1_price > 0:
                        # logger.info(
                        #     f"代码:{symbol}::《current_price 未获取成功 或 值为0.0 临时采用买一价作为最新价,买一价:{current_quotes_buy_1_price}》 ")
                        get_current_high_or_low.set_current_price(current_quotes_buy_1_price)
                        data_cache.all_stocks_current_high_and_low[symbol] = {
                            'current_high': get_current_high_or_low.current_high,
                            'current_low': get_current_high_or_low.current_low}
                    elif pre_close != 0 or pre_close != 0.0:
                    elif pre_close > 0:
                        logger.info(
                            f"最新价和买一价获取失败或有误,获取到的当日当时最新价:{current_price},买一价:{current_quotes_buy_1_price}》,临时性采用昨收价作为最新价,昨收价:{pre_close}")
                        get_current_high_or_low.set_current_price(pre_close)
@@ -303,9 +313,25 @@
# 把current_infos灌入相应的线程
def set_current_info(current_infos):
    @dask.delayed
    def process_current_infos(current_info_list):
        for current_info in current_info_list:
            try:
                if current_info is not None:
                    strategic_thread_manager(current_info)
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
                logger_debug.exception(error)
                logger_debug.error(f"L1处理出错:{current_info}")
    @dask.delayed
    def batch_process_current_infos(fs):
        """Return ``fs`` unchanged.

        The function is wrapped with ``dask.delayed``; the enclosing code
        passes it the list of delayed batch tasks and calls ``compute()`` on
        the result. Presumably this gives a single node whose evaluation
        drives all batches -- confirm against the dask documentation.

        :param fs: the collected batch tasks/results
        :return: ``fs`` as received
        """
        return fs
    logging.info(f"set_current_info进入")
    now_start = time.time()
    try:
        now_start = time.time()
        now_time = tool.get_now_time_str()
        if len(current_infos) == 0 and now_time > data_cache.L1_DATA_START_TIME:
            print(f"9:15后 l1数据为空=l1_data_current_infos===={current_infos}")
@@ -314,13 +340,17 @@
        #         print(f"i===={i}")
        get_all_stocks_current_open(current_infos)
        get_all_stocks_current_high_and_low(current_infos)
        for current_info in current_infos:
            try:
                if current_info is not None:
                    strategic_thread_manager(current_info)
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
        if current_infos:
            # 分批处理数据
            ds = []
            total_count = len(current_infos)
            page = 5
            page_size = total_count // page + 1
            for p in range(page):
                temp_list = current_infos[p * page_size:(p + 1) * page_size]
                ds.append(process_current_infos(temp_list))
            dask_result = batch_process_current_infos(ds)
            dask_result.compute()
        now_end: float = time.time()
        start_to_end = now_end - now_start
        print(f"运行中=={round(start_to_end, 2)} 秒")
@@ -328,8 +358,7 @@
    except Exception as error:
        logging.exception(error)
    finally:
        pass
        async_log_util.info(logger_debug, f"L1处理时间:{time.time() - now_start}")
# 仅仅用于测试数据进入策略后的数据情况
# get_current_info()