t_local_serv/modules/image_processor3.py

686 lines
33 KiB
Python

import os
import asyncio
import aiofiles
import logging
from urllib.parse import urlparse
import shutil
import re
import cv2
import base64
import requests
import numpy as np
from simple_lama_inpainting import SimpleLama
from PIL import Image
import numpy as np
# from modules.easyocr_module import EasyOCREngine
from modules.ocr_module import OCRModule
from modules.mask_module_for_paddle import MaskModule
from modules.mask_module_for_easy import MaskModule_easy
from modules.text_rendering_module import TextRenderingModule
from modules.postImageManager import PostImageManager
from translatepy.translators.google import GoogleTranslate
from modules.background_removal_module import BackgroundRemovalModule
class ImageProcessor3:
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
def __init__(self, logger, base_dir, toggle_states = None, unwanted_words = None):
self.logger = logger
self.base_dir = base_dir
self.toggle_states = toggle_states or {}
self.unwanted_words = unwanted_words or {}
self.font_path = os.path.abspath(os.path.join(self.base_dir, 'modules', 'fonts', 'HakgyoansimDunggeunmisoTTFB.ttf'))
self.TEMP_IMAGE_DIR = os.path.join(self.base_dir, "temp_images")
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
if not os.path.exists(self.font_path):
self.logger.log(f"폰트 파일이 존재하지 않습니다: {self.font_path}", level=logging.ERROR)
else:
self.logger.log(f"폰트 파일 정상 확인: {self.font_path}", level=logging.INFO)
self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG)
self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
self.logger.log(f"self.unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
# self.ocr_module = EasyOCREngine(logger=self.logger, base_dir=self.base_dir, lang_list=['ch_sim'])
self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir)
self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir)
self.text_rendering_module = TextRenderingModule(logger=self.logger, font_path=self.font_path)
self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states, font_path=self.font_path)
self.background_removal_module = BackgroundRemovalModule(logger=self.logger)
self.simple_lama = SimpleLama()
self.gtranslate = GoogleTranslate()
def update_unwanted_words(self, texts):
self.unwanted_words = texts
self.logger.log(f"unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
def __del__(self):
"""소멸자에서 리소스 정리"""
self.cleanup()
def cleanup(self):
"""리소스 정리"""
try:
# 임시 폴더 삭제
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
# shutil.rmtree(self.TEMP_IMAGE_DIR)
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
def is_valid_image_path(self, path):
# http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용
if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE):
return True
if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')):
return True
return False
async def process_single_image(self, original_image_url, unwanted_words, method, toggle_states, index, delay=1.0, output_image_path=""):
"""
단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅)
Args:
page: Playwright 페이지 객체
original_image_url (str): 처리할 이미지 URL
index (int): 이미지 인덱스
is_localServer (bool): 로컬 서버 사용 여부
delay (float): 요청 간격 (초)
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
Returns:
dict: 처리 결과를 포함한 딕셔너리
- status: 'inpainted', 'original', 'exclude', 'error' 중 하나
- path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로
- error: 오류 메시지 (status가 'error'인 경우에만 포함)
"""
local_image_path = original_image_url
delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화
self.toggle_states = toggle_states
self.unwanted_words = unwanted_words
self.logger.log(f"unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
try:
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'}
if not self.is_valid_image_path(original_image_url):
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'}
# 요청 간격 조절 (봇 탐지 회피)
if delay > 0:
await asyncio.sleep(delay)
# OCR 권한 상태 로그
ocr_enabled = toggle_states.get('ocr', False)
processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드"
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG)
# # 1. 이미지 다운로드
# local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
# if not local_image_path:
# self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'}
# 2. OCR 텍스트 감지
ocr_results = self.ocr_module.detect_text(local_image_path)
self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG)
filter_ocr_results = self.filter_ocr_results(ocr_results)
self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG)
# 3. 중국어 텍스트 없는 경우 원본 이미지 반환
if not self.ocr_module.filter_chinese_text(filter_ocr_results):
self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음, 원본 이미지 반환", level=logging.DEBUG)
return {'status': 'original', 'path': local_image_path}
# 4. 텍스트 번역
translated_texts = self.google_translate_texts_with_translatepy(filter_ocr_results)
self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG)
# 5. OCR 권한에 따른 텍스트 필터링
if ocr_enabled:
filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index, unwanted_words)
if not filtered_translated_texts:
self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG)
return {'status': 'exclude', 'path': local_image_path}
else:
self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG)
else:
# OCR 권한이 없으면 번역된 텍스트를 그대로 사용
filtered_translated_texts = translated_texts
self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG)
# 마스크 생성 (basic 방식만 사용)
masks = self.mask_module.create_masks(
image_path=local_image_path, ocr_results=filter_ocr_results, mask_option="basic"
)
self.logger.log(f"마스크 생성 완료", level=logging.DEBUG)
# 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화)
# self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix)
# 인페인팅
if method == 'cv':
inpainted_image = self.opencv_inpaint(local_image_path, masks)
elif method == 'lama':
inpainted_image = self.lama_inpaint(local_image_path, masks)
# 인페인팅 실패 시 원본 이미지 사용
if inpainted_image is None:
self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING)
inpainted_image = cv2.imread(local_image_path)
if inpainted_image is None:
self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR)
return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패'}
else:
self.logger.log(f"인페인팅 완료", level=logging.DEBUG)
# 텍스트 렌더링
text_rendered_image = self.text_rendering_module.render_text(
inpainted_image, filter_ocr_results, filtered_translated_texts)
self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG)
# 결과 저장
translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, output_image_path, toggle_states)
self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG)
return {'status': 'inpainted', 'path': translated_img_path}
except Exception as e:
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e)}
# OCR 결과 필터링: 중국어 텍스트만 필터링
def filter_ocr_results(self, ocr_results):
import re
# 중국어 문자 범위 정규식 (한자)
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
filtered_results = []
for r in ocr_results:
text = r.get('text', '').strip()
polygon = r.get('polygon', [])
confidence = r.get('confidence', 0.0)
# 텍스트가 비어있거나 polygon이 3점 미만이면 제외
if not text or not polygon or len(polygon) < 3:
self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG)
continue
# 신뢰도 20% 미만이면 제외
if confidence < 0.05:
self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
continue
# 중국어 문자가 포함된 텍스트만 필터링
if chinese_pattern.search(text):
filtered_results.append(r)
self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
else:
self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG)
self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.INFO)
return filtered_results
async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, output_image_path="", toggle_states=None):
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
try:
# 파일명에 접두사 포함
if output_image_path:
img_path = os.path.join(self.TEMP_IMAGE_DIR, output_image_path)
else:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
watermarked_image_data = self.postImageManager.add_watermark(image_data=text_rendered_image, watermark_text=toggle_states.get("watermark_text", "이미지 저작권 보유"))
final_image_path = self.postImageManager.save_image_to_path(watermarked_image_data, img_path)
return final_image_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return local_image_path
async def download_image(self, page, image_url, index, file_prefix=""):
"""Playwright를 사용해 이미지를 다운로드합니다"""
# 로컬 파일 경로면 바로 반환
if os.path.isfile(image_url):
self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG)
return image_url
# 로컬 파일 경로가 아니면 다운로드 시도
try:
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
if image_url.startswith("https://assets.alicdn.com"):
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
return None
# URL에서 파일명 추출 및 접두사 포함
parsed_url = urlparse(image_url)
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
base_filename += '.jpg'
# 접두사가 있으면 파일명에 포함
if file_prefix:
filename = f"{file_prefix}_{base_filename}"
else:
filename = base_filename
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
# Playwright로 이미지 다운로드
response = await page.request.get(image_url)
if response.status == 200:
image_data = await response.body()
# 이미지 데이터 유효성 검사
if self.is_valid_image_data(image_data):
async with aiofiles.open(local_path, 'wb') as f:
await f.write(image_data)
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG)
return local_path
else:
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
return None
else:
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status}): {image_url}", level=logging.ERROR, exc_info=True)
return None
except Exception as e:
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def is_valid_image_data(self, image_data):
"""이미지 데이터가 유효한지 확인합니다"""
if not image_data or len(image_data) < 100:
return False
# 일반적인 이미지 파일 시그니처 확인
image_signatures = [
b'\xFF\xD8\xFF', # JPEG
b'\x89PNG\r\n\x1a\n', # PNG
b'GIF87a', # GIF87a
b'GIF89a', # GIF89a
b'RIFF', # WebP (RIFF 컨테이너)
]
return any(image_data.startswith(sig) for sig in image_signatures)
def process_translated_texts(self, translated_texts, local_image_path, index, unwanted_words):
"""
번역된 단어 리스트(translated_texts)에서 unwanted_words의 원본값이
앞이나 뒤에 포함되면 치환값으로 바꿉니다.
치환값이 '이미지삭제'라면 None 반환(이미지 제외)
"""
new_texts = []
for i, text in enumerate(translated_texts):
self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG)
# 텍스트를 빈칸으로 분리
words = text.split()
self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG)
processed_words = []
text_replaced = False
for word_idx, word in enumerate(words):
word_replaced = False
# unwanted_words와 매칭 확인
for origin, replace in unwanted_words.items():
if word.startswith(origin) or word.endswith(origin) or word == origin:
self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.INFO)
if replace == "이미지삭제":
self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.INFO)
return None
# 단어 치환
if word == origin:
new_word = replace
elif word.startswith(origin):
new_word = replace + word[len(origin):]
elif word.endswith(origin):
new_word = word[:-len(origin)] + replace
processed_words.append(new_word)
word_replaced = True
text_replaced = True
self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG)
break
if not word_replaced:
processed_words.append(word)
# 처리된 단어들을 다시 합치기
final_text = ' '.join(processed_words)
new_texts.append(final_text)
if text_replaced:
self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.INFO)
else:
self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG)
self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.INFO)
for i, (original, processed) in enumerate(zip(translated_texts, new_texts)):
if original != processed:
self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.INFO)
return new_texts
def split_and_translate(self, text, translator, src='zh-cn', tgt='ko'):
# 여러 구분자 기준 분리 (공백도 포함)
parts = re.split(r'\s*[/|·,;、]+\s*', text)
result = []
for part in parts:
part = part.strip()
if not part:
continue
try:
t = translator.translate(part, "Korean")
# translatepy에서 t.text가 번역 결과 (에러시 fallback)
result.append(getattr(t, "text", getattr(t, "result", part)))
except Exception as e:
result.append(part)
return " / ".join(result)
def google_translate_texts_with_translatepy(self, ocr_results):
"""
ocr_results에서 추출한 텍스트 리스트를 구글 번역기로 번역하여 반환합니다.
각 텍스트에 split_and_translate를 적용!
"""
texts = [result['text'] for result in ocr_results if result['text'].strip()]
if not texts:
return []
try:
# 부분별로 나눠서 번역하고 다시 합침
translations = [self.split_and_translate(text, self.gtranslate) for text in texts]
return translations
except Exception as e:
self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True)
return texts
def lama_inpaint(self, image_path, mask):
"""
simple_lama_inpainting의 SimpleLama를 사용하여 이미지에서 마스크 영역을 채웁니다.
Args:
image_path (str): 원본 이미지 경로
mask (np.ndarray 또는 str): 2D 마스크 이미지 (0/255, shape=(H, W)) 또는 마스크 파일 경로
Returns:
inpainted_image (np.ndarray): 인페인팅된 이미지 (OpenCV BGR)
"""
# 이미지 로드
image = Image.open(image_path)
# 마스크 로드/변환
if isinstance(mask, str):
mask_img = Image.open(mask).convert('L')
elif isinstance(mask, np.ndarray):
if mask.ndim == 3:
mask = mask[:, :, 0] # 첫 채널만 사용
mask_img = Image.fromarray(mask).convert('L')
else:
raise ValueError('mask는 파일 경로나 np.ndarray여야 합니다.')
# 인페인팅 수행
result = self.simple_lama(image, mask_img)
# PIL.Image -> np.ndarray (OpenCV BGR)
result_np = np.array(result)
if result_np.ndim == 3 and result_np.shape[2] == 3:
# RGB -> BGR
result_np = result_np[:, :, ::-1]
return result_np
def opencv_inpaint(self, image_path, mask, method='telea', radius=3):
"""
OpenCV로 인페인팅을 수행합니다.
Args:
image_path (str): 원본 이미지 경로
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
method (str): 'telea' 또는 'ns'
radius (int): 인페인팅 반경
Returns:
inpainted_image (np.ndarray): 인페인팅된 이미지
"""
import cv2
import numpy as np
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 마스크가 2D 배열인지 확인
if mask is None:
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
return None
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
return None
# 마스크 크기가 이미지와 일치하는지 확인
if mask.shape != image.shape[:2]:
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
return None
inpaint_method = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS
inpainted = cv2.inpaint(image, mask, radius, inpaint_method)
return inpainted
def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""):
"""디버깅용 OCR 박스와 마스크 이미지를 저장합니다"""
try:
# OCR 박스 시각화 이미지 저장
ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix)
# 마스크 시각화 이미지 저장
mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix)
self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.INFO)
return ocr_debug_path, mask_debug_path
except Exception as e:
self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None, None
def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""):
"""OCR 감지된 박스들을 이미지에 표시하여 저장합니다"""
try:
import cv2
import numpy as np
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# OCR 결과별로 박스 그리기
for i, result in enumerate(ocr_results):
polygon = result.get('polygon', [])
text = result.get('text', '')
confidence = result.get('confidence', 0.0)
if len(polygon) >= 3:
# 폴리곤을 numpy 배열로 변환
pts = np.array(polygon, np.int32)
pts = pts.reshape((-1, 1, 2))
# 신뢰도에 따른 색상 결정
if confidence >= 0.8:
color = (0, 255, 0) # 초록 (높은 신뢰도)
elif confidence >= 0.5:
color = (0, 255, 255) # 노랑 (중간 신뢰도)
elif confidence >= 0.2:
color = (0, 165, 255) # 주황 (낮은 신뢰도)
else:
color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도)
# 폴리곤 테두리 그리기
cv2.polylines(debug_image, [pts], True, color, 2)
# 텍스트와 신뢰도 표시
x, y = polygon[0]
label = f"{i+1}: {text[:10]}... ({confidence:.1%})"
# 텍스트 배경 사각형
(text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1)
# 텍스트 표시
cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 범례 추가
legend_y = 30
cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_ocr_img_{index+1}.png"
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def save_mask_debug_image(self, image_path, masks, index, file_prefix=""):
"""생성된 마스크를 이미지에 오버레이하여 저장합니다"""
try:
import cv2
import numpy as np
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
if masks is None or not isinstance(masks, np.ndarray):
self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# 마스크 영역을 빨간색으로 오버레이
mask_colored = np.zeros_like(image)
mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용
# 마스크 영역 반투명 오버레이 (알파 블렌딩)
alpha = 0.3
overlay_mask = masks > 0
debug_image[overlay_mask] = cv2.addWeighted(
debug_image[overlay_mask], 1-alpha,
mask_colored[overlay_mask], alpha, 0
)
# 마스크 통계 정보 표시
total_pixels = masks.shape[0] * masks.shape[1]
mask_pixels = np.sum(masks > 0)
mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0
# 정보 텍스트 표시
info_text = [
f"Mask Statistics:",
f"Total pixels: {total_pixels:,}",
f"Mask pixels: {mask_pixels:,}",
f"Coverage: {mask_percentage:.1f}%"
]
y_offset = 30
for i, text in enumerate(info_text):
y_pos = y_offset + (i * 25)
# 텍스트 배경
(text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1)
# 텍스트
cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_mask_img_{index+1}.png"
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
async def remove_background(self, image_path, index, output_image_path="", toggle_states=None, **kwargs):
"""
배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환
"""
try:
# 1. 배경제거 수행(이미지 반환)
removed_img = self.background_removal_module.remove_background(
image_path, **kwargs
)
if removed_img is None:
self.logger.log(f"배경제거 실패: {image_path}", level=40)
return {'status': 'failed', 'path': image_path, 'error': '배경제거 실패'}
if toggle_states.get("remove_background_white", True):
img_result_white = self.background_removal_module.to_white_background(removed_img)
else:
img_result_white = removed_img
# 2. 저장 경로 생성
if output_image_path:
save_path = os.path.join(self.TEMP_IMAGE_DIR, output_image_path)
else:
save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png")
# 3. 워터마크 등 후처리 및 저장
# 워터마크 등 추가하려면 아래처럼
# removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...)
final_path = self.postImageManager.save_image_to_path(img_result_white, save_path)
if final_path:
self.logger.log(f"배경제거 이미지 저장됨: {final_path}")
return {'status': 'success', 'path': final_path}
else:
self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40)
return {'status': 'failed', 'path': save_path, 'error': '저장 실패'}
except Exception as e:
self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True)
return {'status': 'failed', 'path': image_path, 'error': str(e)}