290 lines
14 KiB
Python
290 lines
14 KiB
Python
import os
|
|
import asyncio
|
|
import requests
|
|
import time
|
|
import logging
|
|
from urllib.parse import urlparse
|
|
import sys
|
|
import cv2
|
|
# OpenCV 의 내부 최적화(메모리 풀) 사용을 비활성화하여 파편화 위험을 낮춤
|
|
cv2.setUseOptimized(False)
|
|
from PIL import Image
|
|
|
|
from src.modules.request_inpaint import Request_AI_Server
|
|
|
|
class ImageProcessor3:
|
|
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
|
|
|
|
def __init__(self, logger, page, toggle_states, unwanted_words, authenticated_by_admin, base_dir, papago_translator):
|
|
self.logger = logger
|
|
self.page = page
|
|
self.base_dir = base_dir
|
|
self.toggle_states = toggle_states
|
|
self.unwanted_texts = unwanted_words
|
|
self.authenticated_by_admin = authenticated_by_admin
|
|
|
|
self.logger.log(f"ImageProcessor4 Init toggle_states: {self.toggle_states}", level=logging.DEBUG)
|
|
|
|
self.papago_translator = papago_translator
|
|
|
|
self.inpaint_method = 'cv'
|
|
try:
|
|
self.request_inpainting_server_url = self.toggle_states.get("request_inpainting_server_url", None)
|
|
if self.request_inpainting_server_url is None:
|
|
self.logger.log(f"request_inpainting_server_url 설정되지 않았습니다.", level=logging.ERROR)
|
|
self.inpaint_method = 'cv'
|
|
else:
|
|
self.inpaint_method = 'request'
|
|
|
|
self.font_path = self.toggle_states.get('image_font_path', os.path.join(self.base_dir, "HakgyoansimDunggeunmisoTTFB.ttf"))
|
|
|
|
self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "")
|
|
|
|
self.use_local_rembg = self.toggle_states.get("use_local_rembg", False)
|
|
self.local_model_name = self.toggle_states.get("local_model_name", 'birefnet-general-lite')
|
|
|
|
self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url", None)
|
|
# self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None)
|
|
|
|
if not self.is_frozen():
|
|
self.request_rembg_server_url = self.toggle_states.get("request_rembg_server_url_local", None)
|
|
|
|
self.request_ai_server = Request_AI_Server(logger=self.logger, inpaint_server_url=self.request_inpainting_server_url, rembg_server_url=self.request_rembg_server_url)
|
|
|
|
|
|
self.logger.log(f"self.font_path: {self.font_path}", level=logging.DEBUG)
|
|
|
|
self.logger.log(f"self.TEMP_IMAGE_DIR: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
|
|
|
|
self.logger.log(f"self.unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"ImageProcessor3 초기화 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
|
|
|
def update_toggle_states(self, toggle_states):
|
|
self.toggle_states = toggle_states
|
|
self.postImageManager.update_toggle_states(self.toggle_states)
|
|
self.logger.log(f"이미지 프로세서 toggle_states 업데이트 : {self.toggle_states}", level=logging.DEBUG)
|
|
|
|
def update_unwanted_texts(self, texts):
|
|
self.unwanted_texts = texts
|
|
self.logger.log(f"이미지프로세서 unwanted_texts: {self.unwanted_texts}", level=logging.DEBUG)
|
|
|
|
def __del__(self):
|
|
"""소멸자에서 리소스 정리"""
|
|
self.cleanup()
|
|
self.logger.log("이미지 프로세서 소멸", level=logging.DEBUG)
|
|
def cleanup(self):
|
|
"""리소스 정리"""
|
|
try:
|
|
|
|
# 임시 폴더 삭제
|
|
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
|
|
# shutil.rmtree(self.TEMP_IMAGE_DIR)
|
|
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.DEBUG)
|
|
except Exception as e:
|
|
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
|
|
|
|
|
|
def opencv_inpaint(self, image_path, mask, method='telea', radius=3):
|
|
"""
|
|
OpenCV로 인페인팅을 수행합니다.
|
|
Args:
|
|
image_path (str): 원본 이미지 경로
|
|
mask (np.ndarray): 2D 마스크 이미지 (0/255, shape=(H, W))
|
|
method (str): 'telea' 또는 'ns'
|
|
radius (int): 인페인팅 반경
|
|
Returns:
|
|
inpainted_image (np.ndarray): 인페인팅된 이미지
|
|
"""
|
|
import cv2
|
|
import numpy as np
|
|
image = cv2.imread(image_path)
|
|
if image is None:
|
|
self.logger.log(f"이미지 로드 실패: {image_path}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 마스크가 2D 배열인지 확인
|
|
if mask is None:
|
|
self.logger.log(f"마스크가 None입니다", level=logging.ERROR)
|
|
return None
|
|
|
|
if not isinstance(mask, np.ndarray) or mask.ndim != 2:
|
|
self.logger.log(f"마스크가 2D numpy 배열이 아닙니다: type={type(mask)}, shape={getattr(mask, 'shape', 'N/A')}", level=logging.ERROR)
|
|
return None
|
|
|
|
# 마스크 크기가 이미지와 일치하는지 확인
|
|
if mask.shape != image.shape[:2]:
|
|
self.logger.log(f"마스크 크기가 이미지와 다릅니다: mask={mask.shape}, image={image.shape[:2]}", level=logging.ERROR)
|
|
return None
|
|
|
|
inpaint_method = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS
|
|
inpainted = cv2.inpaint(image, mask, radius, inpaint_method)
|
|
return inpainted
|
|
|
|
|
|
def save_debug_images(self, local_image_path, ocr_results, masks, index, file_prefix=""):
|
|
"""디버깅용 OCR 박스와 마스크 이미지를 저장합니다"""
|
|
try:
|
|
# OCR 박스 시각화 이미지 저장
|
|
ocr_debug_path = self.save_ocr_debug_image(local_image_path, ocr_results, index, file_prefix)
|
|
|
|
# 마스크 시각화 이미지 저장
|
|
mask_debug_path = self.save_mask_debug_image(local_image_path, masks, index, file_prefix)
|
|
|
|
self.logger.log(f"디버깅 이미지 저장 완료: OCR={ocr_debug_path}, Mask={mask_debug_path}", level=logging.INFO)
|
|
return ocr_debug_path, mask_debug_path
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"디버깅 이미지 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return None, None
|
|
|
|
|
|
def is_frozen(self):
|
|
"""
|
|
실행 환경에 따라 배포환경인지 개발환경인지 확인하는 메서드.
|
|
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
|
|
"""
|
|
if getattr(sys, 'frozen', False): # 패키징된 경우
|
|
self.logger.log("배포환경", level=logging.DEBUG)
|
|
return True
|
|
else: # 일반 Python 실행 환경
|
|
self.logger.log("개발환경", level=logging.DEBUG)
|
|
return False
|
|
|
|
|
|
|
|
async def remove_background(self, original_image_url, file_prefix=""):
|
|
"""배경제거 전용 메서드 (썸네일 등 외부 호출용).
|
|
|
|
1. 이미지를 다운로드(또는 로컬 경로 사용)
|
|
2. Request_AI_Server.request_rembg 로 배경 제거 (흰 배경 중앙 배치 포함)
|
|
3. TEMP_IMAGE_DIR 하위에 저장 후 경로 반환
|
|
"""
|
|
try:
|
|
index = 0 # 기본값 (외부에서 필요 시 파일명 구분용)
|
|
|
|
# 0. 이미지 URL 유효성 체크
|
|
if not original_image_url or not isinstance(original_image_url, str):
|
|
self.logger.log("배경제거 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
|
|
return {"status": "failed", "path": original_image_url, "error": "이미지 URL 오류"}
|
|
|
|
# 1. 다운로드 또는 로컬 경로 확정
|
|
if original_image_url.startswith("http"):
|
|
# 다운로드 재사용을 위해 기존 메서드 호출
|
|
local_path = self.download_image(image_url=original_image_url, index=0, file_prefix=file_prefix)
|
|
if not local_path:
|
|
return {"status": "failed", "path": original_image_url, "error": "다운로드 실패"}
|
|
else:
|
|
local_path = original_image_url # 이미 로컬 경로
|
|
|
|
# 2. 배경 제거 (np.ndarray 반환)
|
|
removed = self.request_ai_server.request_rembg(local_path, use_local_rembg=self.use_local_rembg, local_model_name=self.local_model_name)
|
|
if removed is None:
|
|
self.logger.log("RemoveBG 실패", level=logging.ERROR)
|
|
return {"status": "failed", "path": local_path, "error": "RemoveBG 실패"}
|
|
|
|
# 3. 저장 경로 결정
|
|
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
|
|
base_name = os.path.basename(local_path)
|
|
name_no_ext, _ = os.path.splitext(base_name)
|
|
save_name = f"nobg_{file_prefix}_{name_no_ext}.png" if file_prefix else f"nobg_{name_no_ext}.png"
|
|
save_path = os.path.join(self.TEMP_IMAGE_DIR, save_name)
|
|
|
|
# 4. 저장 (OpenCV → BGR)
|
|
cv2.imwrite(save_path, removed)
|
|
del removed
|
|
removed = None
|
|
self.logger.log(f"배경제거 이미지 저장: {save_path}", level=logging.INFO)
|
|
|
|
# 5. OCR 검사 후 인페인팅 여부 결정
|
|
# ocr_results = self.ocr_module.detect_text(save_path)
|
|
ocr_results = self.safe_detect(save_path)
|
|
|
|
filter_ocr_results = self.filter_ocr_results(ocr_results)
|
|
|
|
# 중국어 텍스트가 없으면 바로 반환
|
|
if not self.ocr_module.filter_chinese_text(filter_ocr_results):
|
|
return {"status": "success", "path": save_path}
|
|
|
|
# ---- 중국어 텍스트 존재: 인페인팅 준비 ----
|
|
ocr_count = len(filter_ocr_results)
|
|
if ocr_count < 5:
|
|
expansion_size, blur_size = 12, 15
|
|
elif ocr_count < 10:
|
|
expansion_size, blur_size = 9, 12
|
|
elif ocr_count < 15:
|
|
expansion_size, blur_size = 7, 9
|
|
elif ocr_count < 20:
|
|
expansion_size, blur_size = 5, 6
|
|
else:
|
|
expansion_size, blur_size = 10, 15
|
|
|
|
# 마스크 생성
|
|
masks = self.mask_module.create_masks(
|
|
image_path=save_path,
|
|
ocr_results=filter_ocr_results,
|
|
mask_option="basic",
|
|
expansion_size=expansion_size,
|
|
blur_size=blur_size
|
|
)
|
|
self.logger.log("배경제거 후 마스크 생성 완료", level=logging.DEBUG)
|
|
|
|
# 인페인팅 수행 (OpenCV Telea)
|
|
inpainted_image = self.opencv_inpaint(save_path, masks, method='telea', radius=3)
|
|
|
|
# 인페인팅 실패 시 원본 반환
|
|
if inpainted_image is None:
|
|
self.logger.log("인페인팅 실패, 배경제거 이미지를 그대로 반환", level=logging.WARNING)
|
|
return {"status": "success", "path": save_path}
|
|
|
|
# 인페인팅 결과 저장
|
|
inpaint_name = f"inpaint_{file_prefix}_{name_no_ext}.png" if file_prefix else f"inpaint_{name_no_ext}.png"
|
|
inpaint_path = os.path.join(self.TEMP_IMAGE_DIR, inpaint_name)
|
|
cv2.imwrite(inpaint_path, inpainted_image)
|
|
self.logger.log(f"인페인팅 이미지 저장: {inpaint_path}", level=logging.INFO)
|
|
return {"status": "success", "path": inpaint_path}
|
|
|
|
except Exception as e:
|
|
self.logger.log(f"remove_background 오류: {e}", level=logging.ERROR, exc_info=True)
|
|
return {"status": "failed", "path": original_image_url, "error": str(e)}
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# 고해상도 이미지 다운스케일 유틸리티 (메모리 절감용)
|
|
# ------------------------------------------------------------------
|
|
def downscale_image_if_large(self, image_path, max_dim=2048):
|
|
"""주어진 이미지가 max_dim 픽셀을 초과하면 축소하여 같은 경로에 저장하고 경로를 반환합니다"""
|
|
try:
|
|
with Image.open(image_path) as img:
|
|
width, height = img.size
|
|
if max(width, height) <= max_dim:
|
|
return image_path # 변경 없음
|
|
|
|
scale = float(max_dim) / float(max(width, height))
|
|
new_size = (int(width * scale), int(height * scale))
|
|
|
|
self.logger.log(
|
|
f"고해상도({width}x{height}) -> {new_size}로 리사이즈 후 저장", level=logging.INFO)
|
|
|
|
resized = img.resize(new_size, Image.LANCZOS)
|
|
|
|
# RGBA 이미지를 JPEG로 저장할 때 에러 방지
|
|
import os
|
|
file_ext = os.path.splitext(image_path)[1].lower()
|
|
if resized.mode == 'RGBA' and file_ext in ['.jpg', '.jpeg']:
|
|
# RGBA를 RGB로 변환 (흰색 배경 사용)
|
|
rgb_image = Image.new('RGB', resized.size, (255, 255, 255))
|
|
rgb_image.paste(resized, mask=resized.split()[-1]) # 알파 채널을 마스크로 사용
|
|
resized = rgb_image
|
|
self.logger.log(f"RGBA 이미지를 RGB로 변환하여 JPEG 저장", level=logging.INFO)
|
|
|
|
# JPG, PNG 등에 관계없이 원본 확장자를 유지하여 덮어쓰기
|
|
resized.save(image_path)
|
|
return image_path
|
|
except Exception as e:
|
|
self.logger.log(f"다운스케일 실패: {e}", level=logging.WARNING)
|
|
return image_path
|