ImageProcessor_MainServer/worker/roi_inpainting_module.py

1558 lines
63 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
ROI 기반 인페인팅 모듈
- 마스크 영역을 분석하여 효율적인 ROI 처리
- 인접한 마스크 컴포넌트 자동 병합
- 적응적 처리 전략 (ROI vs 전체 이미지)
- 부드러운 블렌딩으로 자연스러운 결합
"""
import cv2
import numpy as np
import logging
import torch
import time
import gc
from typing import List, Tuple, Optional, Any, Dict
from simple_lama_inpainting import SimpleLama
from concurrent.futures import ThreadPoolExecutor
class ROIInpaintingModule:
def __init__(self, logger=None):
"""
ROI 기반 인페인팅 모듈 초기화
Args:
logger: 로깅 객체 (None이면 기본 출력)
"""
self.logger = logger or self._create_default_logger()
self.simple_lama = None
# 🔥 형상 최적화를 위한 버킷 시스템
self.performance_buckets = {
# 일반적인 웹툰/만화 크기들을 64배수로 정규화
(896, 1152): "webtoon_portrait", # 790×1053, 750×917 등
(832, 1024): "webtoon_standard", # 800×800, 790×790 등
(896, 2048): "webtoon_long", # 790×1959, 997×2000 등
(1280, 768): "landscape_wide", # 1242×698 등
(832, 512): "landscape_standard", # 800×450, 790×409 등
(640, 640): "square_small", # 587×587 등
}
# 🔥 성능 히스토리 (버킷별 실측 시간 추적)
self.bucket_performance_history = {}
# 🔥 cuDNN 최적화 설정
self._setup_cudnn_optimization()
# 기본 설정값
self.default_config = {
'min_component_area': 100, # 최소 컴포넌트 크기
'merge_distance': 50, # 컴포넌트 병합 거리
'margin_ratio': 0.15, # ROI 여백 비율 (15%)
'large_mask_threshold': 0.5, # 전체 처리 기준 (50%)
'blend_kernel_ratio': 20, # 블렌딩 커널 크기 비율
'max_blend_kernel': 21, # 최대 블렌딩 커널 크기
'min_blend_kernel': 5, # 최소 블렌딩 커널 크기
# 🔥 성능 최적화 설정
'disable_roi_fallback': False, # ROI 폴백 완전 비활성화 (True면 항상 full 전략)
# 🔥 이미지 크기 제한 설정
'max_image_size': 2048, # 최대 이미지 크기 (긴 변 기준)
'enable_size_limit': True, # 크기 제한 활성화
'scale_interpolation': cv2.INTER_AREA, # 축소 시 보간법
'upscale_interpolation': cv2.INTER_CUBIC, # 확대 시 보간법
# 🔥 하이브리드 전략 설정
'memory_high_threshold': 1200, # GPU 여유 메모리 고임계값 (MB)
'memory_low_threshold': 600, # GPU 여유 메모리 저임계값 (MB)
'small_image_mp': 1.0, # 작은 이미지 기준 (MP)
'roi_area_high': 0.60, # ROI 면적 고임계값 (60%)
'roi_area_low': 0.30, # ROI 면적 저임계값 (30%)
'roi_count_threshold': 3, # ROI 개수 임계값
# 🔥 극단적 종횡비 최적화 설정
'min_roi_dimension': 128, # ROI 최소 차원 (px)
'max_aspect_ratio': 8.0, # 최대 허용 종횡비
'use_64_alignment': False, # 64배수 정렬 사용 여부
'batch_processing_threshold': 256 * 256, # 작은 ROI 배치 처리 임계값 (px)
# 🔥 인페인팅 품질 개선 설정
'mask_dilation_kernel': 3, # 마스크 팽창 커널 크기
'mask_erosion_kernel': 2, # 마스크 침식 커널 크기
'mask_blur_kernel': 5, # 마스크 블러 커널 크기
'enable_mask_refinement': True, # 마스크 정제 활성화
'feather_blend_size': 10, # 부드러운 블렌딩을 위한 페더 크기
'blend_mode': 'advanced', # 'simple' 또는 'advanced'
'context_expansion_ratio': 0.3, # 컨텍스트 확장 비율 (더 넓은 영역으로 인페인팅)
}
self.logger.log("ROI 인페인팅 모듈 초기화 완료", level=logging.INFO)
def _create_default_logger(self):
"""기본 로거 생성"""
class DefaultLogger:
def log(self, msg, level=logging.INFO, **kwargs):
print(f"[ROI-INPAINT] {msg}")
return DefaultLogger()
def _get_simple_lama(self):
"""SimpleLama 인스턴스 지연 로딩"""
if self.simple_lama is None:
self.simple_lama = SimpleLama()
self.logger.log("SimpleLama 인스턴스 생성 완료", level=logging.INFO)
return self.simple_lama
def find_mask_components(self, mask: np.ndarray, config: Dict[str, Any] = None) -> List[Tuple[int, int, int, int]]:
"""
마스크에서 연결된 컴포넌트들을 찾고 바운딩 박스 반환
Args:
mask: 이진 마스크 (0 또는 255)
config: 설정 딕셔너리
Returns:
List of (x1, y1, x2, y2) 바운딩 박스
"""
if config is None:
config = self.default_config
min_area = config.get('min_component_area', self.default_config['min_component_area'])
# 연결된 컴포넌트 분석
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
mask, connectivity=8
)
components = []
for i in range(1, num_labels): # 0은 배경
area = stats[i, cv2.CC_STAT_AREA]
if area < min_area:
continue
x = stats[i, cv2.CC_STAT_LEFT]
y = stats[i, cv2.CC_STAT_TOP]
w = stats[i, cv2.CC_STAT_WIDTH]
h = stats[i, cv2.CC_STAT_HEIGHT]
components.append((x, y, x + w, y + h))
self.logger.log(f"마스크 컴포넌트 {len(components)}개 발견", level=logging.INFO)
return components
def merge_nearby_components(self, components: List[Tuple[int, int, int, int]],
merge_distance: int = None) -> List[Tuple[int, int, int, int]]:
"""
인접한 컴포넌트들을 병합
Args:
components: 컴포넌트 바운딩 박스 리스트
merge_distance: 병합 거리 임계값
Returns:
병합된 컴포넌트 리스트
"""
if merge_distance is None:
merge_distance = self.default_config['merge_distance']
if not components:
return []
merged = []
used = set()
for i, comp1 in enumerate(components):
if i in used:
continue
# 현재 컴포넌트와 병합할 그룹 시작
group = [comp1]
used.add(i)
# 다른 컴포넌트들과 거리 확인
for j, comp2 in enumerate(components):
if j in used:
continue
# 두 박스간 최소 거리 계산
distance = self._calculate_bbox_distance(comp1, comp2)
if distance <= merge_distance:
group.append(comp2)
used.add(j)
# 그룹의 전체 바운딩 박스 계산
if group:
xs = [x for x1, y1, x2, y2 in group for x in (x1, x2)]
ys = [y for x1, y1, x2, y2 in group for y in (y1, y2)]
merged.append((min(xs), min(ys), max(xs), max(ys)))
self.logger.log(f"컴포넌트 병합: {len(components)}{len(merged)}", level=logging.INFO)
return merged
def _calculate_bbox_distance(self, bbox1: Tuple[int, int, int, int],
bbox2: Tuple[int, int, int, int]) -> float:
"""두 바운딩 박스간 최소 거리 계산"""
x1_min, y1_min, x1_max, y1_max = bbox1
x2_min, y2_min, x2_max, y2_max = bbox2
# 박스간 거리 (겹치면 0)
dx = max(0, max(x1_min - x2_max, x2_min - x1_max))
dy = max(0, max(y1_min - y2_max, y2_min - y1_max))
return (dx**2 + dy**2)**0.5
def expand_roi(self, bbox: Tuple[int, int, int, int],
image_shape: Tuple[int, int],
margin_ratio: float = None) -> Tuple[int, int, int, int]:
"""
ROI에 여백 추가
Args:
bbox: 원본 바운딩 박스 (x1, y1, x2, y2)
image_shape: 이미지 크기 (H, W)
margin_ratio: 여백 비율
Returns:
확장된 바운딩 박스
"""
if margin_ratio is None:
margin_ratio = self.default_config['margin_ratio']
h, w = image_shape[:2]
x1, y1, x2, y2 = bbox
# 현재 크기 기준으로 여백 계산
roi_w, roi_h = x2 - x1, y2 - y1
margin_x = int(roi_w * margin_ratio)
margin_y = int(roi_h * margin_ratio)
# 이미지 경계 내로 제한
x1 = max(0, x1 - margin_x)
y1 = max(0, y1 - margin_y)
x2 = min(w, x2 + margin_x)
y2 = min(h, y2 + margin_y)
return (x1, y1, x2, y2)
def should_process_full_image(self, components: List[Tuple[int, int, int, int]],
image_shape: Tuple[int, int],
threshold: float = None) -> bool:
"""
전체 이미지 처리 여부 결정
Args:
components: 컴포넌트 리스트
image_shape: 이미지 크기
threshold: 전체 처리 기준 비율
Returns:
전체 처리 여부
"""
if threshold is None:
threshold = self.default_config['large_mask_threshold']
if not components:
return False
if len(components) == 1:
comp = components[0]
comp_area = (comp[2] - comp[0]) * (comp[3] - comp[1])
total_area = image_shape[0] * image_shape[1]
return comp_area > total_area * threshold
return False
def create_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray:
"""
🔥 부드러운 블렌딩을 위한 고급 마스크 생성
Args:
roi_mask: ROI 영역 마스크
config: 설정 오버라이드
Returns:
블렌딩용 마스크 (0~1 범위)
"""
if config is None:
config = self.default_config
blend_mode = config.get('blend_mode', 'simple')
if blend_mode == 'simple':
return self._create_simple_blend_mask(roi_mask, config)
else:
return self._create_advanced_blend_mask(roi_mask, config)
def _create_simple_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
"""기본 블렌딩 마스크 (기존 방식)"""
blend_mask = roi_mask.astype(np.float32) / 255.0
# 가우시안 블러로 부드러운 전환
kernel_size = min(
config['max_blend_kernel'],
max(
config['min_blend_kernel'],
min(roi_mask.shape[:2]) // config['blend_kernel_ratio']
)
)
if kernel_size % 2 == 0:
kernel_size += 1
blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0)
return np.expand_dims(blend_mask, axis=2)
def _create_advanced_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
"""🔥 고급 블렌딩 마스크 (페더링 + 거리 변환 기반)"""
blend_mask = roi_mask.astype(np.float32) / 255.0
# 🔥 1단계: 거리 변환을 이용한 페더링
feather_size = config.get('feather_blend_size', 10)
# 마스크의 경계에서 거리 계산
dist_transform = cv2.distanceTransform(
(blend_mask * 255).astype(np.uint8),
cv2.DIST_L2, 5
)
# 페더링 적용
if feather_size > 0:
# 경계에서 페더 크기만큼 부드럽게 감소
feathered_mask = np.clip(dist_transform / feather_size, 0, 1)
blend_mask = np.minimum(blend_mask, feathered_mask)
# 🔥 2단계: 가우시안 블러로 추가 부드러움
kernel_size = min(
config['max_blend_kernel'],
max(
config['min_blend_kernel'],
min(roi_mask.shape[:2]) // config['blend_kernel_ratio']
)
)
if kernel_size % 2 == 0:
kernel_size += 1
blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0)
# 🔥 3단계: 경계 강화 (중앙은 1.0 유지, 경계만 부드럽게)
core_mask = cv2.erode((roi_mask > 128).astype(np.uint8), np.ones((3, 3), np.uint8), iterations=2)
core_mask = core_mask.astype(np.float32)
# 코어 영역은 완전히 1.0, 경계 영역만 블렌딩
final_mask = np.maximum(blend_mask, core_mask)
self.logger.log(
f"🔧 고급 블렌딩 마스크 생성: feather={feather_size}px, kernel={kernel_size}px",
level=logging.INFO
)
return np.expand_dims(final_mask, axis=2)
def process_roi(self, image: np.ndarray, mask: np.ndarray,
roi_bbox: Tuple[int, int, int, int],
config: Dict[str, Any] = None) -> Tuple[np.ndarray, bool]:
"""
🔥 단일 ROI 영역 인페인팅 처리 (마스크 정제 문제 해결 버전)
Args:
image: 원본 이미지
mask: 이진 마스크
roi_bbox: ROI 바운딩 박스
config: 설정 오버라이드
Returns:
(처리된 ROI 이미지, 성공 여부)
"""
if config is None:
config = self.default_config
x1, y1, x2, y2 = roi_bbox
# 🔥 정확한 ROI-마스크 매칭 추출
roi_image = image[y1:y2, x1:x2].copy()
roi_mask = mask[y1:y2, x1:x2].copy()
# ROI 크기 로깅
roi_size = (x2-x1) * (y2-y1)
total_size = image.shape[0] * image.shape[1]
aspect_ratio = max(x2-x1, y2-y1) / max(min(x2-x1, y2-y1), 1)
self.logger.log(
f"ROI 처리: {x2-x1}x{y2-y1} ({roi_size/total_size*100:.1f}% of image, 종횡비:{aspect_ratio:.1f})",
level=logging.INFO
)
try:
# 🔥 마스크가 비어있는지 확인
if np.sum(roi_mask) == 0:
self.logger.log("ROI 마스크가 비어있음, 원본 반환", level=logging.INFO)
return roi_image, True
# 🔥 마스크 정제 여부에 따라 분기
if config.get('enable_mask_refinement', False):
# 마스크 정제 활성화된 경우만 추가 처리
refined_mask = self.refine_mask(roi_mask, config)
else:
# 🔥 마스크 모듈에서 이미 최적화된 마스크 그대로 사용
refined_mask = roi_mask
self.logger.log("🔧 마스크 모듈 최적화 마스크 사용 (추가 정제 생략)", level=logging.INFO)
# 🔥 극단적 종횡비 전처리
preprocessed_image, preprocessed_mask, preprocess_info = self.preprocess_extreme_aspect_ratio(roi_image, refined_mask)
# 🔥 크기 정규화 (8의 배수로 맞춤)
normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(preprocessed_image, preprocessed_mask)
# ROI 인페인팅 (정규화된 크기로)
simple_lama = self._get_simple_lama()
# 🔥 BGR → RGB 변환 후 SimpleLama 호출
normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB)
roi_result_pil = simple_lama(normalized_image_rgb, normalized_mask)
roi_result = np.array(roi_result_pil)
roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR)
# 🔥 정규화 크기에서 전처리 크기로 복원
restored_from_normalized = self.restore_roi_size(roi_result_bgr, normalized_size)
# 🔥 전처리된 크기에서 원본 크기로 복원 (필요한 경우)
if preprocess_info['adjusted']:
final_result = cv2.resize(
restored_from_normalized,
(roi_image.shape[1], roi_image.shape[0]),
interpolation=cv2.INTER_CUBIC
)
else:
final_result = restored_from_normalized
# 🔥 최종 크기 검증
if final_result.shape[:2] != roi_image.shape[:2]:
self.logger.log(
f"[오류] 최종 크기 불일치: result{final_result.shape[:2]} vs roi{roi_image.shape[:2]}",
level=logging.ERROR
)
# 강제 리사이즈 (마지막 수단)
final_result = cv2.resize(final_result, (roi_image.shape[1], roi_image.shape[0]))
# 🔥 블렌딩 (원본 roi_mask 사용 - 왜곡 방지)
blend_mask = self.create_blend_mask(roi_mask, config) # refined_mask 대신 원본 사용
blended_roi = (final_result * blend_mask + roi_image * (1 - blend_mask)).astype(np.uint8)
return blended_roi, True
except Exception as e:
self.logger.log(f"ROI 처리 실패: {e}", level=logging.WARNING)
import traceback
self.logger.log(f"ROI 처리 실패 상세: {traceback.format_exc()}", level=logging.DEBUG)
return roi_image, False
def normalize_roi_size(self, roi_image: np.ndarray, roi_mask: np.ndarray,
modulo: int = 8) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
"""
ROI 크기를 지정된 배수로 정규화 (SimpleLama 패딩과 동일한 로직)
Args:
roi_image: ROI 이미지
roi_mask: ROI 마스크
modulo: 배수 (기본값: 8)
Returns:
(정규화된 이미지, 정규화된 마스크, 원본 크기)
"""
original_h, original_w = roi_image.shape[:2]
# 8의 배수로 올림
target_h = ((original_h + modulo - 1) // modulo) * modulo
target_w = ((original_w + modulo - 1) // modulo) * modulo
if target_h == original_h and target_w == original_w:
# 이미 8의 배수이면 그대로 반환
return roi_image, roi_mask, (original_h, original_w)
# 패딩으로 크기 조정 (reflect 모드 사용 - SimpleLama와 동일)
pad_h = target_h - original_h
pad_w = target_w - original_w
# 균등하게 패딩 분배
top = pad_h // 2
bottom = pad_h - top
left = pad_w // 2
right = pad_w - left
# 이미지 패딩
padded_image = cv2.copyMakeBorder(
roi_image, top, bottom, left, right,
cv2.BORDER_REFLECT
)
# 마스크 패딩 (상수값 0으로 패딩)
padded_mask = cv2.copyMakeBorder(
roi_mask, top, bottom, left, right,
cv2.BORDER_CONSTANT, value=0
)
self.logger.log(
f"크기 정규화: {original_w}x{original_h}{target_w}x{target_h} (패딩: t{top}b{bottom}l{left}r{right})",
level=logging.INFO
)
return padded_image, padded_mask, (original_h, original_w)
def restore_roi_size(self, processed_image: np.ndarray,
original_size: Tuple[int, int],
modulo: int = 8) -> np.ndarray:
"""
정규화된 ROI를 원본 크기로 복원
Args:
processed_image: 처리된 이미지
original_size: 원본 크기 (h, w)
modulo: 배수
Returns:
원본 크기로 복원된 이미지
"""
original_h, original_w = original_size
current_h, current_w = processed_image.shape[:2]
if current_h == original_h and current_w == original_w:
return processed_image
# 패딩 계산 (정규화할 때와 동일한 로직)
pad_h = current_h - original_h
pad_w = current_w - original_w
top = pad_h // 2
bottom = pad_h - top
left = pad_w // 2
right = pad_w - left
# 패딩 제거
restored = processed_image[top:current_h-bottom, left:current_w-right]
self.logger.log(
f"크기 복원: {current_w}x{current_h}{original_w}x{original_h}",
level=logging.INFO
)
return restored
def scale_image_if_needed(self, image: np.ndarray, mask: np.ndarray,
config: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray, Dict]:
"""
이미지가 최대 크기를 초과하면 축소
Args:
image: 입력 이미지
mask: 입력 마스크
config: 설정 딕셔너리
Returns:
(스케일된 이미지, 스케일된 마스크, 스케일 정보)
"""
original_h, original_w = image.shape[:2]
max_dimension = max(original_h, original_w)
# 🔥 설정에서 최대 크기 가져오기
max_size = config.get('max_image_size', 2048)
interpolation = config.get('scale_interpolation', cv2.INTER_AREA)
if max_dimension <= max_size:
return image, mask, {'scaled': False, 'original_size': (original_h, original_w)}
# 스케일 계산
scale_factor = max_size / max_dimension
new_h = int(original_h * scale_factor)
new_w = int(original_w * scale_factor)
# 이미지 스케일링
scaled_image = cv2.resize(image, (new_w, new_h), interpolation=interpolation)
scaled_mask = cv2.resize(mask, (new_w, new_h), interpolation=cv2.INTER_NEAREST)
self.logger.log(
f"이미지 스케일 다운: {original_w}x{original_h}{new_w}x{new_h} (factor={scale_factor:.3f})",
level=logging.INFO
)
return scaled_image, scaled_mask, {
'scaled': True,
'original_size': (original_h, original_w),
'scale_factor': scale_factor
}
def restore_original_scale(self, processed_image: np.ndarray,
scale_info: Dict) -> np.ndarray:
"""
처리된 이미지를 원본 크기로 복원
Args:
processed_image: 처리된 이미지
scale_info: 스케일 정보 딕셔너리
Returns:
원본 크기로 복원된 이미지
"""
if not scale_info.get('scaled', False):
return processed_image
original_h, original_w = scale_info['original_size']
current_h, current_w = processed_image.shape[:2]
if current_h == original_h and current_w == original_w:
return processed_image
# 고품질 보간법으로 업스케일
interpolation = cv2.INTER_CUBIC
restored = cv2.resize(processed_image, (original_w, original_h), interpolation=interpolation)
self.logger.log(
f"이미지 스케일 업: {current_w}x{current_h}{original_w}x{original_h}",
level=logging.INFO
)
return restored
def preprocess_extreme_aspect_ratio(self, roi_image: np.ndarray, roi_mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
"""
🔥 극단적 종횡비 ROI 전처리 ("마른 막대" 문제 해결)
Args:
roi_image: 원본 ROI 이미지
roi_mask: 원본 ROI 마스크
Returns:
(처리된 이미지, 처리된 마스크, 복원 정보)
"""
original_h, original_w = roi_image.shape[:2]
# 종횡비 계산
aspect_ratio = max(original_w, original_h) / max(min(original_w, original_h), 1)
min_dimension = self.default_config['min_roi_dimension']
max_aspect_ratio = self.default_config['max_aspect_ratio']
# 조정이 필요한지 확인
needs_adjustment = (
aspect_ratio > max_aspect_ratio or
min(original_w, original_h) < min_dimension
)
if not needs_adjustment:
return roi_image, roi_mask, {'adjusted': False}
# 🔥 목표 크기 계산
target_w, target_h = original_w, original_h
# 최소 차원 보장
if original_w < min_dimension:
target_w = min_dimension
if original_h < min_dimension:
target_h = min_dimension
# 종횡비 제한
new_ratio = max(target_w, target_h) / max(min(target_w, target_h), 1)
if new_ratio > max_aspect_ratio:
if target_w > target_h:
# 가로가 긴 경우
target_h = max(target_h, int(target_w / max_aspect_ratio))
else:
# 세로가 긴 경우
target_w = max(target_w, int(target_h / max_aspect_ratio))
# 🔥 고품질 리사이즈
processed_image = cv2.resize(roi_image, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
processed_mask = cv2.resize(roi_mask, (target_w, target_h), interpolation=cv2.INTER_NEAREST)
self.logger.log(
f"🔧 극단적 ROI 전처리: {original_w}x{original_h} (비율:{aspect_ratio:.1f}) → "
f"{target_w}x{target_h} (비율:{max(target_w, target_h)/max(min(target_w, target_h), 1):.1f})",
level=logging.INFO
)
return processed_image, processed_mask, {
'adjusted': True,
'original_size': (original_h, original_w),
'processed_size': (target_h, target_w)
}
def inpaint_with_roi(self, image: np.ndarray, mask: np.ndarray,
config: Dict[str, Any] = None) -> np.ndarray:
"""
🔥 ROI 기반 인페인팅 처리 (형상 최적화 버전)
"""
start_time = time.time()
if config is None:
config = self.default_config
effective_config = {**self.default_config, **config}
try:
# 🔥 1단계: 형상 버킷 최적화
bucket_start_time = time.time()
optimized_image, optimized_mask, bucket_info = self.apply_optimal_padding(image, mask)
bucket_time = time.time() - bucket_start_time
bucket_name = bucket_info['bucket_name']
# 이미지 크기 스케일링 (기존 로직)
scale_start_time = time.time()
scaled_image, scaled_mask, scale_info = self.scale_image_if_needed(optimized_image, optimized_mask, effective_config)
scale_time = time.time() - scale_start_time
# 🔥 2단계: 적응적 전략 선택 (성능 히스토리 고려)
strategy_start_time = time.time()
components = self.find_mask_components(scaled_mask, effective_config)
strategy = self.choose_processing_strategy(scaled_image.shape, components, effective_config)
# 🔥 성능 히스토리 기반 전략 재조정
total_pixels = scaled_image.shape[0] * scaled_image.shape[1]
predicted_time = 0.3 + (total_pixels / 1000000) * 0.2 # 간단한 예측 모델
if strategy == "full" and self.should_fallback_to_roi(bucket_name, predicted_time):
strategy = "roi"
self.logger.log(f"🔄 전략 변경: full → roi (성능 히스토리 기반)", level=logging.WARNING)
strategy_time = time.time() - strategy_start_time
self.logger.log(
f"🔧 처리 준비: 버킷={bucket_name}, 전략={strategy}, "
f"버킷화={bucket_time:.3f}s, 스케일링={scale_time:.3f}s, 전략선택={strategy_time:.3f}s",
level=logging.INFO
)
# 🔥 3단계: 인페인팅 실행 (기존 로직 유지)
inpaint_start_time = time.time()
if strategy == "full":
result = self._process_full_image_optimized(scaled_image, scaled_mask, effective_config, bucket_name)
elif strategy == "roi_parallel":
result = self._process_roi_parallel(scaled_image, scaled_mask, components, effective_config)
else: # roi
result = self._process_roi_sequential(scaled_image, scaled_mask, components, effective_config)
inpaint_time = time.time() - inpaint_start_time
# 🔥 4단계: 복원 및 성능 기록
restore_start_time = time.time()
# 스케일링 복원
if scale_info['scaled']:
result = self.restore_original_scale(result, scale_info)
# 버킷 패딩 복원
result = self.restore_from_padding(result, bucket_info)
restore_time = time.time() - restore_start_time
total_time = time.time() - start_time
# 🔥 성능 히스토리 기록
self.record_performance(bucket_name, inpaint_time, strategy)
self.logger.log(
f"🎯 인페인팅 완료: 총 {total_time:.3f}s (인페인팅: {inpaint_time:.3f}s, 복원: {restore_time:.3f}s)",
level=logging.INFO
)
return result
except Exception as e:
self.logger.log(f"ROI 인페인팅 실패: {e}", level=logging.ERROR)
import traceback
self.logger.log(traceback.format_exc(), level=logging.DEBUG)
return image
def prepare_rois_parallel(self, image: np.ndarray, binary_mask: np.ndarray,
merged_components: List[Tuple[int, int, int, int]],
config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
ROI 전처리를 병렬로 수행 (CPU 작업)
Args:
image: 원본 이미지
binary_mask: 이진 마스크
merged_components: 병합된 컴포넌트 리스트
config: 설정
Returns:
전처리된 ROI 정보 리스트
"""
import time
def prepare_single_roi(roi_info):
idx, comp_bbox = roi_info
# 🔥 ROI 영역 확장 (설정에 따라 기본/컨텍스트 선택)
if config.get('enable_mask_refinement', False):
roi_bbox = self.expand_roi_with_context(
comp_bbox, image.shape, config
)
else:
roi_bbox = self.expand_roi(
comp_bbox, image.shape,
margin_ratio=config['margin_ratio']
)
x1, y1, x2, y2 = roi_bbox
# ROI 추출
roi_image = image[y1:y2, x1:x2].copy()
roi_mask = binary_mask[y1:y2, x1:x2].copy()
# 🔥 CPU에서 크기 정규화 (병렬 처리 가능)
if np.sum(roi_mask) == 0:
return {
'idx': idx,
'roi_bbox': roi_bbox,
'roi_image': roi_image,
'roi_mask': roi_mask,
'normalized_image': None,
'normalized_mask': None,
'original_size': roi_image.shape[:2],
'skip_processing': True
}
# 크기 정규화
normalized_image, normalized_mask, original_size = self.normalize_roi_size(roi_image, roi_mask)
return {
'idx': idx,
'roi_bbox': roi_bbox,
'roi_image': roi_image,
'roi_mask': roi_mask,
'normalized_image': normalized_image,
'normalized_mask': normalized_mask,
'original_size': original_size,
'skip_processing': False
}
# 🔥 병렬 전처리 (CPU 작업만)
prep_start_time = time.time()
with ThreadPoolExecutor(max_workers=min(4, len(merged_components))) as executor:
roi_infos = list(enumerate(merged_components))
prepared_rois = list(executor.map(prepare_single_roi, roi_infos))
prep_time = time.time() - prep_start_time
self.logger.log(f"[PERF] ROI 병렬 전처리 시간: {prep_time:.3f}초 ({len(prepared_rois)}개)", level=logging.INFO)
return prepared_rois
def get_gpu_memory_info(self) -> Dict[str, float]:
"""
GPU 메모리 정보 확인
Returns:
{'total_mb': float, 'used_mb': float, 'free_mb': float}
"""
try:
import torch
if torch.cuda.is_available():
total_mb = torch.cuda.get_device_properties(0).total_memory / 1024 / 1024
used_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
free_mb = total_mb - used_mb
return {'total_mb': total_mb, 'used_mb': used_mb, 'free_mb': free_mb}
except ImportError:
pass
# GPU 정보를 얻을 수 없으면 안전한 기본값
return {'total_mb': 4096, 'used_mb': 2048, 'free_mb': 2048}
def calculate_roi_coverage(self, components: List[Tuple[int, int, int, int]],
image_shape: Tuple[int, int]) -> float:
"""
ROI 면적 비율 계산
Args:
components: 컴포넌트 리스트
image_shape: 이미지 크기 (H, W)
Returns:
A (ROI 면적 비율) = (모든 ROI의 합산 면적) / (이미지 전체 면적)
"""
if not components:
return 0.0
total_image_area = image_shape[0] * image_shape[1]
total_roi_area = 0
for comp in components:
x1, y1, x2, y2 = comp
roi_area = (x2 - x1) * (y2 - y1)
total_roi_area += roi_area
return total_roi_area / total_image_area if total_image_area > 0 else 0.0
def choose_processing_strategy(self, image_shape: Tuple[int, int],
components: List[Tuple[int, int, int, int]],
config: Dict[str, Any]) -> str:
"""
🔥 GPU 메모리와 ROI 면적 비율을 고려한 하이브리드 전략 선택
핵심 지표:
- A (ROI 면적 비율) = (모든 ROI의 합산 면적) / (이미지 전체 면적)
- N (ROI 개수)
- H (GPU 여유 메모리 MB)
- S (이미지 크기, MP)
Returns:
'full': 전체 이미지 처리
'roi': ROI 기반 처리
'roi_parallel': ROI 병렬 전처리
"""
# 🔥 핵심 지표 계산
gpu_info = self.get_gpu_memory_info()
H = gpu_info['free_mb'] # GPU 여유 메모리 (MB)
total_pixels = image_shape[0] * image_shape[1]
S = total_pixels / 1_000_000 # 이미지 크기 (MP)
N = len(components) # ROI 개수
A = self.calculate_roi_coverage(components, image_shape) # ROI 면적 비율
# 🔥 조정 가능한 임계값 (config로 튜닝 가능)
memory_high_threshold = config.get('memory_high_threshold', 1200) # MB
memory_low_threshold = config.get('memory_low_threshold', 600) # MB
small_image_threshold = config.get('small_image_mp', 1.0) # MP
roi_area_high_threshold = config.get('roi_area_high', 0.60) # 60%
roi_area_low_threshold = config.get('roi_area_low', 0.30) # 30%
roi_count_threshold = config.get('roi_count_threshold', 3) # 개수
# 🚫 ROI 전략 완전 폐기: 작은 이미지에서는 항상 full이 더 빠름
# 성능 측정 결과 1.2초(full) vs 4-5초(roi)로 full이 압도적으로 빠름
strategy = "full"
reason = f"roi_strategy_deprecated(S={S:.1f}MP,A={A:.1%},N={N},H={H:.0f}MB)"
# 🔥 상세 성능 예측 로깅
estimated_roi_overhead = N * 0.02 + 0.03 # 컴포넌트당 20ms + 기본 30ms
estimated_processing_time = S * 0.5 # 메가픽셀당 0.5초 (SimpleLama 기준)
efficiency_ratio = estimated_roi_overhead / max(estimated_processing_time, 0.01)
self.logger.log(
f"[STRATEGY] 🎯 핵심 지표: S={S:.1f}MP, A={A:.1%}, N={N}, H={H:.0f}MB",
level=logging.INFO
)
self.logger.log(
f"[STRATEGY] 선택된 전략: {strategy} (이유: {reason})",
level=logging.INFO
)
self.logger.log(
f"[STRATEGY] 성능 예측: ROI오버헤드={estimated_roi_overhead:.3f}s, "
f"처리시간={estimated_processing_time:.3f}s, 효율비={efficiency_ratio:.2f}",
level=logging.INFO
)
return strategy
def cleanup_memory(self):
"""메모리 정리"""
import gc
gc.collect()
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
pass
self.logger.log("메모리 정리 완료", level=logging.INFO)
def get_optimal_bucket_size(self, height: int, width: int) -> Tuple[int, int, str]:
"""
🔥 이미지 크기를 성능 최적화된 버킷으로 정규화
Args:
height, width: 원본 이미지 크기
Returns:
(최적화된 높이, 너비, 버킷명)
"""
# 긴 변과 짧은 변 구분
long_side = max(height, width)
short_side = min(height, width)
is_portrait = height > width
# 🔥 사용자 이미지 크기 기반 버킷 매핑 (항상 원본보다 크거나 같게)
if long_side <= 800:
# 작은 이미지: 640×640 또는 832×512
if abs(height - width) < 100: # 정사각형에 가까움
bucket_h, bucket_w = 640, 640
bucket_name = "square_small"
else:
bucket_h, bucket_w = (832, 512) if is_portrait else (512, 832)
bucket_name = "landscape_standard"
elif long_side <= 1200:
# 중간 이미지: 웹툰 표준
if short_side >= 700: # 정사각형에 가까움
bucket_h, bucket_w = 1024, 832
bucket_name = "webtoon_standard"
else:
bucket_h, bucket_w = (1152, 896) if is_portrait else (896, 1152)
bucket_name = "webtoon_portrait"
elif long_side <= 2100:
# 🔥 긴 이미지: 원본 크기 고려하여 버킷 선택
if is_portrait:
# 세로형: 높이를 충분히 큰 버킷으로
bucket_h = max(2048, ((height // 64) + 1) * 64) # 64배수로 올림
bucket_w = max(896, ((width // 64) + 1) * 64)
else:
# 가로형: 너비를 충분히 큰 버킷으로
bucket_w = max(2048, ((width // 64) + 1) * 64)
bucket_h = max(896, ((height // 64) + 1) * 64)
bucket_name = "webtoon_long"
else:
# 🔥 매우 큰 이미지: 원본 크기보다 크게
if is_portrait:
bucket_h = ((height // 128) + 1) * 128 # 128배수로 올림
bucket_w = max(1280, ((width // 64) + 1) * 64)
else:
bucket_w = ((width // 128) + 1) * 128
bucket_h = max(768, ((height // 64) + 1) * 64)
bucket_name = "landscape_wide"
# 🔥 최종 방향 조정 (항상 원본보다 크거나 같게 보장)
if is_portrait:
final_h = max(height, bucket_h)
final_w = max(width, bucket_w)
else:
final_h = max(height, bucket_h)
final_w = max(width, bucket_w)
self.logger.log(
f"🔧 형상 버킷 최적화: {height}×{width}{final_h}×{final_w} ({bucket_name})",
level=logging.INFO
)
return final_h, final_w, bucket_name
def apply_optimal_padding(self, image: np.ndarray, mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict]:
"""
🔥 성능 최적화된 크기로 패딩
Args:
image, mask: 원본 이미지와 마스크
Returns:
(패딩된 이미지, 패딩된 마스크, 복원 정보)
"""
original_h, original_w = image.shape[:2]
target_h, target_w, bucket_name = self.get_optimal_bucket_size(original_h, original_w)
# 🔥 패딩 값 계산 및 안전성 검증
pad_h = target_h - original_h
pad_w = target_w - original_w
# 🔥 음수 패딩 방지 (타겟이 원본보다 작을 경우)
if pad_h < 0 or pad_w < 0:
self.logger.log(
f"⚠️ 버킷 크기 오류: 원본({original_h}×{original_w}) > 타겟({target_h}×{target_w}), 패딩 건너뜀",
level=logging.WARNING
)
# 패딩 없이 원본 반환
return image, mask, {
'original_size': (original_h, original_w),
'target_size': (original_h, original_w),
'bucket_name': bucket_name + "_no_padding",
'padding': (0, 0, 0, 0)
}
pad_top = pad_h // 2
pad_bottom = pad_h - pad_top
pad_left = pad_w // 2
pad_right = pad_w - pad_left
# 🔥 추가 안전성 검증
if pad_top < 0 or pad_bottom < 0 or pad_left < 0 or pad_right < 0:
self.logger.log(
f"⚠️ 패딩 값 오류: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right}",
level=logging.ERROR
)
return image, mask, {
'original_size': (original_h, original_w),
'target_size': (original_h, original_w),
'bucket_name': bucket_name + "_error",
'padding': (0, 0, 0, 0)
}
# 이미지 패딩 (reflect로 자연스럽게)
padded_image = cv2.copyMakeBorder(
image, pad_top, pad_bottom, pad_left, pad_right,
borderType=cv2.BORDER_REFLECT
)
# 마스크 패딩 (상수로)
padded_mask = cv2.copyMakeBorder(
mask, pad_top, pad_bottom, pad_left, pad_right,
borderType=cv2.BORDER_CONSTANT, value=0
)
restore_info = {
'original_size': (original_h, original_w),
'target_size': (target_h, target_w),
'bucket_name': bucket_name,
'padding': (pad_top, pad_bottom, pad_left, pad_right)
}
self.logger.log(
f"🔧 패딩 적용: {original_h}×{original_w}{target_h}×{target_w} "
f"(padding: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right})",
level=logging.INFO
)
return padded_image, padded_mask, restore_info
def restore_from_padding(self, image: np.ndarray, restore_info: Dict) -> np.ndarray:
"""패딩된 이미지를 원본 크기로 복원"""
pad_top, pad_bottom, pad_left, pad_right = restore_info['padding']
original_h, original_w = restore_info['original_size']
# 패딩 제거
if pad_bottom == 0:
cropped = image[pad_top:, :]
else:
cropped = image[pad_top:-pad_bottom, :]
if pad_right == 0:
cropped = cropped[:, pad_left:]
else:
cropped = cropped[:, pad_left:-pad_right]
# 최종 크기 검증 및 리사이즈
if cropped.shape[:2] != (original_h, original_w):
cropped = cv2.resize(cropped, (original_w, original_h), interpolation=cv2.INTER_CUBIC)
return cropped
def get_processing_stats(self, image: np.ndarray, mask: np.ndarray) -> Dict[str, Any]:
"""
처리 통계 정보 반환 (실제 처리 없이 분석만)
Args:
image: 입력 이미지
mask: 이진 마스크
Returns:
처리 통계 딕셔너리
"""
binary_mask = (mask > 128).astype(np.uint8) * 255
components = self.find_mask_components(binary_mask, self.default_config)
merged_components = self.merge_nearby_components(components)
total_area = image.shape[0] * image.shape[1]
roi_areas = []
for comp_bbox in merged_components:
roi_bbox = self.expand_roi(comp_bbox, image.shape)
x1, y1, x2, y2 = roi_bbox
roi_area = (x2 - x1) * (y2 - y1)
roi_areas.append(roi_area)
return {
'total_image_size': total_area,
'num_components': len(components),
'num_merged_rois': len(merged_components),
'roi_areas': roi_areas,
'total_roi_area': sum(roi_areas),
'roi_coverage_ratio': sum(roi_areas) / total_area if total_area > 0 else 0.0,
'will_process_full': self.should_process_full_image(components, image.shape),
'memory_efficiency': 1.0 - (sum(roi_areas) / total_area) if not self.should_process_full_image(components, image.shape) else 0.0
}
def should_use_batch_processing(self, roi_infos: List[Dict[str, Any]]) -> bool:
"""
ROI 배치 처리 사용 여부 결정
Args:
roi_infos: ROI 정보 리스트
Returns:
배치 처리 사용 여부
"""
if len(roi_infos) < 2:
return False
# 모든 ROI가 작은 경우에만 배치 처리
batch_threshold = self.default_config.get('batch_processing_threshold', 256 * 256)
for roi_info in roi_infos:
if roi_info['skip_processing']:
continue
h, w = roi_info['original_size']
if h * w > batch_threshold:
return False
return True
def create_roi_batch(self, roi_infos: List[Dict[str, Any]]) -> Tuple[np.ndarray, np.ndarray, List[Dict[str, Any]]]:
"""
🔥 작은 ROI들을 하나의 배치로 결합하여 처리 효율성 향상
Args:
roi_infos: ROI 정보 리스트
Returns:
(배치 이미지, 배치 마스크, 배치 정보 리스트)
"""
valid_rois = [roi for roi in roi_infos if not roi['skip_processing']]
if len(valid_rois) < 2:
return None, None, []
# 🔥 최대 크기 계산
max_h = max(roi['normalized_image'].shape[0] for roi in valid_rois)
max_w = max(roi['normalized_image'].shape[1] for roi in valid_rois)
# 8의 배수로 올림
max_h = ((max_h + 7) // 8) * 8
max_w = ((max_w + 7) // 8) * 8
# 배치 크기 결정 (2x2 그리드)
grid_size = int(np.ceil(np.sqrt(len(valid_rois))))
batch_h = max_h * grid_size
batch_w = max_w * grid_size
# 배치 이미지/마스크 생성
batch_image = np.zeros((batch_h, batch_w, 3), dtype=np.uint8)
batch_mask = np.zeros((batch_h, batch_w), dtype=np.uint8)
batch_info = []
for i, roi_info in enumerate(valid_rois):
row = i // grid_size
col = i % grid_size
y_start = row * max_h
x_start = col * max_w
roi_img = roi_info['normalized_image']
roi_mask = roi_info['normalized_mask']
h, w = roi_img.shape[:2]
# ROI를 배치에 배치
batch_image[y_start:y_start+h, x_start:x_start+w] = roi_img
batch_mask[y_start:y_start+h, x_start:x_start+w] = roi_mask
batch_info.append({
**roi_info,
'batch_position': (y_start, x_start, y_start+h, x_start+w),
'roi_size': (h, w)
})
self.logger.log(
f"🔥 ROI 배치 생성: {len(valid_rois)}개 ROI → {batch_w}x{batch_h} 배치 ({grid_size}x{grid_size} 그리드)",
level=logging.INFO
)
return batch_image, batch_mask, batch_info
def refine_mask(self, mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray:
"""
🔥 마스크 품질 개선을 위한 고급 정제
Args:
mask: 원본 마스크
config: 설정 오버라이드
Returns:
정제된 마스크
"""
if config is None:
config = self.default_config
if not config.get('enable_mask_refinement', True):
return mask
refined_mask = mask.copy()
# 🔥 1단계: 작은 노이즈 제거 (Opening)
erosion_kernel = config.get('mask_erosion_kernel', 2)
if erosion_kernel > 0:
kernel = np.ones((erosion_kernel, erosion_kernel), np.uint8)
refined_mask = cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel)
# 🔥 2단계: 마스크 영역 확장 (텍스트 경계 완전 커버)
dilation_kernel = config.get('mask_dilation_kernel', 3)
if dilation_kernel > 0:
kernel = np.ones((dilation_kernel, dilation_kernel), np.uint8)
refined_mask = cv2.dilate(refined_mask, kernel, iterations=1)
# 🔥 3단계: 부드러운 경계 생성
blur_kernel = config.get('mask_blur_kernel', 5)
if blur_kernel > 0 and blur_kernel % 2 == 1:
refined_mask = cv2.GaussianBlur(refined_mask, (blur_kernel, blur_kernel), 0)
# 블러 후 다시 이진화 (128 이상을 255로)
refined_mask = np.where(refined_mask > 128, 255, 0).astype(np.uint8)
self.logger.log(
f"🔧 마스크 정제 완료: erosion={erosion_kernel}, dilation={dilation_kernel}, blur={blur_kernel}",
level=logging.INFO
)
return refined_mask
def expand_roi_with_context(self, bbox: Tuple[int, int, int, int],
image_shape: Tuple[int, int],
config: Dict[str, Any] = None) -> Tuple[int, int, int, int]:
"""
🔥 컨텍스트를 고려한 ROI 확장 (더 넓은 영역으로 품질 향상)
Args:
bbox: 원본 바운딩 박스
image_shape: 이미지 크기
config: 설정
Returns:
확장된 바운딩 박스
"""
if config is None:
config = self.default_config
# 기본 여백 + 컨텍스트 확장
base_margin_ratio = config.get('margin_ratio', 0.15)
context_expansion = config.get('context_expansion_ratio', 0.3)
total_margin_ratio = base_margin_ratio + context_expansion
h, w = image_shape[:2]
x1, y1, x2, y2 = bbox
# 현재 크기 기준으로 여백 계산
roi_w, roi_h = x2 - x1, y2 - y1
margin_x = int(roi_w * total_margin_ratio)
margin_y = int(roi_h * total_margin_ratio)
# 이미지 경계 내로 제한
x1 = max(0, x1 - margin_x)
y1 = max(0, y1 - margin_y)
x2 = min(w, x2 + margin_x)
y2 = min(h, y2 + margin_y)
self.logger.log(
f"🔧 컨텍스트 확장: 기본 여백 {base_margin_ratio:.1%} + 컨텍스트 {context_expansion:.1%} = {total_margin_ratio:.1%}",
level=logging.INFO
)
return (x1, y1, x2, y2)
def _setup_cudnn_optimization(self):
"""🔥 강화된 cuDNN 최적화 설정"""
try:
import torch
if torch.cuda.is_available():
# 🔥 cuDNN 벤치마크 활성화 - 첫 실행에서 최적 알고리즘 찾아서 캐시
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = False
# 🔥 메모리 형식 최적화
torch.backends.cudnn.allow_tf32 = True
torch.backends.cuda.matmul.allow_tf32 = True
# 🔥 메모리 할당 전략 최적화
torch.cuda.empty_cache() # 기존 캐시 정리
# 🔥 cuDNN 알고리즘 캐시 예열 (동일 크기 더미 텐서로 미리 탐색)
self._warmup_cudnn_algorithms()
self.logger.log("🔥 강화된 cuDNN 최적화 설정 완료", level=logging.INFO)
except ImportError:
self.logger.log("cuDNN 라이브러리를 찾을 수 없습니다. cuDNN 최적화를 사용할 수 없습니다.", level=logging.WARNING)
def _warmup_cudnn_algorithms(self):
"""🔥 cuDNN 알고리즘 캐시 예열"""
try:
import torch
import torch.nn.functional as F
# 🔥 일반적인 웹툰 크기들로 cuDNN 알고리즘 미리 탐색
common_sizes = [
(1, 3, 640, 640), # 작은 정사각형
(1, 3, 832, 1024), # 웹툰 표준
(1, 3, 1200, 816), # 현재 샘플 크기
(1, 3, 896, 1152), # 웹툰 세로형
]
with torch.no_grad():
for size in common_sizes:
try:
# 더미 텐서 생성
dummy_input = torch.randn(size, device='cuda', dtype=torch.float32)
# 일반적인 연산들로 cuDNN 알고리즘 탐색 트리거
_ = F.conv2d(dummy_input, torch.randn(32, 3, 3, 3, device='cuda'), padding=1)
_ = F.conv2d(dummy_input, torch.randn(64, 3, 5, 5, device='cuda'), padding=2)
_ = F.interpolate(dummy_input, scale_factor=0.5, mode='bilinear')
_ = F.interpolate(dummy_input, scale_factor=2.0, mode='bilinear')
del dummy_input # 즉시 메모리 해제
except Exception as e:
# 개별 크기 실패해도 계속 진행
continue
# 메모리 정리
torch.cuda.empty_cache()
self.logger.log("🔥 cuDNN 알고리즘 캐시 예열 완료", level=logging.INFO)
except Exception as e:
self.logger.log(f"cuDNN 예열 중 오류 (무시): {e}", level=logging.DEBUG)
def record_performance(self, bucket_name: str, processing_time: float, strategy: str):
"""🔥 버킷별 성능 히스토리 기록"""
if bucket_name not in self.bucket_performance_history:
self.bucket_performance_history[bucket_name] = {
'times': [],
'strategies': [],
'avg_time': 0.0,
'slow_count': 0
}
history = self.bucket_performance_history[bucket_name]
history['times'].append(processing_time)
history['strategies'].append(strategy)
# 최근 5개 기록만 유지
if len(history['times']) > 5:
history['times'] = history['times'][-5:]
history['strategies'] = history['strategies'][-5:]
# 평균 시간 계산
history['avg_time'] = sum(history['times']) / len(history['times'])
# 느린 처리 카운트 (2초 이상)
history['slow_count'] = sum(1 for t in history['times'] if t > 2.0)
self.logger.log(
f"📊 성능 기록: {bucket_name} ({strategy}) = {processing_time:.2f}s, 평균: {history['avg_time']:.2f}s",
level=logging.INFO
)
def should_fallback_to_roi(self, bucket_name: str, predicted_time: float) -> bool:
"""🚫 성능 히스토리 기반 ROI 폴백 완전 비활성화"""
# 🔥 성능 히스토리 폴백은 1초대 성능을 4-5초로 악화시키므로 완전 비활성화
self.logger.log(
f"🚫 ROI 폴백 비활성화: {bucket_name} (성능 악화 방지를 위해 full 전략 강제)",
level=logging.INFO
)
return False
# 🔥 조건 1: 평균 시간이 예측치의 4배 이상
if history['avg_time'] > predicted_time * 4:
self.logger.log(
f"⚡ ROI 폴백 트리거: {bucket_name} 평균 {history['avg_time']:.2f}s > 예측 {predicted_time:.2f}s × 4",
level=logging.WARNING
)
return True
# 🔥 조건 2: 최근 3회 중 2회 이상이 느림
if len(history['times']) >= 3 and history['slow_count'] >= 2:
self.logger.log(
f"⚡ ROI 폴백 트리거: {bucket_name} 최근 {history['slow_count']}/3회 느림",
level=logging.WARNING
)
return True
return False
def _process_full_image_optimized(self, image: np.ndarray, mask: np.ndarray,
config: Dict[str, Any], bucket_name: str) -> np.ndarray:
"""🔥 최적화된 전체 이미지 처리"""
self.logger.log(f"전체 이미지 처리 시작 (버킷: {bucket_name})", level=logging.INFO)
# 이진 마스크로 변환
binary_mask = (mask > 128).astype(np.uint8) * 255
# 크기 정규화
normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(image, binary_mask)
# SimpleLama 처리
simple_lama = self._get_simple_lama()
normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB)
result_pil = simple_lama(normalized_image_rgb, normalized_mask)
result = np.array(result_pil)
result_bgr = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)
# 원본 크기로 복원
restored_result = self.restore_roi_size(result_bgr, normalized_size)
return restored_result
def _process_roi_sequential(self, image: np.ndarray, mask: np.ndarray,
components: List, config: Dict[str, Any]) -> np.ndarray:
"""🔥 순차 ROI 처리"""
self.logger.log("ROI 순차 처리 시작", level=logging.INFO)
binary_mask = (mask > 128).astype(np.uint8) * 255
merged_components = self.merge_nearby_components(components, config['merge_distance'])
result_image = image.copy()
successful_rois = 0
for i, comp_bbox in enumerate(merged_components):
# ROI 영역 확장
if config.get('enable_mask_refinement', False):
roi_bbox = self.expand_roi_with_context(comp_bbox, image.shape, config)
else:
roi_bbox = self.expand_roi(comp_bbox, image.shape, margin_ratio=config['margin_ratio'])
# ROI 처리
processed_roi, success = self.process_roi(image, binary_mask, roi_bbox, config)
if success:
x1, y1, x2, y2 = roi_bbox
result_image[y1:y2, x1:x2] = processed_roi
successful_rois += 1
self.logger.log(f"ROI 순차 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO)
return result_image
def _process_roi_parallel(self, image: np.ndarray, mask: np.ndarray,
components: List, config: Dict[str, Any]) -> np.ndarray:
"""🔥 병렬 ROI 처리"""
self.logger.log("ROI 병렬 처리 시작", level=logging.INFO)
binary_mask = (mask > 128).astype(np.uint8) * 255
merged_components = self.merge_nearby_components(components, config['merge_distance'])
# ROI 전처리를 병렬로 수행
prepared_rois = self.prepare_rois_parallel(image, binary_mask, merged_components, config)
result_image = image.copy()
successful_rois = 0
for roi_info in prepared_rois:
if roi_info['skip_processing']:
continue
# SimpleLama 처리
simple_lama = self._get_simple_lama()
roi_normalized_rgb = cv2.cvtColor(roi_info['normalized_image'], cv2.COLOR_BGR2RGB)
roi_result_pil = simple_lama(roi_normalized_rgb, roi_info['normalized_mask'])
roi_result = np.array(roi_result_pil)
roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR)
# 원본 크기로 복원
restored_roi = self.restore_roi_size(roi_result_bgr, roi_info['original_size'])
# 부드러운 블렌딩
blend_mask = self.create_blend_mask(roi_info['roi_mask'], config)
blended_roi = (restored_roi * blend_mask + roi_info['roi_image'] * (1 - blend_mask)).astype(np.uint8)
# 원본 이미지에 적용
x1, y1, x2, y2 = roi_info['roi_bbox']
result_image[y1:y2, x1:x2] = blended_roi
successful_rois += 1
self.logger.log(f"ROI 병렬 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO)
return result_image
# 편의 함수들
def create_roi_inpainter(logger=None, config=None):
"""ROI 인페인팅 모듈 팩토리 함수"""
inpainter = ROIInpaintingModule(logger)
if config:
inpainter.default_config.update(config)
return inpainter
def quick_roi_inpaint(image: np.ndarray, mask: np.ndarray,
logger=None, config=None) -> np.ndarray:
"""간단한 ROI 인페인팅 수행"""
inpainter = create_roi_inpainter(logger, config)
result = inpainter.inpaint_with_roi(image, mask, config)
inpainter.cleanup_memory()
return result