| | |
| | | |
| | | # 添加数据头 |
| | | import json |
| | | import socket |
| | | |
| | | from utils import crypt_util |
| | | |
| | | |
def create_socket(addr, port):
    """Open an IPv4 TCP client socket connected to ``(addr, port)``.

    Args:
        addr: Host name or IPv4 address of the server.
        port: TCP port of the server.

    Returns:
        The connected ``socket.socket``. The caller owns it and is
        responsible for closing it.

    Raises:
        OSError: If the connection cannot be established. The socket
            created here is closed before the error propagates.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.connect((addr, port))
    except BaseException:
        # Do not leak the descriptor when the connect attempt fails or is
        # interrupted; the caller never receives the socket in that case.
        client.close()
        raise
    return client
| | | |
| | | |
# NOTE(review): this span looks like a damaged side-by-side diff rendering
# rather than runnable Python: every line is wrapped in table pipes and the
# indentation is gone.
# The only visible header is load_header(data_bytes), yet none of the lines
# below use `data_bytes`; they read `dataJson` and `sign`, which are not
# defined anywhere in the visible text. Presumably the real body of
# load_header and the header of a separate signature-check function were
# elided -- TODO confirm against the actual file before editing.
| | | def load_header(data_bytes): |
| | |
# Builds one "key=value" string per key of `dataJson`: dict values are
# serialized with json.dumps using compact separators, every other value is
# interpolated directly into the f-string. The list is then sorted.
# NOTE(review): the two consecutive append lines under the dict branch differ
# only in whitespace -- likely the old/new sides of a formatting-only diff.
# If both were live code, each dict-valued entry would be appended twice.
| | | str_list = [] |
| | | for k in dataJson: |
| | | if type(dataJson[k]) == dict: |
| | | str_list.append(f"{k}={json.dumps(dataJson[k],separators=(',',':'))}") |
| | | str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}") |
| | | else: |
| | | str_list.append(f"{k}={dataJson[k]}") |
| | | str_list.sort() |
| | |
# NOTE(review): verbatim repeat of the loop above (probably the other column
# of the diff); it rebuilds `str_list` from scratch.
| | | str_list = [] |
| | | for k in dataJson: |
| | | if type(dataJson[k]) == dict: |
| | | str_list.append(f"{k}={json.dumps(dataJson[k],separators=(',',':'))}") |
| | | str_list.append(f"{k}={json.dumps(dataJson[k], separators=(',', ':'))}") |
| | | else: |
| | | str_list.append(f"{k}={dataJson[k]}") |
| | | str_list.sort() |
# A fixed literal is appended after sorting, then the "&"-joined list is
# passed to crypt_util.md5_encrypt to produce `new_sign`.
# NOTE(review): the literal looks like a hard-coded shared secret -- confirm,
# and consider loading it from configuration instead of source.
| | | str_list.append("%Yeshi2014@#.") |
| | | new_sign = crypt_util.md5_encrypt("&".join(str_list)) |
| | | # print("string before encryption", "&".join(str_list)) |
# Returns True when the supplied `sign` equals the recomputed value,
# otherwise False.
| | | if sign == new_sign: |
| | | return True |
| | | else: |
| | | return False |
| | | |
| | | |
# Check whether a local port is already in use.
def is_port_bind(port, timeout=1):
    """Return True if a TCP connection to ``127.0.0.1:port`` succeeds.

    Args:
        port: TCP port number to probe on the loopback interface.
        timeout: Socket timeout in seconds for the connection attempt.

    Returns:
        True when ``connect_ex`` reports success (code 0), i.e. something is
        accepting connections on that port; False for any non-zero code.

    Raises:
        OverflowError: Propagated from the socket layer when ``port`` is
            outside the valid range. The probe socket is still closed.
    """
    # The context manager closes the socket even if settimeout/connect_ex
    # raises, so a bad argument no longer leaks the descriptor.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns an errno-style code instead of raising on
        # connection failure; 0 means the connection was established.
        return sock.connect_ex(("127.0.0.1", port)) == 0
| | | |
| | | |
if __name__ == "__main__":
    # Manual smoke check: report whether anything accepts connections on
    # local port 9004.
    port_in_use = is_port_bind(9004)
    print(port_in_use)