"""
Message manager for the market-watch ("kanpan") client side.

Messages are fanned out into one bounded queue per client id; each queued
message carries an expiry timestamp and is dropped by ``read_msg`` once stale.
"""
import json
import queue
import threading
import time

from code_attribute import gpcode_manager
from log_module import log_export
from utils import middle_api_protocol, tool

# Client ids that receive a copy of every captured message.
CLIENT_IDS = ["zjb", "hxh"]

# Staging queue drained by the capture thread started in ``run_capture``.
__temp_msg_queue = queue.Queue(maxsize=1000)


class MsgQueueManager:
    """Keeps one bounded FIFO queue of ``(expire_time, msg)`` per client id."""

    # Class-level dict: it is shared by every instance of this class.
    __queue_dict = {}

    def add_msg(self, client_id, msg):
        """Queue ``msg`` for ``client_id``; the message is valid for 2 seconds.

        The client's queue (max 1000 entries) is created on first use.

        Raises:
            queue.Full: if the client's queue already holds 1000 entries.
        """
        if client_id not in self.__queue_dict:
            self.__queue_dict[client_id] = queue.Queue(maxsize=1000)
        # Store the absolute expiry time next to the payload.
        self.__queue_dict[client_id].put_nowait((time.time() + 2, msg))

    def read_msg(self, client_id):
        """Pop the oldest ``(expire_time, msg)`` tuple for ``client_id``.

        Returns:
            The tuple, or ``None`` when the client has no queue or it is empty.
        """
        client_queue = self.__queue_dict.get(client_id)
        if client_queue is None:
            return None
        try:
            return client_queue.get_nowait()
        except queue.Empty:
            return None


# Shared manager instance used by the module-level functions below.
__MsgQueueManager = MsgQueueManager()


@tool.async_call
def add_msg(code, msg):
    """Prefix ``msg`` with the stock's name/code and send it via the middle API.

    ``tool.async_call`` is a project decorator; presumably it runs this
    function asynchronously -- confirm against ``utils.tool``.
    """
    # Look up the display name for the stock code.
    name = gpcode_manager.get_code_name(code)
    msg = f"【{name}({code})】{msg}"
    fdata = middle_api_protocol.load_kp_msg(msg)
    middle_api_protocol.request(fdata)
    # NOTE: the local staging queue is currently NOT fed from here; the
    # enqueue and logging calls below are disabled in favour of the request
    # above.
    # __temp_msg_queue.put_nowait(msg)
    # # append to the log
    # logger_kp_msg.info(msg)


def read_msg(client_id):
    """Return the next unexpired message for ``client_id``, or ``None``.

    A message whose expiry time has passed is consumed and discarded, and
    ``None`` is returned for that call.
    """
    msg_data = __MsgQueueManager.read_msg(client_id)
    if not msg_data:
        return None
    # Entries are stored as (expire_time, msg) by MsgQueueManager.add_msg.
    # The previous code read indexes 1 and 2, which raised IndexError on
    # every successful read.
    expire_time, msg = msg_data
    if expire_time < time.time():
        # Expired
        return None
    return msg


def list_msg_from_local():
    """Return the message list provided by ``log_export.get_kp_msg_list()``."""
    return log_export.get_kp_msg_list()


def run_capture():
    """Start a daemon thread that fans staged messages out to every client."""

    def capture():
        while True:
            try:
                msg = __temp_msg_queue.get()
                if msg:
                    for client_id in CLIENT_IDS:
                        try:
                            __MsgQueueManager.add_msg(client_id, msg)
                        except queue.Full:
                            # This client is not draining its queue: drop the
                            # message for it only, so the remaining clients
                            # still receive their copy.
                            continue
            except Exception:
                # Best effort: the capture loop must keep running.
                pass
            time.sleep(0.01)

    # Daemon thread, so it does not block interpreter shutdown.
    worker = threading.Thread(target=capture, daemon=True)
    worker.start()


if __name__ == "__main__":
    for _ in range(10):
        add_msg("600839", "买入成功")
    run_capture()
    input()