import threading
import time

from utils import tool


class ClientSocketManager:
    """Class-level registry of client sockets, grouped by client type.

    All state lives in class attributes, so every caller shares it:

    * ``socket_client_dict`` maps a client type to its registered clients.
      Pooled types (see ``_POOLED_TYPES``) hold a list of ``(rid, sk)``
      tuples; any other type holds a single ``(rid, sk)`` tuple.
    * ``socket_client_lock_dict`` maps ``rid`` to the ``threading.Lock``
      that marks that client as in use.
    * ``active_client_dict`` maps ``rid`` to the ``time.time()`` value of
      its most recent heartbeat.
    """

    # Client types
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_TRADE_SELL = "trade_sell"
    CLIENT_TYPE_TRADE_LOW_SUCTION = "trade_low_suction"
    # Convertible-bond client
    CLIENT_TYPE_TRADE_CB = "trade_cb"

    # Types for which several clients may be registered at once.
    _POOLED_TYPES = frozenset({
        CLIENT_TYPE_COMMON,
        CLIENT_TYPE_TRADE,
        CLIENT_TYPE_TRADE_SELL,
        CLIENT_TYPE_TRADE_CB,
        CLIENT_TYPE_TRADE_LOW_SUCTION,
    })

    # A client whose last heartbeat is older than this many seconds is
    # treated as invalid by del_invalid_clients().
    HEARTBEAT_TIMEOUT_SECONDS = 20

    socket_client_dict = {}
    socket_client_lock_dict = {}
    active_client_dict = {}

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register client ``rid`` with socket ``sk`` under ``_type``.

        Pooled types append to a per-type list; any other type keeps only
        the most recently added client. A fresh lock is created for ``rid``
        in both cases.
        """
        if _type in cls._POOLED_TYPES:
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Lock and return a free ``(rid, sk)`` client of ``_type``.

        Candidates are tried from the most recent heartbeat to the oldest
        and locked without blocking. Returns ``None`` when no client is
        registered or every client is already in use. The caller must hand
        the client back with ``release_client(rid)``.
        """
        entry = cls.socket_client_dict.get(_type)
        if entry is None:
            return None

        # Branch on the stored shape instead of re-listing the type names,
        # so every type that add_client() pools (including
        # CLIENT_TYPE_TRADE_LOW_SUCTION) is handled as a pool here too.
        if isinstance(entry, list):
            candidates = sorted(
                entry,
                key=lambda client: cls.active_client_dict.get(client[0], 0),
                reverse=True,
            )
        else:
            candidates = [entry]

        for client in candidates:
            lock = cls.socket_client_lock_dict.get(client[0])
            # A non-blocking acquire never waits; it just reports whether
            # the lock was obtained.
            if lock is not None and lock.acquire(blocking=False):
                return client
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client ``rid``; unknown ids are ignored.

        Raises:
            RuntimeError: if the lock of ``rid`` is not currently held.
        """
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is not None:
            lock.release()

    @classmethod
    def del_client(cls, rid):
        """Forget client ``rid``: drop its lock, heartbeat and socket(s).

        Every registration of ``rid`` is removed and its socket is closed
        on a best-effort basis.
        """
        # Remove the thread lock.
        cls.socket_client_lock_dict.pop(rid, None)
        # Remove the heartbeat as well, otherwise del_invalid_clients()
        # would keep finding this id on every sweep.
        cls.active_client_dict.pop(rid, None)

        # Remove the socket(s). Iterate over a snapshot of the keys because
        # single-client entries are deleted from the dict inside the loop.
        for client_type in list(cls.socket_client_dict):
            entry = cls.socket_client_dict.get(client_type)
            if isinstance(entry, list):
                for client in entry:
                    if client[0] == rid:
                        entry.remove(client)
                        cls._close_quietly(client[1])
                        break
            elif isinstance(entry, tuple) and entry[0] == rid:
                del cls.socket_client_dict[client_type]
                cls._close_quietly(entry[1])

    @staticmethod
    def _close_quietly(sk):
        """Close ``sk``, ignoring errors from an already-broken socket."""
        try:
            sk.close()
        except Exception:
            # Best effort: the client is being discarded either way.
            pass

    # Heartbeat
    @classmethod
    def heart(cls, rid):
        """Record the current time as the latest heartbeat of ``rid``."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls):
        """Delete every client whose heartbeat is older than the timeout."""
        now = time.time()
        # Snapshot the items: del_client() mutates active_client_dict.
        for rid, last_seen in list(cls.active_client_dict.items()):
            if now - last_seen > cls.HEARTBEAT_TIMEOUT_SECONDS:
                cls.del_client(rid)

    @classmethod
    def list_client(cls, type_=None):
        """Describe the clients registered under ``type_``.

        Args:
            type_: client type to list; a falsy value selects
                ``CLIENT_TYPE_TRADE``.

        Returns:
            A list of ``(rid, locked, active_time)`` tuples ordered from
            the most recent heartbeat to the oldest, where ``active_time``
            is the heartbeat formatted by ``tool.to_time_str`` (0 is used
            when no heartbeat was recorded). Empty when nothing is
            registered for the type.
        """
        _type = type_ or cls.CLIENT_TYPE_TRADE
        entry = cls.socket_client_dict.get(_type)
        if not entry:
            return []

        clients = entry if isinstance(entry, list) else [entry]
        ordered = sorted(
            clients,
            key=lambda client: cls.active_client_dict.get(client[0], 0),
            reverse=True,
        )

        fdata = []
        for client in ordered:
            rid = client[0]
            lock = cls.socket_client_lock_dict.get(rid)
            # A client without a lock entry is reported as not locked
            # rather than raising KeyError.
            locked = lock.locked() if lock is not None else False
            active_time = tool.to_time_str(
                int(cls.active_client_dict.get(rid, 0)))
            fdata.append((rid, locked, active_time))
        return fdata