""" socket工具类 """ # 添加数据头 import base64 import hashlib import json import socket import ssl import rsa def md5_encrypt(value): md5 = hashlib.md5() md5.update(value.encode('utf-8')) return md5.hexdigest() def create_socket(addr, port): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server client.connect((addr, port)) return client def load_header(data_bytes): slen = '##%08d' % len(data_bytes) return slen.encode("utf-8") + data_bytes # 接收数据,去除头 def recv_data(sk): data = "" header_size = 10 buf = sk.recv(1024) header_str = buf[:header_size] if buf: buf = buf.decode('utf-8') if buf.startswith("##"): content_length = int(buf[2:10]) received_size = 0 # 加上读取头的数据 received_size += len(buf[header_size:]) data += buf[header_size:] while not received_size == content_length: r_data = sk.recv(10240) received_size += len(r_data) data += r_data.decode('utf-8') else: data = sk.recv(1024 * 1024) data = buf + data.decode('utf-8') return data, header_str # 客户端参数加密 def encryp_client_params_sign(dataJson): if type(dataJson) != dict: return dataJson str_list = [] for k in dataJson: if type(dataJson[k]) == dict: str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}") else: str_list.append(f"{k}={dataJson[k]}") str_list.sort() str_list.append("%Yeshi2014@#.") dataJson["sign"] = md5_encrypt("&".join(str_list)) return dataJson # 客户端密码加密验证 def is_client_params_sign_right(dataJson): if type(dataJson) != dict: return False sign = dataJson["sign"] dataJson.pop("sign") str_list = [] for k in dataJson: if type(dataJson[k]) == dict: str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}") else: str_list.append(f"{k}={dataJson[k]}") str_list.sort() str_list.append("%Yeshi2014@#.") new_sign = md5_encrypt("&".join(str_list)) # print("加密前字符串","&".join(str_list)) if sign == new_sign: return True else: return False # 端口是否被占用 def is_port_bind(port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('127.0.0.1', port)) if result == 0: return True else: return False def test_rsa(): def format_rsa(private_key_bytes): private_key_bytes = private_key_bytes.replace(b"\r\n", b"").replace(b"\n", b"") fbytes = [] row = len(private_key_bytes) // 64 if len(private_key_bytes) % 64 > 0: row += 1 for i in range(0, row): if i != row - 1: fbytes.append(private_key_bytes[i * 64:(i + 1) * 64]) else: fbytes.append(private_key_bytes[i * 64:]) fbs = b'-----BEGIN RSA PRIVATE KEY-----\n'+b"\n".join(fbytes)+b'\n-----END RSA PRIVATE KEY-----\n' return fbs # with open("D:\\项目\\三方工具\\dubbo\\dubbo-admin\\dubbo-admin-ui\\node_modules\\public-encrypt\\test\\test_rsa_privkey.pem", mode='rb') as f: # private_key_data = f.read() # print(private_key_data) with open("D:\\文件传输\\交易\\接口解析\\PrivateKey", mode='rb') as f: private_key_data = f.read() private_key_data = format_rsa(private_key_data) results = [] with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f: lines = f.readlines() for line in lines: if not line: continue results.append(line[8:].strip()) hex_bytes = bytes.fromhex("".join(results)) print(private_key_data.decode()) # 将私钥字符串解码为 RSA 私钥对象 private_key = rsa.PrivateKey._load_pkcs1_der(private_key_data) # 将 base64 编码的字符串解码为 bytes,并进行 RSA 解密 encrypted_data = hex_bytes decrypted_data = rsa.decrypt( encrypted_data, private_key ) # 将解密后的 bytes 对象转换为字符串 decrypted_data_str = decrypted_data.decode() print(decrypted_data_str) if __name__ == "__main__": test_rsa() if __name__ == "__main__1": results = [] with open("D:\\文件传输\\交易\\日志文件\\hex.txt", mode='r') as f: lines = f.readlines() for line in lines: if not line: continue results.append(line[8:].strip()) hex_bytes = bytes.fromhex("".join(results)) # SSL解码 context = ssl.SSLContext(ssl.PROTOCOL_TLS) with open("D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem", mode='r') as f: print(f.readlines()) context.load_cert_chain(certfile='D:/文件传输/交易/接口解析/com.aiyu.kaipanla.cert.pem') decrypter = context.decryptor() decrypted_data = decrypter.update(hex_bytes) + decrypter.finalize() # 打印解码后的文本 print(decrypted_data) # client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server # client.connect(("appsockapp.longhuvip.com", 14000)) # client.send(hex_bytes) # print("返回数据", client.recv(10240))