858 lines
45 KiB
Python
858 lines
45 KiB
Python
import requests
|
|
import cv2
|
|
import base64
|
|
import numpy as np
|
|
import os
|
|
import logging
|
|
from PIL import Image
|
|
from .bria_background_removal_module import BriaBackgroundRemovalModule
|
|
|
|
|
|
class Request_AI_Server:
|
|
"""로컬 인페인팅/배경제거 어댑터
|
|
|
|
- 인페인팅: 로컬 MIGAN ONNX 파이프라인 사용
|
|
- 배경제거: 로컬 BriaAI RMBG 사용
|
|
|
|
원격 서버 연동 로직은 제거되었습니다.
|
|
"""
|
|
def __init__(self, logger,
|
|
gpu_manager=None,
|
|
local_rembg_model_path: str | None = None):
|
|
"""로컬 인페인팅/배경제거 초기화"""
|
|
self.logger = logger
|
|
self.gpu_manager = gpu_manager
|
|
|
|
inpaint_server_url = None
|
|
rembg_server_url = None
|
|
self.inpaint_base_url = ""
|
|
self.rembg_base_url = ""
|
|
self.local_rembg_model_path = local_rembg_model_path
|
|
|
|
self.inpaint_api_url = None
|
|
# RemoveBG 플러그인 엔드포인트(현재 미사용)
|
|
self.rembg_api_url = None
|
|
|
|
# 백업용 내장 rembg 모듈 (필요시에만 초기화)
|
|
self._backup_rembg = None
|
|
self._backup_rembg_error = None # 백업 모듈 초기화 오류 저장
|
|
|
|
# GPU 사용 가능 여부 로그
|
|
if self.gpu_manager:
|
|
gpu_status = self.gpu_manager.get_cuda_status()
|
|
self.logger.log(f"Request_AI_Server GPU 상태: CUDA 사용 가능={gpu_status['can_use_cuda']}", level=logging.DEBUG)
|
|
# MIGAN 파이프라인 (lazy)
|
|
self._migan = None
|
|
self._migan_error = None
|
|
|
|
def _get_backup_rembg(self):
|
|
"""백업용 내장 rembg 모듈을 lazy loading으로 초기화 (안전한 처리)"""
|
|
if self._backup_rembg is not None:
|
|
return self._backup_rembg
|
|
|
|
# 이미 초기화 실패한 경우
|
|
if self._backup_rembg_error is not None:
|
|
return None
|
|
|
|
try:
|
|
# BriaAI 배경제거 모듈 초기화
|
|
self._backup_rembg = BriaBackgroundRemovalModule(
|
|
logger=self.logger,
|
|
default_model="bria-rmbg-1.4",
|
|
gpu_manager=self.gpu_manager,
|
|
local_rembg_model_path=self.local_rembg_model_path
|
|
)
|
|
|
|
# 백업 모듈의 rembg 사용 가능성 확인
|
|
if not self._backup_rembg.is_available():
|
|
init_error = self._backup_rembg.get_init_error()
|
|
self._backup_rembg_error = f"백업 rembg 모듈 의존성 오류: {init_error}"
|
|
self.logger.log(self._backup_rembg_error, level=logging.ERROR)
|
|
self._backup_rembg = None
|
|
return None
|
|
|
|
self.logger.log("백업용 내장 rembg 모듈 초기화됨", level=logging.DEBUG)
|
|
return self._backup_rembg
|
|
|
|
except ImportError as e:
|
|
self._backup_rembg_error = f"백업용 rembg 모듈 import 실패: {e}"
|
|
self.logger.log(self._backup_rembg_error, level=logging.ERROR)
|
|
except Exception as e:
|
|
self._backup_rembg_error = f"백업용 rembg 모듈 초기화 오류: {e}"
|
|
self.logger.log(self._backup_rembg_error, level=logging.ERROR, exc_info=True)
|
|
|
|
return None
|
|
|
|
def request_external_inpaint(self, image: np.ndarray, mask: np.ndarray, server_url: str, invert_mask: bool = False, model_name: str = "simple-lama") -> np.ndarray:
|
|
"""외부 IOPaint 서버를 이용한 인페인팅 요청 (VIP용)
|
|
|
|
Args:
|
|
image: 이미지 데이터 (BGR np.ndarray) 또는 파일 경로
|
|
mask: 마스크 데이터 (0/255 uint8)
|
|
server_url: 외부 서버 URL (예: http://123.123.123.123:54321)
|
|
invert_mask: 마스크 반전 여부
|
|
model_name: 사용할 모델명
|
|
|
|
Returns:
|
|
인페인트된 이미지 (BGR np.ndarray) 또는 None
|
|
"""
|
|
try:
|
|
# URL 정리
|
|
base_url = server_url.rstrip('/')
|
|
api_url = f"{base_url}/api/v1/inpaint"
|
|
|
|
# 서버 상태 먼저 확인
|
|
if not self.is_server_alive(base_url):
|
|
self.logger.log(f"외부 인페인팅 서버({base_url})가 응답하지 않습니다.", level=logging.WARNING)
|
|
return None
|
|
|
|
image_data = None
|
|
# image가 경로(str)라면 파일을 읽어서 np.ndarray로 변환
|
|
if isinstance(image, str) and os.path.isfile(image):
|
|
image_data = cv2.imread(image)
|
|
elif isinstance(image, np.ndarray):
|
|
image_data = image
|
|
|
|
if image_data is None:
|
|
self.logger.log(f"이미지 데이터를 읽을 수 없습니다: {type(image)}", level=logging.ERROR)
|
|
return None
|
|
|
|
# ---- 마스크 정규화 (서버 호환성) ---------------------------------
|
|
# - 1채널 uint8, 0/255 이진 이미지 보장
|
|
try:
|
|
mask_norm = mask
|
|
if mask_norm is None:
|
|
self.logger.log("마스크가 None입니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if mask_norm.ndim == 3:
|
|
mask_norm = cv2.cvtColor(mask_norm, cv2.COLOR_BGR2GRAY)
|
|
|
|
if mask_norm.dtype != np.uint8:
|
|
mask_norm = mask_norm.astype(np.uint8)
|
|
|
|
# 임계값 0 초과 → 255 (blurred mask 대응)
|
|
_, mask_norm = cv2.threshold(mask_norm, 0, 255, cv2.THRESH_BINARY)
|
|
|
|
# 이미지 크기와 다르면 안전하게 리사이즈
|
|
if mask_norm.shape[:2] != image_data.shape[:2]:
|
|
self.logger.log(
|
|
f"마스크 크기 보정: mask={mask_norm.shape} → image={image_data.shape[:2]}",
|
|
level=logging.WARNING,
|
|
)
|
|
mask_norm = cv2.resize(mask_norm, (image_data.shape[1], image_data.shape[0]), interpolation=cv2.INTER_NEAREST)
|
|
|
|
# 필요 시 반전 (서버 마스크 규약 차이 대응)
|
|
if invert_mask:
|
|
try:
|
|
mask_norm = cv2.bitwise_not(mask_norm)
|
|
self.logger.log("invert_mask=True → 마스크 반전 적용", level=logging.DEBUG)
|
|
except Exception as inv_err:
|
|
self.logger.log(f"마스크 반전 실패: {inv_err}", level=logging.WARNING)
|
|
|
|
except Exception as norm_err:
|
|
self.logger.log(f"마스크 정규화 실패: {norm_err}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 이미지를 base64로 인코딩
|
|
_, img_encoded = cv2.imencode('.png', image_data)
|
|
_, mask_encoded = cv2.imencode('.png', mask_norm)
|
|
img_b64 = base64.b64encode(img_encoded).decode('utf-8')
|
|
mask_b64 = base64.b64encode(mask_encoded).decode('utf-8')
|
|
|
|
payload = {
|
|
"image": img_b64,
|
|
"mask": mask_b64,
|
|
"model_name": model_name
|
|
}
|
|
|
|
# 요청 파라미터 명시: 이 서버는 Accept 에 따라 응답이 달라질 수 있으므로 쿼리로 고정
|
|
params = {
|
|
"response_format": "binary",
|
|
"image_format": "png",
|
|
}
|
|
|
|
self.logger.log(f"외부 인페인팅 서버 요청: {api_url}, model={model_name}", level=logging.DEBUG)
|
|
response = requests.post(api_url, params=params, json=payload, timeout=(5, 45))
|
|
|
|
if response.status_code != 200:
|
|
self.logger.log(f"외부 인페인팅 서버 에러: {response.status_code} {response.text}", level=logging.WARNING)
|
|
return None
|
|
|
|
# 응답이 바이너리 PNG 이미지이므로 바로 디코딩
|
|
nparr = np.frombuffer(response.content, np.uint8)
|
|
result = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
|
|
if result is not None:
|
|
self.logger.log("외부 인페인팅 성공", level=logging.DEBUG)
|
|
|
|
return result
|
|
|
|
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.RequestException) as e:
|
|
self.logger.log(f"외부 인페인팅 서버 요청 실패: {e}", level=logging.WARNING)
|
|
return None
|
|
except Exception as e:
|
|
self.logger.log(f"외부 인페인팅 실행 중 에러: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def request_inpaint(self, image: np.ndarray, mask: np.ndarray, invert_mask: bool = False, inpaint_model: str = "migan") -> np.ndarray:
|
|
"""로컬 MIGAN으로 인페인팅 수행.
|
|
|
|
Args:
|
|
image: 경로(str) 또는 BGR np.ndarray
|
|
mask: 0/255 uint8 2D, 텍스트영역=255
|
|
invert_mask: 무시(서버 호환 옵션)
|
|
inpaint_model: 미사용(서버 호환)
|
|
Returns:
|
|
인페인트된 BGR np.ndarray or None
|
|
"""
|
|
try:
|
|
# 이미지 경로 확보
|
|
img_path = None
|
|
cleanup = False
|
|
if isinstance(image, str) and os.path.isfile(image):
|
|
img_path = image
|
|
image_data = cv2.imread(image)
|
|
else:
|
|
image_data = image if isinstance(image, np.ndarray) else None
|
|
if image_data is None:
|
|
self.logger.log(f"이미지 입력이 유효하지 않습니다: {type(image)}", level=logging.ERROR)
|
|
return None
|
|
import tempfile
|
|
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
|
|
cv2.imwrite(tmp.name, image_data)
|
|
img_path = tmp.name
|
|
cleanup = True
|
|
|
|
# 마스크 정규화
|
|
if mask is None:
|
|
self.logger.log("마스크가 None입니다", level=logging.ERROR)
|
|
return None
|
|
mask_norm = mask
|
|
if mask_norm.ndim == 3:
|
|
mask_norm = cv2.cvtColor(mask_norm, cv2.COLOR_BGR2GRAY)
|
|
if mask_norm.dtype != np.uint8:
|
|
mask_norm = mask_norm.astype(np.uint8)
|
|
_, mask_norm = cv2.threshold(mask_norm, 0, 255, cv2.THRESH_BINARY)
|
|
if mask_norm.shape[:2] != image_data.shape[:2]:
|
|
self.logger.log(
|
|
f"마스크 크기 보정: mask={mask_norm.shape} → image={image_data.shape[:2]}",
|
|
level=logging.WARNING,
|
|
)
|
|
mask_norm = cv2.resize(mask_norm, (image_data.shape[1], image_data.shape[0]), interpolation=cv2.INTER_NEAREST)
|
|
|
|
# MIGAN 파이프라인 준비
|
|
migan = self._get_migan()
|
|
if migan is None:
|
|
self.logger.log("MIGAN 파이프라인 초기화 실패", level=logging.ERROR)
|
|
return None
|
|
|
|
result = migan.inpaint(img_path, mask_norm)
|
|
return result
|
|
except Exception as e:
|
|
self.logger.log(f"로컬 MIGAN 인페인팅 실패: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
finally:
|
|
try:
|
|
if 'cleanup' in locals() and cleanup and img_path and os.path.exists(img_path):
|
|
os.unlink(img_path)
|
|
except Exception:
|
|
pass
|
|
|
|
def request_rembg(self, image: np.ndarray, use_local_rembg: bool = False, local_model_name: str = "bria-rmbg-1.4", object_ratio: float = 0.95) -> np.ndarray:
|
|
"""RemoveBG 플러그인 호출 후 결과 이미지를 흰 배경 중앙 배치로 후처리.
|
|
서버가 다운될 경우 백업으로 내장 rembg 모듈 사용.
|
|
|
|
Args:
|
|
image: 입력 이미지 (np.ndarray 또는 파일 경로)
|
|
use_local_rembg: True면 서버 실패 시 백업으로 내장 rembg 모듈 사용 허용
|
|
local_model_name: 내장 BriaAI에서 사용할 모델명 (기본값: "bria-rmbg-1.4")
|
|
object_ratio: 객체가 캔버스에서 차지할 비율 (기본값: 0.75 = 75%)
|
|
"""
|
|
try:
|
|
self.logger.log(f"request_rembg 호출: use_local_rembg={use_local_rembg}, local_model_name={local_model_name}", level=logging.DEBUG)
|
|
# 입력 이미지 로드/확정
|
|
if isinstance(image, str) and os.path.isfile(image):
|
|
image_data = cv2.imread(image)
|
|
elif isinstance(image, np.ndarray):
|
|
image_data = image
|
|
else:
|
|
self.logger.log(f"이미지 파일을 읽을 수 없습니다: {image}", level=logging.ERROR)
|
|
return None
|
|
|
|
return self._use_backup_rembg(image_data, local_model_name, object_ratio, debug_save=True, debug_prefix="wrong_shape", force_cpu=False)
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"request_rembg 실행 중 오류: {e}", level=logging.WARNING, exc_info=True)
|
|
return None
|
|
|
|
def _use_backup_rembg(self, image_data: np.ndarray, model_name: str = "bria-rmbg-1.4", object_ratio: float = 0.75, debug_save: bool = False, debug_prefix: str = "debug", force_cpu: bool = False) -> np.ndarray:
|
|
"""내장 rembg 모듈을 사용하여 배경 제거 (안전한 처리)
|
|
|
|
Args:
|
|
image_data: 입력 이미지 데이터
|
|
model_name: 사용할 rembg 모델명
|
|
object_ratio: 객체가 캔버스에서 차지할 비율
|
|
debug_save: True면 중간 단계 이미지들을 저장
|
|
debug_prefix: 디버그 이미지 파일명 접두사
|
|
force_cpu: True면 CPU 모드로 강제 실행 (DirectML 알파 채널 이슈 회피)
|
|
"""
|
|
try:
|
|
import tempfile
|
|
import uuid
|
|
import shutil
|
|
|
|
self.logger.log(f"🔧 백업 rembg 시작: model_name={model_name}, object_ratio={object_ratio}, debug_save={debug_save}, force_cpu={force_cpu}", level=logging.INFO)
|
|
|
|
# 디버그 저장을 위한 설정
|
|
debug_dir = None
|
|
if debug_save:
|
|
debug_dir = tempfile.mkdtemp(prefix=f"{debug_prefix}_rembg_debug_")
|
|
self.logger.log(f"📁 디버그 이미지 저장 디렉토리: {debug_dir}", level=logging.INFO)
|
|
|
|
# 1단계: 입력 이미지 저장
|
|
input_path = os.path.join(debug_dir, "01_input_image.png")
|
|
cv2.imwrite(input_path, image_data)
|
|
self.logger.log(f"💾 1단계 저장 완료: 입력 이미지 {image_data.shape} → {input_path}", level=logging.DEBUG)
|
|
|
|
backup_rembg = self._get_backup_rembg()
|
|
if backup_rembg is None:
|
|
error_msg = self._backup_rembg_error or "백업 rembg 모듈을 사용할 수 없습니다"
|
|
self.logger.log(f"백업 rembg 모듈 사용 불가: {error_msg}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 모델명 유효성 확인 및 대체
|
|
supported_models = backup_rembg.get_supported_models()
|
|
if model_name not in supported_models:
|
|
self.logger.log(f"지원하지 않는 모델명 ({model_name}). bria-rmbg-1.4로 대체 사용", level=logging.WARNING)
|
|
model_name = "bria-rmbg-1.4" # 기본 모델로 대체
|
|
|
|
# 안전한 임시 파일 저장 (UTF-8 경로 문제 해결)
|
|
|
|
try:
|
|
# ASCII 경로로 안전한 임시 파일 생성
|
|
temp_dir = tempfile.gettempdir()
|
|
safe_filename = f"rembg_{uuid.uuid4().hex[:8]}.png"
|
|
temp_path = os.path.join(temp_dir, safe_filename)
|
|
|
|
# 이미지 저장 시 인코딩 안전성 확보
|
|
encode_param = [cv2.IMWRITE_PNG_COMPRESSION, 1]
|
|
success = cv2.imwrite(temp_path, image_data, encode_param)
|
|
|
|
if not success or not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0:
|
|
self.logger.log("임시 이미지 파일 생성 실패", level=logging.ERROR)
|
|
return None
|
|
|
|
self.logger.log(f"✅ 안전한 임시 파일 생성: {temp_path} (크기: {os.path.getsize(temp_path)} bytes)", level=logging.DEBUG)
|
|
|
|
# 2단계: 임시 파일 저장 확인 (디버그)
|
|
if debug_save and debug_dir:
|
|
temp_copy_path = os.path.join(debug_dir, "02_temp_file_copy.png")
|
|
shutil.copy2(temp_path, temp_copy_path)
|
|
self.logger.log(f"💾 2단계 저장 완료: 임시 파일 복사 → {temp_copy_path}", level=logging.DEBUG)
|
|
|
|
# 내장 rembg로 배경 제거 (CPU/GPU 모드 선택 가능)
|
|
try:
|
|
self.logger.log(f"백업 rembg 모듈로 배경 제거 시작: {model_name}", level=logging.DEBUG)
|
|
|
|
if force_cpu:
|
|
# CPU 모드로 강제 실행
|
|
self.logger.log("⚠️ CPU 모드로 강제 실행", level=logging.WARNING)
|
|
original_gpu_state = backup_rembg.gpu_manager.can_use_cuda if backup_rembg.gpu_manager else False
|
|
|
|
if backup_rembg.gpu_manager:
|
|
backup_rembg.gpu_manager.can_use_cuda = False
|
|
|
|
try:
|
|
result_pil = backup_rembg.remove_background(temp_path, model_name=model_name, force_cpu=True)
|
|
if result_pil is not None:
|
|
self.logger.log(f"CPU 강제 모드로 배경 제거 성공", level=logging.INFO)
|
|
else:
|
|
self.logger.log(f"내장 rembg 모듈({model_name}) 처리 실패", level=logging.ERROR)
|
|
return None
|
|
finally:
|
|
# GPU 상태 복원
|
|
if backup_rembg.gpu_manager:
|
|
backup_rembg.gpu_manager.can_use_cuda = original_gpu_state
|
|
else:
|
|
# DirectML GPU 모드로 실행
|
|
self.logger.log("🔥 DirectML GPU 모드로 배경 제거 실행", level=logging.INFO)
|
|
result_pil = backup_rembg.remove_background(temp_path, model_name=model_name, force_cpu=force_cpu)
|
|
if result_pil is not None:
|
|
self.logger.log(f"DirectML GPU 모드로 배경 제거 성공", level=logging.INFO)
|
|
else:
|
|
self.logger.log(f"내장 rembg 모듈({model_name}) 처리 실패", level=logging.ERROR)
|
|
return None
|
|
|
|
self.logger.log(f"✅ 백업 rembg 처리 성공: {result_pil.mode}, 크기: {result_pil.size}", level=logging.DEBUG)
|
|
|
|
# 3단계: PIL 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
pil_result_path = os.path.join(debug_dir, "03_pil_result.png")
|
|
result_pil.save(pil_result_path)
|
|
self.logger.log(f"💾 3단계 저장 완료: PIL 결과 {result_pil.mode} {result_pil.size} → {pil_result_path}", level=logging.DEBUG)
|
|
|
|
# PIL Image 상세 분석 (디버그)
|
|
self.logger.log(f"🔍 PIL 결과 분석: mode={result_pil.mode}, size={result_pil.size}, format={result_pil.format}", level=logging.DEBUG)
|
|
if hasattr(result_pil, 'getbands'):
|
|
bands = result_pil.getbands()
|
|
self.logger.log(f"🔍 PIL 밴드: {bands}", level=logging.DEBUG)
|
|
|
|
# PIL 알파 채널 통계 확인
|
|
if result_pil.mode == 'RGBA':
|
|
pil_array = np.array(result_pil)
|
|
alpha_channel = pil_array[:, :, 3]
|
|
alpha_stats = {
|
|
'min': alpha_channel.min(),
|
|
'max': alpha_channel.max(),
|
|
'mean': alpha_channel.mean(),
|
|
'nonzero_count': np.count_nonzero(alpha_channel),
|
|
'above_128_count': np.sum(alpha_channel > 128)
|
|
}
|
|
self.logger.log(f"🔍 PIL 알파 채널 통계: {alpha_stats}", level=logging.DEBUG)
|
|
|
|
# PIL Image를 numpy array로 변환 (DirectML 알파 채널 이슈 회피)
|
|
if result_pil.mode == 'RGBA':
|
|
# BriaAI에서 RGBA가 나오는 경우 (레거시)
|
|
result_rgba = np.array(result_pil)
|
|
# DirectML 이슈 회피: BGRA 변환 대신 흰 배경 합성으로 RGB 처리
|
|
rgb_part = result_rgba[:, :, :3]
|
|
alpha_part = result_rgba[:, :, 3:4].astype(np.float32) / 255.0
|
|
white_bg = np.full_like(rgb_part, 255, dtype=np.uint8)
|
|
result_rgb = (
|
|
rgb_part.astype(np.float32) * alpha_part +
|
|
white_bg.astype(np.float32) * (1.0 - alpha_part)
|
|
).astype(np.uint8)
|
|
result_img = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
|
|
self.logger.log(f"🔄 RGBA → 흰배경 합성 → BGR 변환: {result_img.shape}", level=logging.DEBUG)
|
|
else:
|
|
# RGB 모드 (BriaAI가 이미 흰 배경 합성을 완료한 경우)
|
|
result_rgb = np.array(result_pil)
|
|
result_img = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
|
|
self.logger.log(f"🔄 RGB → BGR 변환: {result_img.shape}", level=logging.DEBUG)
|
|
|
|
# 4단계: numpy 변환 결과 저장 (디버그) - 이제 BGR 형태
|
|
if debug_save and debug_dir:
|
|
numpy_result_path = os.path.join(debug_dir, "04_numpy_result.png")
|
|
cv2.imwrite(numpy_result_path, result_img)
|
|
self.logger.log(f"💾 4단계 저장 완료: numpy 변환 결과 {result_img.shape} (흰배경 합성 완료) → {numpy_result_path}", level=logging.DEBUG)
|
|
|
|
# 동일한 후처리 적용
|
|
self.logger.log(f"🔧 후처리 시작: object_ratio={object_ratio}", level=logging.DEBUG)
|
|
processed_result = self._postprocess_rembg_result(result_img, object_ratio, debug_save=debug_save, debug_dir=debug_dir, step_offset=4)
|
|
|
|
if processed_result is not None:
|
|
self.logger.log(f"✅ 백업 rembg 후처리 완료: {processed_result.shape}", level=logging.DEBUG)
|
|
|
|
# 최종 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
final_result_path = os.path.join(debug_dir, "99_final_result.png")
|
|
cv2.imwrite(final_result_path, processed_result)
|
|
self.logger.log(f"💾 최종 결과 저장: {processed_result.shape} → {final_result_path}", level=logging.INFO)
|
|
else:
|
|
self.logger.log("❌ 백업 rembg 후처리 실패", level=logging.ERROR)
|
|
|
|
return processed_result
|
|
|
|
except Exception as rembg_error:
|
|
self.logger.log(f"백업 rembg 내부 처리 중 오류: {rembg_error}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"백업 rembg 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
finally:
|
|
# 임시 파일 안전하게 삭제
|
|
if 'temp_path' in locals() and os.path.exists(temp_path):
|
|
try:
|
|
os.unlink(temp_path)
|
|
self.logger.log(f"임시 파일 삭제 완료: {temp_path}", level=logging.DEBUG)
|
|
except Exception as delete_error:
|
|
self.logger.log(f"임시 파일 삭제 실패: {delete_error}", level=logging.WARNING)
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"백업 rembg 모듈 실행 중 치명적 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def _postprocess_rembg_result(self, result_img: np.ndarray, object_ratio: float = 0.75, debug_save: bool = False, debug_dir: str = None, step_offset: int = 0) -> np.ndarray:
|
|
"""RemoveBG 결과 이미지 후처리: 마스크 정제 및 중앙 배치"""
|
|
try:
|
|
# 입력 검증
|
|
if result_img is None:
|
|
self.logger.log("❌ 입력 이미지가 None입니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if not isinstance(result_img, np.ndarray):
|
|
self.logger.log(f"❌ 입력이 numpy array가 아닙니다: {type(result_img)}", level=logging.ERROR)
|
|
return None
|
|
|
|
if result_img.size == 0:
|
|
self.logger.log("❌ 입력 이미지가 비어있습니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if len(result_img.shape) != 3 or result_img.shape[2] not in [3, 4]:
|
|
self.logger.log(f"❌ 잘못된 이미지 형태: {result_img.shape}", level=logging.ERROR)
|
|
return None
|
|
|
|
self.logger.log(f"🎨 후처리 시작: 입력 이미지 크기 {result_img.shape}, object_ratio={object_ratio}", level=logging.DEBUG)
|
|
|
|
# 1) BGR 이미지에서 객체 마스크 생성 (흰 배경이 아닌 부분 감지)
|
|
if result_img.shape[2] == 4:
|
|
# 레거시 RGBA 처리 (이제는 거의 없을 것)
|
|
mask_init = (result_img[:, :, 3] > 128).astype(np.uint8)
|
|
rgba_img = result_img
|
|
self.logger.log(f"✅ RGBA 이미지 처리: 알파 채널 > 128인 픽셀 수: {np.sum(mask_init)}", level=logging.DEBUG)
|
|
else:
|
|
# BGR 이미지에서 흰 배경(255,255,255)이 아닌 부분을 객체로 인식
|
|
gray = cv2.cvtColor(result_img, cv2.COLOR_BGR2GRAY)
|
|
mask_init = (gray < 250).astype(np.uint8) # 거의 흰색(250 미만)이 아닌 부분을 객체로 인식
|
|
# BGR 이미지를 그대로 사용 (이미 흰 배경 합성 완료)
|
|
rgba_img = result_img
|
|
self.logger.log(f"✅ BGR 이미지 처리: 그레이 < 250인 픽셀 수: {np.sum(mask_init)}", level=logging.DEBUG)
|
|
|
|
# 초기 마스크 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 1
|
|
mask_init_path = os.path.join(debug_dir, f"{step_num:02d}_initial_mask.png")
|
|
input_img_path = os.path.join(debug_dir, f"{step_num:02d}_input_image.png")
|
|
cv2.imwrite(mask_init_path, mask_init * 255) # 마스크를 흰/검은색으로 표시
|
|
cv2.imwrite(input_img_path, rgba_img)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 초기 마스크 ({np.sum(mask_init)}개 픽셀) → {mask_init_path}", level=logging.DEBUG)
|
|
|
|
# 2) 모폴로지 정제 (너무 공격적이지 않게 수정)
|
|
kernel = np.ones((3, 3), np.uint8)
|
|
mask_before_erode = mask_init.copy()
|
|
# 침식 연산을 건너뛰고 팽창만 적용 (객체 보존)
|
|
mask = cv2.dilate(mask_init, kernel, iterations=1)
|
|
self.logger.log(f"🔧 마스크 정제: 정제 전 {np.sum(mask_before_erode)}개 → 정제 후 {np.sum(mask)}개 픽셀", level=logging.DEBUG)
|
|
|
|
# 정제된 마스크 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 2
|
|
refined_mask_path = os.path.join(debug_dir, f"{step_num:02d}_refined_mask.png")
|
|
cv2.imwrite(refined_mask_path, mask * 255)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 정제된 마스크 ({np.sum(mask)}개 픽셀) → {refined_mask_path}", level=logging.DEBUG)
|
|
|
|
# 3) 최대 연결 요소만 유지
|
|
num, labels, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
|
|
self.logger.log(f"🔍 연결 요소 분석: {num-1}개 객체 발견", level=logging.DEBUG)
|
|
|
|
if num > 1:
|
|
largest = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])
|
|
mask = (labels == largest).astype(np.uint8)
|
|
self.logger.log(f"🎯 최대 연결 요소 선택: {np.sum(mask)}개 픽셀", level=logging.DEBUG)
|
|
|
|
# 연결 요소 분석 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 3
|
|
component_mask_path = os.path.join(debug_dir, f"{step_num:02d}_component_mask.png")
|
|
cv2.imwrite(component_mask_path, mask * 255)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 연결 요소 마스크 ({np.sum(mask)}개 픽셀) → {component_mask_path}", level=logging.DEBUG)
|
|
|
|
ys, xs = np.where(mask > 0)
|
|
if len(xs) == 0 or len(ys) == 0:
|
|
# 객체 감지 실패 → 원본 이미지를 그대로 1000x1000으로 리사이즈
|
|
self.logger.log("⚠️ 객체 감지 실패: 원본 이미지를 그대로 리사이즈", level=logging.WARNING)
|
|
|
|
# 원본 이미지는 이미 BGR (흰 배경 합성 완료)
|
|
original_bgr = result_img[:, :, :3] if result_img.shape[2] == 4 else result_img
|
|
|
|
# 객체 감지 실패 이미지 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 4
|
|
fallback_path = os.path.join(debug_dir, f"{step_num:02d}_fallback_original.png")
|
|
cv2.imwrite(fallback_path, original_bgr)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 폴백 원본 이미지 → {fallback_path}", level=logging.DEBUG)
|
|
|
|
# 1000x1000으로 리사이즈
|
|
result_1000x1000 = self._create_square_canvas_with_dpi(
|
|
original_bgr,
|
|
target_size=1000,
|
|
target_dpi=72,
|
|
object_ratio=object_ratio
|
|
)
|
|
|
|
# 폴백 최종 결과 저장 (디버그)
|
|
if debug_save and debug_dir and result_1000x1000 is not None:
|
|
fallback_final_path = os.path.join(debug_dir, f"98_fallback_final_result.png")
|
|
cv2.imwrite(fallback_final_path, result_1000x1000)
|
|
self.logger.log(f"💾 폴백 최종 결과 저장: {result_1000x1000.shape} → {fallback_final_path}", level=logging.DEBUG)
|
|
|
|
return result_1000x1000
|
|
|
|
# 4) 바운딩 박스 + 마진
|
|
self.logger.log(f"📐 바운딩 박스 계산: 유효 픽셀 {len(xs)}개", level=logging.DEBUG)
|
|
top, left = ys.min(), xs.min()
|
|
bottom, right = ys.max(), xs.max()
|
|
crop_img = rgba_img[top:bottom + 1, left:right + 1]
|
|
self.logger.log(f"✂️ 크롭 영역: ({left},{top}) ~ ({right},{bottom}), 크기: {crop_img.shape}", level=logging.DEBUG)
|
|
|
|
# 바운딩 박스 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 5
|
|
bbox_path = os.path.join(debug_dir, f"{step_num:02d}_bbox_crop.png")
|
|
cv2.imwrite(bbox_path, crop_img)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 바운딩 박스 크롭 {crop_img.shape} → {bbox_path}", level=logging.DEBUG)
|
|
|
|
ch, cw = crop_img.shape[:2]
|
|
margin = int(max(ch, cw) * 0.01)
|
|
# BGR 이미지에 흰색 마진 추가
|
|
crop_img = cv2.copyMakeBorder(
|
|
crop_img, margin, margin, margin, margin,
|
|
borderType=cv2.BORDER_CONSTANT,
|
|
value=[255, 255, 255] # BGR 흰색
|
|
)
|
|
self.logger.log(f"📏 마진 추가: {margin}px, 최종 크기: {crop_img.shape}", level=logging.DEBUG)
|
|
|
|
# 마진 추가 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 6
|
|
margin_path = os.path.join(debug_dir, f"{step_num:02d}_with_margin.png")
|
|
cv2.imwrite(margin_path, crop_img)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 마진 추가 {crop_img.shape} → {margin_path}", level=logging.DEBUG)
|
|
|
|
# 5) 이미 흰 배경 합성이 완료된 BGR 이미지이므로 그대로 사용
|
|
final_img = crop_img
|
|
|
|
if final_img is None:
|
|
self.logger.log("❌ 이미지 처리 실패", level=logging.ERROR)
|
|
return None
|
|
|
|
self.logger.log(f"✅ 이미지 처리 완료 (이미 흰배경 합성됨): {final_img.shape}", level=logging.DEBUG)
|
|
|
|
# 흰 배경 합성 결과 저장 (디버그)
|
|
if debug_save and debug_dir:
|
|
step_num = step_offset + 7
|
|
white_bg_path = os.path.join(debug_dir, f"{step_num:02d}_white_background.png")
|
|
cv2.imwrite(white_bg_path, final_img)
|
|
self.logger.log(f"💾 {step_num}단계 저장: 흰 배경 합성 {final_img.shape} → {white_bg_path}", level=logging.DEBUG)
|
|
|
|
# 6) 1000x1000 크기로 리사이즈하고 DPI 72 설정
|
|
if final_img is None:
|
|
self.logger.log("❌ final_img가 None이어서 리사이즈 건너뜀", level=logging.ERROR)
|
|
return None
|
|
|
|
self.logger.log(f"🎯 1000x1000 리사이즈 시작: object_ratio={object_ratio}, final_img.shape={final_img.shape}", level=logging.DEBUG)
|
|
result_1000x1000 = self._create_square_canvas_with_dpi(
|
|
final_img,
|
|
target_size=1000,
|
|
target_dpi=72,
|
|
object_ratio=object_ratio
|
|
)
|
|
|
|
if result_1000x1000 is not None:
|
|
self.logger.log(f"✅ 1000x1000 리사이즈 완료: {result_1000x1000.shape}", level=logging.DEBUG)
|
|
else:
|
|
self.logger.log("❌ 1000x1000 리사이즈 실패", level=logging.ERROR)
|
|
|
|
return result_1000x1000
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"❌ rembg 결과 후처리 에러: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def _postprocess_rembg_result_old(self, result_img: np.ndarray) -> np.ndarray:
|
|
"""RemoveBG 결과 이미지 후처리: 마스크 정제 및 중앙 배치"""
|
|
try:
|
|
# 1) 초기 마스크
|
|
if result_img.shape[2] == 4:
|
|
mask_init = (result_img[:, :, 3] > 200).astype(np.uint8)
|
|
rgba_img = result_img
|
|
else:
|
|
gray = cv2.cvtColor(result_img[:, :, :3], cv2.COLOR_BGR2GRAY)
|
|
mask_init = (gray < 230).astype(np.uint8)
|
|
alpha_channel = (mask_init * 255).astype(np.uint8)
|
|
rgba_img = np.dstack([result_img, alpha_channel])
|
|
|
|
# 2) 모폴로지 정제
|
|
kernel = np.ones((3, 3), np.uint8)
|
|
mask = cv2.erode(mask_init, kernel, iterations=1)
|
|
mask = cv2.dilate(mask, kernel, iterations=2)
|
|
|
|
# 3) 최대 연결 요소만 유지
|
|
num, labels, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
|
|
if num > 1:
|
|
largest = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])
|
|
mask = (labels == largest).astype(np.uint8)
|
|
|
|
ys, xs = np.where(mask > 0)
|
|
if len(xs) == 0 or len(ys) == 0:
|
|
# 객체 감지 실패 → 흰 배경 합성만
|
|
white_bg = np.full_like(rgba_img[:, :, :3], 255)
|
|
return white_bg
|
|
|
|
# 4) 바운딩 박스 + 마진
|
|
top, left = ys.min(), xs.min()
|
|
bottom, right = ys.max(), xs.max()
|
|
crop_rgba = rgba_img[top:bottom + 1, left:right + 1]
|
|
|
|
ch, cw = crop_rgba.shape[:2]
|
|
margin = int(max(ch, cw) * 0.01)
|
|
crop_rgba = cv2.copyMakeBorder(
|
|
crop_rgba, margin, margin, margin, margin,
|
|
borderType=cv2.BORDER_CONSTANT,
|
|
value=[255, 255, 255, 0]
|
|
)
|
|
|
|
# 5) 흰 배경으로 합성
|
|
bgr_crop = crop_rgba[:, :, :3].astype(np.float32)
|
|
alpha_crop = crop_rgba[:, :, 3:4].astype(np.float32) / 255.0
|
|
white_bg = np.full_like(bgr_crop, 255.0)
|
|
final_img = (bgr_crop * alpha_crop + white_bg * (1 - alpha_crop)).astype(np.uint8)
|
|
if final_img:
|
|
self.logger.log("흰배경 처리완료.", level=logging.DEBUG)
|
|
return None
|
|
return final_img
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"rembg 결과 후처리 에러: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def _create_square_canvas_with_dpi(self, img: np.ndarray, target_size: int = 1000, target_dpi: int = 72, object_ratio: float = 0.75) -> np.ndarray:
|
|
"""이미지를 1:1 비율로 만들고 지정된 크기와 DPI로 설정하며, 객체를 일정 비율로 스케일링"""
|
|
try:
|
|
# 입력 검증
|
|
if img is None:
|
|
self.logger.log("❌ _create_square_canvas_with_dpi: 입력 이미지가 None입니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if not isinstance(img, np.ndarray):
|
|
self.logger.log(f"❌ _create_square_canvas_with_dpi: 입력이 numpy array가 아닙니다: {type(img)}", level=logging.ERROR)
|
|
return None
|
|
|
|
if img.size == 0:
|
|
self.logger.log("❌ _create_square_canvas_with_dpi: 입력 이미지가 비어있습니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if len(img.shape) != 3 or img.shape[2] != 3:
|
|
self.logger.log(f"❌ _create_square_canvas_with_dpi: 잘못된 이미지 형태: {img.shape} (3채널 BGR 필요)", level=logging.ERROR)
|
|
return None
|
|
|
|
h, w = img.shape[:2]
|
|
self.logger.log(f"🎯 정사각형 캔버스 생성 시작: 입력={h}x{w}, 목표={target_size}x{target_size}, 객체비율={object_ratio}", level=logging.DEBUG)
|
|
|
|
# 1) 목표 객체 크기 계산 (캔버스의 75% 차지하도록)
|
|
target_object_size = int(target_size * object_ratio)
|
|
|
|
# 2) 현재 이미지의 최대 치수
|
|
max_dim = max(h, w)
|
|
|
|
# 3) 스케일 팩터 계산 (객체가 목표 크기를 차지하도록)
|
|
scale_factor = target_object_size / max_dim
|
|
|
|
# 4) 이미지 리사이즈 (객체 크기 정규화)
|
|
new_h = int(h * scale_factor)
|
|
new_w = int(w * scale_factor)
|
|
|
|
# 크기 유효성 검사
|
|
if new_h <= 0 or new_w <= 0:
|
|
self.logger.log(f"⚠️ 스케일링 결과 크기 이상: new_h={new_h}, new_w={new_w}, scale_factor={scale_factor}", level=logging.WARNING)
|
|
scaled_img = img
|
|
new_h, new_w = h, w
|
|
elif new_h > target_size * 2 or new_w > target_size * 2:
|
|
self.logger.log(f"⚠️ 스케일링 결과가 너무 큼: new_h={new_h}, new_w={new_w}, 원본 사용", level=logging.WARNING)
|
|
scaled_img = img
|
|
new_h, new_w = h, w
|
|
else:
|
|
try:
|
|
scaled_img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
|
|
self.logger.log(f"✅ 이미지 리사이즈 완료: {h}x{w} → {new_h}x{new_w} (scale_factor={scale_factor:.3f})", level=logging.DEBUG)
|
|
except Exception as resize_error:
|
|
self.logger.log(f"⚠️ 리사이즈 실패: {resize_error}, 원본 사용", level=logging.WARNING)
|
|
scaled_img = img
|
|
new_h, new_w = h, w
|
|
|
|
# 5) 1000x1000 정사각형 캔버스 생성 (흰색 배경)
|
|
square_canvas = np.full((target_size, target_size, 3), 255, dtype=np.uint8)
|
|
|
|
# 6) 중앙에 스케일된 이미지 배치
|
|
y_offset = (target_size - new_h) // 2
|
|
x_offset = (target_size - new_w) // 2
|
|
|
|
# 경계 확인 및 안전성 검사
|
|
if y_offset < 0 or x_offset < 0:
|
|
self.logger.log(f"⚠️ 오프셋 음수: y_offset={y_offset}, x_offset={x_offset}", level=logging.WARNING)
|
|
y_offset = max(0, y_offset)
|
|
x_offset = max(0, x_offset)
|
|
|
|
y_end = min(y_offset + new_h, target_size)
|
|
x_end = min(x_offset + new_w, target_size)
|
|
scaled_h = y_end - y_offset
|
|
scaled_w = x_end - x_offset
|
|
|
|
if scaled_h <= 0 or scaled_w <= 0:
|
|
self.logger.log(f"❌ 배치 영역 크기 이상: scaled_h={scaled_h}, scaled_w={scaled_w}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 배치 영역과 소스 영역 유효성 확인
|
|
if scaled_h > scaled_img.shape[0] or scaled_w > scaled_img.shape[1]:
|
|
self.logger.log(f"⚠️ 배치 영역이 소스보다 큼: scaled={scaled_h}x{scaled_w}, source={scaled_img.shape[:2]}", level=logging.WARNING)
|
|
scaled_h = min(scaled_h, scaled_img.shape[0])
|
|
scaled_w = min(scaled_w, scaled_img.shape[1])
|
|
y_end = y_offset + scaled_h
|
|
x_end = x_offset + scaled_w
|
|
|
|
try:
|
|
# 5) 1000x1000 정사각형 캔버스 생성 (흰색 배경)
|
|
square_canvas = np.full((target_size, target_size, 3), 255, dtype=np.uint8)
|
|
self.logger.log(f"✅ 정사각형 캔버스 생성: {square_canvas.shape}", level=logging.DEBUG)
|
|
|
|
square_canvas[y_offset:y_end, x_offset:x_end] = scaled_img[:scaled_h, :scaled_w]
|
|
self.logger.log(f"✅ 이미지 배치 완료: offset=({x_offset},{y_offset}), size=({scaled_w},{scaled_h})", level=logging.DEBUG)
|
|
|
|
# 7) PIL로 변환하여 DPI 설정
|
|
pil_img = Image.fromarray(cv2.cvtColor(square_canvas, cv2.COLOR_BGR2RGB))
|
|
|
|
# DPI 설정 (인치당 픽셀 수)
|
|
pil_img = pil_img.copy()
|
|
pil_img.info['dpi'] = (target_dpi, target_dpi)
|
|
|
|
# numpy로 다시 변환
|
|
final_img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
|
|
|
|
self.logger.log(f"✅ 정사각형 캔버스 완성: {object_ratio*100:.0f}% 스케일링, {target_size}x{target_size}, DPI {target_dpi}", level=logging.DEBUG)
|
|
return final_img
|
|
|
|
except Exception as canvas_error:
|
|
self.logger.log(f"❌ 캔버스 생성/배치 실패: {canvas_error}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"정사각형 캔버스 생성 에러: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def is_server_alive(self, base_url: str, timeout: int = 3) -> bool:
|
|
"""서버 헬스체크(현재는 사용 안 함). base_url이 비어있으면 False."""
|
|
import requests as _rq
|
|
try:
|
|
if not base_url:
|
|
return False
|
|
model_url = base_url.rstrip('/') + '/api/v1/model'
|
|
response = _rq.get(model_url, timeout=timeout)
|
|
return response.status_code == 200
|
|
except Exception as e:
|
|
self.logger.log(f"서버 상태 확인 실패 ({base_url}): {e}", level=logging.WARNING)
|
|
return False
|
|
|
|
# ------------------------- MIGAN helper -------------------------
|
|
def _get_migan(self):
|
|
if getattr(self, "_migan", None) is not None:
|
|
return self._migan
|
|
if getattr(self, "_migan_error", None) is not None:
|
|
return None
|
|
try:
|
|
from modules.migan_module import build_migan_from_toggle
|
|
modules_dir = os.path.dirname(os.path.abspath(__file__))
|
|
base_dir = modules_dir
|
|
onnx_path = os.path.join(base_dir, "migan_onnx", "migan_pipeline_v2.onnx")
|
|
toggles = {
|
|
"migan_onnx_path": onnx_path,
|
|
"migan_use_accel": True,
|
|
"migan_provider_override": os.environ.get("IMGWK_MIGAN_PROVIDER", "auto"),
|
|
}
|
|
self._migan = build_migan_from_toggle(toggles, logger=self.logger, gpu_manager=self.gpu_manager)
|
|
return self._migan
|
|
except Exception as e:
|
|
self._migan_error = str(e)
|
|
self.logger.log(f"MIGAN 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
|
|
return None |