383 lines
17 KiB
Python
383 lines
17 KiB
Python
import os
|
||
import sys
|
||
import asyncio
|
||
import aiofiles
|
||
from pathlib import Path
|
||
import logging
|
||
from urllib.parse import urlparse
|
||
import tempfile
|
||
import shutil
|
||
import json
|
||
from src.modules.local_image_server import LocalImageServer
|
||
import aiohttp
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import re
|
||
|
||
class ImageProcessor:
|
||
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
|
||
|
||
def __init__(self, logger, browser_page, whale_translator, clipboard_manager, temp_dir, toggle_states):
|
||
self.logger = logger
|
||
self.page = browser_page
|
||
self.whale = whale_translator
|
||
self.clipboard_manager = clipboard_manager
|
||
self.TEMP_IMAGE_DIR = temp_dir
|
||
self.toggle_states = toggle_states
|
||
|
||
# OCR 관련
|
||
self._ocr_counter = 0
|
||
self._ocr_reset_interval = 100 # 재초기화 주기
|
||
|
||
# OCR 전용 워커 스레드 풀 (단일 스레드)
|
||
self._ocr_executor = ThreadPoolExecutor(max_workers=1)
|
||
# 워커 스레드-로컬 저장소
|
||
self._thread_local = threading.local()
|
||
|
||
self.image_server = LocalImageServer(logger, self.TEMP_IMAGE_DIR)
|
||
|
||
# 웹서버 시작
|
||
self.image_server.start_server()
|
||
|
||
def update_page(self, page1, toggle_states):
|
||
self.page = page1
|
||
self.toggle_states = toggle_states
|
||
self.logger.log(f"page객체 업데이트 : {page1}", level=logging.DEBUG)
|
||
|
||
def __del__(self):
|
||
"""소멸자에서 리소스 정리"""
|
||
self.cleanup()
|
||
|
||
def cleanup(self):
|
||
"""리소스 정리"""
|
||
try:
|
||
# 웹서버 중지
|
||
if hasattr(self, 'image_server'):
|
||
self.image_server.stop_server()
|
||
|
||
# 임시 폴더 삭제
|
||
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
|
||
shutil.rmtree(self.temp_dir)
|
||
self.logger.log(f"임시 폴더 삭제됨: {self.temp_dir}", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR)
|
||
|
||
async def process_single_image(self, page, original_image_url, index, is_localServer, delay=1.0, file_prefix=""):
|
||
"""
|
||
단일 이미지를 처리합니다 (다운로드 -> OCR -> 번역 또는 원본)
|
||
|
||
Args:
|
||
image_url (str): 처리할 이미지 URL
|
||
index (int): 이미지 인덱스
|
||
is_localServer (bool): 로컬 서버 사용 여부
|
||
delay (float): 요청 간격 (초)
|
||
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
|
||
|
||
Returns:
|
||
str or False:
|
||
- 번역된 이미지 파일 경로 (중국어 있고 번역 성공)
|
||
- 원본 이미지 파일 경로 (중국어 없음)
|
||
- False (불필요한 키워드 포함)
|
||
- 로컬 파일 경로 또는 원본 URL (오류 발생 시)
|
||
"""
|
||
local_image_path = None
|
||
|
||
try:
|
||
|
||
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
|
||
if not original_image_url or not isinstance(original_image_url, str):
|
||
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
|
||
return False
|
||
|
||
image_url_pattern = re.compile(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', re.IGNORECASE)
|
||
if not image_url_pattern.match(original_image_url):
|
||
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
|
||
return False
|
||
|
||
# 요청 간격 조절 (봇 탐지 회피)
|
||
if index > 0:
|
||
await asyncio.sleep(delay)
|
||
|
||
# OCR 권한 상태 로그
|
||
ocr_enabled = self.toggle_states.get('ocr', False)
|
||
ocr_status = "고급 사용자 (OCR 활성화)" if ocr_enabled else "일반 사용자 (OCR 비활성화)"
|
||
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {ocr_status}", level=logging.INFO)
|
||
|
||
# 1. 이미지 다운로드
|
||
local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
|
||
if not local_image_path:
|
||
self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
|
||
return original_image_url
|
||
|
||
# 2. OCR 수행 (단일 스레드 풀에서만)
|
||
# ocr_result = await self._check_chinese_text(image_path=local_image_path)
|
||
ocr_result = await self.check_chinese_text(image_path=local_image_path)
|
||
|
||
if ocr_result == 'exclude':
|
||
self.logger.log(f"이미지 {index+1} 불필요한 키워드 포함으로 제외", level=logging.INFO)
|
||
return False
|
||
|
||
if ocr_result == 'original':
|
||
self.logger.log(f"이미지 {index+1} 원본 사용(original)", level=logging.INFO)
|
||
return local_image_path
|
||
|
||
# translate
|
||
self.logger.log(f"이미지 {index+1} 번역 시작", level=logging.INFO)
|
||
if is_localServer:
|
||
translate_image_url = await self.get_local_image_url(local_image_path)
|
||
else:
|
||
translate_image_url = original_image_url
|
||
|
||
if translate_image_url.startswith("http"):
|
||
translated = await self.translate_and_save_image(translate_image_url, local_image_path, index, file_prefix)
|
||
if translated:
|
||
self.logger.log(f"이미지 {index+1} 번역 완료: {translated}", level=logging.INFO)
|
||
return translated
|
||
|
||
self.logger.log(f"이미지 {index+1} 번역 실패, 원본 반환", level=logging.WARNING)
|
||
return local_image_path
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR)
|
||
return local_image_path or original_image_url
|
||
|
||
async def translate_and_save_image(self, local_server_url, original_local_path, index, file_prefix=""):
|
||
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
|
||
try:
|
||
# 파일명에 접두사 포함
|
||
if file_prefix:
|
||
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.png")
|
||
else:
|
||
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
|
||
|
||
# 웨일 브라우저로 이미지 번역
|
||
is_translated = self.whale.translate_image(local_server_url)
|
||
|
||
# 번역된 이미지를 임시 파일로 저장
|
||
translated_img_path = self.clipboard_manager.process_clipboard_to_save_path_with_local_hosted_image(
|
||
local_image_path=original_local_path,
|
||
is_success_translated=is_translated,
|
||
toggle_states=self.toggle_states,
|
||
path=img_path
|
||
)
|
||
|
||
return translated_img_path
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR)
|
||
return original_local_path
|
||
|
||
|
||
async def download_image(self, page, image_url, index, file_prefix=""):
|
||
"""Playwright를 사용해 이미지를 다운로드합니다"""
|
||
try:
|
||
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
|
||
if image_url.startswith("https://assets.alicdn.com"):
|
||
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
|
||
return None
|
||
|
||
# URL에서 파일명 추출 및 접두사 포함
|
||
parsed_url = urlparse(image_url)
|
||
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
|
||
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
|
||
base_filename += '.jpg'
|
||
|
||
# 접두사가 있으면 파일명에 포함
|
||
if file_prefix:
|
||
filename = f"{file_prefix}_{base_filename}"
|
||
else:
|
||
filename = base_filename
|
||
|
||
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
|
||
|
||
# Playwright로 이미지 다운로드
|
||
response = await page.request.get(image_url)
|
||
if response.status == 200:
|
||
image_data = await response.body()
|
||
|
||
# 이미지 데이터 유효성 검사
|
||
if self.is_valid_image_data(image_data):
|
||
async with aiofiles.open(local_path, 'wb') as f:
|
||
await f.write(image_data)
|
||
|
||
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.INFO)
|
||
return local_path
|
||
else:
|
||
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
|
||
return None
|
||
else:
|
||
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status}): {image_url}", level=logging.ERROR)
|
||
return None
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR)
|
||
return None
|
||
|
||
def is_valid_image_data(self, image_data):
|
||
"""이미지 데이터가 유효한지 확인합니다"""
|
||
if not image_data or len(image_data) < 100:
|
||
return False
|
||
|
||
# 일반적인 이미지 파일 시그니처 확인
|
||
image_signatures = [
|
||
b'\xFF\xD8\xFF', # JPEG
|
||
b'\x89PNG\r\n\x1a\n', # PNG
|
||
b'GIF87a', # GIF87a
|
||
b'GIF89a', # GIF89a
|
||
b'RIFF', # WebP (RIFF 컨테이너)
|
||
]
|
||
|
||
return any(image_data.startswith(sig) for sig in image_signatures)
|
||
|
||
|
||
async def check_chinese_text(self, image_path):
|
||
"""이미지에서 중국어 텍스트가 있는지 확인합니다 (에러 시 OCR 객체 재생성 후 재시도)"""
|
||
# ocr_enabled = self.toggle_states.get('ocr', False)
|
||
# if not ocr_enabled:
|
||
# self.logger.log("OCR 비활성화—항상 번역 모드", level=logging.INFO)
|
||
# return 'translate'
|
||
return 'translate'
|
||
|
||
# # OCR 프로세서가 없으면 초기화
|
||
# if self.ocr_processor is None:
|
||
# self.initialize_ocr_processor()
|
||
# if self.ocr_processor is None:
|
||
# return 'translate'
|
||
|
||
# # 실제 OCR 호출을 래핑: 에러 시 재초기화 + 재시도
|
||
# try:
|
||
# result = await asyncio.to_thread(
|
||
# self.ocr_processor.check_local_image_zhcn,
|
||
# image_path
|
||
# )
|
||
# except Exception as e:
|
||
# # primitive 실행 에러로 추정
|
||
# self.logger.log(f"OCR primitive 오류: {e} → 번역 모드로 전환합니다.", level=logging.WARNING, exc_info=True)
|
||
# # 재초기화
|
||
# self.initialize_ocr_processor()
|
||
# try:
|
||
# result = await asyncio.to_thread(
|
||
# self.ocr_processor.check_local_image_zhcn,
|
||
# image_path
|
||
# )
|
||
# except Exception as e2:
|
||
# self.logger.log(f"OCR 재시도 실패: {e2}", level=logging.ERROR, exc_info=True)
|
||
# return 'translate'
|
||
|
||
# # 사용 횟수 카운트 및 주기 도달 시 재초기화
|
||
# self._ocr_counter += 1
|
||
# self.logger.log(f"OCR 프로세서 사용 횟수: {self._ocr_counter}", level=logging.DEBUG)
|
||
# if self._ocr_counter >= self._ocr_reset_interval:
|
||
# self.logger.log("OCR 재초기화 주기 도달 – 객체 재생성합니다.", level=logging.INFO)
|
||
# self.initialize_ocr_processor()
|
||
|
||
# # OCR 결과 판정
|
||
# if result:
|
||
# status = result.get('status', 'error')
|
||
# message = result.get('message', '')
|
||
# if status == 'has_chinese_clean':
|
||
# self.logger.log(f"번역 대상 이미지: {image_path} - {message}", level=logging.INFO)
|
||
# return 'translate'
|
||
# elif status == 'has_chinese_unwanted':
|
||
# self.logger.log(f"불필요한 키워드 포함 이미지: {image_path} - {message}", level=logging.INFO)
|
||
# return 'exclude'
|
||
# elif status == 'no_chinese':
|
||
# self.logger.log(f"중국어 텍스트 없음: {image_path} - {message}", level=logging.INFO)
|
||
# return 'original'
|
||
# else:
|
||
# self.logger.log(f"OCR 처리 오류({status}): {image_path} - {message}", level=logging.ERROR)
|
||
# return 'error'
|
||
# else:
|
||
# self.logger.log(f"OCR 결과가 None 또는 빈 값: {image_path}", level=logging.ERROR)
|
||
# return 'error'
|
||
|
||
|
||
# async def _check_chinese_text(self, image_path: str) -> str:
|
||
# """OCR 전용 풀에서만 실행하여 primitive 에러 방지"""
|
||
# if not self.toggle_states.get('ocr', False):
|
||
# return 'translate'
|
||
|
||
# loop = asyncio.get_running_loop()
|
||
# try:
|
||
# status = await loop.run_in_executor(
|
||
# self._ocr_executor,
|
||
# self._threaded_ocr,
|
||
# image_path
|
||
# )
|
||
# except Exception as e:
|
||
# self.logger.log(f"OCR 워커 에러: {e} → 번역 모드", level=logging.WARNING, exc_info=True)
|
||
# return 'translate'
|
||
|
||
# # 주기적 재초기화
|
||
# self._ocr_counter += 1
|
||
# if self._ocr_counter >= self._ocr_reset_interval:
|
||
# self.logger.log("OCR 재초기화 주기 도달", level=logging.INFO)
|
||
# # 워커 스레드에 있는 OCR 객체 갱신
|
||
# def _reinit():
|
||
# if hasattr(self._thread_local, 'ocr'):
|
||
# del self._thread_local.ocr
|
||
# await loop.run_in_executor(self._ocr_executor, _reinit)
|
||
# self._ocr_counter = 0
|
||
|
||
# return status
|
||
|
||
|
||
# def _threaded_ocr(self, image_path: str) -> str:
|
||
# """단일 워커 스레드 안에서만 OCR 초기화 및 추론"""
|
||
# # 최초 호출 시 OCR 인스턴스 생성
|
||
# if not hasattr(self._thread_local, 'ocr'):
|
||
# self.logger.log("워커 스레드에서 OCR 프로세서 초기화", level=logging.INFO)
|
||
# from src.ppocr.ocr import ChineseImageOCRProcessor
|
||
# self._thread_local.ocr = ChineseImageOCRProcessor(
|
||
# logger=self.logger,
|
||
# toggle_states=self.toggle_states,
|
||
# use_gpu=False,
|
||
# det_enabled=True,
|
||
# horizontal_only=False
|
||
# )
|
||
|
||
# # 실제 OCR 실행
|
||
# result = self._thread_local.ocr.check_local_image_zhcn(image_path)
|
||
# status = result.get('status', 'error')
|
||
# self.logger.log(f"OCR 결과: {result}", level=logging.DEBUG)
|
||
# self.logger.log(f"status 결과: {status}", level=logging.DEBUG)
|
||
|
||
# if status == 'has_chinese_clean':
|
||
# return 'translate'
|
||
# if status == 'has_chinese_unwanted':
|
||
# return 'exclude'
|
||
# if status == 'no_chinese':
|
||
# return 'original'
|
||
# return 'translate'
|
||
|
||
async def get_local_image_url(self, local_path):
|
||
"""로컬 이미지를 웹서버 URL로 변환합니다"""
|
||
try:
|
||
filename = os.path.basename(local_path)
|
||
if self.image_server.is_running():
|
||
web_url = f"{self.image_server.get_base_url()}/{filename}"
|
||
self.logger.log(f"번역용 웹 URL 생성: {web_url}", level=logging.INFO)
|
||
return web_url
|
||
else:
|
||
self.logger.log(f"웹서버가 실행되지 않았습니다.", level=logging.ERROR)
|
||
return local_path
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"웹 URL 생성 중 오류: {e}", level=logging.ERROR)
|
||
return local_path
|
||
|
||
|
||
async def process_image_list(self, image_urls, delay_between_requests=1.0):
|
||
"""이미지 목록을 순차적으로 처리합니다"""
|
||
processed_images = []
|
||
|
||
for index, image_url in enumerate(image_urls):
|
||
processed_url = await self.process_single_image(image_url, index, delay_between_requests)
|
||
if processed_url:
|
||
processed_images.append(processed_url)
|
||
|
||
# 진행 상황 로그
|
||
self.logger.log(f"이미지 처리 진행률: {index+1}/{len(image_urls)}", level=logging.INFO)
|
||
|
||
return processed_images
|