ImageProcessor_MainServer/test/image_processor3.py

1104 lines
55 KiB
Python

import os
import asyncio
import aiofiles
import logging
from urllib.parse import urlparse
import sys
import re
import cv2
# OpenCV 의 내부 최적화(메모리 풀) 사용을 비활성화하여 파편화 위험을 낮춤
cv2.setUseOptimized(False)
import random
import gc
import numpy as np
from PIL import Image
# from src.modules.easyocr_module import EasyOCREngine
from ocr_module import OCRModule
from mask_module_for_paddle import MaskModule
# from src.modules.mask_module_for_easy import MaskModule_easy
from text_rendering_module import TextRenderingModule
from postImageManager import PostImageManager
# from translatepy.translators.google import GoogleTranslate
# from src.modules.background_removal_module import BackgroundRemovalModule
# from src.modules.background_removal_module_pp import PPMattingBackgroundRemovalModule # (변경)
from src.modules.request_inpaint import Request_AI_Server
class ImageProcessor3:
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
def __init__(self, logger, page, toggle_states, unwanted_words, authenticated_by_admin, base_dir, papago_translator):
self.logger = logger
self.page = page
self.base_dir = base_dir
self.toggle_states = toggle_states
self.unwanted_texts = unwanted_words
self.authenticated_by_admin = authenticated_by_admin
self.papago_translator = papago_translator
self.inpaint_method = 'cv'
try:
self.request_inpainting_server_url = self.toggle_states.get("request_inpainting_server_url", None)
if self.request_inpainting_server_url is None:
self.logger.log(f"request_inpainting_server_url 설정되지 않았습니다.", level=logging.ERROR)
self.inpaint_method = 'cv'
else:
self.inpaint_method = 'request'
self.font_path = self.toggle_states.get('image_font_path', os.path.join(self.base_dir, "HakgyoansimDunggeunmisoTTFB.ttf"))
self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "")
self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG)
self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
self.logger.log(f"self.unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
# ----------------------------- 메모리 파편화 완화 -----------------------------
# Pillow 가 거대 이미지를 열 때 과도한 메모리를 점유하지 않도록 최대 픽셀 수 제한
max_px = self.toggle_states.get("max_image_pixels", 20_000_000) # 약 20MP, 필요 시 조정 (20MP = 4500x4500, 50MP=8000x6000)
Image.MAX_IMAGE_PIXELS = max_px
self.logger.log(f"Image.MAX_IMAGE_PIXELS set to {max_px}", level=logging.DEBUG)
# -----------------------------------------------------------------------------
# self.ocr_module = EasyOCREngine(logger=self.logger, base_dir=self.base_dir, lang_list=['ch_sim'])
self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir)
self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir)
self.text_rendering_module = TextRenderingModule(logger=self.logger, font_path=self.font_path)
self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states)
# self.background_removal_module = BackgroundRemovalModule(logger=self.logger, default_model="birefnet-general")
# self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger, default_model="ppmatting-hrnet-1x")
# self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger)
self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url", None)
# self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None)
if self.is_frozen():
self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None)
self.request_ai_server = Request_AI_Server(logger=self.logger, inpaint_server_url=self.request_inpainting_server_url, rembg_server_url=self.request_rembg_server_url)
# self.gtranslate = GoogleTranslate()
except Exception as e:
self.logger.log(f"ImageProcessor3 초기화 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def update_page(self, page1, toggle_states):
self.toggle_states = toggle_states
self.page = page1
self.postImageManager.update_toggle_states(self.toggle_states)
self.logger.log(f"page객체 및 toggle_states 업데이트", level=logging.DEBUG)
def update_unwanted_texts(self, texts):
self.unwanted_texts = texts
self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
def __del__(self):
"""소멸자에서 리소스 정리"""
self.cleanup()
def cleanup(self):
"""리소스 정리"""
try:
# 임시 폴더 삭제
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
# shutil.rmtree(self.TEMP_IMAGE_DIR)
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
def reset_ocr_module(self):
"""OCR 모듈을 명시적으로 삭제하고 재생성."""
try:
self.logger.log("🔄 OCR 모듈 재초기화 시작", level=logging.INFO)
# 기존 모듈 참조 해제
del self.ocr_module
gc.collect()
# paddle.device.cuda.empty_cache() # GPU 사용 시
# 새 인스턴스 생성
self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir)
self._ocr_call_count = 0
self.logger.log("✅ OCR 모듈 재초기화 완료", level=logging.INFO)
return True
except Exception as e:
self.logger.log(f"❌ OCR 모듈 재초기화 실패: {e}", level=logging.ERROR, exc_info=True)
return False
def is_valid_image_path(self, path):
# http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용
if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE):
return True
if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')):
return True
return False
async def process_single_image(self, page, original_image_url, index, delay=1.0, file_prefix=""):
"""
단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅)
Args:
page: Playwright 페이지 객체
original_image_url (str): 처리할 이미지 URL
index (int): 이미지 인덱스
is_localServer (bool): 로컬 서버 사용 여부
delay (float): 요청 간격 (초)
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
Returns:
dict: 처리 결과를 포함한 딕셔너리
- status: 'inpainted', 'original', 'exclude', 'error' 중 하나
- path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로
- error: 오류 메시지 (status가 'error'인 경우에만 포함)
"""
local_image_path = None
delay = random.uniform(0.5, 1.5)
delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화
self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
try:
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'}
if not self.is_valid_image_path(original_image_url):
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'}
# 요청 간격 조절 (봇 탐지 회피)
if delay > 0:
await asyncio.sleep(delay)
# OCR 권한 상태 로그
ocr_enabled = self.toggle_states.get('ocr', False)
processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드"
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG)
# 1. 이미지 다운로드
local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
if not local_image_path:
self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'}
# 1-A. 고해상도 입력 다운스케일 (메모리 절약)
# toggle_states 에 max_image_resolution(예: 1200) 값이 있으면 사용, 없으면 1200px 기준
max_dim = self.toggle_states.get('max_image_resolution', 1200)
local_image_path = self.downscale_image_if_large(local_image_path, max_dim=max_dim)
self.logger.log(f"이미지 {index+1} 로컬 저장위치(스케일 처리후): {local_image_path}", level=logging.DEBUG)
# 2. OCR 텍스트 감지
ocr_results = self.ocr_module.detect_text(local_image_path)
self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG)
filter_ocr_results = self.filter_ocr_results(ocr_results)
self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG)
ocr_count = len(filter_ocr_results)
# 3. 중국어 텍스트가 없는 경우 원본 이미지 반환으로 번역 패스
if not self.ocr_module.filter_chinese_text(filter_ocr_results):
self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음, 원본 이미지 반환", level=logging.DEBUG)
return {'status': 'original', 'path': local_image_path}
# 4. 한글 텍스트가 존재하는 경우 원본 이미지 반환으로 번역 패스
if self.ocr_module.filter_korean_text(filter_ocr_results):
self.logger.log(f"이미지 {index+1} 한글 텍스트 존재, 원본 이미지 반환", level=logging.DEBUG)
return {'status': 'original', 'path': local_image_path}
# 4. 텍스트 번역
translated_texts = await self.batch_papago_translate_texts(filter_ocr_results)
self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG)
# 5. OCR 권한에 따른 텍스트 필터링
if ocr_enabled:
filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index)
if not filtered_translated_texts:
self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG)
return {'status': 'exclude', 'path': local_image_path}
else:
self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG)
else:
# OCR 권한이 없으면 번역된 텍스트를 그대로 사용
filtered_translated_texts = translated_texts
self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG)
if ocr_count < 5:
expansion_size = 12
blur_size = 15
elif ocr_count < 10:
expansion_size = 9
blur_size = 12
elif ocr_count < 15:
expansion_size = 7
blur_size = 9
elif ocr_count < 20:
expansion_size = 5
blur_size = 6
else:
expansion_size = 10
blur_size = 15
# 마스크 생성 (basic 방식만 사용)
masks = self.mask_module.create_masks(
image_path=local_image_path,
ocr_results=filter_ocr_results,
mask_option="basic",
expansion_size=expansion_size,
blur_size=blur_size
)
self.logger.log(f"마스크 생성 완료", level=logging.DEBUG)
# if not self.is_frozen():
# # 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화)
# self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix)
# 인페인팅
# is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'premium' or self.toggle_states.get('membership_level', 'basic') == 'vip'
is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin
self.logger.log(f"ocr_count: {ocr_count}", level=logging.DEBUG)
self.logger.log(f"is_member_valid: {is_member_valid}", level=logging.DEBUG)
self.logger.log(f"inpaint_method: {self.inpaint_method}", level=logging.DEBUG)
if self.inpaint_method == 'request' and is_member_valid and ocr_count > 8:
self.logger.log(f"Request 인페인팅 요청", level=logging.DEBUG)
inpainted_image = self.request_ai_server.request_inpaint(local_image_path, masks)
if inpainted_image is None:
self.logger.log(f"Request 인페인팅 실패, opencv 인페인팅으로 대체", level=logging.WARNING)
inpainted_image = self.opencv_inpaint(local_image_path, masks, method='telea', radius=3)
# inpainted_image = self.lama_inpaint(local_image_path, masks)
else:
self.logger.log(f"자체 인페인팅 실행", level=logging.DEBUG)
inpainted_image = self.opencv_inpaint(local_image_path, masks, method='telea', radius=3)
# inpainted_image = self.lama_inpaint(local_image_path, masks)
self.logger.log(f"인페인팅 완료", level=logging.DEBUG)
# 인페인팅 실패 시 원본 이미지 사용
if inpainted_image is None:
self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING)
inpainted_image = cv2.imread(local_image_path)
if inpainted_image is None:
self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR)
return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패'}
# 텍스트 렌더링
text_rendered_image = self.text_rendering_module.render_text(
inpainted_image, filter_ocr_results, filtered_translated_texts)
self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG)
# 결과 저장
translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, file_prefix)
self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG)
return {'status': 'translated', 'path': translated_img_path}
except Exception as e:
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e)}
finally:
# ---- 메모리 해제 ----
try:
ocr_results = None
filter_ocr_results = None
translated_texts = None
masks = None
inpainted_image = None
text_rendered_image = None
except Exception:
pass
# download_image 단계에서 사용한 page, local_image_path 도 참조 제거
page = None
local_image_path = None
# 최종 GC 강제 실행
gc.collect()
# OCR 결과 필터링: 중국어 텍스트만 필터링
def filter_ocr_results(self, ocr_results):
import re
# 중국어 문자 범위 정규식 (한자)
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
filtered_results = []
for r in ocr_results:
text = r.get('text', '').strip()
polygon = r.get('polygon', [])
confidence = r.get('confidence', 0.0)
# 텍스트가 비어있거나 polygon이 3점 미만이면 제외
if not text or not polygon or len(polygon) < 3:
self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG)
continue
# 신뢰도 20% 미만이면 제외
if confidence < 0.05:
self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
continue
# 중국어 문자가 포함된 텍스트만 필터링
if chinese_pattern.search(text):
filtered_results.append(r)
self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
else:
self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG)
self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.INFO)
return filtered_results
async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, file_prefix=""):
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
try:
# 파일명에 접두사 포함
if file_prefix:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.png")
else:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
watermark_text=self.toggle_states.get("watermark_text", "이미지 저작권 보유")
is_watermark_enabled = watermark_text != "" or self.toggle_states.get("watermark", False)
self.logger.log(f"watermark_text: {watermark_text}", level=logging.DEBUG)
self.logger.log(f"is_watermark_enabled: {is_watermark_enabled}", level=logging.DEBUG)
# file_prefix가 'detail' 또는 'option'일 때만 워터마크 추가
if is_watermark_enabled and file_prefix in ["detail"]:
image_data_to_save = self.postImageManager.add_watermark(
image_data=text_rendered_image,
watermark_text=watermark_text
)
else:
# np.ndarray라면 PIL.Image로 변환
if isinstance(text_rendered_image, np.ndarray):
image_data_to_save = Image.fromarray(cv2.cvtColor(text_rendered_image, cv2.COLOR_BGR2RGB))
else:
image_data_to_save = text_rendered_image
final_image_path = self.postImageManager.save_image_to_path(image_data_to_save, img_path)
return final_image_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return local_image_path
def download_image(self, image_url, index, file_prefix="", max_retries=3):
"""Requests를 사용해 이미지를 다운로드합니다"""
import requests
import time
import random
# 로컬 파일 경로면 바로 반환
if os.path.isfile(image_url):
self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG)
return image_url
# 로컬 파일 경로가 아니면 다운로드 시도
try:
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
if image_url.startswith("https://assets.alicdn.com"):
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
return None
# URL에서 파일명 추출 및 접두사 포함
parsed_url = urlparse(image_url)
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
base_filename += '.jpg'
# 접두사가 있으면 파일명에 포함
if file_prefix:
filename = f"{file_prefix}_{base_filename}"
else:
filename = base_filename
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
# HTTP 헤더 설정
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Cache-Control": "max-age=0"
}
retries = 0
while retries < max_retries:
try:
self.logger.log(f"이미지 다운로드 중: {filename}", level=logging.DEBUG)
response = requests.get(image_url, headers=headers, stream=True, timeout=30)
if response.status_code == 200:
image_data = response.content
# 이미지 데이터 유효성 검사
if self.is_valid_image_data(image_data):
with open(local_path, 'wb') as f:
f.write(image_data)
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG)
return local_path
else:
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
return None
else:
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status_code}): {image_url}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5)) # 2~5초 대기 후 재시도
except requests.exceptions.RequestException as e:
self.logger.log(f"이미지 다운로드 중 네트워크 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5)) # 예외 발생 시 대기 후 재시도
except Exception as e:
self.logger.log(f"이미지 다운로드 중 예상치 못한 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5))
self.logger.log(f"이미지 다운로드 최대 재시도 횟수를 초과했습니다: {image_url}", level=logging.ERROR)
return None
except Exception as e:
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def is_valid_image_data(self, image_data):
"""이미지 데이터가 유효한지 확인합니다"""
if not image_data or len(image_data) < 100:
return False
# 일반적인 이미지 파일 시그니처 확인
image_signatures = [
b'\xFF\xD8\xFF', # JPEG
b'\x89PNG\r\n\x1a\n', # PNG
b'GIF87a', # GIF87a
b'GIF89a', # GIF89a
b'RIFF', # WebP (RIFF 컨테이너)
]
return any(image_data.startswith(sig) for sig in image_signatures)
def process_translated_texts(self, translated_texts, local_image_path, index):
"""
번역된 단어 리스트(translated_texts)에서 unwanted_texts의 원본값이
앞이나 뒤에 포함되면 치환값으로 바꿉니다.
치환값이 '이미지삭제'라면 None 반환(이미지 제외)
"""
new_texts = []
for i, text in enumerate(translated_texts):
self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG)
# 텍스트를 빈칸으로 분리
words = text.split()
self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG)
processed_words = []
text_replaced = False
for word_idx, word in enumerate(words):
word_replaced = False
# unwanted_texts와 매칭 확인
for origin, replace in self.unwanted_texts.items():
if word.startswith(origin) or word.endswith(origin) or word == origin:
self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.INFO)
if replace == "이미지삭제":
self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.INFO)
return None
# 단어 치환
if word == origin:
new_word = replace
elif word.startswith(origin):
new_word = replace + word[len(origin):]
elif word.endswith(origin):
new_word = word[:-len(origin)] + replace
processed_words.append(new_word)
word_replaced = True
text_replaced = True
self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG)
break
if not word_replaced:
processed_words.append(word)
# 처리된 단어들을 다시 합치기
final_text = ' '.join(processed_words)
new_texts.append(final_text)
if text_replaced:
self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.INFO)
else:
self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG)
self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.INFO)
for i, (original, processed) in enumerate(zip(translated_texts, new_texts)):
if original != processed:
self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.INFO)
return new_texts
async def batch_papago_translate_texts(self, ocr_results, delimiter='\n'):
"""
ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 파파고로 번역 후, 다시 분리하여 반환합니다.
각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다.
"""
import re
texts = [result['text'] for result in ocr_results if result['text'].strip()]
if not texts:
return []
# 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침
split_texts = []
split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록
for text in texts:
parts = re.split(r'\s*[/|·,;、]+\s*', text)
parts = [p.strip() for p in parts if p.strip()]
split_texts.extend(parts)
split_indices.append(len(parts))
# Papago에 한 번에 번역 요청
joined = delimiter.join(split_texts)
try:
# 이미 한 번에 여러 줄을 받아서 리스트로 반환!
translated_lines = await self.papago_translator.translate(joined, source_lang="zh", target_lang="ko")
# Papago가 리스트로 반환한다고 가정!
results = translated_lines
# 개수가 다르면 fallback(선택사항, 아래는 로깅만)
if len(results) != len(split_texts):
self.logger.log(
f"파파고 번역 줄 개수 불일치: {len(results)} != {len(split_texts)}",
level=logging.WARNING
)
# 여기서 fallback 처리 또는 그냥 results를 그대로 사용
# 다시 원래 텍스트 단위로 합치기
final_results = []
idx = 0
for count in split_indices:
parts = results[idx:idx+count]
final_results.append(' / '.join(parts))
idx += count
return final_results
except Exception as e:
self.logger.log(f"파파고 번역 실패: {e}", level=logging.ERROR, exc_info=True)
return texts
# def batch_google_translate_texts(self, ocr_results, delimiter='\n'):
# """
# ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 구글 번역기로 번역 후, 다시 분리하여 반환합니다.
# 각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다.
# """
# import re
# texts = [result['text'] for result in ocr_results if result['text'].strip()]
# if not texts:
# return []
# # 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침
# split_texts = []
# split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록
# for text in texts:
# parts = re.split(r'\s*[/|·,;、]+\s*', text)
# parts = [p.strip() for p in parts if p.strip()]
# split_texts.extend(parts)
# split_indices.append(len(parts))
# # 합쳐서 한 번에 번역
# joined = delimiter.join(split_texts)
# try:
# translated_obj = self.gtranslate.translate(joined, "Korean")
# translated_text = getattr(translated_obj, "text", getattr(translated_obj, "result", joined))
# results = translated_text.split(delimiter)
# # 만약 개수가 다르면 fallback
# if len(results) != len(split_texts):
# results = [getattr(self.gtranslate.translate(t, "Korean"), "text", t) for t in split_texts]
# # 다시 원래 텍스트 단위로 합치기
# final_results = []
# idx = 0
# for count in split_indices:
# parts = results[idx:idx+count]
# final_results.append(' / '.join(parts))
# idx += count
# return final_results
# except Exception as e:
# self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True)
# return texts
def opencv_inpaint(self, image_path, mask, method='telea', radius=3):
"""
OpenCV로 인페인팅을 수행합니다.
Args:
image_path (str): 원본 이미지 경로
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
method (str): 'telea' 또는 'ns'
radius (int): 인페인팅 반경
Returns:
inpainted_image (np.ndarray): 인페인팅된 이미지
"""
import cv2
import numpy as np
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 마스크가 2D 배열인지 확인
if mask is None:
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
return None
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
return None
# 마스크 크기가 이미지와 일치하는지 확인
if mask.shape != image.shape[:2]:
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
return None
inpaint_method = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS
inpainted = cv2.inpaint(image, mask, radius, inpaint_method)
return inpainted
def lama_inpaint(self, image_path, mask):
"""
PaddleHub lama 모델로 인페인팅을 수행합니다.
Args:
image_path (str): 원본 이미지 경로
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
Returns:
inpainted_image (np.ndarray): 인페인팅된 이미지 (BGR)
"""
import cv2
import numpy as np
import logging
try:
import paddlehub as hub
except ImportError:
self.logger.log("paddlehub 미설치: pip install paddlehub", level=logging.ERROR)
return None
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 마스크 2D 체크
if mask is None:
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
return None
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
return None
if mask.shape != image.shape[:2]:
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
return None
try:
# PaddleHub lama 인페인팅 모델 로드 (최초 1회 다운로드)
if not hasattr(self, "_lama_module"):
self._lama_module = hub.Module(name="lama")
if self.logger:
self.logger.log("lama 인페인팅 모델 로드됨")
# 라마 모델 인페인팅 (마스크 0/255, uint8)
result = self._lama_module.predict(images=[image], masks=[mask])
if not result or "inpainted" not in result[0]:
self.logger.log("lama 인페인팅 결과 없음", level=logging.ERROR)
return None
inpainted = result[0]["inpainted"] # np.ndarray(BGR)
return inpainted
except Exception as e:
self.logger.log(f"lama 인페인팅 중 예외 발생: {e}", level=logging.ERROR, exc_info=True)
return None
def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""):
"""디버깅용 OCR 박스와 마스크 이미지를 저장합니다"""
try:
# OCR 박스 시각화 이미지 저장
ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix)
# 마스크 시각화 이미지 저장
mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix)
self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.INFO)
return ocr_debug_path, mask_debug_path
except Exception as e:
self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None, None
def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""):
"""OCR 감지된 박스들을 이미지에 표시하여 저장합니다"""
try:
import cv2
import numpy as np
import os
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# OCR 결과별로 박스 그리기
for i, result in enumerate(ocr_results):
polygon = result.get('polygon', [])
bbox = result.get('bbox', None)
text = result.get('text', '')
confidence = result.get('confidence', 0.0)
# 신뢰도에 따른 색상 결정
if confidence >= 0.8:
color = (0, 255, 0) # 초록 (높은 신뢰도)
elif confidence >= 0.5:
color = (0, 255, 255) # 노랑 (중간 신뢰도)
elif confidence >= 0.2:
color = (0, 165, 255) # 주황 (낮은 신뢰도)
else:
color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도)
if polygon and len(polygon) >= 3:
# 폴리곤을 numpy 배열로 변환 (좌표를 int로 변환)
pts = np.array([[int(x), int(y)] for x, y in polygon], np.int32)
pts = pts.reshape((-1, 1, 2))
cv2.polylines(debug_image, [pts], True, color, 2)
x, y = pts[0][0]
elif bbox and len(bbox) == 4:
try:
x, y, w, h = [int(float(v)) for v in bbox]
except Exception as e:
self.logger.log(f"bbox 값 변환 오류: {bbox} ({e})", level=logging.ERROR)
continue
cv2.rectangle(debug_image, (x, y), (x + w, y + h), color, 2)
else:
continue # polygon, bbox 둘 다 없으면 skip
label = f"{i+1}: {text[:10]}... ({confidence:.1%})"
# 텍스트 배경 사각형
(text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1)
# 텍스트 표시
cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 범례 추가
legend_y = 30
cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_ocr_img_{index+1}.png"
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def save_mask_debug_image(self, image_path, masks, index, file_prefix=""):
"""생성된 마스크를 이미지에 오버레이하여 저장합니다"""
try:
import cv2
import numpy as np
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
if masks is None or not isinstance(masks, np.ndarray):
self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# 마스크 영역을 빨간색으로 오버레이
mask_colored = np.zeros_like(image)
mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용
# 마스크 영역 반투명 오버레이 (알파 블렌딩)
alpha = 0.3
overlay_mask = masks > 0
debug_image[overlay_mask] = cv2.addWeighted(
debug_image[overlay_mask], 1-alpha,
mask_colored[overlay_mask], alpha, 0
)
# 마스크 통계 정보 표시
total_pixels = masks.shape[0] * masks.shape[1]
mask_pixels = np.sum(masks > 0)
mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0
# 정보 텍스트 표시
info_text = [
f"Mask Statistics:",
f"Total pixels: {total_pixels:,}",
f"Mask pixels: {mask_pixels:,}",
f"Coverage: {mask_percentage:.1f}%"
]
y_offset = 30
for i, text in enumerate(info_text):
y_pos = y_offset + (i * 25)
# 텍스트 배경
(text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1)
# 텍스트
cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_mask_img_{index+1}.png"
debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def is_frozen(self):
"""
실행 환경에 따라 배포환경인지 개발환경인지 확인하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
self.logger.log("배포환경", level=logging.DEBUG)
return True
else: # 일반 Python 실행 환경
self.logger.log("개발환경", level=logging.DEBUG)
return False
async def remove_background(self, page, original_image_url, file_prefix=""):
"""배경제거 전용 메서드 (썸네일 등 외부 호출용).
1. 이미지를 다운로드(또는 로컬 경로 사용)
2. Request_AI_Server.request_rembg 로 배경 제거 (흰 배경 중앙 배치 포함)
3. TEMP_IMAGE_DIR 하위에 저장 후 경로 반환
"""
try:
index = 0 # 기본값 (외부에서 필요 시 파일명 구분용)
# 0. 이미지 URL 유효성 체크
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log("배경제거 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return {"status": "failed", "path": original_image_url, "error": "이미지 URL 오류"}
# 1. 다운로드 또는 로컬 경로 확정
if original_image_url.startswith("http"):
# 다운로드 재사용을 위해 기존 메서드 호출
# local_path = await self.download_image(page=page, image_url=original_image_url, index=0, file_prefix=file_prefix)
local_path = await self.download_image(image_url=original_image_url, index=0, file_prefix=file_prefix)
if not local_path:
return {"status": "failed", "path": original_image_url, "error": "다운로드 실패"}
else:
local_path = original_image_url # 이미 로컬 경로
# 2. 배경 제거 (np.ndarray 반환)
removed = self.request_ai_server.request_rembg(local_path)
if removed is None:
self.logger.log("RemoveBG 실패", level=logging.ERROR)
return {"status": "failed", "path": local_path, "error": "RemoveBG 실패"}
# 3. 저장 경로 결정
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
base_name = os.path.basename(local_path)
name_no_ext, _ = os.path.splitext(base_name)
save_name = f"nobg_{file_prefix}_{name_no_ext}.png" if file_prefix else f"nobg_{name_no_ext}.png"
save_path = os.path.join(self.TEMP_IMAGE_DIR, save_name)
# 4. 저장 (OpenCV → BGR)
cv2.imwrite(save_path, removed)
self.logger.log(f"배경제거 이미지 저장: {save_path}", level=logging.INFO)
return {"status": "success", "path": save_path}
except Exception as e:
self.logger.log(f"remove_background 오류: {e}", level=logging.ERROR, exc_info=True)
return {"status": "failed", "path": original_image_url, "error": str(e)}
# async def remove_background(self, page, original_image_url, index, file_prefix="", **kwargs):
# """
# 배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환
# """
# try:
# # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
# if not original_image_url or not isinstance(original_image_url, str):
# self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'}
# if not self.is_valid_image_path(original_image_url):
# self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'}
# # 1. 이미지 다운로드
# local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
# if not local_image_path:
# self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'}
# self.logger.log(f"이미지 {index+1} 로컬 저장위치: {local_image_path}", level=logging.DEBUG)
# # 1. 배경제거 수행(이미지 반환)
# removed_img = self.background_removal_module.remove_background(
# local_image_path, **kwargs
# )
# if removed_img is None:
# self.logger.log(f"배경제거 실패: {local_image_path}", level=40)
# return {'status': 'failed', 'path': local_image_path, 'error': '배경제거 실패'}
# if self.toggle_states.get("remove_background_white", True):
# img_result_white = self.background_removal_module.to_white_background(removed_img)
# else:
# img_result_white = removed_img
# # 2. 저장 경로 생성
# if file_prefix:
# save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_{file_prefix}_img_{index+1}.png")
# else:
# save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png")
# # 3. 워터마크 등 후처리 및 저장
# # 워터마크 등 추가하려면 아래처럼
# # removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...)
# final_path = self.postImageManager.save_image_to_path(img_result_white, save_path)
# if final_path:
# self.logger.log(f"배경제거 이미지 저장됨: {final_path}")
# return {'status': 'success', 'path': final_path}
# else:
# self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40)
# return {'status': 'failed', 'path': save_path, 'error': '저장 실패'}
# except Exception as e:
# self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True)
# return {'status': 'failed', 'path': local_image_path, 'error': str(e)}
# def remove_background_with_ppmatting(image_path, output_path='output_foreground.png', alpha_path='output_alpha.png'):
# """
# PaddleHub의 ppmatting을 이용한 배경제거(투명배경 PNG) 함수입니다.
# Args:
# image_path (str): 입력 이미지 경로
# output_path (str): 결과 투명 배경 PNG 저장 경로
# alpha_path (str): 알파(마스크) 이미지 저장 경로
# Returns:
# foreground (np.ndarray): 알파채널이 포함된 전경 PNG 이미지 (BGRA)
# alpha (np.ndarray): 추출된 알파 마스크
# """
# import paddlehub as hub
# import cv2
# import numpy as np
# # 1. 이미지 로드 (OpenCV는 BGR)
# img = cv2.imread(image_path)
# if img is None:
# print(f"이미지를 불러올 수 없습니다: {image_path}")
# return None, None
# # 2. BGR → RGB (ppmatting은 RGB 입력)
# img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# # 3. ppmatting 모델 로드 (최초 1회 다운로드됨)
# matting = hub.Module(name='ppmatting-hrnet-1x')
# # 4. 예측 수행
# results = matting.predict(images=[img_rgb]) # 결과는 리스트
# # 5. 알파(투명도) 마스크 추출 (float, 0~1)
# alpha = results[0]['alpha']
# # 6. 알파 마스크를 0~255 uint8로 변환 (이미지 저장용)
# alpha_img = (alpha * 255).astype(np.uint8)
# # 7. 원본 이미지를 BGRA(투명도 포함)로 변환
# foreground = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
# foreground[..., 3] = alpha_img # 알파 채널 추가
# # 8. 결과 이미지 저장
# cv2.imwrite(output_path, foreground)
# cv2.imwrite(alpha_path, alpha_img)
# print(f"배경 제거 PNG 저장: {output_path}")
# print(f"알파 마스크 저장: {alpha_path}")
# return foreground, alpha_img
# ------------------------------------------------------------------
# 고해상도 이미지 다운스케일 유틸리티 (메모리 절감용)
# ------------------------------------------------------------------
def downscale_image_if_large(self, image_path, max_dim=2048):
"""주어진 이미지가 max_dim 픽셀을 초과하면 축소하여 같은 경로에 저장하고 경로를 반환합니다"""
try:
with Image.open(image_path) as img:
width, height = img.size
if max(width, height) <= max_dim:
return image_path # 변경 없음
scale = float(max_dim) / float(max(width, height))
new_size = (int(width * scale), int(height * scale))
self.logger.log(
f"고해상도({width}x{height}) -> {new_size}로 리사이즈 후 저장", level=logging.INFO)
resized = img.resize(new_size, Image.LANCZOS)
# JPG, PNG 등에 관계없이 원본 확장자를 유지하여 덮어쓰기
resized.save(image_path)
return image_path
except Exception as e:
self.logger.log(f"다운스케일 실패: {e}", level=logging.WARNING)
return image_path