import cv2 import numpy as np from typing import List, Dict, Any from shapely.geometry import Polygon import logging class MaskModule: def __init__(self, logger): self.logger = logger self.logger.log("마스크 모듈 초기화 완료", level=logging.INFO) def create_masks(self, image_path: str, ocr_results: List[Dict], expansion_size: int = 6, blur_size: int = 7, mask_option: str = "basic") -> np.ndarray: image = cv2.imread(image_path) if image is None: self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR) return None height, width = image.shape[:2] mask = np.zeros((height, width), dtype=np.uint8) for i, result in enumerate(ocr_results, 1): polygon = result['polygon'] expanded_poly = self.expand_polygon(polygon, offset=5) cv2.fillPoly(mask, [expanded_poly], 255) processed_mask = self.process_mask(mask, expansion_size, blur_size) # 디버깅용 마스크 저장 (항상 0과 255만 가지는 표준 흑백 마스크로 저장) try: import os base_dir = os.path.dirname(image_path) base_name = os.path.splitext(os.path.basename(image_path))[0] debug_mask_path = os.path.join(base_dir, f"debug_mask_{base_name}.png") # 마스크가 0~255 사이의 값이 섞여 있을 수 있으니, 128 기준으로 이진화 mask_to_save = ((processed_mask > 128) * 255).astype('uint8') cv2.imwrite(debug_mask_path, mask_to_save) self.logger.log(f"디버깅용 마스크 저장: {debug_mask_path}", level=20) except Exception as e: self.logger.log(f"디버깅용 마스크 저장 실패: {e}", level=40) # return processed_mask return processed_mask # ========== MIGAN 전용: 파일 경로 입력 ========== def create_masks_migan(self, image_path: str, ocr_results: List[Dict], expansion_size: int = 6, min_component_area: int = 32) -> np.ndarray: image = cv2.imread(image_path) if image is None: self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR) return None return self.create_masks_migan_np(image, ocr_results, expansion_size=expansion_size, min_component_area=min_component_area) # ========== NEW: ndarray 직접 마스크 ========== def create_masks_np( self, image: "np.ndarray", ocr_results: List[Dict], expansion_size: int = 6, blur_size: int = 7, mask_option: str = "basic", # 🔥 ROI 전용 옵션 추가 for_roi_processing: bool = False ) -> "np.ndarray | None": """ BGR ndarray와 OCR 결과를 직접 받아 마스크 np.ndarray 반환 (디스크 I/O 없음) Args: for_roi_processing: True면 순수 마스크만 생성 (후처리 없음) """ if image is None or image.size == 0: self.logger.error("ndarray 이미지가 비었습니다.") return None h, w = image.shape[:2] mask = np.zeros((h, w), dtype=np.uint8) for res in ocr_results: poly = res.get("polygon") if not poly: continue # 🔥 ROI 처리용이면 적절한 확장 적용 (후처리 없는 대신 좀 더 확장) if for_roi_processing: expanded = self.expand_polygon(poly, offset=8) # 3 → 8로 증가 else: expanded = self.expand_polygon(poly, offset=5) cv2.fillPoly(mask, [expanded], 255) # 🔥 ROI 처리용이면 최소한의 후처리만 적용 if for_roi_processing: # 🔥 강화된 후처리: 텍스트 잔상 방지 kernel_small = np.ones((3, 3), np.uint8) kernel_large = np.ones((5, 5), np.uint8) # 1단계: 작은 노이즈 제거 mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_small) # 2단계: 텍스트 경계 완전 커버 (강화된 팽창) mask = cv2.dilate(mask, kernel_large, iterations=1) # 3단계: 추가 안전 마진 mask = cv2.dilate(mask, kernel_small, iterations=1) self.logger.log("🔧 ROI용 강화 마스크 생성 (잔상 방지 처리)", level=logging.INFO) return mask # 기존 방식 (풀프레임용) processed_mask = self.process_mask(mask, expansion_size, blur_size) return processed_mask # ========== NEW: MIGAN 전용 ndarray 마스크 ========== def create_masks_migan_np( self, image: "np.ndarray", ocr_results: List[Dict], expansion_size: int = 6, min_component_area: int = 32, dilate_iters: int = 1, enable_closing: bool = True, closing_kernel: int = 3, bridge_h_px: int = 0, bridge_v_px: int = 0 ) -> "np.ndarray | None": """ MIGAN에 최적화된 마스크 생성: - 하드 바이너리(0/255) 마스크 - 블러 없음 (경계 페더링은 MIGAN 내부 처리에 위임) - 소형 노이즈 제거, 보수적 팽창(dilate) """ if image is None or image.size == 0: self.logger.error("ndarray 이미지가 비었습니다.") return None h, w = image.shape[:2] mask = np.zeros((h, w), dtype=np.uint8) # 폴리곤 확장 후 채우기 (보수적 여유) for res in ocr_results: poly = res.get("polygon") if not poly: continue expanded = self.expand_polygon(poly, offset=max(3, expansion_size)) cv2.fillPoly(mask, [expanded], 255) # 소형 컴포넌트 제거 try: num, labels, stats, _ = cv2.connectedComponentsWithStats((mask > 0).astype(np.uint8), connectivity=8) cleaned = np.zeros_like(mask) for i in range(1, num): area = stats[i, cv2.CC_STAT_AREA] if area >= max(16, min_component_area): cleaned[labels == i] = 255 mask = cleaned except Exception: # 연결 성분 계산 실패 시 원본 유지 pass # 보수적 팽창 (경계 이음새 방지), 블러는 적용하지 않음 if expansion_size > 0: k = max(3, min(13, (expansion_size if isinstance(expansion_size, int) else 7) | 1)) # 홀수 보장, 상한 13 kernel = np.ones((k, k), np.uint8) iters = max(1, min(3, int(dilate_iters))) mask = cv2.dilate(mask, kernel, iterations=iters) # 경계 미세 구멍 제거(클로징) if enable_closing: ck = max(3, min(9, (closing_kernel if isinstance(closing_kernel, int) else 3) | 1)) cker = np.ones((ck, ck), np.uint8) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, cker) # 인접 텍스트 간 브릿지(선택): 수평/수직 방향으로만 연결 강화 try: if isinstance(bridge_h_px, (int, str)) and int(bridge_h_px) > 0: bh = max(5, min(31, int(bridge_h_px) | 1)) hk = cv2.getStructuringElement(cv2.MORPH_RECT, (bh, 3)) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, hk) if isinstance(bridge_v_px, (int, str)) and int(bridge_v_px) > 0: bv = max(5, min(31, int(bridge_v_px) | 1)) vk = cv2.getStructuringElement(cv2.MORPH_RECT, (3, bv)) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, vk) except Exception: pass # 최종 이진화 보장 mask = ((mask > 0) * 255).astype(np.uint8) return mask def expand_polygon(self, polygon, offset=15): poly = Polygon(polygon) expanded = poly.buffer(offset) if expanded.is_empty: return np.array(polygon, dtype=np.int32) return np.array(expanded.exterior.coords, dtype=np.int32) def process_mask(self, mask: np.ndarray, expansion_size: int = 5, blur_size: int = 3) -> np.ndarray: processed_mask = mask.copy() if expansion_size > 0: kernel = np.ones((expansion_size, expansion_size), np.uint8) processed_mask = cv2.dilate(processed_mask, kernel, iterations=1) if blur_size > 0: blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1 processed_mask = cv2.GaussianBlur(processed_mask, (blur_size, blur_size), 0) return processed_mask