ImageProcessor_MainServer/worker/mask_module_for_paddle.py

210 lines
8.7 KiB
Python

import cv2
import numpy as np
from typing import List, Dict, Any
from shapely.geometry import Polygon
import logging
class MaskModule:
def __init__(self, logger):
self.logger = logger
self.logger.log("마스크 모듈 초기화 완료", level=logging.INFO)
def create_masks(self, image_path: str, ocr_results: List[Dict], expansion_size: int = 6, blur_size: int = 7, mask_option: str = "basic") -> np.ndarray:
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
return None
height, width = image.shape[:2]
mask = np.zeros((height, width), dtype=np.uint8)
for i, result in enumerate(ocr_results, 1):
polygon = result['polygon']
expanded_poly = self.expand_polygon(polygon, offset=5)
cv2.fillPoly(mask, [expanded_poly], 255)
processed_mask = self.process_mask(mask, expansion_size, blur_size)
# 디버깅용 마스크 저장 (항상 0과 255만 가지는 표준 흑백 마스크로 저장)
try:
import os
base_dir = os.path.dirname(image_path)
base_name = os.path.splitext(os.path.basename(image_path))[0]
debug_mask_path = os.path.join(base_dir, f"debug_mask_{base_name}.png")
# 마스크가 0~255 사이의 값이 섞여 있을 수 있으니, 128 기준으로 이진화
mask_to_save = ((processed_mask > 128) * 255).astype('uint8')
cv2.imwrite(debug_mask_path, mask_to_save)
self.logger.log(f"디버깅용 마스크 저장: {debug_mask_path}", level=20)
except Exception as e:
self.logger.log(f"디버깅용 마스크 저장 실패: {e}", level=40)
# return processed_mask
return processed_mask
# ========== MIGAN 전용: 파일 경로 입력 ==========
def create_masks_migan(self,
image_path: str,
ocr_results: List[Dict],
expansion_size: int = 6,
min_component_area: int = 32) -> np.ndarray:
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
return None
return self.create_masks_migan_np(image, ocr_results,
expansion_size=expansion_size,
min_component_area=min_component_area)
# ========== NEW: ndarray 직접 마스크 ==========
def create_masks_np(
self,
image: "np.ndarray",
ocr_results: List[Dict],
expansion_size: int = 6,
blur_size: int = 7,
mask_option: str = "basic",
# 🔥 ROI 전용 옵션 추가
for_roi_processing: bool = False
) -> "np.ndarray | None":
"""
BGR ndarray와 OCR 결과를 직접 받아 마스크 np.ndarray 반환
(디스크 I/O 없음)
Args:
for_roi_processing: True면 순수 마스크만 생성 (후처리 없음)
"""
if image is None or image.size == 0:
self.logger.error("ndarray 이미지가 비었습니다.")
return None
h, w = image.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
for res in ocr_results:
poly = res.get("polygon")
if not poly:
continue
# 🔥 ROI 처리용이면 적절한 확장 적용 (후처리 없는 대신 좀 더 확장)
if for_roi_processing:
expanded = self.expand_polygon(poly, offset=8) # 3 → 8로 증가
else:
expanded = self.expand_polygon(poly, offset=5)
cv2.fillPoly(mask, [expanded], 255)
# 🔥 ROI 처리용이면 최소한의 후처리만 적용
if for_roi_processing:
# 🔥 강화된 후처리: 텍스트 잔상 방지
kernel_small = np.ones((3, 3), np.uint8)
kernel_large = np.ones((5, 5), np.uint8)
# 1단계: 작은 노이즈 제거
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_small)
# 2단계: 텍스트 경계 완전 커버 (강화된 팽창)
mask = cv2.dilate(mask, kernel_large, iterations=1)
# 3단계: 추가 안전 마진
mask = cv2.dilate(mask, kernel_small, iterations=1)
self.logger.log("🔧 ROI용 강화 마스크 생성 (잔상 방지 처리)", level=logging.INFO)
return mask
# 기존 방식 (풀프레임용)
processed_mask = self.process_mask(mask, expansion_size, blur_size)
return processed_mask
# ========== NEW: MIGAN 전용 ndarray 마스크 ==========
def create_masks_migan_np(
self,
image: "np.ndarray",
ocr_results: List[Dict],
expansion_size: int = 6,
min_component_area: int = 32,
dilate_iters: int = 1,
enable_closing: bool = True,
closing_kernel: int = 3,
bridge_h_px: int = 0,
bridge_v_px: int = 0
) -> "np.ndarray | None":
"""
MIGAN에 최적화된 마스크 생성:
- 하드 바이너리(0/255) 마스크
- 블러 없음 (경계 페더링은 MIGAN 내부 처리에 위임)
- 소형 노이즈 제거, 보수적 팽창(dilate)
"""
if image is None or image.size == 0:
self.logger.error("ndarray 이미지가 비었습니다.")
return None
h, w = image.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
# 폴리곤 확장 후 채우기 (보수적 여유)
for res in ocr_results:
poly = res.get("polygon")
if not poly:
continue
expanded = self.expand_polygon(poly, offset=max(3, expansion_size))
cv2.fillPoly(mask, [expanded], 255)
# 소형 컴포넌트 제거
try:
num, labels, stats, _ = cv2.connectedComponentsWithStats((mask > 0).astype(np.uint8), connectivity=8)
cleaned = np.zeros_like(mask)
for i in range(1, num):
area = stats[i, cv2.CC_STAT_AREA]
if area >= max(16, min_component_area):
cleaned[labels == i] = 255
mask = cleaned
except Exception:
# 연결 성분 계산 실패 시 원본 유지
pass
# 보수적 팽창 (경계 이음새 방지), 블러는 적용하지 않음
if expansion_size > 0:
k = max(3, min(13, (expansion_size if isinstance(expansion_size, int) else 7) | 1)) # 홀수 보장, 상한 13
kernel = np.ones((k, k), np.uint8)
iters = max(1, min(3, int(dilate_iters)))
mask = cv2.dilate(mask, kernel, iterations=iters)
# 경계 미세 구멍 제거(클로징)
if enable_closing:
ck = max(3, min(9, (closing_kernel if isinstance(closing_kernel, int) else 3) | 1))
cker = np.ones((ck, ck), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, cker)
# 인접 텍스트 간 브릿지(선택): 수평/수직 방향으로만 연결 강화
try:
if isinstance(bridge_h_px, (int, str)) and int(bridge_h_px) > 0:
bh = max(5, min(31, int(bridge_h_px) | 1))
hk = cv2.getStructuringElement(cv2.MORPH_RECT, (bh, 3))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, hk)
if isinstance(bridge_v_px, (int, str)) and int(bridge_v_px) > 0:
bv = max(5, min(31, int(bridge_v_px) | 1))
vk = cv2.getStructuringElement(cv2.MORPH_RECT, (3, bv))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, vk)
except Exception:
pass
# 최종 이진화 보장
mask = ((mask > 0) * 255).astype(np.uint8)
return mask
def expand_polygon(self, polygon, offset=15):
poly = Polygon(polygon)
expanded = poly.buffer(offset)
if expanded.is_empty:
return np.array(polygon, dtype=np.int32)
return np.array(expanded.exterior.coords, dtype=np.int32)
def process_mask(self, mask: np.ndarray, expansion_size: int = 5, blur_size: int = 3) -> np.ndarray:
processed_mask = mask.copy()
if expansion_size > 0:
kernel = np.ones((expansion_size, expansion_size), np.uint8)
processed_mask = cv2.dilate(processed_mask, kernel, iterations=1)
if blur_size > 0:
blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1
processed_mask = cv2.GaussianBlur(processed_mask, (blur_size, blur_size), 0)
return processed_mask