""" 看盘端消息管理器 """ import json import queue import threading import time from code_attribute import gpcode_manager from log_module import log from db.redis_manager import RedisManager from log_module.log import logger_kp_msg CLIENT_IDS = ["zjb", "hxh"] __temp_msg_queue = queue.Queue() class MsgQueueManager: __redisManager = RedisManager(3) def __get_redis(self): return self.__redisManager.getRedis() # 添加消息,2s内有效 def add_msg(self, client_id, msg): self.__get_redis().lpush(f"kp_msg_queue-{client_id}", json.dumps((time.time() + 2, msg))) # 读取消息 def read_msg(self, client_id): data = self.__get_redis().lpop(f"kp_msg_queue-{client_id}") if not data: return None data = json.loads(data) return data # 运行采集器 __MsgQueueManager = MsgQueueManager() # 添加消息 def add_msg(code, msg): # 根据代码获取名称 name = gpcode_manager.get_code_name(code) msg = f"【{name}({code})】{msg}" __temp_msg_queue.put_nowait(msg) # 添加到日志 logger_kp_msg.info(msg) def read_msg(client_id): msg_data = __MsgQueueManager.read_msg(client_id) if not msg_data: return None expire_time = msg_data[0] msg = msg_data[1] if expire_time < time.time(): # 过期 return None return msg # 读取本地消息列表 def list_msg_from_local(): return log.get_kp_msg_list() # 运行采集 def run_capture(): def capture(): while True: try: msg = __temp_msg_queue.get() if msg: for c in CLIENT_IDS: __MsgQueueManager.add_msg(c, msg) except: pass time.sleep(0.01) t1 = threading.Thread(target=lambda: capture()) # 后台运行 t1.setDaemon(True) t1.start() if __name__ == "__main__": for i in range(0, 10): add_msg("600839", "买入成功") run_capture() input()