이미지 경로 변경 및 ROI 최적화 관련 설정 추가. 기존 인페인팅 모듈 삭제 및 마스크 생성 로직 개선. 성능 최적화를 위한 cuDNN 설정 및 ROI 처리 방식 개선. 전체적인 코드 정리 및 주석 보강.
This commit is contained in:
parent
922d19ca31
commit
29a32c4ef6
Binary file not shown.
|
Before Width: | Height: | Size: 1.1 MiB After Width: | Height: | Size: 1.3 MiB |
|
|
@ -16,7 +16,7 @@ import requests
|
|||
|
||||
|
||||
API_ROOT = "http://localhost:7890" # 메인 서버 주소
|
||||
IMAGE_PATH = pathlib.Path("3.jpg")
|
||||
IMAGE_PATH = pathlib.Path("5.jpg")
|
||||
TIMEOUT = 120 # 초
|
||||
|
||||
unwanted_texts = {
|
||||
|
|
@ -37,10 +37,13 @@ unwanted_texts = {
|
|||
toggle_states = {"inpaint_method": "lama:cuda", "min_masks_for_lama": 2, 'title': False, 'title_shuffle': False, 'title_trans_type': False, 'collect_method_combo': '쇼핑API', 'ocr': True, 'unwanted_words': {'할인': '', '무료': '', '증정': '', '이벤트': '', '특가': '', '세일': '', '사은품': '', '보증': '', '품절': '', '행사': '', '할인가': '', '무료배송': '', '가격설명': ''}, 'interval': 3.0, 'watingTime': 20, 'memo': False, 'memo_toggle_exposer': False, 'memo_toggle_order': False, 'optionTrnas': True, 'optionTrnas_method': True, 'optionIMGTrans': True, 'optionIMGTrans_type': '자체서버', 'optionAutoSelect': True, 'price': False, 'tag': False, 'tag_ai': False, 'thumb': False, 'thumb_trans_type': 'CPU', 'thumb_nukki': False, 'remove_background_white': True, 'detail_Option': False, 'detail_IMGTrans': True, 'detail_IMGTrans_type': '자체서버', 'debug_mode': True, 'ed_mode': False, 'discord': False, 'is_localServer': False, 'watermark_toggle': False, 'clientID': '', 'clientSecret': '', 'discord_webhook': '', 'watermark_text': '', 'thumb_rmb_count': 3, 'max_option_count': 6, 'opacity_percent': 20, 'group_index': 4, 'remove_overprice': False, 'cat_rec': False, 'fixed_keywords': False, 'fixed_keywords_count': 2, 'title_length_limit': 27, 'base_dir': 'C:\\Program Files\\Edit_PartTimer\\lib\\src', 'TEMP_IMAGE_DIR': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\temp_images', 'ERROR_SCREENSHOT_DIR': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\error_screenshots', 'image_font_path': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\fonts\\HakgyoansimDunggeunmisoTTFB.ttf', 'watermark_font_path': 'C:\\Program Files\\Edit_PartTimer\\lib\\src\\fonts\\HakgyoansimDunggeunmisoTTFB.ttf', 'request_inpainting_server_url': 'http://171.101.232.45:50205', 'request_rembg_server_url': 'http://171.101.232.45:50205', 'request_rembg_server_url_local': 'http://192.168.0.150:35756', 'membership_level': 'premium', 'image_worker_restart_every': 10, 'image_worker_restart_count': 0, 'products_per_context_restart': 19, 'is_admin': False, 'admin_id': 'matia0514@naver.com', 'admin_pw': '', 'user_id': 'dreamm8985', 'user_pw': '112233', 'unwanted_words_button': False, 'font_type': '폰트5', 'cmb_button': False, 'detail_text_button': False, 'watermark': False}
|
||||
|
||||
toggle_states.update({
|
||||
"ab_mode": "B",
|
||||
"roi_strategy_B": "full",
|
||||
"max_side_B": 1600,
|
||||
"_trace_id": "exp002"
|
||||
"use_roi_optimized_mask": True, # True: 새 방식, False: 기존 방식
|
||||
"enable_mask_refinement": False, # ROI 마스크 정제 비활성화
|
||||
"context_expansion_ratio": 0.4, # 최소 확장
|
||||
"blend_mode": "simple", # 단순 블렌딩
|
||||
"performance_mode": True, # 빠른 경로 사용
|
||||
"max_image_size": 1280, # 더 작은 크기 제한
|
||||
|
||||
})
|
||||
|
||||
def call_translate(img_path: pathlib.Path):
|
||||
|
|
|
|||
|
|
@ -15,37 +15,37 @@ from PIL import Image
|
|||
from celery import Celery
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
# ── SimpleLama 가중치 경로 설정 (임포트 전에 환경 구성)
|
||||
try:
|
||||
torch_home = "/app/torch_cache"
|
||||
os.makedirs(torch_home, exist_ok=True)
|
||||
os.environ.setdefault("TORCH_HOME", torch_home)
|
||||
# # ── SimpleLama 가중치 경로 설정 (임포트 전에 환경 구성)
|
||||
# try:
|
||||
# torch_home = "/app/torch_cache"
|
||||
# os.makedirs(torch_home, exist_ok=True)
|
||||
# os.environ.setdefault("TORCH_HOME", torch_home)
|
||||
|
||||
# ✅ 환경변수로 FP16 사용 여부를 제어 (기본: 0=끄기)
|
||||
use_lama_fp16 = os.getenv("USE_LAMA_FP16", "0").strip() in {"1", "true", "True"}
|
||||
# # ✅ 환경변수로 FP16 사용 여부를 제어 (기본: 0=끄기)
|
||||
# use_lama_fp16 = os.getenv("USE_LAMA_FP16", "0").strip() in {"1", "true", "True"}
|
||||
|
||||
fp16_path = os.path.join(torch_home, "Big-LaMa.fp16.pt")
|
||||
default_ckpt = os.path.join(torch_home, "big-lama.pt")
|
||||
# fp16_path = os.path.join(torch_home, "Big-LaMa.fp16.pt")
|
||||
# default_ckpt = os.path.join(torch_home, "big-lama.pt")
|
||||
|
||||
# 기본은 FP32 체크포인트를 우선
|
||||
if os.path.isfile(default_ckpt):
|
||||
os.environ.setdefault("SIMPLE_LAMA_CKPT", default_ckpt)
|
||||
elif os.path.isfile(fp16_path) and use_lama_fp16:
|
||||
os.environ.setdefault("SIMPLE_LAMA_CKPT", fp16_path)
|
||||
# # 기본은 FP32 체크포인트를 우선
|
||||
# if os.path.isfile(default_ckpt):
|
||||
# os.environ.setdefault("SIMPLE_LAMA_CKPT", default_ckpt)
|
||||
# elif os.path.isfile(fp16_path) and use_lama_fp16:
|
||||
# os.environ.setdefault("SIMPLE_LAMA_CKPT", fp16_path)
|
||||
|
||||
# 🔧 [기존 문제 원인] FP16 → big-lama.pt 강제 링크/복사 로직 제거
|
||||
# 필요 시에만 FP16을 직접 지정해서 쓰도록 함.
|
||||
except Exception:
|
||||
pass
|
||||
# # 🔧 [기존 문제 원인] FP16 → big-lama.pt 강제 링크/복사 로직 제거
|
||||
# # 필요 시에만 FP16을 직접 지정해서 쓰도록 함.
|
||||
# except Exception:
|
||||
# pass
|
||||
|
||||
#from worker.ocr_module import OCRModule # ndarray 지원 버전
|
||||
from worker.mask_module_for_paddle import MaskModule
|
||||
# from worker.text_rendering_module import TextRenderingModule
|
||||
from worker.text_rendering_module2 import TextRenderingModule
|
||||
from worker.text_rendering_module import TextRenderingModule
|
||||
# from worker.text_rendering_module2 import TextRenderingModule
|
||||
from worker.rembg_module import RembgRemover
|
||||
from worker.loggerModule import Logger
|
||||
from simple_lama_inpainting import SimpleLama
|
||||
from worker.inpaint_module import Inpainter, InpaintBackends
|
||||
# from simple_lama_inpainting import SimpleLama
|
||||
# from worker.inpaint_module import Inpainter, InpaintBackends
|
||||
from worker.utils_debug import save_debug_artifacts, draw_ocr_overlay
|
||||
from worker.roi_inpainting_module import ROIInpaintingModule
|
||||
|
||||
|
|
@ -111,34 +111,34 @@ def track_phase(phase: str, trace_id: Optional[str] = None):
|
|||
_TEMP = Path(os.getenv("TEMP_STORAGE", "/app/temp_files"))
|
||||
_TEMP.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
_lama: SimpleLama | None = None
|
||||
# _lama: SimpleLama | None = None
|
||||
_ocr = None
|
||||
_mask: MaskModule | None = None
|
||||
_text: TextRenderingModule | None = None
|
||||
_inpainter: Inpainter | None = None
|
||||
# _inpainter: Inpainter | None = None
|
||||
_roi_inpainter: ROIInpaintingModule | None = None
|
||||
_translator = get_translator() # ✅ 워커 부팅 시 생성 & 재사용
|
||||
|
||||
def get_lama():
|
||||
global _lama
|
||||
if _lama is None:
|
||||
_lama = SimpleLama()
|
||||
# 라마 초기화 직후 VRAM 스냅샷
|
||||
_gpu_tracker.log_snapshot(tag="after SimpleLama init")
|
||||
# def get_lama():
|
||||
# global _lama
|
||||
# if _lama is None:
|
||||
# _lama = SimpleLama()
|
||||
# # 라마 초기화 직후 VRAM 스냅샷
|
||||
# _gpu_tracker.log_snapshot(tag="after SimpleLama init")
|
||||
|
||||
return _lama
|
||||
# return _lama
|
||||
|
||||
def get_inpainter() -> Inpainter:
|
||||
global _inpainter
|
||||
if _inpainter is None:
|
||||
_inpainter = Inpainter(
|
||||
logger=clogger,
|
||||
default_backend=InpaintBackends.LAMA, # 기본값은 자유롭게
|
||||
# lama_onnx_fd_path="/app/worker/models/inpainting_lama_2025jan.onnx",
|
||||
# lama_onnx_fd_device="gpu", # "cpu"도 가능
|
||||
# lama_onnx_fd_backend="trt" # "ort"=ONNX Runtime 기본 CPU/GPU 실행(CUDA 환경이면 GPU 사용 가능) "trt"=TensorRT 실행, "cuda"=ONNX Runtime CUDA Execution Provider, "cpu"=ONNX Runtime CPU Execution Provider
|
||||
)
|
||||
return _inpainter
|
||||
# def get_inpainter() -> Inpainter:
|
||||
# global _inpainter
|
||||
# if _inpainter is None:
|
||||
# _inpainter = Inpainter(
|
||||
# logger=clogger,
|
||||
# default_backend=InpaintBackends.LAMA, # 기본값은 자유롭게
|
||||
# # lama_onnx_fd_path="/app/worker/models/inpainting_lama_2025jan.onnx",
|
||||
# # lama_onnx_fd_device="gpu", # "cpu"도 가능
|
||||
# # lama_onnx_fd_backend="trt" # "ort"=ONNX Runtime 기본 CPU/GPU 실행(CUDA 환경이면 GPU 사용 가능) "trt"=TensorRT 실행, "cuda"=ONNX Runtime CUDA Execution Provider, "cpu"=ONNX Runtime CPU Execution Provider
|
||||
# )
|
||||
# return _inpainter
|
||||
|
||||
def get_ocr():
|
||||
from worker.ocr_module import OCRModule
|
||||
|
|
@ -180,19 +180,41 @@ except Exception:
|
|||
from celery.signals import worker_process_init
|
||||
@worker_process_init.connect
|
||||
def _warm_up_models(**_):
|
||||
"""워커 프로세스 초기화 시 모델들을 사전 로딩"""
|
||||
try:
|
||||
# OCR 등도 여기서 미리 띄울 수 있음
|
||||
# 🔥 PyTorch 성능 최적화 설정
|
||||
import torch
|
||||
if torch.cuda.is_available():
|
||||
# cuDNN 최적화
|
||||
torch.backends.cudnn.benchmark = True
|
||||
torch.backends.cudnn.deterministic = False
|
||||
|
||||
# TF32 활성화 (Ampere 이상 GPU에서 성능 향상)
|
||||
torch.backends.cuda.matmul.allow_tf32 = True
|
||||
torch.backends.cudnn.allow_tf32 = True
|
||||
|
||||
# 메모리 형식 최적화
|
||||
torch.set_float32_matmul_precision('high')
|
||||
|
||||
logger.info(
|
||||
f"🔧 PyTorch 최적화 완료: "
|
||||
f"cudnn.benchmark={torch.backends.cudnn.benchmark}, "
|
||||
f"allow_tf32={torch.backends.cuda.matmul.allow_tf32}"
|
||||
)
|
||||
|
||||
# 모델 사전 로딩
|
||||
get_ocr()
|
||||
# Inpainter 생성
|
||||
get_inpainter()
|
||||
get_mask()
|
||||
get_text()
|
||||
|
||||
# 🔥 ROI 인페인팅 모듈 사전 초기화
|
||||
roi_inpainter = get_roi_inpainter()
|
||||
roi_inpainter._get_simple_lama() # SimpleLama 사전 로딩
|
||||
# 필요하다면 특정 백엔드 강제 초기화:
|
||||
# get_inpainter()._get_lama_onnx_ort(backend_hint="cuda")
|
||||
print("[warmup] models preloaded (including ROI SimpleLama)")
|
||||
|
||||
logger.info("✅ 모델 사전 로딩 완료 (성능 최적화 포함)")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[warmup] skipped: {e}")
|
||||
logger.warning(f"⚠️ 모델 사전 로딩 건너뜀: {e}")
|
||||
_warm_up_models()
|
||||
|
||||
# ───────────────────────────────── 공통 헬퍼
|
||||
|
|
@ -229,80 +251,80 @@ def _parse_font_number_from_toggle(toggle_states: Dict[str, Any]) -> int | None:
|
|||
logger.warning(f"[font] font_type 파싱 실패: {e}")
|
||||
return None
|
||||
|
||||
def _parse_inpaint_backend(
|
||||
toggle_states: Dict[str, Any],
|
||||
*,
|
||||
default_method: str = "lama",
|
||||
default_backend: str = "ort",
|
||||
default_min_masks_for_lama: int = 4
|
||||
) -> Tuple[str, str, int]:
|
||||
"""
|
||||
toggle_states에서 inpaint_method, backend, min_masks_for_lama 를 파싱.
|
||||
# def _parse_inpaint_backend(
|
||||
# toggle_states: Dict[str, Any],
|
||||
# *,
|
||||
# default_method: str = "lama",
|
||||
# default_backend: str = "ort",
|
||||
# default_min_masks_for_lama: int = 4
|
||||
# ) -> Tuple[str, str, int]:
|
||||
# """
|
||||
# toggle_states에서 inpaint_method, backend, min_masks_for_lama 를 파싱.
|
||||
|
||||
허용 표기(대소문자/공백 무시):
|
||||
- method:
|
||||
"opencv", "cv"
|
||||
"lama", "lama_torch", "torch"
|
||||
"lama_onnx_ort", "onnx_ort" # OpenCV DNN 경로
|
||||
"lama_onnx_fd", "onnx_fd", "fd" # FastDeploy(ORT/TRT/CUDA/CPU)
|
||||
- backend (lama_onnx_fd / lama_onnx 전용):
|
||||
"ort", "trt", "cuda", "cpu"
|
||||
- 콜론 구분 지원: "lama_onnx_fd:trt", "lama_onnx_fd:ort"
|
||||
# 허용 표기(대소문자/공백 무시):
|
||||
# - method:
|
||||
# "opencv", "cv"
|
||||
# "lama", "lama_torch", "torch"
|
||||
# "lama_onnx_ort", "onnx_ort" # OpenCV DNN 경로
|
||||
# "lama_onnx_fd", "onnx_fd", "fd" # FastDeploy(ORT/TRT/CUDA/CPU)
|
||||
# - backend (lama_onnx_fd / lama_onnx 전용):
|
||||
# "ort", "trt", "cuda", "cpu"
|
||||
# - 콜론 구분 지원: "lama_onnx_fd:trt", "lama_onnx_fd:ort"
|
||||
|
||||
키 없음/실패 시 기본값:
|
||||
method = default_method ("lama_onnx_fd")
|
||||
backend = default_backend ("ort")
|
||||
min_masks_for_lama = default_min_masks_for_lama (4)
|
||||
# 키 없음/실패 시 기본값:
|
||||
# method = default_method ("lama_onnx_fd")
|
||||
# backend = default_backend ("ort")
|
||||
# min_masks_for_lama = default_min_masks_for_lama (4)
|
||||
|
||||
Returns:
|
||||
(method_enum, backend_str, min_masks_for_lama:int)
|
||||
"""
|
||||
# Returns:
|
||||
# (method_enum, backend_str, min_masks_for_lama:int)
|
||||
# """
|
||||
|
||||
# 1) 안전하게 읽기
|
||||
try:
|
||||
raw = str((toggle_states or {}).get("inpaint_method", "")).strip().lower()
|
||||
except Exception:
|
||||
raw = ""
|
||||
if not raw:
|
||||
raw = f"{default_method}:{default_backend}"
|
||||
# # 1) 안전하게 읽기
|
||||
# try:
|
||||
# raw = str((toggle_states or {}).get("inpaint_method", "")).strip().lower()
|
||||
# except Exception:
|
||||
# raw = ""
|
||||
# if not raw:
|
||||
# raw = f"{default_method}:{default_backend}"
|
||||
|
||||
# 2) method / backend 분리
|
||||
if ":" in raw:
|
||||
method_tok, backend_tok = [t.strip() for t in raw.split(":", 1)]
|
||||
else:
|
||||
method_tok, backend_tok = raw, default_backend
|
||||
# # 2) method / backend 분리
|
||||
# if ":" in raw:
|
||||
# method_tok, backend_tok = [t.strip() for t in raw.split(":", 1)]
|
||||
# else:
|
||||
# method_tok, backend_tok = raw, default_backend
|
||||
|
||||
# 3) method 매핑
|
||||
method_map = {
|
||||
"opencv": InpaintBackends.OPENCV,
|
||||
"cv": InpaintBackends.OPENCV,
|
||||
# # 3) method 매핑
|
||||
# method_map = {
|
||||
# "opencv": InpaintBackends.OPENCV,
|
||||
# "cv": InpaintBackends.OPENCV,
|
||||
|
||||
"lama": InpaintBackends.LAMA,
|
||||
"lama_torch": InpaintBackends.LAMA,
|
||||
"torch": InpaintBackends.LAMA,
|
||||
# "lama": InpaintBackends.LAMA,
|
||||
# "lama_torch": InpaintBackends.LAMA,
|
||||
# "torch": InpaintBackends.LAMA,
|
||||
|
||||
# ⬇️ 새 별칭들
|
||||
"lama_torch_amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
"torch_amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
"amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
# # ⬇️ 새 별칭들
|
||||
# "lama_torch_amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
# "torch_amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
# "amp": InpaintBackends.LAMA_TORCH_AMP,
|
||||
|
||||
}
|
||||
method_enum = method_map.get(
|
||||
method_tok,
|
||||
method_map.get(default_method, InpaintBackends.LAMA)
|
||||
)
|
||||
# }
|
||||
# method_enum = method_map.get(
|
||||
# method_tok,
|
||||
# method_map.get(default_method, InpaintBackends.LAMA)
|
||||
# )
|
||||
|
||||
# 4) backend 정규화
|
||||
backend_tok = (backend_tok or default_backend).lower()
|
||||
backend_enum = backend_tok if backend_tok in {"ort", "trt", "cuda", "cpu"} else default_backend
|
||||
# # 4) backend 정규화
|
||||
# backend_tok = (backend_tok or default_backend).lower()
|
||||
# backend_enum = backend_tok if backend_tok in {"ort", "trt", "cuda", "cpu"} else default_backend
|
||||
|
||||
# 5) min_masks_for_lama 파싱
|
||||
try:
|
||||
mmfl = int((toggle_states or {}).get("min_masks_for_lama", default_min_masks_for_lama))
|
||||
except (TypeError, ValueError):
|
||||
mmfl = default_min_masks_for_lama
|
||||
# # 5) min_masks_for_lama 파싱
|
||||
# try:
|
||||
# mmfl = int((toggle_states or {}).get("min_masks_for_lama", default_min_masks_for_lama))
|
||||
# except (TypeError, ValueError):
|
||||
# mmfl = default_min_masks_for_lama
|
||||
|
||||
return method_enum, backend_enum, mmfl
|
||||
# return method_enum, backend_enum, mmfl
|
||||
|
||||
# def run_inpaint(
|
||||
# src_bgr,
|
||||
|
|
@ -350,118 +372,118 @@ def _parse_inpaint_backend(
|
|||
# )
|
||||
|
||||
|
||||
def run_inpaint(
|
||||
src_bgr,
|
||||
polygons,
|
||||
toggle_states: Dict[str, Any],
|
||||
*,
|
||||
max_side: int = 1024,
|
||||
auto_opencv_if_few: bool = True
|
||||
):
|
||||
"""
|
||||
기존 호출부 유지. toggle_states 로 A/B 모드 제어:
|
||||
- ab_mode: "A" | "B" | "A+B" (기본 "A")
|
||||
- A = components ROI (확대/근접 병합/소프트블렌딩)
|
||||
- B = full-frame (ROI 미사용, 비교용)
|
||||
"""
|
||||
# 기존 파라미터 파싱 유지
|
||||
method_enum, backend_enum, min_masks_for_lama = _parse_inpaint_backend(toggle_states)
|
||||
inpainter = get_inpainter()
|
||||
# def run_inpaint(
|
||||
# src_bgr,
|
||||
# polygons,
|
||||
# toggle_states: Dict[str, Any],
|
||||
# *,
|
||||
# max_side: int = 1024,
|
||||
# auto_opencv_if_few: bool = True
|
||||
# ):
|
||||
# """
|
||||
# 기존 호출부 유지. toggle_states 로 A/B 모드 제어:
|
||||
# - ab_mode: "A" | "B" | "A+B" (기본 "A")
|
||||
# - A = components ROI (확대/근접 병합/소프트블렌딩)
|
||||
# - B = full-frame (ROI 미사용, 비교용)
|
||||
# """
|
||||
# # 기존 파라미터 파싱 유지
|
||||
# method_enum, backend_enum, min_masks_for_lama = _parse_inpaint_backend(toggle_states)
|
||||
# inpainter = get_inpainter()
|
||||
|
||||
# ── 공통 토글
|
||||
ab_mode = str((toggle_states or {}).get("ab_mode", "A")).upper() # "A" | "B" | "A+B"
|
||||
trace_id = (toggle_states or {}).get("_trace_id", None)
|
||||
debug_root = os.getenv("DEBUG_DUMP_DIR", "/app/temp_files/debug")
|
||||
ab_dir = os.path.join(debug_root, "AB")
|
||||
try:
|
||||
os.makedirs(ab_dir, exist_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
# # ── 공통 토글
|
||||
# ab_mode = str((toggle_states or {}).get("ab_mode", "A")).upper() # "A" | "B" | "A+B"
|
||||
# trace_id = (toggle_states or {}).get("_trace_id", None)
|
||||
# debug_root = os.getenv("DEBUG_DUMP_DIR", "/app/temp_files/debug")
|
||||
# ab_dir = os.path.join(debug_root, "AB")
|
||||
# try:
|
||||
# os.makedirs(ab_dir, exist_ok=True)
|
||||
# except Exception:
|
||||
# pass
|
||||
|
||||
# ── A(components ROI)용 kwargs: (값 없으면 기본 추천값 사용)
|
||||
A_kwargs = dict(
|
||||
backend=method_enum, # "lama" 권장
|
||||
roi_strategy=str((toggle_states or {}).get("roi_strategy_A", "components")).lower(), # "components"
|
||||
max_side=int((toggle_states or {}).get("max_side_A", 1600)),
|
||||
auto_opencv_if_few=bool((toggle_states or {}).get("auto_opencv_if_few", False)),
|
||||
few_threshold=int((toggle_states or {}).get("few_threshold", 0)),
|
||||
comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)),
|
||||
pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)),
|
||||
merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)),
|
||||
merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)),
|
||||
soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px", 10)),
|
||||
soft_blur_px=int((toggle_states or {}).get("soft_blur_px", 17)),
|
||||
debug_save_rois=bool((toggle_states or {}).get("debug_save_rois", False)),
|
||||
debug_dir=os.path.join(debug_root, "ROIs"),
|
||||
request_id=trace_id
|
||||
)
|
||||
# # ── A(components ROI)용 kwargs: (값 없으면 기본 추천값 사용)
|
||||
# A_kwargs = dict(
|
||||
# backend=method_enum, # "lama" 권장
|
||||
# roi_strategy=str((toggle_states or {}).get("roi_strategy_A", "components")).lower(), # "components"
|
||||
# max_side=int((toggle_states or {}).get("max_side_A", 1600)),
|
||||
# auto_opencv_if_few=bool((toggle_states or {}).get("auto_opencv_if_few", False)),
|
||||
# few_threshold=int((toggle_states or {}).get("few_threshold", 0)),
|
||||
# comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)),
|
||||
# pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)),
|
||||
# merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)),
|
||||
# merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)),
|
||||
# soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px", 10)),
|
||||
# soft_blur_px=int((toggle_states or {}).get("soft_blur_px", 17)),
|
||||
# debug_save_rois=bool((toggle_states or {}).get("debug_save_rois", False)),
|
||||
# debug_dir=os.path.join(debug_root, "ROIs"),
|
||||
# request_id=trace_id
|
||||
# )
|
||||
|
||||
# ── B(full-frame)용 kwargs
|
||||
B_kwargs = dict(
|
||||
backend=method_enum,
|
||||
roi_strategy=str((toggle_states or {}).get("roi_strategy_B", "full")).lower(), # "full"
|
||||
max_side=int((toggle_states or {}).get("max_side_B", 1600)),
|
||||
auto_opencv_if_few=False,
|
||||
few_threshold=0,
|
||||
# full 도 얇게 블렌딩
|
||||
soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px_full", (toggle_states or {}).get("soft_dilate_px", 10))),
|
||||
soft_blur_px=int((toggle_states or {}).get("soft_blur_px_full", (toggle_states or {}).get("soft_blur_px", 17))),
|
||||
# 아래는 시그니처 호환용
|
||||
comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)),
|
||||
pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)),
|
||||
merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)),
|
||||
merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)),
|
||||
debug_save_rois=False,
|
||||
debug_dir=None,
|
||||
request_id=trace_id
|
||||
)
|
||||
# # ── B(full-frame)용 kwargs
|
||||
# B_kwargs = dict(
|
||||
# backend=method_enum,
|
||||
# roi_strategy=str((toggle_states or {}).get("roi_strategy_B", "full")).lower(), # "full"
|
||||
# max_side=int((toggle_states or {}).get("max_side_B", 1600)),
|
||||
# auto_opencv_if_few=False,
|
||||
# few_threshold=0,
|
||||
# # full 도 얇게 블렌딩
|
||||
# soft_dilate_px=int((toggle_states or {}).get("soft_dilate_px_full", (toggle_states or {}).get("soft_dilate_px", 10))),
|
||||
# soft_blur_px=int((toggle_states or {}).get("soft_blur_px_full", (toggle_states or {}).get("soft_blur_px", 17))),
|
||||
# # 아래는 시그니처 호환용
|
||||
# comp_min_area=int((toggle_states or {}).get("comp_min_area", 30)),
|
||||
# pad_ratio=float((toggle_states or {}).get("pad_ratio", 0.12)),
|
||||
# merge_thresh_factor=float((toggle_states or {}).get("merge_thresh_factor", 0.7)),
|
||||
# merge_abs_min_px=int((toggle_states or {}).get("merge_abs_min_px", 8)),
|
||||
# debug_save_rois=False,
|
||||
# debug_dir=None,
|
||||
# request_id=trace_id
|
||||
# )
|
||||
|
||||
# ── 실행 래퍼 (결과 파일도 저장)
|
||||
def _run_and_save(label: str, kwargs: Dict[str, Any]) -> np.ndarray:
|
||||
out = inpainter.inpaint(src_bgr, polygons, **kwargs)
|
||||
try:
|
||||
fname = f"{(trace_id or 'ab')}_{label}.png"
|
||||
cv2.imwrite(os.path.join(ab_dir, fname), out)
|
||||
except Exception:
|
||||
pass
|
||||
return out
|
||||
# # ── 실행 래퍼 (결과 파일도 저장)
|
||||
# def _run_and_save(label: str, kwargs: Dict[str, Any]) -> np.ndarray:
|
||||
# out = inpainter.inpaint(src_bgr, polygons, **kwargs)
|
||||
# try:
|
||||
# fname = f"{(trace_id or 'ab')}_{label}.png"
|
||||
# cv2.imwrite(os.path.join(ab_dir, fname), out)
|
||||
# except Exception:
|
||||
# pass
|
||||
# return out
|
||||
|
||||
# ── 모드 분기
|
||||
if ab_mode == "A":
|
||||
return _run_and_save("A_components", A_kwargs)
|
||||
# # ── 모드 분기
|
||||
# if ab_mode == "A":
|
||||
# return _run_and_save("A_components", A_kwargs)
|
||||
|
||||
if ab_mode == "B":
|
||||
return _run_and_save("B_full", B_kwargs)
|
||||
# if ab_mode == "B":
|
||||
# return _run_and_save("B_full", B_kwargs)
|
||||
|
||||
# ── "A+B": 좌우 합성 프리뷰 반환 (단일 결과는 파일로 저장됨)
|
||||
outA = _run_and_save("A_components", A_kwargs)
|
||||
outB = _run_and_save("B_full", B_kwargs)
|
||||
# # ── "A+B": 좌우 합성 프리뷰 반환 (단일 결과는 파일로 저장됨)
|
||||
# outA = _run_and_save("A_components", A_kwargs)
|
||||
# outB = _run_and_save("B_full", B_kwargs)
|
||||
|
||||
# 높이 맞춰 좌우 스택
|
||||
h = min(outA.shape[0], outB.shape[0])
|
||||
# # 높이 맞춰 좌우 스택
|
||||
# h = min(outA.shape[0], outB.shape[0])
|
||||
|
||||
def _resize_to_h(img, h):
|
||||
if img.shape[0] == h:
|
||||
return img
|
||||
ratio = h / img.shape[0]
|
||||
new_w = int(round(img.shape[1] * ratio))
|
||||
return cv2.resize(img, (new_w, h), interpolation=cv2.INTER_CUBIC)
|
||||
# def _resize_to_h(img, h):
|
||||
# if img.shape[0] == h:
|
||||
# return img
|
||||
# ratio = h / img.shape[0]
|
||||
# new_w = int(round(img.shape[1] * ratio))
|
||||
# return cv2.resize(img, (new_w, h), interpolation=cv2.INTER_CUBIC)
|
||||
|
||||
a2 = _resize_to_h(outA, h)
|
||||
b2 = _resize_to_h(outB, h)
|
||||
combo = np.hstack([a2, b2])
|
||||
# a2 = _resize_to_h(outA, h)
|
||||
# b2 = _resize_to_h(outB, h)
|
||||
# combo = np.hstack([a2, b2])
|
||||
|
||||
# 레이블(있으면 편함)
|
||||
try:
|
||||
cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA)
|
||||
cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA)
|
||||
cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA)
|
||||
cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA)
|
||||
cv2.imwrite(os.path.join(ab_dir, f"{(trace_id or 'ab')}_AplusB.png"), combo)
|
||||
except Exception:
|
||||
pass
|
||||
# # 레이블(있으면 편함)
|
||||
# try:
|
||||
# cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA)
|
||||
# cv2.putText(combo, "A: components ROI", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA)
|
||||
# cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,0), 4, cv2.LINE_AA)
|
||||
# cv2.putText(combo, "B: full-frame", (a2.shape[1] + 10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2, cv2.LINE_AA)
|
||||
# cv2.imwrite(os.path.join(ab_dir, f"{(trace_id or 'ab')}_AplusB.png"), combo)
|
||||
# except Exception:
|
||||
# pass
|
||||
|
||||
return combo
|
||||
# return combo
|
||||
|
||||
# ───────────────────────────────── translate_task
|
||||
@celery_app.task(name="worker.translate_task",
|
||||
|
|
@ -512,7 +534,34 @@ def translate_task(self, *, image_b64: str, filename: str,
|
|||
# # 2. 번역
|
||||
|
||||
with track_phase("MASK", trace_id):
|
||||
mask = get_mask().create_masks_np(src_bgr, chn) # <─ ndarray 지원
|
||||
# 🔥 A/B 테스트: 기존 방식 vs ROI 최적화 방식
|
||||
use_roi_optimized_mask = toggle_states.get('use_roi_optimized_mask', False) # True → False로 변경
|
||||
|
||||
if use_roi_optimized_mask:
|
||||
# 🔥 ROI 최적화: 적응형 마스크 생성
|
||||
mask = get_mask().create_masks_np(
|
||||
src_bgr, chn,
|
||||
for_roi_processing=True,
|
||||
# 🔥 텍스트 개수에 따른 적응형 expansion
|
||||
expansion_size=min(8, max(4, 10 - len(chn))), # 텍스트 많으면 작게, 적으면 크게
|
||||
blur_size=0 # ROI 모드에서는 블러 없음
|
||||
)
|
||||
mask_type = "ROI최적화"
|
||||
else:
|
||||
# 기존 방식: 전체 후처리 적용
|
||||
mask = get_mask().create_masks_np(src_bgr, chn)
|
||||
mask_type = "기존방식"
|
||||
|
||||
# 🔥 마스크 통계 로깅
|
||||
mask_pixels = np.sum(mask > 0)
|
||||
total_pixels = mask.shape[0] * mask.shape[1]
|
||||
mask_coverage = mask_pixels / total_pixels * 100
|
||||
|
||||
clogger.log(
|
||||
f"🔧 {mask_type} 마스크 사용: 커버리지 {mask_coverage:.2f}% ({mask_pixels:,}/{total_pixels:,} 픽셀)",
|
||||
level=logging.INFO
|
||||
)
|
||||
|
||||
if mask is None:
|
||||
return fail("MASK_ERR", "mask failed")
|
||||
|
||||
|
|
@ -530,6 +579,17 @@ def translate_task(self, *, image_b64: str, filename: str,
|
|||
'merge_distance': toggle_states.get('merge_distance', 50),
|
||||
'margin_ratio': toggle_states.get('margin_ratio', 0.15),
|
||||
'large_mask_threshold': toggle_states.get('large_mask_threshold', 0.5),
|
||||
# 🔥 마스크 정제 비활성화 (마스크 모듈에서 이미 최적화됨)
|
||||
'enable_mask_refinement': toggle_states.get('enable_mask_refinement', False),
|
||||
'mask_erosion_kernel': 0, # 비활성화
|
||||
'mask_dilation_kernel': 0, # 비활성화
|
||||
'mask_blur_kernel': 0, # 비활성화
|
||||
'context_expansion_ratio': toggle_states.get('context_expansion_ratio', 0.1), # 줄임
|
||||
'blend_mode': toggle_states.get('blend_mode', 'simple'), # 단순 블렌딩
|
||||
'feather_blend_size': toggle_states.get('feather_blend_size', 5), # 줄임
|
||||
# 🔥 형상 최적화 설정
|
||||
'enable_shape_optimization': toggle_states.get('enable_shape_optimization', True),
|
||||
'performance_tracking': toggle_states.get('performance_tracking', True),
|
||||
}
|
||||
|
||||
# 처리 전 통계 로깅
|
||||
|
|
@ -557,15 +617,15 @@ def translate_task(self, *, image_b64: str, filename: str,
|
|||
logger.info(f"[TRACE][{trace_id}][font] 폰트 지정 없음 -> 기본 폰트(3번) 사용")
|
||||
|
||||
with track_phase("RENDER", trace_id):
|
||||
# out = get_text().render_text(dst_bgr, chn, ko, font_number=font_number)
|
||||
out = get_text().render_with_market_preset(
|
||||
image_bgr=dst_bgr,
|
||||
ocr_results=chn, # [{'polygon': [[x,y]...], 'text':...}, ...]
|
||||
translated_texts=ko,
|
||||
market=toggle_states.get("market", "coupang"), # 'coupang'|'naver'
|
||||
preset=toggle_states.get("preset", "basic"), # 'basic'|'badge'|'price'
|
||||
font_number=font_number
|
||||
)
|
||||
out = get_text().render_text(dst_bgr, chn, ko, font_number=font_number)
|
||||
# out = get_text().render_with_market_preset(
|
||||
# image_bgr=dst_bgr,
|
||||
# ocr_results=chn, # [{'polygon': [[x,y]...], 'text':...}, ...]
|
||||
# translated_texts=ko,
|
||||
# market=toggle_states.get("market", "coupang"), # 'coupang'|'naver'
|
||||
# preset=toggle_states.get("preset", "basic"), # 'basic'|'badge'|'price'
|
||||
# font_number=font_number
|
||||
# )
|
||||
|
||||
|
||||
# 최종
|
||||
|
|
|
|||
|
|
@ -1,243 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import annotations
|
||||
import os, cv2, numpy as np
|
||||
from typing import List, Tuple, Optional
|
||||
from PIL import Image
|
||||
|
||||
# ── (옵션) LaMa
|
||||
try:
|
||||
from simple_lama_inpainting.models.model import SimpleLama
|
||||
_HAVE_LAMA = True
|
||||
except Exception:
|
||||
_HAVE_LAMA = False
|
||||
|
||||
class InpaintBackends:
|
||||
OPENCV = "opencv"
|
||||
LAMA = "lama"
|
||||
LAMA_TORCH_AMP = "lama_torch_amp" # placeholder
|
||||
|
||||
# ── 공통 유틸
|
||||
def polygons_to_mask(shape: Tuple[int,int], polygons: List[List[List[int]]]) -> np.ndarray:
|
||||
h, w = shape
|
||||
mask = np.zeros((h, w), dtype=np.uint8)
|
||||
for poly in polygons:
|
||||
pts = np.array(poly, dtype=np.int32).reshape(-1, 2)
|
||||
cv2.fillPoly(mask, [pts], 255)
|
||||
return mask
|
||||
|
||||
def resize_long_side(img: np.ndarray, max_side: int) -> Tuple[np.ndarray, float]:
|
||||
h, w = img.shape[:2]
|
||||
if max(h, w) <= max_side:
|
||||
return img, 1.0
|
||||
if h >= w:
|
||||
s = max_side / float(h)
|
||||
nh, nw = max_side, int(round(w * s))
|
||||
else:
|
||||
s = max_side / float(w)
|
||||
nw, nh = max_side, int(round(h * s))
|
||||
out = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_AREA)
|
||||
return out, s
|
||||
|
||||
def _soften_mask(mask: np.ndarray, *, dilate_px: int, blur_px: int) -> np.ndarray:
|
||||
m = mask.copy()
|
||||
if dilate_px > 0:
|
||||
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*dilate_px+1, 2*dilate_px+1))
|
||||
m = cv2.dilate(m, k, iterations=1)
|
||||
m = cv2.GaussianBlur(m, (blur_px | 1, blur_px | 1), 0)
|
||||
return m
|
||||
|
||||
# ── 컴포넌트/ROI 유틸
|
||||
def _connected_components(mask: np.ndarray, *, min_area: int = 30) -> List[Tuple[int,int,int,int]]:
|
||||
num, _, stats, _ = cv2.connectedComponentsWithStats((mask > 0).astype(np.uint8), connectivity=8)
|
||||
boxes = []
|
||||
for cid in range(1, num):
|
||||
x, y, w, h, area = stats[cid]
|
||||
if w > 1 and h > 1 and area >= min_area:
|
||||
boxes.append((int(x), int(y), int(w), int(h)))
|
||||
return boxes
|
||||
|
||||
def _expand_box(b: Tuple[int,int,int,int], pad_ratio: float, W: int, H: int) -> Tuple[int,int,int,int]:
|
||||
x, y, w, h = b
|
||||
pad = int(round(max(w, h) * pad_ratio))
|
||||
x0 = max(0, x - pad); y0 = max(0, y - pad)
|
||||
x1 = min(W, x + w + pad); y1 = min(H, y + h + pad)
|
||||
return x0, y0, x1 - x0, y1 - y0
|
||||
|
||||
def _min_gap(a: Tuple[int,int,int,int], b: Tuple[int,int,int,int]) -> int:
|
||||
ax, ay, aw, ah = a; bx, by, bw, bh = b
|
||||
ar, ab = ax + aw, ay + ah
|
||||
br, bb = bx + bw, by + bh
|
||||
dx = max(0, max(ax - br, bx - ar))
|
||||
dy = max(0, max(ay - bb, by - ab))
|
||||
return max(dx, dy) # L∞ gap
|
||||
|
||||
def _merge_close_boxes(boxes: List[Tuple[int,int,int,int]], *, thresh_px: int) -> List[Tuple[int,int,int,int]]:
|
||||
if not boxes: return []
|
||||
n = len(boxes)
|
||||
parent = list(range(n))
|
||||
def find(i):
|
||||
while parent[i] != i:
|
||||
parent[i] = parent[parent[i]]
|
||||
i = parent[i]
|
||||
return i
|
||||
def union(i, j):
|
||||
ri, rj = find(i), find(j)
|
||||
if ri != rj: parent[rj] = ri
|
||||
for i in range(n):
|
||||
for j in range(i+1, n):
|
||||
if _min_gap(boxes[i], boxes[j]) <= thresh_px:
|
||||
union(i, j)
|
||||
groups = {}
|
||||
for i, b in enumerate(boxes):
|
||||
r = find(i)
|
||||
groups.setdefault(r, []).append(b)
|
||||
merged = []
|
||||
for grp in groups.values():
|
||||
xs = [x for x,_,_,_ in grp]; ys = [y for _,y,_,_ in grp]
|
||||
rs = [x+w for x,_,w,_ in grp]; bs = [y+h for _,y,_,h in grp]
|
||||
x0, y0, x1, y1 = min(xs), min(ys), max(rs), max(bs)
|
||||
merged.append((x0, y0, x1 - x0, y1 - y0))
|
||||
return merged
|
||||
|
||||
class Inpainter:
|
||||
def __init__(self, logger=None,
|
||||
default_backend: str = InpaintBackends.LAMA,
|
||||
lama_device: str = "cuda"):
|
||||
self.logger = logger
|
||||
self.default_backend = default_backend
|
||||
self.lama_device = lama_device
|
||||
self._lama: Optional[SimpleLama] = None
|
||||
|
||||
def _log(self, msg):
|
||||
if self.logger and hasattr(self.logger, "log"): self.logger.log(msg)
|
||||
else: print(msg)
|
||||
|
||||
def _get_lama(self):
|
||||
if not _HAVE_LAMA:
|
||||
raise RuntimeError("SimpleLama not installed")
|
||||
if self._lama is None:
|
||||
self._log("Init SimpleLama...")
|
||||
self._lama = SimpleLama(device=self.lama_device)
|
||||
return self._lama
|
||||
|
||||
# 평탄 배경에 강한 OpenCV (필요 시 사용)
|
||||
def _opencv_text_inpaint(self, img_bgr: np.ndarray, hard_mask: np.ndarray,
|
||||
r1: int = 3, r2: int = 7) -> np.ndarray:
|
||||
out1 = cv2.inpaint(img_bgr, hard_mask, r1, cv2.INPAINT_TELEA)
|
||||
remain = (hard_mask > 0) & (np.abs(out1.astype(np.int16) - img_bgr.astype(np.int16)).max(axis=2) > 3)
|
||||
out2 = cv2.inpaint(out1, (remain.astype(np.uint8) * 255), r2, cv2.INPAINT_TELEA) if remain.any() else out1
|
||||
return out2
|
||||
|
||||
def _run_backend(self, roi_img: np.ndarray, roi_mask: np.ndarray, backend: str) -> np.ndarray:
|
||||
if backend == InpaintBackends.OPENCV:
|
||||
return self._opencv_text_inpaint(roi_img, roi_mask)
|
||||
elif backend in (InpaintBackends.LAMA, InpaintBackends.LAMA_TORCH_AMP):
|
||||
lama = self._get_lama()
|
||||
dst_pil = lama(Image.fromarray(cv2.cvtColor(roi_img, cv2.COLOR_BGR2RGB)),
|
||||
Image.fromarray(roi_mask, "L"))
|
||||
return cv2.cvtColor(np.array(dst_pil), cv2.COLOR_RGB2BGR)
|
||||
else:
|
||||
raise NotImplementedError(f"Backend {backend} not wired.")
|
||||
|
||||
def inpaint(self, img_bgr: np.ndarray, polygons: List[List[List[int]]],
|
||||
*,
|
||||
backend: Optional[str] = None,
|
||||
# 공통
|
||||
roi_strategy: str = "components", # "components" | "full"
|
||||
max_side: int = 1600,
|
||||
auto_opencv_if_few: bool = False,
|
||||
few_threshold: int = 0,
|
||||
# components 전용
|
||||
comp_min_area: int = 30,
|
||||
pad_ratio: float = 0.12,
|
||||
merge_thresh_factor: float = 0.7,
|
||||
merge_abs_min_px: int = 8,
|
||||
soft_dilate_px: int = 10,
|
||||
soft_blur_px: int = 17,
|
||||
# 디버그 저장
|
||||
debug_save_rois: bool = False,
|
||||
debug_dir: Optional[str] = None,
|
||||
request_id: Optional[str] = None) -> np.ndarray:
|
||||
|
||||
backend = (backend or self.default_backend).lower()
|
||||
H, W = img_bgr.shape[:2]
|
||||
base_mask = polygons_to_mask((H, W), polygons)
|
||||
|
||||
# ── 풀프레임 모드
|
||||
if roi_strategy == "full":
|
||||
img_small, s = resize_long_side(img_bgr, max_side)
|
||||
mask_small = cv2.resize(base_mask, (img_small.shape[1], img_small.shape[0]),
|
||||
interpolation=cv2.INTER_NEAREST) if s != 1.0 else base_mask
|
||||
dst_small = self._run_backend(img_small, mask_small, InpaintBackends.LAMA)
|
||||
|
||||
# 소프트 블렌딩(테두리 얇게)
|
||||
soft_small = _soften_mask(mask_small, dilate_px=soft_dilate_px, blur_px=soft_blur_px)
|
||||
alpha = (soft_small.astype(np.float32) / 255.0)[..., None]
|
||||
blended_small = (alpha * dst_small.astype(np.float32) + (1 - alpha) * img_small.astype(np.float32)).astype(np.uint8)
|
||||
|
||||
out = cv2.resize(blended_small, (W, H), interpolation=cv2.INTER_CUBIC) if s != 1.0 else blended_small
|
||||
return out
|
||||
|
||||
# ── 컴포넌트 기반 ROI 모드
|
||||
boxes = _connected_components(base_mask, min_area=comp_min_area)
|
||||
if not boxes:
|
||||
return img_bgr.copy()
|
||||
|
||||
heights = [h for _,_,_,h in boxes]
|
||||
med_h = float(np.median(heights)) if heights else 0.0
|
||||
merge_px = max(merge_abs_min_px, int(round(med_h * merge_thresh_factor)))
|
||||
merged = _merge_close_boxes(boxes, thresh_px=merge_px)
|
||||
rois = [_expand_box(b, pad_ratio, W, H) for b in merged]
|
||||
rois.sort(key=lambda r: (r[1]//32, r[0]))
|
||||
|
||||
# 디버그 저장 준비
|
||||
save_idx = 0
|
||||
if debug_save_rois and debug_dir:
|
||||
os.makedirs(debug_dir, exist_ok=True)
|
||||
|
||||
out = img_bgr.copy()
|
||||
|
||||
for (x, y, w, h) in rois:
|
||||
if w <= 1 or h <= 1:
|
||||
continue
|
||||
|
||||
roi_img = out[y:y+h, x:x+w]
|
||||
roi_mask = base_mask[y:y+h, x:x+w]
|
||||
roi_soft = _soften_mask(roi_mask, dilate_px=soft_dilate_px, blur_px=soft_blur_px)
|
||||
|
||||
roi_img_small, s = resize_long_side(roi_img, max_side)
|
||||
if s != 1.0:
|
||||
roi_mask_small = cv2.resize(roi_mask, (roi_img_small.shape[1], roi_img_small.shape[0]),
|
||||
interpolation=cv2.INTER_NEAREST)
|
||||
roi_soft_small = cv2.resize(roi_soft, (roi_img_small.shape[1], roi_img_small.shape[0]),
|
||||
interpolation=cv2.INTER_LINEAR)
|
||||
else:
|
||||
roi_mask_small = roi_mask
|
||||
roi_soft_small = roi_soft
|
||||
|
||||
use_backend = (InpaintBackends.OPENCV if (auto_opencv_if_few and len(merged) <= few_threshold)
|
||||
else InpaintBackends.LAMA if backend not in (InpaintBackends.OPENCV,) else backend)
|
||||
|
||||
dst_small = self._run_backend(roi_img_small, roi_mask_small, use_backend)
|
||||
if dst_small.shape[:2] != roi_img_small.shape[:2]:
|
||||
dst_small = cv2.resize(dst_small, (roi_img_small.shape[1], roi_img_small.shape[0]), interpolation=cv2.INTER_CUBIC)
|
||||
|
||||
# 소프트 블렌딩
|
||||
alpha = (roi_soft_small.astype(np.float32) / 255.0)[..., None]
|
||||
blended_small = (alpha * dst_small.astype(np.float32) +
|
||||
(1 - alpha) * roi_img_small.astype(np.float32)).astype(np.uint8)
|
||||
|
||||
# 원 크기로 복원
|
||||
dst_roi = cv2.resize(blended_small, (w, h), interpolation=cv2.INTER_CUBIC) if s != 1.0 else blended_small
|
||||
out[y:y+h, x:x+w] = dst_roi
|
||||
|
||||
# ── 중간 저장 (원본/마스크/결과)
|
||||
if debug_save_rois and debug_dir:
|
||||
base = f"{request_id or 'req'}_roi{save_idx:02d}"
|
||||
cv2.imwrite(os.path.join(debug_dir, base + "_img.png"), roi_img)
|
||||
cv2.imwrite(os.path.join(debug_dir, base + "_mask.png"), roi_mask)
|
||||
cv2.imwrite(os.path.join(debug_dir, base + "_soft.png"), roi_soft)
|
||||
cv2.imwrite(os.path.join(debug_dir, base + "_dst.png"), dst_roi)
|
||||
save_idx += 1
|
||||
|
||||
return out
|
||||
|
|
@ -1,917 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
통합 인페인팅 모듈
|
||||
- OpenCV 텍스트 최적화 인페인트
|
||||
- SimpleLama (PyTorch)
|
||||
- LaMa ONNX (Hugging Face: opencv/inpainting_lama_2025jan.onnx)
|
||||
- MiGAN / EdgeConnect 어댑터 자리 마련
|
||||
|
||||
사용 예:
|
||||
from worker.inpaint_module import Inpainter, InpaintBackends
|
||||
inp = Inpainter(default_backend=InpaintBackends.LAMA_TORCH,
|
||||
lama_device="cuda",
|
||||
lama_onnx_path="/app/worker/models/inpainting_lama_2025jan.onnx")
|
||||
out = inp.inpaint(img_bgr, [poly1, poly2, ...], backend=None, max_side=1024,
|
||||
auto_opencv_if_few=True, few_threshold=4)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
from PIL import Image
|
||||
import threading
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# 백엔드 식별자
|
||||
# ───────────────────────────────────────────────
|
||||
class InpaintBackends:
|
||||
OPENCV = "opencv"
|
||||
LAMA_TORCH = "lama_torch"
|
||||
LAMA_ONNX_FD = "lama_onnx_fd" # FastDeploy 기반
|
||||
LAMA_ONNX_ORT = "lama_onnx_ort" # 순수 onnxruntime 기반
|
||||
MIGAN = "migan" # placeholder
|
||||
EDGECONNECT = "edgeconnect" # placeholder
|
||||
LAMA_TORCH_AMP = "lama_torch_amp" # 패치한 파일 위치 그대로
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# 유틸
|
||||
# ───────────────────────────────────────────────
|
||||
def _log(logger, msg, level=20):
|
||||
"""logger가 있으면 logger.log로, 없으면 print"""
|
||||
if logger and hasattr(logger, "log"):
|
||||
logger.log(msg, level=level)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
def polygons_to_mask(shape: Tuple[int,int], polygons: List[List[List[int]]]) -> np.ndarray:
|
||||
"""폴리곤 리스트 -> 단일 바이너리 마스크(0/255)"""
|
||||
h, w = shape
|
||||
mask = np.zeros((h, w), dtype=np.uint8)
|
||||
for poly in polygons:
|
||||
pts = np.array(poly, dtype=np.int32).reshape(-1, 2)
|
||||
cv2.fillPoly(mask, [pts], 255)
|
||||
return mask
|
||||
|
||||
def union_bbox_of_mask(mask: np.ndarray, pad_ratio: float = 0.1) -> Tuple[int,int,int,int]:
|
||||
"""마스크의 합집합 영역 bbox + 패딩"""
|
||||
ys, xs = np.where(mask > 0)
|
||||
if len(xs) == 0:
|
||||
return 0,0,mask.shape[1],mask.shape[0]
|
||||
x, y = int(xs.min()), int(ys.min())
|
||||
w, h = int(xs.max()-xs.min()+1), int(ys.max()-ys.min()+1)
|
||||
pad = int(max(w,h) * pad_ratio)
|
||||
x0 = max(0, x - pad); y0 = max(0, y - pad)
|
||||
x1 = min(mask.shape[1], x + w + pad)
|
||||
y1 = min(mask.shape[0], y + h + pad)
|
||||
return x0, y0, x1 - x0, y1 - y0
|
||||
|
||||
def resize_long_side(img: np.ndarray, max_side: int) -> Tuple[np.ndarray, float]:
|
||||
"""가장 긴 변을 max_side로 맞춰 축소(확대 안함) + scale 반환"""
|
||||
h, w = img.shape[:2]
|
||||
if max(h, w) <= max_side:
|
||||
return img, 1.0
|
||||
if h >= w:
|
||||
scale = max_side / float(h)
|
||||
nh, nw = max_side, int(w * scale)
|
||||
else:
|
||||
scale = max_side / float(w)
|
||||
nw, nh = max_side, int(h * scale)
|
||||
out = cv2.resize(img, (nw, nh), interpolation=cv2.INTER_AREA)
|
||||
return out, scale
|
||||
|
||||
def _next_pow2(n: int) -> int:
|
||||
return 1 if n <= 1 else 1 << (n - 1).bit_length()
|
||||
|
||||
def _reflect_pad_to(img: np.ndarray, target_h: int, target_w: int) -> Tuple[np.ndarray, Tuple[int,int,int,int]]:
|
||||
import cv2, numpy as np
|
||||
h, w = img.shape[:2]
|
||||
top = max(0, (target_h - h) // 2)
|
||||
bottom = max(0, target_h - h - top)
|
||||
left = max(0, (target_w - w) // 2)
|
||||
right = max(0, target_w - w - left)
|
||||
if top or bottom or left or right:
|
||||
img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_REFLECT_101)
|
||||
return img, (top, bottom, left, right)
|
||||
|
||||
def _crop_by_pad(img: np.ndarray, pad: Tuple[int,int,int,int]) -> np.ndarray:
|
||||
top, bottom, left, right = pad
|
||||
if not (top or bottom or left or right):
|
||||
return img
|
||||
h, w = img.shape[:2]
|
||||
return img[top:h-bottom, left:w-right]
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# OpenCV 텍스트 특화 인페인트
|
||||
# ───────────────────────────────────────────────
|
||||
def _opencv_text_inpaint(img_bgr: np.ndarray, mask: np.ndarray,
|
||||
small_radius: int = 3, large_radius: int = 7,
|
||||
dilate_px: int = 2, smooth_kernel: int = 3) -> np.ndarray:
|
||||
"""
|
||||
텍스트 제거 최적화:
|
||||
1) 마스크 소폭 팽창 → 글자 테두리까지 포함
|
||||
2) TELEA/r=3 1차 인페인트
|
||||
3) 잔여 노이즈만 r=7로 2차 인페인트
|
||||
4) 경계 feathering(가벼운 블렌딩)
|
||||
"""
|
||||
h, w = mask.shape[:2]
|
||||
dil_k = max(0, int(dilate_px))
|
||||
if dil_k > 0:
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*dil_k+1, 2*dil_k+1))
|
||||
mask1 = cv2.dilate(mask, kernel, iterations=1)
|
||||
else:
|
||||
mask1 = mask.copy()
|
||||
|
||||
out1 = cv2.inpaint(img_bgr, mask1, small_radius, cv2.INPAINT_TELEA)
|
||||
|
||||
# 남은 영역만 큰 반경으로 한 번 더
|
||||
remain = (mask1 > 0) & (np.abs(out1.astype(np.int16) - img_bgr.astype(np.int16)).max(axis=2) > 3)
|
||||
if remain.any():
|
||||
mask2 = (remain.astype(np.uint8) * 255)
|
||||
out2 = cv2.inpaint(out1, mask2, large_radius, cv2.INPAINT_TELEA)
|
||||
else:
|
||||
out2 = out1
|
||||
|
||||
# Feathering
|
||||
k = (smooth_kernel | 1)
|
||||
blur = cv2.GaussianBlur(mask1, (k, k), 0)
|
||||
alpha = (blur.astype(np.float32) / 255.0)[..., None]
|
||||
blended = (alpha * out2.astype(np.float32) + (1 - alpha) * img_bgr.astype(np.float32)).astype(np.uint8)
|
||||
return blended
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# SimpleLama (PyTorch) 어댑터
|
||||
# ───────────────────────────────────────────────
|
||||
# _HAVE_LAMA_TORCH = False
|
||||
# try:
|
||||
# from simple_lama_inpainting.models.model import SimpleLama
|
||||
# _HAVE_LAMA_TORCH = True
|
||||
# except Exception:
|
||||
# _HAVE_LAMA_TORCH = False
|
||||
|
||||
_HAVE_LAMA_TORCH = False
|
||||
try:
|
||||
# 패치한 파일 위치 그대로
|
||||
from simple_lama_inpainting.models.model import SimpleLama
|
||||
_HAVE_LAMA_TORCH = True
|
||||
except Exception:
|
||||
_HAVE_LAMA_TORCH = False
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# LaMa ONNX 어댑터 (opencv/inpainting_lama)
|
||||
# ───────────────────────────────────────────────
|
||||
class LamaOnnxORT:
|
||||
def __init__(self, model_path: str, logger=None, providers=None, backend_hint: Optional[str]=None):
|
||||
import os, onnxruntime as ort
|
||||
|
||||
self._log = (lambda m: logger.log(m) if logger and hasattr(logger, "log") else print)
|
||||
|
||||
# 세션 옵션 (원하면 스레드/그래프옵트 추가)
|
||||
so = ort.SessionOptions()
|
||||
so.log_severity_level = 2
|
||||
# so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
|
||||
# so.enable_mem_pattern = False
|
||||
|
||||
avail = ort.get_available_providers() # ['TensorrtExecutionProvider','CUDAExecutionProvider','CPUExecutionProvider'] 기대
|
||||
self._log(f"[ORT] available providers={avail}")
|
||||
|
||||
def _truthy(s: str) -> bool:
|
||||
return str(s).lower() in ("1", "true", "yes", "on")
|
||||
|
||||
# ── provider 리스트 구성 (우선순위: TRT → CUDA → CPU)
|
||||
if providers is None:
|
||||
# hint 강제
|
||||
if backend_hint and backend_hint.lower() == "cpu":
|
||||
providers = ["CPUExecutionProvider"]
|
||||
else:
|
||||
providers = []
|
||||
|
||||
# Tensorrt EP (있으면 최우선)
|
||||
if "TensorrtExecutionProvider" in avail:
|
||||
# env 기반 옵션 주입
|
||||
trt_opts = {
|
||||
"trt_engine_cache_enable": _truthy(os.getenv("ORT_TENSORRT_ENGINE_CACHE_ENABLE", "1")),
|
||||
"trt_engine_cache_path": os.getenv("ORT_TENSORRT_CACHE_PATH", "/app/trt_cache"),
|
||||
"trt_fp16_enable": _truthy(os.getenv("ORT_TENSORRT_FP16_ENABLE", "1")),
|
||||
}
|
||||
# 워크스페이스 (기본 1GB)
|
||||
try:
|
||||
trt_opts["trt_max_workspace_size"] = int(os.getenv("ORT_TENSORRT_MAX_WORKSPACE_SIZE", str(1 << 30)))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 캐시 디렉토리 보장
|
||||
try:
|
||||
os.makedirs(trt_opts["trt_engine_cache_path"], exist_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
providers.append(("TensorrtExecutionProvider", trt_opts))
|
||||
|
||||
# CUDA EP
|
||||
if "CUDAExecutionProvider" in avail and (not backend_hint or backend_hint.lower() in ("cuda", "gpu")):
|
||||
cuda_opts = {
|
||||
# 선택 옵션들 — 버전에 따라 무시될 수 있음
|
||||
"cudnn_conv_use_max_workspace": "1",
|
||||
# "do_copy_in_default_stream": "1",
|
||||
}
|
||||
providers.append(("CUDAExecutionProvider", cuda_opts))
|
||||
|
||||
# CPU EP (항상 폴백)
|
||||
providers.append("CPUExecutionProvider")
|
||||
|
||||
self._log(f"[ORT] providers={providers}")
|
||||
self.sess = ort.InferenceSession(model_path, sess_options=so, providers=providers)
|
||||
|
||||
# IO 이름 로깅
|
||||
self.input_name = self.sess.get_inputs()[0].name
|
||||
self.output_name = self.sess.get_outputs()[0].name
|
||||
self._log(f"[ORT] io: in={self.input_name}, out={self.output_name}")
|
||||
|
||||
def infer(self, img_bgr: np.ndarray, mask_gray: np.ndarray) -> np.ndarray:
|
||||
import numpy as np, cv2
|
||||
H, W = img_bgr.shape[:2]
|
||||
target = 512
|
||||
need_resize = (H != target or W != target)
|
||||
if need_resize:
|
||||
img = cv2.resize(img_bgr, (target, target), cv2.INTER_AREA)
|
||||
msk = cv2.resize(mask_gray, (target, target), cv2.INTER_NEAREST)
|
||||
else:
|
||||
img, msk = img_bgr, mask_gray
|
||||
|
||||
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
|
||||
m = (msk.astype(np.float32) / 255.0)[..., None]
|
||||
inp = np.concatenate([img_rgb, m], axis=2) # H,W,4
|
||||
blob = np.transpose(inp, (2,0,1))[None, ...] # 1,4,H,W
|
||||
out = self.sess.run([self.output_name], {self.input_name: blob})[0] # 1,3,H,W
|
||||
out_rgb = np.transpose(out[0], (1,2,0))
|
||||
out_rgb = np.clip(out_rgb, 0.0, 1.0)
|
||||
out_bgr = cv2.cvtColor((out_rgb * 255.0).astype(np.uint8), cv2.COLOR_RGB2BGR)
|
||||
if need_resize:
|
||||
out_bgr = cv2.resize(out_bgr, (W, H), cv2.INTER_CUBIC)
|
||||
return out_bgr
|
||||
|
||||
def _log(self, msg):
|
||||
if self.logger and hasattr(self.logger, "log"):
|
||||
self.logger.log(msg)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
# 추가: FastDeploy 기반 ONNX LaMa
|
||||
class LamaOnnxFD:
|
||||
"""
|
||||
FastDeploy Runtime으로 inpainting_lama_2025jan.onnx 실행
|
||||
- model_path: ONNX 경로
|
||||
- device: "gpu" or "cpu"
|
||||
- device_id: GPU index
|
||||
- backend: "ort" | "trt" (기본 ort)
|
||||
"""
|
||||
def __init__(self, model_path: str,
|
||||
device: str = "gpu",
|
||||
device_id: int = 0,
|
||||
backend: str = "ort",
|
||||
logger=None):
|
||||
print("LamaOnnxFD init")
|
||||
import fastdeploy as fd
|
||||
self.fd = fd
|
||||
self.logger = logger
|
||||
self.model_path = model_path
|
||||
|
||||
opt = fd.RuntimeOption()
|
||||
if device.lower() == "gpu":
|
||||
opt.use_gpu(device_id)
|
||||
# 백엔드 선택
|
||||
try:
|
||||
if backend.lower() == "trt":
|
||||
opt.use_trt_backend()
|
||||
# 필요시 워크스페이스/FP16 설정
|
||||
opt.trt_option.enable_fp16 = True
|
||||
opt.trt_option.max_workspace_size = 1 << 28 # 256MB
|
||||
try:
|
||||
print("TRT 프로필 설정 시작")
|
||||
# 1) 입력 이름 알아내기
|
||||
in_infos = None
|
||||
try:
|
||||
tmp_rt = self.fd.Runtime(opt)
|
||||
in_infos = tmp_rt.get_input_info()
|
||||
del tmp_rt
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2) 이름 모르면 첫 번째 입력을 "input" 가정 (나중에 로그로 확인)
|
||||
input_names = [x.name for x in in_infos] if in_infos else ["input"]
|
||||
|
||||
# 3) min/opt/max 프로필 등록 (예: 256~1024 사이 허용)
|
||||
min_hw, opt_hw, max_hw = 256, 512, 1024
|
||||
for name in input_names:
|
||||
# N,C,H,W = 1,4,*
|
||||
opt.set_trt_input_shape(name,
|
||||
min_shape=[1, 4, min_hw, min_hw],
|
||||
opt_shape=[1, 4, opt_hw, opt_hw],
|
||||
max_shape=[1, 4, max_hw, max_hw]
|
||||
)
|
||||
except Exception:
|
||||
print("TRT 프로필 설정 실패")
|
||||
|
||||
try:
|
||||
print("TRT 런타임 생성 시작")
|
||||
self.runtime = self.fd.Runtime(opt)
|
||||
in_infos = self.runtime.get_input_info()
|
||||
out_infos = self.runtime.get_output_info()
|
||||
self._log(f"[TRT] inputs={[ (i.name, i.shape) for i in in_infos ]}")
|
||||
self._log(f"[TRT] outputs={[ (o.name, o.shape) for o in out_infos ]}")
|
||||
except Exception as e:
|
||||
self._log(f"[TRT] engine build failed: {e}")
|
||||
# 안전하게 ORT fallback
|
||||
opt_fallback = self.fd.RuntimeOption()
|
||||
opt_fallback.use_ort_backend(); opt_fallback.use_gpu(device_id)
|
||||
opt_fallback.set_model_path(model_path, model_format=self.fd.ModelFormat.ONNX)
|
||||
self.runtime = self.fd.Runtime(opt_fallback)
|
||||
self._log("[TRT] Fallback to ORT GPU")
|
||||
print("TRT 런타임 생성 실패")
|
||||
|
||||
elif backend.lower() == "cuda":
|
||||
opt.use_ort_backend()
|
||||
opt.use_gpu(device_id)
|
||||
elif backend.lower() == "cpu":
|
||||
opt.use_ort_backend()
|
||||
opt.use_cpu()
|
||||
else: # "ort"
|
||||
opt.use_ort_backend()
|
||||
opt.use_gpu(device_id) # GPU ORT
|
||||
except Exception as e:
|
||||
self._log(f"[LaMa-ONNX-FD] backend init failed ({backend}), fallback to ORT: {e}")
|
||||
opt = self.fd.RuntimeOption()
|
||||
opt.use_ort_backend(); opt.use_gpu(device_id)
|
||||
|
||||
else:
|
||||
opt.use_cpu()
|
||||
opt.use_ort_backend()
|
||||
opt.set_cpu_thread_num(2)
|
||||
|
||||
# ONNX 모델 지정
|
||||
opt.set_model_path(model_path, model_format=self.fd.ModelFormat.ONNX)
|
||||
|
||||
# Runtime 생성
|
||||
self.runtime = self.fd.Runtime(opt)
|
||||
|
||||
# 입력/출력 메타 확인해두면 디버깅 쉬움
|
||||
try:
|
||||
in_infos = self.runtime.get_input_info()
|
||||
out_infos = self.runtime.get_output_info()
|
||||
names_in = [x.name for x in in_infos]
|
||||
names_out = [x.name for x in out_infos]
|
||||
self._log(f"[LaMa-ONNX-FD] inputs={names_in} outputs={names_out}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _log(self, msg):
|
||||
if self.logger and hasattr(self.logger, "log"):
|
||||
self.logger.log(msg)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
def infer(self, img_bgr, mask_gray):
|
||||
import numpy as np, cv2
|
||||
H, W = img_bgr.shape[:2]
|
||||
|
||||
# 모델이 자유 크기 지원이면 그대로, 아니면 512 정사각으로
|
||||
target = 512
|
||||
need_resize = not (H == target and W == target)
|
||||
if need_resize:
|
||||
img_resized = cv2.resize(img_bgr, (target, target), interpolation=cv2.INTER_AREA)
|
||||
mask_resized = cv2.resize(mask_gray, (target, target), interpolation=cv2.INTER_NEAREST)
|
||||
else:
|
||||
img_resized, mask_resized = img_bgr, mask_gray
|
||||
|
||||
# 전처리: BGR->RGB, [0,1], mask → [0,1], 채널 concat (img 3ch + mask 1ch = 4ch)
|
||||
img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
|
||||
m = (mask_resized.astype(np.float32) / 255.0)[..., None]
|
||||
inp = np.concatenate([img_rgb, m], axis=2) # H,W,4
|
||||
blob = np.transpose(inp, (2,0,1))[None, ...].astype(np.float32) # 1,4,H,W
|
||||
|
||||
# 추론
|
||||
outputs = self.runtime.infer([blob])
|
||||
out = outputs[0] # numpy array, shape (1,3,H,W) 예상
|
||||
out_rgb = np.transpose(out[0], (1,2,0))
|
||||
out_rgb = np.clip(out_rgb, 0.0, 1.0)
|
||||
out_bgr = cv2.cvtColor((out_rgb * 255.0).astype(np.uint8), cv2.COLOR_RGB2BGR)
|
||||
|
||||
if need_resize:
|
||||
out_bgr = cv2.resize(out_bgr, (W, H), interpolation=cv2.INTER_CUBIC)
|
||||
return out_bgr
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# 메인 Inpainter
|
||||
# ───────────────────────────────────────────────
|
||||
from PIL import Image
|
||||
import threading
|
||||
|
||||
_SIMPLE_LAMA_SINGLETON = None
|
||||
class Inpainter:
|
||||
_lock = threading.Lock() # 내부 초기화 경쟁 방지
|
||||
|
||||
|
||||
def __init__(self,
|
||||
logger=None,
|
||||
default_backend: str = InpaintBackends.LAMA_TORCH,
|
||||
lama_device: str = "cuda",
|
||||
lama_onnx_ort_path: Optional[str] = None,
|
||||
lama_onnx_ort_providers: Optional[list] = None,
|
||||
lama_onnx_fd_path: Optional[str] = None,
|
||||
lama_onnx_fd_device: str = "gpu",
|
||||
lama_onnx_fd_device_id: int = 0,
|
||||
lama_onnx_fd_backend: str = "ort"):
|
||||
print("Inpainter init")
|
||||
self.logger = logger
|
||||
self.default_backend = (default_backend or InpaintBackends.LAMA_TORCH).lower()
|
||||
self.lama_device = lama_device
|
||||
|
||||
# self.lama_onnx_ort_path = lama_onnx_ort_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/inpainting_lama_2025jan.onnx")
|
||||
self.lama_onnx_ort_path = lama_onnx_ort_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/lama_fp32.onnx")
|
||||
self.lama_onnx_ort_providers = lama_onnx_ort_providers
|
||||
self._lama_onnx_ort = None
|
||||
|
||||
self._lama_torch = None
|
||||
# self.lama_onnx_fd_path = lama_onnx_fd_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/inpainting_lama_2025jan.onnx")
|
||||
self.lama_onnx_fd_path = lama_onnx_fd_path or os.getenv("INPAINT_LAMA_ONNX", "/app/worker/models/lama_fp32.onnx")
|
||||
self.lama_onnx_fd_device = lama_onnx_fd_device
|
||||
self.lama_onnx_fd_device_id = lama_onnx_fd_device_id
|
||||
self.lama_onnx_fd_backend = lama_onnx_fd_backend
|
||||
|
||||
self._lama_onnx_fd = None
|
||||
|
||||
self._lama_torch_amp = None # ⬅️ 추가
|
||||
|
||||
self._log(f"Inpainter init: default={self.default_backend}")
|
||||
|
||||
# 통일 로그
|
||||
def _log(self, msg):
|
||||
if self.logger and hasattr(self.logger, "log"): self.logger.log(msg)
|
||||
else: print(msg)
|
||||
|
||||
|
||||
# ── 백엔드별 lazy 생성 (스레드 세이프)
|
||||
def _get_lama_onnx_ort(self, backend_hint: Optional[str] = None) -> LamaOnnxORT:
|
||||
if self._lama_onnx_ort is None:
|
||||
with self._lock:
|
||||
if self._lama_onnx_ort is None:
|
||||
self._log("[Init] LamaOnnxORT")
|
||||
self._lama_onnx_ort = LamaOnnxORT(
|
||||
model_path=self.lama_onnx_ort_path,
|
||||
logger=self.logger,
|
||||
providers=self.lama_onnx_ort_providers,
|
||||
backend_hint=backend_hint
|
||||
)
|
||||
return self._lama_onnx_ort
|
||||
|
||||
def _get_lama_onnx_fd(self):
|
||||
if self._lama_onnx_fd is None:
|
||||
with self._lock:
|
||||
if self._lama_onnx_fd is None:
|
||||
self._log("[Init] LamaOnnxFD")
|
||||
self._lama_onnx_fd = LamaOnnxFD(
|
||||
model_path=self.lama_onnx_fd_path,
|
||||
backend=self.lama_onnx_fd_backend,
|
||||
device="gpu", device_id=0, logger=self.logger
|
||||
)
|
||||
return self._lama_onnx_fd
|
||||
|
||||
def _get_lama_torch(self):
|
||||
if self._lama_torch is None:
|
||||
with self._lock:
|
||||
if self._lama_torch is None:
|
||||
self._log("[Init] SimpleLaMa (torch)")
|
||||
# from simple_lama_inpainting.models.model import SimpleLama
|
||||
# self._lama_torch = SimpleLama(device=self.lama_device)
|
||||
self._lama_torch = self.get_simple_lama(device=self.lama_device)
|
||||
return self._lama_torch
|
||||
|
||||
@staticmethod
|
||||
def get_simple_lama(device="cuda"):
|
||||
global _SIMPLE_LAMA_SINGLETON
|
||||
if _SIMPLE_LAMA_SINGLETON is None:
|
||||
# 캐시 폴더 고정 (있으면 유지)
|
||||
torch_home = "/app/torch_cache"
|
||||
os.makedirs(torch_home, exist_ok=True)
|
||||
os.environ.setdefault("TORCH_HOME", torch_home)
|
||||
|
||||
# (선택) 네가 fp16 체크포인트를 이 경로로 마운트해두면,
|
||||
# 컨테이너 환경변수 또는 여기에서 직접 지정 가능
|
||||
# 예) os.environ.setdefault("LAMA_MODEL", "/app/torch_cache/Big-LaMa.fp16.pt")
|
||||
|
||||
# 순서 힌트가 필요하면(보통 필요 없음): image_first | mask_first
|
||||
# os.environ.setdefault("SIMPLE_LAMA_JIT_ORDER", "mask_first")
|
||||
|
||||
# 디버그(형상/순서 로그): "1"로 켜기
|
||||
# os.environ.setdefault("SIMPLE_LAMA_DEBUG_SHAPES", "0")
|
||||
|
||||
# 패치된 SimpleLama는 내부에서 FP16/순서 자동 처리
|
||||
m = SimpleLama(device=torch.device(device if device != "gpu" else "cuda"))
|
||||
_SIMPLE_LAMA_SINGLETON = m
|
||||
return _SIMPLE_LAMA_SINGLETON
|
||||
|
||||
def _get_lama_torch_amp(self):
|
||||
if self._lama_torch_amp is None:
|
||||
with self._lock:
|
||||
if self._lama_torch_amp is None:
|
||||
self._log("[Init] SimpleLaMa (torch AMP)")
|
||||
# ckpt는 환경변수 SIMPLE_LAMA_CKPT 또는 simple-lama 기본 URL 자동 다운로드
|
||||
self._lama_torch_amp = LamaTorchAMP(device=self.lama_device)
|
||||
return self._lama_torch_amp
|
||||
|
||||
# ── Public API
|
||||
def inpaint(self,
|
||||
image_bgr: np.ndarray,
|
||||
polygons: List[List[List[int]]],
|
||||
*,
|
||||
backend: Optional[str] = None,
|
||||
max_side: int = 1024,
|
||||
auto_opencv_if_few: bool = True,
|
||||
few_threshold: int = 4,
|
||||
backend_hint: Optional[str] = None) -> np.ndarray:
|
||||
"""
|
||||
Args:
|
||||
img_bgr: 원본 BGR 이미지 (H,W,3)
|
||||
polygons: [[ [x,y], [x,y], ... ], ...]
|
||||
backend: 명시 시 강제 사용, None이면 default_backend
|
||||
max_side: ROI 다운스케일 상한 (VRAM/속도 절충)
|
||||
auto_opencv_if_few: 텍스트 박스가 적으면 OpenCV로 자동 전환
|
||||
few_threshold: '적다'의 기준 (기본 4)
|
||||
"""
|
||||
|
||||
backend = backend or self.default_backend
|
||||
|
||||
# 1) 폴리곤 → 마스크
|
||||
mask = np.zeros(image_bgr.shape[:2], np.uint8)
|
||||
for poly in polygons:
|
||||
pts = np.array(poly, dtype=np.int32)
|
||||
cv2.fillPoly(mask, [pts], color=255)
|
||||
|
||||
# 2) 마스크 수가 적으면 OpenCV로 빠르게
|
||||
cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
if auto_opencv_if_few and len(cnts) <= few_threshold and backend != InpaintBackends.OPENCV:
|
||||
backend = InpaintBackends.OPENCV
|
||||
|
||||
# 3) ROI + 다운스케일
|
||||
ys, xs = np.where(mask > 0)
|
||||
if len(xs) == 0 or len(ys) == 0:
|
||||
return image_bgr
|
||||
x1, x2, y1, y2 = xs.min(), xs.max(), ys.min(), ys.max()
|
||||
roi_img = image_bgr[y1:y2+1, x1:x2+1]
|
||||
roi_mask = mask[y1:y2+1, x1:x2+1]
|
||||
h, w = roi_img.shape[:2]
|
||||
if max(h, w) > max_side:
|
||||
scale = max_side / float(max(h, w))
|
||||
roi_img_small = cv2.resize(roi_img, (int(w*scale), int(h*scale)), cv2.INTER_AREA)
|
||||
roi_mask_small = cv2.resize(roi_mask, (int(w*scale), int(h*scale)), cv2.INTER_NEAREST)
|
||||
else:
|
||||
roi_img_small, roi_mask_small = roi_img, roi_mask
|
||||
|
||||
# 4) 백엔드 호출
|
||||
if backend == InpaintBackends.OPENCV:
|
||||
out_small = cv2.inpaint(roi_img_small, roi_mask_small, 3, cv2.INPAINT_TELEA)
|
||||
|
||||
elif backend == InpaintBackends.LAMA_TORCH:
|
||||
mdl = self._get_lama_torch()
|
||||
img_pil = Image.fromarray(cv2.cvtColor(roi_img_small, cv2.COLOR_BGR2RGB))
|
||||
msk_pil = Image.fromarray(roi_mask_small, "L") # 1채널 보장
|
||||
|
||||
# 패치된 SimpleLama가 FP16/순서/채널을 내부에서 처리
|
||||
out_pil = mdl(img_pil, msk_pil)
|
||||
|
||||
out_small = cv2.cvtColor(np.array(out_pil), cv2.COLOR_RGB2BGR)
|
||||
|
||||
elif backend == InpaintBackends.LAMA_TORCH_AMP:
|
||||
# SimpleLama(fp32 가중치) + autocast(fp16) + cuFFT pow2 안전 패딩
|
||||
mdl = self._get_lama_torch() # simple_lama_inpainting.models.model.SimpleLama (fp32)
|
||||
img_roi = roi_img_small
|
||||
msk_roi = roi_mask_small
|
||||
|
||||
# pow2 패딩(AMP에서 FFC/cuFFT 반쯤 쓰는 모델 보호)
|
||||
H, W = img_roi.shape[:2]
|
||||
th, tw = _next_pow2(H), _next_pow2(W)
|
||||
if (th % 8) != 0: th = ((th + 7) // 8) * 8
|
||||
if (tw % 8) != 0: tw = ((tw + 7) // 8) * 8
|
||||
|
||||
pad_info = (0,0,0,0)
|
||||
if (th, tw) != (H, W):
|
||||
img_roi, pad_info = _reflect_pad_to(img_roi, th, tw)
|
||||
msk_roi, _ = _reflect_pad_to(msk_roi, th, tw)
|
||||
|
||||
# PIL 변환
|
||||
img_pil = Image.fromarray(cv2.cvtColor(img_roi, cv2.COLOR_BGR2RGB))
|
||||
msk_pil = Image.fromarray(msk_roi, "L")
|
||||
|
||||
# AMP (모델은 fp32 유지, 연산은 자동 혼합정밀)
|
||||
import torch
|
||||
with torch.cuda.amp.autocast(enabled=(self.lama_device in ("cuda","gpu")), dtype=torch.float16):
|
||||
out_pil = mdl(img_pil, msk_pil)
|
||||
|
||||
out_small = cv2.cvtColor(np.array(out_pil), cv2.COLOR_RGB2BGR)
|
||||
|
||||
# 패딩 되돌리기
|
||||
if pad_info != (0,0,0,0):
|
||||
out_small = _crop_by_pad(out_small, pad_info)
|
||||
|
||||
|
||||
elif backend == InpaintBackends.LAMA_ONNX_FD:
|
||||
mdl = self._get_lama_onnx_fd()
|
||||
out_small = mdl.infer(roi_img_small, roi_mask_small)
|
||||
|
||||
elif backend == InpaintBackends.LAMA_ONNX_ORT:
|
||||
mdl = self._get_lama_onnx_ort(backend_hint=backend_hint) # "cuda"/"cpu" 힌트
|
||||
out_small = mdl.infer(roi_img_small, roi_mask_small)
|
||||
else:
|
||||
# 안전폴백
|
||||
out_small = cv2.inpaint(roi_img_small, roi_mask_small, 3, cv2.INPAINT_TELEA)
|
||||
|
||||
# 5) 업스케일 + 합성
|
||||
if out_small.shape[:2] != roi_img.shape[:2]:
|
||||
out_roi = cv2.resize(out_small, (roi_img.shape[1], roi_img.shape[0]), cv2.INTER_CUBIC)
|
||||
else:
|
||||
out_roi = out_small
|
||||
|
||||
result = image_bgr.copy()
|
||||
m = (roi_mask > 0)[:, :, None]
|
||||
result[y1:y2+1, x1:x2+1] = np.where(m, out_roi, roi_img)
|
||||
return result
|
||||
|
||||
import torch, numpy as np
|
||||
import torch.nn.functional as F
|
||||
from PIL import Image
|
||||
|
||||
class _SimpleLamaFPCompat:
|
||||
"""
|
||||
FP16 TorchScript / state_dict 체크포인트를 SimpleLama처럼 호출 가능하게 래핑.
|
||||
__call__(image_pil|ndarray, mask_pil|ndarray) -> PIL.Image
|
||||
- 가중치가 fp16이면 입력도 fp16으로 자동 캐스팅(AMP 포함)
|
||||
- JIT 빌드(enesmsahin big-lama JIT)는 (mask, image) 순서를 기대
|
||||
원본 SimpleLama는 (image, mask) 순서 → is_jit 플래그로 분기
|
||||
"""
|
||||
def __init__(self, model, device="cuda", is_jit=True, is_fp16=True):
|
||||
self.model = model.eval()
|
||||
self.device = torch.device("cuda" if device in ("cuda","gpu") else device)
|
||||
self.is_jit = is_jit
|
||||
self.is_fp16 = is_fp16
|
||||
self.model.to(self.device)
|
||||
if self.is_fp16:
|
||||
self.model.half()
|
||||
|
||||
@classmethod
|
||||
def load(cls, ckpt_path: str, device="cuda"):
|
||||
# 1) TorchScript 시도
|
||||
try:
|
||||
m = torch.jit.load(ckpt_path, map_location="cpu")
|
||||
# fp16 여부 대략 추정 (파라미터가 없으면 fp16 JIT로 가정)
|
||||
is_fp16 = True
|
||||
try:
|
||||
p = next(m.parameters())
|
||||
is_fp16 = (p.dtype == torch.float16)
|
||||
except StopIteration:
|
||||
pass
|
||||
return cls(m, device=device, is_jit=True, is_fp16=is_fp16)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2) state_dict 시도 (원 SimpleLama 구조 필요)
|
||||
from simple_lama_inpainting import SimpleLama
|
||||
base = SimpleLama(device="cpu")
|
||||
sd = torch.load(ckpt_path, map_location="cpu")
|
||||
core = getattr(base, "model", base)
|
||||
core.load_state_dict(sd, strict=False)
|
||||
is_fp16 = any(p.dtype == torch.float16 for p in core.parameters())
|
||||
return cls(base, device=device, is_jit=False, is_fp16=is_fp16)
|
||||
|
||||
# ---------- 유틸 ----------
|
||||
@staticmethod
|
||||
def _to_pil(x, mode=None):
|
||||
if isinstance(x, Image.Image):
|
||||
return x.convert(mode) if mode else x
|
||||
if isinstance(x, np.ndarray):
|
||||
if mode == "L":
|
||||
if x.ndim == 2:
|
||||
return Image.fromarray(x.astype(np.uint8), "L")
|
||||
return Image.fromarray(x[..., 0].astype(np.uint8), "L")
|
||||
if x.ndim == 3 and x.shape[2] == 3: # BGR -> RGB
|
||||
x = x[..., ::-1]
|
||||
return Image.fromarray(x.astype(np.uint8), "RGB")
|
||||
raise TypeError(f"Unsupported input type: {type(x)}")
|
||||
|
||||
@staticmethod
|
||||
def _to_numpy_rgb(img: Image.Image) -> np.ndarray:
|
||||
if img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
arr = np.asarray(img, dtype=np.uint8)
|
||||
if not arr.flags['C_CONTIGUOUS']:
|
||||
arr = np.ascontiguousarray(arr)
|
||||
return arr
|
||||
|
||||
@staticmethod
|
||||
def _to_numpy_mask1(mask: Image.Image) -> np.ndarray:
|
||||
if mask.mode != "L":
|
||||
mask = mask.convert("L")
|
||||
m = np.asarray(mask, dtype=np.uint8)
|
||||
if not m.flags['C_CONTIGUOUS']:
|
||||
m = np.ascontiguousarray(m)
|
||||
return (m > 127).astype(np.float32) # 0/1
|
||||
|
||||
@staticmethod
|
||||
def _pad8_reflect(t: torch.Tensor, target_dtype: torch.dtype):
|
||||
h, w = t.shape[-2:]
|
||||
nh = (h + 7) // 8 * 8
|
||||
nw = (w + 7) // 8 * 8
|
||||
if nh == h and nw == w:
|
||||
return t, (0,0,0,0)
|
||||
ph, pw = nh - h, nw - w
|
||||
t32 = t.to(torch.float32)
|
||||
t32 = F.pad(t32, (0, pw, 0, ph), mode="reflect") # reflect는 fp16 미지원 버전 존재
|
||||
return t32.to(target_dtype), (0, pw, 0, ph)
|
||||
|
||||
# ---------- 호출 ----------
|
||||
@torch.inference_mode()
|
||||
def __call__(self, image: Image.Image, mask: Image.Image) -> Image.Image:
|
||||
# 모델 dtype/디바이스
|
||||
try:
|
||||
p0 = next(self.model.parameters())
|
||||
target_dtype = p0.dtype
|
||||
device = p0.device
|
||||
except StopIteration:
|
||||
target_dtype = torch.float16 if self.device.type == "cuda" and self.is_fp16 else torch.float32
|
||||
device = self.device
|
||||
|
||||
# numpy → tensor
|
||||
img_np = self._to_numpy_rgb(self._to_pil(image, "RGB")) # H,W,3 uint8
|
||||
msk_np = self._to_numpy_mask1(self._to_pil(mask, "L")) # H,W float32 {0,1}
|
||||
|
||||
img_t = torch.from_numpy(img_np).permute(2,0,1).unsqueeze(0).to(device=device, dtype=torch.float32) / 255.0 # 1,3,H,W
|
||||
msk_t = torch.from_numpy(msk_np).unsqueeze(0).unsqueeze(0).to(device=device, dtype=torch.float32) # 1,1,H,W
|
||||
|
||||
# pad (fp32) → target dtype
|
||||
img_t, pad_hw = self._pad8_reflect(img_t, torch.float32)
|
||||
msk_t, _ = self._pad8_reflect(msk_t, torch.float32)
|
||||
img_t = img_t.to(dtype=target_dtype)
|
||||
msk_t = msk_t.to(dtype=target_dtype)
|
||||
|
||||
# 호출 순서 분기
|
||||
if self.is_jit:
|
||||
# JIT big-lama는 (mask, image) 순서
|
||||
out = self.model(msk_t, img_t)
|
||||
else:
|
||||
# 원 SimpleLama는 (image, mask) 순서
|
||||
out = self.model(img_t, msk_t)
|
||||
|
||||
# unpad 및 to PIL
|
||||
_, _, H, W = img_t.shape
|
||||
_, pw, _, ph = pad_hw
|
||||
if ph or pw:
|
||||
out = out[..., :H-ph, :W-pw]
|
||||
out = out.clamp(0, 1).to(torch.float32)
|
||||
out_np = (out[0].permute(1,2,0).cpu().numpy() * 255.0 + 0.5).astype(np.uint8)
|
||||
return Image.fromarray(out_np, "RGB")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import os, torch, torch.nn.functional as F, numpy as np
|
||||
from PIL import Image
|
||||
|
||||
def _to_pil_rgb(x):
|
||||
if isinstance(x, Image.Image):
|
||||
return x.convert("RGB")
|
||||
if isinstance(x, np.ndarray):
|
||||
if x.ndim == 3 and x.shape[2] == 3:
|
||||
# BGR -> RGB
|
||||
x = x[..., ::-1]
|
||||
return Image.fromarray(x.astype(np.uint8)).convert("RGB")
|
||||
raise TypeError(f"unsupported image type: {type(x)}")
|
||||
|
||||
def _to_pil_maskL(x):
|
||||
if isinstance(x, Image.Image):
|
||||
return x.convert("L")
|
||||
if isinstance(x, np.ndarray):
|
||||
if x.ndim == 3:
|
||||
x = x[..., 0]
|
||||
return Image.fromarray(x.astype(np.uint8)).convert("L")
|
||||
raise TypeError(f"unsupported mask type: {type(x)}")
|
||||
|
||||
def _pad_mod8_reflect_nchw(t: torch.Tensor):
|
||||
# t: NCHW (float32)
|
||||
_, _, h, w = t.shape
|
||||
nh = (h + 7) // 8 * 8
|
||||
nw = (w + 7) // 8 * 8
|
||||
if nh == h and nw == w:
|
||||
return t, (0,0,0,0)
|
||||
ph, pw = nh - h, nw - w
|
||||
top = ph // 2; bottom = ph - top
|
||||
left = pw // 2; right = pw - left
|
||||
t32 = F.pad(t, (left, right, top, bottom), mode="reflect")
|
||||
return t32, (top, bottom, left, right)
|
||||
|
||||
def _crop_from_pad_nchw(t: torch.Tensor, pad):
|
||||
top, bottom, left, right = pad
|
||||
if top==bottom==left==right==0:
|
||||
return t
|
||||
return t[..., top:t.shape[-2]-bottom, left:t.shape[-1]-right]
|
||||
|
||||
def _detect_arg_order(script_module) -> str:
|
||||
"""
|
||||
TorchScript LaMa(JIT)의 forward 인자 순서 추정.
|
||||
- enesmsahin big-lama.pt: (mask, image) 가 일반적.
|
||||
- 안전하게 스키마/코드에서 먼저 감지, 실패 시 'mask_im' 기본.
|
||||
"""
|
||||
try:
|
||||
sch = str(getattr(script_module, "forward").schema).lower()
|
||||
if "tensor mask" in sch and "tensor image" in sch:
|
||||
return "mask_im" if sch.index("tensor mask") < sch.index("tensor image") else "im_mask"
|
||||
except Exception:
|
||||
pass
|
||||
code = getattr(script_module, "code", "")
|
||||
if isinstance(code, str):
|
||||
if "forward(mask, image" in code.replace(" ", ""):
|
||||
return "mask_im"
|
||||
if "forward(image, mask" in code.replace(" ", ""):
|
||||
return "im_mask"
|
||||
# 기본값
|
||||
return os.getenv("SIMPLE_LAMA_ARG_ORDER", "mask_im").lower()
|
||||
|
||||
class LamaTorchAMP:
|
||||
"""
|
||||
- 가중치: FP32 유지
|
||||
- 추론: torch.cuda.amp.autocast(dtype=torch.float16)
|
||||
- 입력: RGB/0..1, mask 1ch/0..1, NCHW, mod=8 reflect pad
|
||||
"""
|
||||
def __init__(self, device="cuda", ckpt_path: str|None=None):
|
||||
self.device = torch.device("cuda" if device in ("cuda","gpu") and torch.cuda.is_available() else "cpu")
|
||||
|
||||
# 체크포인트 경로: 우선순위 ENV → 인자 → simple-lama 기본 URL 다운로드
|
||||
if ckpt_path is None:
|
||||
ckpt_path = os.getenv("SIMPLE_LAMA_CKPT")
|
||||
if ckpt_path is None or not os.path.isfile(ckpt_path):
|
||||
# simple-lama의 다운로드 유틸 재사용
|
||||
from simple_lama_inpainting.utils.util import download_model
|
||||
from simple_lama_inpainting.models.model import LAMA_MODEL_URL
|
||||
ckpt_path = download_model(LAMA_MODEL_URL)
|
||||
|
||||
m = torch.jit.load(ckpt_path, map_location="cpu").eval()
|
||||
|
||||
|
||||
try:
|
||||
m = m.to(dtype=torch.float32)
|
||||
except Exception:
|
||||
# TorchScript에서 .to 실패하는 경우 수동 승격
|
||||
for p in m.parameters(recurse=True):
|
||||
if p.dtype != torch.float32:
|
||||
p.data = p.data.float()
|
||||
for b in m.buffers(recurse=True):
|
||||
if b.dtype != torch.float32:
|
||||
b.data = b.data.float()
|
||||
|
||||
m = m.to(self.device) # FP32 유지
|
||||
self.model = m
|
||||
self.device = torch.device(device if device != "gpu" else "cuda")
|
||||
self.arg_order = _detect_arg_order(m)
|
||||
if self.device.type == "cuda":
|
||||
torch.backends.cudnn.benchmark = True
|
||||
|
||||
@torch.inference_mode()
|
||||
def __call__(self, image: Image.Image|np.ndarray, mask: Image.Image|np.ndarray) -> Image.Image:
|
||||
im = _to_pil_rgb(image)
|
||||
mk = _to_pil_maskL(mask)
|
||||
|
||||
im_np = np.asarray(im, dtype=np.uint8)
|
||||
mk_np = np.asarray(mk, dtype=np.uint8)
|
||||
|
||||
im_t = torch.from_numpy(im_np).permute(2,0,1).unsqueeze(0).to(self.device, dtype=torch.float32) / 255.0 # 1,3,H,W
|
||||
mk_f = (mk_np > 127).astype(np.float32)
|
||||
mk_t = torch.from_numpy(mk_f).unsqueeze(0).unsqueeze(0).to(self.device, dtype=torch.float32) # 1,1,H,W
|
||||
|
||||
# mod=8 pad (float32에서)
|
||||
im_t, pad = _pad_mod8_reflect_nchw(im_t)
|
||||
mk_t, _ = _pad_mod8_reflect_nchw(mk_t)
|
||||
|
||||
# AMP 추론
|
||||
if self.device.type == "cuda":
|
||||
with torch.autocast(device_type="cuda", dtype=torch.float16):
|
||||
out = self.model(mk_t, im_t) if self.arg_order == "mask_im" else self.model(im_t, mk_t)
|
||||
else:
|
||||
out = self.model(mk_t, im_t) if self.arg_order == "mask_im" else self.model(im_t, mk_t)
|
||||
|
||||
out = out[0] if isinstance(out, (list, tuple)) else out # NCHW
|
||||
out = _crop_from_pad_nchw(out, pad).clamp(0,1).to(torch.float32)
|
||||
out_np = (out[0].permute(1,2,0).cpu().numpy() * 255.0 + 0.5).astype(np.uint8)
|
||||
return Image.fromarray(out_np, "RGB")
|
||||
|
|
@ -46,11 +46,16 @@ class MaskModule:
|
|||
ocr_results: List[Dict],
|
||||
expansion_size: int = 6,
|
||||
blur_size: int = 7,
|
||||
mask_option: str = "basic"
|
||||
mask_option: str = "basic",
|
||||
# 🔥 ROI 전용 옵션 추가
|
||||
for_roi_processing: bool = False
|
||||
) -> "np.ndarray | None":
|
||||
"""
|
||||
BGR ndarray와 OCR 결과를 직접 받아 마스크 np.ndarray 반환
|
||||
(디스크 I/O 없음)
|
||||
|
||||
Args:
|
||||
for_roi_processing: True면 순수 마스크만 생성 (후처리 없음)
|
||||
"""
|
||||
if image is None or image.size == 0:
|
||||
self.logger.error("ndarray 이미지가 비었습니다.")
|
||||
|
|
@ -63,9 +68,32 @@ class MaskModule:
|
|||
poly = res.get("polygon")
|
||||
if not poly:
|
||||
continue
|
||||
expanded = self.expand_polygon(poly, offset=5)
|
||||
# 🔥 ROI 처리용이면 적절한 확장 적용 (후처리 없는 대신 좀 더 확장)
|
||||
if for_roi_processing:
|
||||
expanded = self.expand_polygon(poly, offset=8) # 3 → 8로 증가
|
||||
else:
|
||||
expanded = self.expand_polygon(poly, offset=5)
|
||||
cv2.fillPoly(mask, [expanded], 255)
|
||||
|
||||
# 🔥 ROI 처리용이면 최소한의 후처리만 적용
|
||||
if for_roi_processing:
|
||||
# 🔥 강화된 후처리: 텍스트 잔상 방지
|
||||
kernel_small = np.ones((3, 3), np.uint8)
|
||||
kernel_large = np.ones((5, 5), np.uint8)
|
||||
|
||||
# 1단계: 작은 노이즈 제거
|
||||
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel_small)
|
||||
|
||||
# 2단계: 텍스트 경계 완전 커버 (강화된 팽창)
|
||||
mask = cv2.dilate(mask, kernel_large, iterations=1)
|
||||
|
||||
# 3단계: 추가 안전 마진
|
||||
mask = cv2.dilate(mask, kernel_small, iterations=1)
|
||||
|
||||
self.logger.log("🔧 ROI용 강화 마스크 생성 (잔상 방지 처리)", level=logging.INFO)
|
||||
return mask
|
||||
|
||||
# 기존 방식 (풀프레임용)
|
||||
processed_mask = self.process_mask(mask, expansion_size, blur_size)
|
||||
return processed_mask
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue