378 lines
15 KiB
Python
378 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
PaddleOCRv3 module built on FastDeploy.

- Prevents segmentation faults by keeping the model objects alive as attributes of self.
- Resolves OpenCV errors by normalizing (reshaping) polygon data.
"""
|
|
|
|
import os
|
|
import cv2
|
|
import gc
|
|
import logging
|
|
import fastdeploy as fd
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
|
class _NullLogger:
    """Fallback logger used when the caller does not supply one.

    The level-specific methods (``debug``/``info``/``warning``/``error``)
    discard their arguments.  Only ``log`` produces output: it prints the
    message to stdout with a severity tag.
    """

    def debug(self, *args, **kwargs):
        return None

    def info(self, *args, **kwargs):
        return None

    def warning(self, *args, **kwargs):
        return None

    def error(self, *args, **kwargs):
        return None

    def log(self, msg, level=logging.INFO, exc_info=False):
        """Print ``msg`` prefixed with a tag derived from ``level``.

        Args:
            msg: Message to print.
            level: Numeric ``logging`` level; selects the ``[ERROR]``,
                ``[WARN]`` or ``[INFO]`` prefix.
            exc_info: When truthy, also print the traceback of the exception
                currently being handled.
        """
        if level >= logging.ERROR:
            tag = "[ERROR]"
        elif level >= logging.WARNING:
            tag = "[WARN]"
        else:
            tag = "[INFO]"
        print(tag, msg)

        if exc_info:
            import traceback
            traceback.print_exc()
|
|
|
|
|
|
class OCRModule:
    """Text detection/recognition wrapper around FastDeploy's PP-OCRv3.

    The FastDeploy runtime option, the individual models and the combined
    pipeline are all stored on the instance so that they stay referenced for
    as long as the module itself is alive.
    """

    def __init__(self, logger: Optional[logging.Logger] = None,
                 base_dir: Optional[str] = None,
                 gpu_id: int = 0):
        """Store the configuration and build the OCR pipeline.

        Args:
            logger: Either a standard ``logging.Logger``/``LoggerAdapter`` or
                a project logger exposing ``log(msg, level=..., exc_info=...)``.
                A printing fallback logger is used when omitted.
            base_dir: Root directory that contains ``modules/PP_Models``.
                Defaults to the current working directory.
            gpu_id: GPU device index handed to FastDeploy.

        Raises:
            Exception: Whatever ``_initialize_ocr`` raises when the models
                cannot be loaded.
        """
        self.logger = logger if logger is not None else _NullLogger()
        self.base_dir = base_dir or os.getcwd()
        self.gpu_id = gpu_id

        # Keep references to every FastDeploy object on the instance.
        self.runtime_option: Optional[fd.RuntimeOption] = None
        self.det_model = None
        self.rec_model = None
        self.cls_model = None
        self.ocr = None

        # Model directories.
        models_root = os.path.join(self.base_dir, "modules", "PP_Models")
        self.rec_model_dir = os.path.join(models_root, "rec")
        self.det_model_dir = os.path.join(models_root, "det")
        self.cls_model_dir = os.path.join(models_root, "cls")

        self._initialize_ocr()

    # ------------------------ Shared internal logging ------------------------
    def _log(self, msg, level=logging.INFO, exc_info=False):
        """Send ``msg`` to whichever kind of logger was configured.

        Standard-library loggers take ``log(level, msg, ...)`` while the
        project-style logger takes ``log(msg, level=..., exc_info=...)``.
        The two are told apart by type so that passing a real
        ``logging.Logger`` does not raise ``TypeError``.

        Args:
            msg: Message text.
            level: Numeric ``logging`` level.
            exc_info: Whether to attach/print the active exception traceback.
        """
        logger = self.logger

        if isinstance(logger, (logging.Logger, logging.LoggerAdapter)):
            logger.log(level, msg, exc_info=exc_info)
            return

        if hasattr(logger, "log"):
            logger.log(msg, level=level, exc_info=exc_info)
            return

        # Minimal duck-typed loggers: fall back to level-specific methods.
        if level >= logging.ERROR and hasattr(logger, "error"):
            logger.error(msg, exc_info=exc_info)
        elif level >= logging.WARNING and hasattr(logger, "warning"):
            logger.warning(msg)
        elif hasattr(logger, "info"):
            logger.info(msg)
        else:
            print(msg)
            if exc_info:
                import traceback
                traceback.print_exc()

    # ------------------------ OCR initialisation ------------------------
    def _initialize_ocr(self):
        """Create the runtime option, load det/rec/(cls) models, build PPOCRv3.

        The classifier is optional: it is loaded only when both of its model
        files exist, otherwise ``None`` is passed to the pipeline.

        Raises:
            FileNotFoundError: If neither ``ppocr_keys_v1.txt`` nor
                ``dict.txt`` exists in the recognition model directory.
            Exception: Any other initialisation error is logged and re-raised.
        """
        try:
            self.runtime_option = fd.RuntimeOption()
            self.runtime_option.use_gpu(self.gpu_id)

            det_model_file = os.path.join(self.det_model_dir, "inference.pdmodel")
            det_params_file = os.path.join(self.det_model_dir, "inference.pdiparams")
            self.det_model = fd.vision.ocr.DBDetector(
                det_model_file, det_params_file, runtime_option=self.runtime_option
            )

            rec_model_file = os.path.join(self.rec_model_dir, "inference.pdmodel")
            rec_params_file = os.path.join(self.rec_model_dir, "inference.pdiparams")

            # The first label/dictionary file that exists wins.
            rec_label_file = None
            for cand in ("ppocr_keys_v1.txt", "dict.txt"):
                p = os.path.join(self.rec_model_dir, cand)
                if os.path.isfile(p):
                    rec_label_file = p
                    break
            if rec_label_file is None:
                raise FileNotFoundError("Recognition label(dict) 파일을 찾을 수 없습니다.")

            self.rec_model = fd.vision.ocr.Recognizer(
                rec_model_file, rec_params_file, rec_label_file,
                runtime_option=self.runtime_option
            )

            cls_model_file = os.path.join(self.cls_model_dir, "inference.pdmodel")
            cls_params_file = os.path.join(self.cls_model_dir, "inference.pdiparams")
            if os.path.isfile(cls_model_file) and os.path.isfile(cls_params_file):
                self.cls_model = fd.vision.ocr.Classifier(
                    cls_model_file, cls_params_file, runtime_option=self.runtime_option
                )
            else:
                self.cls_model = None

            self.ocr = fd.vision.ocr.PPOCRv3(self.det_model, self.cls_model, self.rec_model)
            self._log("✅ FastDeploy PPOCRv3 시스템 초기화 완료", level=logging.INFO)

        except Exception as e:
            self._log(f"❌ FastDeploy OCR 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
            raise

    # ------------------------ Helper: polygon normalisation ------------------------
    @staticmethod
    def _normalize_poly(poly) -> np.ndarray:
        """Convert ``poly`` to an ``(N, 2)`` ``int32`` ndarray.

        Accepted layouts include a flat ``[x1, y1, x2, y2, ...]`` sequence,
        nested ``[[x1, y1], [x2, y2], ...]`` points, and ndarrays of any rank
        such as ``(1, N, 2)``.

        Args:
            poly: Polygon coordinates in any of the layouts above.

        Returns:
            A two-dimensional ``int32`` array with one ``(x, y)`` row per point.

        Raises:
            ValueError: If the total number of coordinates is odd.
        """
        arr = np.asarray(poly, dtype=np.int32)
        if arr.size % 2 != 0:
            raise ValueError(f"폴리곤 좌표 수가 짝수가 아닙니다: {arr.size}")
        # Always flatten to (N, 2) so that extra leading axes are removed too.
        return arr.reshape(-1, 2)

    # ------------------------ Main OCR entry point ------------------------
    def detect_text(self, image_path: str, method: str = 'polygon') -> List[Dict[str, Any]]:
        """Run OCR on an image file and post-process the detected regions.

        Args:
            image_path: Path of the image to read with OpenCV.
            method: One of ``'polygon'``, ``'bbox'``, ``'expanded_bbox'``,
                ``'rotated_bbox'`` or ``'contour'``.  Any other value logs a
                warning and falls back to ``'polygon'``.

        Returns:
            A list of result dicts (``text``, ``confidence``, ``polygon``,
            ``bbox``, ``method`` plus method-specific keys).  An empty list is
            returned when the file is missing or unreadable, when nothing is
            recognised, or when any error occurs during processing.
        """
        if not os.path.exists(image_path):
            self._log(f"이미지 파일을 찾을 수 없습니다: {image_path}", level=logging.ERROR)
            return []

        try:
            image = cv2.imread(image_path)
            if image is None:
                self._log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
                return []

            self._log(f"🔍 OCR 감지 방식: {method}", level=logging.INFO)

            fd_result = self.ocr.predict(image)
            texts = getattr(fd_result, "text", None) if fd_result is not None else None
            if texts is None or len(texts) == 0:
                self._log("⚠️ OCR 결과가 비어있습니다.", level=logging.WARNING)
                return []

            boxes = getattr(fd_result, "boxes", getattr(fd_result, "det_boxes", None))
            rec_scores = getattr(fd_result, "rec_scores", None)
            if boxes is None:
                raise AttributeError("fd_result에 boxes(또는 det_boxes) 속성이 없습니다.")

            if len(boxes) < len(texts):
                self._log(
                    f"⚠️ 텍스트({len(texts)})와 박스({len(boxes)}) 개수가 일치하지 않습니다.",
                    level=logging.WARNING,
                )

            # Raw boxes are passed through untouched: each post-processor
            # normalises per line and skips only the lines that fail.
            lines = []
            for idx, (box, txt) in enumerate(zip(boxes, texts)):
                has_score = rec_scores is not None and idx < len(rec_scores)
                score = float(rec_scores[idx]) if has_score else 1.0
                lines.append([box, [txt, score]])

            handlers = {
                'polygon': self._detect_with_polygon,
                'bbox': self._detect_with_bbox,
                'expanded_bbox': self._detect_with_expanded_bbox,
                'rotated_bbox': self._detect_with_rotated_bbox,
                'contour': self._detect_with_contour,
            }
            handler = handlers.get(method) if isinstance(method, str) else None
            if handler is None:
                self._log(f"⚠️ 지원하지 않는 감지 방식: {method}, 기본 polygon 사용", level=logging.WARNING)
                handler = self._detect_with_polygon
            return handler(image, lines)

        except Exception as e:
            self._log(f"❌ OCR 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
            return []

    # ------------------------ Filtering ------------------------
    def filter_chinese_text(self, ocr_results: List[Dict]) -> List[Dict]:
        """Keep results whose ``'text'`` has a character in U+4E00..U+9FFF."""
        chinese_results = [r for r in ocr_results
                           if any('\u4e00' <= ch <= '\u9fff' for ch in r['text'])]
        self._log(f"중국어 텍스트 {len(chinese_results)}개 필터링 완료", level=logging.INFO)
        return chinese_results

    def filter_korean_text(self, ocr_results: List[Dict]) -> List[Dict]:
        """Keep results whose ``'text'`` has a character in U+AC00..U+D7A3."""
        korean_results = [r for r in ocr_results
                          if any('\uac00' <= ch <= '\ud7a3' for ch in r['text'])]
        self._log(f"한글 텍스트 {len(korean_results)}개 필터링 완료", level=logging.INFO)
        return korean_results

    # ------------------------ Post-processing ------------------------
    def _iter_valid_lines(self, ocr_raw_results: List):
        """Yield ``(polygon_np, text, conf)`` for every usable raw line.

        Lines with fewer than two elements are skipped silently; lines whose
        polygon cannot be normalised are skipped with a warning.
        """
        for line in ocr_raw_results:
            if len(line) < 2:
                continue
            poly_raw = line[0]
            text, conf = line[1]
            try:
                polygon_np = self._normalize_poly(poly_raw)
            except Exception as e:
                self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
                continue
            yield polygon_np, text, conf

    def _detect_with_polygon(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Report each region with its normalised polygon and bounding rect."""
        out = []
        for polygon_np, text, conf in self._iter_valid_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)
            out.append({
                'text': text,
                'confidence': float(conf),
                'polygon': polygon_np.tolist(),
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'polygon'
            })
        return out

    def _detect_with_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Report each region as its axis-aligned bounding rectangle."""
        out = []
        for polygon_np, text, conf in self._iter_valid_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)
            bbox_polygon = [[x, y], [x + w, y], [x + w, y + h], [x, y + h]]
            out.append({
                'text': text,
                'confidence': float(conf),
                'polygon': bbox_polygon,
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'bbox'
            })
        return out

    def _detect_with_expanded_bbox(self, image: np.ndarray, ocr_raw_results: List,
                                   expand_ratio: float = 0.2) -> List[Dict[str, Any]]:
        """Report each region as its bounding rectangle grown on every side.

        Args:
            image: Source image; only its height and width are read, to clamp
                the expanded rectangle.
            ocr_raw_results: Lines shaped ``[polygon, [text, confidence]]``.
            expand_ratio: Fraction of the width/height added on each side
                (at least one pixel is always added).
        """
        out = []
        h_img, w_img = image.shape[:2]
        for polygon_np, text, conf in self._iter_valid_lines(ocr_raw_results):
            x, y, w, h = cv2.boundingRect(polygon_np)

            expand_x = max(1, int(w * expand_ratio))
            expand_y = max(1, int(h * expand_ratio))

            # Clamp the origin to the image, then the size to the space left.
            x_exp = max(0, x - expand_x)
            y_exp = max(0, y - expand_y)
            w_exp = min(w_img - x_exp, w + 2 * expand_x)
            h_exp = min(h_img - y_exp, h + 2 * expand_y)

            expanded_polygon = [
                [x_exp, y_exp],
                [x_exp + w_exp, y_exp],
                [x_exp + w_exp, y_exp + h_exp],
                [x_exp, y_exp + h_exp]
            ]
            out.append({
                'text': text,
                'confidence': float(conf),
                'polygon': expanded_polygon,
                'bbox': (int(x_exp), int(y_exp), int(w_exp), int(h_exp)),
                'method': 'expanded_bbox'
            })
        return out

    def _detect_with_rotated_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Report each region as its minimum-area rectangle plus rotation info."""
        out = []
        for polygon_i32, text, conf in self._iter_valid_lines(ocr_raw_results):
            polygon_np = polygon_i32.astype(np.float32)

            rect = cv2.minAreaRect(polygon_np)
            box = cv2.boxPoints(rect).astype(np.int32)
            x, y, w, h = cv2.boundingRect(polygon_np.astype(np.int32))

            out.append({
                'text': text,
                'confidence': float(conf),
                'polygon': box.tolist(),
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'rotated_bbox',
                'rotation_info': {
                    'center': (float(rect[0][0]), float(rect[0][1])),
                    'size': (float(rect[1][0]), float(rect[1][1])),
                    'angle': float(rect[2])
                }
            })
        return out

    def _detect_with_contour(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
        """Report each region as a polygon simplified with ``approxPolyDP``."""
        out = []
        for polygon_np, text, conf in self._iter_valid_lines(ocr_raw_results):
            # Tolerance is 2% of the closed polygon's perimeter.
            epsilon = 0.02 * cv2.arcLength(polygon_np, True)
            approx_contour = cv2.approxPolyDP(polygon_np, epsilon, True)
            contour_polygon = approx_contour.reshape(-1, 2).tolist()

            x, y, w, h = cv2.boundingRect(polygon_np)

            out.append({
                'text': text,
                'confidence': float(conf),
                'polygon': contour_polygon,
                'bbox': (int(x), int(y), int(w), int(h)),
                'method': 'contour',
                'contour_points': len(contour_polygon)
            })
        return out

    # ------------------------ Cleanup ------------------------
    def __del__(self):
        """Drop the FastDeploy references, then trigger a garbage collection.

        Attributes are removed in a fixed order: pipeline, detector,
        recogniser, classifier, runtime option.  Each removal is guarded on
        its own so that a partially initialised instance still releases
        whatever it did create.
        """
        for attr in ("ocr", "det_model", "rec_model", "cls_model", "runtime_option"):
            try:
                if getattr(self, attr, None) is not None:
                    delattr(self, attr)
            except Exception:
                pass
        try:
            gc.collect()
        except Exception:
            # Best effort only: module globals may already be torn down at
            # interpreter shutdown.
            pass
|