import os import asyncio import aiofiles import logging from urllib.parse import urlparse import sys import re import cv2 # OpenCV 의 내부 최적화(메모리 풀) 사용을 비활성화하여 파편화 위험을 낮춤 cv2.setUseOptimized(False) import random import gc import numpy as np from PIL import Image # from src.modules.easyocr_module import EasyOCREngine from ocr_module import OCRModule from mask_module_for_paddle import MaskModule # from src.modules.mask_module_for_easy import MaskModule_easy from text_rendering_module import TextRenderingModule from postImageManager import PostImageManager # from translatepy.translators.google import GoogleTranslate # from src.modules.background_removal_module import BackgroundRemovalModule # from src.modules.background_removal_module_pp import PPMattingBackgroundRemovalModule # (변경) from src.modules.request_inpaint import Request_AI_Server class ImageProcessor3: """이미지 다운로드, OCR, 번역 처리를 담당하는 클래스""" def __init__(self, logger, page, toggle_states, unwanted_words, authenticated_by_admin, base_dir, papago_translator): self.logger = logger self.page = page self.base_dir = base_dir self.toggle_states = toggle_states self.unwanted_texts = unwanted_words self.authenticated_by_admin = authenticated_by_admin self.papago_translator = papago_translator self.inpaint_method = 'cv' try: self.request_inpainting_server_url = self.toggle_states.get("request_inpainting_server_url", None) if self.request_inpainting_server_url is None: self.logger.log(f"request_inpainting_server_url 설정되지 않았습니다.", level=logging.ERROR) self.inpaint_method = 'cv' else: self.inpaint_method = 'request' self.font_path = self.toggle_states.get('image_font_path', os.path.join(self.base_dir, "HakgyoansimDunggeunmisoTTFB.ttf")) self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "") self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG) self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG) self.logger.log(f"self.unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) # ----------------------------- 메모리 파편화 완화 ----------------------------- # Pillow 가 거대 이미지를 열 때 과도한 메모리를 점유하지 않도록 최대 픽셀 수 제한 max_px = self.toggle_states.get("max_image_pixels", 20_000_000) # 약 20MP, 필요 시 조정 (20MP = 4500x4500, 50MP=8000x6000) Image.MAX_IMAGE_PIXELS = max_px self.logger.log(f"Image.MAX_IMAGE_PIXELS set to {max_px}", level=logging.DEBUG) # ----------------------------------------------------------------------------- # self.ocr_module = EasyOCREngine(logger=self.logger, base_dir=self.base_dir, lang_list=['ch_sim']) self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir) self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir) self.text_rendering_module = TextRenderingModule(logger=self.logger, font_path=self.font_path) self.postImageManager = PostImageManager(logger=self.logger, toggle_states=self.toggle_states) # self.background_removal_module = BackgroundRemovalModule(logger=self.logger, default_model="birefnet-general") # self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger, default_model="ppmatting-hrnet-1x") # self.background_removal_module = PPMattingBackgroundRemovalModule(logger=self.logger) self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url", None) # self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None) if self.is_frozen(): self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None) self.request_ai_server = Request_AI_Server(logger=self.logger, inpaint_server_url=self.request_inpainting_server_url, rembg_server_url=self.request_rembg_server_url) # self.gtranslate = GoogleTranslate() except Exception as e: self.logger.log(f"ImageProcessor3 초기화 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) def update_page(self, page1, toggle_states): self.toggle_states = toggle_states self.page = page1 self.postImageManager.update_toggle_states(self.toggle_states) self.logger.log(f"page객체 및 toggle_states 업데이트", level=logging.DEBUG) def update_unwanted_texts(self, texts): self.unwanted_texts = texts self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) def __del__(self): """소멸자에서 리소스 정리""" self.cleanup() def cleanup(self): """리소스 정리""" try: # 임시 폴더 삭제 if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR): # shutil.rmtree(self.TEMP_IMAGE_DIR) self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG) except Exception as e: self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True) def reset_ocr_module(self): """OCR 모듈을 명시적으로 삭제하고 재생성.""" try: self.logger.log("🔄 OCR 모듈 재초기화 시작", level=logging.INFO) # 기존 모듈 참조 해제 del self.ocr_module gc.collect() # paddle.device.cuda.empty_cache() # GPU 사용 시 # 새 인스턴스 생성 self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir) self._ocr_call_count = 0 self.logger.log("✅ OCR 모듈 재초기화 완료", level=logging.INFO) return True except Exception as e: self.logger.log(f"❌ OCR 모듈 재초기화 실패: {e}", level=logging.ERROR, exc_info=True) return False def is_valid_image_path(self, path): # http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용 if re.match(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', path, re.IGNORECASE): return True if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')): return True return False async def process_single_image(self, page, original_image_url, index, delay=1.0, file_prefix=""): """ 단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅) Args: page: Playwright 페이지 객체 original_image_url (str): 처리할 이미지 URL index (int): 이미지 인덱스 is_localServer (bool): 로컬 서버 사용 여부 delay (float): 요청 간격 (초) file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option") Returns: dict: 처리 결과를 포함한 딕셔너리 - status: 'inpainted', 'original', 'exclude', 'error' 중 하나 - path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로 - error: 오류 메시지 (status가 'error'인 경우에만 포함) """ local_image_path = None delay = random.uniform(0.5, 1.5) delay = delay / 3 # 봇 탐지 회피를 위해 요청 간격 조절 - 자체번역으로 간격 최소화 self.logger.log(f"unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG) try: # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자) if not original_image_url or not isinstance(original_image_url, str): self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'} if not self.is_valid_image_path(original_image_url): self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'} # 요청 간격 조절 (봇 탐지 회피) if delay > 0: await asyncio.sleep(delay) # OCR 권한 상태 로그 ocr_enabled = self.toggle_states.get('ocr', False) processing_mode = "OCR+인페인팅 모드" if ocr_enabled else "전체 번역 모드" self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {processing_mode}", level=logging.DEBUG) # 1. 이미지 다운로드 local_image_path = await self.download_image(page, original_image_url, index, file_prefix) if not local_image_path: self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING) return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'} # 1-A. 고해상도 입력 다운스케일 (메모리 절약) # toggle_states 에 max_image_resolution(예: 1200) 값이 있으면 사용, 없으면 1200px 기준 max_dim = self.toggle_states.get('max_image_resolution', 1200) local_image_path = self.downscale_image_if_large(local_image_path, max_dim=max_dim) self.logger.log(f"이미지 {index+1} 로컬 저장위치(스케일 처리후): {local_image_path}", level=logging.DEBUG) # 2. OCR 텍스트 감지 ocr_results = self.ocr_module.detect_text(local_image_path) self.logger.log(f"ocr_results: {ocr_results}", level=logging.DEBUG) filter_ocr_results = self.filter_ocr_results(ocr_results) self.logger.log(f"filter_ocr_results: {filter_ocr_results}", level=logging.DEBUG) ocr_count = len(filter_ocr_results) # 3. 중국어 텍스트가 없는 경우 원본 이미지 반환으로 번역 패스 if not self.ocr_module.filter_chinese_text(filter_ocr_results): self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음, 원본 이미지 반환", level=logging.DEBUG) return {'status': 'original', 'path': local_image_path} # 4. 한글 텍스트가 존재하는 경우 원본 이미지 반환으로 번역 패스 if self.ocr_module.filter_korean_text(filter_ocr_results): self.logger.log(f"이미지 {index+1} 한글 텍스트 존재, 원본 이미지 반환", level=logging.DEBUG) return {'status': 'original', 'path': local_image_path} # 4. 텍스트 번역 translated_texts = await self.batch_papago_translate_texts(filter_ocr_results) self.logger.log(f"translated_texts: {translated_texts}", level=logging.DEBUG) # 5. OCR 권한에 따른 텍스트 필터링 if ocr_enabled: filtered_translated_texts = self.process_translated_texts(translated_texts, local_image_path, index) if not filtered_translated_texts: self.logger.log(f"이미지 {index+1} 제외됨", level=logging.DEBUG) return {'status': 'exclude', 'path': local_image_path} else: self.logger.log(f"이미지 {index+1} 치환됨", level=logging.DEBUG) else: # OCR 권한이 없으면 번역된 텍스트를 그대로 사용 filtered_translated_texts = translated_texts self.logger.log(f"이미지 {index+1} OCR 권한 없음, 전체 번역 모드", level=logging.DEBUG) if ocr_count < 5: expansion_size = 12 blur_size = 15 elif ocr_count < 10: expansion_size = 9 blur_size = 12 elif ocr_count < 15: expansion_size = 7 blur_size = 9 elif ocr_count < 20: expansion_size = 5 blur_size = 6 else: expansion_size = 10 blur_size = 15 # 마스크 생성 (basic 방식만 사용) masks = self.mask_module.create_masks( image_path=local_image_path, ocr_results=filter_ocr_results, mask_option="basic", expansion_size=expansion_size, blur_size=blur_size ) self.logger.log(f"마스크 생성 완료", level=logging.DEBUG) # if not self.is_frozen(): # # 디버깅 이미지 저장 (OCR 박스 + 마스크 시각화) # self.save_debug_images(local_image_path, filter_ocr_results, masks, index, file_prefix) # 인페인팅 # is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'premium' or self.toggle_states.get('membership_level', 'basic') == 'vip' is_member_valid = self.toggle_states.get('membership_level', 'basic') == 'vip' or self.authenticated_by_admin self.logger.log(f"ocr_count: {ocr_count}", level=logging.DEBUG) self.logger.log(f"is_member_valid: {is_member_valid}", level=logging.DEBUG) self.logger.log(f"inpaint_method: {self.inpaint_method}", level=logging.DEBUG) if self.inpaint_method == 'request' and is_member_valid and ocr_count > 8: self.logger.log(f"Request 인페인팅 요청", level=logging.DEBUG) inpainted_image = self.request_ai_server.request_inpaint(local_image_path, masks) if inpainted_image is None: self.logger.log(f"Request 인페인팅 실패, opencv 인페인팅으로 대체", level=logging.WARNING) inpainted_image = self.opencv_inpaint(local_image_path, masks, method='telea', radius=3) # inpainted_image = self.lama_inpaint(local_image_path, masks) else: self.logger.log(f"자체 인페인팅 실행", level=logging.DEBUG) inpainted_image = self.opencv_inpaint(local_image_path, masks, method='telea', radius=3) # inpainted_image = self.lama_inpaint(local_image_path, masks) self.logger.log(f"인페인팅 완료", level=logging.DEBUG) # 인페인팅 실패 시 원본 이미지 사용 if inpainted_image is None: self.logger.log(f"인페인팅 실패, 원본 이미지 사용", level=logging.WARNING) inpainted_image = cv2.imread(local_image_path) if inpainted_image is None: self.logger.log(f"원본 이미지 로드 실패: {local_image_path}", level=logging.ERROR) return {'status': 'failed', 'path': local_image_path, 'error': '원본 이미지 로드 실패'} # 텍스트 렌더링 text_rendered_image = self.text_rendering_module.render_text( inpainted_image, filter_ocr_results, filtered_translated_texts) self.logger.log(f"텍스트 렌더링 완료", level=logging.DEBUG) # 결과 저장 translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, file_prefix) self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.DEBUG) return {'status': 'translated', 'path': translated_img_path} except Exception as e: self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True) return {'status': 'failed', 'path': local_image_path or original_image_url, 'error': str(e)} finally: # ---- 메모리 해제 ---- try: ocr_results = None filter_ocr_results = None translated_texts = None masks = None inpainted_image = None text_rendered_image = None except Exception: pass # download_image 단계에서 사용한 page, local_image_path 도 참조 제거 page = None local_image_path = None # 최종 GC 강제 실행 gc.collect() # OCR 결과 필터링: 중국어 텍스트만 필터링 def filter_ocr_results(self, ocr_results): import re # 중국어 문자 범위 정규식 (한자) chinese_pattern = re.compile(r'[\u4e00-\u9fff]+') filtered_results = [] for r in ocr_results: text = r.get('text', '').strip() polygon = r.get('polygon', []) confidence = r.get('confidence', 0.0) # 텍스트가 비어있거나 polygon이 3점 미만이면 제외 if not text or not polygon or len(polygon) < 3: self.logger.log(f"[필터링] 제외 (텍스트/폴리곤): '{text}'", level=logging.DEBUG) continue # 신뢰도 20% 미만이면 제외 if confidence < 0.05: self.logger.log(f"[필터링] 제외 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG) continue # 중국어 문자가 포함된 텍스트만 필터링 if chinese_pattern.search(text): filtered_results.append(r) self.logger.log(f"[필터링] 포함 (신뢰도 {confidence:.1%}): '{text}'", level=logging.DEBUG) else: self.logger.log(f"[필터링] 제외 (중국어 없음): '{text}'", level=logging.DEBUG) self.logger.log(f"필터링 결과: {len(filtered_results)}/{len(ocr_results)}개 (신뢰도 + & 중국어)", level=logging.INFO) return filtered_results async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, file_prefix=""): """로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다""" try: # 파일명에 접두사 포함 if file_prefix: img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.png") else: img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png") watermark_text=self.toggle_states.get("watermark_text", "이미지 저작권 보유") is_watermark_enabled = watermark_text != "" or self.toggle_states.get("watermark", False) self.logger.log(f"watermark_text: {watermark_text}", level=logging.DEBUG) self.logger.log(f"is_watermark_enabled: {is_watermark_enabled}", level=logging.DEBUG) # file_prefix가 'detail' 또는 'option'일 때만 워터마크 추가 if is_watermark_enabled and file_prefix in ["detail"]: image_data_to_save = self.postImageManager.add_watermark( image_data=text_rendered_image, watermark_text=watermark_text ) else: # np.ndarray라면 PIL.Image로 변환 if isinstance(text_rendered_image, np.ndarray): image_data_to_save = Image.fromarray(cv2.cvtColor(text_rendered_image, cv2.COLOR_BGR2RGB)) else: image_data_to_save = text_rendered_image final_image_path = self.postImageManager.save_image_to_path(image_data_to_save, img_path) return final_image_path except Exception as e: self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True) return local_image_path def download_image(self, image_url, index, file_prefix="", max_retries=3): """Requests를 사용해 이미지를 다운로드합니다""" import requests import time import random # 로컬 파일 경로면 바로 반환 if os.path.isfile(image_url): self.logger.log(f"로컬 파일 경로 감지, 다운로드 생략: {image_url}", level=logging.DEBUG) return image_url # 로컬 파일 경로가 아니면 다운로드 시도 try: # "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기 if image_url.startswith("https://assets.alicdn.com"): self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG) return None # URL에서 파일명 추출 및 접두사 포함 parsed_url = urlparse(image_url) base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}" if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')): base_filename += '.jpg' # 접두사가 있으면 파일명에 포함 if file_prefix: filename = f"{file_prefix}_{base_filename}" else: filename = base_filename local_path = os.path.join(self.TEMP_IMAGE_DIR, filename) # HTTP 헤더 설정 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "DNT": "1", # Do Not Track 요청 헤더 "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Cache-Control": "max-age=0" } retries = 0 while retries < max_retries: try: self.logger.log(f"이미지 다운로드 중: {filename}", level=logging.DEBUG) response = requests.get(image_url, headers=headers, stream=True, timeout=30) if response.status_code == 200: image_data = response.content # 이미지 데이터 유효성 검사 if self.is_valid_image_data(image_data): with open(local_path, 'wb') as f: f.write(image_data) self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.DEBUG) return local_path else: self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING) return None else: self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status_code}): {image_url}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) # 2~5초 대기 후 재시도 except requests.exceptions.RequestException as e: self.logger.log(f"이미지 다운로드 중 네트워크 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) # 예외 발생 시 대기 후 재시도 except Exception as e: self.logger.log(f"이미지 다운로드 중 예상치 못한 오류: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.ERROR) retries += 1 if retries < max_retries: time.sleep(random.randint(2, 5)) self.logger.log(f"이미지 다운로드 최대 재시도 횟수를 초과했습니다: {image_url}", level=logging.ERROR) return None except Exception as e: self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def is_valid_image_data(self, image_data): """이미지 데이터가 유효한지 확인합니다""" if not image_data or len(image_data) < 100: return False # 일반적인 이미지 파일 시그니처 확인 image_signatures = [ b'\xFF\xD8\xFF', # JPEG b'\x89PNG\r\n\x1a\n', # PNG b'GIF87a', # GIF87a b'GIF89a', # GIF89a b'RIFF', # WebP (RIFF 컨테이너) ] return any(image_data.startswith(sig) for sig in image_signatures) def process_translated_texts(self, translated_texts, local_image_path, index): """ 번역된 단어 리스트(translated_texts)에서 unwanted_texts의 원본값이 앞이나 뒤에 포함되면 치환값으로 바꿉니다. 치환값이 '이미지삭제'라면 None 반환(이미지 제외) """ new_texts = [] for i, text in enumerate(translated_texts): self.logger.log(f"[치환 처리 {i+1}] 원본 텍스트: '{text}'", level=logging.DEBUG) # 텍스트를 빈칸으로 분리 words = text.split() self.logger.log(f"[치환 처리 {i+1}] 분리된 단어: {words}", level=logging.DEBUG) processed_words = [] text_replaced = False for word_idx, word in enumerate(words): word_replaced = False # unwanted_texts와 매칭 확인 for origin, replace in self.unwanted_texts.items(): if word.startswith(origin) or word.endswith(origin) or word == origin: self.logger.log(f"[치환 처리 {i+1}] 단어 '{word}' -> '{replace}' (원본: '{origin}')", level=logging.INFO) if replace == "이미지삭제": self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.INFO) return None # 단어 치환 if word == origin: new_word = replace elif word.startswith(origin): new_word = replace + word[len(origin):] elif word.endswith(origin): new_word = word[:-len(origin)] + replace processed_words.append(new_word) word_replaced = True text_replaced = True self.logger.log(f"[치환 처리 {i+1}] 단어 치환 완료: '{word}' -> '{new_word}'", level=logging.DEBUG) break if not word_replaced: processed_words.append(word) # 처리된 단어들을 다시 합치기 final_text = ' '.join(processed_words) new_texts.append(final_text) if text_replaced: self.logger.log(f"[치환 처리 {i+1}] 최종 결과: '{text}' -> '{final_text}'", level=logging.INFO) else: self.logger.log(f"[치환 처리 {i+1}] 변경 없음: '{text}'", level=logging.DEBUG) self.logger.log(f"전체 치환 결과: {len(new_texts)}개 텍스트 처리 완료", level=logging.INFO) for i, (original, processed) in enumerate(zip(translated_texts, new_texts)): if original != processed: self.logger.log(f"[최종 치환 {i+1}] '{original}' -> '{processed}'", level=logging.INFO) return new_texts async def batch_papago_translate_texts(self, ocr_results, delimiter='\n'): """ ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 파파고로 번역 후, 다시 분리하여 반환합니다. 각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다. """ import re texts = [result['text'] for result in ocr_results if result['text'].strip()] if not texts: return [] # 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침 split_texts = [] split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록 for text in texts: parts = re.split(r'\s*[/|·,;、]+\s*', text) parts = [p.strip() for p in parts if p.strip()] split_texts.extend(parts) split_indices.append(len(parts)) # Papago에 한 번에 번역 요청 joined = delimiter.join(split_texts) try: # 이미 한 번에 여러 줄을 받아서 리스트로 반환! translated_lines = await self.papago_translator.translate(joined, source_lang="zh", target_lang="ko") # Papago가 리스트로 반환한다고 가정! results = translated_lines # 개수가 다르면 fallback(선택사항, 아래는 로깅만) if len(results) != len(split_texts): self.logger.log( f"파파고 번역 줄 개수 불일치: {len(results)} != {len(split_texts)}", level=logging.WARNING ) # 여기서 fallback 처리 또는 그냥 results를 그대로 사용 # 다시 원래 텍스트 단위로 합치기 final_results = [] idx = 0 for count in split_indices: parts = results[idx:idx+count] final_results.append(' / '.join(parts)) idx += count return final_results except Exception as e: self.logger.log(f"파파고 번역 실패: {e}", level=logging.ERROR, exc_info=True) return texts # def batch_google_translate_texts(self, ocr_results, delimiter='\n'): # """ # ocr_results에서 추출한 텍스트 리스트를 줄바꿈으로 합쳐 한 번에 구글 번역기로 번역 후, 다시 분리하여 반환합니다. # 각 텍스트 내에 여러 구분자(/, |, ·, , 등)가 있을 경우에도 분리하여 번역 후 다시 합칩니다. # """ # import re # texts = [result['text'] for result in ocr_results if result['text'].strip()] # if not texts: # return [] # # 각 텍스트를 내부적으로 split (/, |, ·, , 등) 후 다시 합침 # split_texts = [] # split_indices = [] # 각 텍스트가 몇 개의 파트로 쪼개졌는지 기록 # for text in texts: # parts = re.split(r'\s*[/|·,;、]+\s*', text) # parts = [p.strip() for p in parts if p.strip()] # split_texts.extend(parts) # split_indices.append(len(parts)) # # 합쳐서 한 번에 번역 # joined = delimiter.join(split_texts) # try: # translated_obj = self.gtranslate.translate(joined, "Korean") # translated_text = getattr(translated_obj, "text", getattr(translated_obj, "result", joined)) # results = translated_text.split(delimiter) # # 만약 개수가 다르면 fallback # if len(results) != len(split_texts): # results = [getattr(self.gtranslate.translate(t, "Korean"), "text", t) for t in split_texts] # # 다시 원래 텍스트 단위로 합치기 # final_results = [] # idx = 0 # for count in split_indices: # parts = results[idx:idx+count] # final_results.append(' / '.join(parts)) # idx += count # return final_results # except Exception as e: # self.logger.log(f"구글 번역 실패: {e}", level=logging.ERROR, exc_info=True) # return texts def opencv_inpaint(self, image_path, mask, method='telea', radius=3): """ OpenCV로 인페인팅을 수행합니다. Args: image_path (str): 원본 이미지 경로 mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W)) method (str): 'telea' 또는 'ns' radius (int): 인페인팅 반경 Returns: inpainted_image (np.ndarray): 인페인팅된 이미지 """ import cv2 import numpy as np image = cv2.imread(image_path) if image is None: self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR) return None # 마스크가 2D 배열인지 확인 if mask is None: self.logger.log(f"마스크가 None입니다", level=logging.ERROR) return None if not isinstance(mask, np.ndarray) or mask.ndim != 2: self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR) return None # 마스크 크기가 이미지와 일치하는지 확인 if mask.shape != image.shape[:2]: self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR) return None inpaint_method = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS inpainted = cv2.inpaint(image, mask, radius, inpaint_method) return inpainted def lama_inpaint(self, image_path, mask): """ PaddleHub lama 모델로 인페인팅을 수행합니다. Args: image_path (str): 원본 이미지 경로 mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W)) Returns: inpainted_image (np.ndarray): 인페인팅된 이미지 (BGR) """ import cv2 import numpy as np import logging try: import paddlehub as hub except ImportError: self.logger.log("paddlehub 미설치: pip install paddlehub", level=logging.ERROR) return None image = cv2.imread(image_path) if image is None: self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR) return None # 마스크 2D 체크 if mask is None: self.logger.log(f"마스크가 None입니다", level=logging.ERROR) return None if not isinstance(mask, np.ndarray) or mask.ndim != 2: self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR) return None if mask.shape != image.shape[:2]: self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR) return None try: # PaddleHub lama 인페인팅 모델 로드 (최초 1회 다운로드) if not hasattr(self, "_lama_module"): self._lama_module = hub.Module(name="lama") if self.logger: self.logger.log("lama 인페인팅 모델 로드됨") # 라마 모델 인페인팅 (마스크 0/255, uint8) result = self._lama_module.predict(images=[image], masks=[mask]) if not result or "inpainted" not in result[0]: self.logger.log("lama 인페인팅 결과 없음", level=logging.ERROR) return None inpainted = result[0]["inpainted"] # np.ndarray(BGR) return inpainted except Exception as e: self.logger.log(f"lama 인페인팅 중 예외 발생: {e}", level=logging.ERROR, exc_info=True) return None def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""): """디버깅용 OCR 박스와 마스크 이미지를 저장합니다""" try: # OCR 박스 시각화 이미지 저장 ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix) # 마스크 시각화 이미지 저장 mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix) self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.INFO) return ocr_debug_path, mask_debug_path except Exception as e: self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None, None def save_ocr_debug_image(self, image_path, ocr_results, index, file_prefix=""): """OCR 감지된 박스들을 이미지에 표시하여 저장합니다""" try: import cv2 import numpy as np import os # 원본 이미지 로드 image = cv2.imread(image_path) if image is None: self.logger.log(f"OCR 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR) return None # 이미지 복사본 생성 debug_image = image.copy() # OCR 결과별로 박스 그리기 for i, result in enumerate(ocr_results): polygon = result.get('polygon', []) bbox = result.get('bbox', None) text = result.get('text', '') confidence = result.get('confidence', 0.0) # 신뢰도에 따른 색상 결정 if confidence >= 0.8: color = (0, 255, 0) # 초록 (높은 신뢰도) elif confidence >= 0.5: color = (0, 255, 255) # 노랑 (중간 신뢰도) elif confidence >= 0.2: color = (0, 165, 255) # 주황 (낮은 신뢰도) else: color = (0, 0, 255) # 빨강 (매우 낮은 신뢰도) if polygon and len(polygon) >= 3: # 폴리곤을 numpy 배열로 변환 (좌표를 int로 변환) pts = np.array([[int(x), int(y)] for x, y in polygon], np.int32) pts = pts.reshape((-1, 1, 2)) cv2.polylines(debug_image, [pts], True, color, 2) x, y = pts[0][0] elif bbox and len(bbox) == 4: try: x, y, w, h = [int(float(v)) for v in bbox] except Exception as e: self.logger.log(f"bbox 값 변환 오류: {bbox} ({e})", level=logging.ERROR) continue cv2.rectangle(debug_image, (x, y), (x + w, y + h), color, 2) else: continue # polygon, bbox 둘 다 없으면 skip label = f"{i+1}: {text[:10]}... ({confidence:.1%})" # 텍스트 배경 사각형 (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(debug_image, (x, y-text_height-5), (x+text_width, y), color, -1) # 텍스트 표시 cv2.putText(debug_image, label, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 범례 추가 legend_y = 30 cv2.putText(debug_image, "OCR Detection Results:", (10, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) cv2.putText(debug_image, "Green: 80%+ Yellow: 50-80% Orange: 20-50% Red: <20%", (10, legend_y+25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # 파일 저장 if file_prefix: debug_filename = f"debug_ocr_{file_prefix}_img_{index+1}.png" else: debug_filename = f"debug_ocr_img_{index+1}.png" debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename) cv2.imwrite(debug_path, debug_image) self.logger.log(f"OCR 디버깅 이미지 저장: {debug_filename}", level=logging.DEBUG) return debug_path except Exception as e: self.logger.log(f"OCR 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def save_mask_debug_image(self, image_path, masks, index, file_prefix=""): """생성된 마스크를 이미지에 오버레이하여 저장합니다""" try: import cv2 import numpy as np # 원본 이미지 로드 image = cv2.imread(image_path) if image is None: self.logger.log(f"마스크 디버깅용 이미지 로드 실패: {image_path}", level=logging.ERROR) return None if masks is None or not isinstance(masks, np.ndarray): self.logger.log(f"유효하지 않은 마스크: {type(masks)}", level=logging.ERROR) return None # 이미지 복사본 생성 debug_image = image.copy() # 마스크 영역을 빨간색으로 오버레이 mask_colored = np.zeros_like(image) mask_colored[:, :, 2] = masks # 빨간색 채널에 마스크 적용 # 마스크 영역 반투명 오버레이 (알파 블렌딩) alpha = 0.3 overlay_mask = masks > 0 debug_image[overlay_mask] = cv2.addWeighted( debug_image[overlay_mask], 1-alpha, mask_colored[overlay_mask], alpha, 0 ) # 마스크 통계 정보 표시 total_pixels = masks.shape[0] * masks.shape[1] mask_pixels = np.sum(masks > 0) mask_percentage = (mask_pixels / total_pixels) * 100 if total_pixels > 0 else 0 # 정보 텍스트 표시 info_text = [ f"Mask Statistics:", f"Total pixels: {total_pixels:,}", f"Mask pixels: {mask_pixels:,}", f"Coverage: {mask_percentage:.1f}%" ] y_offset = 30 for i, text in enumerate(info_text): y_pos = y_offset + (i * 25) # 텍스트 배경 (text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1) cv2.rectangle(debug_image, (10, y_pos-text_height-3), (10+text_width+10, y_pos+5), (0, 0, 0), -1) # 텍스트 cv2.putText(debug_image, text, (15, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) # 파일 저장 if file_prefix: debug_filename = f"debug_mask_{file_prefix}_img_{index+1}.png" else: debug_filename = f"debug_mask_img_{index+1}.png" debug_path = os.path.join(self.TEMP_IMAGE_DIR, debug_filename) cv2.imwrite(debug_path, debug_image) self.logger.log(f"마스크 디버깅 이미지 저장: {debug_filename} (마스크 커버리지: {mask_percentage:.1f}%)", level=logging.DEBUG) return debug_path except Exception as e: self.logger.log(f"마스크 디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True) return None def is_frozen(self): """ 실행 환경에 따라 배포환경인지 개발환경인지 확인하는 메서드. cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정. """ if getattr(sys, 'frozen', False): # 패키징된 경우 self.logger.log("배포환경", level=logging.DEBUG) return True else: # 일반 Python 실행 환경 self.logger.log("개발환경", level=logging.DEBUG) return False async def remove_background(self, page, original_image_url, file_prefix=""): """배경제거 전용 메서드 (썸네일 등 외부 호출용). 1. 이미지를 다운로드(또는 로컬 경로 사용) 2. Request_AI_Server.request_rembg 로 배경 제거 (흰 배경 중앙 배치 포함) 3. TEMP_IMAGE_DIR 하위에 저장 후 경로 반환 """ try: index = 0 # 기본값 (외부에서 필요 시 파일명 구분용) # 0. 이미지 URL 유효성 체크 if not original_image_url or not isinstance(original_image_url, str): self.logger.log("배경제거 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) return {"status": "failed", "path": original_image_url, "error": "이미지 URL 오류"} # 1. 다운로드 또는 로컬 경로 확정 if original_image_url.startswith("http"): # 다운로드 재사용을 위해 기존 메서드 호출 # local_path = await self.download_image(page=page, image_url=original_image_url, index=0, file_prefix=file_prefix) local_path = await self.download_image(image_url=original_image_url, index=0, file_prefix=file_prefix) if not local_path: return {"status": "failed", "path": original_image_url, "error": "다운로드 실패"} else: local_path = original_image_url # 이미 로컬 경로 # 2. 배경 제거 (np.ndarray 반환) removed = self.request_ai_server.request_rembg(local_path) if removed is None: self.logger.log("RemoveBG 실패", level=logging.ERROR) return {"status": "failed", "path": local_path, "error": "RemoveBG 실패"} # 3. 저장 경로 결정 os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True) base_name = os.path.basename(local_path) name_no_ext, _ = os.path.splitext(base_name) save_name = f"nobg_{file_prefix}_{name_no_ext}.png" if file_prefix else f"nobg_{name_no_ext}.png" save_path = os.path.join(self.TEMP_IMAGE_DIR, save_name) # 4. 저장 (OpenCV → BGR) cv2.imwrite(save_path, removed) self.logger.log(f"배경제거 이미지 저장: {save_path}", level=logging.INFO) return {"status": "success", "path": save_path} except Exception as e: self.logger.log(f"remove_background 오류: {e}", level=logging.ERROR, exc_info=True) return {"status": "failed", "path": original_image_url, "error": str(e)} # async def remove_background(self, page, original_image_url, index, file_prefix="", **kwargs): # """ # 배경제거: 이미지 반환 → 후처리 및 저장 → 경로 반환 # """ # try: # # 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자) # if not original_image_url or not isinstance(original_image_url, str): # self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '이미지 URL 없음 또는 타입 오류'} # if not self.is_valid_image_path(original_image_url): # self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '유효하지 않은 이미지 주소'} # # 1. 이미지 다운로드 # local_image_path = await self.download_image(page, original_image_url, index, file_prefix) # if not local_image_path: # self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING) # return {'status': 'failed', 'path': original_image_url, 'error': '다운로드 실패'} # self.logger.log(f"이미지 {index+1} 로컬 저장위치: {local_image_path}", level=logging.DEBUG) # # 1. 배경제거 수행(이미지 반환) # removed_img = self.background_removal_module.remove_background( # local_image_path, **kwargs # ) # if removed_img is None: # self.logger.log(f"배경제거 실패: {local_image_path}", level=40) # return {'status': 'failed', 'path': local_image_path, 'error': '배경제거 실패'} # if self.toggle_states.get("remove_background_white", True): # img_result_white = self.background_removal_module.to_white_background(removed_img) # else: # img_result_white = removed_img # # 2. 저장 경로 생성 # if file_prefix: # save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_{file_prefix}_img_{index+1}.png") # else: # save_path = os.path.join(self.TEMP_IMAGE_DIR, f"nobg_img_{index+1}.png") # # 3. 워터마크 등 후처리 및 저장 # # 워터마크 등 추가하려면 아래처럼 # # removed_img = self.postImageManager.add_watermark(image_data=removed_img, watermark_text=...) # final_path = self.postImageManager.save_image_to_path(img_result_white, save_path) # if final_path: # self.logger.log(f"배경제거 이미지 저장됨: {final_path}") # return {'status': 'success', 'path': final_path} # else: # self.logger.log(f"배경제거 후 저장 실패: {save_path}", level=40) # return {'status': 'failed', 'path': save_path, 'error': '저장 실패'} # except Exception as e: # self.logger.log(f"배경제거 중 오류: {e}", level=40, exc_info=True) # return {'status': 'failed', 'path': local_image_path, 'error': str(e)} # def remove_background_with_ppmatting(image_path, output_path='output_foreground.png', alpha_path='output_alpha.png'): # """ # PaddleHub의 ppmatting을 이용한 배경제거(투명배경 PNG) 함수입니다. # Args: # image_path (str): 입력 이미지 경로 # output_path (str): 결과 투명 배경 PNG 저장 경로 # alpha_path (str): 알파(마스크) 이미지 저장 경로 # Returns: # foreground (np.ndarray): 알파채널이 포함된 전경 PNG 이미지 (BGRA) # alpha (np.ndarray): 추출된 알파 마스크 # """ # import paddlehub as hub # import cv2 # import numpy as np # # 1. 이미지 로드 (OpenCV는 BGR) # img = cv2.imread(image_path) # if img is None: # print(f"이미지를 불러올 수 없습니다: {image_path}") # return None, None # # 2. BGR → RGB (ppmatting은 RGB 입력) # img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # # 3. ppmatting 모델 로드 (최초 1회 다운로드됨) # matting = hub.Module(name='ppmatting-hrnet-1x') # # 4. 예측 수행 # results = matting.predict(images=[img_rgb]) # 결과는 리스트 # # 5. 알파(투명도) 마스크 추출 (float, 0~1) # alpha = results[0]['alpha'] # # 6. 알파 마스크를 0~255 uint8로 변환 (이미지 저장용) # alpha_img = (alpha * 255).astype(np.uint8) # # 7. 원본 이미지를 BGRA(투명도 포함)로 변환 # foreground = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA) # foreground[..., 3] = alpha_img # 알파 채널 추가 # # 8. 결과 이미지 저장 # cv2.imwrite(output_path, foreground) # cv2.imwrite(alpha_path, alpha_img) # print(f"배경 제거 PNG 저장: {output_path}") # print(f"알파 마스크 저장: {alpha_path}") # return foreground, alpha_img # ------------------------------------------------------------------ # 고해상도 이미지 다운스케일 유틸리티 (메모리 절감용) # ------------------------------------------------------------------ def downscale_image_if_large(self, image_path, max_dim=2048): """주어진 이미지가 max_dim 픽셀을 초과하면 축소하여 같은 경로에 저장하고 경로를 반환합니다""" try: with Image.open(image_path) as img: width, height = img.size if max(width, height) <= max_dim: return image_path # 변경 없음 scale = float(max_dim) / float(max(width, height)) new_size = (int(width * scale), int(height * scale)) self.logger.log( f"고해상도({width}x{height}) -> {new_size}로 리사이즈 후 저장", level=logging.INFO) resized = img.resize(new_size, Image.LANCZOS) # JPG, PNG 등에 관계없이 원본 확장자를 유지하여 덮어쓰기 resized.save(image_path) return image_path except Exception as e: self.logger.log(f"다운스케일 실패: {e}", level=logging.WARNING) return image_path