ImageProcessor_MainServer/worker/ocr_module.py

645 lines
27 KiB
Python

# -*- coding: utf-8 -*-
"""
FastDeploy 기반 PaddleOCRv3 모듈
- 세그멘테이션 오류 방지: 모델 객체를 self 속성에 유지
- 폴리곤 데이터 정규화(reshape)로 OpenCV 오류 해결
"""
import os
import ctypes
import cv2
import gc
import logging
import fastdeploy as fd
import numpy as np
from typing import List, Dict, Any, Optional
class _NullLogger:
    """Fallback logger used when no logger is supplied.

    The level-named methods (``debug``/``info``/``warning``/``error``) accept
    any arguments and do nothing. Only ``log`` produces output, and it writes
    to stdout through ``print``.
    """

    def debug(self, *a, **k):
        """Discard the message."""
        return None

    def info(self, *a, **k):
        """Discard the message."""
        return None

    def warning(self, *a, **k):
        """Discard the message."""
        return None

    def error(self, *a, **k):
        """Discard the message."""
        return None

    def log(self, msg, level=logging.INFO, exc_info=False):
        """Print ``msg`` with a severity tag chosen from ``level``.

        Args:
            msg: Object to print after the tag.
            level: Numeric logging level; ERROR and above print "[ERROR]",
                WARNING and above print "[WARN]", anything lower "[INFO]".
            exc_info: When truthy, also print the current exception traceback.
        """
        if level >= logging.ERROR:
            tag = "[ERROR]"
        elif level >= logging.WARNING:
            tag = "[WARN]"
        else:
            tag = "[INFO]"
        print(tag, msg)

        if exc_info:
            import traceback
            traceback.print_exc()
class OCRModule:
def __init__(self, logger: Optional[logging.Logger] = None,
             base_dir: Optional[str] = None,
             gpu_id: int = 0):
    """Store configuration, derive the model directories and build the OCR pipeline.

    Args:
        logger: Object used for reporting; a ``_NullLogger`` is created when None.
        base_dir: Project root under which ``worker/modules/PP_Models`` is looked up;
            a falsy value falls back to the current working directory.
        gpu_id: GPU device index, stored on the instance.

    Raises:
        Whatever ``_initialize_ocr`` raises.
    """
    self.logger = _NullLogger() if logger is None else logger
    self.base_dir = base_dir or os.getcwd()
    self.gpu_id = gpu_id

    # Hold the FastDeploy objects on the instance (the module docstring states
    # this is done to avoid segmentation faults).
    self.runtime_option: Optional[fd.RuntimeOption] = None
    self.det_model = self.rec_model = self.cls_model = self.ocr = None

    # Model directories: <base_dir>/worker/modules/PP_Models/{rec,det,cls}
    models_root = os.path.join(self.base_dir, "worker", "modules", "PP_Models")
    self.rec_model_dir = os.path.join(models_root, "rec")
    self.det_model_dir = os.path.join(models_root, "det")
    self.cls_model_dir = os.path.join(models_root, "cls")

    self._initialize_ocr()
# ------------------------ 내부 공통 로그 ------------------------
def _log(self, msg, level=logging.INFO, exc_info=False):
    """Send ``msg`` to ``self.logger``, adapting to the kind of logger in use.

    Dispatch order:
      1. Standard ``logging.Logger`` / ``logging.LoggerAdapter``: their ``log``
         takes ``(level, msg, ...)``, so it is called in that order. Treating
         them like a custom logger would pass ``msg`` as the level and raise
         ``TypeError``.
      2. Any other object with a ``log`` attribute: called as
         ``log(msg, level=..., exc_info=...)`` (the ``_NullLogger`` convention).
      3. Objects exposing only ``error`` / ``warning`` / ``info``: the method
         matching the severity is used; levels below WARNING go to ``info``.
      4. Otherwise the message is printed, followed by the current traceback
         when ``exc_info`` is truthy.

    Args:
        msg: Message to report.
        level: Numeric logging level.
        exc_info: Whether exception information should accompany the message.
    """
    logger = self.logger

    if isinstance(logger, (logging.Logger, logging.LoggerAdapter)):
        logger.log(level, msg, exc_info=exc_info)
        return

    if hasattr(logger, "log"):
        logger.log(msg, level=level, exc_info=exc_info)
        return

    if level >= logging.ERROR and hasattr(logger, "error"):
        logger.error(msg, exc_info=exc_info)
    elif level >= logging.WARNING and hasattr(logger, "warning"):
        logger.warning(msg)
    elif hasattr(logger, "info"):
        logger.info(msg)
    else:
        print(msg)
        if exc_info:
            import traceback
            traceback.print_exc()
def _cuda_diag(self):
    """Log how the ``cudart`` and ``cudnn`` libraries resolve, plus LD_LIBRARY_PATH.

    The values come from ``ctypes.util.find_library`` (None when a library is
    not found) and are reported in a single INFO message.
    """
    # ``import ctypes`` does not load the ``ctypes.util`` submodule, so the
    # helper is imported explicitly rather than reached as ``ctypes.util``.
    from ctypes.util import find_library

    cudart = find_library("cudart")
    cudnn = find_library("cudnn")
    self._log(
        f"[CUDA-CHECK] cudart={cudart} cudnn={cudnn} "
        f"LD_LIBRARY_PATH={os.environ.get('LD_LIBRARY_PATH','')}",
        level=logging.INFO,
    )
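# ------------------------ OCR initialization ------------------------
# NOTE: the commented-out block below is an inactive variant of _initialize_ocr
# that used the TensorRT backend with a Paddle Inference fallback; the active
# implementation follows it.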
# def _initialize_ocr(self):
# def _make_opt_trt_safe(cache_path: str, set_shapes_cb):
# opt = fd.RuntimeOption()
# opt.use_gpu(self.gpu_id)
# # Paddle Inference 백엔드 + TensorRT 통합 활성화
# opt.use_paddle_infer_backend()
# opt.paddle_infer_option.enable_trt = True
# # (선택) TRT shape 자동 수집: 첫 실행 느림, 이후 cache 사용 빨라짐
# # opt.enable_paddle_trt_collect_shape()
# opt.use_trt_backend() # TRT 백엔드 사용
# opt.enable_trt_fp16() # FP16 활성화 trt_option.enable_fp16 = True
# opt.set_trt_cache_file(cache_path) # 엔진 캐시 파일 경로
# opt.set_trt_max_workspace_size(512 << 20) # 512MB # 1GB 정도, 필요시 줄이기
# # 입력 동적 shape 범위 지정 (모델별로 다름)
# try:
# set_shapes_cb(opt)
# except Exception as e:
# self._log(f"[OCR][TRT] set_shape 실패(무시): {e}", level=logging.WARNING)
# return opt
# try:
# # ───────────────── DETECTOR ─────────────────
# det_model_file = os.path.join(self.det_model_dir, "inference.pdmodel")
# det_params_file = os.path.join(self.det_model_dir, "inference.pdiparams")
# def _det_shapes(opt):
# # DBDetector: [N,3,H,W]
# opt.set_trt_input_shape(
# "x",
# min_shape=[1, 3, 320, 320],
# opt_shape=[1, 3, 960, 960],
# max_shape=[1, 3, 1280, 1280],
# )
# det_opt = _make_opt_trt_safe("/app/trt_cache/ocr_det.trt", _det_shapes)
# try:
# self.det_model = fd.vision.ocr.DBDetector(
# det_model_file, det_params_file, runtime_option=det_opt
# )
# except Exception as e:
# self._log(f"[OCR][DET] TRT 경로 실패 → PDINFER 폴백: {e}", level=logging.WARNING)
# det_opt2 = fd.RuntimeOption(); det_opt2.use_gpu(self.gpu_id); det_opt2.use_paddle_infer_backend()
# self.det_model = fd.vision.ocr.DBDetector(
# det_model_file, det_params_file, runtime_option=det_opt2
# )
# # ───────────────── RECOGNIZER ─────────────────
# rec_model_file = os.path.join(self.rec_model_dir, "inference.pdmodel")
# rec_params_file = os.path.join(self.rec_model_dir, "inference.pdiparams")
# rec_label_file = None
# for cand in ("ppocr_keys_v1.txt", "dict.txt"):
# p = os.path.join(self.rec_model_dir, cand)
# if os.path.isfile(p):
# rec_label_file = p; break
# if rec_label_file is None:
# raise FileNotFoundError("Recognition label(dict) 파일을 찾을 수 없습니다.")
# def _rec_shapes(opt):
# # CRNN류: [N,3,H,W] 가로 긴 입력, 프로젝트 상황에 맞춰 조정
# opt.set_trt_input_shape(
# "x",
# min_shape=[1, 3, 48, 10],
# opt_shape=[1, 3, 48, 320],
# max_shape=[1, 3, 48, 2304],
# )
# rec_opt = _make_opt_trt_safe("/app/trt_cache/ocr_rec.trt", _rec_shapes)
# try:
# self.rec_model = fd.vision.ocr.Recognizer(
# rec_model_file, rec_params_file, rec_label_file,
# runtime_option=rec_opt
# )
# except Exception as e:
# self._log(f"[OCR][REC] TRT 경로 실패 → PDINFER 폴백: {e}", level=logging.WARNING)
# rec_opt2 = fd.RuntimeOption(); rec_opt2.use_gpu(self.gpu_id); rec_opt2.use_paddle_infer_backend()
# self.rec_model = fd.vision.ocr.Recognizer(
# rec_model_file, rec_params_file, rec_label_file,
# runtime_option=rec_opt2
# )
# # ───────────────── CLASSIFIER(optional) ─────────────────
# cls_model_file = os.path.join(self.cls_model_dir, "inference.pdmodel")
# cls_params_file = os.path.join(self.cls_model_dir, "inference.pdiparams")
# if os.path.isfile(cls_model_file) and os.path.isfile(cls_params_file):
# def _cls_shapes(opt):
# opt.set_trt_input_shape(
# "x",
# min_shape=[1, 3, 48, 10],
# opt_shape=[1, 3, 48, 192],
# max_shape=[1, 3, 48, 960],
# )
# cls_opt = _make_opt_trt_safe("/app/trt_cache/ocr_cls.trt", _cls_shapes)
# try:
# self.cls_model = fd.vision.ocr.Classifier(
# cls_model_file, cls_params_file, runtime_option=cls_opt
# )
# except Exception as e:
# self._log(f"[OCR][CLS] TRT 경로 실패 → PDINFER 폴백: {e}", level=logging.WARNING)
# cls_opt2 = fd.RuntimeOption(); cls_opt2.use_gpu(self.gpu_id); cls_opt2.use_paddle_infer_backend()
# self.cls_model = fd.vision.ocr.Classifier(
# cls_model_file, cls_params_file, runtime_option=cls_opt2
# )
# else:
# self.cls_model = None
# self.ocr = fd.vision.ocr.PPOCRv3(self.det_model, self.cls_model, self.rec_model)
# self._log("✅ FastDeploy PPOCRv3 시스템 초기화 완료", level=logging.INFO)
# except Exception as e:
# self._log(f"❌ FastDeploy OCR 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# raise
def _initialize_ocr(self):
    """
    Build the FastDeploy PP-OCRv3 pipeline: detector, recognizer, optional classifier.

    - Every model gets its own RuntimeOption: GPU ``self.gpu_id``, Paddle backend,
      pinned memory, and TensorRT options (FP16, max batch size 1, a serialized
      engine file under /app/trt_cache, and a min/opt/max shape range for input "x").
    - The classifier is loaded only when both of its model files exist.
    - Batch size 1 is assumed; the pipeline should not be fed batches.
    - Any failure is logged at ERROR level and re-raised.

    NOTE(review): nothing in this method sets ``paddle_infer_option.enable_trt``;
    confirm that the ``trt_option`` settings actually take effect with the
    Paddle backend as configured here.

    Raises:
        FileNotFoundError: neither ``ppocr_keys_v1.txt`` nor ``dict.txt`` exists in
            the recognizer directory.
        Exception: anything raised while creating directories, options or models.
    """

    def model_files(model_dir):
        """Return the (model, params) file paths inside ``model_dir``."""
        return (
            os.path.join(model_dir, "inference.pdmodel"),
            os.path.join(model_dir, "inference.pdiparams"),
        )

    def build_option(engine_path, min_shape, opt_shape, max_shape):
        """Create the RuntimeOption configuration shared by all three models."""
        option = fd.RuntimeOption()
        option.use_gpu(self.gpu_id)
        option.use_paddle_backend()
        option.enable_pinned_memory()
        option.trt_option.enable_fp16 = True
        option.trt_option.max_batch_size = 1
        option.trt_option.serialize_file = engine_path
        option.trt_option.set_shape("x", min_shape, opt_shape, max_shape)
        return option

    try:
        # -- paths
        trt_cache_dir = "/app/trt_cache"
        os.makedirs(trt_cache_dir, exist_ok=True)

        det_model_file, det_params_file = model_files(self.det_model_dir)
        rec_model_file, rec_params_file = model_files(self.rec_model_dir)
        cls_model_file, cls_params_file = model_files(self.cls_model_dir)

        # Recognition label (dictionary) file: first candidate that exists wins.
        label_candidates = (
            os.path.join(self.rec_model_dir, fname)
            for fname in ("ppocr_keys_v1.txt", "dict.txt")
        )
        rec_label_file = next(
            (path for path in label_candidates if os.path.isfile(path)), None
        )
        if rec_label_file is None:
            raise FileNotFoundError("Recognition label(dict) 파일을 찾을 수 없습니다.")

        # -- DET: height and width both dynamic. Profile recommended for
        #    preprocessing that targets a long side of 960~1280.
        det_opt = build_option(
            os.path.join(trt_cache_dir, "ocr_det.trt"),
            [1, 3, 320, 320],     # min
            [1, 3, 960, 960],     # opt (preprocessing target)
            [1, 3, 1280, 1280],   # max
        )

        # -- REC: height fixed at 48, only the width is dynamic.
        rec_opt = build_option(
            os.path.join(trt_cache_dir, "ocr_rec.trt"),
            [1, 3, 48, 10],       # min (very short tokens)
            [1, 3, 48, 320],      # opt (typical phrases)
            [1, 3, 48, 2304],     # max (long lines, e.g. horizontal roll banners)
        )

        # -- CLS: configured only when its model files are present.
        cls_exists = os.path.isfile(cls_model_file) and os.path.isfile(cls_params_file)
        cls_opt = None
        if cls_exists:
            cls_opt = build_option(
                os.path.join(trt_cache_dir, "ocr_cls.trt"),
                [1, 3, 48, 10],
                [1, 3, 48, 192],
                [1, 3, 48, 960],
            )

        # -- load the models
        self.det_model = fd.vision.ocr.DBDetector(
            det_model_file, det_params_file, runtime_option=det_opt
        )
        self.rec_model = fd.vision.ocr.Recognizer(
            rec_model_file, rec_params_file, rec_label_file, runtime_option=rec_opt
        )
        if cls_exists:
            self.cls_model = fd.vision.ocr.Classifier(
                cls_model_file, cls_params_file, runtime_option=cls_opt
            )
        else:
            self.cls_model = None

        self.ocr = fd.vision.ocr.PPOCRv3(self.det_model, self.cls_model, self.rec_model)
        self._log("✅ FastDeploy PPOCRv3(TRT FP16, cached) 초기화 완료", level=logging.INFO)
    except Exception as e:
        self._log(f"❌ FastDeploy OCR 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
        raise
# ------------------------ 보조: 폴리곤 정규화 ------------------------
@staticmethod
def _normalize_poly(poly) -> np.ndarray:
    """
    Convert a polygon in any supported layout to an int32 ndarray of shape (N, 2).

    Accepted layouts include:
      - flat:    [x1, y1, x2, y2, x3, y3, x4, y4]
      - nested:  [[x1, y1], [x2, y2], ...]
      - arrays with extra leading axes, e.g. shape (1, N, 2)

    Values are cast to int32 by ``np.asarray`` (floats are truncated).

    Raises:
        ValueError: the total number of coordinates is odd.
    """
    arr = np.asarray(poly, dtype=np.int32)
    if arr.size % 2 != 0:
        raise ValueError(f"폴리곤 좌표 수가 짝수가 아닙니다: {arr.size}")
    # Always flatten to (N, 2). Checking only the last axis would let inputs
    # such as (1, N, 2) through with their extra leading dimension intact.
    return arr.reshape(-1, 2)
# ------------------------ 메인 OCR ------------------------
def detect_text(self, image_path: str, method: str = 'polygon') -> List[Dict[str, Any]]:
    """Run OCR on an image file and return the detections shaped by ``method``.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.
        method: One of 'polygon', 'bbox', 'expanded_bbox', 'rotated_bbox',
            'contour'. Any other value logs a warning and uses 'polygon'.

    Returns:
        The list produced by the selected ``_detect_with_*`` handler. An empty
        list is returned when the file is missing, the image cannot be read,
        the OCR result is empty, or any exception occurs (it is logged).
    """
    if not os.path.exists(image_path):
        self._log(f"이미지 파일을 찾을 수 없습니다: {image_path}", level=logging.ERROR)
        return []

    try:
        image = cv2.imread(image_path)
        if image is None:
            self._log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
            return []

        self._log(f"🔍 OCR 감지 방식: {method}", level=logging.INFO)
        fd_result = self.ocr.predict(image)
        if fd_result is None or len(fd_result.text) == 0:
            self._log("⚠️ OCR 결과가 비어있습니다.", level=logging.WARNING)
            return []

        # Defensive parsing: the box attribute may be named boxes or det_boxes.
        boxes = getattr(fd_result, "boxes", getattr(fd_result, "det_boxes", None))
        texts = getattr(fd_result, "text", [])
        rec_scores = getattr(fd_result, "rec_scores", None)
        if boxes is None:
            raise AttributeError("fd_result에 boxes(또는 det_boxes) 속성이 없습니다.")

        # One [polygon, [text, score]] entry per recognised line; the score
        # defaults to 1.0 when the result carries no recognition scores.
        has_scores = rec_scores is not None
        lines = [
            [
                self._normalize_poly(boxes[idx]).tolist(),
                [label, float(rec_scores[idx]) if has_scores else 1.0],
            ]
            for idx, label in enumerate(texts)
        ]

        # Method-specific post-processing.
        handlers = (
            ('polygon', self._detect_with_polygon),
            ('bbox', self._detect_with_bbox),
            ('expanded_bbox', self._detect_with_expanded_bbox),
            ('rotated_bbox', self._detect_with_rotated_bbox),
            ('contour', self._detect_with_contour),
        )
        for name, handler in handlers:
            if method == name:
                return handler(image, lines)

        self._log(f"⚠️ 지원하지 않는 감지 방식: {method}, 기본 polygon 사용", level=logging.WARNING)
        return self._detect_with_polygon(image, lines)
    except Exception as e:
        self._log(f"❌ OCR 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
        return []
# ========= NEW: ndarray 직접 처리 =========
def detect_text_ndarray(
    self,
    image: "np.ndarray",
    method: str = "polygon"
) -> List[Dict[str, Any]]:
    """
    Run OCR directly on an in-memory image array, without touching the disk.

    Images whose longer side exceeds 1280 px are downscaled (aspect ratio kept)
    before prediction; the detected polygons are scaled back to the original
    image size by ``_postprocess_fd_result``.

    Args:
        image: Image array; the caller is expected to pass BGR data
            (as produced by OpenCV).
        method: Post-processing method forwarded to ``_postprocess_fd_result``.

    Returns:
        The post-processed detections, or an empty list when the input is
        None/empty or when resizing or prediction fails (the failure is logged).
    """
    MAX_SIDE = 1280  # upper bound for the longer side (960~1280 recommended)

    # Validate before touching .shape so that None is reported, not raised.
    if image is None or getattr(image, "size", 0) == 0:
        self._log("ndarray input empty", level=logging.ERROR)
        return []

    h0, w0 = image.shape[:2]
    longest = max(h0, w0)
    scale = 1.0
    image_small = image

    try:
        if longest > MAX_SIDE:
            scale = MAX_SIDE / float(longest)
            # Clamp to 1 px so extreme aspect ratios never request a zero-sized image.
            target = (max(1, int(w0 * scale)), max(1, int(h0 * scale)))
            image_small = cv2.resize(image, target, interpolation=cv2.INTER_AREA)
        self._log("🔍 OCR(ndarray) 감지 시작", level=logging.INFO)
        fd_result = self.ocr.predict(image_small)
    except Exception as e:
        self._log(f"OCR(ndarray) 실패: {e}", level=logging.ERROR, exc_info=True)
        return []

    return self._postprocess_fd_result((fd_result, 1.0 / scale), image, method)
# ========= 기존 detect_text()에서 공통 후처리 추출 =========
def _postprocess_fd_result(self, fd_pack, image, method):
    """
    Convert a FastDeploy OCR result into the list produced by a ``_detect_with_*`` handler.

    Args:
        fd_pack: Either the OCR result object itself or a ``(fd_result, inv_scale)``
            tuple, where ``inv_scale`` multiplies every polygon coordinate
            (used to map detections on a downscaled image back to the original).
        image: Image handed to the handler.
        method: 'polygon', 'bbox', 'expanded_bbox', 'rotated_bbox' or 'contour';
            any other value logs a warning and falls back to 'polygon'.

    Returns:
        The handler's result list; an empty list when the result is None or
        when any exception occurs (it is logged).
    """
    try:
        # Support both a bare result and a (fd_result, inv_scale) pair.
        if isinstance(fd_pack, tuple):
            fd_result, inv_scale = fd_pack
        else:
            fd_result, inv_scale = fd_pack, 1.0
        if fd_result is None:
            return []

        boxes = getattr(fd_result, "boxes", getattr(fd_result, "det_boxes", None))
        texts = getattr(fd_result, "text", [])
        rec_scores = getattr(fd_result, "rec_scores", None)
        if boxes is None and len(texts) > 0:
            # Fail with a clear message instead of an opaque subscript error.
            raise AttributeError("fd_result에 boxes(또는 det_boxes) 속성이 없습니다.")

        out = []
        for i, txt in enumerate(texts):
            poly = self._normalize_poly(boxes[i])
            if inv_scale != 1.0:
                # Round to the nearest pixel; plain truncation would bias every
                # scaled coordinate toward the origin.
                poly = np.rint(poly * inv_scale).astype(np.int32)
            score = float(rec_scores[i]) if rec_scores is not None else 1.0
            out.append([poly.tolist(), [txt, score]])

        # Method-specific post-processing.
        supported = ("polygon", "bbox", "expanded_bbox", "rotated_bbox", "contour")
        if method not in supported:
            self._log(f"⚠️ 지원하지 않는 감지 방식: {method}, 기본 polygon 사용", level=logging.WARNING)
            method = "polygon"
        handler = getattr(self, f"_detect_with_{method}")
        return handler(image, out)
    except Exception as e:
        self._log(f"❌ OCR _postprocess_fd_result 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
        return []
# ------------------------ 필터링 ------------------------
def filter_chinese_text(self, ocr_results: List[Dict]) -> List[Dict]:
    """Return the results whose 'text' has at least one character in U+4E00..U+9FFF.

    The number of matches is logged at INFO level and the matches themselves
    at DEBUG level. Input order is preserved.
    """
    def has_cjk(text):
        for ch in text:
            if '\u4e00' <= ch <= '\u9fff':
                return True
        return False

    matched = []
    for item in ocr_results:
        if has_cjk(item['text']):
            matched.append(item)

    self._log(f"중국어 텍스트 {len(matched)}개 필터링 완료", level=logging.INFO)
    self._log(str(matched), level=logging.DEBUG)
    return matched
def filter_korean_text(self, ocr_results: List[Dict]) -> List[Dict]:
    """Return the results whose 'text' has at least one character in U+AC00..U+D7A3.

    The number of matches is logged at INFO level. Input order is preserved.
    """
    def has_hangul(text):
        for ch in text:
            if '\uac00' <= ch <= '\ud7a3':
                return True
        return False

    matched = []
    for item in ocr_results:
        if has_hangul(item['text']):
            matched.append(item)

    self._log(f"한글 텍스트 {len(matched)}개 필터링 완료", level=logging.INFO)
    return matched
# ------------------------ 후처리 메서드들 ------------------------
def _detect_with_polygon(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
    """Return each detection with its normalized polygon and axis-aligned bounding box.

    Args:
        image: Not used by this handler.
        ocr_raw_results: Entries of the form ``[polygon, [text, confidence]]``.
            Entries with fewer than two items are skipped, as are entries whose
            polygon cannot be normalized (a warning is logged for those).

    Returns:
        One dict per kept entry with keys 'text', 'confidence', 'polygon',
        'bbox' (x, y, w, h) and 'method' ('polygon').
    """
    detections = []
    for entry in ocr_raw_results:
        if len(entry) < 2:
            continue
        raw_points = entry[0]
        text, conf = entry[1]

        try:
            points = self._normalize_poly(raw_points)
        except Exception as e:
            self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
            continue

        bx, by, bw, bh = cv2.boundingRect(points)
        record = {
            'text': text,
            'confidence': float(conf),
            'polygon': points.tolist(),
            'bbox': (int(bx), int(by), int(bw), int(bh)),
            'method': 'polygon'
        }
        detections.append(record)
    return detections
def _detect_with_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
    """Return each detection as the axis-aligned bounding rectangle of its polygon.

    Args:
        image: Not used by this handler.
        ocr_raw_results: Entries of the form ``[polygon, [text, confidence]]``.
            Entries with fewer than two items are skipped, as are entries whose
            polygon cannot be normalized (a warning is logged for those).

    Returns:
        One dict per kept entry with keys 'text', 'confidence', 'polygon'
        (the four rectangle corners, clockwise from top-left), 'bbox'
        (x, y, w, h) and 'method' ('bbox').
    """
    detections = []
    for entry in ocr_raw_results:
        if len(entry) < 2:
            continue
        raw_points = entry[0]
        text, conf = entry[1]

        try:
            points = self._normalize_poly(raw_points)
        except Exception as e:
            self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
            continue

        bx, by, bw, bh = cv2.boundingRect(points)
        right, bottom = bx + bw, by + bh
        corners = [[bx, by], [right, by], [right, bottom], [bx, bottom]]
        detections.append({
            'text': text,
            'confidence': float(conf),
            'polygon': corners,
            'bbox': (int(bx), int(by), int(bw), int(bh)),
            'method': 'bbox'
        })
    return detections
def _detect_with_expanded_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
    """Return each detection as its bounding rectangle grown by 20% per side, clipped to the image.

    The margin is 20% of the box width/height (at least 1 px) on every side.
    Each edge is clamped to the image independently, so a box touching the
    left/top border is not over-extended on the opposite side and the
    resulting width/height can never be negative.

    Args:
        image: Image whose height and width (``image.shape[:2]``) bound the result.
        ocr_raw_results: Entries of the form ``[polygon, [text, confidence]]``.
            Entries with fewer than two items are skipped, as are entries whose
            polygon cannot be normalized (a warning is logged for those).

    Returns:
        One dict per kept entry with keys 'text', 'confidence', 'polygon'
        (the four expanded corners), 'bbox' (x, y, w, h) and 'method'
        ('expanded_bbox').
    """
    out = []
    h_img, w_img = image.shape[:2]
    for line in ocr_raw_results:
        if len(line) < 2:
            continue
        poly_raw = line[0]
        text, conf = line[1]
        try:
            polygon_np = self._normalize_poly(poly_raw)
        except Exception as e:
            self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
            continue

        x, y, w, h = cv2.boundingRect(polygon_np)
        expand_x = max(1, int(w * 0.2))
        expand_y = max(1, int(h * 0.2))

        # Clamp the four edges separately instead of deriving the size from a
        # clamped origin, which shifted the margin lost on one side to the other.
        left = min(max(0, x - expand_x), w_img)
        top = min(max(0, y - expand_y), h_img)
        right = max(left, min(w_img, x + w + expand_x))
        bottom = max(top, min(h_img, y + h + expand_y))
        w_exp = right - left
        h_exp = bottom - top

        expanded_polygon = [
            [left, top],
            [right, top],
            [right, bottom],
            [left, bottom]
        ]
        out.append({
            'text': text,
            'confidence': float(conf),
            'polygon': expanded_polygon,
            'bbox': (int(left), int(top), int(w_exp), int(h_exp)),
            'method': 'expanded_bbox'
        })
    return out
def _detect_with_rotated_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
    """Return each detection as the minimum-area rotated rectangle of its polygon.

    Args:
        image: Not used by this handler.
        ocr_raw_results: Entries of the form ``[polygon, [text, confidence]]``.
            Entries with fewer than two items are skipped, as are entries whose
            polygon cannot be normalized (a warning is logged for those).

    Returns:
        One dict per kept entry with keys 'text', 'confidence', 'polygon'
        (the rectangle corners from ``cv2.boxPoints``, cast to int),
        'bbox' (axis-aligned x, y, w, h of the original polygon), 'method'
        ('rotated_bbox') and 'rotation_info' (center, size and angle as
        reported by ``cv2.minAreaRect``).
    """
    detections = []
    for entry in ocr_raw_results:
        if len(entry) < 2:
            continue
        raw_points = entry[0]
        text, conf = entry[1]

        try:
            points_f32 = self._normalize_poly(raw_points).astype(np.float32)
        except Exception as e:
            self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
            continue

        rect = cv2.minAreaRect(points_f32)
        corners = cv2.boxPoints(rect).astype(np.int32)
        bx, by, bw, bh = cv2.boundingRect(points_f32.astype(np.int32))

        rotation = {
            'center': (float(rect[0][0]), float(rect[0][1])),
            'size': (float(rect[1][0]), float(rect[1][1])),
            'angle': float(rect[2])
        }
        detections.append({
            'text': text,
            'confidence': float(conf),
            'polygon': corners.tolist(),
            'bbox': (int(bx), int(by), int(bw), int(bh)),
            'method': 'rotated_bbox',
            'rotation_info': rotation
        })
    return detections
def _detect_with_contour(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
    """Return each detection as a simplified closed contour of its polygon.

    The polygon is approximated with ``cv2.approxPolyDP`` using a tolerance
    of 2% of its closed perimeter.

    Args:
        image: Not used by this handler.
        ocr_raw_results: Entries of the form ``[polygon, [text, confidence]]``.
            Entries with fewer than two items are skipped, as are entries whose
            polygon cannot be normalized (a warning is logged for those).

    Returns:
        One dict per kept entry with keys 'text', 'confidence', 'polygon'
        (the approximated points), 'bbox' (x, y, w, h of the original
        polygon), 'method' ('contour') and 'contour_points' (point count).
    """
    detections = []
    for entry in ocr_raw_results:
        if len(entry) < 2:
            continue
        raw_points = entry[0]
        text, conf = entry[1]

        try:
            points = self._normalize_poly(raw_points)
        except Exception as e:
            self._log(f"폴리곤 정규화 실패: {e}", level=logging.WARNING)
            continue

        perimeter = cv2.arcLength(points, True)
        simplified = cv2.approxPolyDP(points, 0.02 * perimeter, True)
        simplified_points = simplified.reshape(-1, 2).tolist()
        bx, by, bw, bh = cv2.boundingRect(points)

        detections.append({
            'text': text,
            'confidence': float(conf),
            'polygon': simplified_points,
            'bbox': (int(bx), int(by), int(bw), int(bh)),
            'method': 'contour',
            'contour_points': len(simplified_points)
        })
    return detections
# ------------------------ 정리 ------------------------
def __del__(self):
    """Drop the held FastDeploy references, then force a garbage-collection pass.

    Attributes are removed in a fixed order: the pipeline first, then the
    detector, recognizer, classifier and runtime option. Attributes that are
    None are left in place. Any error stops the clean-up silently; the
    collection pass runs regardless.
    """
    release_order = ("ocr", "det_model", "rec_model", "cls_model", "runtime_option")
    try:
        for attr in release_order:
            if getattr(self, attr) is not None:
                delattr(self, attr)
    except Exception:
        pass
    finally:
        gc.collect()