686 lines
33 KiB
Python
686 lines
33 KiB
Python
import os
|
|
import asyncio
|
|
import aiofiles
|
|
import logging
|
|
from urllib.parse import urlparse
|
|
import shutil
|
|
import re
|
|
import cv2
|
|
import base64
|
|
import requests
|
|
import numpy as np
|
|
from simple_lama_inpainting import SimpleLama
|
|
from PIL import Image
|
|
import numpy as np
|
|
|
|
# from modules.easyocr_module import EasyOCREngine
|
|
from modules.ocr_module import OCRModule
|
|
from modules.mask_module_for_paddle import MaskModule
|
|
from modules.mask_module_for_easy import MaskModule_easy
|
|
from modules.text_rendering_module import TextRenderingModule
|
|
from modules.postImageManager import PostImageManager
|
|
from translatepy.translators.google import GoogleTranslate
|
|
from modules.background_removal_module import BackgroundRemovalModule
|
|
|
|
class ImageProcessor3:
|
|
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
|
|
|
|
def __init__(self, logger, base_dir, toggle_states = None, unwanted_words = None):
|
|
self.logger = logger
|
|
self.base_dir = base_dir
|
|
self.toggle_states = toggle_states or {}
|
|
self.unwanted_words = unwanted_words or {}
|
|
|
|
self.font_path = os.path.abspath(os.path.join(self.base_dir, 'modules', 'fonts', 'HakgyoansimDunggeunmisoTTFB.ttf'))
|
|
|
|
self.TEMP_IMAGE_DIR = os.path.join(self.base_dir, "temp_images")
|
|
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
|
|
|
|
if not os.path.exists(self.font_path):
|
|
self.logger.log(f"폰트 파일이 존재하지 않습니다: {self.font_path}", level=logging.ERROR)
|
|
else:
|
|
self.logger.log(f"폰트 파일 정상 확인: {self.font_path}", level=logging.INFO)
|
|
|
|
self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG)
|
|
self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
|
|
self.logger.log(f"self.unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
|
|
|
|
# self.ocr_module = EasyOCREngine(logger=self.logger, base_dir=self.base_dir, lang_list=['ch_sim'])
|
|
self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir)
|
|
self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir)
|
|
self.text_rendering_module = TextRenderingModule(logger=self.logger, font_path=self.font_path)
|
|
self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states, font_path=self.font_path)
|
|
self.background_removal_module = BackgroundRemovalModule(logger=self.logger)
|
|
|
|
self.simple_lama = SimpleLama()
|
|
|
|
self.gtranslate = GoogleTranslate()
|
|
|
|
def update_unwanted_words(self, texts):
|
|
self.unwanted_words = texts
|
|
self.logger.log(f"unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
|
|
|
|
def __del__(self):
|
|
"""소멸자에서 리소스 정리"""
|
|
self.cleanup()
|
|
|
|
def cleanup(self):
|
|
"""리소스 정리"""
|
|
try:
|
|
|
|
# 임시 폴더 삭제
|
|
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
|
|
# shutil.rmtree(self.TEMP_IMAGE_DIR)
|
|
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
|
|
except Exception as e:
|
|
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
|
|
def is_valid_image_path(self, path):
|
|
# http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용
|
|
if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE):
|
|
return True
|
|
if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')):
|
|
return True
|
|
return False
|
|
|
|
async def process_single_image(self, original_image_url, unwanted_words, method, toggle_states, index, delay=1.0, output_image_path=""):
|
|
"""
|
|
단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅)
|
|
|
|
Args:
|
|
page: Playwright 페이지 객체
|
|
original_image_url (str): 처리할 이미지 URL
|
|
index (int): 이미지 인덱스
|
|
is_localServer (bool): 로컬 서버 사용 여부
|
|
delay (float): 요청 간격 (초)
|
|
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
|
|
|
|
Returns:
|
|
dict: 처리 결과를 포함한 딕셔너리
|
|
- status: 'inpainted', 'original', 'exclude', 'error' 중 하나
|
|
- path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로
|
|
- error: 오류 메시지 (status가 'error'인 경우에만 포함)
|
|
"""
|
|
local_image_path = original_image_url
|
|
delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화
|
|
|
|
self.toggle_states = toggle_states
|
|
self.unwanted_words = unwanted_words
|
|
|
|
self.logger.log(f"unwanted_words: {self.unwanted_words}", level=logging.DEBUG)
|
|
try:
|
|
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
|
|
if not original_image_url or not isinstance(original_image_url, str):
|
|
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
|
|
return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'}
|
|
|
|
if not self.is_valid_image_path(original_image_url):
|
|
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
|
|
return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'}
|
|
|
|
# 요청 간격 조절 (봇 탐지 회피)
|
|
if delay > 0:
|
|
await asyncio.sleep(delay)
|
|
|
|
# OCR 권한 상태 로그
|
|
ocr_enabled = toggle_states.get('ocr', False)
|
|
processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드"
|
|
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG)
|
|
|
|
# # 1. 이미지 다운로드
|
|
# local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
|
|
# if not local_image_path:
|
|
# self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
|
|
# return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'}
|
|
|
|
# 2. OCR 텍스트 감지
|
|
ocr_results = self.ocr_module.detect_text(local_image_path)
|
|
self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG)
|
|
|
|
filter_ocr_results = self.filter_ocr_results(ocr_results)
|
|
self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG)
|
|
|
|
# 3. 중국어 텍스트 없는 경우 원본 이미지 반환
|
|
if not self.ocr_module.filter_chinese_text(filter_ocr_results):
|
|
self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음, 원본 이미지 반환", level=logging.DEBUG)
|
|
return {'status': 'original', 'path': local_image_path}
|
|
|
|
# 4. 텍스트 번역
|
|
translated_texts = self.google_translate_texts_with_translatepy(filter_ocr_results)
|
|
self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG)
|
|
|
|
# 5. OCR 권한에 따른 텍스트 필터링
|
|
if ocr_enabled:
|
|
filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index, unwanted_words)
|
|
if not filtered_translated_texts:
|
|
self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG)
|
|
return {'status': 'exclude', 'path': local_image_path}
|
|
else:
|
|
self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG)
|
|
else:
|
|
# OCR 권한이 없으면 번역된 텍스트를 그대로 사용
|
|
filtered_translated_texts = translated_texts
|
|
self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG)
|
|
|
|
# 마스크 생성 (basic 방식만 사용)
|
|
masks = self.mask_module.create_masks(
|
|
image_path=local_image_path, ocr_results=filter_ocr_results, mask_option="basic"
|
|
)
|
|
self.logger.log(f"마스크 생성 완료", level=logging.DEBUG)
|
|
|
|
# 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화)
|
|
# self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix)
|
|
|
|
# 인페인팅
|
|
if method == 'cv':
|
|
inpainted_image = self.opencv_inpaint(local_image_path, masks)
|
|
elif method == 'lama':
|
|
inpainted_image = self.lama_inpaint(local_image_path, masks)
|
|
|
|
# 인페인팅 실패 시 원본 이미지 사용
|
|
if inpainted_image is None:
|
|
self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING)
|
|
inpainted_image = cv2.imread(local_image_path)
|
|
if inpainted_image is None:
|
|
self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR)
|
|
return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패'}
|
|
else:
|
|
self.logger.log(f"인페인팅 완료", level=logging.DEBUG)
|
|
|
|
# 텍스트 렌더링
|
|
text_rendered_image = self.text_rendering_module.render_text(
|
|
inpainted_image, filter_ocr_results, filtered_translated_texts)
|
|
self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG)
|
|
|
|
# 결과 저장
|
|
translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, output_image_path, toggle_states)
|
|
self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG)
|
|
return {'status': 'inpainted', 'path': translated_img_path}
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e)}
|
|
|
|
# OCR 결과 필터링: 중국어 텍스트만 필터링
|
|
def filter_ocr_results(self, ocr_results):
|
|
import re
|
|
|
|
# 중국어 문자 범위 정규식 (한자)
|
|
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
|
|
|
|
filtered_results = []
|
|
for r in ocr_results:
|
|
text = r.get('text', '').strip()
|
|
polygon = r.get('polygon', [])
|
|
confidence = r.get('confidence', 0.0)
|
|
|
|
# 텍스트가 비어있거나 polygon이 3점 미만이면 제외
|
|
if not text or not polygon or len(polygon) < 3:
|
|
self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG)
|
|
continue
|
|
|
|
# 신뢰도 20% 미만이면 제외
|
|
if confidence < 0.05:
|
|
self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
|
|
continue
|
|
|
|
# 중국어 문자가 포함된 텍스트만 필터링
|
|
if chinese_pattern.search(text):
|
|
filtered_results.append(r)
|
|
self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
|
|
else:
|
|
self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG)
|
|
|
|
self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.INFO)
|
|
return filtered_results
|
|
|
|
async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, output_image_path="", toggle_states=None):
|
|
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
|
|
try:
|
|
# 파일명에 접두사 포함
|
|
if output_image_path:
|
|
img_path = os.path.join(self.TEMP_IMAGE_DIR, output_image_path)
|
|
else:
|
|
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
|
|
|
|
watermarked_image_data = self.postImageManager.add_watermark(image_data=text_rendered_image, watermark_text=toggle_states.get("watermark_text", "이미지 저작권 보유"))
|
|
final_image_path = self.postImageManager.save_image_to_path(watermarked_image_data, img_path)
|
|
|
|
return final_image_path
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return local_image_path
|
|
|
|
|
|
async def download_image(self, page, image_url, index, file_prefix=""):
|
|
"""Playwright를 사용해 이미지를 다운로드합니다"""
|
|
# 로컬 파일 경로면 바로 반환
|
|
if os.path.isfile(image_url):
|
|
self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG)
|
|
return image_url
|
|
|
|
# 로컬 파일 경로가 아니면 다운로드 시도
|
|
try:
|
|
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
|
|
if image_url.startswith("https://assets.alicdn.com"):
|
|
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
|
|
return None
|
|
|
|
# URL에서 파일명 추출 및 접두사 포함
|
|
parsed_url = urlparse(image_url)
|
|
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
|
|
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
|
|
base_filename += '.jpg'
|
|
|
|
# 접두사가 있으면 파일명에 포함
|
|
if file_prefix:
|
|
filename = f"{file_prefix}_{base_filename}"
|
|
else:
|
|
filename = base_filename
|
|
|
|
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
|
|
|
|
# Playwright로 이미지 다운로드
|
|
response = await page.request.get(image_url)
|
|
if response.status == 200:
|
|
image_data = await response.body()
|
|
|
|
# 이미지 데이터 유효성 검사
|
|
if self.is_valid_image_data(image_data):
|
|
async with aiofiles.open(local_path, 'wb') as f:
|
|
await f.write(image_data)
|
|
|
|
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG)
|
|
return local_path
|
|
else:
|
|
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
|
|
return None
|
|
else:
|
|
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status}): {image_url}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
|
|
def is_valid_image_data(self, image_data):
|
|
"""이미지 데이터가 유효한지 확인합니다"""
|
|
if not image_data or len(image_data) < 100:
|
|
return False
|
|
|
|
# 일반적인 이미지 파일 시그니처 확인
|
|
image_signatures = [
|
|
b'\xFF\xD8\xFF', # JPEG
|
|
b'\x89PNG\r\n\x1a\n', # PNG
|
|
b'GIF87a', # GIF87a
|
|
b'GIF89a', # GIF89a
|
|
b'RIFF', # WebP (RIFF 컨테이너)
|
|
]
|
|
|
|
return any(image_data.startswith(sig) for sig in image_signatures)
|
|
|
|
|
|
def process_translated_texts(self, translated_texts, local_image_path, index, unwanted_words):
|
|
"""
|
|
번역된 단어 리스트(translated_texts)에서 unwanted_words의 원본값이
|
|
앞이나 뒤에 포함되면 치환값으로 바꿉니다.
|
|
치환값이 '이미지삭제'라면 None 반환(이미지 제외)
|
|
"""
|
|
new_texts = []
|
|
for i, text in enumerate(translated_texts):
|
|
self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG)
|
|
|
|
# 텍스트를 빈칸으로 분리
|
|
words = text.split()
|
|
self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG)
|
|
|
|
processed_words = []
|
|
text_replaced = False
|
|
|
|
for word_idx, word in enumerate(words):
|
|
word_replaced = False
|
|
|
|
# unwanted_words와 매칭 확인
|
|
for origin, replace in unwanted_words.items():
|
|
if word.startswith(origin) or word.endswith(origin) or word == origin:
|
|
self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.INFO)
|
|
|
|
if replace == "이미지삭제":
|
|
self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.INFO)
|
|
return None
|
|
|
|
# 단어 치환
|
|
if word == origin:
|
|
new_word = replace
|
|
elif word.startswith(origin):
|
|
new_word = replace + word[len(origin):]
|
|
elif word.endswith(origin):
|
|
new_word = word[:-len(origin)] + replace
|
|
|
|
processed_words.append(new_word)
|
|
word_replaced = True
|
|
text_replaced = True
|
|
self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG)
|
|
break
|
|
|
|
if not word_replaced:
|
|
processed_words.append(word)
|
|
|
|
# 처리된 단어들을 다시 합치기
|
|
final_text = ' '.join(processed_words)
|
|
new_texts.append(final_text)
|
|
|
|
if text_replaced:
|
|
self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.INFO)
|
|
else:
|
|
self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG)
|
|
|
|
self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.INFO)
|
|
for i, (original, processed) in enumerate(zip(translated_texts, new_texts)):
|
|
if original != processed:
|
|
self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.INFO)
|
|
|
|
return new_texts
|
|
|
|
def split_and_translate(self, text, translator, src='zh-cn', tgt='ko'):
|
|
# 여러 구분자 기준 분리 (공백도 포함)
|
|
parts = re.split(r'\s*[/|·,;、]+\s*', text)
|
|
result = []
|
|
for part in parts:
|
|
part = part.strip()
|
|
if not part:
|
|
continue
|
|
try:
|
|
t = translator.translate(part, "Korean")
|
|
# translatepy에서 t.text가 번역 결과 (에러시 fallback)
|
|
result.append(getattr(t, "text", getattr(t, "result", part)))
|
|
except Exception as e:
|
|
result.append(part)
|
|
return " / ".join(result)
|
|
|
|
def google_translate_texts_with_translatepy(self, ocr_results):
|
|
"""
|
|
ocr_results에서 추출한 텍스트 리스트를 구글 번역기로 번역하여 반환합니다.
|
|
각 텍스트에 split_and_translate를 적용!
|
|
"""
|
|
texts = [result['text'] for result in ocr_results if result['text'].strip()]
|
|
|
|
if not texts:
|
|
return []
|
|
try:
|
|
# 부분별로 나눠서 번역하고 다시 합침
|
|
translations = [self.split_and_translate(text, self.gtranslate) for text in texts]
|
|
return translations
|
|
except Exception as e:
|
|
self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True)
|
|
return texts
|
|
|
|
def lama_inpaint(self, image_path, mask):
|
|
"""
|
|
simple_lama_inpainting의 SimpleLama를 사용하여 이미지에서 마스크 영역을 채웁니다.
|
|
Args:
|
|
image_path (str): 원본 이미지 경로
|
|
mask (np.ndarray 또는 str): 2D 마스크 이미지 (0/255, shape=(H, W)) 또는 마스크 파일 경로
|
|
Returns:
|
|
inpainted_image (np.ndarray): 인페인팅된 이미지 (OpenCV BGR)
|
|
"""
|
|
|
|
# 이미지 로드
|
|
image = Image.open(image_path)
|
|
|
|
# 마스크 로드/변환
|
|
if isinstance(mask, str):
|
|
mask_img = Image.open(mask).convert('L')
|
|
elif isinstance(mask, np.ndarray):
|
|
if mask.ndim == 3:
|
|
mask = mask[:, :, 0] # 첫 채널만 사용
|
|
mask_img = Image.fromarray(mask).convert('L')
|
|
else:
|
|
raise ValueError('mask는 파일 경로나 np.ndarray여야 합니다.')
|
|
|
|
# 인페인팅 수행
|
|
result = self.simple_lama(image, mask_img)
|
|
|
|
# PIL.Image -> np.ndarray (OpenCV BGR)
|
|
result_np = np.array(result)
|
|
if result_np.ndim == 3 and result_np.shape[2] == 3:
|
|
# RGB -> BGR
|
|
result_np = result_np[:, :, ::-1]
|
|
return result_np
|
|
|
|
def opencv_inpaint(self, image_path, mask, method='telea', radius=3):
|
|
"""
|
|
OpenCV로 인페인팅을 수행합니다.
|
|
Args:
|
|
image_path (str): 원본 이미지 경로
|
|
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
|
|
method (str): 'telea' 또는 'ns'
|
|
radius (int): 인페인팅 반경
|
|
Returns:
|
|
inpainted_image (np.ndarray): 인페인팅된 이미지
|
|
"""
|
|
import cv2
|
|
import numpy as np
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 마스크가 2D 배열인지 확인
|
|
if mask is None:
|
|
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
|
|
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 마스크 크기가 이미지와 일치하는지 확인
|
|
if mask.shape != image.shape[:2]:
|
|
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
|
|
return None
|
|
|
|
inpaint_method = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS
|
|
inpainted = cv2.inpaint(image, mask, radius, inpaint_method)
|
|
return inpainted
|
|
|
|
def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""):
|
|
"""디버깅용 OCR 박스와 마스크 이미지를 저장합니다"""
|
|
try:
|
|
# OCR 박스 시각화 이미지 저장
|
|
ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix)
|
|
|
|
# 마스크 시각화 이미지 저장
|
|
mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix)
|
|
|
|
self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.INFO)
|
|
return ocr_debug_path, mask_debug_path
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None, None
|
|
|
|
def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""):
|
|
"""OCR 감지된 박스들을 이미지에 표시하여 저장합니다"""
|
|
try:
|
|
import cv2
|
|
import numpy as np
|
|
|
|
# 원본 이미지 로드
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 이미지 복사본 생성
|
|
debug_image = image.copy()
|
|
|
|
# OCR 결과별로 박스 그리기
|
|
for i, result in enumerate(ocr_results):
|
|
polygon = result.get('polygon', [])
|
|
text = result.get('text', '')
|
|
confidence = result.get('confidence', 0.0)
|
|
|
|
if len(polygon) >= 3:
|
|
# 폴리곤을 numpy 배열로 변환
|
|
pts = np.array(polygon, np.int32)
|
|
pts = pts.reshape((-1, 1, 2))
|
|
|
|
# 신뢰도에 따른 색상 결정
|
|
if confidence >= 0.8:
|
|
color = (0, 255, 0) # 초록 (높은 신뢰도)
|
|
elif confidence >= 0.5:
|
|
color = (0, 255, 255) # 노랑 (중간 신뢰도)
|
|
elif confidence >= 0.2:
|
|
color = (0, 165, 255) # 주황 (낮은 신뢰도)
|
|
else:
|
|
color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도)
|
|
|
|
# 폴리곤 테두리 그리기
|
|
cv2.polylines(debug_image, [pts], True, color, 2)
|
|
|
|
# 텍스트와 신뢰도 표시
|
|
x, y = polygon[0]
|
|
label = f"{i+1}: {text[:10]}... ({confidence:.1%})"
|
|
|
|
# 텍스트 배경 사각형
|
|
(text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
|
|
cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1)
|
|
|
|
# 텍스트 표시
|
|
cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
|
|
|
|
# 범례 추가
|
|
legend_y = 30
|
|
cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
|
cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
|
|
|
|
# 파일 저장
|
|
if file_prefix:
|
|
debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png"
|
|
else:
|
|
debug_filename = f"debug_ocr_img_{index+1}.png"
|
|
|
|
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
|
|
cv2.imwrite(debug_path, debug_image)
|
|
|
|
self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG)
|
|
return debug_path
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
def save_mask_debug_image(self, image_path, masks, index, file_prefix=""):
|
|
"""생성된 마스크를 이미지에 오버레이하여 저장합니다"""
|
|
try:
|
|
import cv2
|
|
import numpy as np
|
|
|
|
# 원본 이미지 로드
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
|
|
return None
|
|
|
|
if masks is None or not isinstance(masks, np.ndarray):
|
|
self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 이미지 복사본 생성
|
|
debug_image = image.copy()
|
|
|
|
# 마스크 영역을 빨간색으로 오버레이
|
|
mask_colored = np.zeros_like(image)
|
|
mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용
|
|
|
|
# 마스크 영역 반투명 오버레이 (알파 블렌딩)
|
|
alpha = 0.3
|
|
overlay_mask = masks > 0
|
|
debug_image[overlay_mask] = cv2.addWeighted(
|
|
debug_image[overlay_mask], 1-alpha,
|
|
mask_colored[overlay_mask], alpha, 0
|
|
)
|
|
|
|
# 마스크 통계 정보 표시
|
|
total_pixels = masks.shape[0] * masks.shape[1]
|
|
mask_pixels = np.sum(masks > 0)
|
|
mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0
|
|
|
|
# 정보 텍스트 표시
|
|
info_text = [
|
|
f"Mask Statistics:",
|
|
f"Total pixels: {total_pixels:,}",
|
|
f"Mask pixels: {mask_pixels:,}",
|
|
f"Coverage: {mask_percentage:.1f}%"
|
|
]
|
|
|
|
y_offset = 30
|
|
for i, text in enumerate(info_text):
|
|
y_pos = y_offset + (i * 25)
|
|
# 텍스트 배경
|
|
(text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
|
|
cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1)
|
|
# 텍스트
|
|
cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
|
|
|
|
# 파일 저장
|
|
if file_prefix:
|
|
debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png"
|
|
else:
|
|
debug_filename = f"debug_mask_img_{index+1}.png"
|
|
|
|
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
|
|
cv2.imwrite(debug_path, debug_image)
|
|
|
|
self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG)
|
|
return debug_path
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None
|
|
|
|
async def remove_background(self, image_path, index, output_image_path="", toggle_states=None, **kwargs):
|
|
"""
|
|
배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환
|
|
"""
|
|
try:
|
|
# 1. 배경제거 수행(이미지 반환)
|
|
removed_img = self.background_removal_module.remove_background(
|
|
image_path, **kwargs
|
|
)
|
|
if removed_img is None:
|
|
self.logger.log(f"배경제거 실패: {image_path}", level=40)
|
|
return {'status': 'failed', 'path': image_path, 'error': '배경제거 실패'}
|
|
|
|
if toggle_states.get("remove_background_white", True):
|
|
img_result_white = self.background_removal_module.to_white_background(removed_img)
|
|
else:
|
|
img_result_white = removed_img
|
|
|
|
# 2. 저장 경로 생성
|
|
if output_image_path:
|
|
save_path = os.path.join(self.TEMP_IMAGE_DIR, output_image_path)
|
|
else:
|
|
save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png")
|
|
|
|
# 3. 워터마크 등 후처리 및 저장
|
|
# 워터마크 등 추가하려면 아래처럼
|
|
# removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...)
|
|
|
|
|
|
final_path = self.postImageManager.save_image_to_path(img_result_white, save_path)
|
|
|
|
if final_path:
|
|
self.logger.log(f"배경제거 이미지 저장됨: {final_path}")
|
|
return {'status': 'success', 'path': final_path}
|
|
else:
|
|
self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40)
|
|
return {'status': 'failed', 'path': save_path, 'error': '저장 실패'}
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True)
|
|
return {'status': 'failed', 'path': image_path, 'error': str(e)}
|