"""
|
历史K线服务
|
"""
|
import datetime
|
import decimal
|
import json
|
import time
|
|
import constant as constant
|
import requests
|
if constant.is_windows():
|
import gm.api as gmapi
|
|
import constant
|
from db.redis_manager_delegate import RedisUtils
|
from log_module.log import logger_request_api, logger_debug
|
from third_data import hx_qc_value_util
|
from utils import tool, middle_api_protocol
|
from db import redis_manager_delegate as redis_manager
|
|
|
# import gm.api as gmapi
|
|
|
class JueJinHttpApi:
|
__BASE_URL = "http://193.112.35.168:10009/"
|
|
@classmethod
|
def __request(cls, path_str, data_json):
|
def deformat_date(val):
|
if type(val) == str and val.find('T') > -1 and val.find(':') > -1 and val.find(
|
'+') > -1:
|
return datetime.datetime.fromisoformat(val)
|
return val
|
|
DELEGATE = True
|
fdata = None
|
if DELEGATE:
|
fdata = middle_api_protocol.load_juejin(path_str, data_json)
|
__start_time = time.time()
|
try:
|
fdata = middle_api_protocol.request(fdata)
|
finally:
|
__use_time = time.time() - __start_time
|
if __use_time > 5:
|
logger_request_api.info(f"掘金API请求时间:{path_str}-{int(__use_time)}")
|
|
else:
|
url = f'{cls.__BASE_URL}{path_str}'
|
# 发送POST请求
|
response = requests.post(url, json=data_json)
|
result = response.text
|
resultJson = json.loads(result)
|
if resultJson['code'] == 0:
|
fdata = resultJson['data']
|
if fdata:
|
if type(fdata) == list:
|
for d in fdata:
|
if type(d) != dict:
|
continue
|
for k in d:
|
d[k] = deformat_date(d[k])
|
elif type(fdata) == dict:
|
for k in fdata:
|
fdata[k] = deformat_date(fdata[k])
|
return fdata
|
else:
|
return None
|
|
@classmethod
|
def get_instruments(cls, symbols, fields):
|
return cls.__request("get_instruments", {"symbols": symbols, "fields": fields})
|
|
@classmethod
|
def history_n(cls, symbol, frequency, count, adjust, fields):
|
return cls.__request("history_n", {"symbol": symbol, "frequency": frequency, "count": count, "adjust": adjust,
|
"fields": fields})
|
|
@classmethod
|
def current(cls, symbols, fields):
|
return cls.__request("current", {"symbols": symbols, "fields": fields})
|
|
@classmethod
|
def get_exchanges_codes(cls, exchanges, sec_types, skip_suspended, skip_st, fields):
|
return cls.__request("get_instruments",
|
{"exchanges": exchanges, "sec_types": sec_types, "skip_suspended": skip_suspended,
|
"skip_st": skip_st, "fields": fields})
|
|
@classmethod
|
def get_history_instruments(cls, symbols, start_date, end_date, fields):
|
return cls.__request("get_history_instruments",
|
{"symbols": symbols, "start_date": start_date, "end_date": end_date, "fields": fields})
|
|
@classmethod
|
def get_previous_trading_date(cls, exchange, date):
|
return cls.__request("get_previous_trading_date", {"exchange": exchange, "date": date})
|
|
@classmethod
|
def get_next_trading_date(cls, exchange, date):
|
return cls.__request("get_next_trading_date", {"exchange": exchange, "date": date})
|
|
@classmethod
|
def get_trading_dates(cls, exchange, start_date, end_date):
|
return cls.__request("get_trading_dates",
|
{"exchange": exchange, "start_date": start_date, "end_date": end_date})
|
|
|
class JueJinApi:
|
__redisManager = redis_manager.RedisManager(0)
|
|
# 获取掘金参数
|
@classmethod
|
def getJueJinAccountInfo(cls):
|
redis = cls.__redisManager.getRedis()
|
try:
|
account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
|
strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
|
token = RedisUtils.get(redis, "juejin-token", auto_free=False)
|
return account_id, strategy_id, token
|
finally:
|
RedisUtils.realse(redis)
|
|
@classmethod
|
def get_juejin_code_list_with_prefix(cls, codes):
|
list = []
|
for d in codes:
|
if tool.is_sz_code(d):
|
list.append("SZSE.{}".format(d))
|
elif tool.is_sh_code(d):
|
list.append("SHSE.{}".format(d))
|
return list
|
|
@classmethod
|
def get_gp_latest_info(cls, codes, fields=None):
|
if not codes:
|
return []
|
symbols = cls.get_juejin_code_list_with_prefix(codes)
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
data = gmapi.get_instruments(symbols=",".join(symbols), fields=fields)
|
return data
|
else:
|
return JueJinHttpApi.get_instruments(symbols=",".join(symbols), fields=fields)
|
|
@classmethod
|
def get_history_tick_n(cls, code, count, fields=None):
|
symbols = cls.get_juejin_code_list_with_prefix([code])
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
# 前除权
|
results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count, adjust=1, fields=fields)
|
return results
|
else:
|
results = JueJinHttpApi.history_n(symbol=symbols[0], frequency="1d", count=count, adjust=1, fields=fields)
|
return results
|
|
@classmethod
|
def get_gp_current_info(cls, codes, fields=None):
|
if not codes:
|
return []
|
symbols = cls.get_juejin_code_list_with_prefix(codes)
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
data = gmapi.current(symbols=",".join(symbols))
|
return data
|
else:
|
data = JueJinHttpApi.current(symbols=",".join(symbols), fields=fields)
|
return data
|
# 返回指定日期的上个交易日
|
|
# 获取交易所的代码
|
@classmethod
|
def get_exchanges_codes(cls, exchanges, skip_suspended=True, skip_st=True):
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
return gmapi.get_instruments(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended,
|
skip_st=skip_st,
|
fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
|
else:
|
return JueJinHttpApi.get_exchanges_codes(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended,
|
skip_st=skip_st,
|
fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,"
|
"is_suspended,pre_close")
|
|
@classmethod
|
def get_history_instruments(cls, symbols, start_date, end_date, fields=None):
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
return gmapi.get_history_instruments(symbols=symbols, start_date=start_date, end_date=end_date,
|
fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
|
else:
|
return JueJinHttpApi.get_history_instruments(symbols, start_date, end_date, fields)
|
|
@classmethod
|
def get_previous_trading_date(cls, date):
|
if constant.JUEJIN_LOCAL_API:
|
# account_id, s_id, token = cls.getJueJinAccountInfo()
|
# gmapi.set_token(token)
|
# return gmapi.get_previous_trading_date("SHSE", date)
|
pass
|
else:
|
return JueJinHttpApi.get_previous_trading_date("SHSE", date)
|
|
# 返回指定日期的下个交易日
|
@classmethod
|
def get_next_trading_date(cls, date):
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
return gmapi.get_next_trading_date("SHSE", date)
|
else:
|
return JueJinHttpApi.get_next_trading_date("SHSE", date)
|
|
@classmethod
|
def get_trading_dates(cls, start_date, end_date):
|
if constant.JUEJIN_LOCAL_API:
|
account_id, s_id, token = cls.getJueJinAccountInfo()
|
gmapi.set_token(token)
|
return gmapi.get_trading_dates("SHSE", start_date, end_date)
|
else:
|
return JueJinHttpApi.get_trading_dates("SHSE", start_date, end_date)
|
|
|
class JueJinLocalApi:
|
|
def __init__(self, strategy_id, token):
|
self.strategy_id = strategy_id
|
self.token = token
|
|
@classmethod
|
def get_juejin_code_list_with_prefix(cls, codes):
|
list = []
|
for d in codes:
|
if tool.is_sz_code(d):
|
list.append("SZSE.{}".format(d))
|
elif tool.is_sh_code(d):
|
list.append("SHSE.{}".format(d))
|
return list
|
|
def get_gp_latest_info(self, codes, fields=None):
|
if not codes:
|
return []
|
symbols = self.get_juejin_code_list_with_prefix(codes)
|
gmapi.set_token(self.token)
|
data = gmapi.get_instruments(symbols=",".join(symbols), fields=fields)
|
return data
|
|
def get_history_tick_n(self, code, count, fields=None, frequency="1d", end_date=""):
|
symbols = self.get_juejin_code_list_with_prefix([code])
|
gmapi.set_token(self.token)
|
# 前除权
|
results = gmapi.history_n(symbol=symbols[0], frequency=frequency, end_time=end_date, count=count, adjust=1,
|
fields=fields)
|
return results
|
|
def history(self, code, start_time, end_time, fields=None, frequency="60s"):
|
symbols = self.get_juejin_code_list_with_prefix([code])
|
gmapi.set_token(self.token)
|
# 前除权
|
results = gmapi.history(symbol=symbols[0], frequency=frequency, start_time=start_time, end_time=end_time,
|
adjust=1,
|
fields=fields)
|
return results
|
|
def get_gp_current_info(self, codes, fields=None):
|
if not codes:
|
return []
|
symbols = self.get_juejin_code_list_with_prefix(codes)
|
gmapi.set_token(self.token)
|
data = gmapi.current(symbols=",".join(symbols))
|
return data
|
|
# 返回指定日期的上个交易日
|
|
# 获取交易所的代码
|
def get_exchanges_codes(self, exchanges, skip_suspended=True, skip_st=True):
|
|
gmapi.set_token(self.token)
|
return gmapi.get_instruments(exchanges=exchanges, sec_types=[1], skip_suspended=skip_suspended,
|
skip_st=skip_st,
|
fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
|
|
def get_history_instruments(self, symbols, start_date, end_date, fields=None):
|
|
gmapi.set_token(self.token)
|
return gmapi.get_history_instruments(symbols=symbols, start_date=start_date, end_date=end_date,
|
fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
|
|
def get_previous_trading_date(self, date):
|
gmapi.set_token(self.token)
|
return gmapi.get_previous_trading_date("SHSE", date)
|
|
# 返回指定日期的下个交易日
|
def get_next_trading_date(self, date):
|
|
gmapi.set_token(self.token)
|
return gmapi.get_next_trading_date("SHSE", date)
|
|
def get_trading_dates(self, start_date, end_date):
|
gmapi.set_token(self.token)
|
return gmapi.get_trading_dates("SHSE", start_date, end_date)
|
|
|
class HistoryKDatasUtils(object):
|
__previous_trading_date_cache = {}
|
__latest_trading_date_cache = {}
|
|
@classmethod
|
def get_gp_latest_info(cls, codes, fields=None):
|
return JueJinApi.get_gp_latest_info(codes, fields)
|
|
@classmethod
|
def get_history_tick_n(cls, code, count, fields=None):
|
# return JueJinApi.get_history_tick_n(code, count, fields)
|
if constant.is_windows():
|
return JueJinApi.get_history_tick_n(code, count, fields)
|
else:
|
try:
|
return hx_qc_value_util.get_history_k_bars(code, count)
|
except Exception as e:
|
logger_debug.exception(e)
|
|
@classmethod
|
def get_gp_current_info(cls, codes):
|
return JueJinApi.get_gp_current_info(codes)
|
|
# 返回指定日期的上个交易日
|
@classmethod
|
def get_previous_trading_date(cls, date):
|
if constant.is_windows():
|
return JueJinApi.get_previous_trading_date(date)
|
else:
|
try:
|
return hx_qc_value_util.get_previous_trading_date(date)
|
except:
|
return JueJinApi.get_previous_trading_date(date)
|
|
@classmethod
|
def get_previous_trading_date_cache(cls, date):
|
if date in cls.__previous_trading_date_cache:
|
return cls.__previous_trading_date_cache.get(date)
|
fdata = cls.get_previous_trading_date(date)
|
if fdata:
|
cls.__previous_trading_date_cache[date] = fdata
|
return fdata
|
|
# 获取最近的交易日
|
@classmethod
|
def get_latest_trading_date(cls, day_count):
|
"""
|
获取最近几个交易日(不包含今天)
|
@param day_count:
|
@return:
|
"""
|
now_day = tool.get_now_date_str()
|
days = []
|
for i in range(day_count):
|
pday = cls.get_previous_trading_date_cache(now_day)
|
days.append(pday)
|
now_day = pday
|
return days
|
|
@classmethod
|
def get_latest_trading_date_cache(cls, day_count):
|
"""
|
获取最近几个交易日(不包含今天)
|
@param day_count:
|
@return:
|
"""
|
key = f"{tool.get_now_date_str()}-{day_count}"
|
if key in cls.__latest_trading_date_cache:
|
return cls.__latest_trading_date_cache[key]
|
days = cls.get_latest_trading_date(day_count)
|
cls.__latest_trading_date_cache[key] = days
|
return days
|
|
# 返回指定日期的下个交易日
|
@classmethod
|
def get_next_trading_date(cls, date):
|
# return JueJinApi.get_next_trading_date(date)
|
if constant.is_windows():
|
return JueJinApi.get_next_trading_date(date)
|
else:
|
return hx_qc_value_util.get_next_trading_date(date)
|
|
@classmethod
|
def get_trading_dates(cls, start_date, end_date):
|
if constant.is_windows():
|
return JueJinApi.get_trading_dates(start_date, end_date)
|
else:
|
return hx_qc_value_util.get_trade_calendar(start_date, end_date)
|
|
@classmethod
|
def get_now_price(cls, codes):
|
data = cls.get_gp_current_info(codes)
|
prices = []
|
for item in data:
|
code = item["symbol"].split(".")[1]
|
price = item["price"]
|
prices.append((code, price))
|
return prices
|
|
# 获取代码的涨幅
|
@classmethod
|
def get_codes_limit_rate(cls, codes):
|
datas = cls.get_gp_latest_info(codes)
|
pre_price_dict = {}
|
for data in datas:
|
code = data["sec_id"]
|
pre_close = tool.to_price(decimal.Decimal(str(data['pre_close'])))
|
pre_price_dict[code] = pre_close
|
|
now_prices = cls.get_now_price(codes)
|
f_results = []
|
for data in now_prices:
|
code = data[0]
|
price = data[1]
|
pre_price = float(pre_price_dict.get(code))
|
rate = round((price - pre_price) * 100 / pre_price, 2)
|
f_results.append((code, rate))
|
f_results.sort(key=lambda tup: tup[1])
|
f_results.reverse()
|
return f_results
|
|
@classmethod
|
def get_lowest_price_rate(cls, code, count):
|
datas = cls.get_history_tick_n(code, count)
|
low_price = datas[0]["close"]
|
for data in datas:
|
if low_price > data["close"]:
|
low_price = data["close"]
|
return (datas[-1]["close"] - low_price) / low_price
|
|
@classmethod
|
def get_gp_codes_names(cls, codes):
|
datas = cls.get_gp_latest_info(codes)
|
results = {}
|
for data in datas:
|
code = data["symbol"].split(".")[1]
|
code_name = data['sec_name']
|
results[code] = code_name
|
return results
|
|
|
def get_k_bar_dead_date():
|
"""
|
获取K线的截止日期
|
@return:
|
"""
|
dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
|
latest_trading_date = None
|
if dates:
|
latest_trading_date = dates[0]
|
if latest_trading_date is None:
|
raise Exception("没有获取到上一个交易日的日期")
|
# 4点之后改为获取今日的数据
|
if tool.get_now_time_as_int() > 160000:
|
latest_trading_date = tool.get_now_date_str()
|
return latest_trading_date
|
|
|
if __name__ == "__main__":
|
# result = JueJinHttpApi.history_n("SHSE.000300", "1d", 10, 1, "open, high, low, eob")
|
# print(result)
|
jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf", "018db265fa34e241dd6198b7ca507ee0a82ad029")
|
results = jueJinLocalApi.get_history_tick_n("603900", 6000, end_date="2025-04-30 15:00:00", frequency="tick")
|
for r in results:
|
print(r["created_at"])
|
# print(results)
|