# -*- coding: utf-8 -*-
"""
FastDeploy-based PaddleOCRv3 module.

- Segmentation-fault prevention: model objects are kept as attributes on self.
- Polygon data is normalized (reshaped) to avoid OpenCV errors.
"""
import os
import cv2
import gc
import logging
import traceback
import fastdeploy as fd
import numpy as np
from typing import List, Dict, Any, Iterator, Optional, Tuple


class _NullLogger:
    """Fallback logger used when the caller does not supply one.

    The level-named methods are no-ops; ``log`` prints the message to stdout
    with a severity prefix. ``log`` takes the message first and the level as
    a keyword, which is the calling convention ``OCRModule._log`` uses for
    non-standard loggers.
    """

    def debug(self, *a, **k):
        pass

    def info(self, *a, **k):
        pass

    def warning(self, *a, **k):
        pass

    def error(self, *a, **k):
        pass

    def log(self, msg, level=logging.INFO, exc_info=False):
        """Print ``msg`` with a severity prefix; optionally print the active traceback."""
        if level >= logging.ERROR:
            print("[ERROR]", msg)
        elif level >= logging.WARNING:
            print("[WARN]", msg)
        else:
            print("[INFO]", msg)
        if exc_info:
            traceback.print_exc()


class OCRModule:
    """PP-OCRv3 text detection/recognition wrapper built on FastDeploy.

    The runtime option, the individual models and the assembled pipeline are
    all stored on the instance so that they stay referenced for as long as
    the module itself is alive.
    """

    def __init__(self, logger: Optional[logging.Logger] = None,
                 base_dir: Optional[str] = None, gpu_id: int = 0):
        """Store configuration and immediately build the OCR pipeline.

        Args:
            logger: A standard ``logging.Logger`` or any object exposing
                ``log(msg, level=..., exc_info=...)``. Defaults to an
                internal print-based logger.
            base_dir: Directory that contains ``modules/PP_Models``.
                Defaults to the current working directory.
            gpu_id: GPU device index handed to the FastDeploy runtime.

        Raises:
            Exception: whatever ``_initialize_ocr`` re-raises when the
                models cannot be loaded.
        """
        self.logger = logger if logger is not None else _NullLogger()
        self.base_dir = base_dir or os.getcwd()
        self.gpu_id = gpu_id

        # Keep references to every FastDeploy object on the instance.
        self.runtime_option: Optional[fd.RuntimeOption] = None
        self.det_model = None
        self.rec_model = None
        self.cls_model = None
        self.ocr = None

        # Model directories.
        self.rec_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "rec")
        self.det_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "det")
        self.cls_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "cls")

        self._initialize_ocr()

    # ------------------------ Internal logging ------------------------
    def _log(self, msg, level=logging.INFO, exc_info=False):
        """Send ``msg`` to the configured logger, whatever its flavour.

        Standard ``logging`` loggers take ``log(level, msg, ...)`` whereas the
        project-style loggers (such as ``_NullLogger``) take
        ``log(msg, level=..., exc_info=...)``. Calling the former with the
        latter's convention raises ``TypeError``, so the two are told apart
        explicitly.

        Args:
            msg: Message text.
            level: Numeric ``logging`` level.
            exc_info: When true, include the active exception's traceback.
        """
        logger = self.logger

        # Standard library loggers: level comes first.
        if isinstance(logger, (logging.Logger, logging.LoggerAdapter)):
            logger.log(level, msg, exc_info=exc_info)
            return

        # Custom loggers following the (msg, level=, exc_info=) protocol.
        if hasattr(logger, "log"):
            logger.log(msg, level=level, exc_info=exc_info)
            return

        # Objects that only expose level-named methods.
        if level >= logging.ERROR and hasattr(logger, "error"):
            logger.error(msg, exc_info=exc_info)
        elif level >= logging.WARNING and hasattr(logger, "warning"):
            logger.warning(msg)
        elif hasattr(logger, "info"):
            logger.info(msg)
        else:
            print(msg)
            if exc_info:
                traceback.print_exc()

    # ------------------------ OCR initialization ------------------------
    def _initialize_ocr(self):
        """Create the runtime option, load the models and assemble PPOCRv3.

        The detector and recognizer are always loaded. The recognizer's label
        file is the first of ``ppocr_keys_v1.txt`` / ``dict.txt`` found in the
        recognition model directory. The classifier is loaded only when both
        of its model files exist; otherwise ``None`` is passed to the
        pipeline.

        Raises:
            FileNotFoundError: if no recognition label file is found.
            Exception: any other failure is logged and re-raised unchanged.
        """
        try:
            self.runtime_option = fd.RuntimeOption()
            self.runtime_option.use_gpu(self.gpu_id)

            det_model_file = os.path.join(self.det_model_dir, "inference.pdmodel")
            det_params_file = os.path.join(self.det_model_dir, "inference.pdiparams")
            self.det_model = fd.vision.ocr.DBDetector(
                det_model_file, det_params_file, runtime_option=self.runtime_option
            )

            rec_model_file = os.path.join(self.rec_model_dir, "inference.pdmodel")
            rec_params_file = os.path.join(self.rec_model_dir, "inference.pdiparams")
            label_candidates = (
                os.path.join(self.rec_model_dir, name)
                for name in ("ppocr_keys_v1.txt", "dict.txt")
            )
            rec_label_file = next(
                (path for path in label_candidates if os.path.isfile(path)), None
            )
            if rec_label_file is None:
                raise FileNotFoundError("Recognition label(dict) 파일을 찾을 수 없습니다.")
            self.rec_model = fd.vision.ocr.Recognizer(
                rec_model_file, rec_params_file, rec_label_file,
                runtime_option=self.runtime_option
            )

            cls_model_file = os.path.join(self.cls_model_dir, "inference.pdmodel")
            cls_params_file = os.path.join(self.cls_model_dir, "inference.pdiparams")
            if os.path.isfile(cls_model_file) and os.path.isfile(cls_params_file):
                self.cls_model = fd.vision.ocr.Classifier(
                    cls_model_file, cls_params_file, runtime_option=self.runtime_option
                )
            else:
                # The classifier is optional.
                self.cls_model = None

            self.ocr = fd.vision.ocr.PPOCRv3(self.det_model, self.cls_model, self.rec_model)
            self._log("✅ FastDeploy PPOCRv3 시스템 초기화 완료", level=logging.INFO)
        except Exception as e:
            self._log(f"❌ FastDeploy OCR 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
            raise

    # ------------------------ Helper: polygon normalization ------------------------
    @staticmethod
    def _normalize_poly(poly) -> np.ndarray:
        """Convert ``poly`` to an ``(N, 2)`` int32 ndarray.

        Accepted layouts include:
          - ``[x1, y1, x2, y2, x3, y3, x4, y4]``
          - ``[[x1, y1], [x2, y2], ...]``
          - numpy arrays of any rank, e.g. ``(1, N, 2)``

        Raises:
            ValueError: if the total number of coordinates is odd.
        """
        arr = np.asarray(poly, dtype=np.int32)
        if arr.size % 2 != 0:
            raise ValueError(f"폴리곤 좌표 수가 짝수가 아닙니다: {arr.size}")
        # Flatten anything that is not already exactly (N, 2); this includes
        # higher-rank inputs such as (1, N, 2) whose last axis is already 2.
        if arr.ndim != 2 or arr.shape[-1] != 2:
            arr = arr.reshape(-1, 2)
        return arr

    # ------------------------ Helper: image loading ------------------------
    @staticmethod
    def _read_image(image_path: str) -> Optional[np.ndarray]:
        """Read an image as a BGR array, or return ``None`` if it cannot be decoded.

        ``cv2.imread`` is tried first. If it returns ``None``, the raw bytes
        are read with numpy and decoded with ``cv2.imdecode``.
        NOTE(review): the fallback targets paths that ``cv2.imread`` cannot
        open itself (commonly reported for non-ASCII paths on Windows) -
        confirm on the deployment platform.
        """
        image = cv2.imread(image_path)
        if image is not None:
            return image
        try:
            raw = np.fromfile(image_path, dtype=np.uint8)
        except OSError:
            return None
        if raw.size == 0:
            return None
        return cv2.imdecode(raw, cv2.IMREAD_COLOR)

    # ------------------------ Main OCR ------------------------
    def detect_text(self, image_path: str, method: str = 'polygon') -> List[Dict[str, Any]]:
        """Run OCR on an image file and return one dict per recognized line.

        Args:
            image_path: Path of the image to process.
            method: Post-processing style: ``'polygon'``, ``'bbox'``,
                ``'expanded_bbox'``, ``'rotated_bbox'`` or ``'contour'``.
                Unknown values fall back to ``'polygon'`` with a warning.

        Returns:
            A list of dicts with at least the keys ``text``, ``confidence``,
            ``polygon``, ``bbox`` and ``method``. An empty list is returned
            when the file is missing or unreadable, when nothing is
            recognized, or when any error occurs (errors are logged, not
            raised).
        """
        if not os.path.exists(image_path):
            self._log(f"이미지 파일을 찾을 수 없습니다: {image_path}", level=logging.ERROR)
            return []

        try:
            image = self._read_image(image_path)
            if image is None:
                self._log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
                return []

            self._log(f"🔍 OCR 감지 방식: {method}", level=logging.INFO)

            fd_result = self.ocr.predict(image)
            texts = getattr(fd_result, "text", None) if fd_result is not None else None
            if texts is None or len(texts) == 0:
                self._log("⚠️ OCR 결과가 비어있습니다.", level=logging.WARNING)
                return []

            # Defensive parsing: the box attribute is looked up under two names.
            boxes = getattr(fd_result, "boxes", getattr(fd_result, "det_boxes", None))
            rec_scores = getattr(fd_result, "rec_scores", None)
            if boxes is None:
                raise AttributeError("fd_result에 boxes(또는 det_boxes) 속성이 없습니다.")

            # PaddleOCR-style entries: [polygon, [text, score]].
            raw_lines = [
                [
                    self._normalize_poly(boxes[i]).tolist(),
                    [txt, float(rec_scores[i]) if rec_scores is not None else 1.0],
                ]
                for i, txt in enumerate(texts)
            ]

            handlers = {
                'polygon': self._detect_with_polygon,
                'bbox': self._detect_with_bbox,
                'expanded_bbox': self._detect_with_expanded_bbox,
                'rotated_bbox': self._detect_with_rotated_bbox,
                'contour': self._detect_with_contour,
            }
            handler = handlers.get(method)
            if handler is None:
                self._log(f"⚠️ 지원하지 않는 감지 방식: {method}, 기본 polygon 사용",
                          level=logging.WARNING)
                handler = self._detect_with_polygon
            return handler(image, raw_lines)

        except Exception as e:
            # Top-level boundary: log with traceback and degrade to "no results".
            self._log(f"❌ OCR 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
            return []

    # ------------------------ Filtering ------------------------
    def filter_chinese_text(self, ocr_results: List[Dict]) -> List[Dict]:
        """Keep results whose ``text`` has a character in U+4E00..U+9FFF."""
        chinese_results = [
            r for r in ocr_results
            if any('\u4e00' <= ch <= '\u9fff' for ch in r['text'])
        ]
        self._log(f"중국어 텍스트 {len(chinese_results)}개 필터링 완료", level=logging.INFO)
        return chinese_results

    def filter_korean_text(self, ocr_results: List[Dict]) -> List[Dict]:
        """Keep results whose ``text`` has a character in U+AC00..U+D7A3."""
        korean_results = [
            r for r in ocr_results
            if any('\uac00' <= ch <= '\ud7a3' for ch in r['text'])
        ]
        self._log(f"한글 텍스트 {len(korean_results)}개 필터링 완료", level=logging.INFO)
        return korean_results

    # ------------------------ Post-processing ------------------------
    def _iter_lines(self, ocr_raw_results: List) -> Iterator[Tuple[Any, float, np.ndarray]]:
        """Yield ``(text, confidence, polygon)`` for each usable raw entry.

        Entries with fewer than two elements are skipped. Entries whose
        polygon cannot be normalized are skipped with a warning so that one
        bad box does not discard the whole page.
        """
        for line in ocr_raw_results:
            if len(line) < 2:
                continue
            text, conf = line[1]
            try:
                polygon_np = self._normalize_poly(line[0])
            except Exception as e:
                self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
                continue
            yield text, float(conf), polygon_np

    def _detect_with_polygon(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Return each line with its normalized polygon and its bounding rect."""
        out = []
        for text, conf, polygon_np in self._iter_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)
            out.append({
                'text': text,
                'confidence': conf,
                'polygon': polygon_np.tolist(),
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'polygon'
            })
        return out

    def _detect_with_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Return each line with the corners of its axis-aligned bounding rect as polygon."""
        out = []
        for text, conf, polygon_np in self._iter_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)
            out.append({
                'text': text,
                'confidence': conf,
                'polygon': [[x, y], [x + w, y], [x + w, y + h], [x, y + h]],
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'bbox'
            })
        return out

    def _detect_with_expanded_bbox(self, image: np.ndarray, ocr_raw_results: List,
                                   expand_ratio: float = 0.2) -> List[Dict[str, Any]]:
        """Return each line's bounding rect grown on every side and clamped to the image.

        Each side is padded by ``expand_ratio`` of the rect's width/height
        (at least 1 pixel). Every edge is clamped independently, so a box
        touching the left/top border does not grow extra on the opposite
        side.

        Args:
            image: Source image; only its height and width are used.
            ocr_raw_results: Entries of the form ``[polygon, [text, score]]``.
            expand_ratio: Padding per side as a fraction of the rect size.
        """
        out = []
        h_img, w_img = image.shape[:2]
        for text, conf, polygon_np in self._iter_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)
            pad_x = max(1, int(w * expand_ratio))
            pad_y = max(1, int(h * expand_ratio))

            left = max(0, x - pad_x)
            top = max(0, y - pad_y)
            # Far edges are derived from the unclamped rect, then clamped.
            right = min(w_img, x + w + pad_x)
            bottom = min(h_img, y + h + pad_y)
            w_exp = right - left
            h_exp = bottom - top

            out.append({
                'text': text,
                'confidence': conf,
                'polygon': [
                    [left, top],
                    [left + w_exp, top],
                    [left + w_exp, top + h_exp],
                    [left, top + h_exp]
                ],
                'bbox': (int(left), int(top), int(w_exp), int(h_exp)),
                'method': 'expanded_bbox'
            })
        return out

    def _detect_with_rotated_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Return each line with its minimum-area rotated rectangle.

        ``polygon`` holds the four corners from ``cv2.boxPoints`` (cast to
        int32), ``bbox`` is the axis-aligned rect of the original polygon, and
        ``rotation_info`` carries the center, size and angle reported by
        ``cv2.minAreaRect``.
        """
        out = []
        for text, conf, polygon_np in self._iter_lines(ocr_raw_results):
            rect = cv2.minAreaRect(polygon_np.astype(np.float32))
            (cx, cy), (rw, rh), angle = rect
            corners = cv2.boxPoints(rect).astype(np.int32)
            x, y, w, h = cv2.boundingRect(polygon_np)
            out.append({
                'text': text,
                'confidence': conf,
                'polygon': corners.tolist(),
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'rotated_bbox',
                'rotation_info': {
                    'center': (float(cx), float(cy)),
                    'size': (float(rw), float(rh)),
                    'angle': float(angle)
                }
            })
        return out

    def _detect_with_contour(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Return each line with a simplified closed contour of its polygon.

        The polygon is approximated with ``cv2.approxPolyDP`` using a
        tolerance of 2% of its closed perimeter.
        """
        out = []
        for text, conf, polygon_np in self._iter_lines(ocr_raw_results):
            epsilon = 0.02 * cv2.arcLength(polygon_np, True)
            approx = cv2.approxPolyDP(polygon_np, epsilon, True)
            contour_polygon = approx.reshape(-1, 2).tolist()
            x, y, w, h = cv2.boundingRect(polygon_np)
            out.append({
                'text': text,
                'confidence': conf,
                'polygon': contour_polygon,
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'contour',
                'contour_points': len(contour_polygon)
            })
        return out

    # ------------------------ Cleanup ------------------------
    def __del__(self):
        """Drop the FastDeploy references (pipeline first) and run a GC pass."""
        try:
            # The pipeline is removed before the models, and the runtime option last.
            for name in ("ocr", "det_model", "rec_model", "cls_model", "runtime_option"):
                if getattr(self, name) is not None:
                    delattr(self, name)
        except Exception:
            # Best effort: a finalizer must not raise, e.g. when __init__
            # never got far enough to set the attributes.
            pass
        finally:
            gc.collect()