ImageProcessor_MainServer/test/inpaint_module.py

242 lines
9.7 KiB
Python

# -*- coding: utf-8 -*-
"""
FastDeploy 기반 인페인팅 모듈
- 기본: FastDeploy + StableDiffusion Inpainting 파이프라인 사용
- 대체: FastDeploy 파이프라인이 없거나 모델이 준비되지 않은 경우 OpenCV 텔레아/나비에 방식으로 대체(옵션)
- 마스크는 흰색(255)이 인페인팅할 영역, 검정(0)이 유지할 영역을 사용
필수 의존성:
fastdeploy>=1.0
ppdiffusers>=0.6.3 (FastDeploy용 StableDiffusion 파이프라인)
Pillow, numpy, opencv-python
모델 준비:
base_dir/modules/FD_Inpaint/stable-diffusion-inpainting@fastdeploy/
├─ model_index.json
├─ scheduler/...
├─ tokenizer/...
├─ text_encoder/...
├─ unet/...
└─ vae_decoder/...
(PaddleNLP 문서의 stable-diffusion-inpainting@fastdeploy 모델 폴더 구조와 동일)
사용 예:
inpaint = InpaintModule(logger, base_dir)
out_img = inpaint.inpaint(image_path, mask_np, prompt="remove text", negative_prompt="text, letters", steps=30)
"""
import os
import logging
import numpy as np
import cv2
from typing import Optional, Union, Tuple
from PIL import Image
try:
import fastdeploy as fd # noqa: F401 # 실제 사용은 ppdiffusers 파이프라인 내부에서 함
except Exception:
fd = None
# ppdiffusers는 선택적으로 import (없으면 예외 처리)
_FD_PIPE_CLS = None
try:
import importlib
# 후보 클래스명 리스트(버전별로 이름이 바뀌는 경우 대비)
_CANDIDATE_CLASSES = [
"FastDeployStableDiffusionInpaintPipeline",
"FastDeployStableDiffusionInpaintingPipeline",
"FastDeployStableDiffusionInpaint",
]
_FD_PIPE_CLS = None
_ppd = importlib.import_module("ppdiffusers")
for _cls_name in _CANDIDATE_CLASSES:
_FD_PIPE_CLS = getattr(_ppd, _cls_name, None)
if _FD_PIPE_CLS is not None:
break
except Exception:
pass
class InpaintModule:
def __init__(self,
logger,
base_dir: str,
gpu_id: int = 0,
model_dir: Optional[str] = None,
backend: str = "fd_sd", # "fd_sd" | "opencv"
):
"""
:param logger: logger.log(msg, level=...) 형태 지원 객체
:param base_dir: 프로젝트 루트
:param gpu_id: 사용 GPU ID
:param model_dir: StableDiffusion Inpaint 모델 디렉토리 (None이면 기본 위치)
:param backend: "fd_sd" (FastDeploy StableDiffusion) 또는 "opencv" (fallback)
"""
self.logger = logger
self.base_dir = base_dir
self.gpu_id = gpu_id
self.backend = backend
# 모델 디렉토리 기본값
self.model_dir = model_dir or os.path.join(
self.base_dir, "modules", "FD_Inpaint", "stable-diffusion-inpainting@fastdeploy"
)
# 파이프라인 객체
self.pipe = None
if backend == "fd_sd":
self._init_fd_sd_pipeline()
else:
self._log("⚠️ FastDeploy 파이프라인을 사용하지 않고 OpenCV 인페인팅으로 동작합니다.", level=logging.WARNING)
# ------------------------ 내부 공통 로그 ------------------------
def _log(self, msg, level=logging.INFO, exc_info=False):
if hasattr(self.logger, "log"):
self.logger.log(msg, level=level, exc_info=exc_info)
else:
print(msg)
# ------------------------ FastDeploy StableDiffusion Inpaint 초기화 ------------------------
def _init_fd_sd_pipeline(self):
if _FD_PIPE_CLS is None:
self._log("❌ ppdiffusers의 FastDeploy Inpaint 파이프라인 클래스를 찾지 못했습니다. "
"ppdiffusers>=0.6.3 또는 FastDeploy diffusion 예제를 설치/업데이트 하세요.\n"
"임시로 OpenCV 인페인팅으로 fallback 합니다.", level=logging.ERROR)
self.backend = "opencv"
return
if not os.path.isdir(self.model_dir):
self._log(f"❌ 인페인팅 모델 디렉토리가 없습니다: {self.model_dir}\n"
"stable-diffusion-inpainting@fastdeploy 모델을 다운로드/배치하세요.\n"
"임시로 OpenCV 인페인팅으로 fallback 합니다.", level=logging.ERROR)
self.backend = "opencv"
return
try:
# GPU 설정은 ppdiffusers가 내부적으로 fastdeploy.RuntimeOption을 구성할 때 반영
self.pipe = _FD_PIPE_CLS.from_pretrained(
self.model_dir,
use_fp16=True,
device_id=self.gpu_id
)
self._log("✅ FastDeploy StableDiffusion Inpaint 파이프라인 초기화 완료", level=logging.INFO)
except Exception as e:
self._log(f"❌ FastDeploy Inpaint 파이프라인 초기화 실패: {e}\nOpenCV fallback으로 전환합니다.", level=logging.ERROR, exc_info=True)
self.backend = "opencv"
self.pipe = None
# ------------------------ 인페인팅 실행 ------------------------
def inpaint(self,
image_path: str,
mask: Union[np.ndarray, str],
prompt: str = "",
negative_prompt: Optional[str] = None,
steps: int = 30,
guidance_scale: float = 7.5,
strength: float = 1.0,
seed: Optional[int] = None,
cv_method: str = "telea", # fallback시 사용
) -> np.ndarray:
"""
:param image_path: 원본 이미지 경로
:param mask: np.uint8(0~255) 2D 마스크 or 마스크 파일 경로
:param prompt: 텍스트 프롬프트(StableDiffusion용)
:param negative_prompt: 네거티브 프롬프트
:param steps: 확산 스텝 수
:param guidance_scale: CFG 스케일
:param strength: 마스크 영역 재작성 강도(0~1)
:param seed: 랜덤시드
:param cv_method: "telea" | "ns" (OpenCV 알고리즘 선택)
:return: 인페인팅 결과 BGR np.ndarray
"""
if not os.path.isfile(image_path):
self._log(f"이미지를 찾을 수 없습니다: {image_path}", level=logging.ERROR)
return None
# 원본 이미지 로드 (BGR)
src_bgr = cv2.imread(image_path, cv2.IMREAD_COLOR)
if src_bgr is None:
self._log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
return None
# 마스크 준비 (흰색=수정, 검정=보존)
if isinstance(mask, str):
mask_np = cv2.imread(mask, cv2.IMREAD_GRAYSCALE)
if mask_np is None:
self._log(f"마스크 이미지를 읽을 수 없습니다: {mask}", level=logging.ERROR)
return None
else:
mask_np = mask.copy()
if mask_np.ndim == 3:
mask_np = cv2.cvtColor(mask_np, cv2.COLOR_BGR2GRAY)
mask_np = (mask_np > 0).astype(np.uint8) * 255
if self.backend == "fd_sd" and self.pipe is not None:
return self._run_fd_sd(src_bgr, mask_np, prompt, negative_prompt,
steps, guidance_scale, strength, seed)
else:
return self._run_opencv(src_bgr, mask_np, cv_method)
# ------------------------ FD StableDiffusion 실행부 ------------------------
def _run_fd_sd(self,
src_bgr: np.ndarray,
mask_np: np.ndarray,
prompt: str,
negative_prompt: Optional[str],
steps: int,
guidance_scale: float,
strength: float,
seed: Optional[int]) -> np.ndarray:
# PIL 형식 변환 (ppdiffusers는 RGB + mask를 PIL.Image로 받음)
src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(src_rgb)
pil_mask = Image.fromarray(mask_np).convert("L")
generator = None
if seed is not None:
import random
import numpy as np
import paddle
random.seed(seed)
np.random.seed(seed)
try:
paddle.seed(seed)
except Exception:
pass
try:
result = self.pipe(
prompt=prompt if prompt else "remove text, clean background",
image=pil_img,
mask_image=pil_mask,
negative_prompt=negative_prompt,
guidance_scale=guidance_scale,
num_inference_steps=steps,
strength=strength,
generator=generator
)
if hasattr(result, "images"):
out_pil = result.images[0]
else:
out_pil = result[0]
out_rgb = np.array(out_pil)
out_bgr = cv2.cvtColor(out_rgb, cv2.COLOR_RGB2BGR)
self._log("🧹 StableDiffusion 인페인팅 완료", level=logging.INFO)
return out_bgr
except Exception as e:
self._log(f"❌ StableDiffusion 인페인팅 실패: {e} -> OpenCV fallback", level=logging.ERROR, exc_info=True)
return self._run_opencv(src_bgr, mask_np, "telea")
# ------------------------ OpenCV Fallback ------------------------
def _run_opencv(self, src_bgr: np.ndarray, mask_np: np.ndarray, method: str = "telea") -> np.ndarray:
flag = cv2.INPAINT_TELEA if method.lower() == "telea" else cv2.INPAINT_NS
radius = 3
result = cv2.inpaint(src_bgr, mask_np, inpaintRadius=radius, flags=flag)
self._log(f"🧽 OpenCV inpaint({method}) 수행 완료", level=logging.INFO)
return result