# ocr_runtime_module.py
# ONNX inference pipeline for the PP-OCRv4 model family (DET/CLS/REC),
# plus a PaddleOCR comparison runner and execution-provider selection.
# Designed to work out of the box in a Windows/PowerShell environment.
import glob
import math
import os
import time

import cv2
import numpy as np
import onnxruntime as ort


# ----------------------------
# Utils
# ----------------------------
def read_image_bgr(path_or_bgr):
    """Return a BGR image from a file path, or pass an array through.

    Args:
        path_or_bgr: A path string, or an already-decoded image (returned
            unchanged).

    Returns:
        The decoded image as produced by ``cv2.imdecode(..., IMREAD_COLOR)``,
        or the input object when it is not a string.

    Raises:
        FileNotFoundError: If the path cannot be decoded into an image.
    """
    if not isinstance(path_or_bgr, str):
        return path_or_bgr
    # Read the bytes with NumPy and decode in memory instead of cv2.imread
    # (presumably so that non-ASCII Windows paths work -- confirm).
    data = np.fromfile(path_or_bgr, dtype=np.uint8)
    # cv2.imdecode raises on an empty buffer; treat that as "unreadable".
    img = cv2.imdecode(data, cv2.IMREAD_COLOR) if data.size else None
    if img is None:
        raise FileNotFoundError(f"Cannot read image: {path_or_bgr}")
    return img


def sigmoid(x):
    """Element-wise logistic function ``1 / (1 + exp(-x))``."""
    # Very negative inputs overflow exp() to inf, which still yields the
    # correct limit (0.0); silence only that overflow warning.
    with np.errstate(over="ignore"):
        return 1 / (1 + np.exp(-x))


def softmax(x, axis=-1):
    """Numerically stable softmax along *axis* (max is subtracted first)."""
    shifted = x - np.max(x, axis=axis, keepdims=True)
    e = np.exp(shifted)
    return e / np.sum(e, axis=axis, keepdims=True)


def to_chw(img):
    """Transpose an (H, W, C) array to (C, H, W)."""
    return img.transpose(2, 0, 1)


def to_nchw(img):
    """Prepend a batch axis to a 3-D array; other ranks pass through."""
    if img.ndim == 3:
        return img[np.newaxis, ...]
    return img


def timer_ms(fn):
    """Call ``fn()`` and return ``(result, elapsed_milliseconds)``."""
    t0 = time.perf_counter()
    out = fn()
    t1 = time.perf_counter()
    return out, (t1 - t0) * 1000.0


def ensure_exists(p, msg="File not found"):
    """Raise ``FileNotFoundError`` if path *p* does not exist.

    ``None`` is accepted silently so that optional paths can be passed
    without a separate check by the caller.
    """
    if p is None:
        return
    if not os.path.exists(p):
        raise FileNotFoundError(f"{msg}: {p}")


def find_best_onnx(onnx_dir, stem_hint):
    """Find a model file in *onnx_dir* whose name contains *stem_hint*.

    Candidates are searched in this priority order, and the
    alphabetically first match of the first non-empty pattern wins:

        1) *{stem_hint}*.fp16.onnx
        2) *{stem_hint}*.opt.onnx
        3) *{stem_hint}*.simp.onnx
        4) *{stem_hint}*.onnx

    Returns:
        The matching path, or ``None`` when nothing matches or when
        *onnx_dir* is empty/``None``.
    """
    if not onnx_dir:
        # os.path.join(None, ...) would raise TypeError; "no directory"
        # simply means "nothing found".
        return None
    for suffix in (".fp16.onnx", ".opt.onnx", ".simp.onnx", ".onnx"):
        pattern = os.path.join(onnx_dir, f"*{stem_hint}*{suffix}")
        candidates = sorted(glob.glob(pattern))
        if candidates:
            return candidates[0]
    return None


# ----------------------------
# Provider selection
# ----------------------------
def resolve_providers(choice="auto", use_trt_fp16=False, trt_max_workspace=2<<30):
    """Choose ONNX Runtime execution providers and their options.

    Args:
        choice: ``"auto"``, ``"cpu"``, ``"cuda"`` or ``"trt"``
            (case-insensitive). ``None`` and any unrecognised value behave
            like ``"auto"``.
        use_trt_fp16: Value for the TensorRT ``trt_fp16_enable`` option.
        trt_max_workspace: Value for the TensorRT
            ``trt_max_workspace_size`` option (default ``2 << 30``).

    Returns:
        A 5-tuple ``(providers_det, providers_rec, providers_cls,
        provider_options, available)``. The three provider lists have
        identical contents (same providers for every session, favouring
        simplicity/stability); ``provider_options`` maps provider name to
        its option dict; ``available`` is
        ``ort.get_available_providers()``.
    """
    avail = ort.get_available_providers()
    choice = (choice or "auto").lower()

    cpu = "CPUExecutionProvider"
    cuda = "CUDAExecutionProvider"
    trt = "TensorrtExecutionProvider"

    if choice == "cpu":
        used = [cpu]
    else:
        # "cuda": CUDA -> CPU.  "trt"/"auto"/other: TRT -> CUDA -> CPU.
        wanted = [cuda, cpu] if choice == "cuda" else [trt, cuda, cpu]
        used = [p for p in wanted if p in avail] or [cpu]

    # Per-provider options.
    po = {}
    if trt in used:
        po[trt] = {
            "trt_engine_cache_enable": True,
            "trt_fp16_enable": bool(use_trt_fp16),
            "trt_max_workspace_size": int(trt_max_workspace),
        }
    if cuda in used:
        po[cuda] = {"arena_extend_strategy": "kNextPowerOfTwo"}

    # Hand out independent lists so that mutating one session's provider
    # list cannot silently change the others.
    return list(used), list(used), list(used), po, avail


def create_sess(model_path, providers, provider_options=None, intra_op=0, inter_op=0, graph_optim=True):
    """Create an ``onnxruntime.InferenceSession`` for *model_path*.

    Args:
        model_path: Path to the ONNX model.
        providers: Ordered list of execution provider names.
        provider_options: Optional ``{provider_name: options_dict}``;
            providers without an entry get ``{}``.
        intra_op: ``intra_op_num_threads``; only applied when > 0.
        inter_op: ``inter_op_num_threads``; only applied when > 0.
        graph_optim: When true, enable ``ORT_ENABLE_ALL`` graph
            optimisations.

    Returns:
        The created session. No warm-up inference is performed here.

    Raises:
        FileNotFoundError: If *model_path* does not exist.
    """
    ensure_exists(model_path, "ONNX model missing")
    so = ort.SessionOptions()
    if graph_optim:
        so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    if intra_op > 0:
        so.intra_op_num_threads = intra_op
    if inter_op > 0:
        so.inter_op_num_threads = inter_op

    # One options dict per provider, in the same order as `providers`.
    option_map = provider_options or {}
    prov_opts = [option_map.get(p, {}) for p in providers]

    return ort.InferenceSession(
        model_path,
        sess_options=so,
        providers=providers,
        provider_options=prov_opts,
    )


# ----------------------------
# Geometry / crop
# ----------------------------
def order_points_clockwise(pts):
    """Sort points by their ``atan2`` angle around the centroid.

    With image coordinates (y pointing down) ascending angle corresponds
    to clockwise order on screen; for a roughly upright quadrilateral the
    result is top-left, top-right, bottom-right, bottom-left.

    Returns:
        A float32 array with the same points, reordered.
    """
    pts = np.array(pts).astype(np.float32)
    centre = np.mean(pts, axis=0)
    angles = np.arctan2(pts[:, 1] - centre[1], pts[:, 0] - centre[0])
    return pts[np.argsort(angles)]


def four_point_warp(bgr, box, out_h=48):
    """Perspective-warp a quadrilateral and resize it to height *out_h*.

    Args:
        bgr: Source image.
        box: (4, 2) corner coordinates in any order.
        out_h: Output height; the width follows the warped aspect ratio.

    Returns:
        The warped crop, or ``None`` when the box has zero width or
        height after integer truncation.
    """
    box = order_points_clockwise(box)
    tl, tr, br, bl = box
    width = int(max(np.linalg.norm(br - bl), np.linalg.norm(tr - tl)))
    height = int(max(np.linalg.norm(tr - br), np.linalg.norm(tl - bl)))
    if height <= 0 or width <= 0:
        return None

    dst = np.array(
        [[0, 0], [width - 1, 0], [width - 1, height - 1], [0, height - 1]],
        dtype=np.float32,
    )
    M = cv2.getPerspectiveTransform(box.astype(np.float32), dst)
    warped = cv2.warpPerspective(bgr, M, (width, height))

    # Resize to the recogniser's input height, keeping the aspect ratio.
    scale = out_h / max(1, warped.shape[0])
    out_w = max(1, int(warped.shape[1] * scale))
    return cv2.resize(warped, (out_w, out_h), interpolation=cv2.INTER_LINEAR)


def crop_by_boxes(img_bgr, boxes, use_warp=True, out_h=48):
    """Cut one crop per box out of *img_bgr*.

    Args:
        img_bgr: Source image.
        boxes: Iterable of (4, 2) point sets.
        use_warp: If true, use :func:`four_point_warp`; otherwise take the
            axis-aligned bounding rectangle.
        out_h: Target height for warped crops (unused when
            ``use_warp`` is false).

    Returns:
        List of crops. Degenerate boxes are skipped, so the list may be
        shorter than *boxes*.
    """
    crops = []
    img_h, img_w = img_bgr.shape[:2]
    for box in boxes:
        pts = np.array(box)
        if use_warp:
            crop = four_point_warp(img_bgr, pts, out_h=out_h)
            if crop is not None:
                crops.append(crop)
            continue

        x0, y0 = np.min(pts, axis=0).astype(int)
        x1, y1 = np.max(pts, axis=0).astype(int)
        x0, y0 = max(0, x0), max(0, y0)
        x1, y1 = min(img_w - 1, x1), min(img_h - 1, y1)
        if x1 > x0 and y1 > y0:
            # Corners are clamped to the last valid pixel index, so the
            # slice must be inclusive or the final row/column could never
            # be part of a crop.
            crops.append(img_bgr[y0:y1 + 1, x0:x1 + 1].copy())
    return crops


# ----------------------------
# DET preprocess/postprocess (DB)
# ----------------------------
def det_resize(img, max_side=960, limit_type="max"):
    """Resize *img* for the detector so both sides are multiples of 32.

    Args:
        img: Input image (H, W, ...).
        max_side: Target for the longer side.
        limit_type: ``"max"`` only ever shrinks (scale is capped at 1.0);
            any other value always scales the longer side to *max_side*.

    Returns:
        ``(resized, (ratio_h, ratio_w))`` where the ratios are
        ``new_size / original_size`` per axis.

    Raises:
        ValueError: If the image has a zero-length side.
    """
    h, w = img.shape[:2]
    if h == 0 or w == 0:
        raise ValueError(f"Cannot resize an empty image of shape {img.shape}")
    longest = max(h, w)
    if limit_type == "max":
        scale = min(max_side / longest, 1.0)
    else:
        scale = max_side / float(longest)
    # At least 32 px per side, then floor to a multiple of 32.
    nh = max(int(h * scale), 32) // 32 * 32
    nw = max(int(w * scale), 32) // 32 * 32
    resized = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_LINEAR)
    return resized, (nh / h, nw / w)


def det_preprocess(img_bgr):
    """Normalise an image for the detector and return it as (3, H, W).

    Pixels are scaled to [0, 1] and normalised with the ImageNet
    mean/std. The statistics are applied to the channels in the order
    they are stored; no BGR->RGB swap is performed here.
    """
    img = img_bgr.astype(np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)[None, None, :]
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)[None, None, :]
    return to_chw((img - mean) / std)


def boxes_from_bitmap(pred, thresh=0.3, box_thresh=0.6, unclip_ratio=1.5, min_size=3):
    """Extract text boxes from a DB probability map.

    Pipeline: threshold -> contours -> ``cv2.minAreaRect`` -> score filter
    -> approximate unclip (expansion).

    Args:
        pred: Probability map shaped ``(1, H, W)`` or ``(H, W)``.
        thresh: Binarisation threshold.
        box_thresh: Minimum mean probability inside a box.
        unclip_ratio: Expansion factor for the unclip step.
        min_size: Contours with an area below this are ignored.

    Returns:
        ``(boxes, scores)``: a list of (4, 2) int32 arrays ordered by
        :func:`order_points_clockwise`, and the matching mean
        probabilities.

    Raises:
        ValueError: If *pred* is neither 2-D nor 3-D.
    """
    prob_map = np.asarray(pred, dtype=np.float32)
    if prob_map.ndim == 3:
        prob_map = prob_map[0]
    if prob_map.ndim != 2:
        raise ValueError(
            f"pred must be shaped (H, W) or (1, H, W), got {np.shape(pred)}"
        )
    map_h, map_w = prob_map.shape

    bin_map = (prob_map > thresh).astype(np.uint8) * 255
    # OpenCV 3 returns (image, contours, hierarchy) and OpenCV 4 returns
    # (contours, hierarchy); contours are second-to-last in both.
    contours = cv2.findContours(bin_map, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]

    boxes = []
    scores = []
    for cnt in contours:
        if cv2.contourArea(cnt) < min_size:
            continue
        box = np.array(cv2.boxPoints(cv2.minAreaRect(cnt)), dtype=np.float32)
        poly = box.astype(np.int32)

        # Score = mean probability inside the box. Only the bounding
        # rectangle is rasterised instead of a full-size mask per contour.
        x_lo = int(np.clip(poly[:, 0].min(), 0, map_w - 1))
        x_hi = int(np.clip(poly[:, 0].max(), 0, map_w - 1))
        y_lo = int(np.clip(poly[:, 1].min(), 0, map_h - 1))
        y_hi = int(np.clip(poly[:, 1].max(), 0, map_h - 1))
        mask = np.zeros((y_hi - y_lo + 1, x_hi - x_lo + 1), dtype=np.uint8)
        offset = np.array([x_lo, y_lo], dtype=np.int32)
        cv2.fillPoly(mask, [poly - offset], 1)
        roi = prob_map[y_lo:y_hi + 1, x_lo:x_hi + 1]
        score = float((roi * mask).sum() / (mask.sum() + 1e-6))
        if score < box_thresh:
            continue

        # Unclip (expand). No polygon-offset routine is used; the box is
        # scaled about its centre as a simple approximation.
        area = cv2.contourArea(poly)
        length = cv2.arcLength(poly, True)
        if length > 0:
            distance = area * unclip_ratio / length
            centre = box.mean(axis=0)
            half_diag = np.linalg.norm(box[0] - centre) + 1e-6
            box = (box - centre) * (1.0 + distance / half_diag) + centre

        boxes.append(order_points_clockwise(box).astype(np.int32))
        scores.append(score)
    return boxes, scores


# ----------------------------
# CTC decoder (rec)
# ----------------------------
class CTCLabelDecoder:
    """Greedy CTC decoder backed by a one-character-per-line dictionary.

    Index 0 is the CTC blank; dictionary entries follow in file order,
    and a space is appended when ``use_space_char`` is true and the file
    does not already contain one.
    """

    def __init__(self, dict_path, use_space_char=True):
        """Load the character dictionary from *dict_path* (UTF-8)."""
        with open(dict_path, 'r', encoding='utf-8') as f:
            stripped = (line.strip('\n\r') for line in f)
            chars = [ch for ch in stripped if len(ch) > 0]
        if use_space_char and " " not in chars:
            chars.append(" ")
        self.blank_idx = 0
        self.idx2char = [""] + chars
        self.char2idx = {c: i for i, c in enumerate(self.idx2char)}

    def decode(self, probs):
        """Decode one sequence of per-step class scores.

        Args:
            probs: Array shaped (T, C).

        Returns:
            ``(text, confidence)`` where confidence is the mean of the
            per-step maxima of the emitted characters, or ``0.0`` when
            nothing was emitted.
        """
        probs = np.asarray(probs)
        if probs.shape[0] == 0:
            return "", 0.0
        idxs = probs.argmax(axis=1)
        confs = probs.max(axis=1)

        # Keep a step when it is not blank and differs from the previous
        # step (standard CTC collapse of repeats).
        keep = idxs != self.blank_idx
        keep[1:] &= idxs[1:] != idxs[:-1]
        # A model with more classes than the dictionary would otherwise
        # raise IndexError; such steps are dropped instead.
        keep &= idxs < len(self.idx2char)

        text = "".join(self.idx2char[int(ix)] for ix in idxs[keep])
        kept_confs = confs[keep]
        conf = float(np.mean(kept_confs)) if kept_confs.size else 0.0
        return text, conf
# ----------------------------
# ONNX models: Det / Cls / Rec
# ----------------------------
def _already_normalized(scores, axis=None, atol=1e-2):
    """Return True when *scores* already look like probabilities.

    All values must lie in [0, 1]; when *axis* is given, the sums along
    that axis must additionally be ~1. Used to avoid applying
    sigmoid/softmax a second time to models whose exported graph already
    contains the activation.
    """
    scores = np.asarray(scores)
    if scores.size == 0:
        return True
    if float(scores.min()) < 0.0 or float(scores.max()) > 1.0:
        return False
    if axis is None:
        return True
    # Accumulate in float64 so fp16 outputs do not fail the sum check.
    totals = scores.sum(axis=axis, dtype=np.float64)
    return bool(np.allclose(totals, 1.0, atol=atol))


def _resize_pad_normalize(crop_bgr, target_h, target_w):
    """Shared CLS/REC preprocessing; returns a float32 (3, H, W) array.

    The crop is resized to *target_h* keeping its aspect ratio (width
    capped at *target_w*), right-padded with white, channel-reversed
    (BGR -> RGB) and normalised with mean 0.5 / std 0.5.
    """
    h, w = crop_bgr.shape[:2]
    scale = target_h / max(1, h)
    new_w = min(target_w, max(1, int(w * scale)))
    resized = cv2.resize(crop_bgr, (new_w, target_h), interpolation=cv2.INTER_LINEAR)
    canvas = np.ones((target_h, target_w, 3), dtype=np.uint8) * 255
    canvas[:, :new_w, :] = resized
    img = canvas[:, :, ::-1].astype(np.float32) / 255.0
    img = (img - 0.5) / 0.5
    return to_chw(img)


class ORTDet:
    """Text detector: ONNX session + DB-style pre/post-processing."""

    def __init__(self, model_path, providers, provider_options=None, max_side=960):
        self.sess = create_sess(model_path, providers, provider_options)
        self.inp = self.sess.get_inputs()[0].name
        self.out = self.sess.get_outputs()[0].name
        self.max_side = max_side

    @staticmethod
    def _to_prob_map(y):
        """Reduce a raw detector output to a 2-D probability map.

        Accepts (N, 1, H, W), (N, H, W, 1), (N, H, W) or (H, W); only the
        first batch item is used. For a 4-D output with no singleton
        channel axis, channel 0 of axis 1 is taken.

        Sigmoid is applied only when the values are not already within
        [0, 1]: applying it to a map that is already a probability
        compresses it into [0.5, 0.73], which makes every pixel exceed
        the 0.3 binarisation threshold.
        """
        y = np.asarray(y)
        if y.ndim == 4:
            if y.shape[1] != 1 and y.shape[-1] == 1:
                pm = y[0, ..., 0]
            else:
                pm = y[0, 0]
        elif y.ndim == 3:
            pm = y[0]
        else:
            pm = y
        if not _already_normalized(pm):
            pm = sigmoid(pm)
        return pm

    def infer(self, img_bgr):
        """Detect text boxes in *img_bgr*.

        Returns:
            ``(boxes, timings)``: boxes are (4, 2) int32 arrays in
            original-image coordinates (clipped to the image), and
            timings is ``{"resize", "det", "post"}`` in milliseconds.
        """
        (resized, scale), t_resize = timer_ms(
            lambda: det_resize(img_bgr, self.max_side, "max")
        )
        det_x = to_nchw(det_preprocess(resized)).astype(np.float32)
        (y,), t_run = timer_ms(lambda: self.sess.run([self.out], {self.inp: det_x}))

        def _post():
            pm = self._to_prob_map(y)
            # boxes_from_bitmap documents a (1, H, W) input.
            found, found_scores = boxes_from_bitmap(
                pm[np.newaxis, ...], thresh=0.3, box_thresh=0.6, unclip_ratio=1.5
            )
            # Map from the resized detector frame back to the original image.
            inv_y, inv_x = 1.0 / scale[0], 1.0 / scale[1]
            max_x = img_bgr.shape[1] - 1
            max_y = img_bgr.shape[0] - 1
            restored = []
            for b in found:
                b = b.astype(np.float32)
                b[:, 0] = np.clip(b[:, 0] * inv_x, 0, max_x)
                b[:, 1] = np.clip(b[:, 1] * inv_y, 0, max_y)
                restored.append(b.astype(np.int32))
            return restored, found_scores

        (boxes, _scores), t_post = timer_ms(_post)
        T = {"resize": t_resize, "det": t_run, "post": t_post}
        return boxes, T


class ORTCls:
    """Text-direction classifier (decides whether a crop needs a 180 deg flip)."""

    def __init__(self, model_path, providers, provider_options=None, thresh=0.9):
        self.sess = create_sess(model_path, providers, provider_options)
        self.inp = self.sess.get_inputs()[0].name
        self.out = self.sess.get_outputs()[0].name
        self.thresh = float(thresh)

    def preprocess(self, crop_bgr):
        """Return the crop as a normalised (3, 48, 192) float32 array."""
        return _resize_pad_normalize(crop_bgr, 48, 192)

    @staticmethod
    def _to_probs(y):
        """Return per-class probabilities for an (N, C) output.

        Softmax is skipped when the rows are already normalised. A second
        softmax over two probabilities can never exceed ~0.73, so the
        rotation threshold (0.9 by default) would be unreachable.
        """
        y = np.asarray(y)
        if _already_normalized(y, axis=1):
            return y
        return softmax(y, axis=1)

    def infer(self, crops):
        """Classify every crop in one batch.

        Returns:
            One ``(need_rotate, prob)`` tuple per crop; ``need_rotate`` is
            true when class 1 wins with ``prob > self.thresh``. The model
            output is assumed to be (N, 2).
        """
        if not crops:
            return []
        batch = np.stack([self.preprocess(c) for c in crops], axis=0).astype(np.float32)
        (y,) = self.sess.run([self.out], {self.inp: batch})
        probs = self._to_probs(y)
        results = []
        for row in probs:
            lbl = int(np.argmax(row))
            prob = float(np.max(row))
            results.append((lbl == 1 and prob > self.thresh, prob))
        return results


class ORTRec:
    """Text recogniser: ONNX session + greedy CTC decoding."""

    def __init__(self, model_path, providers, provider_options=None, dict_path=None,
                 use_space_char=True, img_h=48, img_w=320, batch_size=6):
        self.sess = create_sess(model_path, providers, provider_options)
        self.inp = self.sess.get_inputs()[0].name
        # Only the first output is used, whatever else the model exposes.
        self.out = self.sess.get_outputs()[0].name
        self.decoder = CTCLabelDecoder(dict_path, use_space_char)
        self.img_h = img_h
        self.img_w = img_w
        self.batch_size = max(1, int(batch_size))

    def _prep_one(self, crop_bgr):
        """Return the crop as a normalised (3, img_h, img_w) float32 array."""
        return _resize_pad_normalize(crop_bgr, self.img_h, self.img_w)

    @staticmethod
    def _to_ntc(y, num_classes):
        """Return recogniser output laid out as (N, T, C).

        The class axis is the one whose length equals *num_classes* when
        exactly one axis matches. Otherwise the longer of the two trailing
        axes is taken as the class axis, and ties keep (N, T, C). A 2-D
        (N, C) array becomes (N, 1, C).

        Raises:
            ValueError: If *y* is neither 2-D nor 3-D.
        """
        y = np.asarray(y)
        if y.ndim == 2:
            return y[:, np.newaxis, :]
        if y.ndim != 3:
            raise ValueError(f"Unexpected recogniser output shape: {y.shape}")
        axis1_is_cls = y.shape[1] == num_classes
        axis2_is_cls = y.shape[2] == num_classes
        if axis1_is_cls != axis2_is_cls:
            class_axis = 1 if axis1_is_cls else 2
        else:
            class_axis = 1 if y.shape[1] > y.shape[2] else 2
        return np.transpose(y, (0, 2, 1)) if class_axis == 1 else y

    @staticmethod
    def _to_probs(y, num_classes):
        """Return (N, T, C) probabilities; softmax only if still needed.

        A second softmax over an already-normalised distribution flattens
        it to nearly uniform, which destroys the reported confidences.
        """
        ntc = ORTRec._to_ntc(y, num_classes)
        if _already_normalized(ntc, axis=2):
            return ntc
        return softmax(ntc, axis=2)

    def infer(self, crops):
        """Recognise text in every crop.

        Returns:
            ``(results, timings)``: results holds one ``(text, conf)``
            per crop in input order; timings is ``{"prep", "rec"}`` in
            milliseconds.
        """
        if not crops:
            return [], {"prep": 0.0, "rec": 0.0}

        batch, t_prep = timer_ms(
            lambda: np.stack([self._prep_one(c) for c in crops], axis=0).astype(np.float32)
        )

        num_classes = len(self.decoder.idx2char)
        texts = []
        t_rec_total = 0.0
        for start in range(0, batch.shape[0], self.batch_size):
            chunk = batch[start:start + self.batch_size]
            (y,), t_rec = timer_ms(
                lambda c=chunk: self.sess.run([self.out], {self.inp: c})
            )
            t_rec_total += t_rec
            probs = self._to_probs(y, num_classes)
            texts.extend(self.decoder.decode(seq) for seq in probs)
        return texts, {"prep": t_prep, "rec": t_rec_total}


# ----------------------------
# Orchestrator
# ----------------------------
class ONNXOCR:
    """End-to-end pipeline: detect -> crop -> (optional) classify -> recognise."""

    def __init__(self, onnx_dir=None, det_path=None, rec_path=None, cls_path=None,
                 dict_path=None, ep="auto", trt_fp16=False, trt_workspace=2<<30,
                 rec_bs=6, use_warp_crop=True):
        """Locate the models and build the sessions.

        Args:
            onnx_dir: Directory searched (via ``find_best_onnx``) for any
                model whose explicit path is not given.
            det_path, rec_path: Required models (explicit or discovered).
            cls_path: Optional classifier model.
            dict_path: Required character dictionary for the recogniser.
            ep, trt_fp16, trt_workspace: Passed to ``resolve_providers``.
            rec_bs: Recogniser batch size.
            use_warp_crop: Perspective-warp crops instead of axis-aligned
                rectangles.

        Raises:
            FileNotFoundError: If a required model or the dictionary is
                missing.
        """
        # Discover models only when a directory was actually supplied.
        if onnx_dir is not None:
            if det_path is None:
                det_path = find_best_onnx(onnx_dir, "det")
            if rec_path is None:
                rec_path = find_best_onnx(onnx_dir, "rec")
            # The classifier is optional.
            if cls_path is None:
                cls_path = find_best_onnx(onnx_dir, "cls")

        for label, path in (("det", det_path), ("rec", rec_path)):
            if path is None:
                # ensure_exists() accepts None silently, so report the
                # missing required model explicitly.
                raise FileNotFoundError(
                    f"{label} model missing: no path given and none found "
                    f"in onnx_dir={onnx_dir!r}"
                )
            ensure_exists(path, f"{label} model missing")

        if dict_path is None:
            # Passing the dictionary explicitly is the recommended usage.
            raise FileNotFoundError("Provide --dict path to ppocr_keys_v1.txt")
        # Fail before the (potentially slow) session creation.
        ensure_exists(dict_path, "dict file missing")

        (self.providers_det, self.providers_rec, self.providers_cls,
         self.po, self.avail) = resolve_providers(ep, trt_fp16, trt_workspace)
        options = self.po if self.po else None

        self.det = ORTDet(det_path, self.providers_det, options)
        self.rec = ORTRec(rec_path, self.providers_rec, options,
                          dict_path=dict_path, img_h=48, img_w=320, batch_size=rec_bs)
        self.cls = None
        if cls_path and os.path.exists(cls_path):
            self.cls = ORTCls(cls_path, self.providers_cls, options, thresh=0.9)

        self.use_warp_crop = bool(use_warp_crop)

    def run(self, img_bgr, use_cls=False, rec_bs=None):
        """Run OCR on one image.

        Args:
            img_bgr: Input image.
            use_cls: Apply the direction classifier when one is loaded.
            rec_bs: If given, replaces the recogniser batch size; the new
                value stays in effect for later calls.

        Returns:
            ``(boxes, rec_res, timings)``. ``boxes`` and ``rec_res`` have
            the same length and order; boxes that yield no crop are
            omitted from both. ``timings`` holds per-stage milliseconds.
        """
        # DET
        detected, t_det = self.det.infer(img_bgr)

        # CROP: crop box by box so that boxes which produce no crop are
        # dropped from the result as well, keeping boxes and texts aligned.
        boxes = []
        crops = []
        for box in detected:
            got = crop_by_boxes(img_bgr, [box], use_warp=self.use_warp_crop, out_h=48)
            if got:
                boxes.append(box)
                crops.append(got[0])

        # CLS (optional)
        t_cls = 0.0
        if use_cls and self.cls and len(crops) > 0:
            t0 = time.perf_counter()
            decisions = self.cls.infer(crops)
            t_cls = (time.perf_counter() - t0) * 1000.0
            for i, (need_rot, _prob) in enumerate(decisions):
                if need_rot:
                    crops[i] = cv2.rotate(crops[i], cv2.ROTATE_180)

        # REC
        if rec_bs is not None:
            self.rec.batch_size = max(1, int(rec_bs))
        rec_res, t_rec = self.rec.infer(crops)

        T = {
            "det_resize_ms": t_det["resize"],
            "det_ms": t_det["det"],
            "det_post_ms": t_det["post"],
            "cls_ms": t_cls,
            "rec_prep_ms": t_rec["prep"],
            "rec_ms": t_rec["rec"],
        }
        return boxes, rec_res, T


# ----------------------------
# PaddleOCR comparison runner (optional)
# ----------------------------
class PaddleRunner:
    """Thin wrapper around PaddleOCR used as a comparison baseline."""

    def __init__(self, use_angle_cls=False, lang='ch'):
        # Imported lazily so the module works without paddleocr installed.
        from paddleocr import PaddleOCR
        self.use_angle_cls = bool(use_angle_cls)
        self.ocr = PaddleOCR(use_angle_cls=use_angle_cls, lang=lang, show_log=True)

    def run(self, img_bgr):
        """Run PaddleOCR on an in-memory image.

        The image is processed twice: once as a warm-up and once timed.

        Returns:
            ``(results, elapsed_ms)`` where results is a list of
            ``(text, conf)`` and ``elapsed_ms`` covers the timed call only.
        """
        # Pass the array directly: a JPEG round trip is lossy and would
        # make the comparison run on different pixels. Request the angle
        # classifier only if it was enabled at construction time.
        use_cls = self.use_angle_cls

        # warmup
        _ = self.ocr.ocr(img_bgr, cls=use_cls)
        t0 = time.perf_counter()
        res = self.ocr.ocr(img_bgr, cls=use_cls)
        t1 = time.perf_counter()

        out = []
        # The per-image entry can be None when nothing was detected.
        if res and res[0]:
            for line in res[0]:
                txt = line[1][0]
                conf = float(line[1][1])
                out.append((txt, conf))
        return out, (t1 - t0) * 1000.0