IMG_Worker/modules/old_modules/image_processor.py

383 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import asyncio
import aiofiles
from pathlib import Path
import logging
from urllib.parse import urlparse
import tempfile
import shutil
import json
from src.modules.local_image_server import LocalImageServer
import aiohttp
import threading
from concurrent.futures import ThreadPoolExecutor
import re
class ImageProcessor:
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
def __init__(self, logger, browser_page, whale_translator, clipboard_manager, temp_dir, toggle_states):
self.logger = logger
self.page = browser_page
self.whale = whale_translator
self.clipboard_manager = clipboard_manager
self.TEMP_IMAGE_DIR = temp_dir
self.toggle_states = toggle_states
# OCR 관련
self._ocr_counter = 0
self._ocr_reset_interval = 100 # 재초기화 주기
# OCR 전용 워커 스레드 풀 (단일 스레드)
self._ocr_executor = ThreadPoolExecutor(max_workers=1)
# 워커 스레드-로컬 저장소
self._thread_local = threading.local()
self.image_server = LocalImageServer(logger, self.TEMP_IMAGE_DIR)
# 웹서버 시작
self.image_server.start_server()
def update_page(self, page1, toggle_states):
self.page = page1
self.toggle_states = toggle_states
self.logger.log(f"page객체 업데이트 : {page1}", level=logging.DEBUG)
def __del__(self):
"""소멸자에서 리소스 정리"""
self.cleanup()
def cleanup(self):
"""리소스 정리"""
try:
# 웹서버 중지
if hasattr(self, 'image_server'):
self.image_server.stop_server()
# 임시 폴더 삭제
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
self.logger.log(f"임시 폴더 삭제됨: {self.temp_dir}", level=logging.INFO)
except Exception as e:
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR)
async def process_single_image(self, page, original_image_url, index, is_localServer, delay=1.0, file_prefix=""):
"""
단일 이미지를 처리합니다 (다운로드 -> OCR -> 번역 또는 원본)
Args:
image_url (str): 처리할 이미지 URL
index (int): 이미지 인덱스
is_localServer (bool): 로컬 서버 사용 여부
delay (float): 요청 간격 (초)
file_prefix (str): 파일명에 추가할 접두사 (예: "detail", "option")
Returns:
str or False:
- 번역된 이미지 파일 경로 (중국어 있고 번역 성공)
- 원본 이미지 파일 경로 (중국어 없음)
- False (불필요한 키워드 포함)
- 로컬 파일 경로 또는 원본 URL (오류 발생 시)
"""
local_image_path = None
try:
# 0. 이미지 URL 유효성 체크 (http/https & 이미지 확장자)
if not original_image_url or not isinstance(original_image_url, str):
self.logger.log(f"이미지 {index+1} 처리 중단: 이미지 URL 없음 또는 타입 오류", level=logging.WARNING)
return False
image_url_pattern = re.compile(r'^(http|https)://.*\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\?.*)?$', re.IGNORECASE)
if not image_url_pattern.match(original_image_url):
self.logger.log(f"이미지 {index+1} 처리 중단: 유효하지 않은 이미지 주소 - {original_image_url}", level=logging.WARNING)
return False
# 요청 간격 조절 (봇 탐지 회피)
if index > 0:
await asyncio.sleep(delay)
# OCR 권한 상태 로그
ocr_enabled = self.toggle_states.get('ocr', False)
ocr_status = "고급 사용자 (OCR 활성화)" if ocr_enabled else "일반 사용자 (OCR 비활성화)"
self.logger.log(f"이미지 {index+1} 처리 시작: {original_image_url} - {ocr_status}", level=logging.INFO)
# 1. 이미지 다운로드
local_image_path = await self.download_image(page, original_image_url, index, file_prefix)
if not local_image_path:
self.logger.log(f"이미지 {index+1} 다운로드 실패, 원본 URL 반환", level=logging.WARNING)
return original_image_url
# 2. OCR 수행 (단일 스레드 풀에서만)
# ocr_result = await self._check_chinese_text(image_path=local_image_path)
ocr_result = await self.check_chinese_text(image_path=local_image_path)
if ocr_result == 'exclude':
self.logger.log(f"이미지 {index+1} 불필요한 키워드 포함으로 제외", level=logging.INFO)
return False
if ocr_result == 'original':
self.logger.log(f"이미지 {index+1} 원본 사용(original)", level=logging.INFO)
return local_image_path
# translate
self.logger.log(f"이미지 {index+1} 번역 시작", level=logging.INFO)
if is_localServer:
translate_image_url = await self.get_local_image_url(local_image_path)
else:
translate_image_url = original_image_url
if translate_image_url.startswith("http"):
translated = await self.translate_and_save_image(translate_image_url, local_image_path, index, file_prefix)
if translated:
self.logger.log(f"이미지 {index+1} 번역 완료: {translated}", level=logging.INFO)
return translated
self.logger.log(f"이미지 {index+1} 번역 실패, 원본 반환", level=logging.WARNING)
return local_image_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR)
return local_image_path or original_image_url
async def translate_and_save_image(self, local_server_url, original_local_path, index, file_prefix=""):
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
try:
# 파일명에 접두사 포함
if file_prefix:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.png")
else:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
# 웨일 브라우저로 이미지 번역
is_translated = self.whale.translate_image(local_server_url)
# 번역된 이미지를 임시 파일로 저장
translated_img_path = self.clipboard_manager.process_clipboard_to_save_path_with_local_hosted_image(
local_image_path=original_local_path,
is_success_translated=is_translated,
toggle_states=self.toggle_states,
path=img_path
)
return translated_img_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR)
return original_local_path
async def download_image(self, page, image_url, index, file_prefix=""):
"""Playwright를 사용해 이미지를 다운로드합니다"""
try:
# "https://assets.alicdn.com"으로 시작하는 URL은 건너뛰기
if image_url.startswith("https://assets.alicdn.com"):
self.logger.log(f"다운로드 제외 URL: {image_url}", level=logging.DEBUG)
return None
# URL에서 파일명 추출 및 접두사 포함
parsed_url = urlparse(image_url)
base_filename = f"image_{index:03d}_{os.path.basename(parsed_url.path)}"
if not base_filename.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
base_filename += '.jpg'
# 접두사가 있으면 파일명에 포함
if file_prefix:
filename = f"{file_prefix}_{base_filename}"
else:
filename = base_filename
local_path = os.path.join(self.TEMP_IMAGE_DIR, filename)
# Playwright로 이미지 다운로드
response = await page.request.get(image_url)
if response.status == 200:
image_data = await response.body()
# 이미지 데이터 유효성 검사
if self.is_valid_image_data(image_data):
async with aiofiles.open(local_path, 'wb') as f:
await f.write(image_data)
self.logger.log(f"이미지 다운로드 완료: {filename}", level=logging.INFO)
return local_path
else:
self.logger.log(f"유효하지 않은 이미지 데이터: {image_url}", level=logging.WARNING)
return None
else:
self.logger.log(f"이미지 다운로드 실패 (HTTP {response.status}): {image_url}", level=logging.ERROR)
return None
except Exception as e:
self.logger.log(f"이미지 다운로드 중 오류: {e}", level=logging.ERROR)
return None
def is_valid_image_data(self, image_data):
"""이미지 데이터가 유효한지 확인합니다"""
if not image_data or len(image_data) < 100:
return False
# 일반적인 이미지 파일 시그니처 확인
image_signatures = [
b'\xFF\xD8\xFF', # JPEG
b'\x89PNG\r\n\x1a\n', # PNG
b'GIF87a', # GIF87a
b'GIF89a', # GIF89a
b'RIFF', # WebP (RIFF 컨테이너)
]
return any(image_data.startswith(sig) for sig in image_signatures)
async def check_chinese_text(self, image_path):
"""이미지에서 중국어 텍스트가 있는지 확인합니다 (에러 시 OCR 객체 재생성 후 재시도)"""
# ocr_enabled = self.toggle_states.get('ocr', False)
# if not ocr_enabled:
# self.logger.log("OCR 비활성화—항상 번역 모드", level=logging.INFO)
# return 'translate'
return 'translate'
# # OCR 프로세서가 없으면 초기화
# if self.ocr_processor is None:
# self.initialize_ocr_processor()
# if self.ocr_processor is None:
# return 'translate'
# # 실제 OCR 호출을 래핑: 에러 시 재초기화 + 재시도
# try:
# result = await asyncio.to_thread(
# self.ocr_processor.check_local_image_zhcn,
# image_path
# )
# except Exception as e:
# # primitive 실행 에러로 추정
# self.logger.log(f"OCR primitive 오류: {e} → 번역 모드로 전환합니다.", level=logging.WARNING, exc_info=True)
# # 재초기화
# self.initialize_ocr_processor()
# try:
# result = await asyncio.to_thread(
# self.ocr_processor.check_local_image_zhcn,
# image_path
# )
# except Exception as e2:
# self.logger.log(f"OCR 재시도 실패: {e2}", level=logging.ERROR, exc_info=True)
# return 'translate'
# # 사용 횟수 카운트 및 주기 도달 시 재초기화
# self._ocr_counter += 1
# self.logger.log(f"OCR 프로세서 사용 횟수: {self._ocr_counter}", level=logging.DEBUG)
# if self._ocr_counter >= self._ocr_reset_interval:
# self.logger.log("OCR 재초기화 주기 도달 객체 재생성합니다.", level=logging.INFO)
# self.initialize_ocr_processor()
# # OCR 결과 판정
# if result:
# status = result.get('status', 'error')
# message = result.get('message', '')
# if status == 'has_chinese_clean':
# self.logger.log(f"번역 대상 이미지: {image_path} - {message}", level=logging.INFO)
# return 'translate'
# elif status == 'has_chinese_unwanted':
# self.logger.log(f"불필요한 키워드 포함 이미지: {image_path} - {message}", level=logging.INFO)
# return 'exclude'
# elif status == 'no_chinese':
# self.logger.log(f"중국어 텍스트 없음: {image_path} - {message}", level=logging.INFO)
# return 'original'
# else:
# self.logger.log(f"OCR 처리 오류({status}): {image_path} - {message}", level=logging.ERROR)
# return 'error'
# else:
# self.logger.log(f"OCR 결과가 None 또는 빈 값: {image_path}", level=logging.ERROR)
# return 'error'
# async def _check_chinese_text(self, image_path: str) -> str:
# """OCR 전용 풀에서만 실행하여 primitive 에러 방지"""
# if not self.toggle_states.get('ocr', False):
# return 'translate'
# loop = asyncio.get_running_loop()
# try:
# status = await loop.run_in_executor(
# self._ocr_executor,
# self._threaded_ocr,
# image_path
# )
# except Exception as e:
# self.logger.log(f"OCR 워커 에러: {e} → 번역 모드", level=logging.WARNING, exc_info=True)
# return 'translate'
# # 주기적 재초기화
# self._ocr_counter += 1
# if self._ocr_counter >= self._ocr_reset_interval:
# self.logger.log("OCR 재초기화 주기 도달", level=logging.INFO)
# # 워커 스레드에 있는 OCR 객체 갱신
# def _reinit():
# if hasattr(self._thread_local, 'ocr'):
# del self._thread_local.ocr
# await loop.run_in_executor(self._ocr_executor, _reinit)
# self._ocr_counter = 0
# return status
# def _threaded_ocr(self, image_path: str) -> str:
# """단일 워커 스레드 안에서만 OCR 초기화 및 추론"""
# # 최초 호출 시 OCR 인스턴스 생성
# if not hasattr(self._thread_local, 'ocr'):
# self.logger.log("워커 스레드에서 OCR 프로세서 초기화", level=logging.INFO)
# from src.ppocr.ocr import ChineseImageOCRProcessor
# self._thread_local.ocr = ChineseImageOCRProcessor(
# logger=self.logger,
# toggle_states=self.toggle_states,
# use_gpu=False,
# det_enabled=True,
# horizontal_only=False
# )
# # 실제 OCR 실행
# result = self._thread_local.ocr.check_local_image_zhcn(image_path)
# status = result.get('status', 'error')
# self.logger.log(f"OCR 결과: {result}", level=logging.DEBUG)
# self.logger.log(f"status 결과: {status}", level=logging.DEBUG)
# if status == 'has_chinese_clean':
# return 'translate'
# if status == 'has_chinese_unwanted':
# return 'exclude'
# if status == 'no_chinese':
# return 'original'
# return 'translate'
async def get_local_image_url(self, local_path):
"""로컬 이미지를 웹서버 URL로 변환합니다"""
try:
filename = os.path.basename(local_path)
if self.image_server.is_running():
web_url = f"{self.image_server.get_base_url()}/{filename}"
self.logger.log(f"번역용 웹 URL 생성: {web_url}", level=logging.INFO)
return web_url
else:
self.logger.log(f"웹서버가 실행되지 않았습니다.", level=logging.ERROR)
return local_path
except Exception as e:
self.logger.log(f"웹 URL 생성 중 오류: {e}", level=logging.ERROR)
return local_path
async def process_image_list(self, image_urls, delay_between_requests=1.0):
"""이미지 목록을 순차적으로 처리합니다"""
processed_images = []
for index, image_url in enumerate(image_urls):
processed_url = await self.process_single_image(image_url, index, delay_between_requests)
if processed_url:
processed_images.append(processed_url)
# 진행 상황 로그
self.logger.log(f"이미지 처리 진행률: {index+1}/{len(image_urls)}", level=logging.INFO)
return processed_images