"""
|
socket工具类
|
"""
|
|
# 添加数据头
|
import json
|
import socket
|
|
from utils import crypt_util
|
|
|
def create_socket(addr, port):
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server
|
client.connect((addr, port))
|
return client
|
|
|
def load_header(data_bytes):
|
slen = '##%08d' % len(data_bytes)
|
return slen.encode("utf-8") + data_bytes
|
|
|
# 接收数据,去除头
|
def recv_data(sk):
|
data = ""
|
header_size = 10
|
buf = sk.recv(1024)
|
header_str = buf[:header_size]
|
if buf:
|
buf = buf.decode('utf-8')
|
if buf.startswith("##"):
|
content_length = int(buf[2:10])
|
received_size = 0
|
# 加上读取头的数据
|
received_size += len(buf[header_size:])
|
data += buf[header_size:]
|
while not received_size == content_length:
|
r_data = sk.recv(10240)
|
received_size += len(r_data)
|
data += r_data.decode('utf-8')
|
else:
|
data = sk.recv(1024 * 1024)
|
data = buf + data.decode('utf-8')
|
return data, header_str
|
|
|
# 客户端参数加密
|
def encryp_client_params_sign(dataJson):
|
if type(dataJson) != dict:
|
return dataJson
|
str_list = []
|
for k in dataJson:
|
if type(dataJson[k]) == dict:
|
str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
|
else:
|
str_list.append(f"{k}={dataJson[k]}")
|
str_list.sort()
|
str_list.append("%Yeshi2014@#.")
|
dataJson["sign"] = crypt_util.md5_encrypt("&".join(str_list))
|
return dataJson
|
|
|
# 客户端密码加密验证
|
def is_client_params_sign_right(dataJson):
|
if type(dataJson) != dict:
|
return False
|
sign = dataJson["sign"]
|
dataJson.pop("sign")
|
str_list = []
|
for k in dataJson:
|
if type(dataJson[k]) == dict:
|
str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}")
|
else:
|
str_list.append(f"{k}={dataJson[k]}")
|
str_list.sort()
|
str_list.append("%Yeshi2014@#.")
|
new_sign = crypt_util.md5_encrypt("&".join(str_list))
|
# printlog("加密前字符串","&".join(str_list))
|
if sign == new_sign:
|
return True
|
else:
|
return False
|
|
|
# 端口是否被占用
|
def is_port_bind(port):
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
result = sock.connect_ex(('127.0.0.1', port))
|
if result == 0:
|
return True
|
else:
|
return False
|
|
|
if __name__ == "__main__":
|
pass
|