""" 华鑫增值服务API """ import multiprocessing import random import threading import time from huaxin_client import l1_api_client from log_module.log import logger_debug from strategy import data_cache from utils import tool __response_data = {} TIMEOUT = 3 def __set_response_data(request_id, response_data): __response_data[request_id] = response_data def __read_response(request_id, blocking=True, timeout=TIMEOUT): if blocking: start_time = time.time() try: while True: time.sleep(0.005) if request_id in __response_data: # 获取到了响应内容 result = __response_data.pop(request_id) return result if time.time() - start_time > timeout: # 读取内容超时才会释放 raise Exception(f"读取内容超时: request_id={request_id}") finally: pass return None def __set_response(response_queue: multiprocessing.Queue): """ 读取结果队列 @param response_queue: @return: """ while True: try: val = response_queue.get() request_id = val['request_id'] response_data = val['data'] __response_data[request_id] = response_data except: pass def __get_request_id(type): return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}" def __base_request(type, data): request_id = __get_request_id(type) request_queue.put_nowait({"type": type, "request_id": request_id, "data": data}) return request_id def get_history_k_bars(code, count, end_date=tool.get_now_date_str()): """ 获取历史K线 @param code: @param count: @param end_date: @return: """ if not end_date: end_date = tool.get_now_date_str() load_latest_trade_calendar() _count = -1 start_date = None for i in range(len(__trade_calendar_list) - 1, -1, -1): if _count == -1: if int(end_date.replace("-", "")) >= int(__trade_calendar_list[i].replace("-", "")): # 获取起始位置 _count = 1 start_date = __trade_calendar_list[i] else: _count += 1 start_date = __trade_calendar_list[i] if _count >= count: break request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code}) return __read_response(request_id) def get_trade_calendar(start_date, end_date): request_id = __base_request("get_trade_calendar", {"start_date": start_date, "end_date": end_date}) results = __read_response(request_id) return [f"{x[:4]}-{x[4:6]}-{x[6:8]}" for x in results] # 缓存交易日期:格式:{"日期":(上一个交易日, 下一个交易日),} __trade_calendar_list = [] __trade_calendar_dict = {} def load_latest_trade_calendar(): """ 加载最近的交易日历 @return: """ if __trade_calendar_list or __trade_calendar_dict: return end_date = tool.get_now_date_str() end_date = tool.date_sub(end_date, -30) days = get_trade_calendar(tool.date_sub(end_date, 365), end_date) logger_debug.info(f"华鑫API获取交易日历结果:{days}") for i in range(0, len(days)): if 0 < i < len(days) - 1: __trade_calendar_dict[days[i]] = (days[i - 1], days[i + 1]) elif i == 0: __trade_calendar_dict[days[i]] = (None, days[i + 1]) else: __trade_calendar_dict[days[i]] = (days[i - 1], None) __trade_calendar_list.clear() __trade_calendar_list.extend(days) def get_previous_trading_date(day): """ 获取上一个交易日 @param day: @return: """ load_latest_trade_calendar() if day in __trade_calendar_dict: return __trade_calendar_dict.get(day)[0] return None def get_next_trading_date(day): """ 获取下一个交易日 @param day: @return: """ load_latest_trade_calendar() if day in __trade_calendar_dict: return __trade_calendar_dict.get(day)[1] return None def __read_callback_data(data_callback_queue: multiprocessing.Queue): """ 读取数据回调 :param data_callback_queue: :return: """ while True: try: data = data_callback_queue.get() type_ = data[0] data = data[1] if type_ == "stock_index_datas": for k in data: """ 值的格式为: { "PreClosePrice":pStockIndexData, "LastPrice": pStockIndexData.LastPrice, "SecurityID": pStockIndexData.SecurityID, "UpdateTime": pStockIndexData.UpdateTime, "Volume": pStockIndexData.Volume, "Turnover": pStockIndexData.Turnover, "LXLastPrice": pStockIndexData.LXLastPrice, } """ d = data[k] data_cache.stock_index_dict[k] = (round(d["LastPrice"], 2), d["Volume"], d["Turnover"], d["PreClosePrice"]) except: pass def run(): global request_queue request_queue, response_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() # 启动增值服务进程 process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue, data_callback_queue,), daemon=True) process.start() # 读取数据回调 threading.Thread(target=__read_callback_data, args=(data_callback_queue,), daemon=True).start() __set_response(response_queue) def __test(): time.sleep(5) print(get_history_k_bars("000333", count=150)) print(get_previous_trading_date("2024-12-31")) print(get_next_trading_date("2024-12-30")) if __name__ == '__main__': threading.Thread(target=__test, daemon=True).start() run()