import threading
|
import time
|
|
from utils import tool
|
|
|
class ClientSocketManager:
    """Class-level registry of client sockets, grouped by client type.

    All state lives in class attributes, so every caller shares one registry.
    Each registered client is identified by ``rid`` and guarded by its own
    ``threading.Lock``; ``acquire_client`` hands out a client only if that
    lock can be taken without blocking, and ``release_client`` gives it back.

    NOTE(review): the registry dictionaries themselves are not protected by a
    lock; iteration below works on snapshots to avoid "dictionary changed size
    during iteration" errors, but compound updates are not atomic.
    """

    # Client types that keep a pool (list) of connections.
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"

    # Seconds without a heartbeat after which a client is treated as invalid.
    HEARTBEAT_TIMEOUT = 20

    # type -> list of (rid, sk) for pooled types, or one (rid, sk) tuple otherwise.
    socket_client_dict = {}
    # rid -> threading.Lock guarding that client's socket.
    socket_client_lock_dict = {}
    # rid -> time.time() of the most recent heartbeat.
    active_client_dict = {}

    @classmethod
    def _is_pooled_type(cls, _type):
        """Return True if ``_type`` stores a list of clients rather than one."""
        return _type in (cls.CLIENT_TYPE_COMMON, cls.CLIENT_TYPE_TRADE)

    @classmethod
    def _sorted_by_activity(cls, clients):
        """Return ``clients`` ordered by last heartbeat, most recent first.

        Clients that never sent a heartbeat sort as if last seen at time 0.
        """
        return sorted(
            clients,
            key=lambda client: cls.active_client_dict.get(client[0], 0),
            reverse=True,
        )

    @staticmethod
    def _close_quietly(sk):
        """Close ``sk`` on a best-effort basis, ignoring close failures."""
        try:
            sk.close()
        except Exception:
            # The client is being dropped anyway; a failed close must not
            # prevent the rest of the cleanup from running.
            pass

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register socket ``sk`` for client ``rid`` under ``_type``.

        Pooled types (common/trade) append to a list of clients; any other
        type holds a single client, which is replaced. A fresh lock is
        created for ``rid`` in both cases.
        """
        if cls._is_pooled_type(_type):
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Return a free ``(rid, sk)`` of ``_type`` with its lock held.

        For pooled types the most recently active clients are tried first.
        Returns ``None`` when the type is unknown or every client is busy.
        The caller must call ``release_client(rid)`` when done.
        """
        entry = cls.socket_client_dict.get(_type)
        if entry is None:
            return None

        if cls._is_pooled_type(_type):
            candidates = cls._sorted_by_activity(entry)
        else:
            candidates = [entry]

        for client in candidates:
            lock = cls.socket_client_lock_dict.get(client[0])
            # Non-blocking acquire just returns False when the lock is busy;
            # it never raises, so no exception handling is needed here.
            if lock is not None and lock.acquire(blocking=False):
                return client
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client ``rid``; no-op if ``rid`` is unknown.

        Raises:
            RuntimeError: if the lock is not currently held.
        """
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is not None:
            lock.release()

    @classmethod
    def del_client(cls, rid):
        """Unregister client ``rid``: drop its lock and close its socket.

        The heartbeat entry in ``active_client_dict`` is left untouched.
        """
        # Remove the per-client lock.
        cls.socket_client_lock_dict.pop(rid, None)

        # Remove and close the socket. Iterate over a snapshot because
        # single-client entries are popped from the dict inside the loop.
        for _type, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                for client in entry:
                    if client[0] == rid:
                        entry.remove(client)
                        cls._close_quietly(client[1])
                        break
            elif isinstance(entry, tuple):
                if entry[0] == rid:
                    cls.socket_client_dict.pop(_type, None)
                    # Close the stored socket (entry[1]); the dict key is
                    # only the type name and has nothing to close.
                    cls._close_quietly(entry[1])

    # Heartbeat bookkeeping
    @classmethod
    def heart(cls, rid):
        """Record that client ``rid`` was alive just now."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls, timeout=None):
        """Drop every client whose last heartbeat is older than ``timeout``.

        Args:
            timeout: maximum heartbeat age in seconds; defaults to
                ``HEARTBEAT_TIMEOUT`` (20 s).
        """
        if timeout is None:
            timeout = cls.HEARTBEAT_TIMEOUT
        now = time.time()
        # Snapshot the items: entries are removed while sweeping and heart()
        # may add new ones concurrently.
        for rid, last_seen in list(cls.active_client_dict.items()):
            if now - last_seen > timeout:
                cls.del_client(rid)
                # Forget the stale heartbeat so the dict does not grow
                # without bound and the client is not re-deleted each sweep.
                cls.active_client_dict.pop(rid, None)

    @classmethod
    def list_client(cls, type_=None):
        """Describe the clients registered under ``type_``.

        Args:
            type_: client type to list; defaults to ``CLIENT_TYPE_TRADE``.

        Returns:
            A list of ``(rid, is_locked, active_time_str)`` tuples ordered by
            most recent heartbeat first; empty if no client of that type is
            registered. ``active_time_str`` is ``tool.to_time_str`` applied
            to the integer heartbeat timestamp (0 if none was recorded).
        """
        _type = type_ if type_ else cls.CLIENT_TYPE_TRADE
        entry = cls.socket_client_dict.get(_type)
        if entry is None:
            return []
        # Single-client types store a bare tuple; treat it as a one-item list.
        clients = [entry] if isinstance(entry, tuple) else entry

        fdata = []
        for client in cls._sorted_by_activity(clients):
            rid = client[0]
            active_time = cls.active_client_dict.get(rid)
            if active_time is None:
                active_time = 0
            lock = cls.socket_client_lock_dict.get(rid)
            is_locked = lock.locked() if lock is not None else False
            fdata.append((rid, is_locked, tool.to_time_str(int(active_time))))
        return fdata
|