"""
|
华鑫增值服务API
|
"""
|
import multiprocessing
|
import random
|
import threading
|
import time
|
|
from huaxin_client import l1_api_client
|
from log_module.log import logger_debug
|
from strategy import data_cache
|
from utils import tool
|
|
# In-memory store of responses keyed by request_id. Entries are written by the
# response reader loop and removed (popped) by __read_response.
__response_data = {}
|
# Default number of seconds __read_response waits for a response.
TIMEOUT = 3
|
|
|
def __set_response_data(request_id, response_data):
    """
    Store a response so that a reader waiting on the request id can pick it up.

    @param request_id: identifier of the request the response belongs to
    @param response_data: payload to keep until it is read
    @return: None
    """
    __response_data.update({request_id: response_data})
|
|
|
def __read_response(request_id, blocking=True, timeout=TIMEOUT):
    """
    Wait for the response that belongs to ``request_id`` and return it.

    The response is removed from the in-memory store once it has been read.

    @param request_id: identifier returned when the request was queued
    @param blocking: when False nothing is read and None is returned at once
    @param timeout: maximum number of seconds to wait for the response
    @return: the stored response data, or None when ``blocking`` is False
    @raise TimeoutError: no response arrived within ``timeout`` seconds
    """
    if not blocking:
        # Fire-and-forget: the caller is not interested in the result.
        return None

    # A monotonic clock keeps the timeout correct even if the wall clock is
    # adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while True:
        # Check before sleeping so an already-available response is returned
        # without any added latency.
        if request_id in __response_data:
            return __response_data.pop(request_id)
        if time.monotonic() > deadline:
            # TimeoutError is a subclass of Exception, so callers that catch
            # Exception keep working.
            raise TimeoutError(f"读取内容超时: request_id={request_id}")
        time.sleep(0.005)
|
|
|
def __set_response(response_queue: multiprocessing.Queue):
    """
    Read the result queue forever and store every response by its request id.

    Each queue item is expected to be a mapping with the keys ``request_id``
    and ``data``. This function never returns normally.

    @param response_queue: queue the responses are read from
    @return:
    """
    while True:
        try:
            message = response_queue.get()
            __response_data[message['request_id']] = message['data']
        except Exception:
            # Keep the loop alive on a malformed message, but leave a trace
            # instead of swallowing the error. KeyboardInterrupt/SystemExit are
            # deliberately not caught so the process can still be stopped.
            logger_debug.exception("failed to store a response read from the response queue")
|
|
|
def __get_request_id(type):
    """
    Build a request id of the form ``r_<type>_<timestamp>_<random number>``.

    @param type: request type embedded in the id
    @return: the generated request id string
    """
    # Current time in units of 0.1 ms, rounded to an integer.
    timestamp = round(time.time() * 10000)
    # Random suffix so ids created within the same tick are unlikely to collide.
    suffix = random.randint(0, 100000)
    return f"r_{type}_{timestamp}_{suffix}"
|
|
|
def __base_request(type, data):
    """
    Put a request on the module-level request queue and return its id.

    The queue is the global ``request_queue`` that run() creates.

    @param type: request type
    @param data: request payload
    @return: the id generated for this request
    """
    request_id = __get_request_id(type)
    message = {"type": type, "request_id": request_id, "data": data}
    request_queue.put_nowait(message)
    return request_id
|
|
|
def get_history_k_bars(code, count, end_date=None):
    """
    Request historical K-line bars.

    The start date is found by walking the cached trade calendar backwards
    from the latest trading day that is not after ``end_date`` until ``count``
    trading days have been collected.

    @param code: security code
    @param count: number of trading days to cover; values below 1 behave like 1
    @param end_date: last date of the range (presumably "YYYY-MM-DD", the
        format of the cached calendar); defaults to the current date
    @return: the response data read for the request
    """
    # The default is resolved on every call. Computing it in the signature
    # would freeze the date at import time in a long-running process.
    if not end_date:
        end_date = tool.get_now_date_str()
    load_latest_trade_calendar()

    end_key = int(end_date.replace("-", ""))
    matched = 0
    start_date = None
    for day in reversed(__trade_calendar_list):
        if matched == 0 and int(day.replace("-", "")) > end_key:
            # Still after end_date: keep looking for the starting position.
            continue
        matched += 1
        start_date = day
        if matched >= count:
            break

    request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code})
    return __read_response(request_id)
|
|
|
def get_trade_calendar(start_date, end_date):
    """
    Request the trading days between two dates.

    @param start_date: first date of the range
    @param end_date: last date of the range
    @return: list of dates reformatted from "YYYYMMDD" to "YYYY-MM-DD"
    """
    payload = {"start_date": start_date, "end_date": end_date}
    request_id = __base_request("get_trade_calendar", payload)
    raw_days = __read_response(request_id)
    formatted = []
    for raw in raw_days:
        year, month, day = raw[:4], raw[4:6], raw[6:8]
        formatted.append(f"{year}-{month}-{day}")
    return formatted
|
|
|
# Trading-calendar cache, filled by load_latest_trade_calendar().
# __trade_calendar_list holds the dates as returned by get_trade_calendar.
|
__trade_calendar_list = []
|
# Format: {"date": (previous trading day, next trading day), ...}
__trade_calendar_dict = {}
|
|
|
def load_latest_trade_calendar():
    """
    Load the recent trade calendar into the module-level caches.

    Does nothing when either cache already holds data. Otherwise it requests
    the calendar for the 365 days ending at "today shifted by
    tool.date_sub(today, -30)" and fills both the ordered list and the
    previous/next lookup dict.

    @return:
    """
    if __trade_calendar_list or __trade_calendar_dict:
        return
    end_date = tool.get_now_date_str()
    # NOTE(review): a negative offset presumably moves the date 30 days into
    # the future - confirm against tool.date_sub.
    end_date = tool.date_sub(end_date, -30)
    days = get_trade_calendar(tool.date_sub(end_date, 365), end_date)
    logger_debug.info(f"华鑫API获取交易日历结果:{days}")

    last_index = len(days) - 1
    for i, day in enumerate(days):
        # Bounds are checked on both sides so a calendar with a single day no
        # longer raises IndexError on days[i + 1].
        previous_day = days[i - 1] if i > 0 else None
        next_day = days[i + 1] if i < last_index else None
        __trade_calendar_dict[day] = (previous_day, next_day)

    __trade_calendar_list.clear()
    __trade_calendar_list.extend(days)
|
|
|
def get_previous_trading_date(day):
    """
    Return the trading day before ``day``.

    @param day: date to look up; must match a key of the cached calendar
    @return: the previous trading day, or None when ``day`` is not in the
        cached calendar or is its first entry
    """
    load_latest_trade_calendar()
    try:
        neighbours = __trade_calendar_dict[day]
    except KeyError:
        return None
    return neighbours[0]
|
|
|
def get_next_trading_date(day):
    """
    Return the trading day after ``day``.

    @param day: date to look up; must match a key of the cached calendar
    @return: the next trading day, or None when ``day`` is not in the cached
        calendar or is its last entry
    """
    load_latest_trade_calendar()
    try:
        neighbours = __trade_calendar_dict[day]
    except KeyError:
        return None
    return neighbours[1]
|
|
|
def __read_callback_data(data_callback_queue: multiprocessing.Queue):
    """
    Read data callbacks forever and update the shared data cache.

    Each queue item is indexed as ``(type, data)``. Only the type
    "stock_index_datas" is handled: every entry of ``data`` is written to
    ``data_cache.stock_index_dict`` as
    ``(LastPrice rounded to 2 decimals, Volume, Turnover, PreClosePrice)``.

    :param data_callback_queue: queue the callbacks are read from
    :return:
    """
    while True:
        try:
            message = data_callback_queue.get()
            type_, payload = message[0], message[1]
            if type_ != "stock_index_datas":
                continue
            # Each value is a dict with the keys PreClosePrice, LastPrice,
            # SecurityID, UpdateTime, Volume, Turnover and LXLastPrice.
            for key, d in payload.items():
                data_cache.stock_index_dict[key] = (
                    round(d["LastPrice"], 2), d["Volume"], d["Turnover"], d["PreClosePrice"])
        except Exception:
            # Keep the reader alive on a malformed message, but leave a trace
            # instead of silently dropping the error.
            logger_debug.exception("failed to process an item read from the data callback queue")
|
|
|
def run():
    """
    Start the value-added service process and serve its responses.

    Creates the request, response and data-callback queues, starts
    ``l1_api_client.run`` in a daemon process, starts a daemon thread that
    reads data callbacks, then blocks in the response reader loop.
    """
    global request_queue
    request_queue = multiprocessing.Queue()
    response_queue = multiprocessing.Queue()
    data_callback_queue = multiprocessing.Queue()

    # Start the value-added service process.
    service_process = multiprocessing.Process(
        target=l1_api_client.run,
        args=(request_queue, response_queue, data_callback_queue),
        daemon=True,
    )
    service_process.start()

    # Read data callbacks in the background.
    callback_reader = threading.Thread(
        target=__read_callback_data,
        args=(data_callback_queue,),
        daemon=True,
    )
    callback_reader.start()

    # Blocks: this loop does not return normally.
    __set_response(response_queue)
|
|
|
def __test():
    """
    Manual smoke test: wait 5 seconds, then print the results of a few queries.
    """
    time.sleep(5)
    k_bars = get_history_k_bars("000333", count=150)
    print(k_bars)
    previous_day = get_previous_trading_date("2024-12-31")
    print(previous_day)
    next_day = get_next_trading_date("2024-12-30")
    print(next_day)
|
|
|
if __name__ == '__main__':
    # Run the smoke test in the background; run() blocks the main thread.
    smoke_test = threading.Thread(target=__test, daemon=True)
    smoke_test.start()
    run()
|