"""Local trading API facade.

Commands (buy, sell, position queries, ...) are placed on a
``multiprocessing`` command queue that is consumed by the JueJin strategy
process started in :func:`run`; results and events come back on a result
queue that is drained by :meth:`__TradeApi.start_read_results`.
"""
import concurrent.futures
import logging
import multiprocessing
import random
import threading
import time

from local_api import juejin
from local_api.code_block_manager import CodeKPLBlockManager
from local_api.limit_up_manager import KPLLimitUpDataManager
from local_api.log_module import async_log_util
from local_api.log_module.log import logger_print
from local_api.tick_data_manager import TickQueueManager
from local_api.util import event_type, l1_data_api
from local_api.util.juejin_util import JueJinApi

# Standard-library logger used only for reporting swallowed exceptions.
_logger = logging.getLogger(__name__)

# Sentinel distinguishing "no result yet" from a legitimate ``None`` result.
_MISSING = object()


class APICallback:
    """Callback interface; subclass it and override the hooks you need."""

    def OnJueJinInited(self):
        """Called when JueJin initialization has completed."""
        pass

    def OnTick(self, tick, blocks: set, limit_up_list: list):
        """Tick callback.

        :param tick: the tick record
        :param blocks: set of block (sector) names for the tick's code
        :param limit_up_list: all current limit-up entries
        """
        pass

    def OnTrade(self, trade_info):
        """Trade callback, for when an order is filled or placed.

        :param trade_info: trade information
        """
        pass

    def OnLimitUpListUpdated(self, limit_up_list):
        """Called when the limit-up list has been updated.

        :param limit_up_list: the updated limit-up list
        """


class __TradeApi:
    """Sends trading commands over a queue and waits for their results."""

    def __init__(self):
        # request_id -> result payload, filled by start_read_results().
        self.__result_dict = {}
        # Ids of blocking requests that currently have a waiter. Results for
        # any other id are dropped so that __result_dict cannot grow forever.
        self.__pending_request_ids = set()
        # Keeps "is it pending? then store" and "stop waiting, clean up" atomic.
        self.__result_lock = threading.Lock()
        # Pool used to run callbacks off the result-reading thread.
        self.__not_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
        self.command_queue = None
        self.result_queue = None
        self.api_callback = None

    def init(self, command_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue,
             api_callback: APICallback):
        """Attach the command/result queues and the user callback.

        :param command_queue: queue that commands are written to
        :param result_queue: queue that results and events are read from
        :param api_callback: callback object (may be ``None``)
        """
        self.command_queue = command_queue
        self.result_queue = result_queue
        self.api_callback = api_callback

    @classmethod
    def __create_request_id(cls, request_type):
        """Build a request id from the type, a microsecond timestamp and a random number."""
        return f"request_id-{request_type}-{int(time.time() * 1000000)}-{random.randint(0, 100000)}"

    def __read_result(self, request_id, timeout=3.0):
        """Poll for the result of ``request_id`` and remove it from the store.

        :param request_id: id returned by ``__create_request_id``
        :param timeout: seconds to wait before giving up
        :return: the result payload
        :raises TimeoutError: if no result arrives within ``timeout`` seconds,
            or within the hard cap of 10000 polls (at least 50 s)
        """
        # Monotonic clock: wall-clock adjustments must not distort the timeout.
        started = time.monotonic()
        for _ in range(10000):
            result = self.__result_dict.pop(request_id, _MISSING)
            if result is not _MISSING:
                return result
            time.sleep(0.005)
            if time.monotonic() - started > timeout:
                raise TimeoutError("读取结果超时")
        raise TimeoutError("未获取到请求结果")

    def __send_command(self, name, params, blocking, timeout=3.0):
        """Queue the command ``name`` and optionally wait for its result.

        :param name: command name; also used as the request-id type label
        :param params: dict of command parameters
        :param blocking: when true, wait for and return the result
        :param timeout: seconds to wait when ``blocking`` is true
        :return: the result payload, or ``None`` when not blocking
        :raises TimeoutError: if blocking and no result arrives in time
        """
        request_id = self.__create_request_id(name)
        command = (name, params, request_id)
        if not blocking:
            self.command_queue.put_nowait(command)
            return None
        # Register before sending so the reader thread never sees an
        # unregistered id for a request that does have a waiter.
        with self.__result_lock:
            self.__pending_request_ids.add(request_id)
        try:
            self.command_queue.put_nowait(command)
            return self.__read_result(request_id, timeout=timeout)
        finally:
            # Always unregister and drop a result that may have arrived
            # between the timeout and this cleanup.
            with self.__result_lock:
                self.__pending_request_ids.discard(request_id)
                self.__result_dict.pop(request_id, None)

    def buy(self, code, volume, price, blocking=True):
        """Place a buy order.

        :param code: security code
        :param volume: order volume
        :param price: order price
        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("buy", {"code": code, "volume": volume, "price": price}, blocking)

    def sell(self, code, volume, price, blocking=True):
        """Place a sell order.

        :param code: security code
        :param volume: order volume
        :param price: order price
        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("sell", {"code": code, "volume": volume, "price": price}, blocking)

    def positions(self, blocking=True):
        """Request the positions.

        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("position", {}, blocking, timeout=10)

    def position_by_symbol(self, symbol, blocking=True):
        """Request the position for one symbol.

        :param symbol: symbol to query
        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("get_position_by_symbol", {"symbol": symbol}, blocking, timeout=10)

    def get_unfinish_orders(self, blocking=True):
        """Request the list of unfinished orders.

        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("get_unfinish_orders", {}, blocking, timeout=10)

    def cancel_order(self, local_order_id, account_id, blocking=True):
        """Cancel an order.

        :param local_order_id: local order id
        :param account_id: account id
        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command(
            "cancel_order", {"local_order_id": local_order_id, "account_id": account_id}, blocking, timeout=10)

    def get_symbol_names(self, symbols):
        """Look up security names.

        :param symbols: symbols to look up
        :return: dict mapping each returned ``symbol`` to its ``sec_name``
        """
        results = JueJinApi().get_gp_latest_info(symbols, "symbol,sec_name")
        return {x["symbol"]: x["sec_name"] for x in results}

    def get_execution_reports(self, blocking=True):
        """Request the execution reports.

        :param blocking: True to wait for the result
        :return: the result when blocking, otherwise ``None``
        """
        return self.__send_command("get_execution_reports", {}, blocking, timeout=10)

    def start_read_results(self):
        """Drain the result queue forever; intended to run in its own thread.

        Request results are stored for the waiting caller; the JueJin-init
        event is forwarded to the callback on the thread pool.
        """
        while True:
            try:
                result = self.result_queue.get()
                result_type, request_id, data = result[0], result[1], result[2]
                logger_print.debug("API读取到结果:{}", result)
                if result_type == event_type.EVENT_TYPE_REQUEST:
                    with self.__result_lock:
                        # Results nobody is waiting for (non-blocking or
                        # timed-out requests) are dropped, not hoarded.
                        if request_id in self.__pending_request_ids:
                            self.__result_dict[request_id] = data
                elif result_type == event_type.EVENT_TYPE_JUEJIN_INIT:
                    if self.api_callback:
                        # Run the callback on a pool thread so it cannot
                        # stall result reading.
                        self.__not_request_thread_pool.submit(self.api_callback.OnJueJinInited)
            except Exception:
                _logger.exception("Failed to read or dispatch an API result")
                time.sleep(0.005)


trade_api = __TradeApi()

target_codes = set()

# User callback; set by run(). None until then.
api_callback = None


def update_blocks(force=False):
    """Request a block (sector) update for the target codes.

    :param force: whether to force the update
    """
    CodeKPLBlockManager().request_block(target_codes, force)


def get_blocks_count_info():
    """Return how many codes have block data.

    :return: (number of codes updated today, number for the previous trading day)
    """
    return CodeKPLBlockManager().get_codes_count_info()


# Latest tick seen for each code.
latest_tick_dict = {}


def tick_callback(tick):
    """Forward a tick to the user callback unless it repeats the last one.

    ``tick[0]`` is the code. A tick is treated as a repeat when ``tick[7]``
    equals field 7 of the previously stored tick for that code
    (presumably a timestamp - TODO confirm).
    """
    code = tick[0]
    previous = latest_tick_dict.get(code)
    if previous is not None and previous[7] == tick[7]:
        return
    latest_tick_dict[code] = tick
    if api_callback is None:
        # run() has not installed a callback; nothing to notify.
        return
    blocks = CodeKPLBlockManager().get_blocks(code)
    limit_up_info = KPLLimitUpDataManager().get_current_limitup_info()
    api_callback.OnTick(tick, blocks, limit_up_info[1] if limit_up_info else [])


__TickQueueManager = TickQueueManager(60, tick_callback)


def __run_tick_tasks():
    """Start the tick queue worker, then poll L1 data every 3 seconds forever."""
    # Pass the bound method itself; calling it here would run it in this
    # thread and hand its return value to Thread as the target.
    threading.Thread(target=__TickQueueManager.run, daemon=True).start()
    while True:
        try:
            datas = l1_data_api.get_current_info()
            for d in datas:
                __TickQueueManager.add_tick(d[0], d)
        except Exception:
            # Best effort: report the failure and try again on the next round.
            _logger.exception("Failed to fetch or enqueue L1 tick data")
        finally:
            time.sleep(3)


def run(jeujin_strategy_id, juejin_token, target_codes_, api_callback_: APICallback):
    """Start the API.

    Starts the result-reader, async-log and tick threads (all daemon
    threads) and launches the JueJin strategy in a separate process.

    :param jeujin_strategy_id: JueJin strategy id
    :param juejin_token: JueJin token
    :param target_codes_: iterable of target codes
    :param api_callback_: callback object
    """
    global target_codes, api_callback
    api_callback = api_callback_
    # Snapshot first: the caller may pass the module's own target_codes set,
    # which clear() would otherwise empty before it is copied.
    new_codes = set(target_codes_)
    target_codes.clear()
    target_codes.update(new_codes)

    # Initialise the JueJin API parameters.
    JueJinApi.init(jeujin_strategy_id, juejin_token)

    command_queue, result_queue = multiprocessing.Queue(), multiprocessing.Queue()
    trade_api.init(command_queue, result_queue, api_callback)
    threading.Thread(target=trade_api.start_read_results, daemon=True).start()
    # Start the asynchronous log worker.
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
    # Start tick polling.
    threading.Thread(target=__run_tick_tasks, daemon=True).start()
    # Run the JueJin strategy in a separate process.
    jueJinProcess = multiprocessing.Process(
        target=juejin.start_run,
        args=(jeujin_strategy_id, juejin_token, command_queue, result_queue,))
    jueJinProcess.start()