Administrator
2025-08-18 ae8d76a456b64c1c6c4ebf11b6ec33b7df217b1a
ocr/ocr_server.py
@@ -1,12 +1,16 @@
import base64
import json
import logging
import socketserver
from http.server import BaseHTTPRequestHandler
import cv2
import numpy
import ths_industry_util
import constant
from log_module.log import logger_system
from utils import ths_industry_util
from ocr import ocr_util
from ocr.ocr_util import OcrUtil
from third_data import kpl_util
from trade import bidding_money_manager
class OCRServer(BaseHTTPRequestHandler):
@@ -32,11 +36,14 @@
        try:
            data = ""
            try:
                data = json.loads(_str)
            except:
                if type(_str) == str:
                    data = json.loads(_str)
                else:
                    data = _str
            except Exception as e1:
                raise Exception("json解析失败")
            type = data["type"]
            if type == 100:
            _type = data["type"]
            if _type == 100:
                data = data["data"]
                matId = data["matId"]
                index = data["index"]
@@ -53,11 +60,11 @@
                    datas = self.ocr_temp_data[matId]
                    if rows * cols == len(datas):
                        self.ocr_temp_data.pop(matId)
                        mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
                        mat = numpy.zeros((rows, cols, 1), numpy.uint8)
                        for r in range(0, rows):
                            for c in range(0, cols):
                                mat[r][c] = [datas[r * cols + c]]
                        # cv2.imwrite("D:/test.png", mat)
                        # cv2.imwrite(f"{constant.get_path_prefix()}/test.png", mat)
                        ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, key)
                        if not ocr_results:
                            # 多重识别,防止识别出错
@@ -68,7 +75,7 @@
                        return_str = json.dumps({"code": 2, "msg": "数据出错"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
            elif type == 101:
            elif _type == 101:
                data = data["data"]
                matId = data["matId"]
                index = data["index"]
@@ -84,16 +91,16 @@
                    datas = self.ocr_temp_data[matId]
                    if rows * cols == len(datas):
                        self.ocr_temp_data.pop(matId)
                        mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
                        mat = numpy.zeros((rows, cols, 1), numpy.uint8)
                        for r in range(0, rows):
                            for c in range(0, cols):
                                mat[r][c] = [datas[r * cols + c]]
                        # cv2.imwrite("D:/test.png", mat)
                        # cv2.imwrite(f"{constant.get_path_prefix()}/test.png", mat)
                        ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, ".")
                        code_name = ""
                        for res in ocr_results:
                            code_name += res[0]
                        # TODO 根据代码名称获取代码
                        # 根据代码名称获取代码
                        code = ths_industry_util.get_code_by_name(code_name)
                        # 图像识别
                        return_str = json.dumps({"code": 0, "data": {"code": code}})
@@ -101,6 +108,15 @@
                        return_str = json.dumps({"code": 2, "msg": "数据出错"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
            elif _type == 201:
                imgdata = base64.b64decode(data["img"])
                results = ocr_util.OcrUtil.easy_ocr(imgdata)
                # print(results)
                kpl_datas = kpl_util.parse_kpl_datas(results)
                if kpl_datas:
                    bidding_money_manager.set_bidding_money(kpl_datas)
                with open(f"{constant.get_path_prefix()}/kpl.png", mode="wb") as f:
                    f.write(imgdata)
        except Exception as e:
            logging.exception(e)
            if str(e).__contains__("json解析失败"):
@@ -120,23 +136,35 @@
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        start = 0
        while True:
            start = _str.find("Content-Disposition: form-data;", start + 1)
            if start <= 0:
                break
            name_start = start + len("Content-Disposition: form-data;")
            name_end = _str.find("\r\n\r\n", start)
        if _str.find("Content-Disposition: form-data;") > -1:
            start = 0
            while True:
                start = _str.find("Content-Disposition: form-data;", start + 1)
                if start <= 0:
                    break
                name_start = start + len("Content-Disposition: form-data;")
                name_end = _str.find("\r\n\r\n", start)
            val_end = _str.find("------", name_end)
            key = _str[name_start:name_end].strip()[6:-1]
            val = _str[name_end:val_end].strip()
            params[key] = val
                val_end = _str.find("------", name_end)
                key = _str[name_start:name_end].strip()[6:-1]
                val = _str[name_end:val_end].strip()
                params[key] = val
        else:
            params = json.loads(_str)
        return params
def run(addr, port):
    handler = OCRServer
    httpd = socketserver.TCPServer((addr, port), handler)
    print("HTTP server is at: http://%s:%d/" % (addr, port))
    httpd.serve_forever()
    try:
        httpd = socketserver.TCPServer((addr, port), handler)
        # print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{port} 启动失败")
if __name__ == "__main__":
    str_={"id":"123"}
    # print(type(str_)==str)