"""Push-message manager.

Keeps a registry of client sockets together with the message types each one
was registered for, and pushes JSON messages to the sockets registered for a
given type.
"""
import json

from log_module.log import logger_debug
from utils import socket_util

TYPE_ORDER_ALMOST_DEAL = "order_almost_deal"  # an order is about to be filled
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change"  # the delegate queue changed


class SocketManager:
    """Class-level registry of sockets and the message types they receive.

    All state lives on the class; the methods are classmethods and no
    instance is needed.
    """

    # Layout: {id(sk): (sk, [type1, type2, ...])}
    __sockets_dict = {}

    @classmethod
    def regirster_socket(cls, sk, types: list) -> None:
        """Register ``sk`` for the given message types.

        The (misspelled) name is kept because external callers use it; see
        ``register_socket`` for the correctly spelled alias.

        Args:
            sk: The socket object to register. ``None`` is ignored.
            types: Message types this socket should receive. Registering the
                same socket object again replaces its previous type list.
        """
        if sk is None:
            return
        cls.__sockets_dict[id(sk)] = (sk, types)

    # Correctly spelled alias; the original name stays for backward compatibility.
    register_socket = regirster_socket

    @classmethod
    def get_sockets(cls, _type) -> list:
        """Return the sockets whose registered types contain ``_type``.

        Returns:
            A new list, so callers may unregister sockets while iterating it.
        """
        return [sk for sk, types in cls.__sockets_dict.values() if _type in types]

    @classmethod
    def remove_socket(cls, sk) -> None:
        """Unregister ``sk``; does nothing if it is not registered."""
        cls.__sockets_dict.pop(id(sk), None)


def push_msg(msg_type, data=None):
    """Push a message of ``msg_type`` to every socket registered for it.

    The message sent is ``{"code": 0, "data": {"type": msg_type, "data": data}}``
    serialized as UTF-8 JSON and passed through ``socket_util.load_header``
    (presumably prepends a header -- confirm against that helper).

    A socket whose send fails is closed and unregistered. If the message itself
    cannot be built (e.g. ``data`` is not JSON-serializable) the error is
    logged and no socket is touched. This function never raises for send or
    serialization failures.

    Args:
        msg_type: Message type used both for routing and in the payload.
        data: Optional payload. Falsy values (``None``, ``0``, ``""``, empty
            containers) are omitted from the message.
    """
    fdata = {"type": msg_type}
    # NOTE: truthiness check kept on purpose -- receivers may rely on the
    # "data" key being absent for empty payloads.
    if data:
        fdata["data"] = data

    sks = SocketManager.get_sockets(msg_type)
    logger_debug.info(f"socket对象数量({msg_type}):{len(sks)}")
    if not sks:
        return

    # Build the packet once, outside the per-socket error handling: a payload
    # that cannot be serialized is not a socket failure and must not cause
    # every healthy subscriber to be closed and unregistered.
    try:
        packet = socket_util.load_header(
            json.dumps({"code": 0, "data": fdata}).encode("utf-8")
        )
    except Exception as e:
        logger_debug.exception(e)
        return

    for sk in sks:
        try:
            sk.sendall(packet)
        except Exception as e:
            logger_debug.exception(e)
            # The socket is considered dead: close it (best effort) and always
            # drop it from the registry so later pushes skip it.
            try:
                sk.close()
            except Exception as close_error:
                # Not a bare ``except`` so KeyboardInterrupt/SystemExit propagate.
                logger_debug.exception(close_error)
            finally:
                SocketManager.remove_socket(sk)