"""
|
华鑫增值服务API
|
"""
|
import multiprocessing
|
import random
|
import threading
|
import time
|
|
from huaxin_client import l1_api_client
|
from utils import tool
|
|
__response_data = {}
|
TIMEOUT = 3
|
|
|
def __set_response_data(request_id, response_data):
|
__response_data[request_id] = response_data
|
|
|
def __read_response(request_id, blocking=True, timeout=TIMEOUT):
|
if blocking:
|
start_time = time.time()
|
try:
|
while True:
|
time.sleep(0.005)
|
if request_id in __response_data:
|
# 获取到了响应内容
|
result = __response_data.pop(request_id)
|
return result
|
if time.time() - start_time > timeout:
|
# 读取内容超时才会释放
|
raise Exception(f"读取内容超时: request_id={request_id}")
|
finally:
|
pass
|
return None
|
|
|
def __set_response(response_queue: multiprocessing.Queue):
|
"""
|
读取结果队列
|
@param response_queue:
|
@return:
|
"""
|
while True:
|
try:
|
val = response_queue.get()
|
request_id = val['request_id']
|
response_data = val['data']
|
__response_data[request_id] = response_data
|
except:
|
pass
|
|
|
def __get_request_id(type):
|
return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
|
|
|
def __base_request(type, data):
|
request_id = __get_request_id(type)
|
request_queue.put_nowait({"type": type, "request_id": request_id, "data": data})
|
return request_id
|
|
|
def get_history_k_bars(code, count, end_date=tool.get_now_date_str()):
|
"""
|
获取历史K线
|
@param code:
|
@param count:
|
@param end_date:
|
@return:
|
"""
|
load_latest_trade_calendar()
|
_count = -1
|
start_date = None
|
for i in range(len(__trade_calendar_list) - 1, -1, -1):
|
if _count == -1:
|
if int(end_date.replace("-", "")) >= int(__trade_calendar_list[i].replace("-", "")):
|
# 获取起始位置
|
_count = 1
|
start_date = __trade_calendar_list[i]
|
else:
|
_count += 1
|
start_date = __trade_calendar_list[i]
|
if _count >= count:
|
break
|
request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code})
|
return __read_response(request_id)
|
|
|
def get_trade_calendar(start_date, end_date):
|
request_id = __base_request("get_trade_calendar", {"start_date": start_date, "end_date": end_date})
|
results = __read_response(request_id)
|
return [f"{x[:4]}-{x[4:6]}-{x[6:8]}" for x in results]
|
|
|
# 缓存交易日期:格式:{"日期":(上一个交易日, 下一个交易日),}
|
__trade_calendar_list = []
|
__trade_calendar_dict = {}
|
|
|
def load_latest_trade_calendar():
|
"""
|
加载最近的交易日历
|
@return:
|
"""
|
if __trade_calendar_list or __trade_calendar_dict:
|
return
|
end_date = tool.get_now_date_str()
|
days = get_trade_calendar(tool.date_sub(end_date, 365), end_date)
|
for i in range(0, len(days)):
|
if 0 < i < len(days) - 1:
|
__trade_calendar_dict[days[i]] = (days[i - 1], days[i + 1])
|
elif i == 0:
|
__trade_calendar_dict[days[i]] = (None, days[i + 1])
|
else:
|
__trade_calendar_dict[days[i]] = (days[i - 1], None)
|
__trade_calendar_list.clear()
|
__trade_calendar_list.extend(days)
|
|
|
def get_previous_trading_date(day):
|
"""
|
获取上一个交易日
|
@param day:
|
@return:
|
"""
|
load_latest_trade_calendar()
|
if day in __trade_calendar_dict:
|
return __trade_calendar_dict.get(day)[0]
|
return None
|
|
|
def get_next_trading_date(day):
|
"""
|
获取下一个交易日
|
@param day:
|
@return:
|
"""
|
load_latest_trade_calendar()
|
if day in __trade_calendar_dict:
|
return __trade_calendar_dict.get(day)[1]
|
return None
|
|
|
request_queue = None
|
|
|
def run():
|
global request_queue
|
request_queue, response_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
|
# 启动增值服务进程
|
process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue,), daemon=True)
|
process.start()
|
__set_response(response_queue)
|
|
|
def __test():
|
time.sleep(5)
|
print(get_history_k_bars("000333", count=150))
|
print(get_previous_trading_date("2024-12-31"))
|
print(get_next_trading_date("2024-12-30"))
|
|
|
if __name__ == '__main__':
|
threading.Thread(target=__test, daemon=True).start()
|
run()
|