"""
|
低吸代理服务器
|
"""
|
import http
|
import json
|
import logging
|
import socket
|
import socketserver
|
import threading
|
import time
|
from http.server import BaseHTTPRequestHandler
|
from urllib.parse import parse_qs
|
|
# 禁用http.server的日志输出
|
import requests
|
|
from utils import socket_util, tool
|
|
# Silence the "http.server" logger: only CRITICAL records pass through.
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)

# Upstream server that HTTP requests are forwarded to.
# Addresses noted by the original author: 183.234.94.163 / 125.93.72.195
REAL_HOST = "183.234.94.163"
REAL_PORT = 12881
|
|
|
class DataServer(BaseHTTPRequestHandler):
    """HTTP handler that relays GET and POST requests to the upstream server.

    The upstream address is taken from the module-level ``REAL_HOST`` and
    ``REAL_PORT`` constants; the request path (including any query string)
    is appended unchanged.
    """

    def log_message(self, format, *args):
        """Suppress the per-request log line the base class would emit."""
        pass

    def __get_params(self, url):
        """Return the query parameters of *url* as a flat dict.

        :param url: object exposing a ``query`` attribute, e.g. the result
            of ``urllib.parse.urlparse``.
        :return: ``{name: first_value}`` for every parameter in the query.
        """
        return {key: values[0] for key, values in parse_qs(url.query).items()}

    def do_GET(self):
        """Forward the GET request upstream and relay status and body."""
        whole_path = f"http://{REAL_HOST}:{REAL_PORT}{self.path}"
        response = requests.get(whole_path)
        body = response.text.encode()
        self.send_response(response.status_code)
        if response.status_code == 200:
            # Successful upstream answers are relayed as JSON.
            self.send_header('Content-type', 'application/json')
        # The header section must be terminated for every status code;
        # otherwise the buffered status line is never flushed and the body
        # is written to the client without a valid HTTP response head.
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        """Forward the POST body upstream as JSON and relay the answer."""
        whole_path = f"http://{REAL_HOST}:{REAL_PORT}{self.path}"
        params = self.__parse_request()
        response = requests.post(whole_path, json=params)
        self.__send_response(response.text)

    def __send_response(self, data):
        """Send *data* (a ``str``) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and parse it as GBK-encoded JSON.

        :return: the decoded JSON value, or ``{}`` when the body is absent,
            has no usable ``content-length``, is not valid GBK, or is not
            valid JSON.
        """
        try:
            length = int(self.headers.get('content-length') or 0)
        except ValueError:
            length = 0
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        try:
            return json.loads(str(raw, encoding="gbk"))
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError:
            # an unparsable body is treated as "no parameters".
            return {}
|
|
|
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no behavior of its own."""
|
|
|
class ClientSocketManager:
    """Registry of connected socket clients, shared through class attributes.

    Clients of type ``CLIENT_TYPE_COMMON`` are kept in a list (several may be
    connected at once); any other type holds a single ``(rid, socket)`` tuple.
    Every registered ``rid`` gets its own ``threading.Lock`` that callers take
    with :meth:`acquire_client` and give back with :meth:`release_client`.
    """

    # Client type
    CLIENT_TYPE_COMMON = "common"
    # type -> list of (rid, socket) for the common type, single (rid, socket) otherwise
    socket_client_dict = {}
    # rid -> threading.Lock guarding use of that client's socket
    socket_client_lock_dict = {}
    # client id -> time.time() of the last heartbeat
    active_client_dict = {}

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register socket *sk* under *rid* and create a fresh lock for it.

        :param _type: client type; ``CLIENT_TYPE_COMMON`` entries accumulate
            in a list, any other type replaces the previous entry.
        :param rid: client identifier.
        :param sk: the client's socket object.
        """
        if _type == cls.CLIENT_TYPE_COMMON:
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Lock and return a free client of *_type*.

        Common clients are tried from most to least recently active.

        :return: the ``(rid, socket)`` tuple whose lock was taken, or ``None``
            when no client of that type is registered or all are busy.
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_COMMON:
            candidates = sorted(
                cls.socket_client_dict[_type],
                key=lambda c: cls.active_client_dict.get(c[0], 0),
                reverse=True,
            )
        else:
            candidates = [cls.socket_client_dict[_type]]
        for client in candidates:
            lock = cls.socket_client_lock_dict.get(client[0])
            # A non-blocking acquire simply returns False when the lock is
            # held; it never raises, so no exception handling is needed.
            if lock is not None and lock.acquire(blocking=False):
                return client
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client *rid*; unknown ids are ignored.

        :raises RuntimeError: if the lock is not currently held.
        """
        if rid in cls.socket_client_lock_dict:
            cls.socket_client_lock_dict[rid].release()

    @staticmethod
    def _close_quietly(sk):
        """Close *sk*, ignoring any error (best-effort cleanup)."""
        try:
            sk.close()
        except Exception:
            pass

    @classmethod
    def del_client(cls, rid):
        """Forget client *rid*: drop its lock, heartbeat and socket entry.

        The removed socket is closed. For list-typed entries only the first
        entry matching *rid* is removed from each list.
        """
        cls.socket_client_lock_dict.pop(rid, None)
        # Drop the heartbeat too, so del_invalid_clients does not keep
        # finding (and re-deleting) an id that is already gone.
        cls.active_client_dict.pop(rid, None)
        # Iterate over a snapshot because entries may be popped below.
        for client_type, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                for client in entry:
                    if client[0] == rid:
                        entry.remove(client)
                        cls._close_quietly(client[1])
                        break
            elif isinstance(entry, tuple) and entry[0] == rid:
                cls.socket_client_dict.pop(client_type, None)
                # Close the stored socket (entry[1]), not a character of
                # the type name.
                cls._close_quietly(entry[1])
                break

    # Heartbeat bookkeeping
    @classmethod
    def heart(cls, rid):
        """Record that client *rid* was alive just now."""
        cls.active_client_dict[rid] = time.time()

    @classmethod
    def del_invalid_clients(cls):
        """Remove every client whose last heartbeat is more than 20 s old."""
        now = time.time()
        # Snapshot: del_client mutates active_client_dict, and other threads
        # may record heartbeats while this loop runs.
        for rid, last_seen in list(cls.active_client_dict.items()):
            if now - last_seen > 20:
                cls.del_client(rid)

    @classmethod
    def list_client(cls, type_=None):
        """List the registered clients of a type, most recently active first.

        :param type_: client type; defaults to ``CLIENT_TYPE_COMMON``.
        :return: ``[(client id, is locked, active time)]`` where the active
            time is formatted by ``tool.to_time_str``; an empty list when no
            client of that type is registered.
        """
        _type = type_ or cls.CLIENT_TYPE_COMMON
        clients = cls.socket_client_dict.get(_type)
        if not clients:
            return []
        if isinstance(clients, tuple):
            # Non-common types store one bare (rid, socket) tuple.
            clients = [clients]
        ordered = sorted(
            clients,
            key=lambda c: cls.active_client_dict.get(c[0], 0),
            reverse=True,
        )
        fdata = []
        for client in ordered:
            rid = client[0]
            lock = cls.socket_client_lock_dict.get(rid)
            active_time = tool.to_time_str(int(cls.active_client_dict.get(rid) or 0))
            fdata.append((rid, lock is not None and lock.locked(), active_time))
        return fdata
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCPServer that is always bound and activated on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server on *server_address* using *RequestHandlerClass*."""
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# To serve connections asynchronously, the threading server has to be
# rebuilt on top of the custom TCP server.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ThreadingMixIn; adds no behavior of its own."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Socket handler for client registration and message pushing.

    Two JSON message types are served:

    * ``register`` - the connection is added to ``ClientSocketManager`` and
      is then kept open to receive ``heart`` (heartbeat) messages.
    * ``push_msg`` - the message is forwarded to every registered client.

    Incoming frames may carry a 10-byte header: ``##`` followed by 8 digits
    giving the payload length in bytes.
    """

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once; later calls return ``True``."""
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    @classmethod
    def getRecvData(cls, skk):
        """Read one message from socket *skk*.

        With a ``##NNNNNNNN`` header, exactly ``NNNNNNNN`` payload bytes are
        read, so a following frame is never consumed. Without a header, the
        first bytes plus one further ``recv`` of up to 1 MiB are returned.

        :param skk: connected socket to read from.
        :return: ``(data, header)`` where *data* is the UTF-8 decoded payload
            and *header* is the raw first bytes that were read. *data* is
            ``""`` when the peer closed the connection, including in the
            middle of a frame.
        :raises ValueError: if the header length field is not numeric.
        :raises UnicodeDecodeError: if the payload is not valid UTF-8.
        """
        header_size = 10
        header = skk.recv(header_size)
        if not header:
            return "", header
        if not header.startswith(b"##"):
            # Unframed message: take whatever the next read delivers.
            rest = skk.recv(1024 * 1024)
            return (header + rest).decode('utf-8'), header

        # A short read can split the header; top it up before parsing.
        while len(header) < header_size:
            more = skk.recv(header_size - len(header))
            if not more:
                return "", header
            header += more

        remaining = int(header[2:header_size])
        chunks = []
        while remaining > 0:
            # Never ask for more than this frame still owes.
            chunk = skk.recv(min(remaining, 10240))
            if not chunk:
                # Peer closed mid-frame: report a disconnect.
                return "", header
            chunks.append(chunk)
            remaining -= len(chunk)
        # Decode once, so multi-byte characters split across chunks survive.
        return b"".join(chunks).decode('utf-8'), header

    def _serve_registered_client(self, sk, data_json):
        """Register the connection and consume heartbeats until it ends.

        :param sk: the client's socket.
        :param data_json: the parsed ``register`` message; must hold ``rid``.
        :return: ``True`` when the connection is finished (peer disconnected
            or reset), ``False`` when the session ended on another error
            that has been logged.
        """
        rid = data_json["rid"]
        ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_COMMON, rid, sk)
        sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
        try:
            while True:
                result, header = self.getRecvData(sk)
                if not result:
                    # Clean disconnect: drop the registration so later
                    # pushes do not hit a dead socket.
                    ClientSocketManager.del_client(rid)
                    sk.close()
                    return True
                try:
                    resultJSON = json.loads(result)
                except json.decoder.JSONDecodeError:
                    print("JSON解析出错", result, header)
                else:
                    if resultJSON["type"] == 'heart':
                        # Record the client as active
                        ClientSocketManager.heart(resultJSON['client_id'])
                time.sleep(1)
        except ConnectionResetError:
            ClientSocketManager.del_client(rid)
            return True
        except Exception as e:
            logging.exception(e)
        return False

    def _push_to_clients(self, data_json):
        """Forward *data_json* to every registered common client.

        A client whose socket fails is logged and removed; the remaining
        clients still receive the message.
        """
        client_info_list = ClientSocketManager.socket_client_dict.get(
            ClientSocketManager.CLIENT_TYPE_COMMON)
        if not client_info_list:
            return
        payload = socket_util.load_header(json.dumps(data_json).encode(encoding='utf-8'))
        # Iterate over a copy: del_client mutates the shared list.
        for rid, client_sk in list(client_info_list):
            try:
                client_sk.sendall(payload)
            except Exception as e:
                logging.exception(e)
                ClientSocketManager.del_client(rid)

    def handle(self):
        """Serve messages on this connection until it closes or errors."""
        sk: socket.socket = self.request
        while True:
            try:
                data, _header = self.getRecvData(sk)
                if not data:
                    # Connection closed by the peer
                    break
                try:
                    data_json = json.loads(data)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed: tell the sender and keep serving
                    sk.sendall(socket_util.load_header(json.dumps(
                        {"code": 100, "msg": "JSON解析失败"}).encode(
                        encoding='utf-8')))
                    continue
                type_ = data_json["type"]
                connection_closed = False
                try:
                    if type_ == 'register':
                        connection_closed = self._serve_registered_client(sk, data_json)
                    elif type_ == "push_msg":
                        # Message pushed by the internal server; relay it
                        # to the registered clients.
                        self._push_to_clients(data_json)
                except Exception as e:
                    logging.exception(e)
                if connection_closed:
                    # Nothing can be acknowledged on a closed socket.
                    break
                sk.sendall(
                    socket_util.load_header(
                        json.dumps({"code": 0}).encode(encoding='utf-8')))
            except Exception as e:
                logging.exception(e)
                break

    def finish(self):
        """Delegate to the base class; no extra teardown is performed."""
        super().finish()
|
|
|
def run_socket_server(port):
    """Serve the threaded TCP socket server on all interfaces; never returns.

    :param port: TCP port to listen on.
    """
    bind_address = ("0.0.0.0", port)
    print("SocketServer is at: http://%s:%d/" % bind_address)
    # The handler class itself is passed, not an instance.
    server = MyThreadingTCPServer(bind_address, MyBaseRequestHandle)
    server.serve_forever()
|
|
|
def run(addr, port):
    """Serve the HTTP proxy on ``(addr, port)`` until it fails.

    Any exception raised while creating or running the server is logged
    instead of being propagated.
    """
    try:
        httpd = ThreadedHTTPServer((addr, port), DataServer)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        logging.exception(e)
|
|
|
if __name__ == "__main__":
    # The socket server runs on a daemon thread; the HTTP proxy then
    # occupies the main thread.
    socket_thread = threading.Thread(target=run_socket_server, args=(10009,), daemon=True)
    socket_thread.start()
    run("", 12008)
|