""" 低吸代理服务器 """ import http import json import logging import socket import socketserver import threading import time from http.server import BaseHTTPRequestHandler from urllib.parse import parse_qs # 禁用http.server的日志输出 import requests from utils import socket_util, tool logger = logging.getLogger("http.server") logger.setLevel(logging.CRITICAL) # 183.234.94.163/125.93.72.195 REAL_HOST, REAL_PORT = "183.234.94.163", 12881 class DataServer(BaseHTTPRequestHandler): # 禁用日志输出 def log_message(self, format, *args): pass def __get_params(self, url): ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) return ps_dict def do_GET(self): path = self.path whole_path = f"http://{REAL_HOST}:{REAL_PORT}{path}" response = requests.get(whole_path) self.send_response(response.status_code) if response.status_code == 200: # 发给请求客户端的响应数据 self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response.text.encode()) else: self.wfile.write(response.text.encode()) def do_POST(self): path = self.path whole_path = f"http://{REAL_HOST}:{REAL_PORT}{path}" params = self.__parse_request() response = requests.post(whole_path, json=params) self.__send_response(response.text) def __send_response(self, data): # 发给请求客户端的响应数据 self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(data.encode()) def __parse_request(self): params = {} datas = self.rfile.read(int(self.headers['content-length'])) _str = str(datas, encoding="gbk") # print(_str) try: params = json.loads(_str) except: pass return params class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): pass class ClientSocketManager: # 客户端类型 CLIENT_TYPE_COMMON = "common" socket_client_dict = {} socket_client_lock_dict = {} active_client_dict = {} @classmethod def add_client(cls, _type, rid, sk): if _type == cls.CLIENT_TYPE_COMMON: # 交易列表 if _type not in cls.socket_client_dict: cls.socket_client_dict[_type] = [] cls.socket_client_dict[_type].append((rid, sk)) cls.socket_client_lock_dict[rid] = threading.Lock() else: cls.socket_client_dict[_type] = (rid, sk) cls.socket_client_lock_dict[rid] = threading.Lock() @classmethod def acquire_client(cls, _type): if _type == cls.CLIENT_TYPE_COMMON: if _type in cls.socket_client_dict: # 根据排序活跃时间排序 client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[ 0] in cls.active_client_dict else 0, reverse=True) for d in client_list: if d[0] in cls.socket_client_lock_dict: try: if cls.socket_client_lock_dict[d[0]].acquire(blocking=False): return d except threading.TimeoutError: pass else: if _type in cls.socket_client_dict: try: d = cls.socket_client_dict[_type] if d[0] in cls.socket_client_lock_dict: if cls.socket_client_lock_dict[d[0]].acquire(blocking=False): return d except threading.TimeoutError: pass return None @classmethod def release_client(cls, rid): if rid in cls.socket_client_lock_dict: # 释放锁 cls.socket_client_lock_dict[rid].release() @classmethod def del_client(cls, rid): # 删除线程锁 if rid in cls.socket_client_lock_dict: cls.socket_client_lock_dict.pop(rid) # 删除sk for t in cls.socket_client_dict: if type(cls.socket_client_dict[t]) == list: for d in cls.socket_client_dict[t]: if d[0] == rid: cls.socket_client_dict[t].remove(d) try: d[1].close() except: pass break elif type(cls.socket_client_dict[t]) == tuple: if cls.socket_client_dict[t][0] == rid: cls.socket_client_dict.pop(t) try: t[1].close() except: pass break # 心跳信息 @classmethod def heart(cls, rid): cls.active_client_dict[rid] = time.time() @classmethod def del_invalid_clients(cls): # 清除长时间无心跳的客户端通道 for k in cls.active_client_dict.keys(): if time.time() - cls.active_client_dict[k] > 20: # 心跳时间间隔20s以上视为无效 cls.del_client(k) @classmethod def list_client(cls, type_=None): """ :param type_: :return:[(客户端ID, 是否锁定, 活跃时间)] """ _type = type_ if not _type: _type = cls.CLIENT_TYPE_COMMON client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0, reverse=True) fdata = [] for client in client_list: active_time = cls.active_client_dict.get(client[0]) if active_time is None: active_time = 0 active_time = tool.to_time_str(int(active_time)) fdata.append( (client[0], cls.socket_client_lock_dict[client[0]].locked(), active_time)) return fdata class MyTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass): socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) # 如果使用异步的形式则需要再重写ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass class MyBaseRequestHandle(socketserver.BaseRequestHandler): __inited = False def setup(self): self.__init() @classmethod def __init(cls): if cls.__inited: return True cls.__inited = True cls.__req_socket_dict = {} @classmethod def getRecvData(cls, skk): data = "" header_size = 10 buf = skk.recv(header_size) header_str = buf if buf: start_time = time.time() buf = buf.decode('utf-8') if buf.startswith("##"): content_length = int(buf[2:10]) received_size = 0 while not received_size == content_length: r_data = skk.recv(10240) received_size += len(r_data) data += r_data.decode('utf-8') else: data = skk.recv(1024 * 1024) data = buf + data.decode('utf-8') return data, header_str def handle(self): host = self.client_address[0] super().handle() sk: socket.socket = self.request while True: try: data, header = self.getRecvData(sk) if data: data_str = data # print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}") data_json = None try: data_json = json.loads(data_str) except json.decoder.JSONDecodeError as e: # JSON解析失败 sk.sendall(socket_util.load_header(json.dumps( {"code": 100, "msg": f"JSON解析失败"}).encode( encoding='utf-8'))) continue type_ = data_json["type"] __start_time = time.time() try: if type_ == 'register': client_type = ClientSocketManager.CLIENT_TYPE_COMMON rid = data_json["rid"] ClientSocketManager.add_client(client_type, rid, sk) sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) try: # print("客户端", ClientSocketManager.socket_client_dict) while True: result, header = self.getRecvData(sk) try: resultJSON = json.loads(result) if resultJSON["type"] == 'heart': # 记录活跃客户端 ClientSocketManager.heart(resultJSON['client_id']) except json.decoder.JSONDecodeError as e: print("JSON解析出错", result, header) if not result: sk.close() break time.sleep(1) except ConnectionResetError as ee: ClientSocketManager.del_client(rid) except Exception as e: logging.exception(e) elif data_json["type"] == "push_msg": # 华鑫内部服务器推送过来的信息,需要推送到注册的客户端上面 client_info_list = ClientSocketManager.socket_client_dict.get(ClientSocketManager.CLIENT_TYPE_COMMON) clients = [x[1] for x in client_info_list] if client_info_list: for client in client_info_list: try: client[1].sendall( socket_util.load_header( json.dumps(data_json).encode(encoding='utf-8'))) except Exception as e: logging.exception(e) ClientSocketManager.del_client(client[0]) break except Exception as e: pass finally: sk.sendall( socket_util.load_header( json.dumps({"code": 0}).encode(encoding='utf-8'))) else: # 断开连接 break # sk.close() except Exception as e: # log.logger_tuoguan_request_debug.exception(e) logging.exception(e) break def finish(self): super().finish() def run_socket_server(port): laddr = "0.0.0.0", port print("SocketServer is at: http://%s:%d/" % (laddr)) tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() def run(addr, port): handler = DataServer try: httpd = ThreadedHTTPServer((addr, port), handler) print("HTTP server is at: http://%s:%d/" % (addr, port)) httpd.serve_forever() except Exception as e: logging.exception(e) if __name__ == "__main__": threading.Thread(target=run_socket_server, args=(10009,), daemon=True).start() run("", 12008)