""" 历史K线服务 """ import datetime import decimal import json import time import requests import constant from db.redis_manager_delegate import RedisUtils from log_module.log import logger_request_api, logger_debug from third_data import hx_qc_value_util from utils import tool, middle_api_protocol from db import redis_manager_delegate as redis_manager # import gm.api as gmapi class JueJinHttpApi: __BASE_URL = "http://193.112.35.168:10009/" @classmethod def __request(cls, path_str, data_json): def deformat_date(val): if type(val) == str and val.find('T') > -1 and val.find(':') > -1 and val.find( '+') > -1: return datetime.datetime.fromisoformat(val) return val DELEGATE = True fdata = None if DELEGATE: fdata = middle_api_protocol.load_juejin(path_str, data_json) __start_time = time.time() try: fdata = middle_api_protocol.request(fdata) finally: __use_time = time.time() - __start_time if __use_time > 5: logger_request_api.info(f"掘金API请求时间:{path_str}-{int(__use_time)}") else: url = f'{cls.__BASE_URL}{path_str}' # 发送POST请求 response = requests.post(url, json=data_json) result = response.text resultJson = json.loads(result) if resultJson['code'] == 0: fdata = resultJson['data'] if fdata: if type(fdata) == list: for d in fdata: if type(d) != dict: continue for k in d: d[k] = deformat_date(d[k]) elif type(fdata) == dict: for k in fdata: fdata[k] = deformat_date(fdata[k]) return fdata else: return None @classmethod def get_instruments(cls, symbols, fields): return cls.__request("get_instruments", {"symbols": symbols, "fields": fields}) @classmethod def history_n(cls, symbol, frequency, count, adjust, fields): return cls.__request("history_n", {"symbol": symbol, "frequency": frequency, "count": count, "adjust": adjust, "fields": fields}) @classmethod def current(cls, symbols, fields): return cls.__request("current", {"symbols": symbols, "fields": fields}) @classmethod def get_exchanges_codes(cls, exchanges, sec_types, skip_suspended, skip_st, fields): return cls.__request("get_instruments", {"exchanges": exchanges, "sec_types": sec_types, "skip_suspended": skip_suspended, "skip_st": skip_st, "fields": fields}) @classmethod def get_history_instruments(cls, symbols, start_date, end_date, fields): return cls.__request("get_history_instruments", {"symbols": symbols, "start_date": start_date, "end_date": end_date, "fields": fields}) @classmethod def get_previous_trading_date(cls, exchange, date): return cls.__request("get_previous_trading_date", {"exchange": exchange, "date": date}) @classmethod def get_next_trading_date(cls, exchange, date): return cls.__request("get_next_trading_date", {"exchange": exchange, "date": date}) @classmethod def get_trading_dates(cls, exchange, start_date, end_date): return cls.__request("get_trading_dates", {"exchange": exchange, "start_date": start_date, "end_date": end_date}) class JueJinApi: __redisManager = redis_manager.RedisManager(0) # 获取掘金参数 @classmethod def getJueJinAccountInfo(cls): redis = cls.__redisManager.getRedis() try: account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False) strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False) token = RedisUtils.get(redis, "juejin-token", auto_free=False) return account_id, strategy_id, token finally: RedisUtils.realse(redis) @classmethod def get_juejin_code_list_with_prefix(cls, codes): list = [] for d in codes: if tool.is_sz_code(d): list.append("SZSE.{}".format(d)) elif tool.is_sh_code(d): list.append("SHSE.{}".format(d)) return list @classmethod def get_gp_latest_info(cls, codes, fields=None): if not codes: return [] symbols = cls.get_juejin_code_list_with_prefix(codes) if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) data = gmapi.get_instruments(symbols=",".join(symbols), fields=fields) return data else: return JueJinHttpApi.get_instruments(symbols=",".join(symbols), fields=fields) @classmethod def get_history_tick_n(cls, code, count, fields=None): symbols = cls.get_juejin_code_list_with_prefix([code]) if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) # 前除权 results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count, adjust=1, fields=fields) return results else: results = JueJinHttpApi.history_n(symbol=symbols[0], frequency="1d", count=count, adjust=1, fields=fields) return results @classmethod def get_gp_current_info(cls, codes, fields=None): if not codes: return [] symbols = cls.get_juejin_code_list_with_prefix(codes) if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) data = gmapi.current(symbols=",".join(symbols)) return data else: data = JueJinHttpApi.current(symbols=",".join(symbols), fields=fields) return data # 返回指定日期的上个交易日 # 获取交易所的代码 @classmethod def get_exchanges_codes(cls, exchanges, skip_suspended=True, skip_st=True): if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) return gmapi.get_instruments(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended, skip_st=skip_st, fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close") else: return JueJinHttpApi.get_exchanges_codes(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended, skip_st=skip_st, fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level," "is_suspended,pre_close") @classmethod def get_history_instruments(cls, symbols, start_date, end_date, fields=None): if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) return gmapi.get_history_instruments(symbols=symbols, start_date=start_date, end_date=end_date, fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close") else: return JueJinHttpApi.get_history_instruments(symbols, start_date, end_date, fields) @classmethod def get_previous_trading_date(cls, date): if constant.JUEJIN_LOCAL_API: # account_id, s_id, token = cls.getJueJinAccountInfo() # gmapi.set_token(token) # return gmapi.get_previous_trading_date("SHSE", date) pass else: return JueJinHttpApi.get_previous_trading_date("SHSE", date) # 返回指定日期的下个交易日 @classmethod def get_next_trading_date(cls, date): if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) return gmapi.get_next_trading_date("SHSE", date) else: return JueJinHttpApi.get_next_trading_date("SHSE", date) @classmethod def get_trading_dates(cls, start_date, end_date): if constant.JUEJIN_LOCAL_API: account_id, s_id, token = cls.getJueJinAccountInfo() gmapi.set_token(token) return gmapi.get_trading_dates("SHSE", start_date, end_date) else: return JueJinHttpApi.get_trading_dates("SHSE", start_date, end_date) class HistoryKDatasUtils(object): __previous_trading_date_cache = {} __latest_trading_date_cache = {} @classmethod def get_gp_latest_info(cls, codes, fields=None): return JueJinApi.get_gp_latest_info(codes, fields) @classmethod def get_history_tick_n(cls, code, count, fields=None): # return JueJinApi.get_history_tick_n(code, count, fields) try: return hx_qc_value_util.get_history_k_bars(code, count) except Exception as e: logger_debug.exception(e) @classmethod def get_gp_current_info(cls, codes): return JueJinApi.get_gp_current_info(codes) # 返回指定日期的上个交易日 @classmethod def get_previous_trading_date(cls, date): return hx_qc_value_util.get_previous_trading_date(date) @classmethod def get_previous_trading_date_cache(cls, date): if date in cls.__previous_trading_date_cache: return cls.__previous_trading_date_cache.get(date) fdata = cls.get_previous_trading_date(date) if fdata: cls.__previous_trading_date_cache[date] = fdata return fdata # 获取最近的交易日 @classmethod def get_latest_trading_date(cls, day_count): """ 获取最近几个交易日(不包含今天) @param day_count: @return: """ now_day = tool.get_now_date_str() days = [] for i in range(day_count): pday = cls.get_previous_trading_date_cache(now_day) days.append(pday) now_day = pday return days @classmethod def get_latest_trading_date_cache(cls, day_count): """ 获取最近几个交易日(不包含今天) @param day_count: @return: """ key = f"{tool.get_now_date_str()}-{day_count}" if key in cls.__latest_trading_date_cache: return cls.__latest_trading_date_cache[key] days = cls.get_latest_trading_date(day_count) cls.__latest_trading_date_cache[key] = days return days # 返回指定日期的下个交易日 @classmethod def get_next_trading_date(cls, date): # return JueJinApi.get_next_trading_date(date) return hx_qc_value_util.get_next_trading_date(date) @classmethod def get_trading_dates(cls, start_date, end_date): return hx_qc_value_util.get_trade_calendar(start_date, end_date) @classmethod def get_now_price(cls, codes): data = cls.get_gp_current_info(codes) prices = [] for item in data: code = item["symbol"].split(".")[1] price = item["price"] prices.append((code, price)) return prices # 获取代码的涨幅 @classmethod def get_codes_limit_rate(cls, codes): datas = cls.get_gp_latest_info(codes) pre_price_dict = {} for data in datas: code = data["sec_id"] pre_close = tool.to_price(decimal.Decimal(str(data['pre_close']))) pre_price_dict[code] = pre_close now_prices = cls.get_now_price(codes) f_results = [] for data in now_prices: code = data[0] price = data[1] pre_price = float(pre_price_dict.get(code)) rate = round((price - pre_price) * 100 / pre_price, 2) f_results.append((code, rate)) f_results.sort(key=lambda tup: tup[1]) f_results.reverse() return f_results @classmethod def get_lowest_price_rate(cls, code, count): datas = cls.get_history_tick_n(code, count) low_price = datas[0]["close"] for data in datas: if low_price > data["close"]: low_price = data["close"] return (datas[-1]["close"] - low_price) / low_price @classmethod def get_gp_codes_names(cls, codes): datas = cls.get_gp_latest_info(codes) results = {} for data in datas: code = data["symbol"].split(".")[1] code_name = data['sec_name'] results[code] = code_name return results if __name__ == "__main__": print(HistoryKDatasUtils.get_previous_trading_date("2024-12-31")) print(HistoryKDatasUtils.get_history_tick_n("000095", 10)) # now_day = tool.get_now_date_str() # results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]), # tool.date_sub(now_day, 30), tool.date_sub(now_day, 1)) # results = results[-5:] # normal = True # for r in results: # if r["sec_level"] != 1: # normal = False # break # print(normal)