Administrator
2025-01-09 7de28c45eb5fc393bfed07ffbefb69dc63eeaa4d
third_data/history_k_data_util.py
@@ -10,7 +10,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_request_api
from log_module.log import logger_request_api, logger_debug
from third_data import hx_qc_value_util
from utils import tool, middle_api_protocol
from db import redis_manager_delegate as redis_manager
@@ -234,7 +234,14 @@
    @classmethod
    def get_history_tick_n(cls, code, count, fields=None):
        # return JueJinApi.get_history_tick_n(code, count, fields)
        return hx_qc_value_util.get_history_k_bars(code, count)
        if constant.is_windows():
            return JueJinApi.get_history_tick_n(code, count, fields)
        else:
            try:
                return hx_qc_value_util.get_history_k_bars(code, count)
            except Exception as e:
                logger_debug.exception(e)
    @classmethod
    def get_gp_current_info(cls, codes):
@@ -243,7 +250,10 @@
    # 返回指定日期的上个交易日
    @classmethod
    def get_previous_trading_date(cls, date):
        return hx_qc_value_util.get_previous_trading_date(date)
        if constant.is_windows():
            return JueJinApi.get_previous_trading_date(date)
        else:
            return hx_qc_value_util.get_previous_trading_date(date)
    @classmethod
    def get_previous_trading_date_cache(cls, date):
@@ -257,6 +267,11 @@
    # 获取最近的交易日
    @classmethod
    def get_latest_trading_date(cls, day_count):
        """
       获取最近几个交易日(不包含今天)
        @param day_count:
        @return:
        """
        now_day = tool.get_now_date_str()
        days = []
        for i in range(day_count):
@@ -267,6 +282,11 @@
    @classmethod
    def get_latest_trading_date_cache(cls, day_count):
        """
        获取最近几个交易日(不包含今天)
        @param day_count:
        @return:
        """
        key = f"{tool.get_now_date_str()}-{day_count}"
        if key in cls.__latest_trading_date_cache:
            return cls.__latest_trading_date_cache[key]
@@ -336,6 +356,22 @@
        return results
def get_k_bar_dead_date():
    """
    获取K线的截止日期
    @return:
    """
    dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
    latest_trading_date = None
    if dates:
        latest_trading_date = dates[0]
    if latest_trading_date is None:
        raise Exception("没有获取到上一个交易日的日期")
    # 4点之后改为获取今日的数据
    if tool.get_now_time_as_int() > 160000:
        latest_trading_date = tool.get_now_date_str()
    return latest_trading_date
if __name__ == "__main__":
    print(HistoryKDatasUtils.get_previous_trading_date("2024-12-31"))
    print(HistoryKDatasUtils.get_history_tick_n("000095", 10))