Administrator
2024-01-19 a00da3062c6c825b585f82275823ac45cdeb6502
third_data/history_k_data_util.py
@@ -4,13 +4,18 @@
import datetime
import decimal
import json
import time
import requests
import constant
from utils import tool
from db import redis_manager
import gm.api as gmapi
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_request_api
from utils import tool, middle_api_protocol
from db import redis_manager_delegate as redis_manager
# import gm.api as gmapi
class JueJinHttpApi:
@@ -24,25 +29,39 @@
                return datetime.datetime.fromisoformat(val)
            return val
        url = f'{cls.__BASE_URL}{path_str}'
        # 发送POST请求
        response = requests.post(url, json=data_json)
        result = response.text
        resultJson = json.loads(result)
        if resultJson['code'] == 0:
            data = resultJson['data']
            if type(data) == list:
                for d in data:
        DELEGATE = True
        fdata = None
        if DELEGATE:
            fdata = middle_api_protocol.load_juejin(path_str, data_json)
            __start_time = time.time()
            try:
                fdata = middle_api_protocol.request(fdata)
            finally:
                __use_time = time.time() - __start_time
                if __use_time > 5:
                    logger_request_api.info(f"掘金API请求时间:{path_str}-{int(__use_time)}")
        else:
            url = f'{cls.__BASE_URL}{path_str}'
            # 发送POST请求
            response = requests.post(url, json=data_json)
            result = response.text
            resultJson = json.loads(result)
            if resultJson['code'] == 0:
                fdata = resultJson['data']
        if fdata:
            if type(fdata) == list:
                for d in fdata:
                    if type(d) != dict:
                        continue
                    for k in d:
                        d[k] = deformat_date(d[k])
            elif type(data) == dict:
                for k in data:
                    data[k] = deformat_date(data[k])
            return data
        return None
            elif type(fdata) == dict:
                for k in fdata:
                    fdata[k] = deformat_date(fdata[k])
            return fdata
        else:
            return None
    @classmethod
    def get_instruments(cls, symbols, fields):
@@ -84,10 +103,13 @@
    @classmethod
    def getJueJinAccountInfo(cls):
        redis = cls.__redisManager.getRedis()
        account_id = redis.get("juejin-account-id")
        strategy_id = redis.get("juejin-strategy-id")
        token = redis.get("juejin-token")
        return account_id, strategy_id, token
        try:
            account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
            strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            token = RedisUtils.get(redis, "juejin-token", auto_free=False)
            return account_id, strategy_id, token
        finally:
            RedisUtils.realse(redis)
    @classmethod
    def get_juejin_code_list_with_prefix(cls, codes):
@@ -183,6 +205,8 @@
class HistoryKDatasUtils(object):
    __previous_trading_date_cache = {}
    __latest_trading_date_cache = {}
    @classmethod
    def get_gp_latest_info(cls, codes, fields=None):
@@ -200,6 +224,35 @@
    @classmethod
    def get_previous_trading_date(cls, date):
        return JueJinApi.get_previous_trading_date(date)
    @classmethod
    def get_previous_trading_date_cache(cls, date):
        if date in cls.__previous_trading_date_cache:
            return cls.__previous_trading_date_cache.get(date)
        fdata = cls.get_previous_trading_date(date)
        if fdata:
            cls.__previous_trading_date_cache[date] = fdata
        return fdata
    # 获取最近的交易日
    @classmethod
    def get_latest_trading_date(cls, day_count):
        now_day = tool.get_now_date_str()
        days = []
        for i in range(day_count):
            pday = cls.get_previous_trading_date_cache(now_day)
            days.append(pday)
            now_day = pday
        return days
    @classmethod
    def get_latest_trading_date_cache(cls, day_count):
        key = f"{tool.get_now_date_str()}-{day_count}"
        if key in cls.__latest_trading_date_cache:
            return cls.__latest_trading_date_cache[key]
        days = cls.get_latest_trading_date(day_count)
        cls.__latest_trading_date_cache[key] = days
        return days
    # 返回指定日期的下个交易日
    @classmethod
@@ -263,17 +316,5 @@
if __name__ == "__main__":
    # Manual smoke run: force the non-local JueJin API, count the exchange
    # codes that survive the filters below, then print limit rates for two
    # sample codes.
    constant.JUEJIN_LOCAL_API = False
    hundred_days = datetime.timedelta(days=100)
    candidates = []
    for item in JueJinApi.get_exchanges_codes(["SHSE","SZSE"]):
        # Short-circuit order matches the field access order of the checks:
        # code prefix, sec_level, pre_close, then listed_date.
        rejected = (
            not item["sec_id"].startswith(("60", "00"))
            or item["sec_level"] != 1
            or item["pre_close"] * 1.1 > 40
            or (item["listed_date"] + hundred_days).timestamp() > datetime.datetime.now().timestamp()
        )
        if rejected:
            continue
        candidates.append(item)
    print(len(candidates))
    results = HistoryKDatasUtils.get_codes_limit_rate(list({"000422", "600610"}))
    print(results)