# -*- coding: utf-8 -*- """ ROI 기반 인페인팅 모듈 - 마스크 영역을 분석하여 효율적인 ROI 처리 - 인접한 마스크 컴포넌트 자동 병합 - 적응적 처리 전략 (ROI vs 전체 이미지) - 부드러운 블렌딩으로 자연스러운 결합 """ import cv2 import numpy as np import logging import torch import time import gc from typing import List, Tuple, Optional, Any, Dict from simple_lama_inpainting import SimpleLama from concurrent.futures import ThreadPoolExecutor class ROIInpaintingModule: def __init__(self, logger=None): """ ROI 기반 인페인팅 모듈 초기화 Args: logger: 로깅 객체 (None이면 기본 출력) """ self.logger = logger or self._create_default_logger() self.simple_lama = None # 🔥 형상 최적화를 위한 버킷 시스템 self.performance_buckets = { # 일반적인 웹툰/만화 크기들을 64배수로 정규화 (896, 1152): "webtoon_portrait", # 790×1053, 750×917 등 (832, 1024): "webtoon_standard", # 800×800, 790×790 등 (896, 2048): "webtoon_long", # 790×1959, 997×2000 등 (1280, 768): "landscape_wide", # 1242×698 등 (832, 512): "landscape_standard", # 800×450, 790×409 등 (640, 640): "square_small", # 587×587 등 } # 🔥 성능 히스토리 (버킷별 실측 시간 추적) self.bucket_performance_history = {} # 🔥 cuDNN 최적화 설정 self._setup_cudnn_optimization() # 기본 설정값 self.default_config = { 'min_component_area': 100, # 최소 컴포넌트 크기 'merge_distance': 50, # 컴포넌트 병합 거리 'margin_ratio': 0.15, # ROI 여백 비율 (15%) 'large_mask_threshold': 0.5, # 전체 처리 기준 (50%) 'blend_kernel_ratio': 20, # 블렌딩 커널 크기 비율 'max_blend_kernel': 21, # 최대 블렌딩 커널 크기 'min_blend_kernel': 5, # 최소 블렌딩 커널 크기 # 🔥 성능 최적화 설정 'disable_roi_fallback': False, # ROI 폴백 완전 비활성화 (True면 항상 full 전략) # 🔥 이미지 크기 제한 설정 'max_image_size': 2048, # 최대 이미지 크기 (긴 변 기준) 'enable_size_limit': True, # 크기 제한 활성화 'scale_interpolation': cv2.INTER_AREA, # 축소 시 보간법 'upscale_interpolation': cv2.INTER_CUBIC, # 확대 시 보간법 # 🔥 하이브리드 전략 설정 'memory_high_threshold': 1200, # GPU 여유 메모리 고임계값 (MB) 'memory_low_threshold': 600, # GPU 여유 메모리 저임계값 (MB) 'small_image_mp': 1.0, # 작은 이미지 기준 (MP) 'roi_area_high': 0.60, # ROI 면적 고임계값 (60%) 'roi_area_low': 0.30, # ROI 면적 저임계값 (30%) 'roi_count_threshold': 3, # ROI 개수 임계값 # 🔥 극단적 종횡비 최적화 설정 'min_roi_dimension': 128, # ROI 최소 차원 (px) 'max_aspect_ratio': 8.0, # 최대 허용 종횡비 'use_64_alignment': False, # 64배수 정렬 사용 여부 'batch_processing_threshold': 256 * 256, # 작은 ROI 배치 처리 임계값 (px) # 🔥 인페인팅 품질 개선 설정 'mask_dilation_kernel': 3, # 마스크 팽창 커널 크기 'mask_erosion_kernel': 2, # 마스크 침식 커널 크기 'mask_blur_kernel': 5, # 마스크 블러 커널 크기 'enable_mask_refinement': True, # 마스크 정제 활성화 'feather_blend_size': 10, # 부드러운 블렌딩을 위한 페더 크기 'blend_mode': 'advanced', # 'simple' 또는 'advanced' 'context_expansion_ratio': 0.3, # 컨텍스트 확장 비율 (더 넓은 영역으로 인페인팅) } self.logger.log("ROI 인페인팅 모듈 초기화 완료", level=logging.INFO) def _create_default_logger(self): """기본 로거 생성""" class DefaultLogger: def log(self, msg, level=logging.INFO, **kwargs): print(f"[ROI-INPAINT] {msg}") return DefaultLogger() def _get_simple_lama(self): """SimpleLama 인스턴스 지연 로딩""" if self.simple_lama is None: self.simple_lama = SimpleLama() self.logger.log("SimpleLama 인스턴스 생성 완료", level=logging.INFO) return self.simple_lama def find_mask_components(self, mask: np.ndarray, config: Dict[str, Any] = None) -> List[Tuple[int, int, int, int]]: """ 마스크에서 연결된 컴포넌트들을 찾고 바운딩 박스 반환 Args: mask: 이진 마스크 (0 또는 255) config: 설정 딕셔너리 Returns: List of (x1, y1, x2, y2) 바운딩 박스 """ if config is None: config = self.default_config min_area = config.get('min_component_area', self.default_config['min_component_area']) # 연결된 컴포넌트 분석 num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats( mask, connectivity=8 ) components = [] for i in range(1, num_labels): # 0은 배경 area = stats[i, cv2.CC_STAT_AREA] if area < min_area: continue x = stats[i, cv2.CC_STAT_LEFT] y = stats[i, cv2.CC_STAT_TOP] w = stats[i, cv2.CC_STAT_WIDTH] h = stats[i, cv2.CC_STAT_HEIGHT] components.append((x, y, x + w, y + h)) self.logger.log(f"마스크 컴포넌트 {len(components)}개 발견", level=logging.INFO) return components def merge_nearby_components(self, components: List[Tuple[int, int, int, int]], merge_distance: int = None) -> List[Tuple[int, int, int, int]]: """ 인접한 컴포넌트들을 병합 Args: components: 컴포넌트 바운딩 박스 리스트 merge_distance: 병합 거리 임계값 Returns: 병합된 컴포넌트 리스트 """ if merge_distance is None: merge_distance = self.default_config['merge_distance'] if not components: return [] merged = [] used = set() for i, comp1 in enumerate(components): if i in used: continue # 현재 컴포넌트와 병합할 그룹 시작 group = [comp1] used.add(i) # 다른 컴포넌트들과 거리 확인 for j, comp2 in enumerate(components): if j in used: continue # 두 박스간 최소 거리 계산 distance = self._calculate_bbox_distance(comp1, comp2) if distance <= merge_distance: group.append(comp2) used.add(j) # 그룹의 전체 바운딩 박스 계산 if group: xs = [x for x1, y1, x2, y2 in group for x in (x1, x2)] ys = [y for x1, y1, x2, y2 in group for y in (y1, y2)] merged.append((min(xs), min(ys), max(xs), max(ys))) self.logger.log(f"컴포넌트 병합: {len(components)} → {len(merged)}", level=logging.INFO) return merged def _calculate_bbox_distance(self, bbox1: Tuple[int, int, int, int], bbox2: Tuple[int, int, int, int]) -> float: """두 바운딩 박스간 최소 거리 계산""" x1_min, y1_min, x1_max, y1_max = bbox1 x2_min, y2_min, x2_max, y2_max = bbox2 # 박스간 거리 (겹치면 0) dx = max(0, max(x1_min - x2_max, x2_min - x1_max)) dy = max(0, max(y1_min - y2_max, y2_min - y1_max)) return (dx**2 + dy**2)**0.5 def expand_roi(self, bbox: Tuple[int, int, int, int], image_shape: Tuple[int, int], margin_ratio: float = None) -> Tuple[int, int, int, int]: """ ROI에 여백 추가 Args: bbox: 원본 바운딩 박스 (x1, y1, x2, y2) image_shape: 이미지 크기 (H, W) margin_ratio: 여백 비율 Returns: 확장된 바운딩 박스 """ if margin_ratio is None: margin_ratio = self.default_config['margin_ratio'] h, w = image_shape[:2] x1, y1, x2, y2 = bbox # 현재 크기 기준으로 여백 계산 roi_w, roi_h = x2 - x1, y2 - y1 margin_x = int(roi_w * margin_ratio) margin_y = int(roi_h * margin_ratio) # 이미지 경계 내로 제한 x1 = max(0, x1 - margin_x) y1 = max(0, y1 - margin_y) x2 = min(w, x2 + margin_x) y2 = min(h, y2 + margin_y) return (x1, y1, x2, y2) def should_process_full_image(self, components: List[Tuple[int, int, int, int]], image_shape: Tuple[int, int], threshold: float = None) -> bool: """ 전체 이미지 처리 여부 결정 Args: components: 컴포넌트 리스트 image_shape: 이미지 크기 threshold: 전체 처리 기준 비율 Returns: 전체 처리 여부 """ if threshold is None: threshold = self.default_config['large_mask_threshold'] if not components: return False if len(components) == 1: comp = components[0] comp_area = (comp[2] - comp[0]) * (comp[3] - comp[1]) total_area = image_shape[0] * image_shape[1] return comp_area > total_area * threshold return False def create_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: """ 🔥 부드러운 블렌딩을 위한 고급 마스크 생성 Args: roi_mask: ROI 영역 마스크 config: 설정 오버라이드 Returns: 블렌딩용 마스크 (0~1 범위) """ if config is None: config = self.default_config blend_mode = config.get('blend_mode', 'simple') if blend_mode == 'simple': return self._create_simple_blend_mask(roi_mask, config) else: return self._create_advanced_blend_mask(roi_mask, config) def _create_simple_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any]) -> np.ndarray: """기본 블렌딩 마스크 (기존 방식)""" blend_mask = roi_mask.astype(np.float32) / 255.0 # 가우시안 블러로 부드러운 전환 kernel_size = min( config['max_blend_kernel'], max( config['min_blend_kernel'], min(roi_mask.shape[:2]) // config['blend_kernel_ratio'] ) ) if kernel_size % 2 == 0: kernel_size += 1 blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0) return np.expand_dims(blend_mask, axis=2) def _create_advanced_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any]) -> np.ndarray: """🔥 고급 블렌딩 마스크 (페더링 + 거리 변환 기반)""" blend_mask = roi_mask.astype(np.float32) / 255.0 # 🔥 1단계: 거리 변환을 이용한 페더링 feather_size = config.get('feather_blend_size', 10) # 마스크의 경계에서 거리 계산 dist_transform = cv2.distanceTransform( (blend_mask * 255).astype(np.uint8), cv2.DIST_L2, 5 ) # 페더링 적용 if feather_size > 0: # 경계에서 페더 크기만큼 부드럽게 감소 feathered_mask = np.clip(dist_transform / feather_size, 0, 1) blend_mask = np.minimum(blend_mask, feathered_mask) # 🔥 2단계: 가우시안 블러로 추가 부드러움 kernel_size = min( config['max_blend_kernel'], max( config['min_blend_kernel'], min(roi_mask.shape[:2]) // config['blend_kernel_ratio'] ) ) if kernel_size % 2 == 0: kernel_size += 1 blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0) # 🔥 3단계: 경계 강화 (중앙은 1.0 유지, 경계만 부드럽게) core_mask = cv2.erode((roi_mask > 128).astype(np.uint8), np.ones((3, 3), np.uint8), iterations=2) core_mask = core_mask.astype(np.float32) # 코어 영역은 완전히 1.0, 경계 영역만 블렌딩 final_mask = np.maximum(blend_mask, core_mask) self.logger.log( f"🔧 고급 블렌딩 마스크 생성: feather={feather_size}px, kernel={kernel_size}px", level=logging.INFO ) return np.expand_dims(final_mask, axis=2) def process_roi(self, image: np.ndarray, mask: np.ndarray, roi_bbox: Tuple[int, int, int, int], config: Dict[str, Any] = None) -> Tuple[np.ndarray, bool]: """ 🔥 단일 ROI 영역 인페인팅 처리 (마스크 정제 문제 해결 버전) Args: image: 원본 이미지 mask: 이진 마스크 roi_bbox: ROI 바운딩 박스 config: 설정 오버라이드 Returns: (처리된 ROI 이미지, 성공 여부) """ if config is None: config = self.default_config x1, y1, x2, y2 = roi_bbox # 🔥 정확한 ROI-마스크 매칭 추출 roi_image = image[y1:y2, x1:x2].copy() roi_mask = mask[y1:y2, x1:x2].copy() # ROI 크기 로깅 roi_size = (x2-x1) * (y2-y1) total_size = image.shape[0] * image.shape[1] aspect_ratio = max(x2-x1, y2-y1) / max(min(x2-x1, y2-y1), 1) self.logger.log( f"ROI 처리: {x2-x1}x{y2-y1} ({roi_size/total_size*100:.1f}% of image, 종횡비:{aspect_ratio:.1f})", level=logging.INFO ) try: # 🔥 마스크가 비어있는지 확인 if np.sum(roi_mask) == 0: self.logger.log("ROI 마스크가 비어있음, 원본 반환", level=logging.INFO) return roi_image, True # 🔥 마스크 정제 여부에 따라 분기 if config.get('enable_mask_refinement', False): # 마스크 정제 활성화된 경우만 추가 처리 refined_mask = self.refine_mask(roi_mask, config) else: # 🔥 마스크 모듈에서 이미 최적화된 마스크 그대로 사용 refined_mask = roi_mask self.logger.log("🔧 마스크 모듈 최적화 마스크 사용 (추가 정제 생략)", level=logging.INFO) # 🔥 극단적 종횡비 전처리 preprocessed_image, preprocessed_mask, preprocess_info = self.preprocess_extreme_aspect_ratio(roi_image, refined_mask) # 🔥 크기 정규화 (8의 배수로 맞춤) normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(preprocessed_image, preprocessed_mask) # ROI 인페인팅 (정규화된 크기로) simple_lama = self._get_simple_lama() # 🔥 BGR → RGB 변환 후 SimpleLama 호출 normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB) roi_result_pil = simple_lama(normalized_image_rgb, normalized_mask) roi_result = np.array(roi_result_pil) roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR) # 🔥 정규화 크기에서 전처리 크기로 복원 restored_from_normalized = self.restore_roi_size(roi_result_bgr, normalized_size) # 🔥 전처리된 크기에서 원본 크기로 복원 (필요한 경우) if preprocess_info['adjusted']: final_result = cv2.resize( restored_from_normalized, (roi_image.shape[1], roi_image.shape[0]), interpolation=cv2.INTER_CUBIC ) else: final_result = restored_from_normalized # 🔥 최종 크기 검증 if final_result.shape[:2] != roi_image.shape[:2]: self.logger.log( f"[오류] 최종 크기 불일치: result{final_result.shape[:2]} vs roi{roi_image.shape[:2]}", level=logging.ERROR ) # 강제 리사이즈 (마지막 수단) final_result = cv2.resize(final_result, (roi_image.shape[1], roi_image.shape[0])) # 🔥 블렌딩 (원본 roi_mask 사용 - 왜곡 방지) blend_mask = self.create_blend_mask(roi_mask, config) # refined_mask 대신 원본 사용 blended_roi = (final_result * blend_mask + roi_image * (1 - blend_mask)).astype(np.uint8) return blended_roi, True except Exception as e: self.logger.log(f"ROI 처리 실패: {e}", level=logging.WARNING) import traceback self.logger.log(f"ROI 처리 실패 상세: {traceback.format_exc()}", level=logging.DEBUG) return roi_image, False def normalize_roi_size(self, roi_image: np.ndarray, roi_mask: np.ndarray, modulo: int = 8) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]: """ ROI 크기를 지정된 배수로 정규화 (SimpleLama 패딩과 동일한 로직) Args: roi_image: ROI 이미지 roi_mask: ROI 마스크 modulo: 배수 (기본값: 8) Returns: (정규화된 이미지, 정규화된 마스크, 원본 크기) """ original_h, original_w = roi_image.shape[:2] # 8의 배수로 올림 target_h = ((original_h + modulo - 1) // modulo) * modulo target_w = ((original_w + modulo - 1) // modulo) * modulo if target_h == original_h and target_w == original_w: # 이미 8의 배수이면 그대로 반환 return roi_image, roi_mask, (original_h, original_w) # 패딩으로 크기 조정 (reflect 모드 사용 - SimpleLama와 동일) pad_h = target_h - original_h pad_w = target_w - original_w # 균등하게 패딩 분배 top = pad_h // 2 bottom = pad_h - top left = pad_w // 2 right = pad_w - left # 이미지 패딩 padded_image = cv2.copyMakeBorder( roi_image, top, bottom, left, right, cv2.BORDER_REFLECT ) # 마스크 패딩 (상수값 0으로 패딩) padded_mask = cv2.copyMakeBorder( roi_mask, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0 ) self.logger.log( f"크기 정규화: {original_w}x{original_h} → {target_w}x{target_h} (패딩: t{top}b{bottom}l{left}r{right})", level=logging.INFO ) return padded_image, padded_mask, (original_h, original_w) def restore_roi_size(self, processed_image: np.ndarray, original_size: Tuple[int, int], modulo: int = 8) -> np.ndarray: """ 정규화된 ROI를 원본 크기로 복원 Args: processed_image: 처리된 이미지 original_size: 원본 크기 (h, w) modulo: 배수 Returns: 원본 크기로 복원된 이미지 """ original_h, original_w = original_size current_h, current_w = processed_image.shape[:2] if current_h == original_h and current_w == original_w: return processed_image # 패딩 계산 (정규화할 때와 동일한 로직) pad_h = current_h - original_h pad_w = current_w - original_w top = pad_h // 2 bottom = pad_h - top left = pad_w // 2 right = pad_w - left # 패딩 제거 restored = processed_image[top:current_h-bottom, left:current_w-right] self.logger.log( f"크기 복원: {current_w}x{current_h} → {original_w}x{original_h}", level=logging.INFO ) return restored def scale_image_if_needed(self, image: np.ndarray, mask: np.ndarray, config: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray, Dict]: """ 이미지가 최대 크기를 초과하면 축소 Args: image: 입력 이미지 mask: 입력 마스크 config: 설정 딕셔너리 Returns: (스케일된 이미지, 스케일된 마스크, 스케일 정보) """ original_h, original_w = image.shape[:2] max_dimension = max(original_h, original_w) # 🔥 설정에서 최대 크기 가져오기 max_size = config.get('max_image_size', 2048) interpolation = config.get('scale_interpolation', cv2.INTER_AREA) if max_dimension <= max_size: return image, mask, {'scaled': False, 'original_size': (original_h, original_w)} # 스케일 계산 scale_factor = max_size / max_dimension new_h = int(original_h * scale_factor) new_w = int(original_w * scale_factor) # 이미지 스케일링 scaled_image = cv2.resize(image, (new_w, new_h), interpolation=interpolation) scaled_mask = cv2.resize(mask, (new_w, new_h), interpolation=cv2.INTER_NEAREST) self.logger.log( f"이미지 스케일 다운: {original_w}x{original_h} → {new_w}x{new_h} (factor={scale_factor:.3f})", level=logging.INFO ) return scaled_image, scaled_mask, { 'scaled': True, 'original_size': (original_h, original_w), 'scale_factor': scale_factor } def restore_original_scale(self, processed_image: np.ndarray, scale_info: Dict) -> np.ndarray: """ 처리된 이미지를 원본 크기로 복원 Args: processed_image: 처리된 이미지 scale_info: 스케일 정보 딕셔너리 Returns: 원본 크기로 복원된 이미지 """ if not scale_info.get('scaled', False): return processed_image original_h, original_w = scale_info['original_size'] current_h, current_w = processed_image.shape[:2] if current_h == original_h and current_w == original_w: return processed_image # 고품질 보간법으로 업스케일 interpolation = cv2.INTER_CUBIC restored = cv2.resize(processed_image, (original_w, original_h), interpolation=interpolation) self.logger.log( f"이미지 스케일 업: {current_w}x{current_h} → {original_w}x{original_h}", level=logging.INFO ) return restored def preprocess_extreme_aspect_ratio(self, roi_image: np.ndarray, roi_mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]: """ 🔥 극단적 종횡비 ROI 전처리 ("마른 막대" 문제 해결) Args: roi_image: 원본 ROI 이미지 roi_mask: 원본 ROI 마스크 Returns: (처리된 이미지, 처리된 마스크, 복원 정보) """ original_h, original_w = roi_image.shape[:2] # 종횡비 계산 aspect_ratio = max(original_w, original_h) / max(min(original_w, original_h), 1) min_dimension = self.default_config['min_roi_dimension'] max_aspect_ratio = self.default_config['max_aspect_ratio'] # 조정이 필요한지 확인 needs_adjustment = ( aspect_ratio > max_aspect_ratio or min(original_w, original_h) < min_dimension ) if not needs_adjustment: return roi_image, roi_mask, {'adjusted': False} # 🔥 목표 크기 계산 target_w, target_h = original_w, original_h # 최소 차원 보장 if original_w < min_dimension: target_w = min_dimension if original_h < min_dimension: target_h = min_dimension # 종횡비 제한 new_ratio = max(target_w, target_h) / max(min(target_w, target_h), 1) if new_ratio > max_aspect_ratio: if target_w > target_h: # 가로가 긴 경우 target_h = max(target_h, int(target_w / max_aspect_ratio)) else: # 세로가 긴 경우 target_w = max(target_w, int(target_h / max_aspect_ratio)) # 🔥 고품질 리사이즈 processed_image = cv2.resize(roi_image, (target_w, target_h), interpolation=cv2.INTER_CUBIC) processed_mask = cv2.resize(roi_mask, (target_w, target_h), interpolation=cv2.INTER_NEAREST) self.logger.log( f"🔧 극단적 ROI 전처리: {original_w}x{original_h} (비율:{aspect_ratio:.1f}) → " f"{target_w}x{target_h} (비율:{max(target_w, target_h)/max(min(target_w, target_h), 1):.1f})", level=logging.INFO ) return processed_image, processed_mask, { 'adjusted': True, 'original_size': (original_h, original_w), 'processed_size': (target_h, target_w) } def inpaint_with_roi(self, image: np.ndarray, mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: """ 🔥 ROI 기반 인페인팅 처리 (형상 최적화 버전) """ start_time = time.time() if config is None: config = self.default_config effective_config = {**self.default_config, **config} try: # 🔥 1단계: 형상 버킷 최적화 bucket_start_time = time.time() optimized_image, optimized_mask, bucket_info = self.apply_optimal_padding(image, mask) bucket_time = time.time() - bucket_start_time bucket_name = bucket_info['bucket_name'] # 이미지 크기 스케일링 (기존 로직) scale_start_time = time.time() scaled_image, scaled_mask, scale_info = self.scale_image_if_needed(optimized_image, optimized_mask, effective_config) scale_time = time.time() - scale_start_time # 🔥 2단계: 적응적 전략 선택 (성능 히스토리 고려) strategy_start_time = time.time() components = self.find_mask_components(scaled_mask, effective_config) strategy = self.choose_processing_strategy(scaled_image.shape, components, effective_config) # 🔥 성능 히스토리 기반 전략 재조정 total_pixels = scaled_image.shape[0] * scaled_image.shape[1] predicted_time = 0.3 + (total_pixels / 1000000) * 0.2 # 간단한 예측 모델 if strategy == "full" and self.should_fallback_to_roi(bucket_name, predicted_time): strategy = "roi" self.logger.log(f"🔄 전략 변경: full → roi (성능 히스토리 기반)", level=logging.WARNING) strategy_time = time.time() - strategy_start_time self.logger.log( f"🔧 처리 준비: 버킷={bucket_name}, 전략={strategy}, " f"버킷화={bucket_time:.3f}s, 스케일링={scale_time:.3f}s, 전략선택={strategy_time:.3f}s", level=logging.INFO ) # 🔥 3단계: 인페인팅 실행 (기존 로직 유지) inpaint_start_time = time.time() if strategy == "full": result = self._process_full_image_optimized(scaled_image, scaled_mask, effective_config, bucket_name) elif strategy == "roi_parallel": result = self._process_roi_parallel(scaled_image, scaled_mask, components, effective_config) else: # roi result = self._process_roi_sequential(scaled_image, scaled_mask, components, effective_config) inpaint_time = time.time() - inpaint_start_time # 🔥 4단계: 복원 및 성능 기록 restore_start_time = time.time() # 스케일링 복원 if scale_info['scaled']: result = self.restore_original_scale(result, scale_info) # 버킷 패딩 복원 result = self.restore_from_padding(result, bucket_info) restore_time = time.time() - restore_start_time total_time = time.time() - start_time # 🔥 성능 히스토리 기록 self.record_performance(bucket_name, inpaint_time, strategy) self.logger.log( f"🎯 인페인팅 완료: 총 {total_time:.3f}s (인페인팅: {inpaint_time:.3f}s, 복원: {restore_time:.3f}s)", level=logging.INFO ) return result except Exception as e: self.logger.log(f"ROI 인페인팅 실패: {e}", level=logging.ERROR) import traceback self.logger.log(traceback.format_exc(), level=logging.DEBUG) return image def prepare_rois_parallel(self, image: np.ndarray, binary_mask: np.ndarray, merged_components: List[Tuple[int, int, int, int]], config: Dict[str, Any]) -> List[Dict[str, Any]]: """ ROI 전처리를 병렬로 수행 (CPU 작업) Args: image: 원본 이미지 binary_mask: 이진 마스크 merged_components: 병합된 컴포넌트 리스트 config: 설정 Returns: 전처리된 ROI 정보 리스트 """ import time def prepare_single_roi(roi_info): idx, comp_bbox = roi_info # 🔥 ROI 영역 확장 (설정에 따라 기본/컨텍스트 선택) if config.get('enable_mask_refinement', False): roi_bbox = self.expand_roi_with_context( comp_bbox, image.shape, config ) else: roi_bbox = self.expand_roi( comp_bbox, image.shape, margin_ratio=config['margin_ratio'] ) x1, y1, x2, y2 = roi_bbox # ROI 추출 roi_image = image[y1:y2, x1:x2].copy() roi_mask = binary_mask[y1:y2, x1:x2].copy() # 🔥 CPU에서 크기 정규화 (병렬 처리 가능) if np.sum(roi_mask) == 0: return { 'idx': idx, 'roi_bbox': roi_bbox, 'roi_image': roi_image, 'roi_mask': roi_mask, 'normalized_image': None, 'normalized_mask': None, 'original_size': roi_image.shape[:2], 'skip_processing': True } # 크기 정규화 normalized_image, normalized_mask, original_size = self.normalize_roi_size(roi_image, roi_mask) return { 'idx': idx, 'roi_bbox': roi_bbox, 'roi_image': roi_image, 'roi_mask': roi_mask, 'normalized_image': normalized_image, 'normalized_mask': normalized_mask, 'original_size': original_size, 'skip_processing': False } # 🔥 병렬 전처리 (CPU 작업만) prep_start_time = time.time() with ThreadPoolExecutor(max_workers=min(4, len(merged_components))) as executor: roi_infos = list(enumerate(merged_components)) prepared_rois = list(executor.map(prepare_single_roi, roi_infos)) prep_time = time.time() - prep_start_time self.logger.log(f"[PERF] ROI 병렬 전처리 시간: {prep_time:.3f}초 ({len(prepared_rois)}개)", level=logging.INFO) return prepared_rois def get_gpu_memory_info(self) -> Dict[str, float]: """ GPU 메모리 정보 확인 Returns: {'total_mb': float, 'used_mb': float, 'free_mb': float} """ try: import torch if torch.cuda.is_available(): total_mb = torch.cuda.get_device_properties(0).total_memory / 1024 / 1024 used_mb = torch.cuda.memory_allocated(0) / 1024 / 1024 free_mb = total_mb - used_mb return {'total_mb': total_mb, 'used_mb': used_mb, 'free_mb': free_mb} except ImportError: pass # GPU 정보를 얻을 수 없으면 안전한 기본값 return {'total_mb': 4096, 'used_mb': 2048, 'free_mb': 2048} def calculate_roi_coverage(self, components: List[Tuple[int, int, int, int]], image_shape: Tuple[int, int]) -> float: """ ROI 면적 비율 계산 Args: components: 컴포넌트 리스트 image_shape: 이미지 크기 (H, W) Returns: A (ROI 면적 비율) = (모든 ROI의 합산 면적) / (이미지 전체 면적) """ if not components: return 0.0 total_image_area = image_shape[0] * image_shape[1] total_roi_area = 0 for comp in components: x1, y1, x2, y2 = comp roi_area = (x2 - x1) * (y2 - y1) total_roi_area += roi_area return total_roi_area / total_image_area if total_image_area > 0 else 0.0 def choose_processing_strategy(self, image_shape: Tuple[int, int], components: List[Tuple[int, int, int, int]], config: Dict[str, Any]) -> str: """ 🔥 GPU 메모리와 ROI 면적 비율을 고려한 하이브리드 전략 선택 핵심 지표: - A (ROI 면적 비율) = (모든 ROI의 합산 면적) / (이미지 전체 면적) - N (ROI 개수) - H (GPU 여유 메모리 MB) - S (이미지 크기, MP) Returns: 'full': 전체 이미지 처리 'roi': ROI 기반 처리 'roi_parallel': ROI 병렬 전처리 """ # 🔥 핵심 지표 계산 gpu_info = self.get_gpu_memory_info() H = gpu_info['free_mb'] # GPU 여유 메모리 (MB) total_pixels = image_shape[0] * image_shape[1] S = total_pixels / 1_000_000 # 이미지 크기 (MP) N = len(components) # ROI 개수 A = self.calculate_roi_coverage(components, image_shape) # ROI 면적 비율 # 🔥 조정 가능한 임계값 (config로 튜닝 가능) memory_high_threshold = config.get('memory_high_threshold', 1200) # MB memory_low_threshold = config.get('memory_low_threshold', 600) # MB small_image_threshold = config.get('small_image_mp', 1.0) # MP roi_area_high_threshold = config.get('roi_area_high', 0.60) # 60% roi_area_low_threshold = config.get('roi_area_low', 0.30) # 30% roi_count_threshold = config.get('roi_count_threshold', 3) # 개수 # 🚫 ROI 전략 완전 폐기: 작은 이미지에서는 항상 full이 더 빠름 # 성능 측정 결과 1.2초(full) vs 4-5초(roi)로 full이 압도적으로 빠름 strategy = "full" reason = f"roi_strategy_deprecated(S={S:.1f}MP,A={A:.1%},N={N},H={H:.0f}MB)" # 🔥 상세 성능 예측 로깅 estimated_roi_overhead = N * 0.02 + 0.03 # 컴포넌트당 20ms + 기본 30ms estimated_processing_time = S * 0.5 # 메가픽셀당 0.5초 (SimpleLama 기준) efficiency_ratio = estimated_roi_overhead / max(estimated_processing_time, 0.01) self.logger.log( f"[STRATEGY] 🎯 핵심 지표: S={S:.1f}MP, A={A:.1%}, N={N}, H={H:.0f}MB", level=logging.INFO ) self.logger.log( f"[STRATEGY] 선택된 전략: {strategy} (이유: {reason})", level=logging.INFO ) self.logger.log( f"[STRATEGY] 성능 예측: ROI오버헤드={estimated_roi_overhead:.3f}s, " f"처리시간={estimated_processing_time:.3f}s, 효율비={efficiency_ratio:.2f}", level=logging.INFO ) return strategy def cleanup_memory(self): """메모리 정리""" import gc gc.collect() try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() except ImportError: pass self.logger.log("메모리 정리 완료", level=logging.INFO) def get_optimal_bucket_size(self, height: int, width: int) -> Tuple[int, int, str]: """ 🔥 이미지 크기를 성능 최적화된 버킷으로 정규화 Args: height, width: 원본 이미지 크기 Returns: (최적화된 높이, 너비, 버킷명) """ # 긴 변과 짧은 변 구분 long_side = max(height, width) short_side = min(height, width) is_portrait = height > width # 🔥 사용자 이미지 크기 기반 버킷 매핑 (항상 원본보다 크거나 같게) if long_side <= 800: # 작은 이미지: 640×640 또는 832×512 if abs(height - width) < 100: # 정사각형에 가까움 bucket_h, bucket_w = 640, 640 bucket_name = "square_small" else: bucket_h, bucket_w = (832, 512) if is_portrait else (512, 832) bucket_name = "landscape_standard" elif long_side <= 1200: # 중간 이미지: 웹툰 표준 if short_side >= 700: # 정사각형에 가까움 bucket_h, bucket_w = 1024, 832 bucket_name = "webtoon_standard" else: bucket_h, bucket_w = (1152, 896) if is_portrait else (896, 1152) bucket_name = "webtoon_portrait" elif long_side <= 2100: # 🔥 긴 이미지: 원본 크기 고려하여 버킷 선택 if is_portrait: # 세로형: 높이를 충분히 큰 버킷으로 bucket_h = max(2048, ((height // 64) + 1) * 64) # 64배수로 올림 bucket_w = max(896, ((width // 64) + 1) * 64) else: # 가로형: 너비를 충분히 큰 버킷으로 bucket_w = max(2048, ((width // 64) + 1) * 64) bucket_h = max(896, ((height // 64) + 1) * 64) bucket_name = "webtoon_long" else: # 🔥 매우 큰 이미지: 원본 크기보다 크게 if is_portrait: bucket_h = ((height // 128) + 1) * 128 # 128배수로 올림 bucket_w = max(1280, ((width // 64) + 1) * 64) else: bucket_w = ((width // 128) + 1) * 128 bucket_h = max(768, ((height // 64) + 1) * 64) bucket_name = "landscape_wide" # 🔥 최종 방향 조정 (항상 원본보다 크거나 같게 보장) if is_portrait: final_h = max(height, bucket_h) final_w = max(width, bucket_w) else: final_h = max(height, bucket_h) final_w = max(width, bucket_w) self.logger.log( f"🔧 형상 버킷 최적화: {height}×{width} → {final_h}×{final_w} ({bucket_name})", level=logging.INFO ) return final_h, final_w, bucket_name def apply_optimal_padding(self, image: np.ndarray, mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict]: """ 🔥 성능 최적화된 크기로 패딩 Args: image, mask: 원본 이미지와 마스크 Returns: (패딩된 이미지, 패딩된 마스크, 복원 정보) """ original_h, original_w = image.shape[:2] target_h, target_w, bucket_name = self.get_optimal_bucket_size(original_h, original_w) # 🔥 패딩 값 계산 및 안전성 검증 pad_h = target_h - original_h pad_w = target_w - original_w # 🔥 음수 패딩 방지 (타겟이 원본보다 작을 경우) if pad_h < 0 or pad_w < 0: self.logger.log( f"⚠️ 버킷 크기 오류: 원본({original_h}×{original_w}) > 타겟({target_h}×{target_w}), 패딩 건너뜀", level=logging.WARNING ) # 패딩 없이 원본 반환 return image, mask, { 'original_size': (original_h, original_w), 'target_size': (original_h, original_w), 'bucket_name': bucket_name + "_no_padding", 'padding': (0, 0, 0, 0) } pad_top = pad_h // 2 pad_bottom = pad_h - pad_top pad_left = pad_w // 2 pad_right = pad_w - pad_left # 🔥 추가 안전성 검증 if pad_top < 0 or pad_bottom < 0 or pad_left < 0 or pad_right < 0: self.logger.log( f"⚠️ 패딩 값 오류: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right}", level=logging.ERROR ) return image, mask, { 'original_size': (original_h, original_w), 'target_size': (original_h, original_w), 'bucket_name': bucket_name + "_error", 'padding': (0, 0, 0, 0) } # 이미지 패딩 (reflect로 자연스럽게) padded_image = cv2.copyMakeBorder( image, pad_top, pad_bottom, pad_left, pad_right, borderType=cv2.BORDER_REFLECT ) # 마스크 패딩 (상수로) padded_mask = cv2.copyMakeBorder( mask, pad_top, pad_bottom, pad_left, pad_right, borderType=cv2.BORDER_CONSTANT, value=0 ) restore_info = { 'original_size': (original_h, original_w), 'target_size': (target_h, target_w), 'bucket_name': bucket_name, 'padding': (pad_top, pad_bottom, pad_left, pad_right) } self.logger.log( f"🔧 패딩 적용: {original_h}×{original_w} → {target_h}×{target_w} " f"(padding: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right})", level=logging.INFO ) return padded_image, padded_mask, restore_info def restore_from_padding(self, image: np.ndarray, restore_info: Dict) -> np.ndarray: """패딩된 이미지를 원본 크기로 복원""" pad_top, pad_bottom, pad_left, pad_right = restore_info['padding'] original_h, original_w = restore_info['original_size'] # 패딩 제거 if pad_bottom == 0: cropped = image[pad_top:, :] else: cropped = image[pad_top:-pad_bottom, :] if pad_right == 0: cropped = cropped[:, pad_left:] else: cropped = cropped[:, pad_left:-pad_right] # 최종 크기 검증 및 리사이즈 if cropped.shape[:2] != (original_h, original_w): cropped = cv2.resize(cropped, (original_w, original_h), interpolation=cv2.INTER_CUBIC) return cropped def get_processing_stats(self, image: np.ndarray, mask: np.ndarray) -> Dict[str, Any]: """ 처리 통계 정보 반환 (실제 처리 없이 분석만) Args: image: 입력 이미지 mask: 이진 마스크 Returns: 처리 통계 딕셔너리 """ binary_mask = (mask > 128).astype(np.uint8) * 255 components = self.find_mask_components(binary_mask, self.default_config) merged_components = self.merge_nearby_components(components) total_area = image.shape[0] * image.shape[1] roi_areas = [] for comp_bbox in merged_components: roi_bbox = self.expand_roi(comp_bbox, image.shape) x1, y1, x2, y2 = roi_bbox roi_area = (x2 - x1) * (y2 - y1) roi_areas.append(roi_area) return { 'total_image_size': total_area, 'num_components': len(components), 'num_merged_rois': len(merged_components), 'roi_areas': roi_areas, 'total_roi_area': sum(roi_areas), 'roi_coverage_ratio': sum(roi_areas) / total_area if total_area > 0 else 0.0, 'will_process_full': self.should_process_full_image(components, image.shape), 'memory_efficiency': 1.0 - (sum(roi_areas) / total_area) if not self.should_process_full_image(components, image.shape) else 0.0 } def should_use_batch_processing(self, roi_infos: List[Dict[str, Any]]) -> bool: """ ROI 배치 처리 사용 여부 결정 Args: roi_infos: ROI 정보 리스트 Returns: 배치 처리 사용 여부 """ if len(roi_infos) < 2: return False # 모든 ROI가 작은 경우에만 배치 처리 batch_threshold = self.default_config.get('batch_processing_threshold', 256 * 256) for roi_info in roi_infos: if roi_info['skip_processing']: continue h, w = roi_info['original_size'] if h * w > batch_threshold: return False return True def create_roi_batch(self, roi_infos: List[Dict[str, Any]]) -> Tuple[np.ndarray, np.ndarray, List[Dict[str, Any]]]: """ 🔥 작은 ROI들을 하나의 배치로 결합하여 처리 효율성 향상 Args: roi_infos: ROI 정보 리스트 Returns: (배치 이미지, 배치 마스크, 배치 정보 리스트) """ valid_rois = [roi for roi in roi_infos if not roi['skip_processing']] if len(valid_rois) < 2: return None, None, [] # 🔥 최대 크기 계산 max_h = max(roi['normalized_image'].shape[0] for roi in valid_rois) max_w = max(roi['normalized_image'].shape[1] for roi in valid_rois) # 8의 배수로 올림 max_h = ((max_h + 7) // 8) * 8 max_w = ((max_w + 7) // 8) * 8 # 배치 크기 결정 (2x2 그리드) grid_size = int(np.ceil(np.sqrt(len(valid_rois)))) batch_h = max_h * grid_size batch_w = max_w * grid_size # 배치 이미지/마스크 생성 batch_image = np.zeros((batch_h, batch_w, 3), dtype=np.uint8) batch_mask = np.zeros((batch_h, batch_w), dtype=np.uint8) batch_info = [] for i, roi_info in enumerate(valid_rois): row = i // grid_size col = i % grid_size y_start = row * max_h x_start = col * max_w roi_img = roi_info['normalized_image'] roi_mask = roi_info['normalized_mask'] h, w = roi_img.shape[:2] # ROI를 배치에 배치 batch_image[y_start:y_start+h, x_start:x_start+w] = roi_img batch_mask[y_start:y_start+h, x_start:x_start+w] = roi_mask batch_info.append({ **roi_info, 'batch_position': (y_start, x_start, y_start+h, x_start+w), 'roi_size': (h, w) }) self.logger.log( f"🔥 ROI 배치 생성: {len(valid_rois)}개 ROI → {batch_w}x{batch_h} 배치 ({grid_size}x{grid_size} 그리드)", level=logging.INFO ) return batch_image, batch_mask, batch_info def refine_mask(self, mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: """ 🔥 마스크 품질 개선을 위한 고급 정제 Args: mask: 원본 마스크 config: 설정 오버라이드 Returns: 정제된 마스크 """ if config is None: config = self.default_config if not config.get('enable_mask_refinement', True): return mask refined_mask = mask.copy() # 🔥 1단계: 작은 노이즈 제거 (Opening) erosion_kernel = config.get('mask_erosion_kernel', 2) if erosion_kernel > 0: kernel = np.ones((erosion_kernel, erosion_kernel), np.uint8) refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel) # 🔥 2단계: 마스크 영역 확장 (텍스트 경계 완전 커버) dilation_kernel = config.get('mask_dilation_kernel', 3) if dilation_kernel > 0: kernel = np.ones((dilation_kernel, dilation_kernel), np.uint8) refined_mask = cv2.dilate(refined_mask, kernel, iterations=1) # 🔥 3단계: 부드러운 경계 생성 blur_kernel = config.get('mask_blur_kernel', 5) if blur_kernel > 0 and blur_kernel % 2 == 1: refined_mask = cv2.GaussianBlur(refined_mask, (blur_kernel, blur_kernel), 0) # 블러 후 다시 이진화 (128 이상을 255로) refined_mask = np.where(refined_mask > 128, 255, 0).astype(np.uint8) self.logger.log( f"🔧 마스크 정제 완료: erosion={erosion_kernel}, dilation={dilation_kernel}, blur={blur_kernel}", level=logging.INFO ) return refined_mask def expand_roi_with_context(self, bbox: Tuple[int, int, int, int], image_shape: Tuple[int, int], config: Dict[str, Any] = None) -> Tuple[int, int, int, int]: """ 🔥 컨텍스트를 고려한 ROI 확장 (더 넓은 영역으로 품질 향상) Args: bbox: 원본 바운딩 박스 image_shape: 이미지 크기 config: 설정 Returns: 확장된 바운딩 박스 """ if config is None: config = self.default_config # 기본 여백 + 컨텍스트 확장 base_margin_ratio = config.get('margin_ratio', 0.15) context_expansion = config.get('context_expansion_ratio', 0.3) total_margin_ratio = base_margin_ratio + context_expansion h, w = image_shape[:2] x1, y1, x2, y2 = bbox # 현재 크기 기준으로 여백 계산 roi_w, roi_h = x2 - x1, y2 - y1 margin_x = int(roi_w * total_margin_ratio) margin_y = int(roi_h * total_margin_ratio) # 이미지 경계 내로 제한 x1 = max(0, x1 - margin_x) y1 = max(0, y1 - margin_y) x2 = min(w, x2 + margin_x) y2 = min(h, y2 + margin_y) self.logger.log( f"🔧 컨텍스트 확장: 기본 여백 {base_margin_ratio:.1%} + 컨텍스트 {context_expansion:.1%} = {total_margin_ratio:.1%}", level=logging.INFO ) return (x1, y1, x2, y2) def _setup_cudnn_optimization(self): """🔥 강화된 cuDNN 최적화 설정""" try: import torch if torch.cuda.is_available(): # 🔥 cuDNN 벤치마크 활성화 - 첫 실행에서 최적 알고리즘 찾아서 캐시 torch.backends.cudnn.benchmark = True torch.backends.cudnn.deterministic = False # 🔥 메모리 형식 최적화 torch.backends.cudnn.allow_tf32 = True torch.backends.cuda.matmul.allow_tf32 = True # 🔥 메모리 할당 전략 최적화 torch.cuda.empty_cache() # 기존 캐시 정리 # 🔥 cuDNN 알고리즘 캐시 예열 (동일 크기 더미 텐서로 미리 탐색) self._warmup_cudnn_algorithms() self.logger.log("🔥 강화된 cuDNN 최적화 설정 완료", level=logging.INFO) except ImportError: self.logger.log("cuDNN 라이브러리를 찾을 수 없습니다. cuDNN 최적화를 사용할 수 없습니다.", level=logging.WARNING) def _warmup_cudnn_algorithms(self): """🔥 cuDNN 알고리즘 캐시 예열""" try: import torch import torch.nn.functional as F # 🔥 일반적인 웹툰 크기들로 cuDNN 알고리즘 미리 탐색 common_sizes = [ (1, 3, 640, 640), # 작은 정사각형 (1, 3, 832, 1024), # 웹툰 표준 (1, 3, 1200, 816), # 현재 샘플 크기 (1, 3, 896, 1152), # 웹툰 세로형 ] with torch.no_grad(): for size in common_sizes: try: # 더미 텐서 생성 dummy_input = torch.randn(size, device='cuda', dtype=torch.float32) # 일반적인 연산들로 cuDNN 알고리즘 탐색 트리거 _ = F.conv2d(dummy_input, torch.randn(32, 3, 3, 3, device='cuda'), padding=1) _ = F.conv2d(dummy_input, torch.randn(64, 3, 5, 5, device='cuda'), padding=2) _ = F.interpolate(dummy_input, scale_factor=0.5, mode='bilinear') _ = F.interpolate(dummy_input, scale_factor=2.0, mode='bilinear') del dummy_input # 즉시 메모리 해제 except Exception as e: # 개별 크기 실패해도 계속 진행 continue # 메모리 정리 torch.cuda.empty_cache() self.logger.log("🔥 cuDNN 알고리즘 캐시 예열 완료", level=logging.INFO) except Exception as e: self.logger.log(f"cuDNN 예열 중 오류 (무시): {e}", level=logging.DEBUG) def record_performance(self, bucket_name: str, processing_time: float, strategy: str): """🔥 버킷별 성능 히스토리 기록""" if bucket_name not in self.bucket_performance_history: self.bucket_performance_history[bucket_name] = { 'times': [], 'strategies': [], 'avg_time': 0.0, 'slow_count': 0 } history = self.bucket_performance_history[bucket_name] history['times'].append(processing_time) history['strategies'].append(strategy) # 최근 5개 기록만 유지 if len(history['times']) > 5: history['times'] = history['times'][-5:] history['strategies'] = history['strategies'][-5:] # 평균 시간 계산 history['avg_time'] = sum(history['times']) / len(history['times']) # 느린 처리 카운트 (2초 이상) history['slow_count'] = sum(1 for t in history['times'] if t > 2.0) self.logger.log( f"📊 성능 기록: {bucket_name} ({strategy}) = {processing_time:.2f}s, 평균: {history['avg_time']:.2f}s", level=logging.INFO ) def should_fallback_to_roi(self, bucket_name: str, predicted_time: float) -> bool: """🚫 성능 히스토리 기반 ROI 폴백 완전 비활성화""" # 🔥 성능 히스토리 폴백은 1초대 성능을 4-5초로 악화시키므로 완전 비활성화 self.logger.log( f"🚫 ROI 폴백 비활성화: {bucket_name} (성능 악화 방지를 위해 full 전략 강제)", level=logging.INFO ) return False # 🔥 조건 1: 평균 시간이 예측치의 4배 이상 if history['avg_time'] > predicted_time * 4: self.logger.log( f"⚡ ROI 폴백 트리거: {bucket_name} 평균 {history['avg_time']:.2f}s > 예측 {predicted_time:.2f}s × 4", level=logging.WARNING ) return True # 🔥 조건 2: 최근 3회 중 2회 이상이 느림 if len(history['times']) >= 3 and history['slow_count'] >= 2: self.logger.log( f"⚡ ROI 폴백 트리거: {bucket_name} 최근 {history['slow_count']}/3회 느림", level=logging.WARNING ) return True return False def _process_full_image_optimized(self, image: np.ndarray, mask: np.ndarray, config: Dict[str, Any], bucket_name: str) -> np.ndarray: """🔥 최적화된 전체 이미지 처리""" self.logger.log(f"전체 이미지 처리 시작 (버킷: {bucket_name})", level=logging.INFO) # 이진 마스크로 변환 binary_mask = (mask > 128).astype(np.uint8) * 255 # 크기 정규화 normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(image, binary_mask) # SimpleLama 처리 simple_lama = self._get_simple_lama() normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB) result_pil = simple_lama(normalized_image_rgb, normalized_mask) result = np.array(result_pil) result_bgr = cv2.cvtColor(result, cv2.COLOR_RGB2BGR) # 원본 크기로 복원 restored_result = self.restore_roi_size(result_bgr, normalized_size) return restored_result def _process_roi_sequential(self, image: np.ndarray, mask: np.ndarray, components: List, config: Dict[str, Any]) -> np.ndarray: """🔥 순차 ROI 처리""" self.logger.log("ROI 순차 처리 시작", level=logging.INFO) binary_mask = (mask > 128).astype(np.uint8) * 255 merged_components = self.merge_nearby_components(components, config['merge_distance']) result_image = image.copy() successful_rois = 0 for i, comp_bbox in enumerate(merged_components): # ROI 영역 확장 if config.get('enable_mask_refinement', False): roi_bbox = self.expand_roi_with_context(comp_bbox, image.shape, config) else: roi_bbox = self.expand_roi(comp_bbox, image.shape, margin_ratio=config['margin_ratio']) # ROI 처리 processed_roi, success = self.process_roi(image, binary_mask, roi_bbox, config) if success: x1, y1, x2, y2 = roi_bbox result_image[y1:y2, x1:x2] = processed_roi successful_rois += 1 self.logger.log(f"ROI 순차 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO) return result_image def _process_roi_parallel(self, image: np.ndarray, mask: np.ndarray, components: List, config: Dict[str, Any]) -> np.ndarray: """🔥 병렬 ROI 처리""" self.logger.log("ROI 병렬 처리 시작", level=logging.INFO) binary_mask = (mask > 128).astype(np.uint8) * 255 merged_components = self.merge_nearby_components(components, config['merge_distance']) # ROI 전처리를 병렬로 수행 prepared_rois = self.prepare_rois_parallel(image, binary_mask, merged_components, config) result_image = image.copy() successful_rois = 0 for roi_info in prepared_rois: if roi_info['skip_processing']: continue # SimpleLama 처리 simple_lama = self._get_simple_lama() roi_normalized_rgb = cv2.cvtColor(roi_info['normalized_image'], cv2.COLOR_BGR2RGB) roi_result_pil = simple_lama(roi_normalized_rgb, roi_info['normalized_mask']) roi_result = np.array(roi_result_pil) roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR) # 원본 크기로 복원 restored_roi = self.restore_roi_size(roi_result_bgr, roi_info['original_size']) # 부드러운 블렌딩 blend_mask = self.create_blend_mask(roi_info['roi_mask'], config) blended_roi = (restored_roi * blend_mask + roi_info['roi_image'] * (1 - blend_mask)).astype(np.uint8) # 원본 이미지에 적용 x1, y1, x2, y2 = roi_info['roi_bbox'] result_image[y1:y2, x1:x2] = blended_roi successful_rois += 1 self.logger.log(f"ROI 병렬 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO) return result_image # 편의 함수들 def create_roi_inpainter(logger=None, config=None): """ROI 인페인팅 모듈 팩토리 함수""" inpainter = ROIInpaintingModule(logger) if config: inpainter.default_config.update(config) return inpainter def quick_roi_inpaint(image: np.ndarray, mask: np.ndarray, logger=None, config=None) -> np.ndarray: """간단한 ROI 인페인팅 수행""" inpainter = create_roi_inpainter(logger, config) result = inpainter.inpaint_with_roi(image, mask, config) inpainter.cleanup_memory() return result