# OCR server
from multiprocessing import freeze_support

# Called before the remaining imports, preserving the original ordering.
# NOTE(review): presumably so that frozen Windows child processes are handled
# before the heavy cv2/wx imports run -- confirm before moving this call.
freeze_support()
import json
import logging
import os
import time
from http.server import BaseHTTPRequestHandler
import socketserver
import cv2
import wx
import ocr_util
import multiprocessing

logger = logging.getLogger('ocr_logger')
logger.setLevel(logging.DEBUG)
# FileHandler does not create missing directories, so make sure "logs" exists
# instead of failing at import time.
os.makedirs('logs', exist_ok=True)
server_log = logging.FileHandler('logs/server.log', 'a', encoding='utf-8')
# Log level for the file output
server_log.setLevel(logging.DEBUG)
# Format of the records written to the file
formatter = logging.Formatter('%(asctime)s - %(filename)s - line:%(lineno)d - %(levelname)s - %(message)s -%(process)s')
server_log.setFormatter(formatter)
# Attach the file handler to the logger
logger.addHandler(server_log)

# Values of the "type" field understood by OCRServer.
TYPE_OCR = 100     # ordinary recognition via ocr_util.ocr_with_key
TYPE_NUMBER = 101  # number recognition via ocr_util.recognize_num

# Status values returned by OCRServer._collect_image; they double as the
# "code" field of the JSON response.
_STATUS_READY = 0
_STATUS_INCOMPLETE = 1
_STATUS_BAD_SIZE = 2


class OCRServer(BaseHTTPRequestHandler):
    """HTTP handler that receives image pixels in chunks and runs OCR on them.

    A POST request carries a multipart form whose ``data`` field is a JSON
    document ``{"type": ..., "data": {...}}``. The inner object holds
    ``matId``, ``index``, ``maxIndex``, ``width``, ``height``, ``data`` (a
    list of pixel values) and, for type 100, ``key``. Chunks with the same
    ``matId`` are concatenated; when ``index == maxIndex`` the image is
    assembled and passed to ``ocr_util``.
    """

    # Class-level on purpose: a new handler instance is created per request,
    # so the partially uploaded images must live on the class.
    # Maps matId -> list of pixel values received so far.
    ocr_temp_data = {}

    def do_GET(self):
        """Answer any GET with an empty 200 response."""
        self.send_response(200)
        # Response data sent back to the requesting client
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write("".encode())

    def do_POST(self):
        """Parse the form, process its ``data`` field and send the JSON reply.

        Any failure while reading or parsing the request is reported to the
        client as ``{"code": -1, ...}`` instead of dropping the connection
        without a response.
        """
        try:
            params = self.__parse_request()
            # A missing "data" field becomes "", which __process reports as
            # a JSON parse failure.
            result_str = self.__process(params.get("data", ""))
        except Exception as e:
            logger.exception("failed to read OCR request")
            result_str = json.dumps({"code": -1, "msg": str(e)})
        self.__send_response(result_str)

    def _collect_image(self, payload):
        """Append one chunk to its image buffer and build the image if done.

        Args:
            payload: the inner ``data`` object of the request.

        Returns:
            A ``(status, mat)`` tuple. ``status`` is ``_STATUS_INCOMPLETE``
            while more chunks are expected, ``_STATUS_BAD_SIZE`` when the
            final pixel count differs from ``width * height``, and
            ``_STATUS_READY`` together with a ``(rows, cols, 1)`` uint8 array
            otherwise. ``mat`` is ``None`` unless the status is ready.

        Raises:
            KeyError: if a required field is missing from ``payload``.
        """
        # Read every field before touching the buffer so that a malformed
        # request cannot leave a half-updated entry behind.
        mat_id = payload["matId"]
        index = payload["index"]
        max_index = payload["maxIndex"]
        cols = payload["width"]
        rows = payload["height"]
        chunk = payload["data"]

        self.ocr_temp_data.setdefault(mat_id, []).extend(chunk)
        if max_index != index:
            return _STATUS_INCOMPLETE, None

        # Transfer finished: always drop the buffer, even when the size check
        # fails, so a bad upload neither leaks memory nor poisons later
        # uploads that reuse the same matId.
        pixels = self.ocr_temp_data.pop(mat_id)
        if rows * cols != len(pixels):
            return _STATUS_BAD_SIZE, None

        np = cv2.numpy
        # Row-major fill, equivalent to mat[r][c] = [pixels[r * cols + c]];
        # astype performs the same unchecked cast to uint8 as item assignment.
        mat = np.asarray(pixels).astype(np.uint8).reshape(rows, cols, 1)
        return _STATUS_READY, mat

    def __process(self, _str):
        """Handle one request body and return the JSON response text.

        Args:
            _str: JSON text taken from the ``data`` form field.

        Returns:
            A JSON string with ``code`` 0 (result attached), 1 (more chunks
            expected), 2 (pixel count mismatch) or -1 (error, with ``msg``).
            An unrecognised ``type`` yields an empty string.
        """
        try:
            request = json.loads(_str)
        except (TypeError, ValueError):
            logger.error("OCR数据JSON解析解析失败")
            return json.dumps({"code": -1, "msg": "json解析失败"})

        try:
            request_type = request["type"]
            if request_type not in (TYPE_OCR, TYPE_NUMBER):
                # Unknown types have always produced an empty body.
                return ""

            payload = request["data"]
            # Only ordinary recognition carries a key; read it before the
            # chunk is stored so a missing key rejects the request up front.
            key = payload["key"] if request_type == TYPE_OCR else None

            status, mat = self._collect_image(payload)
            if status == _STATUS_INCOMPLETE:
                return json.dumps({"code": 1, "msg": "数据尚未上传完"})
            if status == _STATUS_BAD_SIZE:
                return json.dumps({"code": 2, "msg": "数据出错"})

            if request_type == TYPE_OCR:
                # Image recognition
                ocr_results = ocr_util.ocr_with_key(mat, key, 2)
                return json.dumps({"code": 0, "data": {"datas": ocr_results}})

            # Number recognition
            result_num = ocr_util.recognize_num(mat)
            if result_num is None:
                result_num = ""
            return json.dumps({"code": 0, "data": {"num": result_num}})
        except Exception as e:
            logger.exception("OCR request processing failed")
            return json.dumps({"code": -1, "msg": str(e)})

    def __send_response(self, data):
        """Send ``data`` (a string) as a 200 response with a JSON content type."""
        # Response data sent back to the requesting client
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and extract its multipart form fields.

        The body is decoded as GBK and scanned for
        ``Content-Disposition: form-data;`` headers. Each field value runs
        from the blank line after its header up to the next ``------``
        (or the end of the body when there is none).

        Returns:
            dict mapping field name to its stripped text value.
        """
        params = {}
        # A missing Content-Length header is treated as an empty body.
        length = int(self.headers.get('content-length') or 0)
        body = str(self.rfile.read(length), encoding="gbk")

        marker = "Content-Disposition: form-data;"
        start = 0
        while True:
            # Searching from start + 1 skips the marker found last time; the
            # body opens with a boundary line, so a real marker is never at 0.
            start = body.find(marker, start + 1)
            if start <= 0:
                break
            name_start = start + len(marker)
            name_end = body.find("\r\n\r\n", start)
            if name_end < 0:
                # Header without terminating blank line: nothing usable left.
                break
            val_end = body.find("------", name_end)
            if val_end < 0:
                val_end = len(body)
            # Header remainder looks like: name="field"; drop the leading
            # 'name="' and the closing quote.
            key = body[name_start:name_end].strip()[6:-1]
            params[key] = body[name_end:val_end].strip()
        return params


def run_server():
    """Serve OCRServer on 127.0.0.1:9009 until the process is stopped.

    Start-up and serving errors are logged rather than raised.
    """
    addr = "127.0.0.1"
    port = 9009
    try:
        # The context manager closes the listening socket on exit.
        with socketserver.TCPServer((addr, port), OCRServer) as httpd:
            logger.info("HTTP server is at: http://%s:%d/", addr, port)
            httpd.serve_forever()
    except Exception as e:
        logger.exception("server启动出错:%s", e)


class mainFrame(wx.Frame):
    """Small top-level window titled "文字识别"."""

    def __init__(self):
        """Constructor."""
        wx.Frame.__init__(self, None, -1, "文字识别", style=wx.DEFAULT_FRAME_STYLE, size=(200, 100))


class mainApp(wx.App):
    """wx application that shows a single mainFrame."""

    def OnInit(self):
        """Create and show the main frame; return True to continue start-up."""
        self.SetAppName("文字识别")
        self.Frame = mainFrame()
        self.Frame.Show()
        return True


if __name__ == '__main__':
    run_server()
    # NOTE: a disabled variant started run_server in a multiprocessing.Process
    # and kept the main process alive with a one-second sleep loop.