import base64 import json import logging import socketserver from http.server import BaseHTTPRequestHandler import cv2 import ths_industry_util from ocr import ocr_util from ocr.ocr_util import OcrUtil from third_data import kpl_util from trade import bidding_money_manager class OCRServer(BaseHTTPRequestHandler): ocr_temp_data = {} def do_GET(self): path = self.path self.send_response(200) # 发给请求客户端的响应数据 self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write("".encode()) def do_POST(self): path = self.path params = self.__parse_request() params = params["data"] result_str = self.__process(params) self.__send_response(result_str) def __process(self, _str): return_str = "" try: data = "" try: if type(_str) == str: data = json.loads(_str) else: data = _str except Exception as e1: raise Exception("json解析失败") _type = data["type"] if _type == 100: data = data["data"] matId = data["matId"] index = data["index"] maxIndex = data["maxIndex"] cols = data["width"] rows = data["height"] key = data["key"] datas = data["data"] if self.ocr_temp_data.get(matId) is None: self.ocr_temp_data[matId] = [] self.ocr_temp_data[matId].extend(datas) if maxIndex == index: # 数据传输完成 datas = self.ocr_temp_data[matId] if rows * cols == len(datas): self.ocr_temp_data.pop(matId) mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8) for r in range(0, rows): for c in range(0, cols): mat[r][c] = [datas[r * cols + c]] # cv2.imwrite("D:/test.png", mat) ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, key) if not ocr_results: # 多重识别,防止识别出错 ocr_results = ocr_util.OcrUtil.ocr_num(mat, key) # 图像识别 return_str = json.dumps({"code": 0, "data": {"datas": ocr_results}}) else: return_str = json.dumps({"code": 2, "msg": "数据出错"}) else: return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"}) elif _type == 101: data = data["data"] matId = data["matId"] index = data["index"] maxIndex = data["maxIndex"] cols = data["width"] rows = data["height"] datas = data["data"] if self.ocr_temp_data.get(matId) is None: self.ocr_temp_data[matId] = [] self.ocr_temp_data[matId].extend(datas) if maxIndex == index: # 数据传输完成 datas = self.ocr_temp_data[matId] if rows * cols == len(datas): self.ocr_temp_data.pop(matId) mat = cv2.numpy.zeros((rows, cols, 1), cv2.numpy.uint8) for r in range(0, rows): for c in range(0, cols): mat[r][c] = [datas[r * cols + c]] # cv2.imwrite("D:/test.png", mat) ocr_results = ocr_util.OcrUtil.ocr_with_key(mat, ".") code_name = "" for res in ocr_results: code_name += res[0] # TODO 根据代码名称获取代码 code = ths_industry_util.get_code_by_name(code_name) # 图像识别 return_str = json.dumps({"code": 0, "data": {"code": code}}) else: return_str = json.dumps({"code": 2, "msg": "数据出错"}) else: return_str = json.dumps({"code": 1, "msg": "数据尚未上传完"}) elif _type == 201: imgdata = base64.b64decode(data["img"]) results = ocr_util.OcrUtil.easy_ocr(imgdata) print(results) kpl_datas = kpl_util.parse_kpl_datas(results) if kpl_datas: bidding_money_manager.set_bidding_money(kpl_datas) with open("D:/kpl.png", mode="wb") as f: f.write(imgdata) except Exception as e: logging.exception(e) if str(e).__contains__("json解析失败"): logging.error("OCR数据JSON解析解析失败") return_str = json.dumps({"code": -1, "msg": str(e)}) return return_str def __send_response(self, data): # 发给请求客户端的响应数据 self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(data.encode()) def __parse_request(self): params = {} datas = self.rfile.read(int(self.headers['content-length'])) _str = str(datas, encoding="gbk") # print(_str) if _str.find("Content-Disposition: form-data;") > -1: start = 0 while True: start = _str.find("Content-Disposition: form-data;", start + 1) if start <= 0: break name_start = start + len("Content-Disposition: form-data;") name_end = _str.find("\r\n\r\n", start) val_end = _str.find("------", name_end) key = _str[name_start:name_end].strip()[6:-1] val = _str[name_end:val_end].strip() params[key] = val else: params = json.loads(_str) return params def run(addr, port): handler = OCRServer httpd = socketserver.TCPServer((addr, port), handler) print("HTTP server is at: http://%s:%d/" % (addr, port)) httpd.serve_forever() if __name__ == "__main__": str_={"id":"123"} print(type(str_)==str)