""" 华鑫增值服务API """ import multiprocessing import random import threading import time from huaxin_client import l1_api_client from log_module.log import logger_debug from utils import tool __response_data = {} TIMEOUT = 3 def __set_response_data(request_id, response_data): __response_data[request_id] = response_data def __read_response(request_id, blocking=True, timeout=TIMEOUT): if blocking: start_time = time.time() try: while True: time.sleep(0.005) if request_id in __response_data: # 获取到了响应内容 result = __response_data.pop(request_id) return result if time.time() - start_time > timeout: # 读取内容超时才会释放 raise Exception(f"读取内容超时: request_id={request_id}") finally: pass return None def __set_response(response_queue: multiprocessing.Queue): """ 读取结果队列 @param response_queue: @return: """ while True: try: val = response_queue.get() request_id = val['request_id'] response_data = val['data'] __response_data[request_id] = response_data except: pass def __get_request_id(type): return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}" def __base_request(type, data): request_id = __get_request_id(type) request_queue.put_nowait({"type": type, "request_id": request_id, "data": data}) return request_id def get_history_k_bars(code, count, end_date=tool.get_now_date_str()): """ 获取历史K线 @param code: @param count: @param end_date: @return: """ load_latest_trade_calendar() _count = -1 start_date = None for i in range(len(__trade_calendar_list) - 1, -1, -1): if _count == -1: if int(end_date.replace("-", "")) >= int(__trade_calendar_list[i].replace("-", "")): # 获取起始位置 _count = 1 start_date = __trade_calendar_list[i] else: _count += 1 start_date = __trade_calendar_list[i] if _count >= count: break request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code}) return __read_response(request_id) def get_trade_calendar(start_date, end_date): request_id = __base_request("get_trade_calendar", {"start_date": start_date, "end_date": end_date}) results = __read_response(request_id) return [f"{x[:4]}-{x[4:6]}-{x[6:8]}" for x in results] # 缓存交易日期:格式:{"日期":(上一个交易日, 下一个交易日),} __trade_calendar_list = [] __trade_calendar_dict = {} def load_latest_trade_calendar(): """ 加载最近的交易日历 @return: """ if __trade_calendar_list or __trade_calendar_dict: return end_date = tool.get_now_date_str() end_date = tool.date_sub(end_date, -30) days = get_trade_calendar(tool.date_sub(end_date, 365), end_date) for i in range(0, len(days)): if 0 < i < len(days) - 1: __trade_calendar_dict[days[i]] = (days[i - 1], days[i + 1]) elif i == 0: __trade_calendar_dict[days[i]] = (None, days[i + 1]) else: __trade_calendar_dict[days[i]] = (days[i - 1], None) __trade_calendar_list.clear() __trade_calendar_list.extend(days) def get_previous_trading_date(day): """ 获取上一个交易日 @param day: @return: """ load_latest_trade_calendar() if day in __trade_calendar_dict: return __trade_calendar_dict.get(day)[0] return None def get_next_trading_date(day): """ 获取下一个交易日 @param day: @return: """ load_latest_trade_calendar() if day in __trade_calendar_dict: return __trade_calendar_dict.get(day)[1] return None def run(): global request_queue request_queue, response_queue = multiprocessing.Queue(), multiprocessing.Queue() # 启动增值服务进程 process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue,), daemon=True) process.start() __set_response(response_queue) def __test(): time.sleep(5) print(get_history_k_bars("000333", count=150)) print(get_previous_trading_date("2024-12-31")) print(get_next_trading_date("2024-12-30")) if __name__ == '__main__': threading.Thread(target=__test, daemon=True).start() run()