diff --git a/test1/translated_result.png b/test1/translated_result.png index 8041fe0..c04da4c 100644 Binary files a/test1/translated_result.png and b/test1/translated_result.png differ diff --git a/test1/worker_test.py b/test1/worker_test.py index 8b6a15a..724bf95 100644 --- a/test1/worker_test.py +++ b/test1/worker_test.py @@ -16,7 +16,7 @@ import requests API_ROOT = "http://localhost:7890" # 메인 서버 주소 -IMAGE_PATH = pathlib.Path("3.jpg") +IMAGE_PATH = pathlib.Path("5.jpg") TIMEOUT = 120 # 초 unwanted_texts = { @@ -37,10 +37,13 @@ unwanted_texts = { toggle_states = {"inpaint_method": "lama:cuda", "min_masks_for_lama": 2, 'title': False, 'title_shuffle': False, 'title_trans_type': False, 'collect_method_combo': '쇼핑API', 'ocr': True, 'unwanted_words': {'할인': '', '무료': '', '증정': '', '이벤트': '', '특가': '', '세일': '', '사은품': '', '보증': '', '품절': '', '행사': '', '할인가': '', '무료배송': '', '가격설명': ''}, 'interval': 3.0, 'watingTime': 20, 'memo': False, 'memo_toggle_exposer': False, 'memo_toggle_order': False, 'optionTrnas': True, 'optionTrnas_method': True, 'optionIMGTrans': True, 'optionIMGTrans_type': '자체서버', 'optionAutoSelect': True, 'price': False, 'tag': False, 'tag_ai': False, 'thumb': False, 'thumb_trans_type': 'CPU', 'thumb_nukki': False, 'remove_background_white': True, 'detail_Option': False, 'detail_IMGTrans': True, 'detail_IMGTrans_type': '자체서버', 'debug_mode': True, 'ed_mode': False, 'discord': False, 'is_localServer': False, 'watermark_toggle': False, 'clientID': '', 'clientSecret': '', 'discord_webhook': '', 'watermark_text': '', 'thumb_rmb_count': 3, 'max_option_count': 6, 'opacity_percent': 20, 'group_index': 4, 'remove_overprice': False, 'cat_rec': False, 'fixed_keywords': False, 'fixed_keywords_count': 2, 'title_length_limit': 27, 'base_dir': 'C:\\Program Files\\Edit_PartTimer\\lib\\src', 'TEMP_IMAGE_DIR': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\temp_images', 'ERROR_SCREENSHOT_DIR': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\error_screenshots', 
'image_font_path': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\fonts\\HakgyoansimDunggeunmisoTTFB.ttf', 'watermark_font_path': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\fonts\\HakgyoansimDunggeunmisoTTFB.ttf', 'request_inpainting_server_url': 'http://171.101.232.45:50205', 'request_rembg_server_url': 'http://171.101.232.45:50205', 'request_rembg_server_url_local': 'http://192.168.0.150:35756', 'membership_level': 'premium', 'image_worker_restart_every': 10, 'image_worker_restart_count': 0, 'products_per_context_restart': 19, 'is_admin': False, 'admin_id': 'matia0514@naver.com', 'admin_pw': '', 'user_id': 'dreamm8985', 'user_pw': '112233', 'unwanted_words_button': False, 'font_type': '폰트5', 'cmb_button': False, 'detail_text_button': False, 'watermark': False} toggle_states.update({ - "ab_mode": "B", - "roi_strategy_B": "full", - "max_side_B": 1600, - "_trace_id": "exp002" + "use_roi_optimized_mask": True, # True: 새 방식, False: 기존 방식 + "enable_mask_refinement": False, # ROI 마스크 정제 비활성화 + "context_expansion_ratio": 0.4, # 최소 확장 + "blend_mode": "simple", # 단순 블렌딩 + "performance_mode": True, # 빠른 경로 사용 + "max_image_size": 1280, # 더 작은 크기 제한 + }) def call_translate(img_path: pathlib.Path): diff --git a/worker/celery_worker.py b/worker/celery_worker.py index 468695b..292282b 100644 --- a/worker/celery_worker.py +++ b/worker/celery_worker.py @@ -15,37 +15,37 @@ from PIL import Image from celery import Celery from celery.utils.log import get_task_logger -# ── SimpleLama 가중치 경로 설정 (임포트 전에 환경 구성) -try: - torch_home = "/app/torch_cache" - os.makedirs(torch_home, exist_ok=True) - os.environ.setdefault("TORCH_HOME", torch_home) +# # ── SimpleLama 가중치 경로 설정 (임포트 전에 환경 구성) +# try: +# torch_home = "/app/torch_cache" +# os.makedirs(torch_home, exist_ok=True) +# os.environ.setdefault("TORCH_HOME", torch_home) - # ✅ 환경변수로 FP16 사용 여부를 제어 (기본: 0=끄기) - use_lama_fp16 = os.getenv("USE_LAMA_FP16", "0").strip() in {"1", "true", "True"} +# # ✅ 환경변수로 FP16 사용 여부를 제어 (기본: 0=끄기) +# 
use_lama_fp16 = os.getenv("USE_LAMA_FP16", "0").strip() in {"1", "true", "True"} - fp16_path = os.path.join(torch_home, "Big-LaMa.fp16.pt") - default_ckpt = os.path.join(torch_home, "big-lama.pt") +# fp16_path = os.path.join(torch_home, "Big-LaMa.fp16.pt") +# default_ckpt = os.path.join(torch_home, "big-lama.pt") - # 기본은 FP32 체크포인트를 우선 - if os.path.isfile(default_ckpt): - os.environ.setdefault("SIMPLE_LAMA_CKPT", default_ckpt) - elif os.path.isfile(fp16_path) and use_lama_fp16: - os.environ.setdefault("SIMPLE_LAMA_CKPT", fp16_path) +# # 기본은 FP32 체크포인트를 우선 +# if os.path.isfile(default_ckpt): +# os.environ.setdefault("SIMPLE_LAMA_CKPT", default_ckpt) +# elif os.path.isfile(fp16_path) and use_lama_fp16: +# os.environ.setdefault("SIMPLE_LAMA_CKPT", fp16_path) - # 🔧 [기존 문제 원인] FP16 → big-lama.pt 강제 링크/복사 로직 제거 - # 필요 시에만 FP16을 직접 지정해서 쓰도록 함. -except Exception: - pass +# # 🔧 [기존 문제 원인] FP16 → big-lama.pt 강제 링크/복사 로직 제거 +# # 필요 시에만 FP16을 직접 지정해서 쓰도록 함. +# except Exception: +# pass #from worker.ocr_module import OCRModule # ndarray 지원 버전 from worker.mask_module_for_paddle import MaskModule -# from worker.text_rendering_module import TextRenderingModule -from worker.text_rendering_module2 import TextRenderingModule +from worker.text_rendering_module import TextRenderingModule +# from worker.text_rendering_module2 import TextRenderingModule from worker.rembg_module import RembgRemover from worker.loggerModule import Logger -from simple_lama_inpainting import SimpleLama -from worker.inpaint_module import Inpainter, InpaintBackends +# from simple_lama_inpainting import SimpleLama +# from worker.inpaint_module import Inpainter, InpaintBackends from worker.utils_debug import save_debug_artifacts, draw_ocr_overlay from worker.roi_inpainting_module import ROIInpaintingModule @@ -111,34 +111,34 @@ def track_phase(phase: str, trace_id: Optional[str] = None): _TEMP = Path(os.getenv("TEMP_STORAGE", "/app/temp_files")) _TEMP.mkdir(exist_ok=True, parents=True) -_lama: SimpleLama | None 
= None +# _lama: SimpleLama | None = None _ocr = None _mask: MaskModule | None = None _text: TextRenderingModule | None = None -_inpainter: Inpainter | None = None +# _inpainter: Inpainter | None = None _roi_inpainter: ROIInpaintingModule | None = None _translator = get_translator() # ✅ 워커 부팅 시 생성 & 재사용 -def get_lama(): - global _lama - if _lama is None: - _lama = SimpleLama() - # 라마 초기화 직후 VRAM 스냅샷 - _gpu_tracker.log_snapshot(tag="after SimpleLama init") +# def get_lama(): +# global _lama +# if _lama is None: +# _lama = SimpleLama() +# # 라마 초기화 직후 VRAM 스냅샷 +# _gpu_tracker.log_snapshot(tag="after SimpleLama init") - return _lama +# return _lama -def get_inpainter() -> Inpainter: - global _inpainter - if _inpainter is None: - _inpainter = Inpainter( - logger=clogger, - default_backend=InpaintBackends.LAMA, # 기본값은 자유롭게 - # lama_onnx_fd_path="/app/worker/models/inpainting_lama_2025jan.onnx", - # lama_onnx_fd_device="gpu", # "cpu"도 가능 - # lama_onnx_fd_backend="trt" # "ort"=ONNX Runtime 기본 CPU/GPU 실행(CUDA 환경이면 GPU 사용 가능) "trt"=TensorRT 실행, "cuda"=ONNX Runtime CUDA Execution Provider, "cpu"=ONNX Runtime CPU Execution Provider - ) - return _inpainter +# def get_inpainter() -> Inpainter: +# global _inpainter +# if _inpainter is None: +# _inpainter = Inpainter( +# logger=clogger, +# default_backend=InpaintBackends.LAMA, # 기본값은 자유롭게 +# # lama_onnx_fd_path="/app/worker/models/inpainting_lama_2025jan.onnx", +# # lama_onnx_fd_device="gpu", # "cpu"도 가능 +# # lama_onnx_fd_backend="trt" # "ort"=ONNX Runtime 기본 CPU/GPU 실행(CUDA 환경이면 GPU 사용 가능) "trt"=TensorRT 실행, "cuda"=ONNX Runtime CUDA Execution Provider, "cpu"=ONNX Runtime CPU Execution Provider +# ) +# return _inpainter def get_ocr(): from worker.ocr_module import OCRModule @@ -180,19 +180,41 @@ except Exception: from celery.signals import worker_process_init @worker_process_init.connect def _warm_up_models(**_): + """워커 프로세스 초기화 시 모델들을 사전 로딩""" try: - # OCR 등도 여기서 미리 띄울 수 있음 + # 🔥 PyTorch 성능 최적화 설정 + import torch + if 
torch.cuda.is_available(): + # cuDNN 최적화 + torch.backends.cudnn.benchmark = True + torch.backends.cudnn.deterministic = False + + # TF32 활성화 (Ampere 이상 GPU에서 성능 향상) + torch.backends.cuda.matmul.allow_tf32 = True + torch.backends.cudnn.allow_tf32 = True + + # 메모리 형식 최적화 + torch.set_float32_matmul_precision('high') + + logger.info( + f"🔧 PyTorch 최적화 완료: " + f"cudnn.benchmark={torch.backends.cudnn.benchmark}, " + f"allow_tf32={torch.backends.cuda.matmul.allow_tf32}" + ) + + # 모델 사전 로딩 get_ocr() - # Inpainter 생성 - get_inpainter() + get_mask() + get_text() + # 🔥 ROI 인페인팅 모듈 사전 초기화 roi_inpainter = get_roi_inpainter() roi_inpainter._get_simple_lama() # SimpleLama 사전 로딩 - # 필요하다면 특정 백엔드 강제 초기화: - # get_inpainter()._get_lama_onnx_ort(backend_hint="cuda") - print("[warmup] models preloaded (including ROI SimpleLama)") + + logger.info("✅ 모델 사전 로딩 완료 (성능 최적화 포함)") + except Exception as e: - print(f"[warmup] skipped: {e}") + logger.warning(f"⚠️ 모델 사전 로딩 건너뜀: {e}") _warm_up_models() # ───────────────────────────────── 공통 헬퍼 @@ -229,80 +251,80 @@ def _parse_font_number_from_toggle(toggle_states: Dict[str, Any]) -> int | None: logger.warning(f"[font] font_type 파싱 실패: {e}") return None -def _parse_inpaint_backend( - toggle_states: Dict[str, Any], - *, - default_method: str = "lama", - default_backend: str = "ort", - default_min_masks_for_lama: int = 4 -) -> Tuple[str, str, int]: - """ - toggle_states에서 inpaint_method, backend, min_masks_for_lama 를 파싱. +# def _parse_inpaint_backend( +# toggle_states: Dict[str, Any], +# *, +# default_method: str = "lama", +# default_backend: str = "ort", +# default_min_masks_for_lama: int = 4 +# ) -> Tuple[str, str, int]: +# """ +# toggle_states에서 inpaint_method, backend, min_masks_for_lama 를 파싱. 
- 허용 표기(대소문자/공백 무시): - - method: - "opencv", "cv" - "lama", "lama_torch", "torch" - "lama_onnx_ort", "onnx_ort" # OpenCV DNN 경로 - "lama_onnx_fd", "onnx_fd", "fd" # FastDeploy(ORT/TRT/CUDA/CPU) - - backend (lama_onnx_fd / lama_onnx 전용): - "ort", "trt", "cuda", "cpu" - - 콜론 구분 지원: "lama_onnx_fd:trt", "lama_onnx_fd:ort" +# 허용 표기(대소문자/공백 무시): +# - method: +# "opencv", "cv" +# "lama", "lama_torch", "torch" +# "lama_onnx_ort", "onnx_ort" # OpenCV DNN 경로 +# "lama_onnx_fd", "onnx_fd", "fd" # FastDeploy(ORT/TRT/CUDA/CPU) +# - backend (lama_onnx_fd / lama_onnx 전용): +# "ort", "trt", "cuda", "cpu" +# - 콜론 구분 지원: "lama_onnx_fd:trt", "lama_onnx_fd:ort" - 키 없음/실패 시 기본값: - method = default_method ("lama_onnx_fd") - backend = default_backend ("ort") - min_masks_for_lama = default_min_masks_for_lama (4) +# 키 없음/실패 시 기본값: +# method = default_method ("lama_onnx_fd") +# backend = default_backend ("ort") +# min_masks_for_lama = default_min_masks_for_lama (4) - Returns: - (method_enum, backend_str, min_masks_for_lama:int) - """ +# Returns: +# (method_enum, backend_str, min_masks_for_lama:int) +# """ - # 1) 안전하게 읽기 - try: - raw = str((toggle_states or {}).get("inpaint_method", "")).strip().lower() - except Exception: - raw = "" - if not raw: - raw = f"{default_method}:{default_backend}" +# # 1) 안전하게 읽기 +# try: +# raw = str((toggle_states or {}).get("inpaint_method", "")).strip().lower() +# except Exception: +# raw = "" +# if not raw: +# raw = f"{default_method}:{default_backend}" - # 2) method / backend 분리 - if ":" in raw: - method_tok, backend_tok = [t.strip() for t in raw.split(":", 1)] - else: - method_tok, backend_tok = raw, default_backend +# # 2) method / backend 분리 +# if ":" in raw: +# method_tok, backend_tok = [t.strip() for t in raw.split(":", 1)] +# else: +# method_tok, backend_tok = raw, default_backend - # 3) method 매핑 - method_map = { - "opencv": InpaintBackends.OPENCV, - "cv": InpaintBackends.OPENCV, +# # 3) method 매핑 +# method_map = { +# "opencv": InpaintBackends.OPENCV, +# 
"cv": InpaintBackends.OPENCV, - "lama": InpaintBackends.LAMA, - "lama_torch": InpaintBackends.LAMA, - "torch": InpaintBackends.LAMA, +# "lama": InpaintBackends.LAMA, +# "lama_torch": InpaintBackends.LAMA, +# "torch": InpaintBackends.LAMA, - # ⬇️ 새 별칭들 - "lama_torch_amp": InpaintBackends.LAMA_TORCH_AMP, - "torch_amp": InpaintBackends.LAMA_TORCH_AMP, - "amp": InpaintBackends.LAMA_TORCH_AMP, +# # ⬇️ 새 별칭들 +# "lama_torch_amp": InpaintBackends.LAMA_TORCH_AMP, +# "torch_amp": InpaintBackends.LAMA_TORCH_AMP, +# "amp": InpaintBackends.LAMA_TORCH_AMP, - } - method_enum = method_map.get( - method_tok, - method_map.get(default_method, InpaintBackends.LAMA) - ) +# } +# method_enum = method_map.get( +# method_tok, +# method_map.get(default_method, InpaintBackends.LAMA) +# ) - # 4) backend 정규화 - backend_tok = (backend_tok or default_backend).lower() - backend_enum = backend_tok if backend_tok in {"ort", "trt", "cuda", "cpu"} else default_backend +# # 4) backend 정규화 +# backend_tok = (backend_tok or default_backend).lower() +# backend_enum = backend_tok if backend_tok in {"ort", "trt", "cuda", "cpu"} else default_backend - # 5) min_masks_for_lama 파싱 - try: - mmfl = int((toggle_states or {}).get("min_masks_for_lama", default_min_masks_for_lama)) - except (TypeError, ValueError): - mmfl = default_min_masks_for_lama +# # 5) min_masks_for_lama 파싱 +# try: +# mmfl = int((toggle_states or {}).get("min_masks_for_lama", default_min_masks_for_lama)) +# except (TypeError, ValueError): +# mmfl = default_min_masks_for_lama - return method_enum, backend_enum, mmfl +# return method_enum, backend_enum, mmfl # def run_inpaint( # src_bgr, @@ -350,118 +372,118 @@ def _parse_inpaint_backend( # ) -def run_inpaint( - src_bgr, - polygons, - toggle_states: Dict[str, Any], - *, - max_side: int = 1024, - auto_opencv_if_few: bool = True -): - """ - 기존 호출부 유지. 
toggle_states 로 A/B 모드 제어: - - ab_mode: "A" | "B" | "A+B" (기본 "A") - - A = components ROI (확대/근접 병합/소프트블렌딩) - - B = full-frame (ROI 미사용, 비교용) - """ - # 기존 파라미터 파싱 유지 - method_enum, backend_enum, min_masks_for_lama = _parse_inpaint_backend(toggle_states) - inpainter = get_inpainter() +# def run_inpaint( +# src_bgr, +# polygons, +# toggle_states: Dict[str, Any], +# *, +# max_side: int = 1024, +# auto_opencv_if_few: bool = True +# ): +# """ +# 기존 호출부 유지. toggle_states 로 A/B 모드 제어: +# - ab_mode: "A" | "B" | "A+B" (기본 "A") +# - A = components ROI (확대/근접 병합/소프트블렌딩) +# - B = full-frame (ROI 미사용, 비교용) +# """ +# # 기존 파라미터 파싱 유지 +# method_enum, backend_enum, min_masks_for_lama = _parse_inpaint_backend(toggle_states) +# inpainter = get_inpainter() - # ── 공통 토글 - ab_mode = str((toggle_states or {}).get("ab_mode", "A")).upper() # "A" | "B" | "A+B" - trace_id = (toggle_states or {}).get("_trace_id", None) - debug_root = os.getenv("DEBUG_DUMP_DIR", "/app/temp_files/debug") - ab_dir = os.path.join(debug_root, "AB") - try: - os.makedirs(ab_dir, exist_ok=True) - except Exception: - pass +# # ── 공통 토글 +# ab_mode = str((toggle_states or {}).get("ab_mode", "A")).upper() # "A" | "B" | "A+B" +# trace_id = (toggle_states or {}).get("_trace_id", None) +# debug_root = os.getenv("DEBUG_DUMP_DIR", "/app/temp_files/debug") +# ab_dir = os.path.join(debug_root, "AB") +# try: +# os.makedirs(ab_dir, exist_ok=True) +# except Exception: +# pass - # ── A(components ROI)용 kwargs: (값 없으면 기본 추천값 사용) - A_kwargs = dict( - backend=method_enum, # "lama" 권장 - roi_strategy=str((toggle_states or {}).get("roi_strategy_A", "components")).lower(), # "components" - max_side=int((toggle_states or {}).get("max_side_A", 1600)), - auto_opencv_if_few=bool((toggle_states or {}).get("auto_opencv_if_few", False)), - few_threshold=int((toggle_states or {}).get("few_threshold", 0)), - comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)), - pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)), - 
merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)), - merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)), - soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px", 10)), - soft_blur_px=int((toggle_states or {}).get("soft_blur_px", 17)), - debug_save_rois=bool((toggle_states or {}).get("debug_save_rois", False)), - debug_dir=os.path.join(debug_root, "ROIs"), - request_id=trace_id - ) +# # ── A(components ROI)용 kwargs: (값 없으면 기본 추천값 사용) +# A_kwargs = dict( +# backend=method_enum, # "lama" 권장 +# roi_strategy=str((toggle_states or {}).get("roi_strategy_A", "components")).lower(), # "components" +# max_side=int((toggle_states or {}).get("max_side_A", 1600)), +# auto_opencv_if_few=bool((toggle_states or {}).get("auto_opencv_if_few", False)), +# few_threshold=int((toggle_states or {}).get("few_threshold", 0)), +# comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)), +# pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)), +# merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)), +# merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)), +# soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px", 10)), +# soft_blur_px=int((toggle_states or {}).get("soft_blur_px", 17)), +# debug_save_rois=bool((toggle_states or {}).get("debug_save_rois", False)), +# debug_dir=os.path.join(debug_root, "ROIs"), +# request_id=trace_id +# ) - # ── B(full-frame)용 kwargs - B_kwargs = dict( - backend=method_enum, - roi_strategy=str((toggle_states or {}).get("roi_strategy_B", "full")).lower(), # "full" - max_side=int((toggle_states or {}).get("max_side_B", 1600)), - auto_opencv_if_few=False, - few_threshold=0, - # full 도 얇게 블렌딩 - soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px_full", (toggle_states or {}).get("soft_dilate_px", 10))), - soft_blur_px=int((toggle_states or {}).get("soft_blur_px_full", (toggle_states or {}).get("soft_blur_px", 17))), - # 아래는 시그니처 
호환용 - comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)), - pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)), - merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)), - merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)), - debug_save_rois=False, - debug_dir=None, - request_id=trace_id - ) +# # ── B(full-frame)용 kwargs +# B_kwargs = dict( +# backend=method_enum, +# roi_strategy=str((toggle_states or {}).get("roi_strategy_B", "full")).lower(), # "full" +# max_side=int((toggle_states or {}).get("max_side_B", 1600)), +# auto_opencv_if_few=False, +# few_threshold=0, +# # full 도 얇게 블렌딩 +# soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px_full", (toggle_states or {}).get("soft_dilate_px", 10))), +# soft_blur_px=int((toggle_states or {}).get("soft_blur_px_full", (toggle_states or {}).get("soft_blur_px", 17))), +# # 아래는 시그니처 호환용 +# comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)), +# pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)), +# merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)), +# merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)), +# debug_save_rois=False, +# debug_dir=None, +# request_id=trace_id +# ) - # ── 실행 래퍼 (결과 파일도 저장) - def _run_and_save(label: str, kwargs: Dict[str, Any]) -> np.ndarray: - out = inpainter.inpaint(src_bgr, polygons, **kwargs) - try: - fname = f"{(trace_id or 'ab')}_{label}.png" - cv2.imwrite(os.path.join(ab_dir, fname), out) - except Exception: - pass - return out +# # ── 실행 래퍼 (결과 파일도 저장) +# def _run_and_save(label: str, kwargs: Dict[str, Any]) -> np.ndarray: +# out = inpainter.inpaint(src_bgr, polygons, **kwargs) +# try: +# fname = f"{(trace_id or 'ab')}_{label}.png" +# cv2.imwrite(os.path.join(ab_dir, fname), out) +# except Exception: +# pass +# return out - # ── 모드 분기 - if ab_mode == "A": - return _run_and_save("A_components", A_kwargs) +# # ── 모드 분기 +# if ab_mode == "A": 
+# return _run_and_save("A_components", A_kwargs) - if ab_mode == "B": - return _run_and_save("B_full", B_kwargs) +# if ab_mode == "B": +# return _run_and_save("B_full", B_kwargs) - # ── "A+B": 좌우 합성 프리뷰 반환 (단일 결과는 파일로 저장됨) - outA = _run_and_save("A_components", A_kwargs) - outB = _run_and_save("B_full", B_kwargs) +# # ── "A+B": 좌우 합성 프리뷰 반환 (단일 결과는 파일로 저장됨) +# outA = _run_and_save("A_components", A_kwargs) +# outB = _run_and_save("B_full", B_kwargs) - # 높이 맞춰 좌우 스택 - h = min(outA.shape[0], outB.shape[0]) +# # 높이 맞춰 좌우 스택 +# h = min(outA.shape[0], outB.shape[0]) - def _resize_to_h(img, h): - if img.shape[0] == h: - return img - ratio = h / img.shape[0] - new_w = int(round(img.shape[1] * ratio)) - return cv2.resize(img, (new_w, h), interpolation=cv2.INTER_CUBIC) +# def _resize_to_h(img, h): +# if img.shape[0] == h: +# return img +# ratio = h / img.shape[0] +# new_w = int(round(img.shape[1] * ratio)) +# return cv2.resize(img, (new_w, h), interpolation=cv2.INTER_CUBIC) - a2 = _resize_to_h(outA, h) - b2 = _resize_to_h(outB, h) - combo = np.hstack([a2, b2]) +# a2 = _resize_to_h(outA, h) +# b2 = _resize_to_h(outB, h) +# combo = np.hstack([a2, b2]) - # 레이블(있으면 편함) - try: - cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA) - cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA) - cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA) - cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA) - cv2.imwrite(os.path.join(ab_dir, f"{(trace_id or 'ab')}_AplusB.png"), combo) - except Exception: - pass +# # 레이블(있으면 편함) +# try: +# cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA) +# cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, 
cv2.LINE_AA) +# cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA) +# cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA) +# cv2.imwrite(os.path.join(ab_dir, f"{(trace_id or 'ab')}_AplusB.png"), combo) +# except Exception: +# pass - return combo +# return combo # ───────────────────────────────── translate_task @celery_app.task(name="worker.translate_task", @@ -512,7 +534,34 @@ def translate_task(self, *, image_b64: str, filename: str, # # 2. 번역 with track_phase("MASK", trace_id): - mask = get_mask().create_masks_np(src_bgr, chn) # <─ ndarray 지원 + # 🔥 A/B 테스트: 기존 방식 vs ROI 최적화 방식 + use_roi_optimized_mask = toggle_states.get('use_roi_optimized_mask', False) # True → False로 변경 + + if use_roi_optimized_mask: + # 🔥 ROI 최적화: 적응형 마스크 생성 + mask = get_mask().create_masks_np( + src_bgr, chn, + for_roi_processing=True, + # 🔥 텍스트 개수에 따른 적응형 expansion + expansion_size=min(8, max(4, 10 - len(chn))), # 텍스트 많으면 작게, 적으면 크게 + blur_size=0 # ROI 모드에서는 블러 없음 + ) + mask_type = "ROI최적화" + else: + # 기존 방식: 전체 후처리 적용 + mask = get_mask().create_masks_np(src_bgr, chn) + mask_type = "기존방식" + + # 🔥 마스크 통계 로깅 + mask_pixels = np.sum(mask > 0) + total_pixels = mask.shape[0] * mask.shape[1] + mask_coverage = mask_pixels / total_pixels * 100 + + clogger.log( + f"🔧 {mask_type} 마스크 사용: 커버리지 {mask_coverage:.2f}% ({mask_pixels:,}/{total_pixels:,} 픽셀)", + level=logging.INFO + ) + if mask is None: return fail("MASK_ERR", "mask failed") @@ -530,6 +579,17 @@ def translate_task(self, *, image_b64: str, filename: str, 'merge_distance': toggle_states.get('merge_distance', 50), 'margin_ratio': toggle_states.get('margin_ratio', 0.15), 'large_mask_threshold': toggle_states.get('large_mask_threshold', 0.5), + # 🔥 마스크 정제 비활성화 (마스크 모듈에서 이미 최적화됨) + 'enable_mask_refinement': toggle_states.get('enable_mask_refinement', False), + 'mask_erosion_kernel': 0, # 비활성화 + 
'mask_dilation_kernel': 0, # 비활성화 + 'mask_blur_kernel': 0, # 비활성화 + 'context_expansion_ratio': toggle_states.get('context_expansion_ratio', 0.1), # 줄임 + 'blend_mode': toggle_states.get('blend_mode', 'simple'), # 단순 블렌딩 + 'feather_blend_size': toggle_states.get('feather_blend_size', 5), # 줄임 + # 🔥 형상 최적화 설정 + 'enable_shape_optimization': toggle_states.get('enable_shape_optimization', True), + 'performance_tracking': toggle_states.get('performance_tracking', True), } # 처리 전 통계 로깅 @@ -557,15 +617,15 @@ def translate_task(self, *, image_b64: str, filename: str, logger.info(f"[TRACE][{trace_id}][font] 폰트 지정 없음 -> 기본 폰트(3번) 사용") with track_phase("RENDER", trace_id): - # out = get_text().render_text(dst_bgr, chn, ko, font_number=font_number) - out = get_text().render_with_market_preset( - image_bgr=dst_bgr, - ocr_results=chn, # [{'polygon': [[x,y]...], 'text':...}, ...] - translated_texts=ko, - market=toggle_states.get("market", "coupang"), # 'coupang'|'naver' - preset=toggle_states.get("preset", "basic"), # 'basic'|'badge'|'price' - font_number=font_number - ) + out = get_text().render_text(dst_bgr, chn, ko, font_number=font_number) + # out = get_text().render_with_market_preset( + # image_bgr=dst_bgr, + # ocr_results=chn, # [{'polygon': [[x,y]...], 'text':...}, ...] 
+ # translated_texts=ko, + # market=toggle_states.get("market", "coupang"), # 'coupang'|'naver' + # preset=toggle_states.get("preset", "basic"), # 'basic'|'badge'|'price' + # font_number=font_number + # ) # 최종 diff --git a/worker/inpaint_module.py b/worker/inpaint_module.py deleted file mode 100644 index 9add310..0000000 --- a/worker/inpaint_module.py +++ /dev/null @@ -1,243 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import annotations -import os, cv2, numpy as np -from typing import List, Tuple, Optional -from PIL import Image - -# ── (옵션) LaMa -try: - from simple_lama_inpainting.models.model import SimpleLama - _HAVE_LAMA = True -except Exception: - _HAVE_LAMA = False - -class InpaintBackends: - OPENCV = "opencv" - LAMA = "lama" - LAMA_TORCH_AMP = "lama_torch_amp" # placeholder - -# ── 공통 유틸 -def polygons_to_mask(shape: Tuple[int,int], polygons: List[List[List[int]]]) -> np.ndarray: - h, w = shape - mask = np.zeros((h, w), dtype=np.uint8) - for poly in polygons: - pts = np.array(poly, dtype=np.int32).reshape(-1, 2) - cv2.fillPoly(mask, [pts], 255) - return mask - -def resize_long_side(img: np.ndarray, max_side: int) -> Tuple[np.ndarray, float]: - h, w = img.shape[:2] - if max(h, w) <= max_side: - return img, 1.0 - if h >= w: - s = max_side / float(h) - nh, nw = max_side, int(round(w * s)) - else: - s = max_side / float(w) - nw, nh = max_side, int(round(h * s)) - out = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_AREA) - return out, s - -def _soften_mask(mask: np.ndarray, *, dilate_px: int, blur_px: int) -> np.ndarray: - m = mask.copy() - if dilate_px > 0: - k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*dilate_px+1, 2*dilate_px+1)) - m = cv2.dilate(m, k, iterations=1) - m = cv2.GaussianBlur(m, (blur_px | 1, blur_px | 1), 0) - return m - -# ── 컴포넌트/ROI 유틸 -def _connected_components(mask: np.ndarray, *, min_area: int = 30) -> List[Tuple[int,int,int,int]]: - num, _, stats, _ = cv2.connectedComponentsWithStats((mask > 0).astype(np.uint8), 
connectivity=8) - boxes = [] - for cid in range(1, num): - x, y, w, h, area = stats[cid] - if w > 1 and h > 1 and area >= min_area: - boxes.append((int(x), int(y), int(w), int(h))) - return boxes - -def _expand_box(b: Tuple[int,int,int,int], pad_ratio: float, W: int, H: int) -> Tuple[int,int,int,int]: - x, y, w, h = b - pad = int(round(max(w, h) * pad_ratio)) - x0 = max(0, x - pad); y0 = max(0, y - pad) - x1 = min(W, x + w + pad); y1 = min(H, y + h + pad) - return x0, y0, x1 - x0, y1 - y0 - -def _min_gap(a: Tuple[int,int,int,int], b: Tuple[int,int,int,int]) -> int: - ax, ay, aw, ah = a; bx, by, bw, bh = b - ar, ab = ax + aw, ay + ah - br, bb = bx + bw, by + bh - dx = max(0, max(ax - br, bx - ar)) - dy = max(0, max(ay - bb, by - ab)) - return max(dx, dy) # L∞ gap - -def _merge_close_boxes(boxes: List[Tuple[int,int,int,int]], *, thresh_px: int) -> List[Tuple[int,int,int,int]]: - if not boxes: return [] - n = len(boxes) - parent = list(range(n)) - def find(i): - while parent[i] != i: - parent[i] = parent[parent[i]] - i = parent[i] - return i - def union(i, j): - ri, rj = find(i), find(j) - if ri != rj: parent[rj] = ri - for i in range(n): - for j in range(i+1, n): - if _min_gap(boxes[i], boxes[j]) <= thresh_px: - union(i, j) - groups = {} - for i, b in enumerate(boxes): - r = find(i) - groups.setdefault(r, []).append(b) - merged = [] - for grp in groups.values(): - xs = [x for x,_,_,_ in grp]; ys = [y for _,y,_,_ in grp] - rs = [x+w for x,_,w,_ in grp]; bs = [y+h for _,y,_,h in grp] - x0, y0, x1, y1 = min(xs), min(ys), max(rs), max(bs) - merged.append((x0, y0, x1 - x0, y1 - y0)) - return merged - -class Inpainter: - def __init__(self, logger=None, - default_backend: str = InpaintBackends.LAMA, - lama_device: str = "cuda"): - self.logger = logger - self.default_backend = default_backend - self.lama_device = lama_device - self._lama: Optional[SimpleLama] = None - - def _log(self, msg): - if self.logger and hasattr(self.logger, "log"): self.logger.log(msg) - else: 
print(msg) - - def _get_lama(self): - if not _HAVE_LAMA: - raise RuntimeError("SimpleLama not installed") - if self._lama is None: - self._log("Init SimpleLama...") - self._lama = SimpleLama(device=self.lama_device) - return self._lama - - # 평탄 배경에 강한 OpenCV (필요 시 사용) - def _opencv_text_inpaint(self, img_bgr: np.ndarray, hard_mask: np.ndarray, - r1: int = 3, r2: int = 7) -> np.ndarray: - out1 = cv2.inpaint(img_bgr, hard_mask, r1, cv2.INPAINT_TELEA) - remain = (hard_mask > 0) & (np.abs(out1.astype(np.int16) - img_bgr.astype(np.int16)).max(axis=2) > 3) - out2 = cv2.inpaint(out1, (remain.astype(np.uint8) * 255), r2, cv2.INPAINT_TELEA) if remain.any() else out1 - return out2 - - def _run_backend(self, roi_img: np.ndarray, roi_mask: np.ndarray, backend: str) -> np.ndarray: - if backend == InpaintBackends.OPENCV: - return self._opencv_text_inpaint(roi_img, roi_mask) - elif backend in (InpaintBackends.LAMA, InpaintBackends.LAMA_TORCH_AMP): - lama = self._get_lama() - dst_pil = lama(Image.fromarray(cv2.cvtColor(roi_img, cv2.COLOR_BGR2RGB)), - Image.fromarray(roi_mask, "L")) - return cv2.cvtColor(np.array(dst_pil), cv2.COLOR_RGB2BGR) - else: - raise NotImplementedError(f"Backend {backend} not wired.") - - def inpaint(self, img_bgr: np.ndarray, polygons: List[List[List[int]]], - *, - backend: Optional[str] = None, - # 공통 - roi_strategy: str = "components", # "components" | "full" - max_side: int = 1600, - auto_opencv_if_few: bool = False, - few_threshold: int = 0, - # components 전용 - comp_min_area: int = 30, - pad_ratio: float = 0.12, - merge_thresh_factor: float = 0.7, - merge_abs_min_px: int = 8, - soft_dilate_px: int = 10, - soft_blur_px: int = 17, - # 디버그 저장 - debug_save_rois: bool = False, - debug_dir: Optional[str] = None, - request_id: Optional[str] = None) -> np.ndarray: - - backend = (backend or self.default_backend).lower() - H, W = img_bgr.shape[:2] - base_mask = polygons_to_mask((H, W), polygons) - - # ── 풀프레임 모드 - if roi_strategy == "full": - img_small, s = 
resize_long_side(img_bgr, max_side) - mask_small = cv2.resize(base_mask, (img_small.shape[1], img_small.shape[0]), - interpolation=cv2.INTER_NEAREST) if s != 1.0 else base_mask - dst_small = self._run_backend(img_small, mask_small, InpaintBackends.LAMA) - - # 소프트 블렌딩(테두리 얇게) - soft_small = _soften_mask(mask_small, dilate_px=soft_dilate_px, blur_px=soft_blur_px) - alpha = (soft_small.astype(np.float32) / 255.0)[..., None] - blended_small = (alpha * dst_small.astype(np.float32) + (1 - alpha) * img_small.astype(np.float32)).astype(np.uint8) - - out = cv2.resize(blended_small, (W, H), interpolation=cv2.INTER_CUBIC) if s != 1.0 else blended_small - return out - - # ── 컴포넌트 기반 ROI 모드 - boxes = _connected_components(base_mask, min_area=comp_min_area) - if not boxes: - return img_bgr.copy() - - heights = [h for _,_,_,h in boxes] - med_h = float(np.median(heights)) if heights else 0.0 - merge_px = max(merge_abs_min_px, int(round(med_h * merge_thresh_factor))) - merged = _merge_close_boxes(boxes, thresh_px=merge_px) - rois = [_expand_box(b, pad_ratio, W, H) for b in merged] - rois.sort(key=lambda r: (r[1]//32, r[0])) - - # 디버그 저장 준비 - save_idx = 0 - if debug_save_rois and debug_dir: - os.makedirs(debug_dir, exist_ok=True) - - out = img_bgr.copy() - - for (x, y, w, h) in rois: - if w <= 1 or h <= 1: - continue - - roi_img = out[y:y+h, x:x+w] - roi_mask = base_mask[y:y+h, x:x+w] - roi_soft = _soften_mask(roi_mask, dilate_px=soft_dilate_px, blur_px=soft_blur_px) - - roi_img_small, s = resize_long_side(roi_img, max_side) - if s != 1.0: - roi_mask_small = cv2.resize(roi_mask, (roi_img_small.shape[1], roi_img_small.shape[0]), - interpolation=cv2.INTER_NEAREST) - roi_soft_small = cv2.resize(roi_soft, (roi_img_small.shape[1], roi_img_small.shape[0]), - interpolation=cv2.INTER_LINEAR) - else: - roi_mask_small = roi_mask - roi_soft_small = roi_soft - - use_backend = (InpaintBackends.OPENCV if (auto_opencv_if_few and len(merged) <= few_threshold) - else InpaintBackends.LAMA if backend 
not in (InpaintBackends.OPENCV,) else backend) - - dst_small = self._run_backend(roi_img_small, roi_mask_small, use_backend) - if dst_small.shape[:2] != roi_img_small.shape[:2]: - dst_small = cv2.resize(dst_small, (roi_img_small.shape[1], roi_img_small.shape[0]), interpolation=cv2.INTER_CUBIC) - - # 소프트 블렌딩 - alpha = (roi_soft_small.astype(np.float32) / 255.0)[..., None] - blended_small = (alpha * dst_small.astype(np.float32) + - (1 - alpha) * roi_img_small.astype(np.float32)).astype(np.uint8) - - # 원 크기로 복원 - dst_roi = cv2.resize(blended_small, (w, h), interpolation=cv2.INTER_CUBIC) if s != 1.0 else blended_small - out[y:y+h, x:x+w] = dst_roi - - # ── 중간 저장 (원본/마스크/결과) - if debug_save_rois and debug_dir: - base = f"{request_id or 'req'}_roi{save_idx:02d}" - cv2.imwrite(os.path.join(debug_dir, base + "_img.png"), roi_img) - cv2.imwrite(os.path.join(debug_dir, base + "_mask.png"), roi_mask) - cv2.imwrite(os.path.join(debug_dir, base + "_soft.png"), roi_soft) - cv2.imwrite(os.path.join(debug_dir, base + "_dst.png"), dst_roi) - save_idx += 1 - - return out diff --git a/worker/inpaint_module2.py b/worker/inpaint_module2.py deleted file mode 100644 index eff0c96..0000000 --- a/worker/inpaint_module2.py +++ /dev/null @@ -1,917 +0,0 @@ -# -*- coding: utf-8 -*- -""" -통합 인페인팅 모듈 -- OpenCV 텍스트 최적화 인페인트 -- SimpleLama (PyTorch) -- LaMa ONNX (Hugging Face: opencv/inpainting_lama_2025jan.onnx) -- MiGAN / EdgeConnect 어댑터 자리 마련 - -사용 예: - from worker.inpaint_module import Inpainter, InpaintBackends - inp = Inpainter(default_backend=InpaintBackends.LAMA_TORCH, - lama_device="cuda", - lama_onnx_path="/app/worker/models/inpainting_lama_2025jan.onnx") - out = inp.inpaint(img_bgr, [poly1, poly2, ...], backend=None, max_side=1024, - auto_opencv_if_few=True, few_threshold=4) -""" -from __future__ import annotations -import os -import cv2 -import numpy as np -from typing import Dict, Any, List, Tuple, Optional -from PIL import Image -import threading - - -# 
─────────────────────────────────────────────── -# 백엔드 식별자 -# ─────────────────────────────────────────────── -class InpaintBackends: - OPENCV = "opencv" - LAMA_TORCH = "lama_torch" - LAMA_ONNX_FD = "lama_onnx_fd" # FastDeploy 기반 - LAMA_ONNX_ORT = "lama_onnx_ort" # 순수 onnxruntime 기반 - MIGAN = "migan" # placeholder - EDGECONNECT = "edgeconnect" # placeholder - LAMA_TORCH_AMP = "lama_torch_amp" # 패치한 파일 위치 그대로 - - -# ─────────────────────────────────────────────── -# 유틸 -# ─────────────────────────────────────────────── -def _log(logger, msg, level=20): - """logger가 있으면 logger.log로, 없으면 print""" - if logger and hasattr(logger, "log"): - logger.log(msg, level=level) - else: - print(msg) - -def polygons_to_mask(shape: Tuple[int,int], polygons: List[List[List[int]]]) -> np.ndarray: - """폴리곤 리스트 -> 단일 바이너리 마스크(0/255)""" - h, w = shape - mask = np.zeros((h, w), dtype=np.uint8) - for poly in polygons: - pts = np.array(poly, dtype=np.int32).reshape(-1, 2) - cv2.fillPoly(mask, [pts], 255) - return mask - -def union_bbox_of_mask(mask: np.ndarray, pad_ratio: float = 0.1) -> Tuple[int,int,int,int]: - """마스크의 합집합 영역 bbox + 패딩""" - ys, xs = np.where(mask > 0) - if len(xs) == 0: - return 0,0,mask.shape[1],mask.shape[0] - x, y = int(xs.min()), int(ys.min()) - w, h = int(xs.max()-xs.min()+1), int(ys.max()-ys.min()+1) - pad = int(max(w,h) * pad_ratio) - x0 = max(0, x - pad); y0 = max(0, y - pad) - x1 = min(mask.shape[1], x + w + pad) - y1 = min(mask.shape[0], y + h + pad) - return x0, y0, x1 - x0, y1 - y0 - -def resize_long_side(img: np.ndarray, max_side: int) -> Tuple[np.ndarray, float]: - """가장 긴 변을 max_side로 맞춰 축소(확대 안함) + scale 반환""" - h, w = img.shape[:2] - if max(h, w) <= max_side: - return img, 1.0 - if h >= w: - scale = max_side / float(h) - nh, nw = max_side, int(w * scale) - else: - scale = max_side / float(w) - nw, nh = max_side, int(h * scale) - out = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_AREA) - return out, scale - -def _next_pow2(n: int) -> int: - return 1 
if n <= 1 else 1 << (n - 1).bit_length() - -def _reflect_pad_to(img: np.ndarray, target_h: int, target_w: int) -> Tuple[np.ndarray, Tuple[int,int,int,int]]: - import cv2, numpy as np - h, w = img.shape[:2] - top = max(0, (target_h - h) // 2) - bottom = max(0, target_h - h - top) - left = max(0, (target_w - w) // 2) - right = max(0, target_w - w - left) - if top or bottom or left or right: - img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_REFLECT_101) - return img, (top, bottom, left, right) - -def _crop_by_pad(img: np.ndarray, pad: Tuple[int,int,int,int]) -> np.ndarray: - top, bottom, left, right = pad - if not (top or bottom or left or right): - return img - h, w = img.shape[:2] - return img[top:h-bottom, left:w-right] - -# ─────────────────────────────────────────────── -# OpenCV 텍스트 특화 인페인트 -# ─────────────────────────────────────────────── -def _opencv_text_inpaint(img_bgr: np.ndarray, mask: np.ndarray, - small_radius: int = 3, large_radius: int = 7, - dilate_px: int = 2, smooth_kernel: int = 3) -> np.ndarray: - """ - 텍스트 제거 최적화: - 1) 마스크 소폭 팽창 → 글자 테두리까지 포함 - 2) TELEA/r=3 1차 인페인트 - 3) 잔여 노이즈만 r=7로 2차 인페인트 - 4) 경계 feathering(가벼운 블렌딩) - """ - h, w = mask.shape[:2] - dil_k = max(0, int(dilate_px)) - if dil_k > 0: - kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*dil_k+1, 2*dil_k+1)) - mask1 = cv2.dilate(mask, kernel, iterations=1) - else: - mask1 = mask.copy() - - out1 = cv2.inpaint(img_bgr, mask1, small_radius, cv2.INPAINT_TELEA) - - # 남은 영역만 큰 반경으로 한 번 더 - remain = (mask1 > 0) & (np.abs(out1.astype(np.int16) - img_bgr.astype(np.int16)).max(axis=2) > 3) - if remain.any(): - mask2 = (remain.astype(np.uint8) * 255) - out2 = cv2.inpaint(out1, mask2, large_radius, cv2.INPAINT_TELEA) - else: - out2 = out1 - - # Feathering - k = (smooth_kernel | 1) - blur = cv2.GaussianBlur(mask1, (k, k), 0) - alpha = (blur.astype(np.float32) / 255.0)[..., None] - blended = (alpha * out2.astype(np.float32) + (1 - alpha) * 
img_bgr.astype(np.float32)).astype(np.uint8) - return blended - - -# ─────────────────────────────────────────────── -# SimpleLama (PyTorch) 어댑터 -# ─────────────────────────────────────────────── -# _HAVE_LAMA_TORCH = False -# try: -# from simple_lama_inpainting.models.model import SimpleLama -# _HAVE_LAMA_TORCH = True -# except Exception: -# _HAVE_LAMA_TORCH = False - -_HAVE_LAMA_TORCH = False -try: - # 패치한 파일 위치 그대로 - from simple_lama_inpainting.models.model import SimpleLama - _HAVE_LAMA_TORCH = True -except Exception: - _HAVE_LAMA_TORCH = False - - -# ─────────────────────────────────────────────── -# LaMa ONNX 어댑터 (opencv/inpainting_lama) -# ─────────────────────────────────────────────── -class LamaOnnxORT: - def __init__(self, model_path: str, logger=None, providers=None, backend_hint: Optional[str]=None): - import os, onnxruntime as ort - - self._log = (lambda m: logger.log(m) if logger and hasattr(logger, "log") else print) - - # 세션 옵션 (원하면 스레드/그래프옵트 추가) - so = ort.SessionOptions() - so.log_severity_level = 2 - # so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL - # so.enable_mem_pattern = False - - avail = ort.get_available_providers() # ['TensorrtExecutionProvider','CUDAExecutionProvider','CPUExecutionProvider'] 기대 - self._log(f"[ORT] available providers={avail}") - - def _truthy(s: str) -> bool: - return str(s).lower() in ("1", "true", "yes", "on") - - # ── provider 리스트 구성 (우선순위: TRT → CUDA → CPU) - if providers is None: - # hint 강제 - if backend_hint and backend_hint.lower() == "cpu": - providers = ["CPUExecutionProvider"] - else: - providers = [] - - # Tensorrt EP (있으면 최우선) - if "TensorrtExecutionProvider" in avail: - # env 기반 옵션 주입 - trt_opts = { - "trt_engine_cache_enable": _truthy(os.getenv("ORT_TENSORRT_ENGINE_CACHE_ENABLE", "1")), - "trt_engine_cache_path": os.getenv("ORT_TENSORRT_CACHE_PATH", "/app/trt_cache"), - "trt_fp16_enable": _truthy(os.getenv("ORT_TENSORRT_FP16_ENABLE", "1")), - } - # 워크스페이스 (기본 1GB) - try: - 
trt_opts["trt_max_workspace_size"] = int(os.getenv("ORT_TENSORRT_MAX_WORKSPACE_SIZE", str(1 << 30))) - except Exception: - pass - - # 캐시 디렉토리 보장 - try: - os.makedirs(trt_opts["trt_engine_cache_path"], exist_ok=True) - except Exception: - pass - - providers.append(("TensorrtExecutionProvider", trt_opts)) - - # CUDA EP - if "CUDAExecutionProvider" in avail and (not backend_hint or backend_hint.lower() in ("cuda", "gpu")): - cuda_opts = { - # 선택 옵션들 — 버전에 따라 무시될 수 있음 - "cudnn_conv_use_max_workspace": "1", - # "do_copy_in_default_stream": "1", - } - providers.append(("CUDAExecutionProvider", cuda_opts)) - - # CPU EP (항상 폴백) - providers.append("CPUExecutionProvider") - - self._log(f"[ORT] providers={providers}") - self.sess = ort.InferenceSession(model_path, sess_options=so, providers=providers) - - # IO 이름 로깅 - self.input_name = self.sess.get_inputs()[0].name - self.output_name = self.sess.get_outputs()[0].name - self._log(f"[ORT] io: in={self.input_name}, out={self.output_name}") - - def infer(self, img_bgr: np.ndarray, mask_gray: np.ndarray) -> np.ndarray: - import numpy as np, cv2 - H, W = img_bgr.shape[:2] - target = 512 - need_resize = (H != target or W != target) - if need_resize: - img = cv2.resize(img_bgr, (target, target), cv2.INTER_AREA) - msk = cv2.resize(mask_gray, (target, target), cv2.INTER_NEAREST) - else: - img, msk = img_bgr, mask_gray - - img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0 - m = (msk.astype(np.float32) / 255.0)[..., None] - inp = np.concatenate([img_rgb, m], axis=2) # H,W,4 - blob = np.transpose(inp, (2,0,1))[None, ...] 
# 1,4,H,W - out = self.sess.run([self.output_name], {self.input_name: blob})[0] # 1,3,H,W - out_rgb = np.transpose(out[0], (1,2,0)) - out_rgb = np.clip(out_rgb, 0.0, 1.0) - out_bgr = cv2.cvtColor((out_rgb * 255.0).astype(np.uint8), cv2.COLOR_RGB2BGR) - if need_resize: - out_bgr = cv2.resize(out_bgr, (W, H), cv2.INTER_CUBIC) - return out_bgr - - def _log(self, msg): - if self.logger and hasattr(self.logger, "log"): - self.logger.log(msg) - else: - print(msg) - -# 추가: FastDeploy 기반 ONNX LaMa -class LamaOnnxFD: - """ - FastDeploy Runtime으로 inpainting_lama_2025jan.onnx 실행 - - model_path: ONNX 경로 - - device: "gpu" or "cpu" - - device_id: GPU index - - backend: "ort" | "trt" (기본 ort) - """ - def __init__(self, model_path: str, - device: str = "gpu", - device_id: int = 0, - backend: str = "ort", - logger=None): - print("LamaOnnxFD init") - import fastdeploy as fd - self.fd = fd - self.logger = logger - self.model_path = model_path - - opt = fd.RuntimeOption() - if device.lower() == "gpu": - opt.use_gpu(device_id) - # 백엔드 선택 - try: - if backend.lower() == "trt": - opt.use_trt_backend() - # 필요시 워크스페이스/FP16 설정 - opt.trt_option.enable_fp16 = True - opt.trt_option.max_workspace_size = 1 << 28 # 256MB - try: - print("TRT 프로필 설정 시작") - # 1) 입력 이름 알아내기 - in_infos = None - try: - tmp_rt = self.fd.Runtime(opt) - in_infos = tmp_rt.get_input_info() - del tmp_rt - except Exception: - pass - - # 2) 이름 모르면 첫 번째 입력을 "input" 가정 (나중에 로그로 확인) - input_names = [x.name for x in in_infos] if in_infos else ["input"] - - # 3) min/opt/max 프로필 등록 (예: 256~1024 사이 허용) - min_hw, opt_hw, max_hw = 256, 512, 1024 - for name in input_names: - # N,C,H,W = 1,4,* - opt.set_trt_input_shape(name, - min_shape=[1, 4, min_hw, min_hw], - opt_shape=[1, 4, opt_hw, opt_hw], - max_shape=[1, 4, max_hw, max_hw] - ) - except Exception: - print("TRT 프로필 설정 실패") - - try: - print("TRT 런타임 생성 시작") - self.runtime = self.fd.Runtime(opt) - in_infos = self.runtime.get_input_info() - out_infos = self.runtime.get_output_info() - 
self._log(f"[TRT] inputs={[ (i.name, i.shape) for i in in_infos ]}") - self._log(f"[TRT] outputs={[ (o.name, o.shape) for o in out_infos ]}") - except Exception as e: - self._log(f"[TRT] engine build failed: {e}") - # 안전하게 ORT fallback - opt_fallback = self.fd.RuntimeOption() - opt_fallback.use_ort_backend(); opt_fallback.use_gpu(device_id) - opt_fallback.set_model_path(model_path, model_format=self.fd.ModelFormat.ONNX) - self.runtime = self.fd.Runtime(opt_fallback) - self._log("[TRT] Fallback to ORT GPU") - print("TRT 런타임 생성 실패") - - elif backend.lower() == "cuda": - opt.use_ort_backend() - opt.use_gpu(device_id) - elif backend.lower() == "cpu": - opt.use_ort_backend() - opt.use_cpu() - else: # "ort" - opt.use_ort_backend() - opt.use_gpu(device_id) # GPU ORT - except Exception as e: - self._log(f"[LaMa-ONNX-FD] backend init failed ({backend}), fallback to ORT: {e}") - opt = self.fd.RuntimeOption() - opt.use_ort_backend(); opt.use_gpu(device_id) - - else: - opt.use_cpu() - opt.use_ort_backend() - opt.set_cpu_thread_num(2) - - # ONNX 모델 지정 - opt.set_model_path(model_path, model_format=self.fd.ModelFormat.ONNX) - - # Runtime 생성 - self.runtime = self.fd.Runtime(opt) - - # 입력/출력 메타 확인해두면 디버깅 쉬움 - try: - in_infos = self.runtime.get_input_info() - out_infos = self.runtime.get_output_info() - names_in = [x.name for x in in_infos] - names_out = [x.name for x in out_infos] - self._log(f"[LaMa-ONNX-FD] inputs={names_in} outputs={names_out}") - except Exception: - pass - - def _log(self, msg): - if self.logger and hasattr(self.logger, "log"): - self.logger.log(msg) - else: - print(msg) - - def infer(self, img_bgr, mask_gray): - import numpy as np, cv2 - H, W = img_bgr.shape[:2] - - # 모델이 자유 크기 지원이면 그대로, 아니면 512 정사각으로 - target = 512 - need_resize = not (H == target and W == target) - if need_resize: - img_resized = cv2.resize(img_bgr, (target, target), interpolation=cv2.INTER_AREA) - mask_resized = cv2.resize(mask_gray, (target, target), interpolation=cv2.INTER_NEAREST) - 
else: - img_resized, mask_resized = img_bgr, mask_gray - - # 전처리: BGR->RGB, [0,1], mask → [0,1], 채널 concat (img 3ch + mask 1ch = 4ch) - img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0 - m = (mask_resized.astype(np.float32) / 255.0)[..., None] - inp = np.concatenate([img_rgb, m], axis=2) # H,W,4 - blob = np.transpose(inp, (2,0,1))[None, ...].astype(np.float32) # 1,4,H,W - - # 추론 - outputs = self.runtime.infer([blob]) - out = outputs[0] # numpy array, shape (1,3,H,W) 예상 - out_rgb = np.transpose(out[0], (1,2,0)) - out_rgb = np.clip(out_rgb, 0.0, 1.0) - out_bgr = cv2.cvtColor((out_rgb * 255.0).astype(np.uint8), cv2.COLOR_RGB2BGR) - - if need_resize: - out_bgr = cv2.resize(out_bgr, (W, H), interpolation=cv2.INTER_CUBIC) - return out_bgr - -# ─────────────────────────────────────────────── -# 메인 Inpainter -# ─────────────────────────────────────────────── -from PIL import Image -import threading - -_SIMPLE_LAMA_SINGLETON = None -class Inpainter: - _lock = threading.Lock() # 내부 초기화 경쟁 방지 - - - def __init__(self, - logger=None, - default_backend: str = InpaintBackends.LAMA_TORCH, - lama_device: str = "cuda", - lama_onnx_ort_path: Optional[str] = None, - lama_onnx_ort_providers: Optional[list] = None, - lama_onnx_fd_path: Optional[str] = None, - lama_onnx_fd_device: str = "gpu", - lama_onnx_fd_device_id: int = 0, - lama_onnx_fd_backend: str = "ort"): - print("Inpainter init") - self.logger = logger - self.default_backend = (default_backend or InpaintBackends.LAMA_TORCH).lower() - self.lama_device = lama_device - - # self.lama_onnx_ort_path = lama_onnx_ort_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/inpainting_lama_2025jan.onnx") - self.lama_onnx_ort_path = lama_onnx_ort_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/lama_fp32.onnx") - self.lama_onnx_ort_providers = lama_onnx_ort_providers - self._lama_onnx_ort = None - - self._lama_torch = None - # self.lama_onnx_fd_path = lama_onnx_fd_path or 
os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/inpainting_lama_2025jan.onnx") - self.lama_onnx_fd_path = lama_onnx_fd_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/lama_fp32.onnx") - self.lama_onnx_fd_device = lama_onnx_fd_device - self.lama_onnx_fd_device_id = lama_onnx_fd_device_id - self.lama_onnx_fd_backend = lama_onnx_fd_backend - - self._lama_onnx_fd = None - - self._lama_torch_amp = None # ⬅️ 추가 - - self._log(f"Inpainter init: default={self.default_backend}") - - # 통일 로그 - def _log(self, msg): - if self.logger and hasattr(self.logger, "log"): self.logger.log(msg) - else: print(msg) - - - # ── 백엔드별 lazy 생성 (스레드 세이프) - def _get_lama_onnx_ort(self, backend_hint: Optional[str] = None) -> LamaOnnxORT: - if self._lama_onnx_ort is None: - with self._lock: - if self._lama_onnx_ort is None: - self._log("[Init] LamaOnnxORT") - self._lama_onnx_ort = LamaOnnxORT( - model_path=self.lama_onnx_ort_path, - logger=self.logger, - providers=self.lama_onnx_ort_providers, - backend_hint=backend_hint - ) - return self._lama_onnx_ort - - def _get_lama_onnx_fd(self): - if self._lama_onnx_fd is None: - with self._lock: - if self._lama_onnx_fd is None: - self._log("[Init] LamaOnnxFD") - self._lama_onnx_fd = LamaOnnxFD( - model_path=self.lama_onnx_fd_path, - backend=self.lama_onnx_fd_backend, - device="gpu", device_id=0, logger=self.logger - ) - return self._lama_onnx_fd - - def _get_lama_torch(self): - if self._lama_torch is None: - with self._lock: - if self._lama_torch is None: - self._log("[Init] SimpleLaMa (torch)") - # from simple_lama_inpainting.models.model import SimpleLama - # self._lama_torch = SimpleLama(device=self.lama_device) - self._lama_torch = self.get_simple_lama(device=self.lama_device) - return self._lama_torch - - @staticmethod - def get_simple_lama(device="cuda"): - global _SIMPLE_LAMA_SINGLETON - if _SIMPLE_LAMA_SINGLETON is None: - # 캐시 폴더 고정 (있으면 유지) - torch_home = "/app/torch_cache" - os.makedirs(torch_home, exist_ok=True) - 
os.environ.setdefault("TORCH_HOME", torch_home) - - # (선택) 네가 fp16 체크포인트를 이 경로로 마운트해두면, - # 컨테이너 환경변수 또는 여기에서 직접 지정 가능 - # 예) os.environ.setdefault("LAMA_MODEL", "/app/torch_cache/Big-LaMa.fp16.pt") - - # 순서 힌트가 필요하면(보통 필요 없음): image_first | mask_first - # os.environ.setdefault("SIMPLE_LAMA_JIT_ORDER", "mask_first") - - # 디버그(형상/순서 로그): "1"로 켜기 - # os.environ.setdefault("SIMPLE_LAMA_DEBUG_SHAPES", "0") - - # 패치된 SimpleLama는 내부에서 FP16/순서 자동 처리 - m = SimpleLama(device=torch.device(device if device != "gpu" else "cuda")) - _SIMPLE_LAMA_SINGLETON = m - return _SIMPLE_LAMA_SINGLETON - - def _get_lama_torch_amp(self): - if self._lama_torch_amp is None: - with self._lock: - if self._lama_torch_amp is None: - self._log("[Init] SimpleLaMa (torch AMP)") - # ckpt는 환경변수 SIMPLE_LAMA_CKPT 또는 simple-lama 기본 URL 자동 다운로드 - self._lama_torch_amp = LamaTorchAMP(device=self.lama_device) - return self._lama_torch_amp - - # ── Public API - def inpaint(self, - image_bgr: np.ndarray, - polygons: List[List[List[int]]], - *, - backend: Optional[str] = None, - max_side: int = 1024, - auto_opencv_if_few: bool = True, - few_threshold: int = 4, - backend_hint: Optional[str] = None) -> np.ndarray: - """ - Args: - img_bgr: 원본 BGR 이미지 (H,W,3) - polygons: [[ [x,y], [x,y], ... ], ...] 
- backend: 명시 시 강제 사용, None이면 default_backend - max_side: ROI 다운스케일 상한 (VRAM/속도 절충) - auto_opencv_if_few: 텍스트 박스가 적으면 OpenCV로 자동 전환 - few_threshold: '적다'의 기준 (기본 4) - """ - - backend = backend or self.default_backend - - # 1) 폴리곤 → 마스크 - mask = np.zeros(image_bgr.shape[:2], np.uint8) - for poly in polygons: - pts = np.array(poly, dtype=np.int32) - cv2.fillPoly(mask, [pts], color=255) - - # 2) 마스크 수가 적으면 OpenCV로 빠르게 - cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - if auto_opencv_if_few and len(cnts) <= few_threshold and backend != InpaintBackends.OPENCV: - backend = InpaintBackends.OPENCV - - # 3) ROI + 다운스케일 - ys, xs = np.where(mask > 0) - if len(xs) == 0 or len(ys) == 0: - return image_bgr - x1, x2, y1, y2 = xs.min(), xs.max(), ys.min(), ys.max() - roi_img = image_bgr[y1:y2+1, x1:x2+1] - roi_mask = mask[y1:y2+1, x1:x2+1] - h, w = roi_img.shape[:2] - if max(h, w) > max_side: - scale = max_side / float(max(h, w)) - roi_img_small = cv2.resize(roi_img, (int(w*scale), int(h*scale)), cv2.INTER_AREA) - roi_mask_small = cv2.resize(roi_mask, (int(w*scale), int(h*scale)), cv2.INTER_NEAREST) - else: - roi_img_small, roi_mask_small = roi_img, roi_mask - - # 4) 백엔드 호출 - if backend == InpaintBackends.OPENCV: - out_small = cv2.inpaint(roi_img_small, roi_mask_small, 3, cv2.INPAINT_TELEA) - - elif backend == InpaintBackends.LAMA_TORCH: - mdl = self._get_lama_torch() - img_pil = Image.fromarray(cv2.cvtColor(roi_img_small, cv2.COLOR_BGR2RGB)) - msk_pil = Image.fromarray(roi_mask_small, "L") # 1채널 보장 - - # 패치된 SimpleLama가 FP16/순서/채널을 내부에서 처리 - out_pil = mdl(img_pil, msk_pil) - - out_small = cv2.cvtColor(np.array(out_pil), cv2.COLOR_RGB2BGR) - - elif backend == InpaintBackends.LAMA_TORCH_AMP: - # SimpleLama(fp32 가중치) + autocast(fp16) + cuFFT pow2 안전 패딩 - mdl = self._get_lama_torch() # simple_lama_inpainting.models.model.SimpleLama (fp32) - img_roi = roi_img_small - msk_roi = roi_mask_small - - # pow2 패딩(AMP에서 FFC/cuFFT 반쯤 쓰는 모델 보호) - H, W = 
img_roi.shape[:2] - th, tw = _next_pow2(H), _next_pow2(W) - if (th % 8) != 0: th = ((th + 7) // 8) * 8 - if (tw % 8) != 0: tw = ((tw + 7) // 8) * 8 - - pad_info = (0,0,0,0) - if (th, tw) != (H, W): - img_roi, pad_info = _reflect_pad_to(img_roi, th, tw) - msk_roi, _ = _reflect_pad_to(msk_roi, th, tw) - - # PIL 변환 - img_pil = Image.fromarray(cv2.cvtColor(img_roi, cv2.COLOR_BGR2RGB)) - msk_pil = Image.fromarray(msk_roi, "L") - - # AMP (모델은 fp32 유지, 연산은 자동 혼합정밀) - import torch - with torch.cuda.amp.autocast(enabled=(self.lama_device in ("cuda","gpu")), dtype=torch.float16): - out_pil = mdl(img_pil, msk_pil) - - out_small = cv2.cvtColor(np.array(out_pil), cv2.COLOR_RGB2BGR) - - # 패딩 되돌리기 - if pad_info != (0,0,0,0): - out_small = _crop_by_pad(out_small, pad_info) - - - elif backend == InpaintBackends.LAMA_ONNX_FD: - mdl = self._get_lama_onnx_fd() - out_small = mdl.infer(roi_img_small, roi_mask_small) - - elif backend == InpaintBackends.LAMA_ONNX_ORT: - mdl = self._get_lama_onnx_ort(backend_hint=backend_hint) # "cuda"/"cpu" 힌트 - out_small = mdl.infer(roi_img_small, roi_mask_small) - else: - # 안전폴백 - out_small = cv2.inpaint(roi_img_small, roi_mask_small, 3, cv2.INPAINT_TELEA) - - # 5) 업스케일 + 합성 - if out_small.shape[:2] != roi_img.shape[:2]: - out_roi = cv2.resize(out_small, (roi_img.shape[1], roi_img.shape[0]), cv2.INTER_CUBIC) - else: - out_roi = out_small - - result = image_bgr.copy() - m = (roi_mask > 0)[:, :, None] - result[y1:y2+1, x1:x2+1] = np.where(m, out_roi, roi_img) - return result - -import torch, numpy as np -import torch.nn.functional as F -from PIL import Image - -class _SimpleLamaFPCompat: - """ - FP16 TorchScript / state_dict 체크포인트를 SimpleLama처럼 호출 가능하게 래핑. 
- __call__(image_pil|ndarray, mask_pil|ndarray) -> PIL.Image - - 가중치가 fp16이면 입력도 fp16으로 자동 캐스팅(AMP 포함) - - JIT 빌드(enesmsahin big-lama JIT)는 (mask, image) 순서를 기대 - 원본 SimpleLama는 (image, mask) 순서 → is_jit 플래그로 분기 - """ - def __init__(self, model, device="cuda", is_jit=True, is_fp16=True): - self.model = model.eval() - self.device = torch.device("cuda" if device in ("cuda","gpu") else device) - self.is_jit = is_jit - self.is_fp16 = is_fp16 - self.model.to(self.device) - if self.is_fp16: - self.model.half() - - @classmethod - def load(cls, ckpt_path: str, device="cuda"): - # 1) TorchScript 시도 - try: - m = torch.jit.load(ckpt_path, map_location="cpu") - # fp16 여부 대략 추정 (파라미터가 없으면 fp16 JIT로 가정) - is_fp16 = True - try: - p = next(m.parameters()) - is_fp16 = (p.dtype == torch.float16) - except StopIteration: - pass - return cls(m, device=device, is_jit=True, is_fp16=is_fp16) - except Exception: - pass - - # 2) state_dict 시도 (원 SimpleLama 구조 필요) - from simple_lama_inpainting import SimpleLama - base = SimpleLama(device="cpu") - sd = torch.load(ckpt_path, map_location="cpu") - core = getattr(base, "model", base) - core.load_state_dict(sd, strict=False) - is_fp16 = any(p.dtype == torch.float16 for p in core.parameters()) - return cls(base, device=device, is_jit=False, is_fp16=is_fp16) - - # ---------- 유틸 ---------- - @staticmethod - def _to_pil(x, mode=None): - if isinstance(x, Image.Image): - return x.convert(mode) if mode else x - if isinstance(x, np.ndarray): - if mode == "L": - if x.ndim == 2: - return Image.fromarray(x.astype(np.uint8), "L") - return Image.fromarray(x[..., 0].astype(np.uint8), "L") - if x.ndim == 3 and x.shape[2] == 3: # BGR -> RGB - x = x[..., ::-1] - return Image.fromarray(x.astype(np.uint8), "RGB") - raise TypeError(f"Unsupported input type: {type(x)}") - - @staticmethod - def _to_numpy_rgb(img: Image.Image) -> np.ndarray: - if img.mode != "RGB": - img = img.convert("RGB") - arr = np.asarray(img, dtype=np.uint8) - if not arr.flags['C_CONTIGUOUS']: - 
arr = np.ascontiguousarray(arr) - return arr - - @staticmethod - def _to_numpy_mask1(mask: Image.Image) -> np.ndarray: - if mask.mode != "L": - mask = mask.convert("L") - m = np.asarray(mask, dtype=np.uint8) - if not m.flags['C_CONTIGUOUS']: - m = np.ascontiguousarray(m) - return (m > 127).astype(np.float32) # 0/1 - - @staticmethod - def _pad8_reflect(t: torch.Tensor, target_dtype: torch.dtype): - h, w = t.shape[-2:] - nh = (h + 7) // 8 * 8 - nw = (w + 7) // 8 * 8 - if nh == h and nw == w: - return t, (0,0,0,0) - ph, pw = nh - h, nw - w - t32 = t.to(torch.float32) - t32 = F.pad(t32, (0, pw, 0, ph), mode="reflect") # reflect는 fp16 미지원 버전 존재 - return t32.to(target_dtype), (0, pw, 0, ph) - - # ---------- 호출 ---------- - @torch.inference_mode() - def __call__(self, image: Image.Image, mask: Image.Image) -> Image.Image: - # 모델 dtype/디바이스 - try: - p0 = next(self.model.parameters()) - target_dtype = p0.dtype - device = p0.device - except StopIteration: - target_dtype = torch.float16 if self.device.type == "cuda" and self.is_fp16 else torch.float32 - device = self.device - - # numpy → tensor - img_np = self._to_numpy_rgb(self._to_pil(image, "RGB")) # H,W,3 uint8 - msk_np = self._to_numpy_mask1(self._to_pil(mask, "L")) # H,W float32 {0,1} - - img_t = torch.from_numpy(img_np).permute(2,0,1).unsqueeze(0).to(device=device, dtype=torch.float32) / 255.0 # 1,3,H,W - msk_t = torch.from_numpy(msk_np).unsqueeze(0).unsqueeze(0).to(device=device, dtype=torch.float32) # 1,1,H,W - - # pad (fp32) → target dtype - img_t, pad_hw = self._pad8_reflect(img_t, torch.float32) - msk_t, _ = self._pad8_reflect(msk_t, torch.float32) - img_t = img_t.to(dtype=target_dtype) - msk_t = msk_t.to(dtype=target_dtype) - - # 호출 순서 분기 - if self.is_jit: - # JIT big-lama는 (mask, image) 순서 - out = self.model(msk_t, img_t) - else: - # 원 SimpleLama는 (image, mask) 순서 - out = self.model(img_t, msk_t) - - # unpad 및 to PIL - _, _, H, W = img_t.shape - _, pw, _, ph = pad_hw - if ph or pw: - out = out[..., :H-ph, :W-pw] 
- out = out.clamp(0, 1).to(torch.float32) - out_np = (out[0].permute(1,2,0).cpu().numpy() * 255.0 + 0.5).astype(np.uint8) - return Image.fromarray(out_np, "RGB") - - - - - - - - - - - - - - -import os, torch, torch.nn.functional as F, numpy as np -from PIL import Image - -def _to_pil_rgb(x): - if isinstance(x, Image.Image): - return x.convert("RGB") - if isinstance(x, np.ndarray): - if x.ndim == 3 and x.shape[2] == 3: - # BGR -> RGB - x = x[..., ::-1] - return Image.fromarray(x.astype(np.uint8)).convert("RGB") - raise TypeError(f"unsupported image type: {type(x)}") - -def _to_pil_maskL(x): - if isinstance(x, Image.Image): - return x.convert("L") - if isinstance(x, np.ndarray): - if x.ndim == 3: - x = x[..., 0] - return Image.fromarray(x.astype(np.uint8)).convert("L") - raise TypeError(f"unsupported mask type: {type(x)}") - -def _pad_mod8_reflect_nchw(t: torch.Tensor): - # t: NCHW (float32) - _, _, h, w = t.shape - nh = (h + 7) // 8 * 8 - nw = (w + 7) // 8 * 8 - if nh == h and nw == w: - return t, (0,0,0,0) - ph, pw = nh - h, nw - w - top = ph // 2; bottom = ph - top - left = pw // 2; right = pw - left - t32 = F.pad(t, (left, right, top, bottom), mode="reflect") - return t32, (top, bottom, left, right) - -def _crop_from_pad_nchw(t: torch.Tensor, pad): - top, bottom, left, right = pad - if top==bottom==left==right==0: - return t - return t[..., top:t.shape[-2]-bottom, left:t.shape[-1]-right] - -def _detect_arg_order(script_module) -> str: - """ - TorchScript LaMa(JIT)의 forward 인자 순서 추정. - - enesmsahin big-lama.pt: (mask, image) 가 일반적. - - 안전하게 스키마/코드에서 먼저 감지, 실패 시 'mask_im' 기본. 
- """ - try: - sch = str(getattr(script_module, "forward").schema).lower() - if "tensor mask" in sch and "tensor image" in sch: - return "mask_im" if sch.index("tensor mask") < sch.index("tensor image") else "im_mask" - except Exception: - pass - code = getattr(script_module, "code", "") - if isinstance(code, str): - if "forward(mask, image" in code.replace(" ", ""): - return "mask_im" - if "forward(image, mask" in code.replace(" ", ""): - return "im_mask" - # 기본값 - return os.getenv("SIMPLE_LAMA_ARG_ORDER", "mask_im").lower() - -class LamaTorchAMP: - """ - - 가중치: FP32 유지 - - 추론: torch.cuda.amp.autocast(dtype=torch.float16) - - 입력: RGB/0..1, mask 1ch/0..1, NCHW, mod=8 reflect pad - """ - def __init__(self, device="cuda", ckpt_path: str|None=None): - self.device = torch.device("cuda" if device in ("cuda","gpu") and torch.cuda.is_available() else "cpu") - - # 체크포인트 경로: 우선순위 ENV → 인자 → simple-lama 기본 URL 다운로드 - if ckpt_path is None: - ckpt_path = os.getenv("SIMPLE_LAMA_CKPT") - if ckpt_path is None or not os.path.isfile(ckpt_path): - # simple-lama의 다운로드 유틸 재사용 - from simple_lama_inpainting.utils.util import download_model - from simple_lama_inpainting.models.model import LAMA_MODEL_URL - ckpt_path = download_model(LAMA_MODEL_URL) - - m = torch.jit.load(ckpt_path, map_location="cpu").eval() - - - try: - m = m.to(dtype=torch.float32) - except Exception: - # TorchScript에서 .to 실패하는 경우 수동 승격 - for p in m.parameters(recurse=True): - if p.dtype != torch.float32: - p.data = p.data.float() - for b in m.buffers(recurse=True): - if b.dtype != torch.float32: - b.data = b.data.float() - - m = m.to(self.device) # FP32 유지 - self.model = m - self.device = torch.device(device if device != "gpu" else "cuda") - self.arg_order = _detect_arg_order(m) - if self.device.type == "cuda": - torch.backends.cudnn.benchmark = True - - @torch.inference_mode() - def __call__(self, image: Image.Image|np.ndarray, mask: Image.Image|np.ndarray) -> Image.Image: - im = _to_pil_rgb(image) - mk = 
_to_pil_maskL(mask) - - im_np = np.asarray(im, dtype=np.uint8) - mk_np = np.asarray(mk, dtype=np.uint8) - - im_t = torch.from_numpy(im_np).permute(2,0,1).unsqueeze(0).to(self.device, dtype=torch.float32) / 255.0 # 1,3,H,W - mk_f = (mk_np > 127).astype(np.float32) - mk_t = torch.from_numpy(mk_f).unsqueeze(0).unsqueeze(0).to(self.device, dtype=torch.float32) # 1,1,H,W - - # mod=8 pad (float32에서) - im_t, pad = _pad_mod8_reflect_nchw(im_t) - mk_t, _ = _pad_mod8_reflect_nchw(mk_t) - - # AMP 추론 - if self.device.type == "cuda": - with torch.autocast(device_type="cuda", dtype=torch.float16): - out = self.model(mk_t, im_t) if self.arg_order == "mask_im" else self.model(im_t, mk_t) - else: - out = self.model(mk_t, im_t) if self.arg_order == "mask_im" else self.model(im_t, mk_t) - - out = out[0] if isinstance(out, (list, tuple)) else out # NCHW - out = _crop_from_pad_nchw(out, pad).clamp(0,1).to(torch.float32) - out_np = (out[0].permute(1,2,0).cpu().numpy() * 255.0 + 0.5).astype(np.uint8) - return Image.fromarray(out_np, "RGB") diff --git a/worker/mask_module_for_paddle.py b/worker/mask_module_for_paddle.py index b9444dd..c20ca06 100644 --- a/worker/mask_module_for_paddle.py +++ b/worker/mask_module_for_paddle.py @@ -46,11 +46,16 @@ class MaskModule: ocr_results: List[Dict], expansion_size: int = 6, blur_size: int = 7, - mask_option: str = "basic" + mask_option: str = "basic", + # 🔥 ROI 전용 옵션 추가 + for_roi_processing: bool = False ) -> "np.ndarray | None": """ BGR ndarray와 OCR 결과를 직접 받아 마스크 np.ndarray 반환 (디스크 I/O 없음) + + Args: + for_roi_processing: True면 순수 마스크만 생성 (후처리 없음) """ if image is None or image.size == 0: self.logger.error("ndarray 이미지가 비었습니다.") @@ -63,9 +68,32 @@ class MaskModule: poly = res.get("polygon") if not poly: continue - expanded = self.expand_polygon(poly, offset=5) + # 🔥 ROI 처리용이면 적절한 확장 적용 (후처리 없는 대신 좀 더 확장) + if for_roi_processing: + expanded = self.expand_polygon(poly, offset=8) # 3 → 8로 증가 + else: + expanded = self.expand_polygon(poly, offset=5) 
cv2.fillPoly(mask, [expanded], 255) + # 🔥 ROI 처리용이면 최소한의 후처리만 적용 + if for_roi_processing: + # 🔥 강화된 후처리: 텍스트 잔상 방지 + kernel_small = np.ones((3, 3), np.uint8) + kernel_large = np.ones((5, 5), np.uint8) + + # 1단계: 작은 노이즈 제거 + mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_small) + + # 2단계: 텍스트 경계 완전 커버 (강화된 팽창) + mask = cv2.dilate(mask, kernel_large, iterations=1) + + # 3단계: 추가 안전 마진 + mask = cv2.dilate(mask, kernel_small, iterations=1) + + self.logger.log("🔧 ROI용 강화 마스크 생성 (잔상 방지 처리)", level=logging.INFO) + return mask + + # 기존 방식 (풀프레임용) processed_mask = self.process_mask(mask, expansion_size, blur_size) return processed_mask diff --git a/worker/roi_inpainting_module.py b/worker/roi_inpainting_module.py index 5d98546..5acc449 100644 --- a/worker/roi_inpainting_module.py +++ b/worker/roi_inpainting_module.py @@ -10,8 +10,12 @@ ROI 기반 인페인팅 모듈 import cv2 import numpy as np import logging +import torch +import time +import gc from typing import List, Tuple, Optional, Any, Dict from simple_lama_inpainting import SimpleLama +from concurrent.futures import ThreadPoolExecutor class ROIInpaintingModule: @@ -25,6 +29,23 @@ class ROIInpaintingModule: self.logger = logger or self._create_default_logger() self.simple_lama = None + # 🔥 형상 최적화를 위한 버킷 시스템 + self.performance_buckets = { + # 일반적인 웹툰/만화 크기들을 64배수로 정규화 + (896, 1152): "webtoon_portrait", # 790×1053, 750×917 등 + (832, 1024): "webtoon_standard", # 800×800, 790×790 등 + (896, 2048): "webtoon_long", # 790×1959, 997×2000 등 + (1280, 768): "landscape_wide", # 1242×698 등 + (832, 512): "landscape_standard", # 800×450, 790×409 등 + (640, 640): "square_small", # 587×587 등 + } + + # 🔥 성능 히스토리 (버킷별 실측 시간 추적) + self.bucket_performance_history = {} + + # 🔥 cuDNN 최적화 설정 + self._setup_cudnn_optimization() + # 기본 설정값 self.default_config = { 'min_component_area': 100, # 최소 컴포넌트 크기 @@ -51,6 +72,14 @@ class ROIInpaintingModule: 'max_aspect_ratio': 8.0, # 최대 허용 종횡비 'use_64_alignment': False, # 64배수 정렬 사용 여부 
'batch_processing_threshold': 256 * 256, # 작은 ROI 배치 처리 임계값 (px) + # 🔥 인페인팅 품질 개선 설정 + 'mask_dilation_kernel': 3, # 마스크 팽창 커널 크기 + 'mask_erosion_kernel': 2, # 마스크 침식 커널 크기 + 'mask_blur_kernel': 5, # 마스크 블러 커널 크기 + 'enable_mask_refinement': True, # 마스크 정제 활성화 + 'feather_blend_size': 10, # 부드러운 블렌딩을 위한 페더 크기 + 'blend_mode': 'advanced', # 'simple' 또는 'advanced' + 'context_expansion_ratio': 0.3, # 컨텍스트 확장 비율 (더 넓은 영역으로 인페인팅) } self.logger.log("ROI 인페인팅 모듈 초기화 완료", level=logging.INFO) @@ -69,19 +98,21 @@ class ROIInpaintingModule: self.logger.log("SimpleLama 인스턴스 생성 완료", level=logging.INFO) return self.simple_lama - def find_mask_components(self, mask: np.ndarray, min_area: int = None) -> List[Tuple[int, int, int, int]]: + def find_mask_components(self, mask: np.ndarray, config: Dict[str, Any] = None) -> List[Tuple[int, int, int, int]]: """ 마스크에서 연결된 컴포넌트들을 찾고 바운딩 박스 반환 Args: mask: 이진 마스크 (0 또는 255) - min_area: 최소 컴포넌트 면적 + config: 설정 딕셔너리 Returns: List of (x1, y1, x2, y2) 바운딩 박스 """ - if min_area is None: - min_area = self.default_config['min_component_area'] + if config is None: + config = self.default_config + + min_area = config.get('min_component_area', self.default_config['min_component_area']) # 연결된 컴포넌트 분석 num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats( @@ -225,24 +256,37 @@ class ROIInpaintingModule: return False - def create_blend_mask(self, roi_mask: np.ndarray) -> np.ndarray: + def create_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: """ - 부드러운 블렌딩을 위한 마스크 생성 + 🔥 부드러운 블렌딩을 위한 고급 마스크 생성 Args: roi_mask: ROI 영역 마스크 + config: 설정 오버라이드 Returns: 블렌딩용 마스크 (0~1 범위) """ + if config is None: + config = self.default_config + + blend_mode = config.get('blend_mode', 'simple') + + if blend_mode == 'simple': + return self._create_simple_blend_mask(roi_mask, config) + else: + return self._create_advanced_blend_mask(roi_mask, config) + + def _create_simple_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, 
Any]) -> np.ndarray: + """기본 블렌딩 마스크 (기존 방식)""" blend_mask = roi_mask.astype(np.float32) / 255.0 # 가우시안 블러로 부드러운 전환 kernel_size = min( - self.default_config['max_blend_kernel'], + config['max_blend_kernel'], max( - self.default_config['min_blend_kernel'], - min(roi_mask.shape[:2]) // self.default_config['blend_kernel_ratio'] + config['min_blend_kernel'], + min(roi_mask.shape[:2]) // config['blend_kernel_ratio'] ) ) if kernel_size % 2 == 0: @@ -250,25 +294,76 @@ class ROIInpaintingModule: blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0) return np.expand_dims(blend_mask, axis=2) + + def _create_advanced_blend_mask(self, roi_mask: np.ndarray, config: Dict[str, Any]) -> np.ndarray: + """🔥 고급 블렌딩 마스크 (페더링 + 거리 변환 기반)""" + blend_mask = roi_mask.astype(np.float32) / 255.0 + + # 🔥 1단계: 거리 변환을 이용한 페더링 + feather_size = config.get('feather_blend_size', 10) + + # 마스크의 경계에서 거리 계산 + dist_transform = cv2.distanceTransform( + (blend_mask * 255).astype(np.uint8), + cv2.DIST_L2, 5 + ) + + # 페더링 적용 + if feather_size > 0: + # 경계에서 페더 크기만큼 부드럽게 감소 + feathered_mask = np.clip(dist_transform / feather_size, 0, 1) + blend_mask = np.minimum(blend_mask, feathered_mask) + + # 🔥 2단계: 가우시안 블러로 추가 부드러움 + kernel_size = min( + config['max_blend_kernel'], + max( + config['min_blend_kernel'], + min(roi_mask.shape[:2]) // config['blend_kernel_ratio'] + ) + ) + if kernel_size % 2 == 0: + kernel_size += 1 + + blend_mask = cv2.GaussianBlur(blend_mask, (kernel_size, kernel_size), 0) + + # 🔥 3단계: 경계 강화 (중앙은 1.0 유지, 경계만 부드럽게) + core_mask = cv2.erode((roi_mask > 128).astype(np.uint8), np.ones((3, 3), np.uint8), iterations=2) + core_mask = core_mask.astype(np.float32) + + # 코어 영역은 완전히 1.0, 경계 영역만 블렌딩 + final_mask = np.maximum(blend_mask, core_mask) + + self.logger.log( + f"🔧 고급 블렌딩 마스크 생성: feather={feather_size}px, kernel={kernel_size}px", + level=logging.INFO + ) + + return np.expand_dims(final_mask, axis=2) def process_roi(self, image: np.ndarray, mask: np.ndarray, - roi_bbox: 
Tuple[int, int, int, int]) -> Tuple[np.ndarray, bool]: + roi_bbox: Tuple[int, int, int, int], + config: Dict[str, Any] = None) -> Tuple[np.ndarray, bool]: """ - 단일 ROI 영역 인페인팅 처리 + 🔥 단일 ROI 영역 인페인팅 처리 (마스크 정제 문제 해결 버전) Args: image: 원본 이미지 mask: 이진 마스크 roi_bbox: ROI 바운딩 박스 + config: 설정 오버라이드 Returns: (처리된 ROI 이미지, 성공 여부) """ + if config is None: + config = self.default_config + x1, y1, x2, y2 = roi_bbox - # ROI 추출 - roi_image = image[y1:y2, x1:x2] - roi_mask = mask[y1:y2, x1:x2] + # 🔥 정확한 ROI-마스크 매칭 추출 + roi_image = image[y1:y2, x1:x2].copy() + roi_mask = mask[y1:y2, x1:x2].copy() # ROI 크기 로깅 roi_size = (x2-x1) * (y2-y1) @@ -286,8 +381,17 @@ class ROIInpaintingModule: self.logger.log("ROI 마스크가 비어있음, 원본 반환", level=logging.INFO) return roi_image, True + # 🔥 마스크 정제 여부에 따라 분기 + if config.get('enable_mask_refinement', False): + # 마스크 정제 활성화된 경우만 추가 처리 + refined_mask = self.refine_mask(roi_mask, config) + else: + # 🔥 마스크 모듈에서 이미 최적화된 마스크 그대로 사용 + refined_mask = roi_mask + self.logger.log("🔧 마스크 모듈 최적화 마스크 사용 (추가 정제 생략)", level=logging.INFO) + # 🔥 극단적 종횡비 전처리 - preprocessed_image, preprocessed_mask, preprocess_info = self.preprocess_extreme_aspect_ratio(roi_image, roi_mask) + preprocessed_image, preprocessed_mask, preprocess_info = self.preprocess_extreme_aspect_ratio(roi_image, refined_mask) # 🔥 크기 정규화 (8의 배수로 맞춤) normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(preprocessed_image, preprocessed_mask) @@ -322,8 +426,8 @@ class ROIInpaintingModule: # 강제 리사이즈 (마지막 수단) final_result = cv2.resize(final_result, (roi_image.shape[1], roi_image.shape[0])) - # 부드러운 블렌딩 - blend_mask = self.create_blend_mask(roi_mask) + # 🔥 블렌딩 (원본 roi_mask 사용 - 왜곡 방지) + blend_mask = self.create_blend_mask(roi_mask, config) # refined_mask 대신 원본 사용 blended_roi = (final_result * blend_mask + roi_image * (1 - blend_mask)).astype(np.uint8) return blended_roi, True @@ -426,24 +530,27 @@ class ROIInpaintingModule: return restored def scale_image_if_needed(self, image: np.ndarray, 
mask: np.ndarray, - max_size: int, interpolation: int = cv2.INTER_AREA) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int], bool]: + config: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray, Dict]: """ 이미지가 최대 크기를 초과하면 축소 Args: image: 입력 이미지 mask: 입력 마스크 - max_size: 최대 크기 (긴 변 기준) - interpolation: 보간법 + config: 설정 딕셔너리 Returns: - (스케일된 이미지, 스케일된 마스크, 원본 크기, 스케일 적용 여부) + (스케일된 이미지, 스케일된 마스크, 스케일 정보) """ original_h, original_w = image.shape[:2] max_dimension = max(original_h, original_w) + # 🔥 설정에서 최대 크기 가져오기 + max_size = config.get('max_image_size', 2048) + interpolation = config.get('scale_interpolation', cv2.INTER_AREA) + if max_dimension <= max_size: - return image, mask, (original_h, original_w), False + return image, mask, {'scaled': False, 'original_size': (original_h, original_w)} # 스케일 계산 scale_factor = max_size / max_dimension @@ -459,28 +566,35 @@ class ROIInpaintingModule: level=logging.INFO ) - return scaled_image, scaled_mask, (original_h, original_w), True + return scaled_image, scaled_mask, { + 'scaled': True, + 'original_size': (original_h, original_w), + 'scale_factor': scale_factor + } def restore_original_scale(self, processed_image: np.ndarray, - original_size: Tuple[int, int], - interpolation: int = cv2.INTER_CUBIC) -> np.ndarray: + scale_info: Dict) -> np.ndarray: """ 처리된 이미지를 원본 크기로 복원 Args: processed_image: 처리된 이미지 - original_size: 원본 크기 (h, w) - interpolation: 보간법 + scale_info: 스케일 정보 딕셔너리 Returns: 원본 크기로 복원된 이미지 """ - original_h, original_w = original_size + if not scale_info.get('scaled', False): + return processed_image + + original_h, original_w = scale_info['original_size'] current_h, current_w = processed_image.shape[:2] if current_h == original_h and current_w == original_w: return processed_image + # 고품질 보간법으로 업스케일 + interpolation = cv2.INTER_CUBIC restored = cv2.resize(processed_image, (original_w, original_h), interpolation=interpolation) self.logger.log( @@ -556,287 +670,87 @@ class ROIInpaintingModule: def inpaint_with_roi(self, 
image: np.ndarray, mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: """ - ROI 기반 인페인팅 메인 함수 - - Args: - image: 입력 이미지 (BGR) - mask: 이진 마스크 (0 또는 255) - config: 설정 오버라이드 - - Returns: - 인페인팅된 이미지 + 🔥 ROI 기반 인페인팅 처리 (형상 최적화 버전) """ - import time + start_time = time.time() - # 설정 병합 - effective_config = {**self.default_config} - if config: - effective_config.update(config) + if config is None: + config = self.default_config + effective_config = {**self.default_config, **config} - # 🔥 전체 처리 시간 측정 시작 - total_start_time = time.time() - - # 🔥 이미지 크기 제한 및 스케일링 - scale_start_time = time.time() - scaled_image, scaled_mask, original_size, was_scaled = self.scale_image_if_needed( - image, mask, effective_config['max_image_size'], effective_config['scale_interpolation'] - ) - scale_time = time.time() - scale_start_time - self.logger.log(f"[PERF] 이미지 스케일링 시간: {scale_time:.3f}초", level=logging.INFO) - - # 마스크를 이진 마스크로 정규화 - mask_prep_start_time = time.time() - binary_mask = (scaled_mask > 128).astype(np.uint8) * 255 - mask_prep_time = time.time() - mask_prep_start_time - - # 🔥 마스크 컴포넌트 분석 시간 측정 - component_start_time = time.time() - components = self.find_mask_components( - binary_mask, - min_area=effective_config['min_component_area'] - ) - component_time = time.time() - component_start_time - self.logger.log(f"[PERF] 컴포넌트 분석 시간: {component_time:.3f}초 ({len(components)}개 발견)", level=logging.INFO) - - if not components: - # 마스크가 없으면 원본 반환 - self.logger.log("마스크 컴포넌트 없음, 원본 반환", level=logging.INFO) - result = scaled_image.copy() - # 🔥 크기 제한 적용된 경우 원본 크기로 복원 - if was_scaled: - restore_start_time = time.time() - result = self.restore_original_scale(result, original_size, effective_config['upscale_interpolation']) - restore_time = time.time() - restore_start_time - self.logger.log(f"[PERF] 크기 복원 시간: {restore_time:.3f}초", level=logging.INFO) + try: + # 🔥 1단계: 형상 버킷 최적화 + bucket_start_time = time.time() + optimized_image, optimized_mask, bucket_info = 
self.apply_optimal_padding(image, mask) + bucket_time = time.time() - bucket_start_time + bucket_name = bucket_info['bucket_name'] - total_time = time.time() - total_start_time - self.logger.log(f"[PERF] 전체 처리 시간: {total_time:.3f}초 (마스크 없음)", level=logging.INFO) - return result - - # 🔥 적응적 처리 전략 선택 - strategy = self.choose_processing_strategy( - scaled_image.shape, components, effective_config - ) - - # 전략에 따른 처리 분기 - if strategy == "full": - # 전체 이미지 처리 (크기 정규화 적용) - self.logger.log("적응적 전략: 전체 이미지 처리", level=logging.INFO) + # 이미지 크기 스케일링 (기존 로직) + scale_start_time = time.time() + scaled_image, scaled_mask, scale_info = self.scale_image_if_needed(optimized_image, optimized_mask, effective_config) + scale_time = time.time() - scale_start_time - # 🔥 전체 처리 시간 측정 - full_start_time = time.time() + # 🔥 2단계: 적응적 전략 선택 (성능 히스토리 고려) + strategy_start_time = time.time() + components = self.find_mask_components(scaled_mask, effective_config) + strategy = self.choose_processing_strategy(scaled_image.shape, components, effective_config) - # 크기 정규화 - norm_start_time = time.time() - normalized_image, normalized_mask, normalized_original_size = self.normalize_roi_size(scaled_image, binary_mask) - norm_time = time.time() - norm_start_time - self.logger.log(f"[PERF] 전체이미지 정규화 시간: {norm_time:.3f}초", level=logging.INFO) + # 🔥 성능 히스토리 기반 전략 재조정 + total_pixels = scaled_image.shape[0] * scaled_image.shape[1] + predicted_time = 0.3 + (total_pixels / 1000000) * 0.2 # 간단한 예측 모델 - # SimpleLama 처리 - lama_start_time = time.time() - simple_lama = self._get_simple_lama() - # 🔥 BGR → RGB 변환 후 SimpleLama 호출 - normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB) - result_pil = simple_lama(normalized_image_rgb, normalized_mask) - result = np.array(result_pil) - result_bgr = cv2.cvtColor(result, cv2.COLOR_RGB2BGR) - lama_time = time.time() - lama_start_time - self.logger.log(f"[PERF] 전체이미지 SimpleLama 처리 시간: {lama_time:.3f}초", level=logging.INFO) + if strategy == "full" and 
self.should_fallback_to_roi(bucket_name, predicted_time): + strategy = "roi" + self.logger.log(f"🔄 전략 변경: full → roi (성능 히스토리 기반)", level=logging.WARNING) - # 원본 크기로 복원 - restore_norm_start_time = time.time() - restored_result = self.restore_roi_size(result_bgr, normalized_original_size) - restore_norm_time = time.time() - restore_norm_start_time - self.logger.log(f"[PERF] 정규화 복원 시간: {restore_norm_time:.3f}초", level=logging.INFO) + strategy_time = time.time() - strategy_start_time - # 🔥 크기 제한 적용된 경우 원본 크기로 복원 - if was_scaled: - restore_scale_start_time = time.time() - restored_result = self.restore_original_scale( - restored_result, original_size, effective_config['upscale_interpolation'] - ) - restore_scale_time = time.time() - restore_scale_start_time - self.logger.log(f"[PERF] 스케일 복원 시간: {restore_scale_time:.3f}초", level=logging.INFO) - - full_time = time.time() - full_start_time - total_time = time.time() - total_start_time - self.logger.log(f"[PERF] 전체이미지 처리 시간: {full_time:.3f}초", level=logging.INFO) - self.logger.log(f"[PERF] 총 처리 시간: {total_time:.3f}초", level=logging.INFO) - - return restored_result - - # 🔥 ROI 병합 시간 측정 - merge_start_time = time.time() - merged_components = self.merge_nearby_components( - components, - merge_distance=effective_config['merge_distance'] - ) - merge_time = time.time() - merge_start_time - self.logger.log(f"[PERF] 컴포넌트 병합 시간: {merge_time:.3f}초 ({len(components)}→{len(merged_components)})", level=logging.INFO) - - result_image = scaled_image.copy() - - # 🔥 전략에 따른 ROI 처리 분기 - if strategy == "roi_parallel": - self.logger.log("적응적 전략: ROI 병렬 전처리", level=logging.INFO) - # 🔥 ROI 전처리를 병렬로 수행 - prepared_rois = self.prepare_rois_parallel(scaled_image, binary_mask, merged_components, effective_config) - - # 🔥 병렬 처리된 ROI 처리 시간 측정 - total_roi_time = 0 - successful_rois = 0 - for roi_info in prepared_rois: - if roi_info['skip_processing']: - self.logger.log(f"[PERF] ROI {roi_info['idx']+1} 건너뛰기 (빈 마스크)", level=logging.INFO) - continue - - 
roi_start_time = time.time() - - # SimpleLama 처리 - lama_start_time = time.time() - simple_lama = self._get_simple_lama() - # 🔥 BGR → RGB 변환 후 SimpleLama 호출 - roi_normalized_rgb = cv2.cvtColor(roi_info['normalized_image'], cv2.COLOR_BGR2RGB) - roi_result_pil = simple_lama(roi_normalized_rgb, roi_info['normalized_mask']) - roi_result = np.array(roi_result_pil) - roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR) - lama_time = time.time() - lama_start_time - - # 원본 크기로 복원 - restore_roi_start_time = time.time() - restored_roi = self.restore_roi_size(roi_result_bgr, roi_info['original_size']) - restore_roi_time = time.time() - restore_roi_start_time - - # 부드러운 블렌딩 - blend_start_time = time.time() - blend_mask = self.create_blend_mask(roi_info['roi_mask']) - blended_roi = (restored_roi * blend_mask + roi_info['roi_image'] * (1 - blend_mask)).astype(np.uint8) - blend_time = time.time() - blend_start_time - - # 원본 이미지에 적용 - x1, y1, x2, y2 = roi_info['roi_bbox'] - result_image[y1:y2, x1:x2] = blended_roi - - roi_total_time = time.time() - roi_start_time - total_roi_time += roi_total_time - successful_rois += 1 - - self.logger.log( - f"[PERF] ROI {roi_info['idx']+1} 처리 완료: {roi_total_time:.3f}초 " - f"(SimpleLama:{lama_time:.3f}s, 복원:{restore_roi_time:.3f}s, 블렌딩:{blend_time:.3f}s)", - level=logging.INFO - ) - else: - # strategy == "roi" : 순차 ROI 처리 - self.logger.log("적응적 전략: ROI 순차 처리", level=logging.INFO) - total_roi_time = 0 - successful_rois = 0 - - for i, comp_bbox in enumerate(merged_components): - roi_start_time = time.time() - - # ROI 영역 확장 - expand_start_time = time.time() - roi_bbox = self.expand_roi( - comp_bbox, scaled_image.shape, - margin_ratio=effective_config['margin_ratio'] - ) - expand_time = time.time() - expand_start_time - - x1, y1, x2, y2 = roi_bbox - - # ROI 처리 - process_start_time = time.time() - processed_roi, success = self.process_roi(scaled_image, binary_mask, roi_bbox) - process_time = time.time() - process_start_time - - if success: - 
blend_start_time = time.time() - result_image[y1:y2, x1:x2] = processed_roi - blend_time = time.time() - blend_start_time - successful_rois += 1 - - roi_total_time = time.time() - roi_start_time - total_roi_time += roi_total_time - self.logger.log( - f"[PERF] ROI {i+1} 처리 완료: {roi_total_time:.3f}초 " - f"(확장:{expand_time:.3f}s, 처리:{process_time:.3f}s, 블렌딩:{blend_time:.3f}s)", - level=logging.INFO - ) - else: - roi_total_time = time.time() - roi_start_time - self.logger.log(f"[PERF] ROI {i+1} 처리 실패: {roi_total_time:.3f}초", level=logging.WARNING) - - self.logger.log(f"[PERF] 전체 ROI 처리 시간: {total_roi_time:.3f}초", level=logging.INFO) - self.logger.log(f"ROI 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO) - - # 🔥 ROI 처리가 모두 실패한 경우 전체 이미지 처리로 폴백 - if successful_rois == 0 and len(merged_components) > 0: - self.logger.log("모든 ROI 처리 실패, 전체 이미지 처리로 폴백", level=logging.WARNING) - fallback_start_time = time.time() - - try: - # 크기 정규화 적용 - normalized_image, normalized_mask, normalized_original_size = self.normalize_roi_size(scaled_image, binary_mask) - - simple_lama = self._get_simple_lama() - # 🔥 BGR → RGB 변환 후 SimpleLama 호출 - normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB) - result_pil = simple_lama(normalized_image_rgb, normalized_mask) - result = np.array(result_pil) - result_bgr = cv2.cvtColor(result, cv2.COLOR_RGB2BGR) - - # 원본 크기로 복원 - restored_result = self.restore_roi_size(result_bgr, normalized_original_size) - - # 🔥 크기 제한 적용된 경우 원본 크기로 복원 - if was_scaled: - restored_result = self.restore_original_scale( - restored_result, original_size, effective_config['upscale_interpolation'] - ) - - fallback_time = time.time() - fallback_start_time - total_time = time.time() - total_start_time - self.logger.log(f"[PERF] 폴백 처리 시간: {fallback_time:.3f}초", level=logging.INFO) - self.logger.log(f"[PERF] 총 처리 시간: {total_time:.3f}초 (폴백)", level=logging.INFO) - - return restored_result - except Exception as e: - self.logger.log(f"전체 이미지 
처리도 실패: {e}, 원본 반환", level=logging.ERROR) - result = scaled_image.copy() - # 🔥 크기 제한 적용된 경우 원본 크기로 복원 - if was_scaled: - result = self.restore_original_scale(result, original_size, effective_config['upscale_interpolation']) - return result - - # 🔥 최종 크기 복원 시간 측정 - final_restore_time = 0 - if was_scaled: - final_restore_start_time = time.time() - result_image = self.restore_original_scale( - result_image, original_size, effective_config['upscale_interpolation'] + self.logger.log( + f"🔧 처리 준비: 버킷={bucket_name}, 전략={strategy}, " + f"버킷화={bucket_time:.3f}s, 스케일링={scale_time:.3f}s, 전략선택={strategy_time:.3f}s", + level=logging.INFO ) - final_restore_time = time.time() - final_restore_start_time - self.logger.log(f"[PERF] 최종 크기 복원 시간: {final_restore_time:.3f}초", level=logging.INFO) - - # 🔥 전체 처리 시간 요약 - total_time = time.time() - total_start_time - overhead_time = total_time - total_roi_time # 순수 ROI 처리 외 오버헤드 - - self.logger.log( - f"[PERF] ===== ROI 처리 성능 요약 =====", level=logging.INFO - ) - self.logger.log( - f"[PERF] 전처리 오버헤드: {scale_time + mask_prep_time + component_time + merge_time:.3f}초 " - f"(스케일:{scale_time:.3f}s, 마스크:{mask_prep_time:.3f}s, 분석:{component_time:.3f}s, 병합:{merge_time:.3f}s)", - level=logging.INFO - ) - self.logger.log(f"[PERF] ROI 처리 시간: {total_roi_time:.3f}초", level=logging.INFO) - self.logger.log(f"[PERF] 후처리 시간: {final_restore_time:.3f}초", level=logging.INFO) - self.logger.log(f"[PERF] 총 처리 시간: {total_time:.3f}초", level=logging.INFO) - self.logger.log(f"[PERF] ROI 처리 효율성: {(total_roi_time/total_time)*100:.1f}%", level=logging.INFO) - - return result_image + + # 🔥 3단계: 인페인팅 실행 (기존 로직 유지) + inpaint_start_time = time.time() + + if strategy == "full": + result = self._process_full_image_optimized(scaled_image, scaled_mask, effective_config, bucket_name) + elif strategy == "roi_parallel": + result = self._process_roi_parallel(scaled_image, scaled_mask, components, effective_config) + else: # roi + result = self._process_roi_sequential(scaled_image, 
scaled_mask, components, effective_config) + + inpaint_time = time.time() - inpaint_start_time + + # 🔥 4단계: 복원 및 성능 기록 + restore_start_time = time.time() + + # 스케일링 복원 + if scale_info['scaled']: + result = self.restore_original_scale(result, scale_info) + + # 버킷 패딩 복원 + result = self.restore_from_padding(result, bucket_info) + + restore_time = time.time() - restore_start_time + total_time = time.time() - start_time + + # 🔥 성능 히스토리 기록 + self.record_performance(bucket_name, inpaint_time, strategy) + + self.logger.log( + f"🎯 인페인팅 완료: 총 {total_time:.3f}s (인페인팅: {inpaint_time:.3f}s, 복원: {restore_time:.3f}s)", + level=logging.INFO + ) + + return result + + except Exception as e: + self.logger.log(f"ROI 인페인팅 실패: {e}", level=logging.ERROR) + import traceback + self.logger.log(traceback.format_exc(), level=logging.DEBUG) + return image def prepare_rois_parallel(self, image: np.ndarray, binary_mask: np.ndarray, merged_components: List[Tuple[int, int, int, int]], @@ -853,17 +767,21 @@ class ROIInpaintingModule: Returns: 전처리된 ROI 정보 리스트 """ - from concurrent.futures import ThreadPoolExecutor import time def prepare_single_roi(roi_info): idx, comp_bbox = roi_info - # ROI 영역 확장 - roi_bbox = self.expand_roi( - comp_bbox, image.shape, - margin_ratio=config['margin_ratio'] - ) + # 🔥 ROI 영역 확장 (설정에 따라 기본/컨텍스트 선택) + if config.get('enable_mask_refinement', False): + roi_bbox = self.expand_roi_with_context( + comp_bbox, image.shape, config + ) + else: + roi_bbox = self.expand_roi( + comp_bbox, image.shape, + margin_ratio=config['margin_ratio'] + ) x1, y1, x2, y2 = roi_bbox # ROI 추출 @@ -1047,6 +965,175 @@ class ROIInpaintingModule: pass self.logger.log("메모리 정리 완료", level=logging.INFO) + def get_optimal_bucket_size(self, height: int, width: int) -> Tuple[int, int, str]: + """ + 🔥 이미지 크기를 성능 최적화된 버킷으로 정규화 + + Args: + height, width: 원본 이미지 크기 + + Returns: + (최적화된 높이, 너비, 버킷명) + """ + # 긴 변과 짧은 변 구분 + long_side = max(height, width) + short_side = min(height, width) + is_portrait = height > 
width + + # 🔥 사용자 이미지 크기 기반 버킷 매핑 (항상 원본보다 크거나 같게) + if long_side <= 800: + # 작은 이미지: 640×640 또는 832×512 + if abs(height - width) < 100: # 정사각형에 가까움 + bucket_h, bucket_w = 640, 640 + bucket_name = "square_small" + else: + bucket_h, bucket_w = (832, 512) if is_portrait else (512, 832) + bucket_name = "landscape_standard" + + elif long_side <= 1200: + # 중간 이미지: 웹툰 표준 + if short_side >= 700: # 정사각형에 가까움 + bucket_h, bucket_w = 1024, 832 + bucket_name = "webtoon_standard" + else: + bucket_h, bucket_w = (1152, 896) if is_portrait else (896, 1152) + bucket_name = "webtoon_portrait" + + elif long_side <= 2100: + # 🔥 긴 이미지: 원본 크기 고려하여 버킷 선택 + if is_portrait: + # 세로형: 높이를 충분히 큰 버킷으로 + bucket_h = max(2048, ((height // 64) + 1) * 64) # 64배수로 올림 + bucket_w = max(896, ((width // 64) + 1) * 64) + else: + # 가로형: 너비를 충분히 큰 버킷으로 + bucket_w = max(2048, ((width // 64) + 1) * 64) + bucket_h = max(896, ((height // 64) + 1) * 64) + bucket_name = "webtoon_long" + + else: + # 🔥 매우 큰 이미지: 원본 크기보다 크게 + if is_portrait: + bucket_h = ((height // 128) + 1) * 128 # 128배수로 올림 + bucket_w = max(1280, ((width // 64) + 1) * 64) + else: + bucket_w = ((width // 128) + 1) * 128 + bucket_h = max(768, ((height // 64) + 1) * 64) + bucket_name = "landscape_wide" + + # 🔥 최종 방향 조정 (항상 원본보다 크거나 같게 보장) + if is_portrait: + final_h = max(height, bucket_h) + final_w = max(width, bucket_w) + else: + final_h = max(height, bucket_h) + final_w = max(width, bucket_w) + + self.logger.log( + f"🔧 형상 버킷 최적화: {height}×{width} → {final_h}×{final_w} ({bucket_name})", + level=logging.INFO + ) + + return final_h, final_w, bucket_name + + def apply_optimal_padding(self, image: np.ndarray, mask: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict]: + """ + 🔥 성능 최적화된 크기로 패딩 + + Args: + image, mask: 원본 이미지와 마스크 + + Returns: + (패딩된 이미지, 패딩된 마스크, 복원 정보) + """ + original_h, original_w = image.shape[:2] + target_h, target_w, bucket_name = self.get_optimal_bucket_size(original_h, original_w) + + # 🔥 패딩 값 계산 및 안전성 검증 + pad_h = target_h - 
original_h + pad_w = target_w - original_w + + # 🔥 음수 패딩 방지 (타겟이 원본보다 작을 경우) + if pad_h < 0 or pad_w < 0: + self.logger.log( + f"⚠️ 버킷 크기 오류: 원본({original_h}×{original_w}) > 타겟({target_h}×{target_w}), 패딩 건너뜀", + level=logging.WARNING + ) + # 패딩 없이 원본 반환 + return image, mask, { + 'original_size': (original_h, original_w), + 'target_size': (original_h, original_w), + 'bucket_name': bucket_name + "_no_padding", + 'padding': (0, 0, 0, 0) + } + + pad_top = pad_h // 2 + pad_bottom = pad_h - pad_top + pad_left = pad_w // 2 + pad_right = pad_w - pad_left + + # 🔥 추가 안전성 검증 + if pad_top < 0 or pad_bottom < 0 or pad_left < 0 or pad_right < 0: + self.logger.log( + f"⚠️ 패딩 값 오류: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right}", + level=logging.ERROR + ) + return image, mask, { + 'original_size': (original_h, original_w), + 'target_size': (original_h, original_w), + 'bucket_name': bucket_name + "_error", + 'padding': (0, 0, 0, 0) + } + + # 이미지 패딩 (reflect로 자연스럽게) + padded_image = cv2.copyMakeBorder( + image, pad_top, pad_bottom, pad_left, pad_right, + borderType=cv2.BORDER_REFLECT + ) + + # 마스크 패딩 (상수로) + padded_mask = cv2.copyMakeBorder( + mask, pad_top, pad_bottom, pad_left, pad_right, + borderType=cv2.BORDER_CONSTANT, value=0 + ) + + restore_info = { + 'original_size': (original_h, original_w), + 'target_size': (target_h, target_w), + 'bucket_name': bucket_name, + 'padding': (pad_top, pad_bottom, pad_left, pad_right) + } + + self.logger.log( + f"🔧 패딩 적용: {original_h}×{original_w} → {target_h}×{target_w} " + f"(padding: top={pad_top}, bottom={pad_bottom}, left={pad_left}, right={pad_right})", + level=logging.INFO + ) + + return padded_image, padded_mask, restore_info + + def restore_from_padding(self, image: np.ndarray, restore_info: Dict) -> np.ndarray: + """패딩된 이미지를 원본 크기로 복원""" + pad_top, pad_bottom, pad_left, pad_right = restore_info['padding'] + original_h, original_w = restore_info['original_size'] + + # 패딩 제거 + if pad_bottom == 0: + cropped = 
image[pad_top:, :] + else: + cropped = image[pad_top:-pad_bottom, :] + + if pad_right == 0: + cropped = cropped[:, pad_left:] + else: + cropped = cropped[:, pad_left:-pad_right] + + # 최종 크기 검증 및 리사이즈 + if cropped.shape[:2] != (original_h, original_w): + cropped = cv2.resize(cropped, (original_w, original_h), interpolation=cv2.INTER_CUBIC) + + return cropped + def get_processing_stats(self, image: np.ndarray, mask: np.ndarray) -> Dict[str, Any]: """ 처리 통계 정보 반환 (실제 처리 없이 분석만) @@ -1059,7 +1146,7 @@ class ROIInpaintingModule: 처리 통계 딕셔너리 """ binary_mask = (mask > 128).astype(np.uint8) * 255 - components = self.find_mask_components(binary_mask) + components = self.find_mask_components(binary_mask, self.default_config) merged_components = self.merge_nearby_components(components) total_area = image.shape[0] * image.shape[1] @@ -1077,7 +1164,7 @@ class ROIInpaintingModule: 'num_merged_rois': len(merged_components), 'roi_areas': roi_areas, 'total_roi_area': sum(roi_areas), - 'roi_coverage_ratio': sum(roi_areas) / total_area if total_area > 0 else 0, + 'roi_coverage_ratio': sum(roi_areas) / total_area if total_area > 0 else 0.0, 'will_process_full': self.should_process_full_image(components, image.shape), 'memory_efficiency': 1.0 - (sum(roi_areas) / total_area) if not self.should_process_full_image(components, image.shape) else 0.0 } @@ -1171,6 +1258,251 @@ class ROIInpaintingModule: return batch_image, batch_mask, batch_info + def refine_mask(self, mask: np.ndarray, config: Dict[str, Any] = None) -> np.ndarray: + """ + 🔥 마스크 품질 개선을 위한 고급 정제 + + Args: + mask: 원본 마스크 + config: 설정 오버라이드 + + Returns: + 정제된 마스크 + """ + if config is None: + config = self.default_config + + if not config.get('enable_mask_refinement', True): + return mask + + refined_mask = mask.copy() + + # 🔥 1단계: 작은 노이즈 제거 (Opening) + erosion_kernel = config.get('mask_erosion_kernel', 2) + if erosion_kernel > 0: + kernel = np.ones((erosion_kernel, erosion_kernel), np.uint8) + refined_mask = 
cv2.morphologyEx(refined_mask, cv2.MORPH_OPEN, kernel) + + # 🔥 2단계: 마스크 영역 확장 (텍스트 경계 완전 커버) + dilation_kernel = config.get('mask_dilation_kernel', 3) + if dilation_kernel > 0: + kernel = np.ones((dilation_kernel, dilation_kernel), np.uint8) + refined_mask = cv2.dilate(refined_mask, kernel, iterations=1) + + # 🔥 3단계: 부드러운 경계 생성 + blur_kernel = config.get('mask_blur_kernel', 5) + if blur_kernel > 0 and blur_kernel % 2 == 1: + refined_mask = cv2.GaussianBlur(refined_mask, (blur_kernel, blur_kernel), 0) + # 블러 후 다시 이진화 (128 이상을 255로) + refined_mask = np.where(refined_mask > 128, 255, 0).astype(np.uint8) + + self.logger.log( + f"🔧 마스크 정제 완료: erosion={erosion_kernel}, dilation={dilation_kernel}, blur={blur_kernel}", + level=logging.INFO + ) + + return refined_mask + + def expand_roi_with_context(self, bbox: Tuple[int, int, int, int], + image_shape: Tuple[int, int], + config: Dict[str, Any] = None) -> Tuple[int, int, int, int]: + """ + 🔥 컨텍스트를 고려한 ROI 확장 (더 넓은 영역으로 품질 향상) + + Args: + bbox: 원본 바운딩 박스 + image_shape: 이미지 크기 + config: 설정 + + Returns: + 확장된 바운딩 박스 + """ + if config is None: + config = self.default_config + + # 기본 여백 + 컨텍스트 확장 + base_margin_ratio = config.get('margin_ratio', 0.15) + context_expansion = config.get('context_expansion_ratio', 0.3) + + total_margin_ratio = base_margin_ratio + context_expansion + + h, w = image_shape[:2] + x1, y1, x2, y2 = bbox + + # 현재 크기 기준으로 여백 계산 + roi_w, roi_h = x2 - x1, y2 - y1 + margin_x = int(roi_w * total_margin_ratio) + margin_y = int(roi_h * total_margin_ratio) + + # 이미지 경계 내로 제한 + x1 = max(0, x1 - margin_x) + y1 = max(0, y1 - margin_y) + x2 = min(w, x2 + margin_x) + y2 = min(h, y2 + margin_y) + + self.logger.log( + f"🔧 컨텍스트 확장: 기본 여백 {base_margin_ratio:.1%} + 컨텍스트 {context_expansion:.1%} = {total_margin_ratio:.1%}", + level=logging.INFO + ) + + return (x1, y1, x2, y2) + + def _setup_cudnn_optimization(self): + """cuDNN 최적화 설정""" + try: + import torch + if torch.cuda.is_available(): + torch.backends.cudnn.benchmark = 
def record_performance(self, bucket_name: str, processing_time: float, strategy: str):
    """Record one processing-time sample in the per-bucket performance history.

    Args:
        bucket_name: Key of the bucket in ``self.bucket_performance_history``.
        processing_time: Measured processing time in seconds.
        strategy: Label of the strategy that produced this sample.

    Side effects:
        Creates the bucket entry on first use, keeps only the five most
        recent samples, refreshes ``avg_time`` and ``slow_count`` for that
        window, and writes one INFO line through ``self.logger``.
    """
    history = self.bucket_performance_history.setdefault(bucket_name, {
        'times': [],
        'strategies': [],
        'avg_time': 0.0,
        'slow_count': 0,
    })

    history['times'].append(processing_time)
    history['strategies'].append(strategy)

    # Keep only the five most recent samples (times and strategies in step).
    if len(history['times']) > 5:
        history['times'] = history['times'][-5:]
        history['strategies'] = history['strategies'][-5:]

    # Average over the retained window.
    history['avg_time'] = sum(history['times']) / len(history['times'])

    # Number of retained samples slower than 2 seconds.
    history['slow_count'] = sum(1 for t in history['times'] if t > 2.0)

    self.logger.log(
        f"📊 성능 기록: {bucket_name} ({strategy}) = {processing_time:.2f}s, 평균: {history['avg_time']:.2f}s",
        level=logging.INFO
    )


def should_fallback_to_roi(self, bucket_name: str, predicted_time: float) -> bool:
    """Decide from the recorded history whether to fall back to ROI processing.

    Args:
        bucket_name: Key of the bucket in ``self.bucket_performance_history``.
        predicted_time: Predicted processing time in seconds for this bucket.

    Returns:
        ``True`` when either trigger fires, ``False`` otherwise (including
        when the bucket has no history yet):

        1. the windowed average exceeds four times ``predicted_time``;
        2. at least two of the three most recent samples took longer than
           2 seconds.
    """
    history = self.bucket_performance_history.get(bucket_name)
    if history is None:
        return False

    # Condition 1: average time is more than 4x the prediction.
    if history['avg_time'] > predicted_time * 4:
        self.logger.log(
            f"⚡ ROI 폴백 트리거: {bucket_name} 평균 {history['avg_time']:.2f}s > 예측 {predicted_time:.2f}s × 4",
            level=logging.WARNING
        )
        return True

    # Condition 2: at least 2 of the last 3 runs were slow.
    # The stored 'slow_count' covers up to five samples, so it cannot be used
    # here: older slow runs would trigger the fallback even after the bucket
    # recovered. Count only the three most recent samples instead
    # (same 2-second threshold as record_performance).
    recent_times = history['times'][-3:]
    if len(recent_times) == 3:
        recent_slow = sum(1 for t in recent_times if t > 2.0)
        if recent_slow >= 2:
            self.logger.log(
                f"⚡ ROI 폴백 트리거: {bucket_name} 최근 {recent_slow}/3회 느림",
                level=logging.WARNING
            )
            return True

    return False


def _process_full_image_optimized(self, image: np.ndarray, mask: np.ndarray,
                                  config: Dict[str, Any], bucket_name: str) -> np.ndarray:
    """Inpaint the whole image in a single SimpleLama pass.

    Args:
        image: Input image; converted with ``COLOR_BGR2RGB`` before inference,
            so it is treated as BGR.
        mask: Inpainting mask; values strictly greater than 128 are masked.
        config: Processing options. Not read by this method.
        bucket_name: Bucket label, used only in the log message.

    Returns:
        The inpainted image after ``restore_roi_size``, in BGR channel order.
    """
    self.logger.log(f"전체 이미지 처리 시작 (버킷: {bucket_name})", level=logging.INFO)

    # Convert to a strict 0/255 binary mask.
    binary_mask = (mask > 128).astype(np.uint8) * 255

    # Normalise the size for the model.
    normalized_image, normalized_mask, normalized_size = self.normalize_roi_size(image, binary_mask)

    # SimpleLama inference on an RGB copy.
    simple_lama = self._get_simple_lama()
    normalized_image_rgb = cv2.cvtColor(normalized_image, cv2.COLOR_BGR2RGB)
    result_pil = simple_lama(normalized_image_rgb, normalized_mask)
    result = np.array(result_pil)
    result_bgr = cv2.cvtColor(result, cv2.COLOR_RGB2BGR)

    # Restore to the original size.
    # NOTE(review): the parallel ROI path passes roi_info['original_size'] to
    # restore_roi_size; confirm that the third value returned by
    # normalize_roi_size is the pre-normalisation size despite its name here.
    restored_result = self.restore_roi_size(result_bgr, normalized_size)

    return restored_result


def _process_roi_sequential(self, image: np.ndarray, mask: np.ndarray,
                            components: List, config: Dict[str, Any]) -> np.ndarray:
    """Inpaint each merged mask component one ROI at a time.

    Args:
        image: Input image; left unmodified (results go into a copy).
        mask: Inpainting mask; values strictly greater than 128 are masked.
        components: Component bounding boxes handed to
            ``merge_nearby_components``.
        config: Must contain ``'merge_distance'``; ``'margin_ratio'`` is
            required unless ``'enable_mask_refinement'`` is truthy.

    Returns:
        A copy of ``image`` with every successfully processed ROI pasted in.

    Raises:
        KeyError: If a required ``config`` key is missing.
    """
    self.logger.log("ROI 순차 처리 시작", level=logging.INFO)

    binary_mask = (mask > 128).astype(np.uint8) * 255
    merged_components = self.merge_nearby_components(components, config['merge_distance'])
    result_image = image.copy()

    successful_rois = 0
    for comp_bbox in merged_components:
        # Expand the component box into the ROI that will be processed.
        if config.get('enable_mask_refinement', False):
            roi_bbox = self.expand_roi_with_context(comp_bbox, image.shape, config)
        else:
            roi_bbox = self.expand_roi(comp_bbox, image.shape, margin_ratio=config['margin_ratio'])

        # Each ROI is cut from the original image, not from result_image.
        processed_roi, success = self.process_roi(image, binary_mask, roi_bbox, config)

        if success:
            x1, y1, x2, y2 = roi_bbox
            result_image[y1:y2, x1:x2] = processed_roi
            successful_rois += 1

    self.logger.log(f"ROI 순차 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO)
    return result_image


def _process_roi_parallel(self, image: np.ndarray, mask: np.ndarray,
                          components: List, config: Dict[str, Any]) -> np.ndarray:
    """Inpaint merged mask components using ROIs prepared by ``prepare_rois_parallel``.

    Only the ROI preparation is delegated to ``prepare_rois_parallel``; the
    SimpleLama inference, blending and paste-back below run in a plain loop.

    Args:
        image: Input image; left unmodified (results go into a copy).
        mask: Inpainting mask; values strictly greater than 128 are masked.
        components: Component bounding boxes handed to
            ``merge_nearby_components``.
        config: Must contain ``'merge_distance'``; also forwarded to
            ``prepare_rois_parallel`` and ``create_blend_mask``.

    Returns:
        A copy of ``image`` with every non-skipped ROI blended in.

    Raises:
        KeyError: If ``config`` lacks ``'merge_distance'``.
    """
    self.logger.log("ROI 병렬 처리 시작", level=logging.INFO)

    binary_mask = (mask > 128).astype(np.uint8) * 255
    merged_components = self.merge_nearby_components(components, config['merge_distance'])

    # ROI pre-processing is delegated to the parallel helper.
    prepared_rois = self.prepare_rois_parallel(image, binary_mask, merged_components, config)
    result_image = image.copy()

    successful_rois = 0
    for roi_info in prepared_rois:
        if roi_info['skip_processing']:
            continue

        # SimpleLama inference on an RGB copy of the normalised ROI.
        simple_lama = self._get_simple_lama()
        roi_normalized_rgb = cv2.cvtColor(roi_info['normalized_image'], cv2.COLOR_BGR2RGB)
        roi_result_pil = simple_lama(roi_normalized_rgb, roi_info['normalized_mask'])
        roi_result = np.array(roi_result_pil)
        roi_result_bgr = cv2.cvtColor(roi_result, cv2.COLOR_RGB2BGR)

        # Restore to the ROI's original size.
        restored_roi = self.restore_roi_size(roi_result_bgr, roi_info['original_size'])

        # Soft blend between the inpainted ROI and the original ROI.
        # NOTE(review): assumes create_blend_mask returns float weights in
        # [0, 1] that broadcast against the ROI's channel axis - confirm.
        blend_mask = self.create_blend_mask(roi_info['roi_mask'], config)
        blended_roi = (restored_roi * blend_mask + roi_info['roi_image'] * (1 - blend_mask)).astype(np.uint8)

        # Paste the blended ROI back into the output image.
        x1, y1, x2, y2 = roi_info['roi_bbox']
        result_image[y1:y2, x1:x2] = blended_roi
        successful_rois += 1

    self.logger.log(f"ROI 병렬 처리 완료: {successful_rois}/{len(merged_components)} 성공", level=logging.INFO)
    return result_image