| | |
| | | |
| | | import constant |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from log_module.log import logger_request_api |
| | | from log_module.log import logger_request_api, logger_debug |
| | | from third_data import hx_qc_value_util |
| | | from utils import tool, middle_api_protocol |
| | | from db import redis_manager_delegate as redis_manager |
| | |
@classmethod
def get_history_tick_n(cls, code, count, fields=None):
    """
    Fetch the most recent ``count`` history records for ``code``.

    On Windows the request is delegated to ``JueJinApi.get_history_tick_n``.
    On every other platform it goes to ``hx_qc_value_util.get_history_k_bars``;
    note that ``fields`` is not forwarded on that path.

    @param code: security code to query
    @param count: number of records to request
    @param fields: optional field selection, only used by the JueJin backend
    @return: whatever the selected backend returns, or None when the
             hx_qc backend raises (the exception is logged, not propagated)
    """
    # The earlier unconditional `return hx_qc_value_util.get_history_k_bars(...)`
    # made the platform dispatch below unreachable; it has been removed.
    if constant.is_windows():
        return JueJinApi.get_history_tick_n(code, count, fields)
    try:
        return hx_qc_value_util.get_history_k_bars(code, count)
    except Exception as e:
        # Best-effort lookup: record the failure and fall through to None.
        logger_debug.exception(e)
        return None
| | | |
| | | |
| | | @classmethod |
| | | def get_gp_current_info(cls, codes): |
| | |
# Return the trading day that precedes the given date.
@classmethod
def get_previous_trading_date(cls, date):
    """
    Look up the previous trading day for ``date``.

    Windows hosts query ``JueJinApi``; all other platforms query
    ``hx_qc_value_util``.

    @param date: reference date, passed through unchanged to the backend
    @return: the value returned by the selected backend
    """
    # The earlier unconditional return to hx_qc_value_util made the Windows
    # branch unreachable; dispatch on the platform first.
    if constant.is_windows():
        return JueJinApi.get_previous_trading_date(date)
    return hx_qc_value_util.get_previous_trading_date(date)
| | | |
| | | @classmethod |
| | | def get_previous_trading_date_cache(cls, date): |
| | |
| | | # 获取最近的交易日 |
| | | @classmethod |
| | | def get_latest_trading_date(cls, day_count): |
| | | """ |
| | | 获取最近几个交易日(不包含今天) |
| | | @param day_count: |
| | | @return: |
| | | """ |
| | | now_day = tool.get_now_date_str() |
| | | days = [] |
| | | for i in range(day_count): |
| | |
| | | |
| | | @classmethod |
| | | def get_latest_trading_date_cache(cls, day_count): |
| | | """ |
| | | 获取最近几个交易日(不包含今天) |
| | | @param day_count: |
| | | @return: |
| | | """ |
| | | key = f"{tool.get_now_date_str()}-{day_count}" |
| | | if key in cls.__latest_trading_date_cache: |
| | | return cls.__latest_trading_date_cache[key] |
| | |
| | | return results |
| | | |
| | | |
def get_k_bar_dead_date():
    """
    Return the cut-off date for K-line data.

    The first entry of ``HistoryKDatasUtils.get_latest_trading_date_cache(5)``
    is used (presumably the most recent trading day before today - confirm
    against that helper). Once the clock is past 16:00:00 the current date
    string is returned instead.

    @return: the cut-off date
    @raise Exception: when no recent trading date could be obtained
    """
    recent_dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
    dead_date = recent_dates[0] if recent_dates else None
    if dead_date is None:
        raise Exception("没有获取到上一个交易日的日期")
    # After 4 p.m. switch over to today's data.
    past_four_pm = tool.get_now_time_as_int() > 160000
    return tool.get_now_date_str() if past_four_pm else dead_date
| | | |
if __name__ == "__main__":
    # Manual smoke check of the two backend-dispatching helpers.
    previous_day = HistoryKDatasUtils.get_previous_trading_date("2024-12-31")
    print(previous_day)
    history_rows = HistoryKDatasUtils.get_history_tick_n("000095", 10)
    print(history_rows)
| | | |
| | | |
| | | # now_day = tool.get_now_date_str() |
| | | # results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]), |
| | | # tool.date_sub(now_day, 30), tool.date_sub(now_day, 1)) |