Administrator
2025-07-04 10bcaa783134a0738922ac869b556a7ed97f0559
构建服务器
2个文件已添加
164 ■■■■■ 已修改文件
build_server.py 114 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build_server.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build_server.py
New file
@@ -0,0 +1,114 @@
import http
import json
import logging
import os
import socketserver
import subprocess
from http.server import BaseHTTPRequestHandler
from log_module.log import logger_system, logger_request_api
from utils import tool
from log_module import async_log_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
# Raise the threshold of the "http.server" logger to CRITICAL; the intent is to
# disable http.server log output.
# NOTE(review): the stdlib request handler reports through log_message(), which
# BuildServer overrides separately - confirm this logger is actually used.
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)
class BuildServer(BaseHTTPRequestHandler):
    """HTTP handler that runs a build script on request and serves files.

    GET endpoints:
        /build?dir=<dir>&script=<script>&output=<output>
            Removes ``<dir>/<output>`` if it exists, runs ``<dir>/<script>``
            and replies with JSON ``{"code": 0, "data": {"output_file": ...}}``
            when the output file exists afterwards, ``{"code": 1, "msg": ...}``
            otherwise.
        /download_file?file=<path>
            Streams the file as ``application/octet-stream``; replies with a
            JSON error when the file is missing.
        Any other path is answered with 200 and an empty body.

    NOTE(review): both endpoints use paths taken verbatim from the query
    string, so any client that can reach the port can execute or read
    arbitrary files. Restrict the listener or validate the paths against an
    allow-list before exposing this service.
    """

    # Disable the per-request log output of the base handler.
    def log_message(self, format, *args):
        pass

    @staticmethod
    def _query_params(query):
        """Return a dict mapping each query parameter to its first value."""
        return {key: values[0] for key, values in parse_qs(query).items()}

    @staticmethod
    def _run_build(params):
        """Run the requested build script and return the JSON reply text."""
        root_dir = params.get("dir")
        script = params.get("script")
        output_file = params.get("output")
        if not (root_dir and script and output_file):
            # os.path.join() would raise TypeError on a missing parameter.
            return json.dumps({"code": 1, "msg": "缺少参数"})
        output_file_path = os.path.join(root_dir, output_file)
        try:
            # Drop a stale artifact so its presence afterwards proves that
            # this run produced it.
            if os.path.exists(output_file_path):
                os.remove(output_file_path)
            # Run the build. The captured output is never read, so it is left
            # as bytes and no text decoding is attempted.
            subprocess.run([os.path.join(root_dir, script)], capture_output=True)
        except OSError:
            # The script could not be launched or the stale file could not be
            # removed: report a failed build instead of dropping the request.
            return json.dumps({"code": 1, "msg": "编译失败"})
        if not os.path.exists(output_file_path):
            return json.dumps({"code": 1, "msg": "编译失败"})
        return json.dumps({"code": 0, "data": {"output_file": output_file_path}})

    def _send_file(self, file_path):
        """Send ``file_path`` to the client as a binary attachment."""
        with open(file_path, "rb") as f:
            file_size = os.fstat(f.fileno()).st_size
            # The status line must be queued before any header; without it
            # the reply is not a valid HTTP response.
            self.send_response(200)
            self.send_header("Content-Type", "application/octet-stream")  # binary stream
            self.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(file_path)}"')
            self.send_header("Content-Length", str(file_size))
            self.end_headers()
            # Copy in 8 KB chunks so the whole file is never held in memory.
            for chunk in iter(lambda: f.read(8192), b""):
                self.wfile.write(chunk)

    def do_GET(self):
        """Dispatch GET requests to the build and download endpoints."""
        url = urlparse.urlparse(self.path)
        async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{url}")
        try:
            if url.path == "/build":
                self.__send_response(self._run_build(self._query_params(url.query)))
            elif url.path == "/download_file":
                file_path = self._query_params(url.query).get("file")
                if not file_path or not os.path.isfile(file_path):
                    self.__send_response(json.dumps({"code": 1, "msg": "文件不存在"}))
                else:
                    self._send_file(file_path)
            else:
                self.__send_response("")
        finally:
            # Log the end of the request on every path, downloads included.
            async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")

    def do_POST(self):
        """Reject POST requests; no POST endpoint is implemented."""
        # Answer explicitly so the client does not receive an empty reply.
        self.send_error(404)

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and parse it as GBK-encoded JSON."""
        datas = self.rfile.read(int(self.headers['content-length']))
        return json.loads(str(datas, encoding="gbk"))
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn so requests are handled in threads."""
def run(addr, port):
    """Start the threaded build server on ``(addr, port)`` and serve requests.

    Blocks in ``serve_forever()``. Any ``Exception`` raised while binding or
    serving is logged through ``logger_system`` instead of being propagated.
    """
    try:
        # The context manager closes the listening socket when serving stops,
        # including when serve_forever() exits through an exception.
        with ThreadedHTTPServer((addr, port), BuildServer) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{port} 启动失败")
if __name__ == "__main__":
    # Script entry point: empty host string, port 9004.
    run(addr="", port=9004)
build_server.spec
New file
@@ -0,0 +1,50 @@
# -*- mode: python ; coding: utf-8 -*-
# PyInstaller spec for build_server.py. Analysis, PYZ, EXE and COLLECT are not
# imported in this file; they are expected to be provided by PyInstaller when
# it executes the spec.
block_cipher = None
# Analyse the entry script. No extra search paths, binaries, data files,
# hidden imports, hooks or excludes are configured.
a = Analysis(
    ['build_server.py'],
    pathex=[],
    binaries=[],
    datas=[],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    noarchive=False,
    cipher=block_cipher,
)
# Archive built from the pure-Python modules found by the analysis.
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
# Console executable named "build_server". exclude_binaries=True keeps the
# binaries out of the executable; they are passed to COLLECT below instead.
exe = EXE(
    pyz,
    a.scripts,
    [],
    exclude_binaries=True,
    name='build_server',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
# Gather the executable together with its binaries, zip files and data files
# under the name "build_server".
coll = COLLECT(
    exe,
    a.binaries,
    a.zipfiles,
    a.datas,
    strip=False,
    upx=True,
    upx_exclude=[],
    name='build_server',
)