IMG_Worker/modules/image_processor3.py

2559 lines
128 KiB
Python

import os
import asyncio
import requests
import time
import logging
import threading
from urllib.parse import urlparse
import sys
import re
import cv2
import psutil
import tracemalloc
# OpenCV 의 내부 최적화(메모리 풀) 사용을 비활성화하여 파편화 위험을 낮춤
cv2.setUseOptimized(False)
import random
import gc
import numpy as np
from PIL import Image
from PIL import features
from modules.onnx_ocr_module.src.onnx_ocr_wrapper import ONNXOCRModule as Onnx_OCRModule
# from modules.ocr_module import OCRModule as Paddle_OCRModule
from modules.mask_module_for_paddle import MaskModule
# from modules.mask_module_for_easy import MaskModule_easy
from modules.text_rendering_module import TextRenderingModuleOptimized
from modules.postImageManager import PostImageManager
from translatepy.translators.google import GoogleTranslate
# Gemma 번역 클라이언트(옵셔널): 배포 환경에서 누락되어도 동작하도록 안전 임포트
try:
from modules.gemma_client import GemmaTranslator # 표준 경로
except Exception:
try:
# 개발 환경에서 상대 경로 임포트가 남아있는 경우 대비
from gemma_client import GemmaTranslator # noqa: F401
except Exception:
GemmaTranslator = None # 사용 시 체크 후 동작
# OpenRouter 번역 클라이언트(옵셔널): 다양한 LLM 모델 지원
try:
from modules.openrouter_client import OpenRouterTranslator
except Exception:
try:
from openrouter_client import OpenRouterTranslator
except Exception:
OpenRouterTranslator = None
# from modules.background_removal_module import BackgroundRemovalModule
# from modules.background_removal_module_pp import PPMattingBackgroundRemovalModule # (변경)
from modules.request_inpaint import Request_AI_Server
from modules.gpu_utils import GPUManager
class ImageProcessor3:
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
def __init__(self, logger, page, toggle_states, unwanted_words, authenticated_by_admin, base_dir, papago_translator):
self.logger = logger
self.page = page
self.base_dir = base_dir
self.toggle_states = toggle_states
self.unwanted_texts = unwanted_words
self.authenticated_by_admin = authenticated_by_admin
# 메모리 추적 시작 (파이썬 객체 메모리 할당 추적)
try:
if not tracemalloc.is_tracing():
tracemalloc.start()
self.logger.log("tracemalloc 메모리 추적 시작", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"tracemalloc 시작 실패: {e}", level=logging.WARNING)
# 기본 속성들을 먼저 None으로 초기화하여 안전성 확보
self.postImageManager = None
self.ocr_module = None
self.mask_module = None
self.text_rendering_module = None
self.request_ai_server = None
self.gtranslate = None
self.migan = None
self.gpu_manager = None
try:
# GPU 관리자 초기화
self.gpu_manager = GPUManager(logger=logger)
self.gpu_manager.initialize_gpu_state(toggle_states)
# GPU 상태 상세 로깅
gpu_status = self.gpu_manager.get_cuda_status()
self.logger.log(f"🔧 ImageProcessor3 GPU 상태 요약:", level=logging.DEBUG)
self.logger.log(f" - CUDA 사용 가능: {gpu_status['can_use_cuda']}", level=logging.DEBUG)
self.logger.log(f" - toggle_states['use_cuda']: {toggle_states.get('use_cuda', 'NOT_SET')}", level=logging.DEBUG)
self.logger.log(f" - GPU 하드웨어 정보: {gpu_status['gpu_info']}", level=logging.DEBUG)
self.logger.log(f"ImageProcessor3 Init toggle_states: {self.toggle_states}", level=logging.DEBUG)
self.is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin
self.logger.log(f"is_member_valid: {self.is_member_valid}", level=logging.DEBUG)
self.papago_translator = papago_translator
self.inpaint_method = 'migan'
# 폰트 경로 결정
self.font_path = self._resolve_font_path(self.toggle_states.get("font_type", "폰트1"))
# PostImageManager 등 하위 모듈과 공유
try:
self.toggle_states['image_font_path'] = self.font_path
except Exception:
pass
self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "")
self.debugging_save_Dir = os.path.join(self.base_dir, "debug_images")
if not os.path.exists(self.debugging_save_Dir):
os.makedirs(self.debugging_save_Dir)
self.logger.log(f"debug_images 디렉토리 생성: {self.debugging_save_Dir}", level=logging.DEBUG)
else:
self.logger.log(f"debug_images 디렉토리 이미 존재: {self.debugging_save_Dir}", level=logging.DEBUG)
self.use_local_rembg = self.toggle_states.get("use_local_rembg", False)
self.local_model_name = self.toggle_states.get("local_model_name", 'birefnet-general-lite')
# self.logger.log(f"self.toggle_states: {self.toggle_states}", level=logging.DEBUG)
self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG)
self.logger.log(f"toggle_states font_path: {self.toggle_states.get('image_font_path', '')}", level=logging.DEBUG)
self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
self.logger.log(f"self.debugging_save_Dir: {self.debugging_save_Dir}", level=logging.DEBUG)
self.logger.log(f"self.unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
self.logger.log(f"self.inpaint_method: {self.inpaint_method}", level=logging.DEBUG)
# ----------------------------- 메모리 파편화 완화 -----------------------------
# Pillow 가 거대 이미지를 열 때 과도한 메모리를 점유하지 않도록 최대 픽셀 수 제한
max_px = self.toggle_states.get("max_image_pixels", 20_000_000) # 약 20MP, 필요 시 조정 (20MP = 4500x4500, 50MP=8000x6000)
Image.MAX_IMAGE_PIXELS = max_px
self.logger.log(f"Image.MAX_IMAGE_PIXELS set to {max_px}", level=logging.DEBUG)
# -----------------------------------------------------------------------------
# 각 모듈을 개별적으로 초기화하고 예외 처리
self.ocr_module = None # 기본값 설정
try:
self.ocr_module = Onnx_OCRModule(logger=self.logger, base_dir=self.base_dir, gpu_manager=self.gpu_manager, toggle_states=self.toggle_states)
self.logger.log("✅ ONNX OCR 모듈 초기화 성공", level=logging.INFO)
except Exception as e:
self.logger.log(f"❌ ONNX OCR 모듈 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
self.ocr_module = None # 명시적으로 None 설정
# AI 번역기 초기화 (OpenRouter 또는 Gemma 선택)
# toggle_states에서 llm_provider 확인: "openrouter" 또는 "gemma"
llm_provider = self.toggle_states.get("llm_provider", "gemma")
self.ai_translator = None
if llm_provider == "openrouter" and OpenRouterTranslator is not None:
try:
openrouter_api_key = self.toggle_states.get("openrouter_api_key", "")
openrouter_model_id = self.toggle_states.get("openrouter_model_id", "xiaomi/mimo-v2-flash:free")
if openrouter_api_key:
self.ai_translator = OpenRouterTranslator(
api_key=openrouter_api_key,
model_id=openrouter_model_id,
timeout=int(self.toggle_states.get("llm_api_timeout", 120)),
logger=self.logger
)
self.logger.log(f"✅ OpenRouterTranslator 초기화 성공: 모델={openrouter_model_id}", level=logging.INFO)
else:
self.logger.log("⚠️ OpenRouter API 키가 설정되지 않음. Gemma로 폴백.", level=logging.WARNING)
llm_provider = "gemma" # 폴백
except Exception as e:
self.logger.log(f"❌ OpenRouterTranslator 초기화 실패: {e}. Gemma로 폴백.", level=logging.ERROR, exc_info=True)
llm_provider = "gemma" # 폴백
# Gemma 초기화 (기본 또는 폴백)
if self.ai_translator is None and GemmaTranslator is not None:
try:
self.ai_translator = GemmaTranslator(
base_url=self.toggle_states.get("gemma_api_base_url", "http://192.168.0.146:8008"),
timeout=int(self.toggle_states.get("gemma_api_timeout", 120)),
logger=self.logger
)
self.logger.log(f"gemma_api_base_url: {self.ai_translator.base_url}", level=logging.DEBUG)
self.logger.log(f"✅ GemmaTranslator 연결: base={self.ai_translator.base_url}", level=logging.INFO)
except Exception as e:
self.logger.log(f"❌ GemmaTranslator 연결 실패: {e}", level=logging.ERROR, exc_info=True)
self.ai_translator = None
# try:
# # CUDNN 버전 불일치 문제 해결을 위한 force_cpu 옵션
# # toggle_states에서 force_cpu_ocr 설정 확인 (기본값: False)
# force_cpu_ocr = self.toggle_states.get('force_cpu_ocr', False)
# force_cpu_ocr = True
# self.ocr_module = Paddle_OCRModule(
# logger=self.logger,
# base_dir=self.base_dir,
# gpu_manager=self.gpu_manager,
# force_cpu=force_cpu_ocr
# )
# if force_cpu_ocr:
# self.logger.log("✅ PaddleOCR 모듈 초기화 성공 (CPU 강제 모드)", level=logging.INFO)
# else:
# self.logger.log("✅ PaddleOCR 모듈 초기화 성공", level=logging.INFO)
# except Exception as e:
# self.logger.log(f"❌ PaddleOCR 모듈 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# self.ocr_module = None # 명시적으로 None 설정
try:
self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir)
self.logger.log("MaskModule 초기화 성공", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"MaskModule 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
try:
self.text_rendering_module = TextRenderingModuleOptimized(logger=self.logger, font_path=self.font_path)
self.logger.log("TextRenderingModule 초기화 성공", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"TextRenderingModule 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
try:
self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states)
self.logger.log("PostImageManager 초기화 성공", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"PostImageManager 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# PostImageManager는 중요한 모듈이므로 기본적인 fallback 생성
try:
self.postImageManager = self._create_fallback_post_image_manager()
self.logger.log("PostImageManager fallback 생성 성공", level=logging.INFO)
except Exception as e2:
self.logger.log(f"PostImageManager fallback 생성도 실패: {e2}", level=logging.ERROR, exc_info=True)
# self.background_removal_module = BackgroundRemovalModule(logger=self.logger, default_model="birefnet-general")
# self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger, default_model="ppmatting-hrnet-1x")
# self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger)
self.local_rembg_model_path = os.path.join(self.base_dir, "rembg_models", "BriaRMBG1.4_model_fp16.onnx")
try:
# Request_AI_Server에도 GPU 상태 전달
self.request_ai_server = Request_AI_Server(
logger=self.logger,
gpu_manager=self.gpu_manager,
local_rembg_model_path=self.local_rembg_model_path
)
self.logger.log("Request_AI_Server 초기화 성공", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"Request_AI_Server 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
try:
self.gtranslate = GoogleTranslate()
if self.gtranslate is not None:
self.logger.log(f"GoogleTranslate 초기화 성공", level=logging.DEBUG)
else:
self.logger.log(f"GoogleTranslate 초기화 결과: None", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"GoogleTranslate 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# MIGAN ONNX 파이프라인 준비 (base_dir 기반 고정 경로 사용)
# ※ toggle_states['migan_onnx_path']는 더 이상 사용하지 않음
try:
from modules.migan_module import build_migan_from_toggle
# base_dir 기반 MIGAN 모델 경로 (고정)
migan_model_candidates = [
os.path.join(self.base_dir, "migan_onnx", "migan_pipeline_v2_simplified.onnx"),
os.path.join(self.base_dir, "migan_onnx", "migan_pipeline_v2.onnx"),
]
migan_onnx_path = ""
for candidate in migan_model_candidates:
if os.path.exists(candidate):
migan_onnx_path = candidate
self.logger.log(f"[MIGAN] 모델 경로 확인: {migan_onnx_path}", level=logging.DEBUG)
break
if not migan_onnx_path:
self.logger.log(f"[MIGAN] 모델 파일을 찾을 수 없음 (base_dir: {self.base_dir})", level=logging.WARNING)
self.logger.log(f"[MIGAN] 시도한 경로: {migan_model_candidates}", level=logging.DEBUG)
# MIGAN 초기화 조건
inpaint_method = self.toggle_states.get("inpaint_method", "request")
local_inpaint_method = self.toggle_states.get("local_inpaint_method", "migan")
should_init_migan = (
migan_onnx_path and
(inpaint_method == "migan" or local_inpaint_method == "migan")
)
if should_init_migan:
# GPU 상태에 따라 CUDA 사용 여부 결정
enhanced_toggle_states = self.toggle_states.copy()
# migan_onnx_path를 내부 고정 경로로 덮어쓰기
enhanced_toggle_states["migan_onnx_path"] = migan_onnx_path
if self.gpu_manager and self.gpu_manager.can_use_cuda:
enhanced_toggle_states["migan_use_cuda"] = enhanced_toggle_states.get("migan_use_cuda", False)
self.logger.log(f"MIGAN CUDA 사용 설정: {enhanced_toggle_states['migan_use_cuda']}", level=logging.DEBUG)
else:
enhanced_toggle_states["migan_use_cuda"] = False
self.logger.log("MIGAN CUDA 사용 불가 - CPU 모드로 설정", level=logging.DEBUG)
self.logger.log(f"[MIGAN] GPU 관리자 전달: {type(self.gpu_manager).__name__}, can_use_cuda: {self.gpu_manager.can_use_cuda if self.gpu_manager else 'N/A'}", level=logging.DEBUG)
self.migan = build_migan_from_toggle(enhanced_toggle_states, logger=self.logger, gpu_manager=self.gpu_manager)
self.logger.log(f"[MIGAN] 초기화 완료: gpu_manager 속성={hasattr(self.migan, 'gpu_manager')}, 값={getattr(self.migan, 'gpu_manager', None)}", level=logging.DEBUG)
else:
self.migan = None
self.logger.log(f"MIGAN 초기화 건너뜀: inpaint_method={inpaint_method}, local_inpaint_method={local_inpaint_method}, migan_onnx_path={bool(migan_onnx_path)}", level=logging.DEBUG)
# MIGAN 모델이 없으면 inpaint_method를 cv로 강제 설정
if not migan_onnx_path:
self.inpaint_method = 'cv'
self.logger.log(f"[MIGAN] 모델 없음 → inpaint_method를 'cv'로 강제 설정", level=logging.WARNING)
except Exception as e:
self.migan = None
self.inpaint_method = 'cv' # 초기화 실패 시 cv로 폴백
self.logger.log(f"MIGAN 초기화 실패 → inpaint_method를 'cv'로 강제 설정: {e}", level=logging.ERROR, exc_info=True)
# 인페인팅 실행 정보(마지막 사용 방식/장치) 추적용 내부 상태
self._last_inpaint_used = None
self._last_inpaint_device = None
# 외부 서버 헬스 체크 플래그
self.is_external_server_alive = False
# 초기 헬스체크 실행 (비동기로 실행하거나, 여기서 한 번 체크)
try:
if self.check_external_server_availability():
self.is_external_server_alive = True
self.logger.log(f"외부 인페인팅 서버 활성화 확인됨: {self.toggle_states.get('request_inpainting_server_url')}", level=logging.INFO)
except Exception:
pass
# MIGAN 동시 접근 방지용 락
self._migan_lock = threading.Lock()
except Exception as e:
self.logger.log(f"ImageProcessor3 초기화 중 치명적 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 치명적 오류 발생 시에도 기본 속성들이 None으로라도 설정되도록 보장
def _resolve_font_path(self, font_type):
"""font_type("폰트1" 등)에 해당하는 실제 폰트 파일 경로를 반환"""
import json
fonts_dir = os.path.join(self.base_dir, "fonts")
map_file = os.path.join(fonts_dir, "fonts_map.json")
# 기본 매핑 테이블 (파일 로드 실패 시 사용)
default_map = {
"폰트1": "HakgyoansimDunggeunmisoTTFB.ttf",
"폰트2": "NanumBarunGothic.ttf",
"폰트3": "NanumSquareRoundR.ttf",
"폰트4": "gamtanload.ttf",
"폰트5": "Cafe24Ohsquare-v2.0.ttf",
"폰트6": "GmarketSansTTFMedium.ttf",
"폰트7": "Paperlogy-5Medium.ttf",
"폰트8": "Pretendard-Regular.ttf",
}
font_map = default_map
# fonts_map.json 로드 시도
try:
if os.path.exists(map_file):
with open(map_file, "r", encoding="utf-8") as f:
font_map = json.load(f)
except Exception as e:
self.logger.log(f"폰트 매핑 파일 로드 실패 ({map_file}): {e}, 기본값 사용", level=logging.WARNING)
# 1. font_type에 매핑된 파일 찾기
font_key = str(font_type).strip()
# 매핑에 없으면 기본값(폰트1) 사용
font_filename = font_map.get(font_key, font_map.get("폰트1", default_map["폰트1"]))
# 2. 실제 경로 구성
candidate_path = os.path.join(fonts_dir, font_filename)
if os.path.exists(candidate_path):
return candidate_path
# 3. 파일이 없으면 기본 폰트(폰트1)로 폴백
default_filename = font_map.get("폰트1", default_map["폰트1"])
default_path = os.path.join(fonts_dir, default_filename)
if os.path.exists(default_path):
return default_path
# 4. 정말 아무것도 없으면 시스템 폰트 등 최후의 수단 (혹은 예외)
fallback_candidates = [
"C:/Windows/Fonts/malgun.ttf",
"C:/Windows/Fonts/gulim.ttc"
]
for p in fallback_candidates:
if os.path.exists(p):
return p
return default_path # 경로가 없어도 일단 리턴
def _create_fallback_post_image_manager(self):
"""PostImageManager 초기화 실패 시 사용할 fallback 객체 생성"""
class FallbackPostImageManager:
"""PostImageManager의 최소한 기능을 제공하는 fallback 클래스"""
def __init__(self, logger, toggle_states):
self.logger = logger
self.toggle_states = toggle_states
self.font = None
def update_toggle_states(self, toggle_states):
"""toggle_states 업데이트"""
self.toggle_states = toggle_states
self.logger.log("FallbackPostImageManager toggle_states 업데이트됨", level=logging.DEBUG)
def save_image_to_path(self, image, path):
"""기본적인 이미지 저장 기능"""
try:
if hasattr(image, 'save'):
image.save(path)
return True
else:
self.logger.log("이미지 객체에 save 메서드가 없습니다", level=logging.ERROR)
return False
except Exception as e:
self.logger.log(f"Fallback 이미지 저장 실패: {e}", level=logging.ERROR)
return False
def crop_image(self, image, is_thumb=False, crop_percentage=0.01):
"""기본적인 이미지 크롭 기능"""
try:
if hasattr(image, 'size') and hasattr(image, 'crop'):
width, height = image.size
left = width * crop_percentage
top = height * crop_percentage
right = width * (1 - crop_percentage)
bottom = height * (1 - crop_percentage)
return image.crop((left, top, right, bottom))
else:
self.logger.log("이미지 객체에 필요한 메서드가 없습니다", level=logging.ERROR)
return image
except Exception as e:
self.logger.log(f"Fallback 이미지 크롭 실패: {e}", level=logging.ERROR)
return image
return FallbackPostImageManager(self.logger, self.toggle_states)
def update_toggle_states(self, toggle_states):
self.toggle_states = toggle_states
# 1. 멤버십 및 권한 정보 업데이트
self.is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin
# 2. 인페인팅 설정 업데이트
# MIGAN이 초기화되지 않은 경우 inpaint_method를 'cv'로 강제 유지
requested_inpaint_method = self.toggle_states.get("inpaint_method", "migan")
if getattr(self, 'migan', None) is None and requested_inpaint_method == 'migan':
self.inpaint_method = 'cv'
self.logger.log(f"[UpdateToggle] MIGAN 미초기화 상태 → inpaint_method를 'cv'로 강제 유지 (요청: {requested_inpaint_method})", level=logging.WARNING)
else:
self.inpaint_method = requested_inpaint_method
self.use_local_rembg = self.toggle_states.get("use_local_rembg", False)
self.local_model_name = self.toggle_states.get("local_model_name", 'birefnet-general-lite')
# 3. 폰트 경로 및 렌더링 모듈 업데이트
try:
new_font_path = self._resolve_font_path(self.toggle_states.get("font_type", "폰트1"))
# 폰트가 변경되었다면 렌더링 모듈 재생성
if new_font_path and new_font_path != self.font_path:
self.font_path = new_font_path
# toggle_states에도 반영
self.toggle_states['image_font_path'] = self.font_path
if hasattr(self, 'text_rendering_module'):
try:
from modules.text_rendering_module import TextRenderingModuleOptimized
self.text_rendering_module = TextRenderingModuleOptimized(logger=self.logger, font_path=self.font_path)
self.logger.log(f"폰트 변경으로 텍스트 렌더링 모듈 재생성: {self.font_path}", level=logging.INFO)
except Exception as tr_err:
self.logger.log(f"텍스트 렌더링 모듈 재생성 실패: {tr_err}", level=logging.ERROR)
except Exception as e:
self.logger.log(f"폰트 정보 업데이트 중 오류: {e}", level=logging.WARNING)
if self.postImageManager is not None:
try:
self.postImageManager.update_toggle_states(self.toggle_states)
self.logger.log(f"이미지 프로세서 toggle_states 업데이트 : {self.toggle_states}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"PostImageManager toggle_states 업데이트 중 오류: {e}", level=logging.ERROR, exc_info=True)
else:
self.logger.log("PostImageManager가 None이므로 toggle_states 업데이트를 건너뜁니다.", level=logging.WARNING)
# 외부 서버 헬스 체크 (toggle_states 업데이트 시마다 수행)
if self.inpaint_method == 'external_request':
self.is_external_server_alive = self.check_external_server_availability()
self.logger.log(f"외부 인페인팅 서버 상태 확인: {self.is_external_server_alive}", level=logging.DEBUG)
self.logger.log(f"[UpdateToggle] 완료: member={self.is_member_valid}, inpaint={self.inpaint_method}, font={os.path.basename(self.font_path)}", level=logging.DEBUG)
def update_unwanted_texts(self, texts):
self.unwanted_texts = texts
self.logger.log(f"이미지프로세서 unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
def __del__(self):
"""소멸자에서 리소스 정리"""
self.cleanup()
self.logger.log("이미지 프로세서 소멸", level=logging.DEBUG)
def cleanup(self):
"""리소스 정리"""
try:
# OCR 모듈 정리
if hasattr(self, 'ocr_module'):
try:
del self.ocr_module
self.logger.log("OCR 모듈 정리 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"OCR 모듈 정리 중 오류: {e}", level=logging.WARNING)
# 마스크 모듈 정리
if hasattr(self, 'mask_module'):
try:
del self.mask_module
self.logger.log("마스크 모듈 정리 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"마스크 모듈 정리 중 오류: {e}", level=logging.WARNING)
# 텍스트 렌더링 모듈 정리
if hasattr(self, 'text_renderer'):
try:
del self.text_renderer
self.logger.log("텍스트 렌더링 모듈 정리 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"텍스트 렌더링 모듈 정리 중 오류: {e}", level=logging.WARNING)
# GPU 메모리 정리
if hasattr(self, 'gpu_manager') and self.gpu_manager and self.gpu_manager.can_use_cuda:
try:
import paddle
if hasattr(paddle, 'device') and hasattr(paddle.device, 'cuda'):
paddle.device.cuda.empty_cache()
self.logger.log("CUDA 캐시 정리 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"CUDA 캐시 정리 중 오류: {e}", level=logging.WARNING)
# Python GC 강제 실행
import gc
gc.collect()
# OpenCV 윈도우 정리
try:
cv2.destroyAllWindows()
except:
pass
# 임시 폴더 삭제
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
# shutil.rmtree(self.TEMP_IMAGE_DIR)
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
def reset_ocr_module(self):
"""OCR 모듈을 명시적으로 삭제하고 재생성."""
try:
self.logger.log("🔄 OCR 모듈 재초기화 시작", level=logging.DEBUG)
# 기존 모듈 참조 해제
if hasattr(self, 'ocr_module'):
del self.ocr_module
self.logger.log("기존 OCR 모듈 참조 해제 완료", level=logging.DEBUG)
# 안전을 위해 먼저 None으로 설정
self.ocr_module = None
# CUDA 메모리 정리 (GPU 사용 시)
if hasattr(self, 'gpu_manager') and self.gpu_manager and self.gpu_manager.can_use_cuda:
try:
import paddle
if hasattr(paddle, 'device') and hasattr(paddle.device, 'cuda'):
paddle.device.cuda.empty_cache()
self.logger.log("CUDA 캐시 정리 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"CUDA 캐시 정리 실패: {e}", level=logging.WARNING)
# Python GC 강제 실행 (여러 번 실행으로 더 강력한 정리)
import gc
for i in range(3):
collected = gc.collect()
if collected > 0:
self.logger.log(f"GC 실행 {i+1}: {collected}개 객체 정리", level=logging.DEBUG)
# ONNX OCR 모듈로 재초기화
try:
from modules.onnx_ocr_module.src.onnx_ocr_wrapper import ONNXOCRModule as OCRModule
self.ocr_module = OCRModule(
logger=self.logger,
base_dir=self.base_dir,
gpu_manager=self.gpu_manager,
toggle_states=self.toggle_states,
)
self._ocr_call_count = 0
self.logger.log("✅ ONNX OCR 모듈 재초기화 완료", level=logging.INFO)
return True
except Exception as init_error:
self.logger.log(f"❌ ONNX OCR 모듈 재초기화 중 오류: {init_error}", level=logging.ERROR, exc_info=True)
self.ocr_module = None
return False
except Exception as e:
self.logger.log(f"❌ OCR 모듈 재초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# 안전을 위해 None으로 설정
self.ocr_module = None
return False
def is_valid_image_path(self, path):
# http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용
if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE):
return True
if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')):
return True
return False
async def process_single_image(self, original_image_url, index, delay=1.0, file_prefix="", model_name: str = "migan"):
"""
단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅)
Args:
page: Playwright 페이지 객체
original_image_url (str): 처리할 이미지 URL
index (int): 이미지 인덱스
is_localServer (bool): 로컬 서버 사용 여부
delay (float): 요청 간격 (초)
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
Returns:
dict: 처리 결과를 포함한 딕셔너리
- status: 'inpainted', 'original', 'exclude', 'error' 중 하나
- path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로
- error: 오류 메시지 (status가 'error'인 경우에만 포함)
"""
local_image_path = None
delay = random.uniform(0.5, 1.5)
delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화
# 파이프라인 타이밍 계측 시작
import time as _time
_t_all_start = time.time()
_timings_ms = {}
# 직전 인페인트 사용 방식 초기화
self._last_inpaint_used = None
self._last_inpaint_device = None
# self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
self.logger.log(f"이미지 번역시작", level=logging.DEBUG)
# self.logger.log(f"toggle_states: {self.toggle_states}", level=logging.DEBUG)
try:
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
if not self.is_valid_image_path(original_image_url):
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 요청 간격 조절 (봇 탐지 회피)
if delay > 0:
await asyncio.sleep(delay)
# OCR 권한 상태 로그
ocr_enabled = self.toggle_states.get('ocr', False)
processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드"
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG)
# 1. 이미지 다운로드
_t = _time.time()
local_image_path = self.download_image(image_url=original_image_url, index=index, file_prefix=file_prefix)
_timings_ms["download"] = (_time.time() - _t) * 1000.0
if not local_image_path:
self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 1-A. 상세페이지 이미지 전처리 (크기 표준화 및 분할)
if file_prefix == "detail":
local_image_path = await self.preprocess_detail_image(local_image_path, index)
if not local_image_path:
self.logger.log(f"상세페이지 이미지 {index+1} 전처리 실패", level=logging.WARNING)
return {'status': 'failed', 'path': original_image_url, 'error': '상세페이지 이미지 전처리 실패'}
# elif file_prefix == "thumb":
# 1-B. 썸네일 이미지는 고해상도 입력 다운스케일 (메모리 절약)
# max_dim = self.toggle_states.get('max_image_resolution', 1200)
# local_image_path = self.downscale_image_if_large(local_image_path, max_dim=max_dim)
else:
# 1-C. 옵션 이미지는 스케일 처리 건너뛰기 (이미 작은 크기)
self.logger.log(f"옵션 이미지는 스케일 처리 건너뛰기: {file_prefix}", level=logging.DEBUG)
# 전처리 유형에 따른 로그 메시지
if file_prefix == "detail":
processing_type = "상세페이지 전처리 완료"
elif file_prefix == "thumb":
processing_type = "썸네일 스케일 처리 완료"
else:
processing_type = "옵션 이미지 원본 유지"
self.logger.log(f"이미지 {index+1} 로컬 저장위치({processing_type}): {local_image_path}", level=logging.DEBUG)
# 2. OCR 텍스트 감지
_t = _time.time()
# 메모리 추적: OCR 시작 전
ocr_before_mem = psutil.virtual_memory()
ocr_before_mb = ocr_before_mem.used / 1024 / 1024
ocr_results = self.safe_detect(local_image_path)
_timings_ms["ocr"] = (_time.time() - _t) * 1000.0
# 메모리 추적: OCR 완료 후
ocr_after_mem = psutil.virtual_memory()
ocr_after_mb = ocr_after_mem.used / 1024 / 1024
ocr_change_mb = ocr_after_mb - ocr_before_mb
ocr_change_percent = (ocr_change_mb / ocr_before_mb) * 100 if ocr_before_mb > 0 else 0
self.logger.log(
f"메모리 변화 [OCR 처리]: {ocr_before_mb:.1f}MB -> {ocr_after_mb:.1f}MB "
f"({ocr_change_mb:+.1f}MB, {ocr_change_percent:+.1f}%) - 이미지 {index+1}",
level=logging.DEBUG if abs(ocr_change_mb) < 10 else logging.INFO
)
self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG)
# 2-A. 상세페이지인 경우 OCR 정보 수집 및 저장
if file_prefix == "detail":
store_ocr_to_db = self.toggle_states.get('store_ocr_data_to_db', False) # 기본값: False
if store_ocr_to_db:
await self.collect_and_store_ocr_data(original_image_url, local_image_path, ocr_results, index)
else:
# 메모리에만 저장 (현재 세션용)
await self.collect_ocr_data_in_memory(original_image_url, local_image_path, ocr_results, index)
filter_ocr_results = self.filter_ocr_results(ocr_results)
self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG)
ocr_count = len(filter_ocr_results)
# 3. OCR 모듈 상태 확인 및 중국어 텍스트 검사
if not hasattr(self, 'ocr_module') or self.ocr_module is None:
self.logger.log("⚠️ OCR 모듈이 초기화되지 않아 원본 이미지 반환", level=logging.WARNING)
return {'status': 'original', 'path': local_image_path, 'message': 'OCR 모듈 초기화 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 중국어 텍스트가 없는 경우 정상 케이스로 처리
if not self.ocr_module.filter_chinese_text(filter_ocr_results):
self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음 - 정상 케이스 (NO_TEXT)", level=logging.INFO)
return {'status': 'original', 'path': local_image_path, 'message': '중국어 텍스트가 발견되지 않았습니다', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 4. 한글 텍스트가 존재하는 경우 원본 이미지 반환으로 번역 패스
if self.ocr_module.filter_korean_text(filter_ocr_results):
self.logger.log(f"이미지 {index+1} 한글 텍스트 존재, 원본 이미지 반환", level=logging.DEBUG)
return {'status': 'original', 'path': local_image_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 4. 병렬 실행: 번역(I/O) ↔ 마스크 생성(CPU)
# - OCR 결과를 기반으로 두 작업은 서로 독립이므로 동시에 수행 가능
if ocr_count < 5:
expansion_size = 12
blur_size = 15
elif ocr_count < 10:
expansion_size = 9
blur_size = 12
elif ocr_count < 15:
expansion_size = 7
blur_size = 9
elif ocr_count < 20:
expansion_size = 5
blur_size = 6
else:
expansion_size = 10
blur_size = 15
loop = asyncio.get_running_loop()
_t_parallel_start = _time.time()
# 개별 태스크 완료 시간 기록용 변수
_translate_done_time = None
_mask_done_time = None
async def timed_translate():
nonlocal _translate_done_time
result = await loop.run_in_executor(
None,
lambda: self.translate_ocr_results(filter_ocr_results)
)
_translate_done_time = _time.time()
return result
async def timed_mask():
nonlocal _mask_done_time
result = await loop.run_in_executor(
None,
lambda: self.mask_module.create_masks(
image_path=local_image_path,
ocr_results=filter_ocr_results,
mask_option="basic",
expansion_size=expansion_size,
blur_size=blur_size
)
)
_mask_done_time = _time.time()
return result
translated_texts, masks = await asyncio.gather(timed_translate(), timed_mask())
# 개별 태스크 시간 계산 (병렬 시작 시점 기준)
_timings_ms["translate"] = (_translate_done_time - _t_parallel_start) * 1000.0 if _translate_done_time else 0
_timings_ms["mask"] = (_mask_done_time - _t_parallel_start) * 1000.0 if _mask_done_time else 0
self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG)
self.logger.log(f"마스크 생성 완료", level=logging.DEBUG)
# 5. OCR 권한에 따른 텍스트 필터링
if ocr_enabled:
filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index)
if not filtered_translated_texts:
self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG)
return {'status': 'exclude', 'path': local_image_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
else:
self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG)
else:
# OCR 권한이 없으면 번역된 텍스트를 그대로 사용
filtered_translated_texts = translated_texts
self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG)
# ------------------- 인페인트 엔진 자동 선택 (옵션) -------------------
try:
inpaint_model_pref = self.toggle_states.get('inpaint_model', 'auto').lower()
except Exception:
inpaint_model_pref = 'auto'
if inpaint_model_pref in ['lama', 'migan', 'cv']:
# 명시적 모델 지정 시 강제
forced_map = {'lama': 'request', 'migan': 'migan', 'cv': 'cv'}
self.inpaint_method = forced_map.get(inpaint_model_pref, self.inpaint_method)
self.logger.log(f"[inpaint_model 강제] {inpaint_model_pref} → inpaint_method={self.inpaint_method}", level=logging.INFO)
else:
# auto: 마스크 통계 기반으로 결정
try:
analysis = self._analyze_mask_stats(masks)
# 임계값 (토글로 조정 가능)
area_large_thr = float(self.toggle_states.get('inpaint_auto_area_large_thresh', 0.08))
area_small_thr = float(self.toggle_states.get('inpaint_auto_area_small_thresh', 0.03))
dist_thr = float(self.toggle_states.get('inpaint_auto_min_distance_ratio', 0.10))
choose = 'external_request' # 기본: lama(서버)
if analysis['coverage_ratio'] <= area_small_thr and \
analysis['component_count'] >= 1 and \
analysis['min_centroid_distance_ratio'] >= dist_thr:
choose = 'migan'
elif analysis['coverage_ratio'] >= area_large_thr:
choose = 'external_request'
else:
# 중간대: lama 우선 (품질 우선)
choose = 'external_request'
self.logger.log(
f"[AUTO Inpaint] coverage={analysis['coverage_ratio']:.3f}, comps={analysis['component_count']}, "
f"min_center_dist={analysis['min_centroid_distance_ratio']:.3f}{choose}",
level=logging.INFO,
)
self.inpaint_method = choose
except Exception as auto_err:
self.logger.log(f"AUTO 인페인트 선택 실패: {auto_err}", level=logging.WARNING)
# if not self.is_frozen():
# # 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화)
# self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix)
self.logger.log(f"ocr_count: {ocr_count}", level=logging.DEBUG)
self.logger.log(f"is_member_valid: {self.is_member_valid}", level=logging.DEBUG)
# 인페인팅 방법 설정
preferred_method = self.set_inpaint_method(file_prefix)
self.logger.log(f"최종 inpaint_method: {preferred_method}", level=logging.DEBUG)
# self.inpaint_method = 'migan'
# 인페인팅 실행 (폴백 순서: 자체서버 > GPU > CPU)
_t = _time.time()
# 동기 함수인 execute_inpaint_with_fallback을 스레드 풀에서 실행
inpainted_image = await loop.run_in_executor(
None,
lambda: self.execute_inpaint_with_fallback(local_image_path, masks, ocr_count, preferred_method, model_name=model_name)
)
_timings_ms["inpaint"] = (_time.time() - _t) * 1000.0
self.logger.log(f"인페인팅 완료", level=logging.DEBUG)
# # 개발환경에서 인페인트 결과 디버깅 저장
# if not self.is_frozen():
# try:
# self.save_inpaint_debug_image(inpainted_image, index, file_prefix)
# except Exception:
# pass
# 인페인팅 실패 시 원본 이미지 사용
if inpainted_image is None:
self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING)
inpainted_image = cv2.imread(local_image_path)
if inpainted_image is None:
self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR)
return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
# 텍스트 렌더링
_t = _time.time()
text_rendered_image = self.text_rendering_module.render_text(
inpainted_image, filter_ocr_results, filtered_translated_texts)
_timings_ms["render"] = (_time.time() - _t) * 1000.0
self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG)
# 결과 저장
_t = _time.time()
translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, file_prefix)
_timings_ms["save"] = (_time.time() - _t) * 1000.0
self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG)
# GPU 메모리 사용량 로깅 (CUDA 사용 시)
if self.gpu_manager and self.gpu_manager.can_use_cuda:
self.gpu_manager.log_gpu_memory_usage()
return {'status': 'translated', 'path': translated_img_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
except Exception as e:
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e), 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device}
finally:
# ---- 메모리 해제 ----
try:
ocr_results = None
filter_ocr_results = None
translated_texts = None
masks = None
inpainted_image = None
text_rendered_image = None
except Exception:
pass
# download_image 단계에서 사용한 page, local_image_path 도 참조 제거
local_image_path = None
# 최종 GC 강제 실행
gc.collect()
# 파이프라인 시간 요약 로그
try:
_t_all_end = _time.time()
total_ms = (_t_all_end - _t_all_start) * 1000.0
parts = []
label_map = {
"download": "download",
"ocr": "ocr",
"translate": "translate",
"mask": "mask",
"inpaint": "inpaint",
"render": "render",
"save": "save",
}
for k in ["download", "ocr", "translate", "mask", "inpaint", "render", "save"]:
if k in _timings_ms:
if k == "inpaint" and getattr(self, "_last_inpaint_used", None):
used = self._last_inpaint_used
dev = getattr(self, "_last_inpaint_device", None) or "CPU"
parts.append(f"{label_map[k]}={_timings_ms[k]:.1f}ms({used}/{dev})")
else:
parts.append(f"{label_map[k]}={_timings_ms[k]:.1f}ms")
timeline = " | ".join(parts) if parts else ""
self.logger.log(
f"⏱ 이미지 파이프라인 총 {total_ms:.1f}ms{(' | ' + timeline) if timeline else ''}",
level=logging.DEBUG,
)
# 결과 timings를 리턴 데이터에 포함시키기 위해 attrs에 저장(워커가 그대로 전달)
self._last_timings = {"total_ms": total_ms, **{k: float(f"{v:.1f}") for k, v in _timings_ms.items()}}
except Exception:
pass
def set_inpaint_method(self, file_prefix: str) -> str:
"""인페인팅 방법 설정
우선순위:
1. AUTO 모드에서 이미 external_request가 선택된 경우 → external_request
2. self.inpaint_method가 설정된 경우 → 해당 값 사용
3. 기본값 → migan
"""
# 1. AUTO 모드에서 이미 external_request가 선택된 경우 우선 사용
current_method = getattr(self, 'inpaint_method', None)
if current_method == 'external_request':
self.logger.log(f"[set_inpaint_method] AUTO 모드에서 external_request 선택됨 → 유지", level=logging.DEBUG)
return "external_request"
# 2. self.inpaint_method가 설정된 경우 해당 값 사용
if current_method and current_method in ('migan', 'cv', 'external_request'):
self.logger.log(f"[set_inpaint_method] 기존 설정 사용 → {current_method}", level=logging.DEBUG)
return current_method
# 3. 기본값: migan
self.logger.log(f"[set_inpaint_method] 기본값 사용 → migan", level=logging.DEBUG)
return "migan"
def execute_inpaint_with_fallback(self, local_image_path: str, masks, ocr_count: int, preferred_method: str = None, model_name: str = "migan"):
"""
인페인팅 실행 - preferred_method 설정에 따라 분기.
'external_request'인 경우 VIP 체크 후 외부 서버 시도, 실패 시 MIGAN 폴백.
그 외의 경우(또는 폴백 시) MIGAN 사용.
Args:
local_image_path: 이미지 파일 경로
masks: 마스크 데이터
ocr_count: OCR 결과 개수
preferred_method: 선호하는 인페인팅 방식
Returns:
인페인팅된 이미지 또는 None
"""
# 메모리 추적: 인페인팅 시작 전
inpaint_before_mem = psutil.virtual_memory()
inpaint_before_mb = inpaint_before_mem.used / 1024 / 1024
inpainted_image = None
current_method = preferred_method
# 1. 사용자 설정 확인 (preferred_method가 설정되어 있으면 최우선, 없으면 토글값)
if not current_method:
current_method = self.toggle_states.get("inpaint_method", "migan")
server_url = self.toggle_states.get("request_inpainting_server_url", "")
# 2. External Request 모드일 때 처리
if current_method == "external_request":
if self.is_member_valid:
# 실시간 헬스 체크 수행 (캐시된 값 대신)
is_server_alive = self.check_external_server_availability()
self.is_external_server_alive = is_server_alive # 캐시 업데이트
if not is_server_alive:
self.logger.log("외부 서버 상태 비정상(헬스 체크 실패) -> 로컬 MIGAN으로 폴백", level=logging.WARNING)
elif server_url and str(server_url).strip().startswith("http"):
# 외부 서버 시도
current_method = 'external_request'
inpainted_image = self._try_external_inpaint(local_image_path, masks, str(server_url).strip(), model_name=model_name)
if inpainted_image is not None:
self._last_inpaint_used = "external_request"
self._last_inpaint_device = "SERVER"
else:
self.logger.log("외부 서버 인페인팅 실패 -> 로컬 MIGAN으로 폴백", level=logging.WARNING)
else:
self.logger.log("외부 서버 URL이 유효하지 않음 -> 로컬 MIGAN으로 폴백", level=logging.WARNING)
else:
self.logger.log("VIP 회원이 아님 -> 로컬 MIGAN으로 폴백", level=logging.WARNING)
# 3. 로컬 MIGAN 인페인팅 (기본값, 또는 외부 요청 실패/조건 미충족 시)
if inpainted_image is None and current_method != 'cv':
current_method = 'migan'
inpainted_image = self._try_migan_inpaint(local_image_path, masks)
# _try_migan_inpaint 내부에서 _last_inpaint_used 설정함
# 4. CV 인페인팅 폴백 (MIGAN 실패, 미초기화 또는 애초에 cv 선택 시)
if inpainted_image is None:
# 명시적으로 cv를 선택했거나, 위 단계들에서 모두 실패하여 최종 폴백이 필요한 경우
current_method = 'cv'
# MIGAN/Server 실패 로그가 이미 출력되었을 수 있으므로, 여기서 CV 시도 로그 출력
self.logger.log("인페인팅 폴백: OpenCV(Telea) 방식 시도", level=logging.INFO)
try:
# 원본 이미지 로드
img = cv2.imread(local_image_path)
if img is None:
raise Exception(f"이미지 로드 실패: {local_image_path}")
# 마스크 병합 (다양한 형태 지원)
combined_mask = np.zeros(img.shape[:2], dtype=np.uint8)
mask_count = 0
if isinstance(masks, np.ndarray):
# 단일 numpy 배열인 경우
if masks.ndim == 2:
if masks.shape != combined_mask.shape:
masks = cv2.resize(masks, (combined_mask.shape[1], combined_mask.shape[0]))
combined_mask = masks.astype(np.uint8)
mask_count = 1
self.logger.log(f"[CV Inpaint] numpy 마스크 사용: shape={masks.shape}", level=logging.DEBUG)
elif isinstance(masks, list):
for m_item in masks:
if isinstance(m_item, str) and os.path.exists(m_item):
m_img = cv2.imread(m_item, cv2.IMREAD_GRAYSCALE)
if m_img is not None:
# 크기 맞추기
if m_img.shape != combined_mask.shape:
m_img = cv2.resize(m_img, (combined_mask.shape[1], combined_mask.shape[0]))
combined_mask = cv2.bitwise_or(combined_mask, m_img)
mask_count += 1
elif isinstance(m_item, np.ndarray) and m_item.ndim == 2:
# 리스트 내 numpy 배열
if m_item.shape != combined_mask.shape:
m_item = cv2.resize(m_item, (combined_mask.shape[1], combined_mask.shape[0]))
combined_mask = cv2.bitwise_or(combined_mask, m_item.astype(np.uint8))
mask_count += 1
self.logger.log(f"[CV Inpaint] 리스트 마스크 병합: {mask_count}", level=logging.DEBUG)
elif isinstance(masks, str) and os.path.exists(masks):
# 단일 파일 경로
m_img = cv2.imread(masks, cv2.IMREAD_GRAYSCALE)
if m_img is not None:
if m_img.shape != combined_mask.shape:
m_img = cv2.resize(m_img, (combined_mask.shape[1], combined_mask.shape[0]))
combined_mask = m_img
mask_count = 1
self.logger.log(f"[CV Inpaint] 단일 파일 마스크 사용: {masks}", level=logging.DEBUG)
# 마스크가 비어있으면 경고
if np.sum(combined_mask) == 0:
self.logger.log(f"[CV Inpaint] 경고: 마스크가 비어있음 (mask_count={mask_count}, masks type={type(masks).__name__})", level=logging.WARNING)
# 인페인팅 실행
inpainted_image = cv2.inpaint(img, combined_mask, 3, cv2.INPAINT_TELEA)
if inpainted_image is not None:
self.logger.log("OpenCV 인페인팅 성공", level=logging.DEBUG)
self._last_inpaint_used = "cv"
self._last_inpaint_device = "CPU"
except Exception as e:
self.logger.log(f"OpenCV 인페인팅 실패: {e}", level=logging.WARNING)
inpainted_image = None
# 메모리 추적: 인페인팅 완료 후
inpaint_after_mem = psutil.virtual_memory()
inpaint_after_mb = inpaint_after_mem.used / 1024 / 1024
inpaint_change_mb = inpaint_after_mb - inpaint_before_mb
inpaint_change_percent = (inpaint_change_mb / inpaint_before_mb) * 100 if inpaint_before_mb > 0 else 0
self.logger.log(
f"메모리 변화 [인페인팅]: {inpaint_before_mb:.1f}MB -> {inpaint_after_mb:.1f}MB "
f"({inpaint_change_mb:+.1f}MB, {inpaint_change_percent:+.1f}%) - 방법: {current_method}",
level=logging.DEBUG if abs(inpaint_change_mb) < 10 else logging.INFO
)
return inpainted_image
def _try_request_inpaint(self, local_image_path: str, masks, ocr_count: int):
"""자체서버 인페인팅 시도"""
try:
if not self.is_member_valid:
self.logger.log("멤버십이 유효하지 않아 자체서버 인페인팅 건너뜀", level=logging.DEBUG)
return None
if ocr_count <= 3:
self.logger.log(f"OCR 결과가 적어 자체서버 인페인팅 건너뜀 (ocr_count: {ocr_count})", level=logging.DEBUG)
return None
self.logger.log(f"자체서버 인페인팅 시도", level=logging.DEBUG)
# 서버 마스크 규약: 정상(반전 불필요)로 확정됨
# invert_mask = bool(self.toggle_states.get("inpaint_mask_invert", False))
invert_mask = False
# inpaint_model = self.toggle_states.get("inpaint_model", "simple-lama")
inpaint_model = "migan"
inpaint_model = "simple-lama"
result = self.request_ai_server.request_inpaint(local_image_path, masks, invert_mask=invert_mask, inpaint_model=inpaint_model)
if result is not None:
self.logger.log("자체서버 인페인팅 성공", level=logging.DEBUG)
self._last_inpaint_used = "external_request"
self._last_inpaint_device = "SERVER"
return result
else:
self.logger.log("자체서버 인페인팅 실패", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"자체서버 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True)
return None
def check_external_server_availability(self):
"""외부 인페인팅 서버 유효성 체크"""
try:
server_url = self.toggle_states.get("request_inpainting_server_url", "")
if not server_url or not str(server_url).strip().startswith("http"):
return False
if self.request_ai_server:
return self.request_ai_server.is_server_alive(str(server_url).strip())
return False
except Exception:
return False
def _try_external_inpaint(self, local_image_path: str, masks, server_url: str, model_name: str = "migan"):
"""외부 서버 인페인팅 시도"""
try:
if self.request_ai_server is None:
return None
self.logger.log(f"외부 인페인팅 시도: {server_url}", level=logging.DEBUG)
# 모델명은 필요하면 토글에서 가져올 수 있음, 현재는 기본값
result = self.request_ai_server.request_external_inpaint(local_image_path, masks, server_url, model_name=model_name)
return result
except Exception as e:
self.logger.log(f"외부 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True)
return None
def _try_migan_inpaint(self, local_image_path: str, masks):
"""MIGAN GPU 인페인팅 시도"""
try:
if getattr(self, "migan", None) is None:
self.logger.log("MIGAN 모듈이 초기화되지 않아 건너뜀", level=logging.DEBUG)
return None
# MIGAN 실행 시 락 처리
# - DirectML/CUDA: 동시성 이슈(드라이버 행) 방지를 위해 락 사용 권장
# - CPU: 동시 실행해도 안전하므로 락 없이 병렬 처리 가능 (CPU 점유율은 올라감)
is_cpu_mode = False
try:
# 현재 세션의 provider 확인
if hasattr(self.migan, "session") and hasattr(self.migan.session, "get_providers"):
providers = self.migan.session.get_providers()
if providers and "CPUExecutionProvider" in providers[0]: # CPU가 최우선 순위인 경우
is_cpu_mode = True
except Exception:
pass
if is_cpu_mode:
# CPU 모드: 락 없이 병렬 실행하되, 과도한 점유 방지를 위해 약간의 슬립(양보)
# (OS 스케줄러가 알아서 분배하겠지만, 명시적 양보로 UI 프리징 등 방지)
time.sleep(0.01)
self.logger.log("MIGAN 인페인팅 시도 (CPU Mode - No Lock)", level=logging.DEBUG)
result = self.migan.inpaint(local_image_path, masks)
else:
# GPU(DirectML/CUDA) 모드: 락 사용
with self._migan_lock:
self.logger.log("MIGAN 인페인팅 시도 (GPU Mode - Lock Acquired)", level=logging.DEBUG)
result = self.migan.inpaint(local_image_path, masks)
if result is not None:
self.logger.log("MIGAN 인페인팅 성공", level=logging.DEBUG)
# 사용 장치 기록
try:
providers = []
if hasattr(self.migan, "session") and hasattr(self.migan.session, "get_providers"):
providers = self.migan.session.get_providers()
if any("Dml" in p for p in providers):
dev = "DirectML"
elif any("CUDA" in p for p in providers):
dev = "CUDA"
else:
dev = "CPU"
except Exception:
dev = "GPU" if (hasattr(self, "gpu_manager") and self.gpu_manager and getattr(self.gpu_manager, "can_use_cuda", False)) else "CPU"
self._last_inpaint_used = "migan"
self._last_inpaint_device = dev
return result
else:
self.logger.log("MIGAN 인페인팅 실패", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"MIGAN 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True)
return None
def _try_opencv_inpaint(self, local_image_path: str, masks):
"""MIGAN 통일 이후 비활성화(호환용). 항상 None 반환"""
self.logger.log("OpenCV 인페인팅 경로는 비활성화됨(MIGAN 통일)", level=logging.DEBUG)
return None
# OCR 결과 필터링: 중국어 텍스트만 필터링
def filter_ocr_results(self, ocr_results):
import re
# 중국어 문자 범위 정규식 (한자)
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
filtered_results = []
for r in ocr_results:
text = r.get('text', '').strip()
polygon = r.get('polygon', [])
confidence = r.get('confidence', 0.0)
# 텍스트가 비어있거나 polygon이 3점 미만이면 제외
if not text or not polygon or len(polygon) < 3:
self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG)
continue
# 신뢰도 20% 미만이면 제외
if confidence < 0.05:
self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
continue
# 중국어 문자가 포함된 텍스트만 필터링
if chinese_pattern.search(text):
filtered_results.append(r)
self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG)
else:
self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG)
self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.DEBUG)
return filtered_results
async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, file_prefix=""):
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
try:
# text_rendered_image가 None인 경우 처리
if text_rendered_image is None:
self.logger.log(f"이미지 {index+1} 번역 결과가 None입니다. 원본 이미지를 반환합니다.", level=logging.WARNING)
return local_image_path
# 파일명에 접두사 포함 (설정에 따른 이미지 형식 선택)
# 기본값: WebP (최고 압축률), 호환성 필요시 PNG 사용 가능
image_format = self.toggle_states.get('output_image_format', 'webp').lower()
# WebP 지원 여부 확인 (PIL에서 WebP를 지원하지 않는 경우 PNG로 폴백)
if image_format == 'webp':
try:
if not features.check('webp'):
self.logger.log("WebP 지원되지 않음, PNG로 폴백", level=logging.WARNING)
image_format = 'png'
except Exception:
image_format = 'png'
# 지원되지 않는 형식인 경우 PNG로 폴백
if image_format not in ['webp', 'png', 'jpg', 'jpeg']:
self.logger.log(f"지원되지 않는 형식 {image_format}, PNG로 폴백", level=logging.WARNING)
image_format = 'png'
file_ext = 'webp' if image_format == 'webp' else ('png' if image_format == 'png' else 'jpg')
if file_prefix:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.{file_ext}")
else:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.{file_ext}")
watermark_text=self.toggle_states.get("watermark_text", "이미지 저작권 보유")
is_watermark_enabled = watermark_text != "" or self.toggle_states.get("watermark_toggle", False)
self.logger.log(f"watermark_text: {watermark_text}", level=logging.DEBUG)
self.logger.log(f"is_watermark_enabled: {is_watermark_enabled}", level=logging.DEBUG)
image_data_to_save = None
# file_prefix가 'detail' 또는 'option'일 때만 워터마크 추가
if is_watermark_enabled and file_prefix in ["detail"]:
# image_data_to_save = self.postImageManager.add_watermark(
# image_data=text_rendered_image,
# watermark_text=watermark_text
# )
# 워터마크 기능 비활성화 중이므로 원본 이미지 사용
if isinstance(text_rendered_image, np.ndarray):
image_data_to_save = text_rendered_image # 그대로 넘김
else:
image_data_to_save = text_rendered_image
else:
# # np.ndarray라면 PIL.Image로 변환
# if isinstance(text_rendered_image, np.ndarray):
# image_data_to_save = Image.fromarray(cv2.cvtColor(text_rendered_image, cv2.COLOR_BGR2RGB))
# else:
# image_data_to_save = text_rendered_image
if isinstance(text_rendered_image, np.ndarray):
image_data_to_save = text_rendered_image # 그대로 넘김
else:
image_data_to_save = text_rendered_image
final_image_path = self.postImageManager.save_image_to_path(image_data_to_save, img_path)
# save_image_to_path가 None을 반환한 경우 처리
if final_image_path is None:
self.logger.log(f"이미지 {index+1} 저장 실패. 원본 이미지를 반환합니다.", level=logging.WARNING)
return local_image_path
return final_image_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return local_image_path
def download_image(self, image_url, index, file_prefix="", max_retries=3):
"""Requests를 사용해 이미지를 다운로드합니다"""
# 로컬 파일 경로면 바로 반환
if os.path.isfile(image_url):
self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG)
return image_url
# 로컬 파일 경로가 아니면 다운로드 시도
try:
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
if image_url.startswith("https://assets.alicdn.com") or image_url.startswith("https://gtms01.alicdn.com"):
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
return None
# URL에서 파일명 추출 및 접두사 포함
parsed_url = urlparse(image_url)
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
base_filename += '.jpg'
# 접두사가 있으면 파일명에 포함
if file_prefix:
filename = f"{file_prefix}_{base_filename}"
else:
filename = base_filename
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
# HTTP 헤더 설정
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Cache-Control": "max-age=0"
}
retries = 0
while retries < max_retries:
try:
# 메모리 추적: 다운로드 시작 전
before_mem = psutil.virtual_memory()
before_mb = before_mem.used / 1024 / 1024
self.logger.log(f"이미지 다운로드 중: {filename}", level=logging.DEBUG)
response = requests.get(image_url, headers=headers, stream=True, timeout=30)
if response.status_code == 200:
image_data = response.content
# 이미지 데이터 유효성 검사
if self.is_valid_image_data(image_data):
with open(local_path, 'wb') as f:
f.write(image_data)
# 메모리 추적: 다운로드 완료 후
after_mem = psutil.virtual_memory()
after_mb = after_mem.used / 1024 / 1024
change_mb = after_mb - before_mb
change_percent = (change_mb / before_mb) * 100 if before_mb > 0 else 0
self.logger.log(
f"메모리 변화 [다운로드 완료]: {before_mb:.1f}MB -> {after_mb:.1f}MB "
f"({change_mb:+.1f}MB, {change_percent:+.1f}%) - {filename}",
level=logging.DEBUG if abs(change_mb) < 10 else logging.INFO
)
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG)
return local_path
else:
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
return None
else:
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status_code}): {image_url}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5)) # 2~5초 대기 후 재시도
except requests.exceptions.RequestException as e:
self.logger.log(f"이미지 다운로드 중 네트워크 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5)) # 예외 발생 시 대기 후 재시도
except Exception as e:
self.logger.log(f"이미지 다운로드 중 예상치 못한 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR)
retries += 1
if retries < max_retries:
time.sleep(random.randint(2, 5))
self.logger.log(f"이미지 다운로드 최대 재시도 횟수를 초과했습니다: {image_url}", level=logging.ERROR)
return None
except Exception as e:
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def is_valid_image_data(self, image_data):
"""이미지 데이터가 유효한지 확인합니다"""
if not image_data or len(image_data) < 100:
return False
# 일반적인 이미지 파일 시그니처 확인
image_signatures = [
b'\xFF\xD8\xFF', # JPEG
b'\x89PNG\r\n\x1a\n', # PNG
b'GIF87a', # GIF87a
b'GIF89a', # GIF89a
b'RIFF', # WebP (RIFF 컨테이너)
]
return any(image_data.startswith(sig) for sig in image_signatures)
def process_translated_texts(self, translated_texts, local_image_path, index):
"""
번역된 단어 리스트(translated_texts)에서 unwanted_texts의 원본값이
앞이나 뒤에 포함되면 치환값으로 바꿉니다.
치환값이 '이미지삭제'라면 None 반환(이미지 제외)
"""
new_texts = []
for i, text in enumerate(translated_texts):
self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG)
# 텍스트를 빈칸으로 분리
words = text.split()
self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG)
processed_words = []
text_replaced = False
for word_idx, word in enumerate(words):
word_replaced = False
# unwanted_texts와 매칭 확인
for origin, replace in self.unwanted_texts.items():
if word.startswith(origin) or word.endswith(origin) or word == origin:
self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.DEBUG)
if replace == "이미지삭제":
self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.DEBUG)
return None
# 단어 치환
if word == origin:
new_word = replace
elif word.startswith(origin):
new_word = replace + word[len(origin):]
elif word.endswith(origin):
new_word = word[:-len(origin)] + replace
processed_words.append(new_word)
word_replaced = True
text_replaced = True
self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG)
break
if not word_replaced:
processed_words.append(word)
# 처리된 단어들을 다시 합치기
final_text = ' '.join(processed_words)
new_texts.append(final_text)
if text_replaced:
self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.DEBUG)
else:
self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG)
self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.DEBUG)
for i, (original, processed) in enumerate(zip(translated_texts, new_texts)):
if original != processed:
self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.DEBUG)
return new_texts
async def batch_papago_translate_texts(self, ocr_results, delimiter='\n'):
"""
ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 파파고로 번역 후, 다시 분리하여 반환합니다.
각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다.
"""
import re
texts = [result['text'] for result in ocr_results if result['text'].strip()]
if not texts:
return []
# 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침
split_texts = []
split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록
for text in texts:
parts = re.split(r'\s*[/|·,;、]+\s*', text)
parts = [p.strip() for p in parts if p.strip()]
split_texts.extend(parts)
split_indices.append(len(parts))
# Papago에 한 번에 번역 요청
joined = delimiter.join(split_texts)
# 메모리 추적: 번역 시작 전
trans_before_mem = psutil.virtual_memory()
trans_before_mb = trans_before_mem.used / 1024 / 1024
try:
# 이미 한 번에 여러 줄을 받아서 리스트로 반환!
translated_lines = await self.papago_translator.translate(joined, source_lang="zh", target_lang="ko")
# Papago가 리스트로 반환한다고 가정!
results = translated_lines
# 개수가 다르면 fallback(선택사항, 아래는 로깅만)
if len(results) != len(split_texts):
self.logger.log(
f"파파고 번역 줄 개수 불일치: {len(results)} != {len(split_texts)}",
level=logging.WARNING
)
# 여기서 fallback 처리 또는 그냥 results를 그대로 사용
# 다시 원래 텍스트 단위로 합치기
final_results = []
idx = 0
for count in split_indices:
parts = results[idx:idx+count]
final_results.append(' / '.join(parts))
idx += count
# 메모리 추적: 번역 완료 후
trans_after_mem = psutil.virtual_memory()
trans_after_mb = trans_after_mem.used / 1024 / 1024
trans_change_mb = trans_after_mb - trans_before_mb
trans_change_percent = (trans_change_mb / trans_before_mb) * 100 if trans_before_mb > 0 else 0
self.logger.log(
f"메모리 변화 [Papago 번역]: {trans_before_mb:.1f}MB -> {trans_after_mb:.1f}MB "
f"({trans_change_mb:+.1f}MB, {trans_change_percent:+.1f}%) - {len(split_texts)}개 텍스트",
level=logging.DEBUG if abs(trans_change_mb) < 10 else logging.INFO
)
return final_results
except Exception as e:
self.logger.log(f"파파고 번역 실패: {e}", level=logging.ERROR, exc_info=True)
return texts
def batch_google_translate_texts(self, ocr_results, delimiter='\n'):
"""
ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 구글 번역기로 번역 후, 다시 분리하여 반환합니다.
각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다.
"""
import re
texts = [result['text'] for result in ocr_results if result['text'].strip()]
if not texts:
return []
# 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침
split_texts = []
split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록
for text in texts:
parts = re.split(r'\s*[/|·,;、]+\s*', text)
parts = [p.strip() for p in parts if p.strip()]
split_texts.extend(parts)
split_indices.append(len(parts))
# 합쳐서 한 번에 번역
joined = delimiter.join(split_texts)
try:
translated_obj = self.gtranslate.translate(joined, "Korean")
translated_text = getattr(translated_obj, "text", getattr(translated_obj, "result", joined))
results = translated_text.split(delimiter)
# 만약 개수가 다르면 fallback
if len(results) != len(split_texts):
results = [getattr(self.gtranslate.translate(t, "Korean"), "text", t) for t in split_texts]
# 다시 원래 텍스트 단위로 합치기
final_results = []
idx = 0
for count in split_indices:
parts = results[idx:idx+count]
final_results.append(' / '.join(parts))
idx += count
return final_results
except Exception as e:
self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True)
return texts
def batch_llm_translate_texts(self, ocr_results, delimiter='\n'):
"""
LLM API를 이용한 번역 메서드.
batch_google_translate_texts 대체용.
"""
# 1. LLM 클라이언트 확인
if not self.ai_translator:
# self.logger.log("LLM Translator(ai_translator)가 초기화되지 않음. 구글 번역으로 폴백.", level=logging.WARNING)
return self.batch_google_translate_texts(ocr_results, delimiter)
# 2. 필요한 파라미터 준비 (toggle_states 등에서 가져오기)
# product_name과 category가 필수이나, 현재 문맥에서 명확하지 않으면 기본값 사용
product_name = self.toggle_states.get("product_name", "Unknown Product")
category = self.toggle_states.get("category", "General")
# job_type, prompt_name (기본값 설정)
job_type = "ocr_translator_step1"
prompt_name = "ocr_translator_step1"
# steps: 번역 단계 (1=직역만, 2=직역+마케팅톤 변환)
# toggle_states에서 가져오거나 기본값 1 사용
steps = int(self.toggle_states.get("llm_translation_steps", 1))
try:
# 3. LLM 번역 요청
return self.ai_translator.run_llm_translation(
product_name=product_name,
category=category,
ocr_results=ocr_results,
job_type=job_type,
prompt_name=prompt_name,
retry_count=3,
steps=steps
)
except Exception as e:
self.logger.log(f"LLM 번역 실패: {e}. 구글 번역으로 폴백.", level=logging.ERROR, exc_info=True)
# 실패 시 구글 번역으로 폴백
return self.batch_google_translate_texts(ocr_results, delimiter)
def translate_ocr_results(self, ocr_results):
"""
사용자 설정(toggle_states)에 따라 번역 방식을 선택하여 실행
"""
method = self.toggle_states.get("translation_method", "google")
if method == "llm":
return self.batch_llm_translate_texts(ocr_results)
else:
return self.batch_google_translate_texts(ocr_results)
def opencv_inpaint(self, image_path, mask, method='telea', radius=3):
"""MIGAN 통일 이후 비활성화(호환용). 항상 None 반환"""
return None
def lama_inpaint(self, image_path, mask):
"""
PaddleHub lama 모델로 인페인팅을 수행합니다.
Args:
image_path (str): 원본 이미지 경로
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
Returns:
inpainted_image (np.ndarray): 인페인팅된 이미지 (BGR)
"""
try:
import paddlehub as hub
except ImportError:
self.logger.log("paddlehub 미설치: pip install paddlehub", level=logging.ERROR)
return None
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 마스크 2D 체크
if mask is None:
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
return None
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
return None
if mask.shape != image.shape[:2]:
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
return None
try:
# PaddleHub lama 인페인팅 모델 로드 (최초 1회 다운로드)
if not hasattr(self, "_lama_module"):
self._lama_module = hub.Module(name="lama")
if self.logger:
self.logger.log("lama 인페인팅 모델 로드됨")
# 라마 모델 인페인팅 (마스크 0/255, uint8)
result = self._lama_module.predict(images=[image], masks=[mask])
if not result or "inpainted" not in result[0]:
self.logger.log("lama 인페인팅 결과 없음", level=logging.ERROR)
return None
inpainted = result[0]["inpainted"] # np.ndarray(BGR)
return inpainted
except Exception as e:
self.logger.log(f"lama 인페인팅 중 예외 발생: {e}", level=logging.ERROR, exc_info=True)
return None
def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""):
"""디버깅용 OCR 박스와 마스크 이미지를 저장합니다"""
try:
# OCR 박스 시각화 이미지 저장
ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix)
# 마스크 시각화 이미지 저장
mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix)
self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.DEBUG)
return ocr_debug_path, mask_debug_path
except Exception as e:
self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None, None
def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""):
"""OCR 감지된 박스들을 이미지에 표시하여 저장합니다"""
try:
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# OCR 결과별로 박스 그리기
for i, result in enumerate(ocr_results):
polygon = result.get('polygon', [])
bbox = result.get('bbox', None)
text = result.get('text', '')
confidence = result.get('confidence', 0.0)
# 신뢰도에 따른 색상 결정
if confidence >= 0.8:
color = (0, 255, 0) # 초록 (높은 신뢰도)
elif confidence >= 0.5:
color = (0, 255, 255) # 노랑 (중간 신뢰도)
elif confidence >= 0.2:
color = (0, 165, 255) # 주황 (낮은 신뢰도)
else:
color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도)
if polygon and len(polygon) >= 3:
# 폴리곤을 numpy 배열로 변환 (좌표를 int로 변환)
pts = np.array([[int(x), int(y)] for x, y in polygon], np.int32)
pts = pts.reshape((-1, 1, 2))
cv2.polylines(debug_image, [pts], True, color, 2)
x, y = pts[0][0]
elif bbox and len(bbox) == 4:
try:
x, y, w, h = [int(float(v)) for v in bbox]
except Exception as e:
self.logger.log(f"bbox 값 변환 오류: {bbox} ({e})", level=logging.ERROR)
continue
cv2.rectangle(debug_image, (x, y), (x + w, y + h), color, 2)
else:
continue # polygon, bbox 둘 다 없으면 skip
label = f"{i+1}: {text[:10]}... ({confidence:.1%})"
# 텍스트 배경 사각형
(text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1)
# 텍스트 표시
cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 범례 추가
legend_y = 30
cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_ocr_img_{index+1}.png"
debug_path = os.path.join(self.debugging_save_Dir, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def save_mask_debug_image(self, image_path, masks, index, file_prefix=""):
"""생성된 마스크를 이미지에 오버레이하여 저장합니다"""
try:
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR)
return None
if masks is None or not isinstance(masks, np.ndarray):
self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR)
return None
# 이미지 복사본 생성
debug_image = image.copy()
# 마스크 영역을 빨간색으로 오버레이
mask_colored = np.zeros_like(image)
mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용
# 마스크 영역 반투명 오버레이 (알파 블렌딩)
alpha = 0.3
overlay_mask = masks > 0
debug_image[overlay_mask] = cv2.addWeighted(
debug_image[overlay_mask], 1-alpha,
mask_colored[overlay_mask], alpha, 0
)
# 마스크 통계 정보 표시
total_pixels = masks.shape[0] * masks.shape[1]
mask_pixels = np.sum(masks > 0)
mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0
# 정보 텍스트 표시
info_text = [
f"Mask Statistics:",
f"Total pixels: {total_pixels:,}",
f"Mask pixels: {mask_pixels:,}",
f"Coverage: {mask_percentage:.1f}%"
]
y_offset = 30
for i, text in enumerate(info_text):
y_pos = y_offset + (i * 25)
# 텍스트 배경
(text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1)
# 텍스트
cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# 파일 저장
if file_prefix:
debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_mask_img_{index+1}.png"
debug_path = os.path.join(self.debugging_save_Dir, debug_filename)
cv2.imwrite(debug_path, debug_image)
self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def save_inpaint_debug_image(self, inpainted_image, index, file_prefix=""):
"""인페인팅 결과 이미지를 디버그용으로 저장합니다"""
try:
# import cv2
# import numpy as np
# import os
if inpainted_image is None:
self.logger.log("인페인트 결과가 None이어서 디버그 저장을 건너뜁니다", level=logging.WARNING)
return None
if file_prefix:
debug_filename = f"debug_inpaint_{file_prefix}_img_{index+1}.png"
else:
debug_filename = f"debug_inpaint_img_{index+1}.png"
debug_path = os.path.join(self.debugging_save_Dir, debug_filename)
# PIL -> ndarray 변환
if not isinstance(inpainted_image, np.ndarray):
try:
from PIL import Image as _Image
if isinstance(inpainted_image, _Image.Image):
inpainted_image = cv2.cvtColor(np.array(inpainted_image.convert("RGB")), cv2.COLOR_RGB2BGR)
else:
self.logger.log(f"지원하지 않는 인페인트 이미지 타입: {type(inpainted_image)}", level=logging.WARNING)
return None
except Exception:
return None
cv2.imwrite(debug_path, inpainted_image)
self.logger.log(f"인페인트 디버그 이미지 저장: {debug_filename}", level=logging.DEBUG)
return debug_path
except Exception as e:
self.logger.log(f"인페인트 디버그 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def _analyze_mask_stats(self, masks: np.ndarray) -> dict:
"""마스크의 커버리지/컴포넌트/최소 중심거리 비율을 계산합니다"""
import numpy as _np
import cv2 as _cv2
h, w = masks.shape[:2]
total = max(1, h * w)
mask_bin = masks
if masks.ndim == 3:
mask_bin = _cv2.cvtColor(masks, _cv2.COLOR_BGR2GRAY)
if mask_bin.dtype != _np.uint8:
mask_bin = mask_bin.astype(_np.uint8)
_, mask_bin = _cv2.threshold(mask_bin, 0, 255, _cv2.THRESH_BINARY)
coverage_ratio = float((_np.sum(mask_bin > 0)) / total)
# 컴포넌트 분석 (외곽선 기반)
try:
contours, _ = _cv2.findContours(mask_bin, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)
except ValueError:
# OpenCV 버전 차이 호환
_, contours, _ = _cv2.findContours(mask_bin, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE)
centers = []
for cnt in contours:
if cnt is None or len(cnt) < 3:
continue
m = _cv2.moments(cnt)
if m['m00'] == 0:
x, y, w_box, h_box = _cv2.boundingRect(cnt)
cx, cy = x + w_box / 2.0, y + h_box / 2.0
else:
cx, cy = m['m10'] / m['m00'], m['m01'] / m['m00']
centers.append((float(cx), float(cy)))
# 최소 중심 거리 (정규화)
min_dist = 0.0
if len(centers) >= 2:
min_dist = min(
((_np.hypot(cx1 - cx2, cy1 - cy2)) for (cx1, cy1) in centers for (cx2, cy2) in centers if (cx1, cy1) != (cx2, cy2))
)
diag = float(_np.hypot(w, h))
min_dist_ratio = float(min_dist / diag) if diag > 0 else 0.0
return {
'coverage_ratio': coverage_ratio,
'component_count': len(centers),
'min_centroid_distance_ratio': min_dist_ratio,
}
def is_frozen(self):
"""
실행 환경에 따라 배포환경인지 개발환경인지 확인하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
self.logger.log("배포환경", level=logging.DEBUG)
return True
else: # 일반 Python 실행 환경
self.logger.log("개발환경", level=logging.DEBUG)
return False
def safe_detect(self, img_path):
try:
if not hasattr(self, 'ocr_module') or self.ocr_module is None:
self.logger.log("⚠️ ONNX OCR 모듈이 초기화되지 않았습니다. 재초기화를 시도합니다.", level=logging.WARNING)
# OCR 모듈 재초기화 시도
try:
if self.reset_ocr_module() and hasattr(self, 'ocr_module') and self.ocr_module is not None:
self.logger.log("✅ ONNX OCR 모듈 재초기화 성공", level=logging.INFO)
else:
self.logger.log("❌ ONNX OCR 모듈 재초기화 실패, 빈 결과 반환", level=logging.ERROR)
return []
except Exception as reset_error:
self.logger.log(f"❌ ONNX OCR 모듈 재초기화 중 예외 발생: {reset_error}", level=logging.ERROR)
return []
result = self.ocr_module.detect_text(img_path)
# 빈 OCR 결과는 정상 케이스로 처리
if not result or len(result) == 0:
self.logger.log(f"OCR 결과 없음 - 정상 케이스 (NO_TEXT): {img_path}", level=logging.INFO)
return [] # 빈 리스트 반환 (정상 처리)
return result
except Exception as e:
msg = str(e).lower()
# 메모리 / primitive 관련 오류 → OCR 모듈 재초기화 후 1회 재시도
if any(err in msg for err in ["create a primitive", "memory object", "unable to allocate", "out of memory", "cv::outofmemoryerror"]):
ok = self.reset_ocr_module()
if ok and self.ocr_module is not None:
# 재시도 시 실패하면 MemoryError를 재전파하여 상위에서 워커 재시작 트리거
return self.ocr_module.detect_text(img_path, raise_on_memory_error=True)
# 그 외 예외는 그대로 상위로 전달
raise
async def remove_background(self, original_image_url, file_prefix=""):
"""배경제거 전용 메서드 (썸네일 등 외부 호출용).
1. 이미지를 다운로드(또는 로컬 경로 사용)
2. Request_AI_Server.request_rembg 로 배경 제거 (흰 배경 중앙 배치 포함)
3. TEMP_IMAGE_DIR 하위에 저장 후 경로 반환
"""
try:
index = 0 # 기본값 (외부에서 필요 시 파일명 구분용)
# 0. 이미지 URL 유효성 체크
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log("배경제거 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return {"status": "failed", "path": original_image_url, "error": "이미지 URL 오류"}
# 1. 다운로드 또는 로컬 경로 확정
if original_image_url.startswith("http"):
# 다운로드 재사용을 위해 기존 메서드 호출
local_path = self.download_image(image_url=original_image_url, index=0, file_prefix=file_prefix)
if not local_path:
return {"status": "failed", "path": original_image_url, "error": "다운로드 실패"}
else:
local_path = original_image_url # 이미 로컬 경로
# 2. 배경 제거 (np.ndarray 반환)
removed = self.request_ai_server.request_rembg(
local_path,
use_local_rembg=self.use_local_rembg,
local_model_name=self.local_model_name
)
if removed is None:
self.logger.log("RemoveBG 실패", level=logging.ERROR)
return {"status": "failed", "path": local_path, "error": "RemoveBG 실패"}
# 3. 저장 경로 결정
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
base_name = os.path.basename(local_path)
name_no_ext, _ = os.path.splitext(base_name)
save_name = f"nobg_{file_prefix}_{name_no_ext}.png" if file_prefix else f"nobg_{name_no_ext}.png"
save_path = os.path.join(self.TEMP_IMAGE_DIR, save_name)
# 4. 저장 (OpenCV → BGR)
cv2.imwrite(save_path, removed)
del removed
removed = None
self.logger.log(f"배경제거 이미지 저장: {save_path}", level=logging.DEBUG)
# 5. OCR 검사 후 인페인팅 여부 결정
# ocr_results = self.ocr_module.detect_text(save_path)
ocr_results = self.safe_detect(save_path)
filter_ocr_results = self.filter_ocr_results(ocr_results)
# 중국어 텍스트가 없으면 바로 반환
if self.ocr_module is None or not self.ocr_module.filter_chinese_text(filter_ocr_results):
return {"status": "success", "path": save_path}
# ---- 중국어 텍스트 존재: 인페인팅 준비 ----
ocr_count = len(filter_ocr_results)
if ocr_count < 5:
expansion_size, blur_size = 12, 15
elif ocr_count < 10:
expansion_size, blur_size = 9, 12
elif ocr_count < 15:
expansion_size, blur_size = 7, 9
elif ocr_count < 20:
expansion_size, blur_size = 5, 6
else:
expansion_size, blur_size = 10, 15
# 마스크 생성
masks = self.mask_module.create_masks(
image_path=save_path,
ocr_results=filter_ocr_results,
mask_option="basic",
expansion_size=expansion_size,
blur_size=blur_size
)
self.logger.log("배경제거 후 마스크 생성 완료", level=logging.DEBUG)
# 인페인팅 수행: MIGAN으로 통일
self.inpaint_method = 'migan'
inpainted_image = self._try_migan_inpaint(save_path, masks)
# 인페인팅 실패 시 원본 반환
if inpainted_image is None:
self.logger.log("인페인팅 실패, 배경제거 이미지를 그대로 반환", level=logging.WARNING)
return {"status": "success", "path": save_path}
# 인페인팅 결과 저장
inpaint_name = f"inpaint_{file_prefix}_{name_no_ext}.png" if file_prefix else f"inpaint_{name_no_ext}.png"
inpaint_path = os.path.join(self.TEMP_IMAGE_DIR, inpaint_name)
cv2.imwrite(inpaint_path, inpainted_image)
self.logger.log(f"인페인팅 이미지 저장: {inpaint_path}", level=logging.DEBUG)
return {"status": "success", "path": inpaint_path}
except Exception as e:
self.logger.log(f"remove_background 오류: {e}", level=logging.ERROR, exc_info=True)
return {"status": "failed", "path": original_image_url, "error": str(e)}
# async def remove_background(self, page, original_image_url, index, file_prefix="", **kwargs):
# """
# 배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환
# """
# try:
# # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
# if not original_image_url or not isinstance(original_image_url, str):
# self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'}
# if not self.is_valid_image_path(original_image_url):
# self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'}
# # 1. 이미지 다운로드
# local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
# if not local_image_path:
# self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
# return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'}
# self.logger.log(f"이미지 {index+1} 로컬 저장위치: {local_image_path}", level=logging.DEBUG)
# # 1. 배경제거 수행(이미지 반환)
# removed_img = self.background_removal_module.remove_background(
# local_image_path, **kwargs
# )
# if removed_img is None:
# self.logger.log(f"배경제거 실패: {local_image_path}", level=40)
# return {'status': 'failed', 'path': local_image_path, 'error': '배경제거 실패'}
# if self.toggle_states.get("remove_background_white", True):
# img_result_white = self.background_removal_module.to_white_background(removed_img)
# else:
# img_result_white = removed_img
# # 2. 저장 경로 생성
# if file_prefix:
# save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_{file_prefix}_img_{index+1}.png")
# else:
# save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png")
# # 3. 워터마크 등 후처리 및 저장
# # 워터마크 등 추가하려면 아래처럼
# # removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...)
# final_path = self.postImageManager.save_image_to_path(img_result_white, save_path)
# if final_path:
# self.logger.log(f"배경제거 이미지 저장됨: {final_path}")
# return {'status': 'success', 'path': final_path}
# else:
# self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40)
# return {'status': 'failed', 'path': save_path, 'error': '저장 실패'}
# except Exception as e:
# self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True)
# return {'status': 'failed', 'path': local_image_path, 'error': str(e)}
# def remove_background_with_ppmatting(image_path, output_path='output_foreground.png', alpha_path='output_alpha.png'):
# """
# PaddleHub의 ppmatting을 이용한 배경제거(투명배경 PNG) 함수입니다.
# Args:
# image_path (str): 입력 이미지 경로
# output_path (str): 결과 투명 배경 PNG 저장 경로
# alpha_path (str): 알파(마스크) 이미지 저장 경로
# Returns:
# foreground (np.ndarray): 알파채널이 포함된 전경 PNG 이미지 (BGRA)
# alpha (np.ndarray): 추출된 알파 마스크
# """
# import paddlehub as hub
# import cv2
# import numpy as np
# # 1. 이미지 로드 (OpenCV는 BGR)
# img = cv2.imread(image_path)
# if img is None:
# print(f"이미지를 불러올 수 없습니다: {image_path}")
# return None, None
# # 2. BGR → RGB (ppmatting은 RGB 입력)
# img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# # 3. ppmatting 모델 로드 (최초 1회 다운로드됨)
# matting = hub.Module(name='ppmatting-hrnet-1x')
# # 4. 예측 수행
# results = matting.predict(images=[img_rgb]) # 결과는 리스트
# # 5. 알파(투명도) 마스크 추출 (float, 0~1)
# alpha = results[0]['alpha']
# # 6. 알파 마스크를 0~255 uint8로 변환 (이미지 저장용)
# alpha_img = (alpha * 255).astype(np.uint8)
# # 7. 원본 이미지를 BGRA(투명도 포함)로 변환
# foreground = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
# foreground[..., 3] = alpha_img # 알파 채널 추가
# # 8. 결과 이미지 저장
# cv2.imwrite(output_path, foreground)
# cv2.imwrite(alpha_path, alpha_img)
# print(f"배경 제거 PNG 저장: {output_path}")
# print(f"알파 마스크 저장: {alpha_path}")
# return foreground, alpha_img
# ------------------------------------------------------------------
# 고해상도 이미지 다운스케일 유틸리티 (메모리 절감용)
# ------------------------------------------------------------------
def downscale_image_if_large(self, image_path, max_dim=1200):
"""이미지가 너무 클 경우 다운스케일링"""
try:
image = cv2.imread(image_path)
if image is None:
return image_path
h, w = image.shape[:2]
if max(h, w) <= max_dim:
# 원본 크기가 허용 범위 내면 그대로 반환
del image # 명시적 해제
return image_path
# 비율 유지하며 다운스케일링
if h > w:
new_h, new_w = max_dim, int(w * max_dim / h)
else:
new_h, new_w = int(h * max_dim / w), max_dim
# .copy() 사용으로 뷰 문제 해결
resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA).copy()
# 원본 이미지 명시적 해제
del image
# 임시 파일로 저장
temp_path = image_path.replace('.jpg', '_temp.jpg').replace('.png', '_temp.png')
cv2.imwrite(temp_path, resized)
# 리사이즈된 이미지 명시적 해제
del resized
return temp_path
except Exception as e:
self.logger.log(f"이미지 다운스케일링 실패: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 원본 경로 반환
return image_path
finally:
# 안전장치: 남은 이미지 객체 정리
try:
if 'image' in locals():
del image
if 'resized' in locals():
del resized
except:
pass
async def preprocess_detail_image(self, image_path: str, index: int, target_width: int = 860, max_height: int = 3000) -> str:
"""
상세페이지 이미지 전처리: 가로 860px 통일, 세로 3000px 초과시 분할
Args:
image_path: 원본 이미지 경로
index: 이미지 인덱스
target_width: 목표 가로 크기 (기본 860px)
max_height: 최대 세로 크기 (기본 3000px)
Returns:
str: 전처리된 이미지 경로 (분할된 경우 첫 번째 이미지)
"""
try:
# 원본 이미지 로드
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"상세페이지 이미지 로드 실패: {image_path}", level=logging.ERROR)
return image_path
orig_h, orig_w = image.shape[:2]
self.logger.log(f"상세페이지 이미지 {index+1} 원본 크기: {orig_w}x{orig_h}", level=logging.DEBUG)
# 1단계: 가로를 860px로 리사이즈 (비율 유지)
if orig_w != target_width:
scale_factor = target_width / orig_w
new_height = int(orig_h * scale_factor)
resized_image = cv2.resize(image, (target_width, new_height), interpolation=cv2.INTER_LANCZOS4)
self.logger.log(f"상세페이지 이미지 {index+1} 가로 크기 조정: {orig_w}x{orig_h}{target_width}x{new_height}", level=logging.DEBUG)
else:
resized_image = image
new_height = orig_h
# 2단계: 세로가 3000px 초과하는지 확인
if new_height <= max_height:
# 분할 불필요 - 리사이즈된 이미지 저장
output_path = image_path.replace('.jpg', '_resized.jpg').replace('.png', '_resized.png')
cv2.imwrite(output_path, resized_image)
self.logger.log(f"상세페이지 이미지 {index+1} 크기 조정 완료: {target_width}x{new_height}", level=logging.INFO)
return output_path
# 3단계: 분할 필요 - 3000px 단위로 분할
split_count = (new_height + max_height - 1) // max_height # 올림 계산
split_paths = []
for i in range(split_count):
start_y = i * max_height
end_y = min((i + 1) * max_height, new_height)
split_height = end_y - start_y
# 이미지 분할
split_image = resized_image[start_y:end_y, :, :]
# 분할된 이미지 저장
base_name = os.path.splitext(os.path.basename(image_path))[0]
extension = os.path.splitext(image_path)[1]
split_filename = f"{base_name}_split_{i+1}{extension}"
split_path = os.path.join(os.path.dirname(image_path), split_filename)
cv2.imwrite(split_path, split_image)
split_paths.append(split_path)
self.logger.log(f"상세페이지 이미지 {index+1} 분할 {i+1}/{split_count}: {target_width}x{split_height}{split_path}", level=logging.DEBUG)
self.logger.log(f"상세페이지 이미지 {index+1} 분할 완료: {split_count}개 파일 생성", level=logging.INFO)
# OCR 정보 수집을 위해 분할된 모든 이미지 경로 저장
self._store_split_image_paths(index, split_paths)
# 첫 번째 분할 이미지 반환 (process_single_image는 하나의 이미지만 처리)
return split_paths[0] if split_paths else image_path
except Exception as e:
self.logger.log(f"상세페이지 이미지 {index+1} 전처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return image_path
finally:
# 메모리 정리
try:
if 'image' in locals():
del image
if 'resized_image' in locals():
del resized_image
if 'split_image' in locals():
del split_image
except:
pass
def _store_split_image_paths(self, index: int, split_paths: list):
"""분할된 이미지 경로들을 저장 (나중에 OCR 정보 수집용)"""
if not hasattr(self, '_detail_split_images'):
self._detail_split_images = {}
self._detail_split_images[index] = split_paths
self.logger.log(f"이미지 {index+1} 분할 경로 저장: {len(split_paths)}", level=logging.DEBUG)
async def collect_and_store_ocr_data(self, image_url: str, image_path: str, ocr_results: list, index: int):
"""
상세페이지 OCR 결과에서 상품 정보를 수집하고 저장
Args:
image_url: 원본 이미지 URL
image_path: 로컬 이미지 경로
ocr_results: OCR 감지 결과
index: 이미지 인덱스
"""
try:
if not ocr_results:
return
# OCR 결과에서 텍스트만 추출
extracted_texts = [result.get('text', '').strip() for result in ocr_results if result.get('text', '').strip()]
if not extracted_texts:
return
# 데이터베이스에 저장 (단순화됨)
await self._save_ocr_data_to_db(image_url, image_path, extracted_texts, index)
self.logger.log(f"이미지 {index+1} OCR raw 데이터 수집 완료: {len(extracted_texts)}개 텍스트", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"OCR 정보 수집 중 오류: {e}", level=logging.ERROR, exc_info=True)
# _classify_product_texts 함수는 단순화로 인해 비활성화됨 (raw 데이터만 저장)
async def _save_ocr_data_to_db(self, image_url: str, image_path: str, texts: list, index: int):
"""
OCR raw 데이터를 SQLite 데이터베이스에 저장
Args:
image_url: 원본 이미지 URL
image_path: 로컬 이미지 경로
texts: raw OCR 텍스트 리스트
index: 이미지 인덱스
"""
try:
import sqlite3
import json
import os
from datetime import datetime
# 데이터베이스 파일 경로 설정
db_path = os.path.join(self.base_dir, "user_data", "product_ocr_data.db")
os.makedirs(os.path.dirname(db_path), exist_ok=True)
# 데이터베이스 연결 및 테이블 생성
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 테이블 생성 (단순화된 버전)
cursor.execute('''
CREATE TABLE IF NOT EXISTS ocr_raw_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
image_url TEXT NOT NULL,
image_path TEXT,
image_index INTEGER,
raw_texts TEXT,
text_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 데이터 삽입 (단순화됨)
cursor.execute('''
INSERT INTO ocr_raw_data
(image_url, image_path, image_index, raw_texts, text_count)
VALUES (?, ?, ?, ?, ?)
''', (
image_url,
image_path,
index,
json.dumps(texts, ensure_ascii=False),
len(texts)
))
conn.commit()
conn.close()
self.logger.log(f"이미지 {index+1} OCR 데이터 DB 저장 완료: {db_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"OCR 데이터 DB 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def collect_ocr_data_in_memory(self, image_url: str, image_path: str, ocr_results: list, index: int):
"""
OCR 데이터를 메모리에만 저장 (현재 세션용) - 전처리 포함
Args:
image_url: 원본 이미지 URL
image_path: 로컬 이미지 경로
ocr_results: OCR 감지 결과
index: 이미지 인덱스
"""
try:
if not ocr_results:
return
# OCR 결과에서 텍스트만 추출
extracted_texts = [result.get('text', '').strip() for result in ocr_results if result.get('text', '').strip()]
if not extracted_texts:
return
# 단순 OCR 텍스트 저장 (raw 데이터 그대로 사용)
filtered_texts = extracted_texts
# 메모리에 저장
if not hasattr(self, '_session_ocr_data'):
self._session_ocr_data = []
self._session_ocr_data.append({
'image_url': image_url,
'image_path': image_path,
'image_index': index,
'raw_texts': filtered_texts,
'text_count': len(filtered_texts)
})
self.logger.log(f"이미지 {index+1} OCR raw 데이터 메모리 저장 완료: {len(filtered_texts)}개 텍스트", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"OCR 정보 메모리 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
# 복잡한 전처리 함수들은 제거됨 (단순 raw 데이터 저장 방식으로 변경)
def get_session_ocr_data(self) -> list:
"""현재 세션의 OCR 데이터 반환"""
return getattr(self, '_session_ocr_data', [])
def clear_session_ocr_data(self):
"""현재 세션의 OCR 데이터 초기화"""
if hasattr(self, '_session_ocr_data'):
del self._session_ocr_data
self.logger.log("세션 OCR 데이터 초기화 완료", level=logging.DEBUG)