import http
import json
import logging
import os
import socketserver
import subprocess
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs

from log_module.log import logger_system, logger_request_api
from utils import tool
from log_module import async_log_util

# Silence the log output of http.server.
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)

# Size of each chunk read from disk while streaming a download (8 KB).
_DOWNLOAD_CHUNK_SIZE = 8192


class BuildServer(BaseHTTPRequestHandler):
    """Request handler exposing a build endpoint and a file-download endpoint.

    Supported GET routes:
        /build?dir=<root>&script=<script>&output=<file>
            Runs ``<root>/<script>`` and reports whether ``<root>/<file>``
            exists afterwards.
        /download_file?file=<path>
            Streams the file at ``<path>`` back as an attachment.

    NOTE(review): both routes act on paths taken verbatim from the query
    string (executing a script, deleting and reading files). That is only
    acceptable when the server is reachable by trusted clients; confirm the
    deployment restricts access, or constrain the paths to an allow-list.
    """

    def log_message(self, format, *args):
        """Discard the default per-request log line."""
        pass

    def do_GET(self):
        """Dispatch a GET request to the build or download handler.

        Every response except a successful download is sent as JSON with
        HTTP status 200; the outcome is carried in the ``code`` field of the
        body. Unknown paths yield an empty body.
        """
        url = urlparse.urlparse(self.path)
        async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{url}")
        # parse_qs maps each key to a list of values; keep the first of each.
        params = {key: values[0] for key, values in parse_qs(url.query).items()}

        response_data = ""
        if url.path == "/build":
            response_data = self._handle_build(params)
        elif url.path == "/download_file":
            response_data = self._handle_download(params)
            if response_data is None:
                # The file was streamed directly; nothing more to send.
                return
        async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
        self.__send_response(response_data)

    def do_POST(self):
        """Handle a POST request.

        No POST route is currently implemented, so nothing is written back.
        A former "/upload_kpl_data" route (receiving Kaipanla data) is
        disabled.
        """

    def _handle_build(self, params: dict) -> str:
        """Run the build script and return the JSON response body.

        Args:
            params: Query parameters; ``dir``, ``script`` and ``output`` are
                required.

        Returns:
            A JSON string with ``code`` 0 and the output file path on
            success, or ``code`` 1 and a message on failure.
        """
        root_dir = params.get("dir")
        script = params.get("script")
        output_file = params.get("output")
        if not root_dir or not script or not output_file:
            return json.dumps({"code": 1, "msg": "缺少参数"})

        output_file_path = os.path.join(root_dir, output_file)
        # Remove a stale artifact so its existence afterwards proves that
        # this particular build produced it.
        if os.path.exists(output_file_path):
            os.remove(output_file_path)

        # Run the build. Output is captured as bytes: decoding it eagerly
        # could raise on unexpected encodings even though the build worked.
        try:
            result = subprocess.run([os.path.join(root_dir, script)], capture_output=True)
        except OSError as e:
            # Script missing or not executable.
            logger_system.exception(e)
            return json.dumps({"code": 1, "msg": "编译失败"})

        if not os.path.exists(output_file_path):
            logger_system.error(result.stderr.decode(errors="replace"))
            return json.dumps({"code": 1, "msg": "编译失败"})
        return json.dumps({"code": 0, "data": {"output_file": output_file_path}})

    def _handle_download(self, params: dict):
        """Stream the requested file to the client.

        Args:
            params: Query parameters; ``file`` is the path to download.

        Returns:
            ``None`` when the file was streamed to the client, otherwise a
            JSON error string for the caller to send.
        """
        file_path = params.get("file")
        if not file_path or not os.path.isfile(file_path):
            return json.dumps({"code": 1, "msg": "文件不存在"})
        # Open before sending any header so an unreadable file can still be
        # reported as a normal JSON error.
        try:
            f = open(file_path, "rb")
        except OSError as e:
            logger_system.exception(e)
            return json.dumps({"code": 1, "msg": "文件不存在"})

        with f:
            file_size = os.fstat(f.fileno()).st_size
            # The status line must precede the headers.
            self.send_response(200)
            self.send_header("Content-Type", "application/octet-stream")  # binary stream
            self.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(file_path)}"')
            self.send_header("Content-Length", str(file_size))
            self.end_headers()
            for chunk in iter(lambda: f.read(_DOWNLOAD_CHUNK_SIZE), b""):
                self.wfile.write(chunk)
        return None

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and parse it as GBK-encoded JSON.

        Returns:
            The decoded JSON value, or an empty dict when the request has no
            body (missing or zero ``content-length``).
        """
        length = int(self.headers.get('content-length') or 0)
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        return json.loads(str(raw, encoding="gbk"))


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""
    pass


def run(addr, port):
    """Start the build server on ``addr:port`` and serve until interrupted.

    Startup or serving errors are logged through ``logger_system`` rather
    than propagated.
    """
    try:
        # The context manager closes the listening socket on the way out.
        with ThreadedHTTPServer((addr, port), BuildServer) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{port} 启动失败")


if __name__ == "__main__":
    run("", 9004)