Administrator
2025-08-18 ae8d76a456b64c1c6c4ebf11b6ec33b7df217b1a
third_data/history_k_data_util.py
@@ -4,14 +4,19 @@
import datetime
import decimal
import json
import time
import requests
import constant
from db.redis_manager import RedisUtils
from utils import tool
from db import redis_manager
import gm.api as gmapi
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_request_api, logger_debug
from third_data import hx_qc_value_util
from utils import tool, middle_api_protocol
from db import redis_manager_delegate as redis_manager
# import gm.api as gmapi
class JueJinHttpApi:
@@ -25,25 +30,39 @@
                return datetime.datetime.fromisoformat(val)
            return val
        url = f'{cls.__BASE_URL}{path_str}'
        # 发送POST请求
        response = requests.post(url, json=data_json)
        result = response.text
        resultJson = json.loads(result)
        if resultJson['code'] == 0:
            data = resultJson['data']
            if type(data) == list:
                for d in data:
        DELEGATE = True
        fdata = None
        if DELEGATE:
            fdata = middle_api_protocol.load_juejin(path_str, data_json)
            __start_time = time.time()
            try:
                fdata = middle_api_protocol.request(fdata)
            finally:
                __use_time = time.time() - __start_time
                if __use_time > 5:
                    logger_request_api.info(f"掘金API请求时间:{path_str}-{int(__use_time)}")
        else:
            url = f'{cls.__BASE_URL}{path_str}'
            # 发送POST请求
            response = requests.post(url, json=data_json)
            result = response.text
            resultJson = json.loads(result)
            if resultJson['code'] == 0:
                fdata = resultJson['data']
        if fdata:
            if type(fdata) == list:
                for d in fdata:
                    if type(d) != dict:
                        continue
                    for k in d:
                        d[k] = deformat_date(d[k])
            elif type(data) == dict:
                for k in data:
                    data[k] = deformat_date(data[k])
            return data
        return None
            elif type(fdata) == dict:
                for k in fdata:
                    fdata[k] = deformat_date(fdata[k])
            return fdata
        else:
            return None
    @classmethod
    def get_instruments(cls, symbols, fields):
@@ -63,6 +82,11 @@
        return cls.__request("get_instruments",
                             {"exchanges": exchanges, "sec_types": sec_types, "skip_suspended": skip_suspended,
                              "skip_st": skip_st, "fields": fields})
    @classmethod
    def get_history_instruments(cls, symbols, start_date, end_date, fields):
        return cls.__request("get_history_instruments",
                             {"symbols": symbols, "start_date": start_date, "end_date": end_date, "fields": fields})
    @classmethod
    def get_previous_trading_date(cls, exchange, date):
@@ -87,20 +111,19 @@
        redis = cls.__redisManager.getRedis()
        try:
            account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
            strategy_id =RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            token = RedisUtils.get(redis, "juejin-token", auto_free=False)
            return account_id, strategy_id, token
        finally:
            RedisUtils.realse(redis)
    @classmethod
    def get_juejin_code_list_with_prefix(cls, codes):
        list = []
        for d in codes:
            if d[0:2] == '00':
            if tool.is_sz_code(d):
                list.append("SZSE.{}".format(d))
            elif d[0:2] == '60':
            elif tool.is_sh_code(d):
                list.append("SHSE.{}".format(d))
        return list
@@ -131,7 +154,7 @@
            return results
    @classmethod
    def get_gp_current_info(cls, codes):
    def get_gp_current_info(cls, codes, fields=None):
        if not codes:
            return []
        symbols = cls.get_juejin_code_list_with_prefix(codes)
@@ -141,29 +164,43 @@
            data = gmapi.current(symbols=",".join(symbols))
            return data
        else:
            data = JueJinHttpApi.current(symbols=",".join(symbols), fields='')
            data = JueJinHttpApi.current(symbols=",".join(symbols), fields=fields)
            return data
        # 返回指定日期的上个交易日
    # 获取交易所的代码
    @classmethod
    def get_exchanges_codes(cls, exchanges):
    def get_exchanges_codes(cls, exchanges, skip_suspended=True, skip_st=True, fields=None):
        if constant.JUEJIN_LOCAL_API:
            account_id, s_id, token = cls.getJueJinAccountInfo()
            gmapi.set_token(token)
            return gmapi.get_instruments(exchanges=exchanges, sec_types=[1], skip_suspended=True, skip_st=True,
            return gmapi.get_instruments(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended,
                                         skip_st=skip_st,
                                         fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
        else:
            return JueJinHttpApi.get_exchanges_codes(exchanges=exchanges, sec_types=[1], skip_suspended=True,
                                                     skip_st=True,
                                                     fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
            if not fields:
                fields = "symbol,sec_type,sec_id,sec_name,listed_date,sec_level," "is_suspended,pre_close"
            return JueJinHttpApi.get_exchanges_codes(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended,
                                                     skip_st=skip_st,
                                                     fields=fields)
    @classmethod
    def get_history_instruments(cls, symbols, start_date, end_date, fields=None):
        if constant.JUEJIN_LOCAL_API:
            account_id, s_id, token = cls.getJueJinAccountInfo()
            gmapi.set_token(token)
            return gmapi.get_history_instruments(symbols=symbols, start_date=start_date, end_date=end_date,
                                                 fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
        else:
            return JueJinHttpApi.get_history_instruments(symbols, start_date, end_date, fields)
    @classmethod
    def get_previous_trading_date(cls, date):
        """
        Return the trading date preceding the given date, queried for exchange "SHSE".

        @param date: forwarded as-is to the back end
        @return: result of gmapi.get_previous_trading_date when
                 constant.JUEJIN_LOCAL_API is truthy, otherwise of
                 JueJinHttpApi.get_previous_trading_date
        """
        if constant.JUEJIN_LOCAL_API:
            # Local SDK path: set the stored token on gmapi, then query it.
            account_id, s_id, token = cls.getJueJinAccountInfo()
            gmapi.set_token(token)
            return gmapi.get_previous_trading_date("SHSE", date)
            # NOTE(review): the commented-out copy and the `pass` below are
            # unreachable after the return above. This looks like a disabled
            # variant of the local branch left next to the live one - confirm
            # which of the two is intended and delete the other.
            # account_id, s_id, token = cls.getJueJinAccountInfo()
            # gmapi.set_token(token)
            # return gmapi.get_previous_trading_date("SHSE", date)
            pass
        else:
            # HTTP path: delegate to the HTTP API wrapper.
            return JueJinHttpApi.get_previous_trading_date("SHSE", date)
@@ -188,14 +225,23 @@
class HistoryKDatasUtils(object):
    __previous_trading_date_cache = {}
    __latest_trading_date_cache = {}
    @classmethod
    def get_gp_latest_info(cls, codes, fields=None):
        return JueJinApi.get_gp_latest_info(codes, fields)
    @classmethod
    def get_history_tick_n(cls, code, count, fields=None):
        return JueJinApi.get_history_tick_n(code, count, fields)
    def get_history_tick_n(cls, code, count, fields=None, juejin=False):
        # return JueJinApi.get_history_tick_n(code, count, fields)
        if constant.is_windows() or juejin:
            return JueJinApi.get_history_tick_n(code, count, fields)
        else:
            try:
                return hx_qc_value_util.get_history_k_bars(code, count)
            except Exception as e:
                logger_debug.exception(e)
    @classmethod
    def get_gp_current_info(cls, codes):
@@ -204,16 +250,65 @@
    # 返回指定日期的上个交易日
    @classmethod
    def get_previous_trading_date(cls, date):
        return JueJinApi.get_previous_trading_date(date)
        if constant.is_windows():
            return JueJinApi.get_previous_trading_date(date)
        else:
            try:
                return hx_qc_value_util.get_previous_trading_date(date)
            except:
                return JueJinApi.get_previous_trading_date(date)
    @classmethod
    def get_previous_trading_date_cache(cls, date):
        if date in cls.__previous_trading_date_cache:
            return cls.__previous_trading_date_cache.get(date)
        fdata = cls.get_previous_trading_date(date)
        if fdata:
            cls.__previous_trading_date_cache[date] = fdata
        return fdata
    # 获取最近的交易日
    @classmethod
    def get_latest_trading_date(cls, day_count):
        """
       获取最近几个交易日(不包含今天)
        @param day_count:
        @return:
        """
        now_day = tool.get_now_date_str()
        days = []
        for i in range(day_count):
            pday = cls.get_previous_trading_date_cache(now_day)
            days.append(pday)
            now_day = pday
        return days
    @classmethod
    def get_latest_trading_date_cache(cls, day_count):
        """
        获取最近几个交易日(不包含今天)
        @param day_count:
        @return:
        """
        key = f"{tool.get_now_date_str()}-{day_count}"
        if key in cls.__latest_trading_date_cache:
            return cls.__latest_trading_date_cache[key]
        days = cls.get_latest_trading_date(day_count)
        cls.__latest_trading_date_cache[key] = days
        return days
    # 返回指定日期的下个交易日
    @classmethod
    def get_next_trading_date(cls, date):
        return JueJinApi.get_next_trading_date(date)
        # return JueJinApi.get_next_trading_date(date)
        if constant.is_windows():
            return JueJinApi.get_next_trading_date(date)
        else:
            return hx_qc_value_util.get_next_trading_date(date)
    @classmethod
    def get_trading_dates(cls, start_date, end_date):
        """
        Return the trading dates for the given range.

        @param start_date: forwarded as-is
        @param end_date: forwarded as-is
        @return: result of JueJinApi.get_trading_dates
        """
        return JueJinApi.get_trading_dates(start_date, end_date)
        # NOTE(review): the return below is unreachable. It looks like the
        # intended replacement for the line above (different data source) -
        # confirm which one should stay and delete the other.
        return hx_qc_value_util.get_trade_calendar(start_date, end_date)
    @classmethod
    def get_now_price(cls, codes):
@@ -266,19 +361,33 @@
            results[code] = code_name
        return results
    @classmethod
    def get_st_codes(cls):
        """
        获取st代码
        @return:
        """
        results = JueJinApi.get_exchanges_codes("SHSE,SZSE", skip_st=False, fields="sec_id,sec_level")
        codes = [x["sec_id"] for x in results if x["sec_level"] != 1]
        return codes
def get_k_bar_dead_date():
    """
    Get the cut-off date for K-line data.

    The previous trading day is looked up first (and must be available);
    once the current time is past 16:00:00 today's date is returned instead.
    @return: the cut-off date as produced by the date helpers
    @raise Exception: when no previous trading day could be obtained
    """
    recent_days = HistoryKDatasUtils.get_latest_trading_date_cache(5)
    previous_day = recent_days[0] if recent_days else None
    if previous_day is None:
        raise Exception("没有获取到上一个交易日的日期")
    # After 4 pm switch to today's data.
    if tool.get_now_time_as_int() > 160000:
        return tool.get_now_date_str()
    return previous_day
if __name__ == "__main__":
    # Manual check: count instruments fetched over the HTTP API that pass
    # the code-prefix, level, price and listing-age filters below.
    constant.JUEJIN_LOCAL_API = False
    list_ = JueJinApi.get_exchanges_codes(["SHSE", "SZSE"])
    fdatas = []
    for item in list_:
        # Conditions are evaluated in the same order as before and stop at
        # the first one that rejects the record.
        rejected = (
            not item["sec_id"].startswith(("60", "00"))
            or item["sec_level"] != 1
            or item["pre_close"] * 1.1 > 40
            or (item["listed_date"] + datetime.timedelta(days=100)).timestamp()
            > datetime.datetime.now().timestamp()
        )
        if not rejected:
            fdatas.append(item)
    print(len(fdatas))