import os import asyncio import requests import time import logging from urllib.parse import urlparse import sys import re import cv2 import psutil import tracemalloc # OpenCV 의 내부 최적화(메모리 풀) 사용을 비활성화하여 파편화 위험을 낮춤 cv2.setUseOptimized(False) import random import gc import numpy as np from PIL import Image from PIL import features from modules.onnx_ocr_module.src.onnx_ocr_wrapper import ONNXOCRModule as Onnx_OCRModule # from modules.ocr_module import OCRModule as Paddle_OCRModule from modules.mask_module_for_paddle import MaskModule # from modules.mask_module_for_easy import MaskModule_easy from modules.text_rendering_module import TextRenderingModuleOptimized from modules.postImageManager import PostImageManager from translatepy.translators.google import GoogleTranslate # Gemma 번역 클라이언트(옵셔널): 배포 환경에서 누락되어도 동작하도록 안전 임포트 try: from modules.gemma_client import GemmaTranslator # 표준 경로 except Exception: try: # 개발 환경에서 상대 경로 임포트가 남아있는 경우 대비 from gemma_client import GemmaTranslator # noqa: F401 except Exception: GemmaTranslator = None # 사용 시 체크 후 동작 # from modules.background_removal_module import BackgroundRemovalModule # from modules.background_removal_module_pp import PPMattingBackgroundRemovalModule # (변경) from modules.request_inpaint import Request_AI_Server from modules.gpu_utils import GPUManager class ImageProcessor3: """이미지 다운로드, OCR, 번역 처리를 담당하는 클래스""" def __init__(self, logger, page, toggle_states, unwanted_words, authenticated_by_admin, base_dir, papago_translator): self.logger = logger self.page = page self.base_dir = base_dir self.toggle_states = toggle_states self.unwanted_texts = unwanted_words self.authenticated_by_admin = authenticated_by_admin # 메모리 추적 시작 (파이썬 객체 메모리 할당 추적) try: if not tracemalloc.is_tracing(): tracemalloc.start() self.logger.log("tracemalloc 메모리 추적 시작", level=logging.DEBUG) except Exception as e: self.logger.log(f"tracemalloc 시작 실패: {e}", level=logging.WARNING) # 기본 속성들을 먼저 None으로 초기화하여 안전성 확보 self.postImageManager = None self.ocr_module = None self.mask_module = None self.text_rendering_module = None self.request_ai_server = None self.gtranslate = None self.migan = None self.gpu_manager = None try: # GPU 관리자 초기화 self.gpu_manager = GPUManager(logger=logger) self.gpu_manager.initialize_gpu_state(toggle_states) # GPU 상태 상세 로깅 gpu_status = self.gpu_manager.get_cuda_status() self.logger.log(f"🔧 ImageProcessor3 GPU 상태 요약:", level=logging.DEBUG) self.logger.log(f" - CUDA 사용 가능: {gpu_status['can_use_cuda']}", level=logging.DEBUG) self.logger.log(f" - toggle_states['use_cuda']: {toggle_states.get('use_cuda', 'NOT_SET')}", level=logging.DEBUG) self.logger.log(f" - GPU 하드웨어 정보: {gpu_status['gpu_info']}", level=logging.DEBUG) self.logger.log(f"ImageProcessor3 Init toggle_states: {self.toggle_states}", level=logging.DEBUG) self.is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin self.logger.log(f"is_member_valid: {self.is_member_valid}", level=logging.DEBUG) self.papago_translator = papago_translator self.inpaint_method = 'migan' # 폰트 경로 결정 self.font_path = self._resolve_font_path(self.toggle_states.get("font_type", "폰트1")) # PostImageManager 등 하위 모듈과 공유 try: self.toggle_states['image_font_path'] = self.font_path except Exception: pass self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "") self.debugging_save_Dir = os.path.join(self.base_dir, "debug_images") if not os.path.exists(self.debugging_save_Dir): os.makedirs(self.debugging_save_Dir) self.logger.log(f"debug_images 디렉토리 생성: {self.debugging_save_Dir}", level=logging.DEBUG) else: self.logger.log(f"debug_images 디렉토리 이미 존재: {self.debugging_save_Dir}", level=logging.DEBUG) self.use_local_rembg = self.toggle_states.get("use_local_rembg", False) self.local_model_name = self.toggle_states.get("local_model_name", 'birefnet-general-lite') # self.logger.log(f"self.toggle_states: {self.toggle_states}", level=logging.DEBUG) self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG) self.logger.log(f"toggle_states font_path: {self.toggle_states.get('image_font_path', '')}", level=logging.DEBUG) self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG) self.logger.log(f"self.debugging_save_Dir: {self.debugging_save_Dir}", level=logging.DEBUG) self.logger.log(f"self.unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) self.logger.log(f"self.inpaint_method: {self.inpaint_method}", level=logging.DEBUG) # ----------------------------- 메모리 파편화 완화 ----------------------------- # Pillow 가 거대 이미지를 열 때 과도한 메모리를 점유하지 않도록 최대 픽셀 수 제한 max_px = self.toggle_states.get("max_image_pixels", 20_000_000) # 약 20MP, 필요 시 조정 (20MP = 4500x4500, 50MP=8000x6000) Image.MAX_IMAGE_PIXELS = max_px self.logger.log(f"Image.MAX_IMAGE_PIXELS set to {max_px}", level=logging.DEBUG) # ----------------------------------------------------------------------------- # 각 모듈을 개별적으로 초기화하고 예외 처리 self.ocr_module = None # 기본값 설정 try: self.ocr_module = Onnx_OCRModule(logger=self.logger, base_dir=self.base_dir, gpu_manager=self.gpu_manager, toggle_states=self.toggle_states) self.logger.log("✅ ONNX OCR 모듈 초기화 성공", level=logging.INFO) except Exception as e: self.logger.log(f"❌ ONNX OCR 모듈 초기화 실패: {e}", level=logging.ERROR, exc_info=True) self.ocr_module = None # 명시적으로 None 설정 # try: # self.ai_translator = GemmaTranslator( # base_url=self.toggle_states.get("gemma_api_base_url", "http://192.168.0.146:8000"), # timeout=int(self.toggle_states.get("gemma_api_timeout", 120)), # ) # self.logger.log(f"gemma_api_base_url: {self.ai_translator.base_url}", level=logging.DEBUG) # self.logger.log(f"GemmaTranslator 연결: base={self.ai_translator.base_url}", level=logging.INFO) # except Exception as e: # self.logger.log(f"GemmaTranslator 연결 실패: {e}", level=logging.ERROR, exc_info=True) # self.ai_translator = None # try: # # CUDNN 버전 불일치 문제 해결을 위한 force_cpu 옵션 # # toggle_states에서 force_cpu_ocr 설정 확인 (기본값: False) # force_cpu_ocr = self.toggle_states.get('force_cpu_ocr', False) # force_cpu_ocr = True # self.ocr_module = Paddle_OCRModule( # logger=self.logger, # base_dir=self.base_dir, # gpu_manager=self.gpu_manager, # force_cpu=force_cpu_ocr # ) # if force_cpu_ocr: # self.logger.log("✅ PaddleOCR 모듈 초기화 성공 (CPU 강제 모드)", level=logging.INFO) # else: # self.logger.log("✅ PaddleOCR 모듈 초기화 성공", level=logging.INFO) # except Exception as e: # self.logger.log(f"❌ PaddleOCR 모듈 초기화 실패: {e}", level=logging.ERROR, exc_info=True) # self.ocr_module = None # 명시적으로 None 설정 try: self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir) self.logger.log("MaskModule 초기화 성공", level=logging.DEBUG) except Exception as e: self.logger.log(f"MaskModule 초기화 실패: {e}", level=logging.ERROR, exc_info=True) try: self.text_rendering_module = TextRenderingModuleOptimized(logger=self.logger, font_path=self.font_path) self.logger.log("TextRenderingModule 초기화 성공", level=logging.DEBUG) except Exception as e: self.logger.log(f"TextRenderingModule 초기화 실패: {e}", level=logging.ERROR, exc_info=True) try: self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states) self.logger.log("PostImageManager 초기화 성공", level=logging.DEBUG) except Exception as e: self.logger.log(f"PostImageManager 초기화 실패: {e}", level=logging.ERROR, exc_info=True) # PostImageManager는 중요한 모듈이므로 기본적인 fallback 생성 try: self.postImageManager = self._create_fallback_post_image_manager() self.logger.log("PostImageManager fallback 생성 성공", level=logging.INFO) except Exception as e2: self.logger.log(f"PostImageManager fallback 생성도 실패: {e2}", level=logging.ERROR, exc_info=True) # self.background_removal_module = BackgroundRemovalModule(logger=self.logger, default_model="birefnet-general") # self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger, default_model="ppmatting-hrnet-1x") # self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger) self.local_rembg_model_path = os.path.join(self.base_dir, "rembg_models", "BriaRMBG1.4_model_fp16.onnx") try: # Request_AI_Server에도 GPU 상태 전달 self.request_ai_server = Request_AI_Server( logger=self.logger, gpu_manager=self.gpu_manager, local_rembg_model_path=self.local_rembg_model_path ) self.logger.log("Request_AI_Server 초기화 성공", level=logging.DEBUG) except Exception as e: self.logger.log(f"Request_AI_Server 초기화 실패: {e}", level=logging.ERROR, exc_info=True) try: self.gtranslate = GoogleTranslate() if self.gtranslate is not None: self.logger.log(f"GoogleTranslate 초기화 성공", level=logging.DEBUG) else: self.logger.log(f"GoogleTranslate 초기화 결과: None", level=logging.DEBUG) except Exception as e: self.logger.log(f"GoogleTranslate 초기화 실패: {e}", level=logging.ERROR, exc_info=True) # MIGAN ONNX 파이프라인 준비(옵션 토글 기반) try: from modules.migan_module import build_migan_from_toggle # MIGAN이 실제로 사용될 때만 초기화 (inpaint_method이 'migan'이거나 로컬 inpaint_method가 'migan'일 때) inpaint_method = self.toggle_states.get("inpaint_method", "request") local_inpaint_method = self.toggle_states.get("local_inpaint_method", "migan") should_init_migan = ( self.toggle_states.get("migan_onnx_path") and (inpaint_method == "migan" or local_inpaint_method == "migan") ) if should_init_migan: # GPU 상태에 따라 CUDA 사용 여부 결정 enhanced_toggle_states = self.toggle_states.copy() if self.gpu_manager and self.gpu_manager.can_use_cuda: enhanced_toggle_states["migan_use_cuda"] = enhanced_toggle_states.get("migan_use_cuda", False) self.logger.log(f"MIGAN CUDA 사용 설정: {enhanced_toggle_states['migan_use_cuda']}", level=logging.DEBUG) else: enhanced_toggle_states["migan_use_cuda"] = False self.logger.log("MIGAN CUDA 사용 불가 - CPU 모드로 설정", level=logging.DEBUG) self.logger.log(f"[MIGAN] GPU 관리자 전달: {type(self.gpu_manager).__name__}, can_use_cuda: {self.gpu_manager.can_use_cuda if self.gpu_manager else 'N/A'}", level=logging.DEBUG) self.migan = build_migan_from_toggle(enhanced_toggle_states, logger=self.logger, gpu_manager=self.gpu_manager) self.logger.log(f"[MIGAN] 초기화 완료: gpu_manager 속성={hasattr(self.migan, 'gpu_manager')}, 값={getattr(self.migan, 'gpu_manager', None)}", level=logging.DEBUG) else: self.migan = None self.logger.log(f"MIGAN 초기화 건너뜀: inpaint_method={inpaint_method}, local_inpaint_method={local_inpaint_method}, migan_onnx_path={bool(self.toggle_states.get('migan_onnx_path'))}", level=logging.DEBUG) except Exception as e: self.migan = None self.logger.log(f"MIGAN 초기화 실패: {e}", level=logging.ERROR, exc_info=True) # 인페인팅 실행 정보(마지막 사용 방식/장치) 추적용 내부 상태 self._last_inpaint_used = None self._last_inpaint_device = None # 외부 서버 헬스 체크 플래그 self.is_external_server_alive = False except Exception as e: self.logger.log(f"ImageProcessor3 초기화 중 치명적 오류 발생: {e}", level=logging.ERROR, exc_info=True) # 치명적 오류 발생 시에도 기본 속성들이 None으로라도 설정되도록 보장 def _resolve_font_path(self, font_type): """font_type("폰트1" 등)에 해당하는 실제 폰트 파일 경로를 반환""" import json fonts_dir = os.path.join(self.base_dir, "fonts") map_file = os.path.join(fonts_dir, "fonts_map.json") # 기본 매핑 테이블 (파일 로드 실패 시 사용) default_map = { "폰트1": "HakgyoansimDunggeunmisoTTFB.ttf", "폰트2": "NanumBarunGothic.ttf", "폰트3": "NanumSquareRoundR.ttf", "폰트4": "gamtanload.ttf", "폰트5": "Cafe24Ohsquare-v2.0.ttf", "폰트6": "GmarketSansTTFMedium.ttf", "폰트7": "Paperlogy-5Medium.ttf", "폰트8": "Pretendard-Regular.ttf", } font_map = default_map # fonts_map.json 로드 시도 try: if os.path.exists(map_file): with open(map_file, "r", encoding="utf-8") as f: font_map = json.load(f) except Exception as e: self.logger.log(f"폰트 매핑 파일 로드 실패 ({map_file}): {e}, 기본값 사용", level=logging.WARNING) # 1. font_type에 매핑된 파일 찾기 font_key = str(font_type).strip() # 매핑에 없으면 기본값(폰트1) 사용 font_filename = font_map.get(font_key, font_map.get("폰트1", default_map["폰트1"])) # 2. 실제 경로 구성 candidate_path = os.path.join(fonts_dir, font_filename) if os.path.exists(candidate_path): return candidate_path # 3. 파일이 없으면 기본 폰트(폰트1)로 폴백 default_filename = font_map.get("폰트1", default_map["폰트1"]) default_path = os.path.join(fonts_dir, default_filename) if os.path.exists(default_path): return default_path # 4. 정말 아무것도 없으면 시스템 폰트 등 최후의 수단 (혹은 예외) fallback_candidates = [ "C:/Windows/Fonts/malgun.ttf", "C:/Windows/Fonts/gulim.ttc" ] for p in fallback_candidates: if os.path.exists(p): return p return default_path # 경로가 없어도 일단 리턴 def _create_fallback_post_image_manager(self): """PostImageManager 초기화 실패 시 사용할 fallback 객체 생성""" class FallbackPostImageManager: """PostImageManager의 최소한 기능을 제공하는 fallback 클래스""" def __init__(self, logger, toggle_states): self.logger = logger self.toggle_states = toggle_states self.font = None def update_toggle_states(self, toggle_states): """toggle_states 업데이트""" self.toggle_states = toggle_states self.logger.log("FallbackPostImageManager toggle_states 업데이트됨", level=logging.DEBUG) def save_image_to_path(self, image, path): """기본적인 이미지 저장 기능""" try: if hasattr(image, 'save'): image.save(path) return True else: self.logger.log("이미지 객체에 save 메서드가 없습니다", level=logging.ERROR) return False except Exception as e: self.logger.log(f"Fallback 이미지 저장 실패: {e}", level=logging.ERROR) return False def crop_image(self, image, is_thumb=False, crop_percentage=0.01): """기본적인 이미지 크롭 기능""" try: if hasattr(image, 'size') and hasattr(image, 'crop'): width, height = image.size left = width * crop_percentage top = height * crop_percentage right = width * (1 - crop_percentage) bottom = height * (1 - crop_percentage) return image.crop((left, top, right, bottom)) else: self.logger.log("이미지 객체에 필요한 메서드가 없습니다", level=logging.ERROR) return image except Exception as e: self.logger.log(f"Fallback 이미지 크롭 실패: {e}", level=logging.ERROR) return image return FallbackPostImageManager(self.logger, self.toggle_states) def update_toggle_states(self, toggle_states): self.toggle_states = toggle_states # 1. 멤버십 및 권한 정보 업데이트 self.is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin # 2. 인페인팅 설정 업데이트 self.inpaint_method = self.toggle_states.get("inpaint_method", "migan") self.use_local_rembg = self.toggle_states.get("use_local_rembg", False) self.local_model_name = self.toggle_states.get("local_model_name", 'birefnet-general-lite') # 3. 폰트 경로 및 렌더링 모듈 업데이트 try: new_font_path = self._resolve_font_path(self.toggle_states.get("font_type", "폰트1")) # 폰트가 변경되었다면 렌더링 모듈 재생성 if new_font_path and new_font_path != self.font_path: self.font_path = new_font_path # toggle_states에도 반영 self.toggle_states['image_font_path'] = self.font_path if hasattr(self, 'text_rendering_module'): try: from modules.text_rendering_module import TextRenderingModuleOptimized self.text_rendering_module = TextRenderingModuleOptimized(logger=self.logger, font_path=self.font_path) self.logger.log(f"폰트 변경으로 텍스트 렌더링 모듈 재생성: {self.font_path}", level=logging.INFO) except Exception as tr_err: self.logger.log(f"텍스트 렌더링 모듈 재생성 실패: {tr_err}", level=logging.ERROR) except Exception as e: self.logger.log(f"폰트 정보 업데이트 중 오류: {e}", level=logging.WARNING) if self.postImageManager is not None: try: self.postImageManager.update_toggle_states(self.toggle_states) self.logger.log(f"이미지 프로세서 toggle_states 업데이트 : {self.toggle_states}", level=logging.DEBUG) except Exception as e: self.logger.log(f"PostImageManager toggle_states 업데이트 중 오류: {e}", level=logging.ERROR, exc_info=True) else: self.logger.log("PostImageManager가 None이므로 toggle_states 업데이트를 건너뜁니다.", level=logging.WARNING) # 외부 서버 헬스 체크 (toggle_states 업데이트 시마다 수행) if self.inpaint_method == 'external_request': self.is_external_server_alive = self.check_external_server_availability() self.logger.log(f"외부 인페인팅 서버 상태 확인: {self.is_external_server_alive}", level=logging.DEBUG) self.logger.log(f"[UpdateToggle] 완료: member={self.is_member_valid}, inpaint={self.inpaint_method}, font={os.path.basename(self.font_path)}", level=logging.DEBUG) def update_unwanted_texts(self, texts): self.unwanted_texts = texts self.logger.log(f"이미지프로세서 unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) def __del__(self): """소멸자에서 리소스 정리""" self.cleanup() self.logger.log("이미지 프로세서 소멸", level=logging.DEBUG) def cleanup(self): """리소스 정리""" try: # OCR 모듈 정리 if hasattr(self, 'ocr_module'): try: del self.ocr_module self.logger.log("OCR 모듈 정리 완료", level=logging.DEBUG) except Exception as e: self.logger.log(f"OCR 모듈 정리 중 오류: {e}", level=logging.WARNING) # 마스크 모듈 정리 if hasattr(self, 'mask_module'): try: del self.mask_module self.logger.log("마스크 모듈 정리 완료", level=logging.DEBUG) except Exception as e: self.logger.log(f"마스크 모듈 정리 중 오류: {e}", level=logging.WARNING) # 텍스트 렌더링 모듈 정리 if hasattr(self, 'text_renderer'): try: del self.text_renderer self.logger.log("텍스트 렌더링 모듈 정리 완료", level=logging.DEBUG) except Exception as e: self.logger.log(f"텍스트 렌더링 모듈 정리 중 오류: {e}", level=logging.WARNING) # GPU 메모리 정리 if hasattr(self, 'gpu_manager') and self.gpu_manager and self.gpu_manager.can_use_cuda: try: import paddle if hasattr(paddle, 'device') and hasattr(paddle.device, 'cuda'): paddle.device.cuda.empty_cache() self.logger.log("CUDA 캐시 정리 완료", level=logging.DEBUG) except Exception as e: self.logger.log(f"CUDA 캐시 정리 중 오류: {e}", level=logging.WARNING) # Python GC 강제 실행 import gc gc.collect() # OpenCV 윈도우 정리 try: cv2.destroyAllWindows() except: pass # 임시 폴더 삭제 if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR): # shutil.rmtree(self.TEMP_IMAGE_DIR) self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG) except Exception as e: self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True) def reset_ocr_module(self): """OCR 모듈을 명시적으로 삭제하고 재생성.""" try: self.logger.log("🔄 OCR 모듈 재초기화 시작", level=logging.DEBUG) # 기존 모듈 참조 해제 if hasattr(self, 'ocr_module'): del self.ocr_module self.logger.log("기존 OCR 모듈 참조 해제 완료", level=logging.DEBUG) # 안전을 위해 먼저 None으로 설정 self.ocr_module = None # CUDA 메모리 정리 (GPU 사용 시) if hasattr(self, 'gpu_manager') and self.gpu_manager and self.gpu_manager.can_use_cuda: try: import paddle if hasattr(paddle, 'device') and hasattr(paddle.device, 'cuda'): paddle.device.cuda.empty_cache() self.logger.log("CUDA 캐시 정리 완료", level=logging.DEBUG) except Exception as e: self.logger.log(f"CUDA 캐시 정리 실패: {e}", level=logging.WARNING) # Python GC 강제 실행 (여러 번 실행으로 더 강력한 정리) import gc for i in range(3): collected = gc.collect() if collected > 0: self.logger.log(f"GC 실행 {i+1}: {collected}개 객체 정리", level=logging.DEBUG) # ONNX OCR 모듈로 재초기화 try: from modules.onnx_ocr_module.src.onnx_ocr_wrapper import ONNXOCRModule as OCRModule self.ocr_module = OCRModule( logger=self.logger, base_dir=self.base_dir, gpu_manager=self.gpu_manager, toggle_states=self.toggle_states, ) self._ocr_call_count = 0 self.logger.log("✅ ONNX OCR 모듈 재초기화 완료", level=logging.INFO) return True except Exception as init_error: self.logger.log(f"❌ ONNX OCR 모듈 재초기화 중 오류: {init_error}", level=logging.ERROR, exc_info=True) self.ocr_module = None return False except Exception as e: self.logger.log(f"❌ OCR 모듈 재초기화 실패: {e}", level=logging.ERROR, exc_info=True) # 안전을 위해 None으로 설정 self.ocr_module = None return False def is_valid_image_path(self, path): # http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용 if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE): return True if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')): return True return False async def process_single_image(self, original_image_url, index, delay=1.0, file_prefix=""): """ 단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅) Args: page: Playwright 페이지 객체 original_image_url (str): 처리할 이미지 URL index (int): 이미지 인덱스 is_localServer (bool): 로컬 서버 사용 여부 delay (float): 요청 간격 (초) file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option") Returns: dict: 처리 결과를 포함한 딕셔너리 - status: 'inpainted', 'original', 'exclude', 'error' 중 하나 - path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로 - error: 오류 메시지 (status가 'error'인 경우에만 포함) """ local_image_path = None delay = random.uniform(0.5, 1.5) delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화 # 파이프라인 타이밍 계측 시작 import time as _time _t_all_start = time.time() _timings_ms = {} # 직전 인페인트 사용 방식 초기화 self._last_inpaint_used = None self._last_inpaint_device = None # self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) self.logger.log(f"이미지 번역시작", level=logging.DEBUG) self.logger.log(f"toggle_states: {self.toggle_states}", level=logging.DEBUG) try: # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자) if not original_image_url or not isinstance(original_image_url, str): self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} if not self.is_valid_image_path(original_image_url): self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 요청 간격 조절 (봇 탐지 회피) if delay > 0: await asyncio.sleep(delay) # OCR 권한 상태 로그 ocr_enabled = self.toggle_states.get('ocr', False) processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드" self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG) # 1. 이미지 다운로드 _t = _time.time() local_image_path = self.download_image(image_url=original_image_url, index=index, file_prefix=file_prefix) _timings_ms["download"] = (_time.time() - _t) * 1000.0 if not local_image_path: self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 1-A. 상세페이지 이미지 전처리 (크기 표준화 및 분할) if file_prefix == "detail": local_image_path = await self.preprocess_detail_image(local_image_path, index) if not local_image_path: self.logger.log(f"상세페이지 이미지 {index+1} 전처리 실패", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '상세페이지 이미지 전처리 실패'} # elif file_prefix == "thumb": # 1-B. 썸네일 이미지는 고해상도 입력 다운스케일 (메모리 절약) # max_dim = self.toggle_states.get('max_image_resolution', 1200) # local_image_path = self.downscale_image_if_large(local_image_path, max_dim=max_dim) else: # 1-C. 옵션 이미지는 스케일 처리 건너뛰기 (이미 작은 크기) self.logger.log(f"옵션 이미지는 스케일 처리 건너뛰기: {file_prefix}", level=logging.DEBUG) # 전처리 유형에 따른 로그 메시지 if file_prefix == "detail": processing_type = "상세페이지 전처리 완료" elif file_prefix == "thumb": processing_type = "썸네일 스케일 처리 완료" else: processing_type = "옵션 이미지 원본 유지" self.logger.log(f"이미지 {index+1} 로컬 저장위치({processing_type}): {local_image_path}", level=logging.DEBUG) # 2. OCR 텍스트 감지 _t = _time.time() # 메모리 추적: OCR 시작 전 ocr_before_mem = psutil.virtual_memory() ocr_before_mb = ocr_before_mem.used / 1024 / 1024 ocr_results = self.safe_detect(local_image_path) _timings_ms["ocr"] = (_time.time() - _t) * 1000.0 # 메모리 추적: OCR 완료 후 ocr_after_mem = psutil.virtual_memory() ocr_after_mb = ocr_after_mem.used / 1024 / 1024 ocr_change_mb = ocr_after_mb - ocr_before_mb ocr_change_percent = (ocr_change_mb / ocr_before_mb) * 100 if ocr_before_mb > 0 else 0 self.logger.log( f"메모리 변화 [OCR 처리]: {ocr_before_mb:.1f}MB -> {ocr_after_mb:.1f}MB " f"({ocr_change_mb:+.1f}MB, {ocr_change_percent:+.1f}%) - 이미지 {index+1}", level=logging.DEBUG if abs(ocr_change_mb) < 10 else logging.INFO ) self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG) # 2-A. 상세페이지인 경우 OCR 정보 수집 및 저장 if file_prefix == "detail": store_ocr_to_db = self.toggle_states.get('store_ocr_data_to_db', False) # 기본값: False if store_ocr_to_db: await self.collect_and_store_ocr_data(original_image_url, local_image_path, ocr_results, index) else: # 메모리에만 저장 (현재 세션용) await self.collect_ocr_data_in_memory(original_image_url, local_image_path, ocr_results, index) filter_ocr_results = self.filter_ocr_results(ocr_results) self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG) ocr_count = len(filter_ocr_results) # 3. OCR 모듈 상태 확인 및 중국어 텍스트 검사 if not hasattr(self, 'ocr_module') or self.ocr_module is None: self.logger.log("⚠️ OCR 모듈이 초기화되지 않아 원본 이미지 반환", level=logging.WARNING) return {'status': 'original', 'path': local_image_path, 'message': 'OCR 모듈 초기화 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 중국어 텍스트가 없는 경우 정상 케이스로 처리 if not self.ocr_module.filter_chinese_text(filter_ocr_results): self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음 - 정상 케이스 (NO_TEXT)", level=logging.INFO) return {'status': 'original', 'path': local_image_path, 'message': '중국어 텍스트가 발견되지 않았습니다', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 4. 한글 텍스트가 존재하는 경우 원본 이미지 반환으로 번역 패스 if self.ocr_module.filter_korean_text(filter_ocr_results): self.logger.log(f"이미지 {index+1} 한글 텍스트 존재, 원본 이미지 반환", level=logging.DEBUG) return {'status': 'original', 'path': local_image_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 4. 병렬 실행: 번역(I/O) ↔ 마스크 생성(CPU) # - OCR 결과를 기반으로 두 작업은 서로 독립이므로 동시에 수행 가능 if ocr_count < 5: expansion_size = 12 blur_size = 15 elif ocr_count < 10: expansion_size = 9 blur_size = 12 elif ocr_count < 15: expansion_size = 7 blur_size = 9 elif ocr_count < 20: expansion_size = 5 blur_size = 6 else: expansion_size = 10 blur_size = 15 loop = asyncio.get_running_loop() _t_trans = _time.time() _t_mask = _time.time() translate_future = loop.run_in_executor( None, lambda: self.batch_google_translate_texts(filter_ocr_results) ) mask_future = loop.run_in_executor( None, lambda: self.mask_module.create_masks( image_path=local_image_path, ocr_results=filter_ocr_results, mask_option="basic", expansion_size=expansion_size, blur_size=blur_size ) ) translated_texts, masks = await asyncio.gather(translate_future, mask_future) _timings_ms["translate"] = (_time.time() - _t_trans) * 1000.0 _timings_ms["mask"] = (_time.time() - _t_mask) * 1000.0 self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG) self.logger.log(f"마스크 생성 완료", level=logging.DEBUG) # 5. OCR 권한에 따른 텍스트 필터링 if ocr_enabled: filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index) if not filtered_translated_texts: self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG) return {'status': 'exclude', 'path': local_image_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} else: self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG) else: # OCR 권한이 없으면 번역된 텍스트를 그대로 사용 filtered_translated_texts = translated_texts self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG) # ------------------- 인페인트 엔진 자동 선택 (옵션) ------------------- try: inpaint_model_pref = self.toggle_states.get('inpaint_model', 'auto').lower() except Exception: inpaint_model_pref = 'auto' if inpaint_model_pref in ['lama', 'migan', 'cv']: # 명시적 모델 지정 시 강제 forced_map = {'lama': 'request', 'migan': 'migan', 'cv': 'cv'} self.inpaint_method = forced_map.get(inpaint_model_pref, self.inpaint_method) self.logger.log(f"[inpaint_model 강제] {inpaint_model_pref} → inpaint_method={self.inpaint_method}", level=logging.INFO) else: # auto: 마스크 통계 기반으로 결정 try: analysis = self._analyze_mask_stats(masks) # 임계값 (토글로 조정 가능) area_large_thr = float(self.toggle_states.get('inpaint_auto_area_large_thresh', 0.08)) area_small_thr = float(self.toggle_states.get('inpaint_auto_area_small_thresh', 0.03)) dist_thr = float(self.toggle_states.get('inpaint_auto_min_distance_ratio', 0.10)) choose = 'external_request' # 기본: lama(서버) if analysis['coverage_ratio'] <= area_small_thr and \ analysis['component_count'] >= 1 and \ analysis['min_centroid_distance_ratio'] >= dist_thr: choose = 'migan' elif analysis['coverage_ratio'] >= area_large_thr: choose = 'external_request' else: # 중간대: lama 우선 (품질 우선) choose = 'external_request' self.logger.log( f"[AUTO Inpaint] coverage={analysis['coverage_ratio']:.3f}, comps={analysis['component_count']}, " f"min_center_dist={analysis['min_centroid_distance_ratio']:.3f} → {choose}", level=logging.INFO, ) self.inpaint_method = choose except Exception as auto_err: self.logger.log(f"AUTO 인페인트 선택 실패: {auto_err}", level=logging.WARNING) # if not self.is_frozen(): # # 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화) # self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix) self.logger.log(f"ocr_count: {ocr_count}", level=logging.DEBUG) self.logger.log(f"is_member_valid: {self.is_member_valid}", level=logging.DEBUG) # 인페인팅 방법 설정 self.set_inpaint_method(file_prefix) self.logger.log(f"최종 inpaint_method: {self.inpaint_method}", level=logging.DEBUG) # self.inpaint_method = 'migan' # 인페인팅 실행 (폴백 순서: 자체서버 > GPU > CPU) _t = _time.time() inpainted_image = self.execute_inpaint_with_fallback(local_image_path, masks, ocr_count) _timings_ms["inpaint"] = (_time.time() - _t) * 1000.0 self.logger.log(f"인페인팅 완료", level=logging.DEBUG) # # 개발환경에서 인페인트 결과 디버깅 저장 # if not self.is_frozen(): # try: # self.save_inpaint_debug_image(inpainted_image, index, file_prefix) # except Exception: # pass # 인페인팅 실패 시 원본 이미지 사용 if inpainted_image is None: self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING) inpainted_image = cv2.imread(local_image_path) if inpainted_image is None: self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR) return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패', 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} # 텍스트 렌더링 _t = _time.time() text_rendered_image = self.text_rendering_module.render_text( inpainted_image, filter_ocr_results, filtered_translated_texts) _timings_ms["render"] = (_time.time() - _t) * 1000.0 self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG) # 결과 저장 _t = _time.time() translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, file_prefix) _timings_ms["save"] = (_time.time() - _t) * 1000.0 self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG) # GPU 메모리 사용량 로깅 (CUDA 사용 시) if self.gpu_manager and self.gpu_manager.can_use_cuda: self.gpu_manager.log_gpu_memory_usage() return {'status': 'translated', 'path': translated_img_path, 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} except Exception as e: self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True) return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e), 'inpaint_method': self._last_inpaint_used, 'inpaint_device': self._last_inpaint_device} finally: # ---- 메모리 해제 ---- try: ocr_results = None filter_ocr_results = None translated_texts = None masks = None inpainted_image = None text_rendered_image = None except Exception: pass # download_image 단계에서 사용한 page, local_image_path 도 참조 제거 local_image_path = None # 최종 GC 강제 실행 gc.collect() # 파이프라인 시간 요약 로그 try: _t_all_end = _time.time() total_ms = (_t_all_end - _t_all_start) * 1000.0 parts = [] label_map = { "download": "download", "ocr": "ocr", "translate": "translate", "mask": "mask", "inpaint": "inpaint", "render": "render", "save": "save", } for k in ["download", "ocr", "translate", "mask", "inpaint", "render", "save"]: if k in _timings_ms: if k == "inpaint" and getattr(self, "_last_inpaint_used", None): used = self._last_inpaint_used dev = getattr(self, "_last_inpaint_device", None) or "CPU" parts.append(f"{label_map[k]}={_timings_ms[k]:.1f}ms({used}/{dev})") else: parts.append(f"{label_map[k]}={_timings_ms[k]:.1f}ms") timeline = " | ".join(parts) if parts else "" self.logger.log( f"⏱ 이미지 파이프라인 총 {total_ms:.1f}ms{(' | ' + timeline) if timeline else ''}", level=logging.DEBUG, ) # 결과 timings를 리턴 데이터에 포함시키기 위해 attrs에 저장(워커가 그대로 전달) self._last_timings = {"total_ms": total_ms, **{k: float(f"{v:.1f}") for k, v in _timings_ms.items()}} except Exception: pass def set_inpaint_method(self, file_prefix: str) -> None: """인페인팅 방법 설정 (CPU=cv, GPU=migan, 자체서버=request, 기타=cv)""" # file_prefix → toggle_states 키 매핑 key_by_prefix = { "thumb": "thumb_trans_type", "detail": "detail_IMGTrans_type", "option": "optionIMGTrans_type", # 수정: option_IMGTrans_type → optionIMGTrans_type } # 해당 키가 없으면 기본 'CPU' target_key = key_by_prefix.get(file_prefix, "") trans_type = self.toggle_states.get(target_key, "CPU") # 변환 타입 → 실제 메서드 매핑 method_map = { "CPU": "cv", # CPU 선택 시 OpenCV 인페인팅 "GPU": "migan", # GPU 선택 시 MIGAN 인페인팅 "자체서버": "external_request", # 자체서버 선택 시 Request 인페인팅 } self.inpaint_method = method_map.get(trans_type, "cv") # 기타는 cv로 폴백 self.logger.log(f"[set_inpaint_method] prefix={file_prefix}, target_key={target_key}, trans_type={trans_type} → inpaint_method={self.inpaint_method}", level=logging.DEBUG) def execute_inpaint_with_fallback(self, local_image_path: str, masks, ocr_count: int): """ 인페인팅 실행 - toggle_states['inpaint_method'] 설정에 따라 분기. 'external_request'인 경우 VIP 체크 후 외부 서버 시도, 실패 시 MIGAN 폴백. 그 외의 경우(또는 폴백 시) MIGAN 사용. Args: local_image_path: 이미지 파일 경로 masks: 마스크 데이터 ocr_count: OCR 결과 개수 Returns: 인페인팅된 이미지 또는 None """ # 메모리 추적: 인페인팅 시작 전 inpaint_before_mem = psutil.virtual_memory() inpaint_before_mb = inpaint_before_mem.used / 1024 / 1024 inpainted_image = None # 1. 사용자 설정 확인 (self.inpaint_method가 설정되어 있으면 최우선, 없으면 토글값) # set_inpaint_method() 또는 자동 로직에 의해 설정된 값이 있으면 그것을 따름 preferred_method = getattr(self, 'inpaint_method', None) if not preferred_method: preferred_method = self.toggle_states.get("inpaint_method", "migan") server_url = self.toggle_states.get("request_inpainting_server_url", "") # 2. External Request 모드일 때 처리 if preferred_method == "external_request": if self.is_member_valid: if not self.is_external_server_alive: self.logger.log("외부 서버 상태 비정상(헬스 체크 실패) -> 로컬 MIGAN으로 폴백", level=logging.WARNING) elif server_url and str(server_url).strip().startswith("http"): # 외부 서버 시도 self.inpaint_method = 'external_request' inpainted_image = self._try_external_inpaint(local_image_path, masks, str(server_url).strip()) if inpainted_image is not None: self._last_inpaint_used = "external_request" self._last_inpaint_device = "SERVER" else: self.logger.log("외부 서버 인페인팅 실패 -> 로컬 MIGAN으로 폴백", level=logging.WARNING) else: self.logger.log("외부 서버 URL이 유효하지 않음 -> 로컬 MIGAN으로 폴백", level=logging.WARNING) else: self.logger.log("VIP 회원이 아님 -> 로컬 MIGAN으로 폴백", level=logging.WARNING) # 3. 로컬 MIGAN 인페인팅 (기본값, 또는 외부 요청 실패/조건 미충족 시) if inpainted_image is None: self.inpaint_method = 'migan' inpainted_image = self._try_migan_inpaint(local_image_path, masks) # _try_migan_inpaint 내부에서 _last_inpaint_used 설정함 # 메모리 추적: 인페인팅 완료 후 inpaint_after_mem = psutil.virtual_memory() inpaint_after_mb = inpaint_after_mem.used / 1024 / 1024 inpaint_change_mb = inpaint_after_mb - inpaint_before_mb inpaint_change_percent = (inpaint_change_mb / inpaint_before_mb) * 100 if inpaint_before_mb > 0 else 0 self.logger.log( f"메모리 변화 [인페인팅]: {inpaint_before_mb:.1f}MB -> {inpaint_after_mb:.1f}MB " f"({inpaint_change_mb:+.1f}MB, {inpaint_change_percent:+.1f}%) - 방법: {self.inpaint_method}", level=logging.DEBUG if abs(inpaint_change_mb) < 10 else logging.INFO ) return inpainted_image def _try_request_inpaint(self, local_image_path: str, masks, ocr_count: int): """자체서버 인페인팅 시도""" try: if not self.is_member_valid: self.logger.log("멤버십이 유효하지 않아 자체서버 인페인팅 건너뜀", level=logging.DEBUG) return None if ocr_count <= 3: self.logger.log(f"OCR 결과가 적어 자체서버 인페인팅 건너뜀 (ocr_count: {ocr_count})", level=logging.DEBUG) return None self.logger.log(f"자체서버 인페인팅 시도", level=logging.DEBUG) # 서버 마스크 규약: 정상(반전 불필요)로 확정됨 # invert_mask = bool(self.toggle_states.get("inpaint_mask_invert", False)) invert_mask = False # inpaint_model = self.toggle_states.get("inpaint_model", "simple-lama") inpaint_model = "migan" inpaint_model = "simple-lama" result = self.request_ai_server.request_inpaint(local_image_path, masks, invert_mask=invert_mask, inpaint_model=inpaint_model) if result is not None: self.logger.log("자체서버 인페인팅 성공", level=logging.DEBUG) self._last_inpaint_used = "external_request" self._last_inpaint_device = "SERVER" return result else: self.logger.log("자체서버 인페인팅 실패", level=logging.WARNING) return None except Exception as e: self.logger.log(f"자체서버 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True) return None def check_external_server_availability(self): """외부 인페인팅 서버 유효성 체크""" try: server_url = self.toggle_states.get("request_inpainting_server_url", "") if not server_url or not str(server_url).strip().startswith("http"): return False if self.request_ai_server: return self.request_ai_server.is_server_alive(str(server_url).strip()) return False except Exception: return False def _try_external_inpaint(self, local_image_path: str, masks, server_url: str): """외부 서버 인페인팅 시도""" try: if self.request_ai_server is None: return None self.logger.log(f"외부 인페인팅 시도: {server_url}", level=logging.DEBUG) # 모델명은 필요하면 토글에서 가져올 수 있음, 현재는 기본값 result = self.request_ai_server.request_external_inpaint(local_image_path, masks, server_url) return result except Exception as e: self.logger.log(f"외부 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True) return None def _try_migan_inpaint(self, local_image_path: str, masks): """MIGAN GPU 인페인팅 시도""" try: if getattr(self, "migan", None) is None: self.logger.log("MIGAN 모듈이 초기화되지 않아 건너뜀", level=logging.DEBUG) return None self.logger.log("MIGAN 인페인팅 시도", level=logging.DEBUG) result = self.migan.inpaint(local_image_path, masks) if result is not None: self.logger.log("MIGAN 인페인팅 성공", level=logging.DEBUG) # 사용 장치 기록 try: providers = [] if hasattr(self.migan, "session") and hasattr(self.migan.session, "get_providers"): providers = self.migan.session.get_providers() if any("Dml" in p for p in providers): dev = "DirectML" elif any("CUDA" in p for p in providers): dev = "CUDA" else: dev = "CPU" except Exception: dev = "GPU" if (hasattr(self, "gpu_manager") and self.gpu_manager and getattr(self.gpu_manager, "can_use_cuda", False)) else "CPU" self._last_inpaint_used = "migan" self._last_inpaint_device = dev return result else: self.logger.log("MIGAN 인페인팅 실패", level=logging.WARNING) return None except Exception as e: self.logger.log(f"MIGAN 인페인팅 중 오류: {e}", level=logging.WARNING, exc_info=True) return None def _try_opencv_inpaint(self, local_image_path: str, masks): """MIGAN 통일 이후 비활성화(호환용). 항상 None 반환""" self.logger.log("OpenCV 인페인팅 경로는 비활성화됨(MIGAN 통일)", level=logging.DEBUG) return None # OCR 결과 필터링: 중국어 텍스트만 필터링 def filter_ocr_results(self, ocr_results): import re # 중국어 문자 범위 정규식 (한자) chinese_pattern = re.compile(r'[\u4e00-\u9fff]+') filtered_results = [] for r in ocr_results: text = r.get('text', '').strip() polygon = r.get('polygon', []) confidence = r.get('confidence', 0.0) # 텍스트가 비어있거나 polygon이 3점 미만이면 제외 if not text or not polygon or len(polygon) < 3: self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG) continue # 신뢰도 20% 미만이면 제외 if confidence < 0.05: self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG) continue # 중국어 문자가 포함된 텍스트만 필터링 if chinese_pattern.search(text): filtered_results.append(r) self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG) else: self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG) self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.DEBUG) return filtered_results async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, file_prefix=""): """로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다""" try: # text_rendered_image가 None인 경우 처리 if text_rendered_image is None: self.logger.log(f"이미지 {index+1} 번역 결과가 None입니다. 원본 이미지를 반환합니다.", level=logging.WARNING) return local_image_path # 파일명에 접두사 포함 (설정에 따른 이미지 형식 선택) # 기본값: WebP (최고 압축률), 호환성 필요시 PNG 사용 가능 image_format = self.toggle_states.get('output_image_format', 'webp').lower() # WebP 지원 여부 확인 (PIL에서 WebP를 지원하지 않는 경우 PNG로 폴백) if image_format == 'webp': try: if not features.check('webp'): self.logger.log("WebP 지원되지 않음, PNG로 폴백", level=logging.WARNING) image_format = 'png' except Exception: image_format = 'png' # 지원되지 않는 형식인 경우 PNG로 폴백 if image_format not in ['webp', 'png', 'jpg', 'jpeg']: self.logger.log(f"지원되지 않는 형식 {image_format}, PNG로 폴백", level=logging.WARNING) image_format = 'png' file_ext = 'webp' if image_format == 'webp' else ('png' if image_format == 'png' else 'jpg') if file_prefix: img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.{file_ext}") else: img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.{file_ext}") watermark_text=self.toggle_states.get("watermark_text", "이미지 저작권 보유") is_watermark_enabled = watermark_text != "" or self.toggle_states.get("watermark_toggle", False) self.logger.log(f"watermark_text: {watermark_text}", level=logging.DEBUG) self.logger.log(f"is_watermark_enabled: {is_watermark_enabled}", level=logging.DEBUG) image_data_to_save = None # file_prefix가 'detail' 또는 'option'일 때만 워터마크 추가 if is_watermark_enabled and file_prefix in ["detail"]: # image_data_to_save = self.postImageManager.add_watermark( # image_data=text_rendered_image, # watermark_text=watermark_text # ) # 워터마크 기능 비활성화 중이므로 원본 이미지 사용 if isinstance(text_rendered_image, np.ndarray): image_data_to_save = text_rendered_image # 그대로 넘김 else: image_data_to_save = text_rendered_image else: # # np.ndarray라면 PIL.Image로 변환 # if isinstance(text_rendered_image, np.ndarray): # image_data_to_save = Image.fromarray(cv2.cvtColor(text_rendered_image, cv2.COLOR_BGR2RGB)) # else: # image_data_to_save = text_rendered_image if isinstance(text_rendered_image, np.ndarray): image_data_to_save = text_rendered_image # 그대로 넘김 else: image_data_to_save = text_rendered_image final_image_path = self.postImageManager.save_image_to_path(image_data_to_save, img_path) # save_image_to_path가 None을 반환한 경우 처리 if final_image_path is None: self.logger.log(f"이미지 {index+1} 저장 실패. 원본 이미지를 반환합니다.", level=logging.WARNING) return local_image_path return final_image_path except Exception as e: self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True) return local_image_path def download_image(self, image_url, index, file_prefix="", max_retries=3): """Requests를 사용해 이미지를 다운로드합니다""" # 로컬 파일 경로면 바로 반환 if os.path.isfile(image_url): self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG) return image_url # 로컬 파일 경로가 아니면 다운로드 시도 try: # "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기 if image_url.startswith("https://assets.alicdn.com") or image_url.startswith("https://gtms01.alicdn.com"): self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG) return None # URL에서 파일명 추출 및 접두사 포함 parsed_url = urlparse(image_url) base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}" if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')): base_filename += '.jpg' # 접두사가 있으면 파일명에 포함 if file_prefix: filename = f"{file_prefix}_{base_filename}" else: filename = base_filename local_path = os.path.join(self.TEMP_IMAGE_DIR, filename) # HTTP 헤더 설정 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "DNT": "1", # Do Not Track 요청 헤더 "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Cache-Control": "max-age=0" } retries = 0 while retries < max_retries: try: # 메모리 추적: 다운로드 시작 전 before_mem = psutil.virtual_memory() before_mb = before_mem.used / 1024 / 1024 self.logger.log(f"이미지 다운로드 중: {filename}", level=logging.DEBUG) response = requests.get(image_url, headers=headers, stream=True, timeout=30) if response.status_code == 200: image_data = response.content # 이미지 데이터 유효성 검사 if self.is_valid_image_data(image_data): with open(local_path, 'wb') as f: f.write(image_data) # 메모리 추적: 다운로드 완료 후 after_mem = psutil.virtual_memory() after_mb = after_mem.used / 1024 / 1024 change_mb = after_mb - before_mb change_percent = (change_mb / before_mb) * 100 if before_mb > 0 else 0 self.logger.log( f"메모리 변화 [다운로드 완료]: {before_mb:.1f}MB -> {after_mb:.1f}MB " f"({change_mb:+.1f}MB, {change_percent:+.1f}%) - {filename}", level=logging.DEBUG if abs(change_mb) < 10 else logging.INFO ) self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG) return local_path else: self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING) return None else: self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status_code}): {image_url}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) # 2~5초 대기 후 재시도 except requests.exceptions.RequestException as e: self.logger.log(f"이미지 다운로드 중 네트워크 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) # 예외 발생 시 대기 후 재시도 except Exception as e: self.logger.log(f"이미지 다운로드 중 예상치 못한 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) self.logger.log(f"이미지 다운로드 최대 재시도 횟수를 초과했습니다: {image_url}", level=logging.ERROR) return None except Exception as e: self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def is_valid_image_data(self, image_data): """이미지 데이터가 유효한지 확인합니다""" if not image_data or len(image_data) < 100: return False # 일반적인 이미지 파일 시그니처 확인 image_signatures = [ b'\xFF\xD8\xFF', # JPEG b'\x89PNG\r\n\x1a\n', # PNG b'GIF87a', # GIF87a b'GIF89a', # GIF89a b'RIFF', # WebP (RIFF 컨테이너) ] return any(image_data.startswith(sig) for sig in image_signatures) def process_translated_texts(self, translated_texts, local_image_path, index): """ 번역된 단어 리스트(translated_texts)에서 unwanted_texts의 원본값이 앞이나 뒤에 포함되면 치환값으로 바꿉니다. 치환값이 '이미지삭제'라면 None 반환(이미지 제외) """ new_texts = [] for i, text in enumerate(translated_texts): self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG) # 텍스트를 빈칸으로 분리 words = text.split() self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG) processed_words = [] text_replaced = False for word_idx, word in enumerate(words): word_replaced = False # unwanted_texts와 매칭 확인 for origin, replace in self.unwanted_texts.items(): if word.startswith(origin) or word.endswith(origin) or word == origin: self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.DEBUG) if replace == "이미지삭제": self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.DEBUG) return None # 단어 치환 if word == origin: new_word = replace elif word.startswith(origin): new_word = replace + word[len(origin):] elif word.endswith(origin): new_word = word[:-len(origin)] + replace processed_words.append(new_word) word_replaced = True text_replaced = True self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG) break if not word_replaced: processed_words.append(word) # 처리된 단어들을 다시 합치기 final_text = ' '.join(processed_words) new_texts.append(final_text) if text_replaced: self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.DEBUG) else: self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG) self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.DEBUG) for i, (original, processed) in enumerate(zip(translated_texts, new_texts)): if original != processed: self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.DEBUG) return new_texts async def batch_papago_translate_texts(self, ocr_results, delimiter='\n'): """ ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 파파고로 번역 후, 다시 분리하여 반환합니다. 각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다. """ import re texts = [result['text'] for result in ocr_results if result['text'].strip()] if not texts: return [] # 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침 split_texts = [] split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록 for text in texts: parts = re.split(r'\s*[/|·,;、]+\s*', text) parts = [p.strip() for p in parts if p.strip()] split_texts.extend(parts) split_indices.append(len(parts)) # Papago에 한 번에 번역 요청 joined = delimiter.join(split_texts) # 메모리 추적: 번역 시작 전 trans_before_mem = psutil.virtual_memory() trans_before_mb = trans_before_mem.used / 1024 / 1024 try: # 이미 한 번에 여러 줄을 받아서 리스트로 반환! translated_lines = await self.papago_translator.translate(joined, source_lang="zh", target_lang="ko") # Papago가 리스트로 반환한다고 가정! results = translated_lines # 개수가 다르면 fallback(선택사항, 아래는 로깅만) if len(results) != len(split_texts): self.logger.log( f"파파고 번역 줄 개수 불일치: {len(results)} != {len(split_texts)}", level=logging.WARNING ) # 여기서 fallback 처리 또는 그냥 results를 그대로 사용 # 다시 원래 텍스트 단위로 합치기 final_results = [] idx = 0 for count in split_indices: parts = results[idx:idx+count] final_results.append(' / '.join(parts)) idx += count # 메모리 추적: 번역 완료 후 trans_after_mem = psutil.virtual_memory() trans_after_mb = trans_after_mem.used / 1024 / 1024 trans_change_mb = trans_after_mb - trans_before_mb trans_change_percent = (trans_change_mb / trans_before_mb) * 100 if trans_before_mb > 0 else 0 self.logger.log( f"메모리 변화 [Papago 번역]: {trans_before_mb:.1f}MB -> {trans_after_mb:.1f}MB " f"({trans_change_mb:+.1f}MB, {trans_change_percent:+.1f}%) - {len(split_texts)}개 텍스트", level=logging.DEBUG if abs(trans_change_mb) < 10 else logging.INFO ) return final_results except Exception as e: self.logger.log(f"파파고 번역 실패: {e}", level=logging.ERROR, exc_info=True) return texts def batch_google_translate_texts(self, ocr_results, delimiter='\n'): """ ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 구글 번역기로 번역 후, 다시 분리하여 반환합니다. 각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다. """ import re texts = [result['text'] for result in ocr_results if result['text'].strip()] if not texts: return [] # 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침 split_texts = [] split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록 for text in texts: parts = re.split(r'\s*[/|·,;、]+\s*', text) parts = [p.strip() for p in parts if p.strip()] split_texts.extend(parts) split_indices.append(len(parts)) # 합쳐서 한 번에 번역 joined = delimiter.join(split_texts) try: translated_obj = self.gtranslate.translate(joined, "Korean") translated_text = getattr(translated_obj, "text", getattr(translated_obj, "result", joined)) results = translated_text.split(delimiter) # 만약 개수가 다르면 fallback if len(results) != len(split_texts): results = [getattr(self.gtranslate.translate(t, "Korean"), "text", t) for t in split_texts] # 다시 원래 텍스트 단위로 합치기 final_results = [] idx = 0 for count in split_indices: parts = results[idx:idx+count] final_results.append(' / '.join(parts)) idx += count return final_results except Exception as e: self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True) return texts def opencv_inpaint(self, image_path, mask, method='telea', radius=3): """MIGAN 통일 이후 비활성화(호환용). 항상 None 반환""" return None def lama_inpaint(self, image_path, mask): """ PaddleHub lama 모델로 인페인팅을 수행합니다. Args: image_path (str): 원본 이미지 경로 mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W)) Returns: inpainted_image (np.ndarray): 인페인팅된 이미지 (BGR) """ try: import paddlehub as hub except ImportError: self.logger.log("paddlehub 미설치: pip install paddlehub", level=logging.ERROR) return None image = cv2.imread(image_path) if image is None: self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR) return None # 마스크 2D 체크 if mask is None: self.logger.log(f"마스크가 None입니다", level=logging.ERROR) return None if not isinstance(mask, np.ndarray) or mask.ndim != 2: self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR) return None if mask.shape != image.shape[:2]: self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR) return None try: # PaddleHub lama 인페인팅 모델 로드 (최초 1회 다운로드) if not hasattr(self, "_lama_module"): self._lama_module = hub.Module(name="lama") if self.logger: self.logger.log("lama 인페인팅 모델 로드됨") # 라마 모델 인페인팅 (마스크 0/255, uint8) result = self._lama_module.predict(images=[image], masks=[mask]) if not result or "inpainted" not in result[0]: self.logger.log("lama 인페인팅 결과 없음", level=logging.ERROR) return None inpainted = result[0]["inpainted"] # np.ndarray(BGR) return inpainted except Exception as e: self.logger.log(f"lama 인페인팅 중 예외 발생: {e}", level=logging.ERROR, exc_info=True) return None def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""): """디버깅용 OCR 박스와 마스크 이미지를 저장합니다""" try: # OCR 박스 시각화 이미지 저장 ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix) # 마스크 시각화 이미지 저장 mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix) self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.DEBUG) return ocr_debug_path, mask_debug_path except Exception as e: self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None, None def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""): """OCR 감지된 박스들을 이미지에 표시하여 저장합니다""" try: # 원본 이미지 로드 image = cv2.imread(image_path) if image is None: self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR) return None # 이미지 복사본 생성 debug_image = image.copy() # OCR 결과별로 박스 그리기 for i, result in enumerate(ocr_results): polygon = result.get('polygon', []) bbox = result.get('bbox', None) text = result.get('text', '') confidence = result.get('confidence', 0.0) # 신뢰도에 따른 색상 결정 if confidence >= 0.8: color = (0, 255, 0) # 초록 (높은 신뢰도) elif confidence >= 0.5: color = (0, 255, 255) # 노랑 (중간 신뢰도) elif confidence >= 0.2: color = (0, 165, 255) # 주황 (낮은 신뢰도) else: color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도) if polygon and len(polygon) >= 3: # 폴리곤을 numpy 배열로 변환 (좌표를 int로 변환) pts = np.array([[int(x), int(y)] for x, y in polygon], np.int32) pts = pts.reshape((-1, 1, 2)) cv2.polylines(debug_image, [pts], True, color, 2) x, y = pts[0][0] elif bbox and len(bbox) == 4: try: x, y, w, h = [int(float(v)) for v in bbox] except Exception as e: self.logger.log(f"bbox 값 변환 오류: {bbox} ({e})", level=logging.ERROR) continue cv2.rectangle(debug_image, (x, y), (x + w, y + h), color, 2) else: continue # polygon, bbox 둘 다 없으면 skip label = f"{i+1}: {text[:10]}... ({confidence:.1%})" # 텍스트 배경 사각형 (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1) # 텍스트 표시 cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 범례 추가 legend_y = 30 cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 파일 저장 if file_prefix: debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png" else: debug_filename = f"debug_ocr_img_{index+1}.png" debug_path = os.path.join(self.debugging_save_Dir, debug_filename) cv2.imwrite(debug_path, debug_image) self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG) return debug_path except Exception as e: self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def save_mask_debug_image(self, image_path, masks, index, file_prefix=""): """생성된 마스크를 이미지에 오버레이하여 저장합니다""" try: # 원본 이미지 로드 image = cv2.imread(image_path) if image is None: self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR) return None if masks is None or not isinstance(masks, np.ndarray): self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR) return None # 이미지 복사본 생성 debug_image = image.copy() # 마스크 영역을 빨간색으로 오버레이 mask_colored = np.zeros_like(image) mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용 # 마스크 영역 반투명 오버레이 (알파 블렌딩) alpha = 0.3 overlay_mask = masks > 0 debug_image[overlay_mask] = cv2.addWeighted( debug_image[overlay_mask], 1-alpha, mask_colored[overlay_mask], alpha, 0 ) # 마스크 통계 정보 표시 total_pixels = masks.shape[0] * masks.shape[1] mask_pixels = np.sum(masks > 0) mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0 # 정보 텍스트 표시 info_text = [ f"Mask Statistics:", f"Total pixels: {total_pixels:,}", f"Mask pixels: {mask_pixels:,}", f"Coverage: {mask_percentage:.1f}%" ] y_offset = 30 for i, text in enumerate(info_text): y_pos = y_offset + (i * 25) # 텍스트 배경 (text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1) cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1) # 텍스트 cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # 파일 저장 if file_prefix: debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png" else: debug_filename = f"debug_mask_img_{index+1}.png" debug_path = os.path.join(self.debugging_save_Dir, debug_filename) cv2.imwrite(debug_path, debug_image) self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG) return debug_path except Exception as e: self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def save_inpaint_debug_image(self, inpainted_image, index, file_prefix=""): """인페인팅 결과 이미지를 디버그용으로 저장합니다""" try: # import cv2 # import numpy as np # import os if inpainted_image is None: self.logger.log("인페인트 결과가 None이어서 디버그 저장을 건너뜁니다", level=logging.WARNING) return None if file_prefix: debug_filename = f"debug_inpaint_{file_prefix}_img_{index+1}.png" else: debug_filename = f"debug_inpaint_img_{index+1}.png" debug_path = os.path.join(self.debugging_save_Dir, debug_filename) # PIL -> ndarray 변환 if not isinstance(inpainted_image, np.ndarray): try: from PIL import Image as _Image if isinstance(inpainted_image, _Image.Image): inpainted_image = cv2.cvtColor(np.array(inpainted_image.convert("RGB")), cv2.COLOR_RGB2BGR) else: self.logger.log(f"지원하지 않는 인페인트 이미지 타입: {type(inpainted_image)}", level=logging.WARNING) return None except Exception: return None cv2.imwrite(debug_path, inpainted_image) self.logger.log(f"인페인트 디버그 이미지 저장: {debug_filename}", level=logging.DEBUG) return debug_path except Exception as e: self.logger.log(f"인페인트 디버그 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def _analyze_mask_stats(self, masks: np.ndarray) -> dict: """마스크의 커버리지/컴포넌트/최소 중심거리 비율을 계산합니다""" import numpy as _np import cv2 as _cv2 h, w = masks.shape[:2] total = max(1, h * w) mask_bin = masks if masks.ndim == 3: mask_bin = _cv2.cvtColor(masks, _cv2.COLOR_BGR2GRAY) if mask_bin.dtype != _np.uint8: mask_bin = mask_bin.astype(_np.uint8) _, mask_bin = _cv2.threshold(mask_bin, 0, 255, _cv2.THRESH_BINARY) coverage_ratio = float((_np.sum(mask_bin > 0)) / total) # 컴포넌트 분석 (외곽선 기반) try: contours, _ = _cv2.findContours(mask_bin, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE) except ValueError: # OpenCV 버전 차이 호환 _, contours, _ = _cv2.findContours(mask_bin, _cv2.RETR_EXTERNAL, _cv2.CHAIN_APPROX_SIMPLE) centers = [] for cnt in contours: if cnt is None or len(cnt) < 3: continue m = _cv2.moments(cnt) if m['m00'] == 0: x, y, w_box, h_box = _cv2.boundingRect(cnt) cx, cy = x + w_box / 2.0, y + h_box / 2.0 else: cx, cy = m['m10'] / m['m00'], m['m01'] / m['m00'] centers.append((float(cx), float(cy))) # 최소 중심 거리 (정규화) min_dist = 0.0 if len(centers) >= 2: min_dist = min( ((_np.hypot(cx1 - cx2, cy1 - cy2)) for (cx1, cy1) in centers for (cx2, cy2) in centers if (cx1, cy1) != (cx2, cy2)) ) diag = float(_np.hypot(w, h)) min_dist_ratio = float(min_dist / diag) if diag > 0 else 0.0 return { 'coverage_ratio': coverage_ratio, 'component_count': len(centers), 'min_centroid_distance_ratio': min_dist_ratio, } def is_frozen(self): """ 실행 환경에 따라 배포환경인지 개발환경인지 확인하는 메서드. cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정. """ if getattr(sys, 'frozen', False): # 패키징된 경우 self.logger.log("배포환경", level=logging.DEBUG) return True else: # 일반 Python 실행 환경 self.logger.log("개발환경", level=logging.DEBUG) return False def safe_detect(self, img_path): try: if not hasattr(self, 'ocr_module') or self.ocr_module is None: self.logger.log("⚠️ ONNX OCR 모듈이 초기화되지 않았습니다. 재초기화를 시도합니다.", level=logging.WARNING) # OCR 모듈 재초기화 시도 try: if self.reset_ocr_module() and hasattr(self, 'ocr_module') and self.ocr_module is not None: self.logger.log("✅ ONNX OCR 모듈 재초기화 성공", level=logging.INFO) else: self.logger.log("❌ ONNX OCR 모듈 재초기화 실패, 빈 결과 반환", level=logging.ERROR) return [] except Exception as reset_error: self.logger.log(f"❌ ONNX OCR 모듈 재초기화 중 예외 발생: {reset_error}", level=logging.ERROR) return [] result = self.ocr_module.detect_text(img_path) # 빈 OCR 결과는 정상 케이스로 처리 if not result or len(result) == 0: self.logger.log(f"OCR 결과 없음 - 정상 케이스 (NO_TEXT): {img_path}", level=logging.INFO) return [] # 빈 리스트 반환 (정상 처리) return result except Exception as e: msg = str(e).lower() # 메모리 / primitive 관련 오류 → OCR 모듈 재초기화 후 1회 재시도 if any(err in msg for err in ["create a primitive", "memory object", "unable to allocate", "out of memory", "cv::outofmemoryerror"]): ok = self.reset_ocr_module() if ok and self.ocr_module is not None: # 재시도 시 실패하면 MemoryError를 재전파하여 상위에서 워커 재시작 트리거 return self.ocr_module.detect_text(img_path, raise_on_memory_error=True) # 그 외 예외는 그대로 상위로 전달 raise async def remove_background(self, original_image_url, file_prefix=""): """배경제거 전용 메서드 (썸네일 등 외부 호출용). 1. 이미지를 다운로드(또는 로컬 경로 사용) 2. Request_AI_Server.request_rembg 로 배경 제거 (흰 배경 중앙 배치 포함) 3. TEMP_IMAGE_DIR 하위에 저장 후 경로 반환 """ try: index = 0 # 기본값 (외부에서 필요 시 파일명 구분용) # 0. 이미지 URL 유효성 체크 if not original_image_url or not isinstance(original_image_url, str): self.logger.log("배경제거 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) return {"status": "failed", "path": original_image_url, "error": "이미지 URL 오류"} # 1. 다운로드 또는 로컬 경로 확정 if original_image_url.startswith("http"): # 다운로드 재사용을 위해 기존 메서드 호출 local_path = self.download_image(image_url=original_image_url, index=0, file_prefix=file_prefix) if not local_path: return {"status": "failed", "path": original_image_url, "error": "다운로드 실패"} else: local_path = original_image_url # 이미 로컬 경로 # 2. 배경 제거 (np.ndarray 반환) removed = self.request_ai_server.request_rembg( local_path, use_local_rembg=self.use_local_rembg, local_model_name=self.local_model_name ) if removed is None: self.logger.log("RemoveBG 실패", level=logging.ERROR) return {"status": "failed", "path": local_path, "error": "RemoveBG 실패"} # 3. 저장 경로 결정 os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True) base_name = os.path.basename(local_path) name_no_ext, _ = os.path.splitext(base_name) save_name = f"nobg_{file_prefix}_{name_no_ext}.png" if file_prefix else f"nobg_{name_no_ext}.png" save_path = os.path.join(self.TEMP_IMAGE_DIR, save_name) # 4. 저장 (OpenCV → BGR) cv2.imwrite(save_path, removed) del removed removed = None self.logger.log(f"배경제거 이미지 저장: {save_path}", level=logging.DEBUG) # 5. OCR 검사 후 인페인팅 여부 결정 # ocr_results = self.ocr_module.detect_text(save_path) ocr_results = self.safe_detect(save_path) filter_ocr_results = self.filter_ocr_results(ocr_results) # 중국어 텍스트가 없으면 바로 반환 if self.ocr_module is None or not self.ocr_module.filter_chinese_text(filter_ocr_results): return {"status": "success", "path": save_path} # ---- 중국어 텍스트 존재: 인페인팅 준비 ---- ocr_count = len(filter_ocr_results) if ocr_count < 5: expansion_size, blur_size = 12, 15 elif ocr_count < 10: expansion_size, blur_size = 9, 12 elif ocr_count < 15: expansion_size, blur_size = 7, 9 elif ocr_count < 20: expansion_size, blur_size = 5, 6 else: expansion_size, blur_size = 10, 15 # 마스크 생성 masks = self.mask_module.create_masks( image_path=save_path, ocr_results=filter_ocr_results, mask_option="basic", expansion_size=expansion_size, blur_size=blur_size ) self.logger.log("배경제거 후 마스크 생성 완료", level=logging.DEBUG) # 인페인팅 수행: MIGAN으로 통일 self.inpaint_method = 'migan' inpainted_image = self._try_migan_inpaint(save_path, masks) # 인페인팅 실패 시 원본 반환 if inpainted_image is None: self.logger.log("인페인팅 실패, 배경제거 이미지를 그대로 반환", level=logging.WARNING) return {"status": "success", "path": save_path} # 인페인팅 결과 저장 inpaint_name = f"inpaint_{file_prefix}_{name_no_ext}.png" if file_prefix else f"inpaint_{name_no_ext}.png" inpaint_path = os.path.join(self.TEMP_IMAGE_DIR, inpaint_name) cv2.imwrite(inpaint_path, inpainted_image) self.logger.log(f"인페인팅 이미지 저장: {inpaint_path}", level=logging.DEBUG) return {"status": "success", "path": inpaint_path} except Exception as e: self.logger.log(f"remove_background 오류: {e}", level=logging.ERROR, exc_info=True) return {"status": "failed", "path": original_image_url, "error": str(e)} # async def remove_background(self, page, original_image_url, index, file_prefix="", **kwargs): # """ # 배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환 # """ # try: # # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자) # if not original_image_url or not isinstance(original_image_url, str): # self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'} # if not self.is_valid_image_path(original_image_url): # self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'} # # 1. 이미지 다운로드 # local_image_path = await self.download_image(page, original_image_url, index, file_prefix) # if not local_image_path: # self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'} # self.logger.log(f"이미지 {index+1} 로컬 저장위치: {local_image_path}", level=logging.DEBUG) # # 1. 배경제거 수행(이미지 반환) # removed_img = self.background_removal_module.remove_background( # local_image_path, **kwargs # ) # if removed_img is None: # self.logger.log(f"배경제거 실패: {local_image_path}", level=40) # return {'status': 'failed', 'path': local_image_path, 'error': '배경제거 실패'} # if self.toggle_states.get("remove_background_white", True): # img_result_white = self.background_removal_module.to_white_background(removed_img) # else: # img_result_white = removed_img # # 2. 저장 경로 생성 # if file_prefix: # save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_{file_prefix}_img_{index+1}.png") # else: # save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png") # # 3. 워터마크 등 후처리 및 저장 # # 워터마크 등 추가하려면 아래처럼 # # removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...) # final_path = self.postImageManager.save_image_to_path(img_result_white, save_path) # if final_path: # self.logger.log(f"배경제거 이미지 저장됨: {final_path}") # return {'status': 'success', 'path': final_path} # else: # self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40) # return {'status': 'failed', 'path': save_path, 'error': '저장 실패'} # except Exception as e: # self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True) # return {'status': 'failed', 'path': local_image_path, 'error': str(e)} # def remove_background_with_ppmatting(image_path, output_path='output_foreground.png', alpha_path='output_alpha.png'): # """ # PaddleHub의 ppmatting을 이용한 배경제거(투명배경 PNG) 함수입니다. # Args: # image_path (str): 입력 이미지 경로 # output_path (str): 결과 투명 배경 PNG 저장 경로 # alpha_path (str): 알파(마스크) 이미지 저장 경로 # Returns: # foreground (np.ndarray): 알파채널이 포함된 전경 PNG 이미지 (BGRA) # alpha (np.ndarray): 추출된 알파 마스크 # """ # import paddlehub as hub # import cv2 # import numpy as np # # 1. 이미지 로드 (OpenCV는 BGR) # img = cv2.imread(image_path) # if img is None: # print(f"이미지를 불러올 수 없습니다: {image_path}") # return None, None # # 2. BGR → RGB (ppmatting은 RGB 입력) # img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # # 3. ppmatting 모델 로드 (최초 1회 다운로드됨) # matting = hub.Module(name='ppmatting-hrnet-1x') # # 4. 예측 수행 # results = matting.predict(images=[img_rgb]) # 결과는 리스트 # # 5. 알파(투명도) 마스크 추출 (float, 0~1) # alpha = results[0]['alpha'] # # 6. 알파 마스크를 0~255 uint8로 변환 (이미지 저장용) # alpha_img = (alpha * 255).astype(np.uint8) # # 7. 원본 이미지를 BGRA(투명도 포함)로 변환 # foreground = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA) # foreground[..., 3] = alpha_img # 알파 채널 추가 # # 8. 결과 이미지 저장 # cv2.imwrite(output_path, foreground) # cv2.imwrite(alpha_path, alpha_img) # print(f"배경 제거 PNG 저장: {output_path}") # print(f"알파 마스크 저장: {alpha_path}") # return foreground, alpha_img # ------------------------------------------------------------------ # 고해상도 이미지 다운스케일 유틸리티 (메모리 절감용) # ------------------------------------------------------------------ def downscale_image_if_large(self, image_path, max_dim=1200): """이미지가 너무 클 경우 다운스케일링""" try: image = cv2.imread(image_path) if image is None: return image_path h, w = image.shape[:2] if max(h, w) <= max_dim: # 원본 크기가 허용 범위 내면 그대로 반환 del image # 명시적 해제 return image_path # 비율 유지하며 다운스케일링 if h > w: new_h, new_w = max_dim, int(w * max_dim / h) else: new_h, new_w = int(h * max_dim / w), max_dim # .copy() 사용으로 뷰 문제 해결 resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA).copy() # 원본 이미지 명시적 해제 del image # 임시 파일로 저장 temp_path = image_path.replace('.jpg', '_temp.jpg').replace('.png', '_temp.png') cv2.imwrite(temp_path, resized) # 리사이즈된 이미지 명시적 해제 del resized return temp_path except Exception as e: self.logger.log(f"이미지 다운스케일링 실패: {e}", level=logging.ERROR, exc_info=True) # 에러 발생 시 원본 경로 반환 return image_path finally: # 안전장치: 남은 이미지 객체 정리 try: if 'image' in locals(): del image if 'resized' in locals(): del resized except: pass async def preprocess_detail_image(self, image_path: str, index: int, target_width: int = 860, max_height: int = 3000) -> str: """ 상세페이지 이미지 전처리: 가로 860px 통일, 세로 3000px 초과시 분할 Args: image_path: 원본 이미지 경로 index: 이미지 인덱스 target_width: 목표 가로 크기 (기본 860px) max_height: 최대 세로 크기 (기본 3000px) Returns: str: 전처리된 이미지 경로 (분할된 경우 첫 번째 이미지) """ try: # 원본 이미지 로드 image = cv2.imread(image_path) if image is None: self.logger.log(f"상세페이지 이미지 로드 실패: {image_path}", level=logging.ERROR) return image_path orig_h, orig_w = image.shape[:2] self.logger.log(f"상세페이지 이미지 {index+1} 원본 크기: {orig_w}x{orig_h}", level=logging.DEBUG) # 1단계: 가로를 860px로 리사이즈 (비율 유지) if orig_w != target_width: scale_factor = target_width / orig_w new_height = int(orig_h * scale_factor) resized_image = cv2.resize(image, (target_width, new_height), interpolation=cv2.INTER_LANCZOS4) self.logger.log(f"상세페이지 이미지 {index+1} 가로 크기 조정: {orig_w}x{orig_h} → {target_width}x{new_height}", level=logging.DEBUG) else: resized_image = image new_height = orig_h # 2단계: 세로가 3000px 초과하는지 확인 if new_height <= max_height: # 분할 불필요 - 리사이즈된 이미지 저장 output_path = image_path.replace('.jpg', '_resized.jpg').replace('.png', '_resized.png') cv2.imwrite(output_path, resized_image) self.logger.log(f"상세페이지 이미지 {index+1} 크기 조정 완료: {target_width}x{new_height}", level=logging.INFO) return output_path # 3단계: 분할 필요 - 3000px 단위로 분할 split_count = (new_height + max_height - 1) // max_height # 올림 계산 split_paths = [] for i in range(split_count): start_y = i * max_height end_y = min((i + 1) * max_height, new_height) split_height = end_y - start_y # 이미지 분할 split_image = resized_image[start_y:end_y, :, :] # 분할된 이미지 저장 base_name = os.path.splitext(os.path.basename(image_path))[0] extension = os.path.splitext(image_path)[1] split_filename = f"{base_name}_split_{i+1}{extension}" split_path = os.path.join(os.path.dirname(image_path), split_filename) cv2.imwrite(split_path, split_image) split_paths.append(split_path) self.logger.log(f"상세페이지 이미지 {index+1} 분할 {i+1}/{split_count}: {target_width}x{split_height} → {split_path}", level=logging.DEBUG) self.logger.log(f"상세페이지 이미지 {index+1} 분할 완료: {split_count}개 파일 생성", level=logging.INFO) # OCR 정보 수집을 위해 분할된 모든 이미지 경로 저장 self._store_split_image_paths(index, split_paths) # 첫 번째 분할 이미지 반환 (process_single_image는 하나의 이미지만 처리) return split_paths[0] if split_paths else image_path except Exception as e: self.logger.log(f"상세페이지 이미지 {index+1} 전처리 중 오류: {e}", level=logging.ERROR, exc_info=True) return image_path finally: # 메모리 정리 try: if 'image' in locals(): del image if 'resized_image' in locals(): del resized_image if 'split_image' in locals(): del split_image except: pass def _store_split_image_paths(self, index: int, split_paths: list): """분할된 이미지 경로들을 저장 (나중에 OCR 정보 수집용)""" if not hasattr(self, '_detail_split_images'): self._detail_split_images = {} self._detail_split_images[index] = split_paths self.logger.log(f"이미지 {index+1} 분할 경로 저장: {len(split_paths)}개", level=logging.DEBUG) async def collect_and_store_ocr_data(self, image_url: str, image_path: str, ocr_results: list, index: int): """ 상세페이지 OCR 결과에서 상품 정보를 수집하고 저장 Args: image_url: 원본 이미지 URL image_path: 로컬 이미지 경로 ocr_results: OCR 감지 결과 index: 이미지 인덱스 """ try: if not ocr_results: return # OCR 결과에서 텍스트만 추출 extracted_texts = [result.get('text', '').strip() for result in ocr_results if result.get('text', '').strip()] if not extracted_texts: return # 데이터베이스에 저장 (단순화됨) await self._save_ocr_data_to_db(image_url, image_path, extracted_texts, index) self.logger.log(f"이미지 {index+1} OCR raw 데이터 수집 완료: {len(extracted_texts)}개 텍스트", level=logging.DEBUG) except Exception as e: self.logger.log(f"OCR 정보 수집 중 오류: {e}", level=logging.ERROR, exc_info=True) # _classify_product_texts 함수는 단순화로 인해 비활성화됨 (raw 데이터만 저장) async def _save_ocr_data_to_db(self, image_url: str, image_path: str, texts: list, index: int): """ OCR raw 데이터를 SQLite 데이터베이스에 저장 Args: image_url: 원본 이미지 URL image_path: 로컬 이미지 경로 texts: raw OCR 텍스트 리스트 index: 이미지 인덱스 """ try: import sqlite3 import json import os from datetime import datetime # 데이터베이스 파일 경로 설정 db_path = os.path.join(self.base_dir, "user_data", "product_ocr_data.db") os.makedirs(os.path.dirname(db_path), exist_ok=True) # 데이터베이스 연결 및 테이블 생성 conn = sqlite3.connect(db_path) cursor = conn.cursor() # 테이블 생성 (단순화된 버전) cursor.execute(''' CREATE TABLE IF NOT EXISTS ocr_raw_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, image_url TEXT NOT NULL, image_path TEXT, image_index INTEGER, raw_texts TEXT, text_count INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 데이터 삽입 (단순화됨) cursor.execute(''' INSERT INTO ocr_raw_data (image_url, image_path, image_index, raw_texts, text_count) VALUES (?, ?, ?, ?, ?) ''', ( image_url, image_path, index, json.dumps(texts, ensure_ascii=False), len(texts) )) conn.commit() conn.close() self.logger.log(f"이미지 {index+1} OCR 데이터 DB 저장 완료: {db_path}", level=logging.DEBUG) except Exception as e: self.logger.log(f"OCR 데이터 DB 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) async def collect_ocr_data_in_memory(self, image_url: str, image_path: str, ocr_results: list, index: int): """ OCR 데이터를 메모리에만 저장 (현재 세션용) - 전처리 포함 Args: image_url: 원본 이미지 URL image_path: 로컬 이미지 경로 ocr_results: OCR 감지 결과 index: 이미지 인덱스 """ try: if not ocr_results: return # OCR 결과에서 텍스트만 추출 extracted_texts = [result.get('text', '').strip() for result in ocr_results if result.get('text', '').strip()] if not extracted_texts: return # 단순 OCR 텍스트 저장 (raw 데이터 그대로 사용) filtered_texts = extracted_texts # 메모리에 저장 if not hasattr(self, '_session_ocr_data'): self._session_ocr_data = [] self._session_ocr_data.append({ 'image_url': image_url, 'image_path': image_path, 'image_index': index, 'raw_texts': filtered_texts, 'text_count': len(filtered_texts) }) self.logger.log(f"이미지 {index+1} OCR raw 데이터 메모리 저장 완료: {len(filtered_texts)}개 텍스트", level=logging.DEBUG) except Exception as e: self.logger.log(f"OCR 정보 메모리 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) # 복잡한 전처리 함수들은 제거됨 (단순 raw 데이터 저장 방식으로 변경) def get_session_ocr_data(self) -> list: """현재 세션의 OCR 데이터 반환""" return getattr(self, '_session_ocr_data', []) def clear_session_ocr_data(self): """현재 세션의 OCR 데이터 초기화""" if hasattr(self, '_session_ocr_data'): del self._session_ocr_data self.logger.log("세션 OCR 데이터 초기화 완료", level=logging.DEBUG)