"""
Socket utility helpers.
"""
|
|
# Add data header (NOTE: this comment appears to describe load_header further down)
|
import base64
|
import hashlib
|
import json
|
import socket
|
import ssl
|
|
import rsa
|
|
|
def md5_encrypt(value):
    """Return the hexadecimal MD5 digest of a string.

    Args:
        value: Text to hash; it is encoded as UTF-8 before hashing.

    Returns:
        The 32-character lowercase hex digest.
    """
    encoded = value.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()
|
|
|
def create_socket(addr, port):
    """Open a TCP (IPv4) connection and return the connected socket.

    Args:
        addr: Host name or IPv4 address of the server.
        port: TCP port of the server.

    Returns:
        The connected ``socket.socket``; the caller is responsible for
        closing it.

    Raises:
        Whatever ``socket.connect`` raises (e.g. ``OSError``). The socket is
        closed before the exception propagates.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.connect((addr, port))
    except BaseException:
        # Do not leak the file descriptor when the connection attempt fails.
        client.close()
        raise
    return client
|
|
|
def load_header(data_bytes):
    """Prefix a payload with its length header.

    The header is ``##`` followed by the payload length as an 8-digit,
    zero-padded decimal number.

    Args:
        data_bytes: The payload as bytes.

    Returns:
        The header bytes followed by the unchanged payload.
    """
    header = f"##{len(data_bytes):08d}".encode("utf-8")
    return header + data_bytes
|
|
|
# Receive data and strip the header
|
def recv_data(sk):
    """Receive one message from a socket and strip its length header.

    A framed message starts with a 10-byte header: ``##`` followed by the
    payload length in bytes as 8 decimal digits. The payload is collected as
    raw bytes and decoded once at the end, so the byte count matches the
    header and multi-byte UTF-8 characters split across chunks decode
    correctly.

    If the first chunk does not start with ``##`` the message is treated as
    unframed: one more ``recv`` is issued and both chunks are returned
    together.

    Args:
        sk: Object with a socket-style ``recv(bufsize)`` method returning
            bytes.

    Returns:
        A ``(data, header)`` tuple. ``data`` is the UTF-8 decoded payload
        (``""`` when nothing was received). ``header`` is the raw leading
        bytes of the message (at most 10).

    Raises:
        ConnectionError: The peer closed the connection before a framed
            message was complete.
        ValueError: The length field of the header is not a number.
        UnicodeDecodeError: The payload is not valid UTF-8.
    """
    header_size = 10
    buf = sk.recv(1024)
    if not buf:
        return "", buf

    if not buf.startswith(b"##"):
        # Unframed data: keep the legacy behaviour of one follow-up read.
        rest = sk.recv(1024 * 1024)
        return (buf + rest).decode('utf-8'), buf[:header_size]

    # The first chunk may be shorter than the header; complete it first.
    while len(buf) < header_size:
        chunk = sk.recv(header_size - len(buf))
        if not chunk:
            raise ConnectionError(
                "connection closed before the message header was complete")
        buf += chunk

    header = buf[:header_size]
    content_length = int(header[2:].decode('utf-8'))

    # Bytes that arrived together with the header already count towards the
    # payload. Anything beyond the declared length that was in the first
    # chunk is kept rather than silently discarded.
    parts = [buf[header_size:]]
    remaining = content_length - len(parts[0])
    while remaining > 0:
        # Never request more than is still missing, so the next message on
        # the same connection is left untouched.
        chunk = sk.recv(min(remaining, 10240))
        if not chunk:
            raise ConnectionError(
                "connection closed before the full message was received")
        parts.append(chunk)
        remaining -= len(chunk)

    return b"".join(parts).decode('utf-8'), header
|
|
|
# Sign (MD5) the client parameters
|
def encryp_client_params_sign(dataJson):
    """Add a ``sign`` field to a client parameter dict.

    Every entry is rendered as ``key=value`` (values that are plain dicts are
    rendered as compact JSON). The rendered pairs are sorted, a fixed salt is
    appended, everything is joined with ``&`` and the MD5 hex digest of that
    string is stored under ``"sign"``.

    Args:
        dataJson: The parameters. Anything that is not exactly a ``dict`` is
            returned untouched.

    Returns:
        The same ``dataJson`` object, modified in place when it is a dict.
    """
    if type(dataJson) is not dict:
        return dataJson

    def render(key):
        # Nested dicts are serialised without spaces.
        value = dataJson[key]
        if type(value) is dict:
            return f"{key}={json.dumps(value, separators=(',', ':'))}"
        return f"{key}={value}"

    pairs = sorted(render(key) for key in dataJson)
    pairs.append("%Yeshi2014@#.")
    dataJson["sign"] = md5_encrypt("&".join(pairs))
    return dataJson
|
|
|
# Verify the client parameter signature
|
def is_client_params_sign_right(dataJson):
    """Check the ``sign`` field of a client parameter dict.

    The expected signature is rebuilt from the remaining entries: each is
    rendered as ``key=value`` (plain dict values as compact JSON), the pairs
    are sorted, a fixed salt is appended, everything is joined with ``&`` and
    hashed with MD5.

    Args:
        dataJson: The received parameters.

    Returns:
        True when the ``sign`` entry equals the recomputed signature. False
        when ``dataJson`` is not exactly a ``dict``, has no ``"sign"`` entry,
        or the signature differs.

    Note:
        When a ``"sign"`` entry is present it is removed from ``dataJson``
        (existing behaviour, kept for callers that rely on it).
    """
    if type(dataJson) is not dict:
        return False
    if "sign" not in dataJson:
        # An unsigned payload is simply invalid; do not raise KeyError.
        return False

    sign = dataJson.pop("sign")

    pairs = []
    for key, value in dataJson.items():
        if type(value) is dict:
            pairs.append(f"{key}={json.dumps(value, separators=(',', ':'))}")
        else:
            pairs.append(f"{key}={value}")
    pairs.sort()
    pairs.append("%Yeshi2014@#.")

    return sign == md5_encrypt("&".join(pairs))
|
|
|
# Check whether a port is in use
|
def is_port_bind(port):
    """Report whether something accepts TCP connections on a local port.

    Args:
        port: TCP port number to probe on ``127.0.0.1``.

    Returns:
        True when a connection to ``127.0.0.1:port`` succeeds, else False.
    """
    # The context manager closes the probe socket on every path, so repeated
    # checks do not leak file descriptors.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex(('127.0.0.1', port)) == 0
|
|
|
def test_rsa():
    """Manual check: RSA-decrypt a captured hex dump with a local private key.

    Reads the key and the hex dump from hard-coded local paths, prints the
    PEM-formatted key, decrypts the data with the ``rsa`` package and prints
    the decoded plaintext. Intended to be run by hand; it has no return
    value.
    """

    def format_rsa(private_key_bytes):
        """Wrap key text into a PEM ``RSA PRIVATE KEY`` block of 64-char rows."""
        body = private_key_bytes.replace(b"\r\n", b"").replace(b"\n", b"")
        rows = [body[i:i + 64] for i in range(0, len(body), 64)]
        return (b'-----BEGIN RSA PRIVATE KEY-----\n'
                + b"\n".join(rows)
                + b'\n-----END RSA PRIVATE KEY-----\n')

    # NOTE(review): the key file is assumed to hold the bare base64 body
    # without PEM header/footer lines -- confirm.
    with open("D:\\文件传输\\交易\\接口解析\\PrivateKey", mode='rb') as f:
        private_key_data = format_rsa(f.read())

    with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f:
        # The first 8 characters of every line are skipped
        # (presumably an offset column -- confirm against the dump format).
        results = [line[8:].strip() for line in f]
    hex_bytes = bytes.fromhex("".join(results))

    print(private_key_data.decode())

    # The data is PEM text at this point, so it must go through the public
    # PEM loader rather than the private DER loader.
    private_key = rsa.PrivateKey.load_pkcs1(private_key_data)

    decrypted_data = rsa.decrypt(hex_bytes, private_key)

    # Convert the decrypted bytes to text.
    print(decrypted_data.decode())
|
|
|
if __name__ == "__main__":
    # Script entry point: run the manual RSA decryption check.
    test_rsa()
|
|
if __name__ == "__main__1":
    # NOTE(review): "__main__1" never equals __name__, so this branch is
    # disabled scratch code kept for reference.
    with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f:
        lines = f.readlines()
    # Skip the first 8 characters of every line and keep the rest.
    results = [line[8:].strip() for line in lines if line]
    hex_bytes = bytes.fromhex("".join(results))

    # Attempted SSL decoding of the captured bytes.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with open("D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem", mode='r') as f:
        print(f.readlines())

    context.load_cert_chain(certfile='D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem')
    # NOTE(review): the standard-library ssl.SSLContext documents no
    # ``decryptor`` method, so this call would presumably raise
    # AttributeError if the branch were ever enabled -- confirm before use.
    decrypter = context.decryptor()
    decrypted_data = decrypter.update(hex_bytes) + decrypter.finalize()
    # Print the decoded text.
    print(decrypted_data)
|
|
# client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server
|
# client.connect(("appsockapp.longhuvip.com", 14000))
|
# client.send(hex_bytes)
|
# print("返回数据", client.recv(10240))
|