210 lines
8.7 KiB
Python
210 lines
8.7 KiB
Python
|
|
import cv2
|
|
import numpy as np
|
|
from typing import List, Dict, Any
|
|
from shapely.geometry import Polygon
|
|
import logging
|
|
|
|
class MaskModule:
|
|
def __init__(self, logger):
|
|
self.logger = logger
|
|
self.logger.log("마스크 모듈 초기화 완료", level=logging.INFO)
|
|
|
|
def create_masks(self, image_path: str, ocr_results: List[Dict], expansion_size: int = 6, blur_size: int = 7, mask_option: str = "basic") -> np.ndarray:
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
|
|
return None
|
|
height, width = image.shape[:2]
|
|
mask = np.zeros((height, width), dtype=np.uint8)
|
|
for i, result in enumerate(ocr_results, 1):
|
|
polygon = result['polygon']
|
|
expanded_poly = self.expand_polygon(polygon, offset=5)
|
|
cv2.fillPoly(mask, [expanded_poly], 255)
|
|
processed_mask = self.process_mask(mask, expansion_size, blur_size)
|
|
|
|
# 디버깅용 마스크 저장 (항상 0과 255만 가지는 표준 흑백 마스크로 저장)
|
|
try:
|
|
import os
|
|
base_dir = os.path.dirname(image_path)
|
|
base_name = os.path.splitext(os.path.basename(image_path))[0]
|
|
debug_mask_path = os.path.join(base_dir, f"debug_mask_{base_name}.png")
|
|
# 마스크가 0~255 사이의 값이 섞여 있을 수 있으니, 128 기준으로 이진화
|
|
mask_to_save = ((processed_mask > 128) * 255).astype('uint8')
|
|
cv2.imwrite(debug_mask_path, mask_to_save)
|
|
self.logger.log(f"디버깅용 마스크 저장: {debug_mask_path}", level=20)
|
|
except Exception as e:
|
|
self.logger.log(f"디버깅용 마스크 저장 실패: {e}", level=40)
|
|
# return processed_mask
|
|
|
|
return processed_mask
|
|
|
|
# ========== MIGAN 전용: 파일 경로 입력 ==========
|
|
def create_masks_migan(self,
|
|
image_path: str,
|
|
ocr_results: List[Dict],
|
|
expansion_size: int = 6,
|
|
min_component_area: int = 32) -> np.ndarray:
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
|
|
return None
|
|
return self.create_masks_migan_np(image, ocr_results,
|
|
expansion_size=expansion_size,
|
|
min_component_area=min_component_area)
|
|
|
|
# ========== NEW: ndarray 직접 마스크 ==========
|
|
def create_masks_np(
|
|
self,
|
|
image: "np.ndarray",
|
|
ocr_results: List[Dict],
|
|
expansion_size: int = 6,
|
|
blur_size: int = 7,
|
|
mask_option: str = "basic",
|
|
# 🔥 ROI 전용 옵션 추가
|
|
for_roi_processing: bool = False
|
|
) -> "np.ndarray | None":
|
|
"""
|
|
BGR ndarray와 OCR 결과를 직접 받아 마스크 np.ndarray 반환
|
|
(디스크 I/O 없음)
|
|
|
|
Args:
|
|
for_roi_processing: True면 순수 마스크만 생성 (후처리 없음)
|
|
"""
|
|
if image is None or image.size == 0:
|
|
self.logger.error("ndarray 이미지가 비었습니다.")
|
|
return None
|
|
|
|
h, w = image.shape[:2]
|
|
mask = np.zeros((h, w), dtype=np.uint8)
|
|
|
|
for res in ocr_results:
|
|
poly = res.get("polygon")
|
|
if not poly:
|
|
continue
|
|
# 🔥 ROI 처리용이면 적절한 확장 적용 (후처리 없는 대신 좀 더 확장)
|
|
if for_roi_processing:
|
|
expanded = self.expand_polygon(poly, offset=8) # 3 → 8로 증가
|
|
else:
|
|
expanded = self.expand_polygon(poly, offset=5)
|
|
cv2.fillPoly(mask, [expanded], 255)
|
|
|
|
# 🔥 ROI 처리용이면 최소한의 후처리만 적용
|
|
if for_roi_processing:
|
|
# 🔥 강화된 후처리: 텍스트 잔상 방지
|
|
kernel_small = np.ones((3, 3), np.uint8)
|
|
kernel_large = np.ones((5, 5), np.uint8)
|
|
|
|
# 1단계: 작은 노이즈 제거
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_small)
|
|
|
|
# 2단계: 텍스트 경계 완전 커버 (강화된 팽창)
|
|
mask = cv2.dilate(mask, kernel_large, iterations=1)
|
|
|
|
# 3단계: 추가 안전 마진
|
|
mask = cv2.dilate(mask, kernel_small, iterations=1)
|
|
|
|
self.logger.log("🔧 ROI용 강화 마스크 생성 (잔상 방지 처리)", level=logging.INFO)
|
|
return mask
|
|
|
|
# 기존 방식 (풀프레임용)
|
|
processed_mask = self.process_mask(mask, expansion_size, blur_size)
|
|
return processed_mask
|
|
|
|
# ========== NEW: MIGAN 전용 ndarray 마스크 ==========
|
|
def create_masks_migan_np(
|
|
self,
|
|
image: "np.ndarray",
|
|
ocr_results: List[Dict],
|
|
expansion_size: int = 6,
|
|
min_component_area: int = 32,
|
|
dilate_iters: int = 1,
|
|
enable_closing: bool = True,
|
|
closing_kernel: int = 3,
|
|
bridge_h_px: int = 0,
|
|
bridge_v_px: int = 0
|
|
) -> "np.ndarray | None":
|
|
"""
|
|
MIGAN에 최적화된 마스크 생성:
|
|
- 하드 바이너리(0/255) 마스크
|
|
- 블러 없음 (경계 페더링은 MIGAN 내부 처리에 위임)
|
|
- 소형 노이즈 제거, 보수적 팽창(dilate)
|
|
"""
|
|
if image is None or image.size == 0:
|
|
self.logger.error("ndarray 이미지가 비었습니다.")
|
|
return None
|
|
|
|
h, w = image.shape[:2]
|
|
mask = np.zeros((h, w), dtype=np.uint8)
|
|
|
|
# 폴리곤 확장 후 채우기 (보수적 여유)
|
|
for res in ocr_results:
|
|
poly = res.get("polygon")
|
|
if not poly:
|
|
continue
|
|
expanded = self.expand_polygon(poly, offset=max(3, expansion_size))
|
|
cv2.fillPoly(mask, [expanded], 255)
|
|
|
|
# 소형 컴포넌트 제거
|
|
try:
|
|
num, labels, stats, _ = cv2.connectedComponentsWithStats((mask > 0).astype(np.uint8), connectivity=8)
|
|
cleaned = np.zeros_like(mask)
|
|
for i in range(1, num):
|
|
area = stats[i, cv2.CC_STAT_AREA]
|
|
if area >= max(16, min_component_area):
|
|
cleaned[labels == i] = 255
|
|
mask = cleaned
|
|
except Exception:
|
|
# 연결 성분 계산 실패 시 원본 유지
|
|
pass
|
|
|
|
# 보수적 팽창 (경계 이음새 방지), 블러는 적용하지 않음
|
|
if expansion_size > 0:
|
|
k = max(3, min(13, (expansion_size if isinstance(expansion_size, int) else 7) | 1)) # 홀수 보장, 상한 13
|
|
kernel = np.ones((k, k), np.uint8)
|
|
iters = max(1, min(3, int(dilate_iters)))
|
|
mask = cv2.dilate(mask, kernel, iterations=iters)
|
|
|
|
# 경계 미세 구멍 제거(클로징)
|
|
if enable_closing:
|
|
ck = max(3, min(9, (closing_kernel if isinstance(closing_kernel, int) else 3) | 1))
|
|
cker = np.ones((ck, ck), np.uint8)
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, cker)
|
|
|
|
# 인접 텍스트 간 브릿지(선택): 수평/수직 방향으로만 연결 강화
|
|
try:
|
|
if isinstance(bridge_h_px, (int, str)) and int(bridge_h_px) > 0:
|
|
bh = max(5, min(31, int(bridge_h_px) | 1))
|
|
hk = cv2.getStructuringElement(cv2.MORPH_RECT, (bh, 3))
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, hk)
|
|
if isinstance(bridge_v_px, (int, str)) and int(bridge_v_px) > 0:
|
|
bv = max(5, min(31, int(bridge_v_px) | 1))
|
|
vk = cv2.getStructuringElement(cv2.MORPH_RECT, (3, bv))
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, vk)
|
|
except Exception:
|
|
pass
|
|
|
|
# 최종 이진화 보장
|
|
mask = ((mask > 0) * 255).astype(np.uint8)
|
|
return mask
|
|
|
|
def expand_polygon(self, polygon, offset=15):
|
|
poly = Polygon(polygon)
|
|
expanded = poly.buffer(offset)
|
|
if expanded.is_empty:
|
|
return np.array(polygon, dtype=np.int32)
|
|
return np.array(expanded.exterior.coords, dtype=np.int32)
|
|
|
|
def process_mask(self, mask: np.ndarray, expansion_size: int = 5, blur_size: int = 3) -> np.ndarray:
|
|
processed_mask = mask.copy()
|
|
if expansion_size > 0:
|
|
kernel = np.ones((expansion_size, expansion_size), np.uint8)
|
|
processed_mask = cv2.dilate(processed_mask, kernel, iterations=1)
|
|
if blur_size > 0:
|
|
blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1
|
|
processed_mask = cv2.GaussianBlur(processed_mask, (blur_size, blur_size), 0)
|
|
return processed_mask
|
|
|
|
|
|
|