"""
Push message manager
"""
|
import json
|
|
from log import logger_debug
|
from utils import socket_util
|
|
TYPE_ORDER_ALMOST_DEAL = "order_almost_deal" # Message type: order is about to be filled
|
|
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change" # Message type: delegate (order) queue changed
|
|
|
# Registry format: {id: (sk, [type1, type2])}
|
|
class SocketManager:
    """Class-level registry of sockets and the message types each one receives.

    All state lives on the class itself, so every caller shares one registry.
    Entries are keyed by ``id(sk)``; the registry keeps a reference to the
    socket, so that id stays unique for as long as the entry exists.
    """

    # Layout: {id(sk): (sk, [type1, type2, ...])}
    __sockets_dict = {}

    @classmethod
    def register_socket(cls, sk, types: list) -> None:
        """Register ``sk`` as a receiver for the message types in ``types``.

        Registering the same socket object again replaces its previous
        type list. A ``None`` socket is ignored.

        Args:
            sk: The socket object to register.
            types: Message types this socket should receive. The list is
                stored as-is (not copied).
        """
        if sk is None:
            return
        cls.__sockets_dict[id(sk)] = (sk, types)

    # The original, misspelled name is kept so existing callers keep working.
    regirster_socket = register_socket

    @classmethod
    def get_sockets(cls, _type) -> list:
        """Return a new list of the sockets registered for ``_type``.

        Sockets appear in registration order. The returned list is
        independent of the registry, so callers may remove sockets while
        iterating over it.
        """
        return [
            sk
            for sk, types in cls.__sockets_dict.values()
            if _type in types
        ]

    @classmethod
    def remove_socket(cls, sk) -> None:
        """Drop ``sk`` from the registry; a no-op if it is not registered."""
        # Single pop with a default instead of a membership check followed by pop.
        cls.__sockets_dict.pop(id(sk), None)
|
|
|
# Push a message to the sockets subscribed to its type
|
def __push_msg(msg_type, data=None):
    """Send a message of ``msg_type`` to every socket registered for that type.

    The message is wrapped as ``{"code": 0, "data": {"type": ..., "data": ...}}``,
    JSON-encoded as UTF-8 and passed through ``socket_util.load_header``
    (presumably adds a framing header -- confirm against ``socket_util``)
    before being written with ``sendall``.

    A socket whose send fails is closed (best effort) and removed from
    ``SocketManager``. Errors are logged, never raised to the caller.

    Args:
        msg_type: Message type used both as the subscription key and as the
            ``"type"`` field of the payload.
        data: Optional payload. It is only included when truthy, so ``None``,
            ``0``, ``""`` and empty containers are all omitted.
    """
    fdata = {"type": msg_type}
    if data:
        fdata["data"] = data

    sks = SocketManager.get_sockets(msg_type)
    logger_debug.info(f"socket对象数量({msg_type}):{len(sks)}")
    if not sks:
        return

    # Build the packet once, outside the per-socket error handling: a
    # serialisation failure is not the socket's fault and must not cause
    # healthy connections to be closed and unregistered.
    try:
        packet = socket_util.load_header(
            json.dumps({"code": 0, "data": fdata}).encode("utf-8")
        )
    except Exception as e:
        logger_debug.exception(e)
        return

    for sk in sks:
        try:
            sk.sendall(packet)
        except Exception as e:
            logger_debug.exception(e)
            # The connection is considered dead: close it if possible and
            # always drop it from the registry.
            try:
                sk.close()
            except Exception:
                pass
            finally:
                SocketManager.remove_socket(sk)
|