import http
|
import json
|
import logging
|
import os
|
import socketserver
|
import subprocess
|
from http.server import BaseHTTPRequestHandler
|
from log_module.log import logger_system, logger_request_api
|
|
from utils import tool
|
|
from log_module import async_log_util
|
|
import urllib.parse as urlparse
|
from urllib.parse import parse_qs
|
|
# 禁用http.server的日志输出
|
logger = logging.getLogger("http.server")
|
logger.setLevel(logging.CRITICAL)
|
|
|
class BuildServer(BaseHTTPRequestHandler):
|
|
# 禁用日志输出
|
def log_message(self, format, *args):
|
pass
|
|
def do_GET(self):
|
path = self.path
|
url = urlparse.urlparse(path)
|
async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{url}")
|
response_data = ""
|
if url.path == "/build":
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
root_dir = ps_dict.get("dir")
|
script = ps_dict.get("script")
|
output_file = ps_dict.get("output")
|
output_file_path = os.path.join(root_dir, output_file)
|
if os.path.exists(output_file_path):
|
os.remove(output_file_path)
|
# 进行编译
|
subprocess.run([os.path.join(root_dir, script)], capture_output=True, text=True)
|
if not os.path.exists(output_file_path):
|
response_data = json.dumps({"code": 1, "msg": "编译失败"})
|
else:
|
response_data = json.dumps({"code": 0, "data": {"output_file": output_file_path}})
|
elif url.path == "/download_file":
|
# 文件下载
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
file_path = ps_dict.get("file")
|
if not os.path.exists(file_path):
|
response_data = json.dumps({"code": 1, "msg": "文件不存在"})
|
else:
|
file_size = os.path.getsize(file_path)
|
self.send_header("Content-Type", "application/octet-stream") # 二进制流
|
self.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(file_path)}"')
|
self.send_header("Content-Length", str(file_size))
|
self.end_headers()
|
with open(file_path, "rb") as f:
|
while True:
|
chunk = f.read(8192) # 每次读取 8KB
|
if not chunk:
|
break
|
self.wfile.write(chunk) # 写入输出流
|
return
|
|
async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
|
self.send_response(200)
|
# 发给请求客户端的响应数据
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write(response_data.encode())
|
|
def do_POST(self):
|
path = self.path
|
url = urlparse.urlparse(path)
|
# if url.path == "/upload_kpl_data":
|
# # 接受开盘啦数据
|
# params = self.__parse_request()
|
# self.__send_response(result_str)
|
|
def __send_response(self, data):
|
# 发给请求客户端的响应数据
|
self.send_response(200)
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write(data.encode())
|
|
def __parse_request(self):
|
params = {}
|
datas = self.rfile.read(int(self.headers['content-length']))
|
_str = str(datas, encoding="gbk")
|
# print(_str)
|
params = json.loads(_str)
|
return params
|
|
|
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
pass
|
|
|
def run(addr, port):
|
handler = BuildServer
|
try:
|
httpd = ThreadedHTTPServer((addr, port), handler)
|
print("HTTP server is at: http://%s:%d/" % (addr, port))
|
httpd.serve_forever()
|
except Exception as e:
|
logger_system.exception(e)
|
logger_system.error(f"端口服务器:{port} 启动失败")
|
|
|
if __name__ == "__main__":
|
run("", 9004)
|