admin
2025-06-04 287c506725b2d970f721f80169f83c2418cb0991
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import threading
import time
 
from utils import tool
 
 
class ClientSocketManager:
    # 客户端类型
    CLIENT_TYPE_COMMON = "common"
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_TRADE_SELL = "trade_sell"
    CLIENT_TYPE_TRADE_LOW_SUCTION = "trade_low_suction"
    # 可转债客户端
    CLIENT_TYPE_TRADE_CB = "trade_cb"
 
    socket_client_dict = {}
    socket_client_lock_dict = {}
    active_client_dict = {}
 
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type in {cls.CLIENT_TYPE_COMMON, cls.CLIENT_TYPE_TRADE, cls.CLIENT_TYPE_TRADE_SELL,
                     cls.CLIENT_TYPE_TRADE_CB, cls.CLIENT_TYPE_TRADE_LOW_SUCTION}:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
            cls.socket_client_dict[_type].append((rid, sk))
            cls.socket_client_lock_dict[rid] = threading.Lock()
        else:
            cls.socket_client_dict[_type] = (rid, sk)
            cls.socket_client_lock_dict[rid] = threading.Lock()
 
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
                                                                                                                          0] in cls.active_client_dict else 0,
                                     reverse=True)
                for d in client_list:
                    if d[0] in cls.socket_client_lock_dict:
                        try:
                            if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                                return d
                        except threading.TimeoutError:
                            pass
        else:
            if _type in cls.socket_client_dict:
                try:
                    d = cls.socket_client_dict[_type]
                    if d[0] in cls.socket_client_lock_dict:
                        if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                            return d
                except threading.TimeoutError:
                    pass
        return None
 
    @classmethod
    def release_client(cls, rid):
        if rid in cls.socket_client_lock_dict:
            # 释放锁
            cls.socket_client_lock_dict[rid].release()
 
    @classmethod
    def del_client(cls, rid):
        # 删除线程锁
        if rid in cls.socket_client_lock_dict:
            cls.socket_client_lock_dict.pop(rid)
        # 删除sk
        for t in cls.socket_client_dict:
            if type(cls.socket_client_dict[t]) == list:
                for d in cls.socket_client_dict[t]:
                    if d[0] == rid:
                        cls.socket_client_dict[t].remove(d)
                        try:
                            d[1].close()
                        except:
                            pass
                        break
 
            elif type(cls.socket_client_dict[t]) == tuple:
                if cls.socket_client_dict[t][0] == rid:
                    cls.socket_client_dict.pop(t)
                    try:
                        t[1].close()
                    except:
                        pass
                    break
 
    # 心跳信息
    @classmethod
    def heart(cls, rid):
        cls.active_client_dict[rid] = time.time()
 
    @classmethod
    def del_invalid_clients(cls):
        # 清除长时间无心跳的客户端通道
        for k in cls.active_client_dict.keys():
            if time.time() - cls.active_client_dict[k] > 20:
                # 心跳时间间隔20s以上视为无效
                cls.del_client(k)
 
    @classmethod
    def list_client(cls, type_=None):
        _type = type_
        if not _type:
            _type = cls.CLIENT_TYPE_TRADE
        client_list = sorted(cls.socket_client_dict[_type],
                             key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0,
                             reverse=True)
        fdata = []
        for client in client_list:
            active_time = cls.active_client_dict.get(client[0])
            if active_time is None:
                active_time = 0
            active_time = tool.to_time_str(int(active_time))
            fdata.append(
                (client[0], cls.socket_client_lock_dict[client[0]].locked(), active_time))
        return fdata