admin
2025-06-10 568c763084b926a6f2d632b7ac65b9ec8280752f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""
Socket utility helpers.
"""
 
# NOTE: stray comment ("add data header") - it describes load_header() further down, not the imports.
import base64
import hashlib
import json
import socket
import ssl
 
import rsa
 
 
def md5_encrypt(value):
    """Return the hexadecimal MD5 digest of ``value``.

    ``value`` is a text string; it is encoded as UTF-8 before hashing.
    """
    encoded = value.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()
 
 
def create_socket(addr, port):
    """Open an IPv4 TCP connection to ``addr:port`` and return the socket.

    The caller owns the returned socket and is responsible for closing it.

    Raises:
        Whatever ``socket.connect`` raises (e.g. ``OSError`` when the
        connection is refused). The half-created socket is closed before
        the exception propagates so no descriptor is leaked.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.connect((addr, port))
    except BaseException:
        # Do not leak the descriptor when the connection attempt fails.
        client.close()
        raise
    return client
 
 
def load_header(data_bytes):
    """Prefix ``data_bytes`` with the frame header.

    The header is the ASCII text ``##`` followed by the payload length in
    bytes, zero-padded to 8 digits (10 bytes in total).
    """
    header = '##{:08d}'.format(len(data_bytes))
    return header.encode("utf-8") + data_bytes
 
 
def recv_data(sk):
    """Receive one message from ``sk`` and strip its length header.

    A framed message starts with a 10-byte header ``##NNNNNNNN`` where the
    eight digits give the body length in bytes. The body is read until that
    many bytes have arrived, then decoded as UTF-8 in one go so that
    multi-byte characters split across ``recv`` calls decode correctly.

    If the first chunk does not start with ``##`` the message is treated as
    unframed: one more ``recv`` (up to 1 MiB) is appended to it.

    Args:
        sk: a connected socket-like object exposing ``recv(bufsize)``.

    Returns:
        ``(data, header_str)`` where ``data`` is the decoded body (``""``
        when the peer sent nothing) and ``header_str`` is the first 10 raw
        bytes received. If the peer closes the connection early, whatever
        body was received so far is returned. If the first read already
        holds more than the announced length, the extra bytes are kept in
        ``data``.

    Raises:
        ValueError: the header digits are not a valid integer.
        UnicodeDecodeError: the received bytes are not valid UTF-8.
    """
    header_size = 10
    buf = sk.recv(1024)
    if not buf:
        return "", buf[:header_size]

    if buf.startswith(b"##"):
        # The header itself may be split across TCP segments.
        while len(buf) < header_size:
            more = sk.recv(header_size - len(buf))
            if not more:
                break
            buf += more
        header_str = buf[:header_size]
        content_length = int(header_str[2:header_size])

        # Count and collect raw bytes; the header length is in bytes.
        chunks = [buf[header_size:]]
        received_size = len(chunks[0])
        while received_size < content_length:
            r_data = sk.recv(min(10240, content_length - received_size))
            if not r_data:
                # Peer closed the connection; stop instead of spinning forever.
                break
            chunks.append(r_data)
            received_size += len(r_data)
        return b"".join(chunks).decode('utf-8'), header_str

    # Unframed message: read once more and return everything received.
    rest = sk.recv(1024 * 1024)
    return (buf + rest).decode('utf-8'), buf[:header_size]
 
 
def encryp_client_params_sign(dataJson):
    """Sign a client parameter dict in place and return it.

    Every entry except ``"sign"`` is rendered as ``key=value`` (nested dicts
    are rendered as compact JSON), the entries are sorted, the shared secret
    is appended, and the ``&``-joined string is MD5-hashed. The digest is
    stored under ``dataJson["sign"]``.

    An existing ``"sign"`` entry is ignored when building the string, so
    re-signing an already signed payload yields a signature that
    ``is_client_params_sign_right`` accepts.

    Args:
        dataJson: the parameters to sign. Anything that is not a plain
            ``dict`` is returned unchanged.

    Returns:
        The same ``dataJson`` object, with ``"sign"`` set when it is a dict.
    """
    if type(dataJson) is not dict:
        return dataJson
    str_list = []
    for k, v in dataJson.items():
        if k == "sign":
            # The signature must not be part of the signed content.
            continue
        if type(v) is dict:
            str_list.append(f"{k}={json.dumps(v, separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={v}")
    str_list.sort()
    # NOTE(review): shared secret is hard-coded in source; consider moving it to configuration.
    str_list.append("%Yeshi2014@#.")
    dataJson["sign"] = md5_encrypt("&".join(str_list))
    return dataJson
 
 
def is_client_params_sign_right(dataJson):
    """Check the ``"sign"`` entry of a client parameter dict.

    The expected signature is recomputed from the remaining entries:
    ``key=value`` pairs (nested dicts as compact JSON), sorted, with the
    shared secret appended, joined by ``&`` and MD5-hashed.

    Note: ``"sign"`` is removed from ``dataJson`` as a side effect; this is
    kept as-is because callers may rely on it.

    Args:
        dataJson: the received parameters.

    Returns:
        True when the supplied signature matches the recomputed one. False
        when it does not match, when ``dataJson`` is not a plain ``dict``,
        or when it carries no ``"sign"`` entry.
    """
    if type(dataJson) is not dict:
        return False
    sign = dataJson.pop("sign", None)
    if sign is None:
        # An unsigned payload is simply invalid; do not raise KeyError.
        return False
    str_list = []
    for k, v in dataJson.items():
        if type(v) is dict:
            str_list.append(f"{k}={json.dumps(v, separators=(',', ':'))}")
        else:
            str_list.append(f"{k}={v}")
    str_list.sort()
    # NOTE(review): shared secret is hard-coded in source; consider moving it to configuration.
    str_list.append("%Yeshi2014@#.")
    new_sign = md5_encrypt("&".join(str_list))
    return sign == new_sign
 
 
def is_port_bind(port):
    """Return True when something accepts TCP connections on 127.0.0.1:``port``.

    The check attempts a real connection to the local port; the probe
    socket is always closed afterwards.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # connect_ex returns 0 on success and an errno value otherwise.
        return sock.connect_ex(('127.0.0.1', port)) == 0
 
 
def test_rsa():
    """Manual debugging helper: RSA-decrypt a captured hex dump and print it.

    Reads a private key and a hex log from hard-coded local paths, so it
    only works on the machine that has those files. Not an automated test.
    """
    def format_rsa(private_key_bytes):
        """Wrap a key body into PEM text, 64 characters per line.

        Presumably the key file holds only the base64 body without the
        BEGIN/END marker lines - confirm against the actual file.
        """
        body = private_key_bytes.replace(b"\r\n", b"").replace(b"\n", b"")
        rows = [body[i:i + 64] for i in range(0, len(body), 64)]
        return (
            b'-----BEGIN RSA PRIVATE KEY-----\n'
            + b"\n".join(rows)
            + b'\n-----END RSA PRIVATE KEY-----\n'
        )

    with open("D:\\文件传输\\交易\\接口解析\\PrivateKey", mode='rb') as f:
        private_key_data = format_rsa(f.read())

    # Each log line carries an 8-character prefix before the hex payload.
    with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f:
        results = [line[8:].strip() for line in f]

    hex_bytes = bytes.fromhex("".join(results))

    print(private_key_data.decode())
    # format_rsa() produces PEM text, so load it with the public PEM loader
    # rather than the private DER-only helper.
    private_key = rsa.PrivateKey.load_pkcs1(private_key_data, format="PEM")

    # RSA-decrypt the bytes recovered from the hex dump.
    decrypted_data = rsa.decrypt(hex_bytes, private_key)

    # Convert the decrypted bytes to text and show them.
    decrypted_data_str = decrypted_data.decode()
    print(decrypted_data_str)
 
 
# Script entry point: run the manual RSA decryption helper.
if __name__ == "__main__":
    test_rsa()

# NOTE(review): "__main__1" does not match the name a directly executed script
# gets, so this branch is effectively disabled scratch code kept for reference.
if __name__ == "__main__1":
    results = []
    # Collect the hex payload of each log line (everything after the first 8 characters).
    with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f:
        lines = f.readlines()
        for line in lines:
            if not line:
                continue
            results.append(line[8:].strip())

    hex_bytes = bytes.fromhex("".join(results))
    # SSL decoding
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with open("D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem", mode='r') as f:
        print(f.readlines())

    context.load_cert_chain(certfile='D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem')
    # NOTE(review): ssl.SSLContext does not appear to offer a decryptor() method - confirm before re-enabling.
    decrypter = context.decryptor()
    decrypted_data = decrypter.update(hex_bytes) + decrypter.finalize()
    # Print the decoded text
    print(decrypted_data)

    # client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # create the socket and connect to the server
    # client.connect(("appsockapp.longhuvip.com", 14000))
    # client.send(hex_bytes)
    # print("返回数据", client.recv(10240))