Administrator
2025-06-05 48e24816c88f722e40b43187fa63a8334196f5ae
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
socket工具类
"""
 
# 添加数据头
import json
import socket
 
from utils import crypt_util
 
 
def create_socket(addr, port):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
    client.connect((addr, port))
    return client
 
 
def load_header(data_bytes):
    slen = '##%08d' % len(data_bytes)
    return slen.encode("utf-8") + data_bytes
 
 
# 接收数据,去除头
def recv_data(sk):
    data = ""
    header_size = 10
    buf = sk.recv(1024)
    header_str = buf[:header_size]
    if buf:
        buf = buf.decode('utf-8')
        if buf.startswith("##"):
            content_length = int(buf[2:10])
            received_size = 0
            # 加上读取头的数据
            received_size += len(buf[header_size:])
            data += buf[header_size:]
            while not received_size == content_length:
                r_data = sk.recv(10240)
                received_size += len(r_data)
                data += r_data.decode('utf-8')
        else:
            data = sk.recv(1024 * 1024)
            data = buf + data.decode('utf-8')
    return data, header_str
 
 
# 客户端参数加密
def encryp_client_params_sign(dataJson):
    if type(dataJson) != dict:
        return dataJson
    str_list = []
    for k in dataJson:
        if type(dataJson[k]) == dict:
            str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={dataJson[k]}")
    str_list.sort()
    str_list.append("%Yeshi2014@#.")
    dataJson["sign"] = crypt_util.md5_encrypt("&".join(str_list))
    return dataJson
 
 
# 客户端密码加密验证
def is_client_params_sign_right(dataJson):
    if type(dataJson) != dict:
        return False
    sign = dataJson["sign"]
    dataJson.pop("sign")
    str_list = []
    for k in dataJson:
        if type(dataJson[k]) == dict:
            str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={dataJson[k]}")
    str_list.sort()
    str_list.append("%Yeshi2014@#.")
    new_sign = crypt_util.md5_encrypt("&".join(str_list))
    if sign == new_sign:
        return True
    else:
        return False
 
 
# 端口是否被占用
def is_port_bind(port, timeout=1):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex(('127.0.0.1', port))
    sock.close()
    if result == 0:
        return True
    else:
        return False
 
 
if __name__ == "__main__":
    print(is_port_bind(9004))