Administrator
2024-12-31 c405d26042cd4a9ad7195245ff8816289d1a5a6a
华鑫增值接口
1个文件已添加
163 ■■■■■ 已修改文件
third_data/hx_qc_value_util.py 163 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hx_qc_value_util.py
New file
@@ -0,0 +1,163 @@
"""
华鑫增值服务API
"""
import multiprocessing
import random
import threading
import time
from huaxin_client import l1_api_client
from utils import tool
# Responses read from the service's response queue, keyed by request_id.
# Filled by __set_response / __set_response_data, drained by __read_response.
__response_data = {}
# Default number of seconds __read_response waits for a response.
TIMEOUT = 3
def __set_response_data(request_id, response_data):
    """
    Store a response payload in the module-level response table.
    @param request_id: id of the request the payload answers
    @param response_data: payload to keep until it is read
    @return: None
    """
    __response_data.update({request_id: response_data})
def __read_response(request_id, blocking=True, timeout=TIMEOUT):
    """
    Take the response for a request out of the response table.
    @param request_id: id returned when the request was queued
    @param blocking: when True, poll until the response arrives or the
        timeout elapses; when False, check once and return immediately
    @param timeout: maximum number of seconds to wait in blocking mode
    @return: the response payload; None in non-blocking mode when the
        response has not arrived yet
    @raise TimeoutError: blocking mode only, when no response arrived in time
    """
    if not blocking:
        # A non-blocking read still has to hand back a response that is
        # already there instead of unconditionally returning None.
        return __response_data.pop(request_id, None)
    # Sentinel so that a payload which is itself None is still returned.
    missing = object()
    # Monotonic clock: unaffected by wall-clock adjustments while waiting.
    started = time.monotonic()
    while True:
        # Single pop instead of "in" + pop: one atomic dict operation.
        result = __response_data.pop(request_id, missing)
        if result is not missing:
            return result
        if time.monotonic() - started > timeout:
            # NOTE: a response that arrives after this point is never
            # collected and stays in the table.
            raise TimeoutError(f"读取内容超时: request_id={request_id}")
        time.sleep(0.005)
def __set_response(response_queue: multiprocessing.Queue):
    """
    Read the result queue forever and file each response under its request id.

    Every queue item is expected to be a mapping with the keys
    'request_id' and 'data'. Items that cannot be read or do not have that
    shape are logged and skipped so that one bad message does not stop the
    reader.
    @param response_queue: queue the service process writes responses to
    @return: never returns normally
    """
    import logging

    logger = logging.getLogger(__name__)
    while True:
        try:
            val = response_queue.get()
            __set_response_data(val['request_id'], val['data'])
        except Exception:
            # Only Exception is caught: KeyboardInterrupt / SystemExit must
            # still be able to stop this otherwise endless loop.
            logger.exception("failed to read a response from the response queue")
            # Brief pause so a persistently failing queue cannot turn this
            # loop into a busy spin that floods the log.
            time.sleep(0.01)
def __get_request_id(type):
    """
    Build a request id of the form r_<type>_<timestamp>_<random>.
    @param type: request type embedded in the id
    @return: the id string; the timestamp part is the current time in
        1/10000 second units, the random part is an integer in [0, 100000]
    """
    timestamp_part = round(time.time() * 10000)
    random_part = random.randint(0, 100000)
    return f"r_{type}_{timestamp_part}_{random_part}"
def __base_request(type, data):
    """
    Queue a request for the service process without blocking.
    @param type: request type; also embedded in the generated request id
    @param data: request payload
    @return: the request id to pass to __read_response
    """
    request_id = __get_request_id(type)
    message = {"type": type, "request_id": request_id, "data": data}
    # request_queue is the module global created by run().
    request_queue.put_nowait(message)
    return request_id
def get_history_k_bars(code, count, end_date=None):
    """
    Request historical K-line bars for a code.

    The start date is found by walking the cached trade calendar backwards
    from end_date until `count` calendar days (the first day on or before
    end_date included) have been covered.
    @param code: security code, forwarded to the service unchanged
    @param count: number of trading days the requested range should span
    @param end_date: last date of the range; dashes are stripped before
        comparison, so "YYYY-MM-DD" works. Defaults to today, resolved on
        every call rather than once at import time.
    @return: the response payload returned by the service
    @raise Exception: when no response arrives within the read timeout
    """
    if end_date is None:
        # Resolve "today" per call: a default evaluated in the signature
        # would be frozen at import time and go stale in a long-running
        # process.
        end_date = tool.get_now_date_str()
    load_latest_trade_calendar()
    end_key = int(end_date.replace("-", ""))
    start_date = None
    covered_days = 0
    for day in reversed(__trade_calendar_list):
        # Skip calendar days later than end_date until the first usable one.
        if covered_days == 0 and int(day.replace("-", "")) > end_key:
            continue
        covered_days += 1
        start_date = day
        if covered_days >= count:
            break
    # NOTE: start_date stays None when the calendar holds no day on or
    # before end_date; it is then forwarded to the service as None.
    request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code})
    return __read_response(request_id)
def get_trade_calendar(start_date, end_date):
    """
    Request the trade calendar for a date range and format its entries.
    @param start_date: first date of the range, forwarded to the service unchanged
    @param end_date: last date of the range, forwarded to the service unchanged
    @return: list of strings built as "<chars 0-3>-<chars 4-5>-<chars 6-7>"
        from each returned item (presumably "YYYYMMDD" -> "YYYY-MM-DD")
    """
    payload = {"start_date": start_date, "end_date": end_date}
    request_id = __base_request("get_trade_calendar", payload)
    formatted_days = []
    for raw_day in __read_response(request_id):
        formatted_days.append(f"{raw_day[:4]}-{raw_day[4:6]}-{raw_day[6:8]}")
    return formatted_days
# Trade-calendar cache, filled once by load_latest_trade_calendar().
# Dict format: {"date": (previous trading day, next trading day), ...}
# The list holds the same dates in the order the service returned them.
__trade_calendar_list = []
__trade_calendar_dict = {}
def load_latest_trade_calendar():
    """
    Load the recent trade calendar into the module-level cache.

    Does nothing when the cache already holds data. Otherwise it requests
    the calendar for the range from tool.date_sub(today, 365) to today
    (presumably the last 365 days - confirm against tool.date_sub) and
    stores both the ordered list of days and a lookup of each day's
    neighbours.
    @return: None
    """
    if __trade_calendar_list or __trade_calendar_dict:
        return
    end_date = tool.get_now_date_str()
    days = get_trade_calendar(tool.date_sub(end_date, 365), end_date)
    last_index = len(days) - 1
    neighbours = {}
    for i, day in enumerate(days):
        # The first day has no predecessor and the last has no successor;
        # a one-day calendar therefore maps to (None, None) instead of
        # indexing past the end of the list.
        previous_day = days[i - 1] if i > 0 else None
        next_day = days[i + 1] if i < last_index else None
        neighbours[day] = (previous_day, next_day)
    # Publish both caches only after the lookup was built completely, so a
    # failure above cannot leave a half-filled dict next to an empty list.
    __trade_calendar_dict.update(neighbours)
    __trade_calendar_list.clear()
    __trade_calendar_list.extend(days)
def get_previous_trading_date(day):
    """
    Look up the trading day before the given day in the cached calendar.
    @param day: date key as stored in the calendar cache
    @return: the previous trading day, or None when the day is not in the
        cache or is the first cached day
    """
    load_latest_trade_calendar()
    neighbours = __trade_calendar_dict.get(day)
    if neighbours is None:
        return None
    return neighbours[0]
def get_next_trading_date(day):
    """
    Look up the trading day after the given day in the cached calendar.
    @param day: date key as stored in the calendar cache
    @return: the next trading day, or None when the day is not in the
        cache or is the last cached day
    """
    load_latest_trade_calendar()
    try:
        _, following_day = __trade_calendar_dict[day]
    except KeyError:
        return None
    return following_day
def run():
    """
    Start the value-added service process and consume its responses.

    Creates the request/response queues, publishes the request queue as the
    module global used by __base_request, launches l1_api_client.run in a
    daemon process, then reads responses in the calling thread. The read
    loop never ends, so this call does not return.
    @return: never returns normally
    """
    global request_queue
    request_queue = multiprocessing.Queue()
    response_queue = multiprocessing.Queue()
    # Launch the value-added service process
    worker = multiprocessing.Process(
        target=l1_api_client.run,
        args=(request_queue, response_queue),
        daemon=True,
    )
    worker.start()
    __set_response(response_queue)
def __test():
    """
    Manual smoke test: print K-line bars and neighbouring trading days.
    @return: None
    """
    # Started in a thread before run() is called, so wait before sending
    # requests (presumably to let run() create the queues and the worker).
    time.sleep(5)
    k_bars = get_history_k_bars("000333", count=150)
    print(k_bars)
    previous_day = get_previous_trading_date("2024-12-31")
    print(previous_day)
    next_day = get_next_trading_date("2024-12-30")
    print(next_day)
if __name__ == "__main__":
    # Run the smoke test in a daemon thread; run() blocks the main thread.
    smoke_test_thread = threading.Thread(target=__test, daemon=True)
    smoke_test_thread.start()
    run()