# -*- coding: utf-8 -*-
"""
Socket utility helpers.

Provides length-prefixed message framing (load_header / recv_data),
signing and verification of client parameter dicts, and a TCP port probe.
"""
import json
import socket
from typing import Tuple

from huaxin_client import crypt

# Frame header layout written by load_header(): b"##" followed by the payload
# length as 8 zero-padded decimal digits (10 bytes in total).
_HEADER_PREFIX = b"##"
_HEADER_SIZE = 10

# Salt appended to the sorted "key=value" list before hashing.
# NOTE(review): this is a hard-coded shared secret combined with MD5; it is
# kept unchanged so existing peers keep validating, but it should be moved to
# configuration and replaced by an HMAC when both sides can be upgraded.
_SIGN_SALT = "%Yeshi2014@#."


# Add the data header
def load_header(data_bytes: bytes) -> bytes:
    """Return *data_bytes* prefixed with the 10-byte frame header.

    The header is ``##`` followed by ``len(data_bytes)`` formatted as eight
    zero-padded decimal digits.

    NOTE: payloads of 100,000,000 bytes or more produce a header longer than
    10 bytes, which recv_data() cannot parse; this is not enforced here.
    """
    header = '##%08d' % len(data_bytes)
    return header.encode("utf-8") + data_bytes


# Receive data and strip the header
def recv_data(sk) -> Tuple[str, bytes]:
    """Read one message from socket *sk*.

    If the first chunk starts with ``##`` the message is treated as framed:
    the 8-digit length is parsed from the header and exactly that many payload
    bytes are collected. Otherwise one additional ``recv`` of up to 1 MiB is
    performed and both chunks are returned together.

    Args:
        sk: an object exposing ``recv(bufsize) -> bytes`` (a connected socket).

    Returns:
        ``(text, header)`` where *text* is the UTF-8 decoded payload and
        *header* is the first (up to) 10 raw bytes that were received.
        ``("", b"")`` when the peer closed before sending anything.

    Raises:
        ConnectionError: the peer closed the connection in the middle of a
            framed message.
        ValueError: the header length field is not a valid non-negative
            integer.
        UnicodeDecodeError: the payload is not valid UTF-8.
    """
    first = sk.recv(1024)
    if not first:
        return "", first[:_HEADER_SIZE]

    if not first.startswith(_HEADER_PREFIX):
        # Unframed message: one more read, then decode everything at once so a
        # multi-byte character split between the two reads decodes correctly.
        rest = sk.recv(1024 * 1024)
        return (first + rest).decode('utf-8'), first[:_HEADER_SIZE]

    # The header itself may arrive split across TCP segments; complete it
    # before parsing the length field.
    buf = first
    while len(buf) < _HEADER_SIZE:
        chunk = sk.recv(_HEADER_SIZE - len(buf))
        if not chunk:
            raise ConnectionError("connection closed while reading frame header")
        buf += chunk

    header = buf[:_HEADER_SIZE]
    content_length = int(header[len(_HEADER_PREFIX):])
    if content_length < 0:
        raise ValueError("negative content length in frame header: %r" % header)

    # Count BYTES (not decoded characters): the length written by
    # load_header() is a byte count.
    chunks = [buf[_HEADER_SIZE:]]
    remaining = content_length - len(chunks[0])
    while remaining > 0:
        # Never ask for more than is still missing so the next message is not
        # consumed by accident.
        chunk = sk.recv(min(10240, remaining))
        if not chunk:
            # recv() returning b"" means the peer closed; looping on would
            # spin forever.
            raise ConnectionError(
                "connection closed with %d payload bytes missing" % remaining
            )
        chunks.append(chunk)
        remaining -= len(chunk)

    # If the first read already went past this frame, the surplus bytes are
    # dropped: this function keeps no buffer in which to hold them.
    payload = b"".join(chunks)[:content_length]
    return payload.decode('utf-8'), header


def _build_sign(params):
    """Compute the signature for *params*, ignoring any existing "sign" key.

    Each entry becomes ``key=value`` (dict values are serialized as compact
    JSON); the entries are sorted, the salt is appended, everything is joined
    with ``&`` and passed to ``crypt.md5_encrypt``.
    """
    parts = []
    for key, value in params.items():
        if key == "sign":
            # The signature never covers itself; otherwise re-signing an
            # already signed dict would yield a value that cannot verify.
            continue
        if isinstance(value, dict):
            value = json.dumps(value, separators=(',', ':'))
        parts.append(f"{key}={value}")
    parts.sort()
    parts.append(_SIGN_SALT)
    return crypt.md5_encrypt("&".join(parts))


# Sign client parameters
def encryp_client_params_sign(dataJson):
    """Add a ``"sign"`` entry to *dataJson* in place and return it.

    Non-dict input is returned unchanged. An existing ``"sign"`` entry is
    overwritten and does not take part in the new signature.
    """
    if not isinstance(dataJson, dict):
        return dataJson
    dataJson["sign"] = _build_sign(dataJson)
    return dataJson


# Verify the client parameter signature
def is_client_params_sign_right(dataJson) -> bool:
    """Return True when ``dataJson["sign"]`` matches the computed signature.

    Returns False for non-dict input or when no ``"sign"`` entry is present.

    Side effect (kept for backward compatibility): the ``"sign"`` entry is
    removed from *dataJson*.
    """
    if not isinstance(dataJson, dict):
        return False
    sign = dataJson.pop("sign", None)
    if sign is None:
        return False
    return sign == _build_sign(dataJson)


def is_port_open(host: str, port: int, timeout: float = 1) -> bool:
    """Return True if a TCP (IPv4) connection to *host*:*port* succeeds.

    Args:
        host: host name or IPv4 address to probe.
        port: TCP port number.
        timeout: connect timeout in seconds.

    Any socket error, including a timeout, is reported as False.
    """
    try:
        # The context manager closes the socket on failure as well as success.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((host, port))
        return True
    except OSError:
        # Connection refused / timed out / unreachable: treat the port as closed.
        return False


if __name__ == "__main__":
    print(is_port_open("127.0.0.1", 8080))