# ocr服务器
|
from multiprocessing import freeze_support
|
|
freeze_support()
|
import json
|
import logging
|
|
import time
|
from http.server import BaseHTTPRequestHandler
|
import socketserver
|
import cv2
|
import wx
|
import ocr_util
|
import multiprocessing
|
|
logger = logging.getLogger('ocr_logger')
|
logger.setLevel(logging.DEBUG)
|
server_log = logging.FileHandler('logs/server.log', 'a', encoding='utf-8')
|
# 向文件输出的日志级别
|
server_log.setLevel(logging.DEBUG)
|
# 向文件输出的日志信息格式
|
formatter = logging.Formatter('%(asctime)s - %(filename)s - line:%(lineno)d - %(levelname)s - %(message)s -%(process)s')
|
server_log.setFormatter(formatter)
|
|
# 加载文件到logger对象中
|
logger.addHandler(server_log)
|
|
|
class OCRServer(BaseHTTPRequestHandler):
|
ocr_temp_data = {}
|
|
def do_GET(self):
|
path = self.path
|
self.send_response(200)
|
# 发给请求客户端的响应数据
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write("".encode())
|
|
def do_POST(self):
|
path = self.path
|
params = self.__parse_request()
|
params = params["data"]
|
result_str = self.__process(params)
|
self.__send_response(result_str)
|
|
def __process(self, _str):
|
return_str = ""
|
try:
|
data = ""
|
try:
|
data = json.loads(_str)
|
except:
|
raise Exception("json解析失败")
|
type = data["type"]
|
# 普通识别
|
if type == 100:
|
data = data["data"]
|
matId = data["matId"]
|
index = data["index"]
|
maxIndex = data["maxIndex"]
|
cols = data["width"]
|
rows = data["height"]
|
key = data["key"]
|
datas = data["data"]
|
if self.ocr_temp_data.get(matId) is None:
|
self.ocr_temp_data[matId] = []
|
self.ocr_temp_data[matId].extend(datas)
|
if maxIndex == index:
|
# 数据传输完成
|
datas = self.ocr_temp_data[matId]
|
if rows * cols == len(datas):
|
self.ocr_temp_data.pop(matId)
|
mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
|
for r in range(0, rows):
|
for c in range(0, cols):
|
mat[r][c] = [datas[r * cols + c]]
|
# cv2.imwrite("D:/test.png", mat)
|
ocr_results = ocr_util.ocr_with_key(mat, key, 2)
|
# 图像识别
|
return_str = json.dumps({"code": 0, "data": {"datas": ocr_results}})
|
else:
|
return_str = json.dumps({"code": 2, "msg": "数据出错"})
|
else:
|
return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
|
elif type == 101:
|
# 数字识别
|
data = data["data"]
|
matId = data["matId"]
|
index = data["index"]
|
maxIndex = data["maxIndex"]
|
cols = data["width"]
|
rows = data["height"]
|
datas = data["data"]
|
if self.ocr_temp_data.get(matId) is None:
|
self.ocr_temp_data[matId] = []
|
self.ocr_temp_data[matId].extend(datas)
|
if maxIndex == index:
|
# 数据传输完成
|
datas = self.ocr_temp_data[matId]
|
if rows * cols == len(datas):
|
self.ocr_temp_data.pop(matId)
|
mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8)
|
for r in range(0, rows):
|
for c in range(0, cols):
|
mat[r][c] = [datas[r * cols + c]]
|
# cv2.imwrite("D:/test.png", mat)
|
result_num = ocr_util.recognize_num(mat)
|
if result_num is None:
|
result_num = ""
|
return_str = json.dumps({"code": 0, "data": {"num": result_num}})
|
else:
|
return_str = json.dumps({"code": 2, "msg": "数据出错"})
|
else:
|
return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"})
|
except Exception as e:
|
if str(e).__contains__("json解析失败"):
|
logging.error("OCR数据JSON解析解析失败")
|
return_str = json.dumps({"code": -1, "msg": str(e)})
|
return return_str
|
|
def __send_response(self, data):
|
# 发给请求客户端的响应数据
|
self.send_response(200)
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write(data.encode())
|
|
def __parse_request(self):
|
params = {}
|
datas = self.rfile.read(int(self.headers['content-length']))
|
_str = str(datas, encoding="gbk")
|
# print(_str)
|
start = 0
|
while True:
|
start = _str.find("Content-Disposition: form-data;", start + 1)
|
if start <= 0:
|
break
|
name_start = start + len("Content-Disposition: form-data;")
|
name_end = _str.find("\r\n\r\n", start)
|
|
val_end = _str.find("------", name_end)
|
key = _str[name_start:name_end].strip()[6:-1]
|
val = _str[name_end:val_end].strip()
|
params[key] = val
|
return params
|
|
|
def run_server():
|
addr = "127.0.0.1"
|
port = 9009
|
handler = OCRServer
|
try:
|
httpd = socketserver.TCPServer((addr, port), handler)
|
logger.info("HTTP server is at: http://%s:%d/" % (addr, port))
|
httpd.serve_forever()
|
except Exception as e:
|
logger.info("server启动出错:%s" % str(e))
|
|
|
class mainFrame(wx.Frame):
|
def __init__(self):
|
'''构造函数'''
|
wx.Frame.__init__(self, None, -1, "文字识别", style=wx.DEFAULT_FRAME_STYLE,
|
size=(200, 100))
|
|
|
class mainApp(wx.App):
|
def OnInit(self):
|
self.SetAppName("文字识别")
|
self.Frame = mainFrame()
|
self.Frame.Show()
|
return True
|
|
|
if __name__ == '__main__':
|
run_server()
|
# ocrProcess = multiprocessing.Process(target=lambda: run_server())
|
# ocrProcess.start()
|
# while True:
|
# time.sleep(1)
|