add modules

This commit is contained in:
Envy_PC 2025-07-04 08:52:38 +09:00
parent 2999ff50de
commit 6c7447efd9
35 changed files with 6613 additions and 1 deletions

1
.gitignore vendored
View File

@ -1,5 +1,4 @@
lib/ lib/
modules/
fonts/ fonts/
scripts/ scripts/
include/ include/

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

0
modules/__init__.py Normal file
View File

3662
modules/app.log Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,764 @@
import base64
import pyperclip
import win32clipboard
from io import BytesIO
from PIL import Image, ImageGrab, ImageFont, ImageDraw
import requests
import numpy as np
import cv2
import time
import os, sys
from datetime import datetime
import random
import pywinauto
import io
import logging
class ClipboardImageManager:
def __init__(self, logger, watermark_font_size=36, debug_flag=False):
self.logger = logger
self.debug = debug_flag # 디버그 플래그를 클래스 변수로 사용
# 프로그램이 위치한 경로 기준으로 폰트 경로 설정
self.base_path = self.get_base_dir()
# 먼저 현재 모듈과 같은 디렉토리에서 폰트 파일 찾기
current_dir = os.path.dirname(os.path.abspath(__file__))
self.font_path = os.path.join(current_dir, 'HakgyoansimDunggeunmisoTTFB.ttf')
# 폰트 파일이 없으면 다른 경로들을 시도
if not os.path.exists(self.font_path):
alternative_paths = [
os.path.join(self.base_path, 'HakgyoansimDunggeunmisoTTFB.ttf'),
os.path.join(self.base_path, 'src', 'modules', 'HakgyoansimDunggeunmisoTTFB.ttf'),
os.path.join(os.path.dirname(self.base_path), 'src', 'modules', 'HakgyoansimDunggeunmisoTTFB.ttf')
]
for alt_path in alternative_paths:
if os.path.exists(alt_path):
self.font_path = alt_path
break
# 폰트 로드 (예외 처리 추가)
try:
self.font = ImageFont.truetype(self.font_path, watermark_font_size)
self.logger.log(f"폰트 로드 성공: {self.font_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"커스텀 폰트 로드 실패 ({self.font_path}): {e}", level=logging.WARNING)
try:
# 기본 폰트 사용
self.font = ImageFont.load_default()
self.logger.log("기본 폰트를 사용합니다.", level=logging.INFO)
except Exception as e2:
self.logger.log(f"기본 폰트 로드도 실패: {e2}", level=logging.ERROR)
# 최후의 수단으로 None 설정
self.font = None
# self.debug = True
def reset_state(self):
"""클립보드 이미지 관리자의 상태를 초기화합니다."""
self.logger.log("ClipboardImageManager 상태 초기화", level=logging.DEBUG)
# 클립보드 비우기
self.clear_clipboard()
def get_base_dir(self):
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, 'lib', 'src') # lib 디렉토리 포함
if os.path.exists(internal_dir): # lib 디렉토리가 존재하면 base_dir로 설정
return internal_dir
else: # 일반 Python 실행 환경
base_dir = os.path.dirname(os.path.abspath(__file__))
debug_dir = os.path.join(base_dir, 'src') # lib 디렉토리 포함
return debug_dir
def get_clipboard_data(self):
"""클립보드의 텍스트 또는 이미지 데이터를 가져옵니다."""
self.logger.log("클립보드의 텍스트 또는 이미지 데이터를 가져옵니다", level=logging.DEBUG)
max_attempts = 5
attempt = 0
while attempt < max_attempts:
try:
# 1. 텍스트 데이터 우선 시도
clipboard_text = pyperclip.paste()
if clipboard_text:
return clipboard_text
# 2. 텍스트가 없으면 이미지 확인
self.logger.log("텍스트 데이터가 없어 이미지 데이터 확인 시도", level=logging.DEBUG)
image = ImageGrab.grabclipboard()
if isinstance(image, Image.Image): # 이미지 데이터가 있는 경우
self.logger.log("클립보드에 이미지 데이터가 확인되었습니다.", level=logging.DEBUG)
return image # PIL 이미지 객체 반환
else:
self.logger.log("클립보드에 텍스트 또는 이미지 데이터가 없습니다.", level=logging.DEBUG)
return None
except Exception as e:
attempt += 1
self.logger.log(f"클립보드 데이터를 가져오는 중 오류 발생 (시도 {attempt}/{max_attempts}): {e}", level=logging.WARNING)
if attempt < max_attempts:
time.sleep(0.5) # 0.5초 대기 후 재시도
else:
self.logger.log(f"클립보드 데이터를 가져오는 중 최대 시도 횟수 초과: {e}", level=logging.ERROR, exc_info=True)
return None
def set_image_to_clipboard(self, image):
"""이미지를 클립보드에 넣는 함수 (Windows 전용)"""
output = BytesIO()
image.save(output, "BMP")
self.logger.log(f"이미지 데이터 BMP 변환", level=logging.DEBUG)
data = output.getvalue()[14:] # BMP 헤더 제거
output.close()
self.logger.log(f"이미지 BMP 헤더 제거", level=logging.DEBUG)
# 클립보드 접근 재시도 로직
max_attempts = 5
attempt = 0
success = False
while attempt < max_attempts and not success:
try:
# 클립보드에 이미지 데이터 넣기
win32clipboard.OpenClipboard()
win32clipboard.EmptyClipboard()
win32clipboard.SetClipboardData(win32clipboard.CF_DIB, data)
win32clipboard.CloseClipboard()
success = True
self.logger.log(f"클립보드 데이터 저장 성공 (시도 {attempt+1}/{max_attempts})", level=logging.DEBUG)
except Exception as e:
attempt += 1
self.logger.log(f"클립보드 데이터 저장 실패 (시도 {attempt}/{max_attempts}): {e}", level=logging.WARNING)
if attempt < max_attempts:
time.sleep(0.5) # 0.5초 대기 후 재시도
# 클립보드가 제대로 설정되었는지 확인하는 로그
if success:
try:
time.sleep(0.1) # 아주 짧은 대기 시간
win32clipboard.OpenClipboard()
if win32clipboard.IsClipboardFormatAvailable(win32clipboard.CF_DIB):
self.logger.log("클립보드 데이터 확인 성공", level=logging.DEBUG)
else:
self.logger.log("클립보드 데이터 확인 실패", level=logging.ERROR)
win32clipboard.CloseClipboard()
except Exception as e:
self.logger.log(f"클립보드 데이터 확인 중 오류: {e}", level=logging.ERROR)
def save_image_to_path(self, image, path):
try:
if image:
# 이미지를 저장 경로에 저장
self.logger.log(f"이미지 저장 완료 : {path}", level=logging.INFO)
image.save(path, format='PNG')
return path
except Exception as e:
raise RuntimeError(f"이미지 저장 중 오류 발생: {e}")
def add_watermark(self, image, watermark_text="Watermark", opacity_percent=30, angle=30, font_size=36):
"""
이미지에 텍스트 워터마크를 이미지 전체에 걸쳐서 추가하는 함수
:param image: PIL 이미지 객체
:param watermark_text: 워터마크로 추가할 텍스트
:param opacity_percent: 워터마크의 투명도 (0~100)
:param angle: 워터마크 텍스트 회전 각도 (기본 30)
:param font_size: 워터마크 텍스트의 폰트 크기 (기본 36)
:return: 워터마크가 추가된 이미지
"""
# 폰트가 로드되지 않은 경우 원본 이미지 반환
if self.font is None:
self.logger.log("폰트가 로드되지 않아 워터마크를 추가할 수 없습니다. 원본 이미지를 반환합니다.", level=logging.WARNING)
return image
# 이미지 복사본 생성
watermark_image = image.copy()
# 폰트 설정 (안전한 폰트 로딩)
try:
# self.font가 있으면 크기만 조정해서 새 폰트 생성
if hasattr(self, 'font_path') and os.path.exists(self.font_path):
font = ImageFont.truetype(self.font_path, font_size)
else:
# 크기를 조정할 수 없으면 기존 폰트 사용
font = self.font
except Exception as e:
self.logger.log(f"폰트 크기 조정 실패: {e}. 기본 폰트를 사용합니다.", level=logging.WARNING)
font = self.font
# 텍스트 투명도를 0~255로 변환
opacity = int(255 * (opacity_percent / 100))
# 텍스트 크기 측정 (textbbox 사용)
draw = ImageDraw.Draw(watermark_image)
bbox = draw.textbbox((0, 0), watermark_text, font=font)
text_width, text_height = bbox[2] - bbox[0], bbox[3] - bbox[1]
# 이미지 크기
width, height = image.size
# 워터마크 레이어 생성
watermark_layer = Image.new("RGBA", (width, height)) # RGBA 이미지 생성
# 지그재그 간격 설정
zigzag_step = int(text_height * 2) # Y축의 지그재그 간격
# 이미지 전체에 반복적으로 워터마크 텍스트 그리기 (지그재그 형태)
for y in range(0, height, zigzag_step):
for x in range(0, width, int(text_width * 3)): # 3배 너비 간격으로 반복
# 텍스트가 한 줄씩 지그재그 형태로 X축을 교차하여 이동
x_offset = (y // zigzag_step) % 2 * int(text_width * 1.5) # 짝수 행에서는 X축을 약간 이동
# 텍스트 레이어 생성
text_layer = Image.new("RGBA", (text_width, text_height), (255, 255, 255, 0))
text_draw = ImageDraw.Draw(text_layer)
# 텍스트 그리기
text_draw.text((0, 0), watermark_text, fill=(255, 255, 255, opacity), font=font)
# 텍스트 회전
rotated_text_layer = text_layer.rotate(angle, expand=1)
# 회전된 텍스트를 워터마크 레이어에 추가
watermark_layer.paste(rotated_text_layer, (x + x_offset, y), rotated_text_layer)
# 원본 이미지와 워터마크 레이어 합성
watermark_image = Image.alpha_composite(watermark_image.convert("RGBA"), watermark_layer)
# 최종적으로 RGB 형식으로 변환 후 반환
return watermark_image.convert("RGB")
def base64_to_image(self, base64_data):
"""Base64 데이터를 이미지로 변환하는 함수"""
if base64_data.startswith('data:image'):
header, encoded = base64_data.split(',', 1)
img_data = base64.b64decode(encoded)
image = Image.open(BytesIO(img_data))
return image
else:
self.logger.log("유효하지 않은 Base64 이미지 데이터입니다.", level=logging.DEBUG)
return None
def image_to_base64(self, image):
# 이미지 Base64로 변환
buffer = io.BytesIO()
image.save(buffer, format="PNG")
base64_image = base64.b64encode(buffer.getvalue()).decode('utf-8')
return base64_image
def download_image_from_url(self, url, max_retries=3):
"""URL에서 이미지를 다운로드하고 PIL 이미지 객체로 반환"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Cache-Control": "max-age=0"
}
retries = 0
while retries < max_retries:
try:
self.logger.log(f"이미지 URL 다운로드 중: {url}", level=logging.DEBUG)
response = requests.get(url, headers=headers, stream=True)
# 상태 코드가 200이 아니면 재시도
if response.status_code == 200:
# OpenCV로 이미지를 로드하여 변환
image = np.asarray(bytearray(response.content), dtype="uint8")
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
# OpenCV에서 이미지를 PIL로 변환
if image is not None:
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
return pil_image
else:
self.logger.log(f"이미지 파일 형식이 올바르지 않습니다. 대상 URL: {url}", level=logging.DEBUG)
return None
else:
self.logger.log(f"이미지 로딩 실패, HTTP 상태 코드: {response.status_code}. 재시도 {retries + 1}/{max_retries}", level=logging.DEBUG)
retries += 1
# await asyncio.sleep(random.randint(2, 5)) # 2~5초 대기 후 재시도
time.sleep(random.randint(2, 5))
except Exception as e:
self.logger.log(f"이미지 로딩 중 오류 발생: {e}. 재시도 {retries + 1}/{max_retries}", level=logging.DEBUG)
retries += 1
# await asyncio.sleep(random.randint(2, 5)) # 예외 발생 시 대기 후 재시도
time.sleep(random.randint(2, 5))
self.logger.log("이미지 다운로드 최대 재시도 횟수를 초과했습니다.", level=logging.DEBUG)
return None
def process_clipboard(self, original_url, is_success_translated, toggle_states, path=None, is_thumb=False):
"""클립보드의 내용을 처리하고, 필요한 경우 이미지 변환, 크롭 또는 클립보드 비우기"""
try:
is_watermark = toggle_states.get('watermark')
self.logger.log(f"is_watermark : {is_watermark}", level=logging.DEBUG)
watermark_text = toggle_states.get('watermark_text')
self.logger.log(f"watermark_text : {watermark_text}", level=logging.DEBUG)
opacity_percent = toggle_states.get('opacity_percent')
self.logger.log(f"opacity_percent : {opacity_percent}", level=logging.DEBUG)
clipboard_data = self.get_clipboard_data()
self.logger.log("clipboard_data", level=logging.DEBUG)
self.logger.log(f"{clipboard_data}", level=logging.DEBUG)
self.logger.log(f"============================", level=logging.DEBUG)
# 1. 클립보드의 데이터가 Base64 이미지일 경우
if isinstance(clipboard_data, str) and clipboard_data.startswith('data:image'):
self.logger.log("[process_clipboard] data:image 감지 : 이미지 데이터로 변환", level=logging.INFO)
image = self.base64_to_image(clipboard_data)
if image:
width, _ = image.size
self.logger.log(f"Base64 이미지 크기: {width}px", level=logging.DEBUG)
# 가로 크기가 200픽셀 이상이면 크롭
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
self.save_image_to_path(cropped_image, path)
else:
self.logger.log("이미지 가로 크기 200픽셀 이하: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
else:
self.logger.log("Base64 이미지 변환 실패.", level=logging.DEBUG)
# 2. 클립보드에 이미지가 있을 경우
elif isinstance(clipboard_data, Image.Image):
self.logger.log("[process_clipboard] 클립보드 이미지 확인", level=logging.INFO)
image = clipboard_data
width, _ = image.size
self.logger.log(f"클립보드에 있는 이미지 크기: {width}px", level=logging.DEBUG)
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
self.save_image_to_path(cropped_image, path)
else:
self.logger.log("이미지 가로 크기 200픽셀 이하: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
# 3. 클립보드에 데이터가 없거나 html > whale-ocr 처리
elif clipboard_data == "html > whale-ocr" or clipboard_data is None or not is_success_translated:
if clipboard_data == "html > whale-ocr":
self.logger.log("[process_clipboard] html > whale-ocr 감지 : 이미지 번역 실패 확인", level=logging.INFO)
elif clipboard_data is None:
self.logger.log("[process_clipboard] 클립보드에 이미지 없음", level=logging.INFO)
elif is_success_translated is None:
self.logger.log("[process_clipboard] 번역 실패로 인한 원본이미지 다운로드", level=logging.INFO)
if original_url:
image = self.download_image_from_url(original_url)
if image:
self.logger.log("원본 이미지 다운로드 성공!", level=logging.DEBUG)
self.set_image_to_clipboard(image) # 크롭 없이 저장
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
self.save_image_to_path(image, path)
else:
self.logger.log("원본 이미지 다운로드 실패.", level=logging.DEBUG)
else:
self.logger.log("원본 이미지 URL을 찾을 수 없습니다.", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"클립보드에서 이미지를 처리하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def process_clipboard_to_save_path(self, original_url, is_success_translated, toggle_states, path=None, is_thumb=False):
"""클립보드의 내용을 처리하고, 필요한 경우 이미지 변환, 크롭 또는 클립보드 비우기"""
try:
is_watermark = toggle_states.get('watermark')
self.logger.log(f"is_watermark : {is_watermark}", level=logging.DEBUG)
watermark_text = toggle_states.get('watermark_text')
self.logger.log(f"watermark_text : {watermark_text}", level=logging.DEBUG)
opacity_percent = toggle_states.get('opacity_percent')
self.logger.log(f"opacity_percent : {opacity_percent}", level=logging.DEBUG)
clipboard_data = self.get_clipboard_data()
self.logger.log("clipboard_data", level=logging.DEBUG)
self.logger.log(f"{clipboard_data}", level=logging.DEBUG)
self.logger.log(f"============================", level=logging.DEBUG)
# 1. 클립보드의 데이터가 Base64 이미지일 경우
if isinstance(clipboard_data, str) and clipboard_data.startswith('data:image'):
self.logger.log("[process_clipboard] data:image 감지 : 이미지 데이터로 변환", level=logging.INFO)
image = self.base64_to_image(clipboard_data)
if image:
width, _ = image.size
self.logger.log(f"Base64 이미지 크기: {width}px", level=logging.DEBUG)
# 가로 크기가 200픽셀 이상이면 크롭
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
return self.save_image_to_path(cropped_image, path)
else:
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
else:
self.logger.log("이미지 가로 크기 200픽셀 이하로 처리불가: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
else:
self.logger.log("Base64 이미지 변환 실패.", level=logging.DEBUG)
# 2. 클립보드에 이미지가 있을 경우
elif isinstance(clipboard_data, Image.Image):
self.logger.log("[process_clipboard] 클립보드 이미지 확인", level=logging.INFO)
image = clipboard_data
width, _ = image.size
self.logger.log(f"클립보드에 있는 이미지 크기: {width}px", level=logging.DEBUG)
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
return self.save_image_to_path(cropped_image, path)
else:
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
else:
self.logger.log("이미지 가로 크기 200픽셀 이하로 처리불가: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
# 3. 클립보드에 데이터가 없거나 html > whale-ocr 처리
elif clipboard_data == "html > whale-ocr" or clipboard_data is None or not is_success_translated or clipboard_data.startswith("https://") or clipboard_data.startswith("http://"):
if clipboard_data == "html > whale-ocr":
self.logger.log("[process_clipboard] html > whale-ocr 감지 : 이미지 번역 실패 확인", level=logging.INFO)
elif clipboard_data is None:
self.logger.log("[process_clipboard] 클립보드에 이미지 없음", level=logging.INFO)
elif is_success_translated is None:
self.logger.log("[process_clipboard] 번역 실패로 인한 원본이미지 다운로드", level=logging.INFO)
elif clipboard_data.startswith("https://") or clipboard_data.startswith("http://"):
self.logger.log("[process_clipboard] 타임아웃으로 인한 번역 실패 - 원본이미지 다운로드", level=logging.INFO)
if original_url:
image = self.download_image_from_url(original_url)
if image:
self.logger.log("원본 이미지 다운로드 성공!", level=logging.DEBUG)
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
return self.save_image_to_path(image, path)
else:
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
else:
self.logger.log("원본 이미지 다운로드 실패.", level=logging.DEBUG)
else:
self.logger.log("원본 이미지 URL을 찾을 수 없습니다.", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"클립보드에서 이미지를 처리하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def process_clipboard_to_save_path_with_local_hosted_image(self, local_image_path, is_success_translated, toggle_states, path=None, is_thumb=False):
"""클립보드의 내용을 처리하고, 필요한 경우 이미지 변환, 크롭 또는 클립보드 비우기
Returns:
str: 처리된 이미지 파일 경로 (성공 )
str: 원본 이미지 파일 경로 (실패 )
"""
# 매개변수 유효성 검사
if not local_image_path or not os.path.exists(local_image_path):
self.logger.log(f"유효하지 않은 로컬 이미지 경로: {local_image_path}", level=logging.ERROR)
return local_image_path if local_image_path else None
if not toggle_states:
self.logger.log("toggle_states가 제공되지 않았습니다", level=logging.WARNING)
toggle_states = {}
try:
is_watermark = toggle_states.get('watermark', False)
self.logger.log(f"is_watermark : {is_watermark}", level=logging.DEBUG)
watermark_text = toggle_states.get('watermark_text', '')
self.logger.log(f"watermark_text : {watermark_text}", level=logging.DEBUG)
opacity_percent = toggle_states.get('opacity_percent', 20)
self.logger.log(f"opacity_percent : {opacity_percent}", level=logging.DEBUG)
clipboard_data = self.get_clipboard_data()
self.logger.log(f"type(clipboard_data) : {type(clipboard_data)}", level=logging.DEBUG)
self.logger.log(f"============================", level=logging.DEBUG)
# 1. 클립보드의 데이터가 Base64 이미지일 경우
if isinstance(clipboard_data, str) and clipboard_data.startswith('data:image'):
self.logger.log("[process_clipboard] data:image 감지 : 이미지 데이터로 변환", level=logging.INFO)
image = self.base64_to_image(clipboard_data)
if image:
width, _ = image.size
self.logger.log(f"Base64 이미지 크기: {width}px", level=logging.DEBUG)
# 가로 크기가 200픽셀 이상이면 크롭
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
saved_path = self.save_image_to_path(cropped_image, path)
return saved_path if saved_path else local_image_path
else:
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
return local_image_path # path가 없으면 원본 경로 반환
else:
self.logger.log("이미지 가로 크기 200픽셀 이하로 처리불가: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
return local_image_path
else:
self.logger.log("Base64 이미지 변환 실패.", level=logging.DEBUG)
return local_image_path
# 2. 클립보드에 이미지가 있을 경우
elif isinstance(clipboard_data, Image.Image):
self.logger.log("[process_clipboard] 클립보드 이미지 확인", level=logging.INFO)
image = clipboard_data
width, _ = image.size
self.logger.log(f"클립보드에 있는 이미지 크기: {width}px", level=logging.DEBUG)
if width >= 200:
self.logger.log("이미지 가로 크기 200픽셀 이상: 크롭 진행 중...", level=logging.DEBUG)
cropped_image = self.crop_image(image, is_thumb) # 크롭 메서드 사용
# 워터마크 추가
if is_watermark and not is_thumb: # is_thumb가 True라면 워터마크 추가를 건너뜁니다
self.logger.log("워터마크 추가 중...", level=logging.DEBUG)
cropped_watermark_image = self.add_watermark(cropped_image, watermark_text, opacity_percent) # 워터마크 추가
cropped_image = cropped_watermark_image
if path:
self.logger.log("이미지 저장 시도...", level=logging.DEBUG)
saved_path = self.save_image_to_path(cropped_image, path)
return saved_path if saved_path else local_image_path
else:
self.set_image_to_clipboard(cropped_image) # 클립보드에 저장
return local_image_path # path가 없으면 원본 경로 반환
else:
self.logger.log("이미지 가로 크기 200픽셀 이하로 처리불가: 클립보드 비움.", level=logging.DEBUG)
self.clear_clipboard()
return local_image_path
# 3. 클립보드에 데이터가 없거나 html > whale-ocr 처리
elif clipboard_data == "html > whale-ocr" or clipboard_data is None or not is_success_translated or clipboard_data.startswith("https://") or clipboard_data.startswith("http://"):
if clipboard_data == "html > whale-ocr":
self.logger.log("[process_clipboard] html > whale-ocr 감지 : 이미지 번역 실패 확인", level=logging.INFO)
elif clipboard_data is None:
self.logger.log("[process_clipboard] 클립보드에 이미지 없음", level=logging.INFO)
elif not is_success_translated:
self.logger.log("[process_clipboard] 번역 실패로 인한 원본이미지 사용", level=logging.INFO)
elif clipboard_data.startswith("https://") or clipboard_data.startswith("http://"):
self.logger.log("[process_clipboard] 타임아웃으로 인한 번역 실패 - 원본이미지 사용", level=logging.INFO)
return local_image_path
# 4. 기타 예상하지 못한 클립보드 데이터
else:
self.logger.log(f"[process_clipboard] 예상하지 못한 클립보드 데이터 타입: {type(clipboard_data)}", level=logging.WARNING)
return local_image_path
except Exception as e:
self.logger.log(f"클립보드에서 이미지를 처리하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return local_image_path # 오류 시 원본 경로 반환
def is_clipboard_image(self):
"""클립보드에 이미지가 있는지 확인하는 함수"""
max_attempts = 5
attempt = 0
while attempt < max_attempts:
try:
win32clipboard.OpenClipboard()
is_clipboard_image_flag = win32clipboard.IsClipboardFormatAvailable(win32clipboard.CF_DIB)
win32clipboard.CloseClipboard()
if is_clipboard_image_flag:
self.logger.log("클립보드에 이미지가 존재합니다.", level=logging.DEBUG)
else:
self.logger.log("클립보드에 이미지가 없습니다.", level=logging.DEBUG)
return is_clipboard_image_flag
except Exception as e:
attempt += 1
# 클립보드가 열려있으면 닫기 시도
try:
win32clipboard.CloseClipboard()
except:
pass
self.logger.log(f"클립보드 이미지 확인 중 오류 발생 (시도 {attempt}/{max_attempts}): {e}", level=logging.WARNING)
if attempt < max_attempts:
time.sleep(0.5) # 0.5초 대기 후 재시도
else:
self.logger.log(f"클립보드 이미지 확인 중 최대 시도 횟수 초과: {e}", level=logging.ERROR, exc_info=True)
return False
def get_image_from_clipboard(self):
"""클립보드에서 이미지를 가져오는 함수"""
max_attempts = 5
attempt = 0
while attempt < max_attempts:
try:
win32clipboard.OpenClipboard()
if self.is_clipboard_image():
dib_data = win32clipboard.GetClipboardData(win32clipboard.CF_DIB)
win32clipboard.CloseClipboard()
image = Image.open(BytesIO(dib_data))
return image
else:
win32clipboard.CloseClipboard()
self.logger.log("클립보드에 이미지가 없습니다.", level=logging.DEBUG)
return None
except Exception as e:
attempt += 1
# 클립보드가 열려있으면 닫기 시도
try:
win32clipboard.CloseClipboard()
except:
pass
self.logger.log(f"클립보드에서 이미지를 가져오는 중 오류 발생 (시도 {attempt}/{max_attempts}): {e}", level=logging.WARNING)
if attempt < max_attempts:
time.sleep(0.5) # 0.5초 대기 후 재시도
else:
self.logger.log(f"클립보드에서 이미지를 가져오는 중 최대 시도 횟수 초과: {e}", level=logging.ERROR, exc_info=True)
return None
def clear_clipboard(self):
"""클립보드를 비우는 함수"""
max_attempts = 5
attempt = 0
success = False
while attempt < max_attempts and not success:
try:
# 먼저 pywinauto로 시도
try:
pywinauto.clipboard.EmptyClipboard()
success = True
except:
# pywinauto 실패 시 win32clipboard로 시도
win32clipboard.OpenClipboard()
win32clipboard.EmptyClipboard()
win32clipboard.CloseClipboard()
success = True
self.logger.log(f"클립보드가 비워졌습니다. (시도 {attempt+1}/{max_attempts})", level=logging.DEBUG)
except Exception as e:
attempt += 1
self.logger.log(f"클립보드를 비우는 중 오류 발생 (시도 {attempt}/{max_attempts}): {e}", level=logging.WARNING)
if attempt < max_attempts:
time.sleep(0.5) # 0.5초 대기 후 재시도
if not success:
self.logger.log("최대 시도 횟수를 초과하여 클립보드를 비우지 못했습니다.", level=logging.ERROR)
def crop_image(self, image, is_thumb=False, crop_percentage=0.01):
"""이미지를 주어진 퍼센트만큼 크롭하는 함수"""
if is_thumb:
crop_percentage = 0.03
self.logger.log(f"썸네일 이미지 이므로 크롭 3%로 조정", level=logging.DEBUG)
width, height = image.size
left = width * crop_percentage
top = height * crop_percentage
right = width * (1 - crop_percentage)
bottom = height * (1 - crop_percentage)
cropped_image = image.crop((left, top, right, bottom))
if self.debug:
# 디버그 모드일 경우 크롭 전후 다양한 비율로 이미지 저장
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
original_image_path = os.path.join(os.getcwd(), f"original_image_{timestamp}.png")
image.save(original_image_path) # 크롭 전 이미지 저장
self.logger.log(f"크롭 전 이미지 저장됨: {original_image_path}", level=logging.DEBUG)
# 1%, 2%, 3% 크롭 이미지 저장
crop_alternatives = [0.01, 0.02, 0.03]
for crop in crop_alternatives:
left_alt = width * crop
top_alt = height * crop
right_alt = width * (1 - crop)
bottom_alt = height * (1 - crop)
cropped_alt_image = image.crop((left_alt, top_alt, right_alt, bottom_alt))
cropped_image_path = os.path.join(os.getcwd(), f"cropped_image_{int(crop*100)}_{timestamp}.png")
cropped_alt_image.save(cropped_image_path)
self.logger.log(f"{int(crop*100)}% 크롭된 이미지 저장됨: {cropped_image_path}", level=logging.DEBUG)
return cropped_image

41
modules/gpt_client.py Normal file
View File

@ -0,0 +1,41 @@
import logging
from openai import OpenAI
import json
import re
import logging
class GPTClient:
def __init__(self, model="gpt-4o-mini", temperature=0.2):
self.client = None
self.model = model
self.temperature = temperature
self.set_client(api_key='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')
def set_client(self, api_key):
self.client = OpenAI(api_key=api_key)
def ask(self, prompt: str) -> dict:
"""프롬프트를 이용하여 GPT 모델로부터 응답을 받습니다. 항상 JSON 형식으로 반환."""
try:
response = self.client.chat.completions.create(
model=self.model,
temperature=self.temperature,
messages=[{"role": "user", "content": prompt}]
)
# GPT 응답 내용 가져오기
content = response.choices[0].message.content.strip()
print(f'GPT 응답: {content}')
# 불필요한 포맷팅 제거 (```json```)
cleaned_content = re.sub(r"^```json|```$", "", content).strip()
# JSON 변환 시도
return json.loads(cleaned_content)
except json.JSONDecodeError as e:
print(f'JSON 디코딩 실패: {e}. 원본 응답: {content}')
return {}
except Exception as e:
print(f'GPT 통신 오류: {e}')
return {}

299
modules/image_processor2.py Normal file
View File

@ -0,0 +1,299 @@
import os
import asyncio
import aiofiles
import logging
from urllib.parse import urlparse
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor
import re
import cv2
import base64
import requests
import numpy as np
from modules.ocr_module import OCRModule
from modules.mask_module import MaskModule
from modules.text_rendering_module import TextRenderingModule
from modules.postImageManager import PostImageManager
class ImageProcessor:
"""이미지 다운로드, OCR, 번역 처리를 담당하는 클래스"""
def __init__(self, logger, gpt_client, base_dir, font_path):
self.logger = logger
self.base_dir = base_dir
self.gpt_client = gpt_client
# OCR 관련
self.inpaint_sv_port = 8080
self.font_path = font_path
self.TEMP_IMAGE_DIR = os.path.join(self.base_dir, "temp_images")
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
self.ocr_module = OCRModule(logger=self.logger, base_dir=self.base_dir)
self.mask_module = MaskModule(logger=self.logger, base_dir=self.base_dir)
self.text_rendering_module = TextRenderingModule(logger=self.logger, font_path=self.font_path)
self.postImageManager = PostImageManager(logger=self.logger, font_path=self.font_path)
def __del__(self):
"""소멸자에서 리소스 정리"""
self.cleanup()
def cleanup(self):
"""리소스 정리"""
try:
# 임시 폴더 삭제
if hasattr(self, 'TEMP_IMAGE_DIR') and os.path.exists(self.TEMP_IMAGE_DIR):
shutil.rmtree(self.TEMP_IMAGE_DIR)
self.logger.log(f"임시 폴더 삭제됨: {self.TEMP_IMAGE_DIR}", level=logging.INFO)
except Exception as e:
self.logger.log(f"리소스 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
def is_valid_image_path(self, path):
# http/https 또는 로컬 파일(.jpg, .png 등) 모두 허용
if re.match(r'^(http|https)://.*\\.(jpg|jpeg|png|bmp|gif|webp|tiff?)(\\?.*)?$', path, re.IGNORECASE):
return True
if os.path.isfile(path) and path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp', '.tif', '.tiff')):
return True
return False
async def process_single_image(self, toggle_states, unwanted_texts, local_image_path, index, file_prefix=""):
"""
단일 이미지를 처리합니다 (다운로드 -> OCR -> 인페인팅)
Args:
toggle_states: 토글 상태 딕셔너리
local_image_path (str): 처리할 이미지 경로
index (int): 이미지 인덱스
unwanted_texts: 치환할 텍스트 딕셔너리
file_prefix (str): 파일명에 추가할 접두사 (: "detail", "option")
Returns:
dict: 처리 결과를 포함한 딕셔너리
- status: 'inpainted', 'original', 'exclude', 'error' 하나
- path: 처리된 이미지 파일 경로 또는 원본 이미지 파일 경로
- error: 오류 메시지 (status가 'error' 경우에만 포함)
"""
ocr_enabled = toggle_states.get('ocr', False)
unwanted_texts = unwanted_texts
try:
ocr_results = self.ocr_module.detect_text(local_image_path)
# 3. 중국어 텍스트 없는 경우 원본 이미지 반환
if not self.ocr_module.filter_chinese_text(ocr_results):
self.logger.log(f"이미지 {index+1} 중국어 텍스트 없음, 원본 이미지 반환", level=logging.INFO)
return local_image_path
# 4. 텍스트 번역 (GPT)
translated_texts = self.gpt_translate_texts(ocr_results, self.gpt_client)
if ocr_enabled:
filtered_translated_texts = self.process_translated_texts(translated_texts, unwanted_texts, local_image_path, index)
if not filtered_translated_texts:
self.logger.log(f"이미지 {index+1} 제외됨", level=logging.INFO)
return None
else:
self.logger.log(f"이미지 {index+1} 치환됨", level=logging.INFO)
# 마스크 생성 (basic 방식만 사용)
masks = self.mask_module.create_masks(
image_path=local_image_path, ocr_results=ocr_results, mask_option="basic"
)
self.logger.log(f"마스크 생성 완료", level=logging.INFO)
# 인페인팅
inpainted_image = self.call_inpaint_api(local_image_path, masks)
self.logger.log(f"인페인팅 완료", level=logging.INFO)
# 텍스트 렌더링
text_rendered_image = self.text_rendering_module.render_text(
inpainted_image, ocr_results, filtered_translated_texts, font_path=self.font_path)
self.logger.log(f"텍스트 렌더링 완료", level=logging.INFO)
# 결과 저장
translated_img_path = await self.postProcess_and_save_image(local_image_path, text_rendered_image, index, file_prefix, toggle_states)
self.logger.log(f"이미지 {index+1} 번역 완료: {translated_img_path}", level=logging.INFO)
return translated_img_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return {'status': 'failed', 'path': local_image_path, 'error': str(e)}
async def postProcess_and_save_image(self, local_image_path, text_rendered_image, index, file_prefix, toggle_states):
"""로컬 서버 URL을 사용해 이미지를 번역하고 로컬에 저장합니다"""
try:
# 파일명에 접두사 포함
if file_prefix:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_{file_prefix}_img_{index+1}.png")
else:
img_path = os.path.join(self.TEMP_IMAGE_DIR, f"translated_img_{index+1}.png")
watermarked_image_data = self.postImageManager.add_watermark(image_data=text_rendered_image, watermark_text=toggle_states.get("watermark_text", "워터마크"))
final_image_path = self.postImageManager.save_image_to_path(watermarked_image_data, img_path)
return final_image_path
except Exception as e:
self.logger.log(f"이미지 {index+1} 번역 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return local_image_path
def is_valid_image_data(self, image_data):
"""이미지 데이터가 유효한지 확인합니다"""
if not image_data or len(image_data) < 100:
return False
# 일반적인 이미지 파일 시그니처 확인
image_signatures = [
b'\xFF\xD8\xFF', # JPEG
b'\x89PNG\r\n\x1a\n', # PNG
b'GIF87a', # GIF87a
b'GIF89a', # GIF89a
b'RIFF', # WebP (RIFF 컨테이너)
]
return any(image_data.startswith(sig) for sig in image_signatures)
def call_inpaint_api(self, image, mask):
"""
인페인팅 API를 호출하여 이미지를 인페인팅합니다.
"""
try:
# 이미지 처리
if isinstance(image, str):
image_np = cv2.imread(image)
if image_np is None:
self.logger.log(f"이미지 로딩 실패: {image}", level=logging.ERROR)
return None
else:
image_np = image
# 마스크 처리
if isinstance(mask, str):
mask_np = cv2.imread(mask, cv2.IMREAD_GRAYSCALE)
if mask_np is None:
self.logger.log(f"마스크 로딩 실패: {mask}", level=logging.ERROR)
return None
else:
mask_np = mask
api_url = f"http://localhost:{self.inpaint_sv_port}/api/v1/inpaint"
_, img_encoded = cv2.imencode('.png', image_np)
_, mask_encoded = cv2.imencode('.png', mask_np)
img_b64 = base64.b64encode(img_encoded).decode('utf-8')
mask_b64 = base64.b64encode(mask_encoded).decode('utf-8')
payload = {
"image": img_b64,
"mask": mask_b64
}
response = requests.post(api_url, json=payload)
if response.status_code != 200:
self.logger.log(f"IOPaint 서버 에러: {response.text}", level=logging.ERROR)
return None
nparr = np.frombuffer(response.content, np.uint8)
result = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
return result
except Exception as e:
self.logger.log(f"인페인팅 API 호출 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def process_translated_texts(self, translated_texts, unwanted_texts, local_image_path, index):
"""
번역된 단어 리스트(translated_texts)에서 unwanted_texts의 원본값이
앞이나 뒤에 포함되면 치환값으로 바꿉니다.
치환값이 '이미지삭제'라면 None 반환(이미지 제외)
"""
new_texts = []
for text in translated_texts:
replaced = False
for origin, replace in unwanted_texts.items():
# 앞/뒤에 원본값이 있는지 확인
if text.startswith(origin) or text.endswith(origin):
self.logger.log(f"[{text}] -> [{replace}] (치환)", level=logging.INFO)
if replace == "이미지삭제":
self.logger.log(f"이미지 {index+1} 제외됨: {local_image_path}", level=logging.INFO)
return None
# 앞/뒤 원본값만 치환
if text.startswith(origin):
new = replace + text[len(origin):]
elif text.endswith(origin):
new = text[:-len(origin)] + replace
new_texts.append(new)
replaced = True
break
if not replaced:
new_texts.append(text)
self.logger.log(f"최종 치환 결과: {new_texts}", level=logging.INFO)
return new_texts
async def process_image_list(self, image_urls, delay=1.0, file_prefix="", use_inpainting=False):
"""
이미지 리스트를 순차적으로 처리합니다.
"""
if not image_urls:
self.logger.log("처리할 이미지가 없습니다.", level=logging.INFO)
return []
processing_mode = "인페인팅" if use_inpainting else "웨일 번역"
self.logger.log(f"이미지 {len(image_urls)}개를 {processing_mode} 모드로 처리 시작", level=logging.INFO)
processed_images = []
for i, url in enumerate(image_urls):
self.logger.log(f"이미지 {i+1}/{len(image_urls)} 처리 중... ({processing_mode} 모드)", level=logging.INFO)
result = await self.process_single_image(
url, i, delay, file_prefix, use_inpainting
)
# 결과 처리
if isinstance(result, dict):
status = result.get('status')
path = result.get('path')
if status == 'inpainted':
processed_images.append(path)
self.logger.log(f"이미지 {i+1} 인페인팅 처리 완료", level=logging.INFO)
elif status == 'original':
processed_images.append(path)
self.logger.log(f"이미지 {i+1} 원본 사용", level=logging.INFO)
elif status == 'exclude':
self.logger.log(f"이미지 {i+1} 제외됨", level=logging.INFO)
# 제외된 이미지는 리스트에 추가하지 않음
else: # failed
self.logger.log(f"이미지 {i+1} 처리 실패: {result.get('error', '알 수 없는 오류')}", level=logging.WARNING)
# 실패한 이미지는 원본 경로 추가
processed_images.append(path)
else:
# 이전 버전과의 호환성을 위한 처리
if result:
processed_images.append(result)
self.logger.log(f"이미지 처리 완료: 총 {len(processed_images)}개 ({processing_mode} 모드)", level=logging.INFO)
return processed_images
def gpt_translate_texts(self, ocr_results, gpt_client):
texts = [result['text'] for result in ocr_results]
if not texts:
return []
prompt = (
"다음 중국어 문장들을 한국어로 자연스럽고 의미가 잘 전달되게 번역해줘. "
"순서와 개수는 반드시 그대로 유지하고, 결과는 JSON 배열(리스트)로만 반환해. "
"중국어 리스트:\n" +
str(texts)
)
response = gpt_client.ask(prompt)
if isinstance(response, list):
return response
elif isinstance(response, dict) and 'result' in response:
return response['result']
else:
print("GPT 번역 결과 파싱 실패, 원본 반환")
return texts

View File

@ -0,0 +1,104 @@
import random
import socket
import uvicorn
from fastapi import FastAPI, Query, Body
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor
from modules.image_processor2 import ImageProcessor
# 포트 범위 설정
PORT_RANGE = (7000, 7000)
# 사용 가능한 포트 찾기
def find_free_port():
for _ in range(20):
port = random.randint(*PORT_RANGE)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
raise RuntimeError("사용 가능한 포트를 찾을 수 없습니다.")
# 요청 모델 정의
class ImageRequest(BaseModel):
local_image_path: str
file_prefix: Optional[str] = ""
use_inpainting: Optional[bool] = False
toggle_states: Optional[dict] = None
unwanted_texts: Optional[dict] = None
watermark_text: Optional[str] = None
watermark_opacity: Optional[float] = None
class ImagesRequest(BaseModel):
local_image_paths: List[str]
file_prefix: Optional[str] = ""
use_inpainting: Optional[bool] = False
toggle_states: Optional[dict] = None
unwanted_texts: Optional[dict] = None
watermark_text: Optional[str] = None
watermark_opacity: Optional[float] = None
# FastAPI 앱 생성
def create_app(image_processor: ImageProcessor, max_workers: int = 2):
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=max_workers)
@app.post("/translate_image")
async def translate_image(req: ImageRequest):
# 워터마크 관련 옵션을 toggle_states에 병합
toggle_states = req.toggle_states.copy() if req.toggle_states else {}
if req.watermark_text is not None:
toggle_states["watermark_text"] = req.watermark_text
if req.watermark_opacity is not None:
toggle_states["watermark_opacity"] = req.watermark_opacity
# 단일 이미지 번역
result = await image_processor.process_single_image(
toggle_states, req.unwanted_texts or {}, req.local_image_path, 0, req.file_prefix
)
# 경로만 반환
if isinstance(result, dict):
return {"result": result.get("path", None)}
return {"result": result}
@app.post("/translate_images")
async def translate_images(req: ImagesRequest):
# 워터마크 관련 옵션을 toggle_states에 병합
toggle_states = req.toggle_states.copy() if req.toggle_states else {}
if req.watermark_text is not None:
toggle_states["watermark_text"] = req.watermark_text
if req.watermark_opacity is not None:
toggle_states["watermark_opacity"] = req.watermark_opacity
# 여러 이미지 병렬 번역
loop = asyncio.get_event_loop()
tasks = []
sem = asyncio.Semaphore(max_workers)
async def sem_task(idx, path):
async with sem:
return await image_processor.process_single_image(
toggle_states, req.unwanted_texts or {}, path, idx, req.file_prefix
)
for idx, path in enumerate(req.local_image_paths):
tasks.append(sem_task(idx, path))
results = await asyncio.gather(*tasks)
# 경로만 리스트로 반환
def extract_path(res):
if isinstance(res, dict):
return res.get("path", None)
return res
return {"results": [extract_path(r) for r in results]}
return app
# 서버 실행 함수
def run_server(image_processor, max_workers=2):
port = find_free_port()
app = create_app(image_processor, max_workers)
uvicorn.run(app, host="127.0.0.1", port=port, workers=1)
# FastAPI의 workers는 프로세스 수이므로, 내부 병렬은 ThreadPoolExecutor로 제어
# 실제 워커 수는 process_single_image 병렬 호출로 제한
# 서버 실행 후 포트 정보 반환 가능
return port

BIN
modules/img/1.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

BIN
modules/img/2.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 380 KiB

BIN
modules/img/3.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 261 KiB

BIN
modules/img/4.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 112 KiB

BIN
modules/img/5.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 238 KiB

BIN
modules/img/6.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 352 KiB

BIN
modules/img/7.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 165 KiB

View File

@ -0,0 +1,27 @@
import numpy as np
import requests
import cv2
import base64
class IOPaintInpainting:
"""IOPaint 서버 연동 인페인팅 모델 (REST API /api/v1/inpaint 사용, 바이너리 PNG 반환)"""
def __init__(self, server_url="http://localhost:8080"):
self.api_url = f"http://localhost:8080/api/v1/inpaint"
def inpaint(self, image: np.ndarray, mask: np.ndarray, api_url:str = 'http://localhost:8080/api/v1/inpaint', ) -> np.ndarray:
# 이미지를 base64로 인코딩
_, img_encoded = cv2.imencode('.png', image)
_, mask_encoded = cv2.imencode('.png', mask)
img_b64 = base64.b64encode(img_encoded).decode('utf-8')
mask_b64 = base64.b64encode(mask_encoded).decode('utf-8')
payload = {
"image": img_b64,
"mask": mask_b64
}
response = requests.post(api_url, json=payload)
if response.status_code != 200:
print("IOPaint 서버 에러:", response.text)
return None
# 응답이 바이너리 PNG 이미지이므로 바로 디코딩
nparr = np.frombuffer(response.content, np.uint8)
result = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
return result

199
modules/iop_Manager.py Normal file
View File

@ -0,0 +1,199 @@
import numpy as np
import requests
import cv2
import base64
import subprocess
import random
import time
import threading
import os
import logging
class IOPaintManager:
"""IOPaint 서버 인스턴스 및 인페인팅 요청을 통합 관리하는 매니저"""
class ServerInstance:
def __init__(self, port, process):
self.port = port
self.process = process
self.busy = False
self.last_used = time.time()
def mark_busy(self):
self.busy = True
self.last_used = time.time()
def mark_idle(self):
self.busy = False
self.last_used = time.time()
def is_alive(self):
return self.process.poll() is None
def __init__(self, logger, num_instances=1, port_range=(8099, 8199), base_dir=None, wait_ready=30, model_dir=None):
self.logger = logger
self.instances = []
self.port_range = port_range
self.lock = threading.Lock()
self.base_dir = base_dir or os.getcwd()
self.model_dir = model_dir or os.path.join(self.base_dir, 'iop', 'models')
self.exe_path = os.path.join(self.base_dir, 'iop', 'iop.exe')
self._start_instances(num_instances, wait_ready)
def _get_random_port(self):
used_ports = {inst.port for inst in self.instances}
candidates = [p for p in range(self.port_range[0], self.port_range[1]+1) if p not in used_ports]
if not candidates:
self.logger.log("사용 가능한 포트가 없습니다.", level=logging.ERROR)
raise RuntimeError("사용 가능한 포트가 없습니다.")
return random.choice(candidates)
def wait_for_server_ready(self, port, timeout=30):
url = f"http://localhost:{port}/api/v1/server-config"
start = time.time()
last_error = None
self.logger.log(f"[{port}] 서버 준비 체크 시작 (최대 {timeout}초 대기)", level=logging.INFO)
tries = 0
while time.time() - start < timeout:
tries += 1
try:
r = requests.get(url, timeout=2)
self.logger.log(f"응답 : {r}", level=logging.INFO)
if r.status_code == 200:
elapsed = time.time() - start
self.logger.log(f"[{port}] 서버 준비 완료! (시도 {tries}회, {elapsed:.1f}초 소요)", level=logging.INFO)
return True
else:
self.logger.log(f"[{port}] 응답 코드: {r.status_code}", level=logging.INFO)
except Exception as e:
last_error = str(e)
self.logger.log(f"[{port}] 준비 체크 실패 (시도 {tries}회): {last_error}", level=logging.ERROR, exc_info=True)
time.sleep(0.5)
self.logger.log(f"[{port}] 서버 준비 실패 (총 {tries}회 시도, 마지막 에러: {last_error})", level=logging.ERROR, exc_info=True)
return False
def _start_instances(self, num, wait_ready):
self.logger.log(f"IOPaint 인스턴스 {num} 개 시작", level=logging.INFO)
for _ in range(num):
port = self._get_random_port()
cmd = [self.exe_path, 'start', '--model=lama', '--device=cpu', '--port', str(port), '--model-dir', self.model_dir]
self.logger.log(f"[{port}] 인스턴스 실행 명령: {' '.join(cmd)}", level=logging.INFO)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
instance = self.ServerInstance(port, proc)
self.instances.append(instance)
start_wait = 8
time.sleep(start_wait)
self.logger.log(f"[{port}] 인스턴스 실행 명시대기: {start_wait}", level=logging.INFO)
if self.wait_for_server_ready(port, timeout=wait_ready):
self.logger.log(f"IOPaint 인스턴스 {instance.port} 준비됨", level=logging.INFO)
else:
self.logger.log(f"IOPaint 인스턴스 {instance.port} 시작 실패", level=logging.ERROR)
# 에러 메시지 출력
try:
out, err = proc.communicate(timeout=3)
self.logger.log(f"[{port}] 표준출력:\n{out.decode(errors='ignore')}", level=logging.INFO)
self.logger.log(f"[{port}] 표준에러:\n{err.decode(errors='ignore')}", level=logging.INFO)
except Exception as e:
self.logger.log(f"[{port}] 에러 메시지 읽기 실패: {e}", level=logging.ERROR)
def get_instance_info(self):
"""모든 인스턴스의 정보를 반환"""
info = []
for inst in self.instances:
info.append({
"port": inst.port,
"busy": inst.busy,
"alive": inst.is_alive(),
"last_used": inst.last_used
})
return info
def get_idle_instance(self):
"""놀고 있는(사용 가능한) 인스턴스 반환 (없으면 None)"""
with self.lock:
for inst in self.instances:
if not inst.busy and inst.is_alive():
inst.mark_busy()
self.logger.log(f"IOPaint 인스턴스 {inst.port} 사용 중", level=logging.INFO)
return inst
return None
def mark_instance_idle(self, port):
"""작업이 끝난 인스턴스를 idle로 표시"""
for inst in self.instances:
if inst.port == port:
inst.mark_idle()
self.logger.log(f"IOPaint 인스턴스 {inst.port} 유휴", level=logging.INFO)
break
def shutdown_all(self):
"""모든 서버 인스턴스 종료"""
for inst in self.instances:
if inst.is_alive():
inst.process.terminate()
self.logger.log(f"IOPaint 인스턴스 {inst.port} 종료", level=logging.INFO)
self.instances = []
self.logger.log("모든 IOPaint 인스턴스 종료", level=logging.INFO)
def inpaint(self, image, mask, instance=None) -> np.ndarray:
"""image와 mask를 경로나 np.ndarray 모두 지원"""
# 이미지 처리
if isinstance(image, str):
image_np = cv2.imread(image)
if image_np is None:
self.logger.log(f"이미지 로딩 실패: {image}", level=logging.ERROR)
return None
else:
image_np = image
# 마스크 처리
if isinstance(mask, str):
mask_np = cv2.imread(mask, cv2.IMREAD_GRAYSCALE)
if mask_np is None:
self.logger.log(f"마스크 로딩 실패: {mask}", level=logging.ERROR)
return None
else:
mask_np = mask
if instance is None:
instance = self.get_idle_instance()
if instance is None:
self.logger.log("사용 가능한 IOPaint 인스턴스가 없습니다.", level=logging.ERROR)
return None
api_url = f"http://localhost:{instance.port}/api/v1/inpaint"
self.logger.log(f"IOPaint 인스턴스 {instance.port} 사용", level=logging.INFO)
try:
_, img_encoded = cv2.imencode('.png', image_np)
_, mask_encoded = cv2.imencode('.png', mask_np)
img_b64 = base64.b64encode(img_encoded).decode('utf-8')
mask_b64 = base64.b64encode(mask_encoded).decode('utf-8')
payload = {
"image": img_b64,
"mask": mask_b64
}
response = requests.post(api_url, json=payload)
if response.status_code != 200:
self.logger.log(f"IOPaint 서버 에러: {response.text}", level=logging.ERROR)
return None
nparr = np.frombuffer(response.content, np.uint8)
result = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
return result
finally:
self.mark_instance_idle(instance.port)
def add_instance(self, wait_ready=30):
port = self._get_random_port()
cmd = [self.exe_path, 'start', '--model=lama', '--device=cpu', '--port', str(port), '--model-dir', self.model_dir]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
instance = self.ServerInstance(port, proc)
self.instances.append(instance)
if self.wait_for_server_ready(port, timeout=wait_ready):
self.logger.log(f"IOPaint 인스턴스 {instance.port} 시작", level=logging.INFO)
else:
self.logger.log(f"IOPaint 인스턴스 {instance.port} 시작 실패", level=logging.ERROR)
return instance
# if __name__ == '__main__':
# manager = IOPaintManager(num_instances=1)
# # result = manager.inpaint(image, mask) # 자동으로 idle 인스턴스에 요청
# print(manager.get_instance_info())
# manager.shutdown_all()

View File

@ -0,0 +1,131 @@
import os
import socket
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
import logging
class LocalImageServer:
"""로컬 이미지 파일을 웹에서 접근 가능하도록 하는 HTTP 서버"""
def __init__(self, logger, image_dir, port=8000):
self.logger = logger
self.image_dir = os.path.abspath(image_dir) # 절대 경로로 변환
self.original_cwd = os.getcwd() # 원래 작업 디렉토리 저장
self.port = self.find_available_port(port)
self.server = None
self.server_thread = None
def find_available_port(self, start_port=8000, max_port=8100):
"""사용 가능한 포트를 찾습니다"""
for port in range(start_port, max_port + 1):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('localhost', port))
return port
except OSError:
continue
raise RuntimeError(f"포트 {start_port}-{max_port} 범위에서 사용 가능한 포트를 찾을 수 없습니다.")
def start_server(self):
"""HTTP 서버를 시작합니다"""
if self.server_thread and self.server_thread.is_alive():
self.logger.log(f"로컬 이미지 서버가 이미 포트 {self.port}에서 실행 중입니다.", level=logging.DEBUG)
return
# 이미지 디렉토리 존재 확인
if not os.path.exists(self.image_dir):
try:
os.makedirs(self.image_dir, exist_ok=True)
self.logger.log(f"이미지 디렉토리 생성: {self.image_dir}", level=logging.INFO)
except Exception as e:
self.logger.log(f"이미지 디렉토리 생성 실패: {e}", level=logging.ERROR)
raise
try:
# 작업 디렉토리 변경 없이 CustomHandler에서 직접 경로 처리
class CustomHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, image_directory=None, **kwargs):
self.image_directory = image_directory
super().__init__(*args, **kwargs)
def translate_path(self, path):
"""요청 경로를 이미지 디렉토리 내의 실제 파일 경로로 변환"""
# 기본 translate_path 호출하여 상대 경로 얻기
path = super().translate_path(path)
# 현재 작업 디렉토리 대신 이미지 디렉토리 사용
rel_path = os.path.relpath(path, os.getcwd())
return os.path.join(self.image_directory, rel_path)
def log_message(self, format, *args):
# 로그 출력을 비활성화 (너무 많은 로그 방지)
pass
def end_headers(self):
# CORS 헤더 추가
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
super().end_headers()
# 핸들러에 이미지 디렉토리 전달
def handler_factory(*args, **kwargs):
return CustomHandler(*args, image_directory=self.image_dir, **kwargs)
self.server = HTTPServer(('localhost', self.port), handler_factory)
self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.server_thread.start()
self.logger.log(f"로컬 이미지 서버가 포트 {self.port}에서 시작되었습니다. (디렉토리: {self.image_dir})", level=logging.INFO)
except Exception as e:
self.logger.log(f"로컬 웹서버 시작 실패: {e}", level=logging.ERROR)
# 실패 시 상태 정리
self.server = None
self.server_thread = None
raise
def stop_server(self):
"""HTTP 서버를 중지합니다"""
if self.server:
try:
self.server.shutdown()
self.server.server_close()
if self.server_thread and self.server_thread.is_alive():
self.server_thread.join(timeout=5)
self.logger.log("로컬 이미지 서버가 중지되었습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"로컬 웹서버 중지 중 오류 발생: {e}", level=logging.ERROR)
finally:
self.server = None
self.server_thread = None
def restart_server(self):
"""서버를 재시작합니다"""
self.logger.log("로컬 이미지 서버 재시작 중...", level=logging.INFO)
self.stop_server()
# 새로운 포트 찾기
self.port = self.find_available_port(self.port)
self.start_server()
def get_base_url(self):
"""서버의 기본 URL을 반환합니다"""
return f"http://localhost:{self.port}"
def is_running(self):
"""서버가 실행 중인지 확인합니다"""
return self.server is not None and self.server_thread and self.server_thread.is_alive()
def get_file_url(self, filename):
"""특정 파일의 URL을 반환합니다"""
if not self.is_running():
self.logger.log("서버가 실행되지 않았습니다.", level=logging.WARNING)
return None
return f"{self.get_base_url()}/{filename}"
def __del__(self):
"""소멸자에서 서버 정리"""
try:
self.stop_server()
except:
pass

156
modules/loggerModule.py Normal file
View File

@ -0,0 +1,156 @@
import logging
from logging.handlers import RotatingFileHandler, BaseRotatingHandler
import os
import glob
import traceback
import inspect
class Logger1():
def __init__(self, log_file="ITServer.log", logger_name="MainLogger",
file_log_level=logging.DEBUG):
"""
Logger 초기화
:param log_file: 로그 파일 이름
:param logger_name: 로거 이름
:param file_log_level: 파일 로거의 로그 레벨
"""
super().__init__()
self.file_log_level = file_log_level
# 로그 설정
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(file_log_level) # 파일 로거 레벨 설정
# 포맷 설정
self.simple_format = "[%(asctime)s] [%(levelname)s] %(message)s"
self.detailed_format = (
"[%(asctime)s] [%(threadName)s] [%(levelname)s] "
"[%(filename)s:%(funcName)s:%(lineno)d] %(message)s"
)
# 핸들러 추가
self._add_console_handler(file_log_level)
self._add_file_handler(log_file, file_log_level)
def _add_console_handler(self, level):
"""콘솔 핸들러 추가"""
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
formatter = logging.Formatter(
self.detailed_format if level <= logging.DEBUG else self.simple_format
)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
def _add_file_handler(self, log_file, level):
"""파일 핸들러 추가"""
# 확장자가 .log가 아니면 .log로 변경
if not log_file.endswith('.log'):
base_name, _ = os.path.splitext(log_file)
log_file = base_name + '.log'
# 커스텀 로테이팅 핸들러 사용
file_handler = CustomRotatingFileHandler(
log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8"
)
file_handler.setLevel(level)
formatter = logging.Formatter(
self.detailed_format if level <= logging.DEBUG else self.simple_format
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log(self, message, level=logging.INFO, exc_info=False):
"""로그 메시지 기록"""
if exc_info:
message = f"{message}\n{traceback.format_exc()}"
# 호출 위치 정보를 동적으로 추출
caller_frame = logging.currentframe().f_back
record = self.logger.makeRecord(
self.logger.name, level, caller_frame.f_code.co_filename,
caller_frame.f_lineno, message, None, None, caller_frame.f_code.co_name
)
# 파일 로거에 메시지 전달
if level >= self.file_log_level:
self.logger.handle(record)
class CustomRotatingFileHandler(BaseRotatingHandler):
"""로그 파일을 모두 .log 확장자로 생성하는 커스텀 핸들러"""
def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None):
super().__init__(filename, mode, encoding)
self.maxBytes = maxBytes
self.backupCount = backupCount
# 기존 로그 파일 확인 및 인덱스 설정
self._base_filename = filename
self._extension = '.log'
def doRollover(self):
"""롤오버 수행 - 파일이 최대 크기에 도달하면 새 파일 생성"""
if self.stream:
self.stream.close()
self.stream = None
# 기존 로그 파일 이름을 기반으로 백업 파일 생성
base_name, ext = os.path.splitext(self._base_filename)
# 현재 디렉토리의 모든 로그 파일 확인
log_dir = os.path.dirname(self._base_filename) or '.'
existing_logs = glob.glob(f"{base_name}*.log")
existing_logs.sort()
# 최대 백업 수 초과하는 파일 제거
while len(existing_logs) >= self.backupCount:
try:
oldest_file = existing_logs.pop(0)
os.remove(oldest_file)
except Exception:
pass
# 새 로그 파일 이름 생성 (주 로그 파일 이름은 그대로 유지)
# 예: log.log, log_1.log, log_2.log, ...
if os.path.exists(self._base_filename):
# 인덱스가 있는 로그 파일들 찾기
indexed_logs = [f for f in existing_logs if f != self._base_filename]
max_index = 0
for log_file in indexed_logs:
try:
# 파일 이름에서 인덱스 부분 추출
name_part = os.path.basename(log_file)
name_without_ext = os.path.splitext(name_part)[0]
if '_' in name_without_ext:
idx_str = name_without_ext.split('_')[-1]
if idx_str.isdigit():
max_index = max(max_index, int(idx_str))
except Exception:
pass
# 새 인덱스로 파일 이름 설정
new_index = max_index + 1
new_log_file = f"{base_name}_{new_index}.log"
# 기존 파일 이름 변경
try:
os.rename(self._base_filename, new_log_file)
except Exception:
pass
# 스트림 다시 열기
self.mode = 'w'
self.stream = self._open()
def shouldRollover(self, record):
"""롤오버가 필요한지 확인"""
if self.stream is None: # 첫 번째 로그 쓰기 시도
self.stream = self._open()
if self.maxBytes > 0: # 최대 크기가 지정된 경우만 검사
self.stream.seek(0, 2) # 파일 끝으로 이동
if self.stream.tell() >= self.maxBytes:
return True
return False

44
modules/mask_module.py Normal file
View File

@ -0,0 +1,44 @@
import cv2
import numpy as np
from typing import List, Dict, Any
from shapely.geometry import Polygon
import logging
class MaskModule:
def __init__(self, logger, base_dir):
self.logger = logger
self.base_dir = base_dir
self.logger.log("마스크 모듈 초기화 완료", level=logging.INFO)
def create_masks(self, image_path: str, ocr_results: List[Dict], expansion_size: int = 10, blur_size: int = 15, mask_option: str = "basic") -> np.ndarray:
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
return None
height, width = image.shape[:2]
mask = np.zeros((height, width), dtype=np.uint8)
for i, result in enumerate(ocr_results, 1):
polygon = result['polygon']
expanded_poly = self.expand_polygon(polygon, offset=5)
cv2.fillPoly(mask, [expanded_poly], 255)
processed_mask = self.process_mask(mask, expansion_size, blur_size)
return processed_mask
def expand_polygon(self, polygon, offset=15):
poly = Polygon(polygon)
expanded = poly.buffer(offset)
if expanded.is_empty:
return np.array(polygon, dtype=np.int32)
return np.array(expanded.exterior.coords, dtype=np.int32)
def process_mask(self, mask: np.ndarray, expansion_size: int = 5, blur_size: int = 3) -> np.ndarray:
processed_mask = mask.copy()
if expansion_size > 0:
kernel = np.ones((expansion_size, expansion_size), np.uint8)
processed_mask = cv2.dilate(processed_mask, kernel, iterations=1)
if blur_size > 0:
blur_size = blur_size if blur_size % 2 == 1 else blur_size + 1
processed_mask = cv2.GaussianBlur(processed_mask, (blur_size, blur_size), 0)
return processed_mask

326
modules/ocr_module.py Normal file
View File

@ -0,0 +1,326 @@
import cv2
import numpy as np
import os
import logging
from typing import List, Dict, Any
class OCRModule:
def __init__(self, logger=None, base_dir=None):
self.logger = logger
self.base_dir = base_dir
# CPU만 사용하도록 환경 변수 설정
os.environ['CUDA_VISIBLE_DEVICES'] = ''
self.ocr = None
self.ocr = self.initialize_ocr()
if self.ocr is None:
raise Exception("PaddleOCR 초기화 실패")
def initialize_ocr(self):
"""
PaddleOCR 초기화. det_enabled 옵션에 따라 Detection 모델 사용 여부 결정.
"""
# 모델 디렉토리 설정
self.rec_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "rec")
self.det_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "det")
self.cls_model_dir = os.path.join(self.base_dir, "modules", "PP_Models", "cls")
try:
from paddleocr import PaddleOCR
ocr = PaddleOCR(
use_gpu=False,
use_angle_cls=True, # 텍스트 방향 분류 활성화
lang="ch",
det_model_dir=self.det_model_dir,
rec_model_dir=self.rec_model_dir,
cls_model_dir=self.cls_model_dir
)
return ocr
except Exception as e:
self.logger.log(f"❌ PaddleOCR 초기화 실패: {e}", level=logging.ERROR, exc_info=True)
# raise e # 에러 발생시 프로그램 종료
return None
def detect_text(self, image_path: str, method: str = 'polygon') -> List[Dict[str, Any]]:
"""
이미지에서 텍스트를 감지하고 다양한 방식으로 영역 반환
Args:
image_path (str): 이미지 파일 경로
method (str): 감지 방식 ('polygon', 'bbox', 'expanded_bbox', 'rotated_bbox', 'contour')
Returns:
List[Dict]: 감지된 텍스트 정보 리스트
- text: 감지된 텍스트
- confidence: 신뢰도
- polygon: 폴리곤 좌표 (4 )
- bbox: 바운딩 박스 좌표 (x, y, w, h)
- method: 사용된 감지 방식
"""
if not os.path.exists(image_path):
self.logger.log(f"이미지 파일을 찾을 수 없습니다: {image_path}", level=logging.ERROR)
return []
try:
# 이미지 읽기
image = cv2.imread(image_path)
if image is None:
self.logger.log(f"이미지를 읽을 수 없습니다: {image_path}", level=logging.ERROR)
return []
self.logger.log(f"🔍 OCR 감지 방식: {method}", level=logging.INFO)
# 실제 OCR 실행
# ocr_raw_results = self.ocr.predict(image)
ocr_raw_results = self.ocr.ocr(image)
self.logger.log(f"ocr_raw_results: {ocr_raw_results}", level=logging.INFO)
for line in ocr_raw_results:
self.logger.log(f"line: {line}", level=logging.INFO)
if not ocr_raw_results or len(ocr_raw_results) == 0:
self.logger.log("⚠️ OCR 결과가 비어있습니다.", level=logging.WARNING)
return []
# paddleocr 2.x 결과 파싱
converted_results = []
for page in ocr_raw_results: # page는 텍스트별 결과 리스트
for line in page:
poly = line[0]
text = line[1][0]
score = line[1][1]
converted_results.append([poly, [text, score]])
# 감지 방식에 따라 결과 처리
if method == 'polygon':
ocr_results = self._detect_with_polygon(image, converted_results)
elif method == 'bbox':
ocr_results = self._detect_with_bbox(image, converted_results)
elif method == 'expanded_bbox':
ocr_results = self._detect_with_expanded_bbox(image, converted_results)
elif method == 'rotated_bbox':
ocr_results = self._detect_with_rotated_bbox(image, converted_results)
elif method == 'contour':
ocr_results = self._detect_with_contour(image, converted_results)
else:
self.logger.log(f"⚠️ 지원하지 않는 감지 방식: {method}, 기본 polygon 방식 사용", level=logging.WARNING)
ocr_results = self._detect_with_polygon(image, converted_results)
return ocr_results
except Exception as e:
self.logger.log(f"❌ OCR 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return []
def filter_chinese_text(self, ocr_results: List[Dict]) -> List[Dict]:
"""
중국어 텍스트만 필터링
Args:
ocr_results (List[Dict]): OCR 결과
Returns:
List[Dict]: 중국어 텍스트만 포함된 결과
"""
chinese_results = []
for result in ocr_results:
text = result['text']
# 중국어 문자 범위 확인 (간체/번체 포함)
if any('\u4e00' <= char <= '\u9fff' for char in text):
chinese_results.append(result)
self.logger.log(f"중국어 텍스트 {len(chinese_results)}개 필터링 완료", level=logging.INFO)
return chinese_results
def _detect_with_polygon(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
"""폴리곤 방식으로 텍스트 영역 감지 (기본 방식)"""
ocr_results = []
for line in ocr_raw_results:
if len(line) >= 2:
polygon = line[0] # 폴리곤 좌표 (4개 점)
text_info = line[1] # (텍스트, 신뢰도)
if len(text_info) >= 2:
text = text_info[0]
confidence = text_info[1]
# 폴리곤을 바운딩 박스로 변환
polygon_np = np.array(polygon, dtype=np.int32)
x, y, w, h = cv2.boundingRect(polygon_np)
ocr_result = {
'text': text,
'confidence': confidence,
'polygon': polygon,
'bbox': (x, y, w, h),
'method': 'polygon'
}
ocr_results.append(ocr_result)
return ocr_results
def _detect_with_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
"""바운딩 박스 방식으로 텍스트 영역 감지"""
ocr_results = []
for line in ocr_raw_results:
if len(line) >= 2:
polygon = line[0]
text_info = line[1]
if len(text_info) >= 2:
text = text_info[0]
confidence = text_info[1]
# 바운딩 박스 계산
polygon_np = np.array(polygon, dtype=np.int32)
x, y, w, h = cv2.boundingRect(polygon_np)
# 바운딩 박스를 폴리곤으로 변환
bbox_polygon = [
[x, y],
[x + w, y],
[x + w, y + h],
[x, y + h]
]
ocr_result = {
'text': text,
'confidence': confidence,
'polygon': bbox_polygon,
'bbox': (x, y, w, h),
'method': 'bbox'
}
ocr_results.append(ocr_result)
return ocr_results
def _detect_with_expanded_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
"""확장된 바운딩 박스 방식으로 텍스트 영역 감지"""
ocr_results = []
h_img, w_img = image.shape[:2]
for line in ocr_raw_results:
if len(line) >= 2:
polygon = line[0]
text_info = line[1]
if len(text_info) >= 2:
text = text_info[0]
confidence = text_info[1]
# 기본 바운딩 박스
polygon_np = np.array(polygon, dtype=np.int32)
x, y, w, h = cv2.boundingRect(polygon_np)
# 확장 크기 계산 (텍스트 크기의 20%)
expand_x = max(1, int(w * 0.2))
expand_y = max(1, int(h * 0.2))
# 확장된 바운딩 박스
x_exp = max(0, x - expand_x)
y_exp = max(0, y - expand_y)
w_exp = min(w_img - x_exp, w + 2 * expand_x)
h_exp = min(h_img - y_exp, h + 2 * expand_y)
# 확장된 바운딩 박스를 폴리곤으로 변환
expanded_polygon = [
[x_exp, y_exp],
[x_exp + w_exp, y_exp],
[x_exp + w_exp, y_exp + h_exp],
[x_exp, y_exp + h_exp]
]
ocr_result = {
'text': text,
'confidence': confidence,
'polygon': expanded_polygon,
'bbox': (x_exp, y_exp, w_exp, h_exp),
'method': 'expanded_bbox'
}
ocr_results.append(ocr_result)
return ocr_results
def _detect_with_rotated_bbox(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
"""회전된 바운딩 박스 방식으로 텍스트 영역 감지"""
ocr_results = []
for line in ocr_raw_results:
if len(line) >= 2:
polygon = line[0]
text_info = line[1]
if len(text_info) >= 2:
text = text_info[0]
confidence = text_info[1]
# 회전된 바운딩 박스 계산
polygon_np = np.array(polygon, dtype=np.float32)
rect = cv2.minAreaRect(polygon_np)
box = cv2.boxPoints(rect)
box = np.int32(box)
# 일반 바운딩 박스도 계산
x, y, w, h = cv2.boundingRect(polygon_np.astype(np.int32))
ocr_result = {
'text': text,
'confidence': confidence,
'polygon': box.tolist(),
'bbox': (x, y, w, h),
'method': 'rotated_bbox',
'rotation_info': {
'center': rect[0],
'size': rect[1],
'angle': rect[2]
}
}
ocr_results.append(ocr_result)
return ocr_results
def _detect_with_contour(self, image: np.ndarray, ocr_raw_results: List) -> List[Dict[str, Any]]:
"""컨투어 방식으로 텍스트 영역 감지"""
ocr_results = []
for line in ocr_raw_results:
if len(line) >= 2:
polygon = line[0]
text_info = line[1]
if len(text_info) >= 2:
text = text_info[0]
confidence = text_info[1]
# 폴리곤을 컨투어로 변환
polygon_np = np.array(polygon, dtype=np.int32)
# 컨투어 근사화
epsilon = 0.02 * cv2.arcLength(polygon_np, True)
approx_contour = cv2.approxPolyDP(polygon_np, epsilon, True)
# 컨투어를 다시 폴리곤으로 변환
contour_polygon = approx_contour.reshape(-1, 2).tolist()
# 바운딩 박스 계산
x, y, w, h = cv2.boundingRect(polygon_np)
ocr_result = {
'text': text,
'confidence': confidence,
'polygon': contour_polygon,
'bbox': (x, y, w, h),
'method': 'contour',
'contour_points': len(contour_polygon)
}
ocr_results.append(ocr_result)
return ocr_results

160
modules/postImageManager.py Normal file
View File

@ -0,0 +1,160 @@
from PIL import Image, ImageFont, ImageDraw
import requests
import numpy as np
import cv2
import os
from datetime import datetime
import logging
class PostImageManager:
def __init__(self, logger, font_path):
self.logger = logger
self.font_path = font_path
# 폰트 로드
self.font_load()
def font_load(self):
# 폰트 로드
try:
self.font = ImageFont.truetype(self.font_path, 36)
self.logger.log(f"폰트 로드 성공: {self.font_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"커스텀 폰트 로드 실패 ({self.font_path}): {e}", level=logging.WARNING)
try:
# 기본 폰트 사용
self.font = ImageFont.load_default()
self.logger.log("기본 폰트를 사용합니다.", level=logging.INFO)
except Exception as e2:
self.logger.log(f"기본 폰트 로드도 실패: {e2}", level=logging.ERROR)
# 최후의 수단으로 None 설정
self.font = None
def save_image_to_path(self, image, path):
try:
if image:
# 이미지를 저장 경로에 저장
self.logger.log(f"이미지 저장 완료 : {path}", level=logging.INFO)
image.save(path, format='PNG')
return path
except Exception as e:
raise RuntimeError(f"이미지 저장 중 오류 발생: {e}")
def add_watermark(self, image_data, watermark_text="Watermark", opacity_percent=30, angle=30, font_size=36):
"""
이미지에 텍스트 워터마크를 이미지 전체에 걸쳐서 추가하는 함수
:param image_data: PIL 이미지 객체
:param watermark_text: 워터마크로 추가할 텍스트
:param opacity_percent: 워터마크의 투명도 (0~100)
:param angle: 워터마크 텍스트 회전 각도 (기본 30)
:param font_size: 워터마크 텍스트의 폰트 크기 (기본 36)
:return: 워터마크가 추가된 이미지
"""
try:
if isinstance(image_data, np.ndarray):
image_data = Image.fromarray(cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB))
# 폰트가 로드되지 않은 경우 원본 이미지 반환
if self.font is None:
self.logger.log("폰트가 로드되지 않아 워터마크를 추가할 수 없습니다. 원본 이미지를 반환합니다.", level=logging.WARNING)
return image_data
# 이미지 복사본 생성
watermark_image = image_data.copy()
# 폰트 설정 (안전한 폰트 로딩)
try:
# self.font가 있으면 크기만 조정해서 새 폰트 생성
if hasattr(self, 'font_path') and os.path.exists(self.font_path):
font = ImageFont.truetype(self.font_path, font_size)
else:
# 크기를 조정할 수 없으면 기존 폰트 사용
font = self.font
except Exception as e:
self.logger.log(f"폰트 크기 조정 실패: {e}. 기본 폰트를 사용합니다.", level=logging.WARNING)
font = self.font
# 텍스트 투명도를 0~255로 변환
opacity = int(255 * (opacity_percent / 100))
# 텍스트 크기 측정 (textbbox 사용)
draw = ImageDraw.Draw(watermark_image)
bbox = draw.textbbox((0, 0), watermark_text, font=font)
text_width, text_height = bbox[2] - bbox[0], bbox[3] - bbox[1]
# 이미지 크기
width, height = image_data.size
# 워터마크 레이어 생성
watermark_layer = Image.new("RGBA", (width, height)) # RGBA 이미지 생성
# 지그재그 간격 설정
zigzag_step = int(text_height * 2) # Y축의 지그재그 간격
# 이미지 전체에 반복적으로 워터마크 텍스트 그리기 (지그재그 형태)
for y in range(0, height, zigzag_step):
for x in range(0, width, int(text_width * 3)): # 3배 너비 간격으로 반복
# 텍스트가 한 줄씩 지그재그 형태로 X축을 교차하여 이동
x_offset = (y // zigzag_step) % 2 * int(text_width * 1.5) # 짝수 행에서는 X축을 약간 이동
# 텍스트 레이어 생성
text_layer = Image.new("RGBA", (text_width, text_height), (255, 255, 255, 0))
text_draw = ImageDraw.Draw(text_layer)
# 텍스트 그리기
text_draw.text((0, 0), watermark_text, fill=(255, 255, 255, opacity), font=font)
# 텍스트 회전
rotated_text_layer = text_layer.rotate(angle, expand=1)
# 회전된 텍스트를 워터마크 레이어에 추가
watermark_layer.paste(rotated_text_layer, (x + x_offset, y), rotated_text_layer)
# 원본 이미지와 워터마크 레이어 합성
watermark_image = Image.alpha_composite(watermark_image.convert("RGBA"), watermark_layer)
# 최종적으로 RGB 형식으로 변환 후 반환
return watermark_image.convert("RGB")
except Exception as e:
self.logger.log(f"워터마크 추가 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return image_data
def crop_image(self, image, is_thumb=False, crop_percentage=0.01):
"""이미지를 주어진 퍼센트만큼 크롭하는 함수"""
if is_thumb:
crop_percentage = 0.03
self.logger.log(f"썸네일 이미지 이므로 크롭 3%로 조정", level=logging.DEBUG)
width, height = image.size
left = width * crop_percentage
top = height * crop_percentage
right = width * (1 - crop_percentage)
bottom = height * (1 - crop_percentage)
cropped_image = image.crop((left, top, right, bottom))
if self.debug:
# 디버그 모드일 경우 크롭 전후 다양한 비율로 이미지 저장
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
original_image_path = os.path.join(os.getcwd(), f"original_image_{timestamp}.png")
image.save(original_image_path) # 크롭 전 이미지 저장
self.logger.log(f"크롭 전 이미지 저장됨: {original_image_path}", level=logging.DEBUG)
# 1%, 2%, 3% 크롭 이미지 저장
crop_alternatives = [0.01, 0.02, 0.03]
for crop in crop_alternatives:
left_alt = width * crop
top_alt = height * crop
right_alt = width * (1 - crop)
bottom_alt = height * (1 - crop)
cropped_alt_image = image.crop((left_alt, top_alt, right_alt, bottom_alt))
cropped_image_path = os.path.join(os.getcwd(), f"cropped_image_{int(crop*100)}_{timestamp}.png")
cropped_alt_image.save(cropped_image_path)
self.logger.log(f"{int(crop*100)}% 크롭된 이미지 저장됨: {cropped_image_path}", level=logging.DEBUG)
return cropped_image

260
modules/settings_manager.py Normal file
View File

@ -0,0 +1,260 @@
from PySide6.QtCore import QSettings
import logging
class SettingsManager:
"""
사용자 설정값(토글, 스핀, 텍스트 ) 저장/불러오고,
각종 위젯에 따라 UI 상태까지 자동 적용하는 매니저 클래스입니다.
"""
def __init__(self, logger=None, organization="WhenRideMycar", application="EditPartTimer3"):
"""
QSettings 기반 설정 매니저 초기화
:param logger: logging.Logger 또는 print 대체용 함수
:param organization: QSettings 구분용 회사명
:param application: QSettings 구분용 앱명
"""
self.logger = logger or logging.getLogger(__name__)
self.settings = QSettings(organization, application)
self.widget_map = {} # 위젯명 → 저장키 매핑
self.toggle_dependencies = {} # 토글명 → 종속 위젯 매핑
def _log(self, message, level=logging.INFO):
"""
커스텀/표준 로거 모두 지원하는 내부 로깅 함수
"""
if hasattr(self.logger, "log") and "level" in self.logger.log.__code__.co_varnames:
# 커스텀 로거: self.logger.log(msg, level=logging.INFO)
self.logger.log(message, level=level)
else:
# 표준 로거
if level == logging.DEBUG:
self.logger.debug(message)
elif level == logging.INFO:
self.logger.info(message)
elif level == logging.WARNING:
self.logger.warning(message)
elif level == logging.ERROR:
self.logger.error(message)
elif level == logging.CRITICAL:
self.logger.critical(message)
else:
self.logger.info(message)
def bind_widgets(self, widget_map, toggle_dependencies):
"""
위젯-, 토글-종속 딕셔너리를 등록합니다.
:param widget_map: { "widget명": "저장키" }
:param toggle_dependencies: { "toggle명": { "dependents": [...], "visible": [...] } }
"""
self.widget_map = widget_map
self.toggle_dependencies = toggle_dependencies
def save_settings(self, widget_obj):
"""
현재 UI에 연결된 각종 위젯의 값을 QSettings에 저장합니다.
:param widget_obj: 실제 위젯 객체(self )
config 예시:
{
'discord_notify_toggle': {
'dependents': ['webhook_input'],
'visible': ['webhook_input'],
},
'ocr_toggle': {
'dependents': ['unwanted_words_button']
},
...
}
"""
for widget_name, key in self.widget_map.items():
widget = getattr(widget_obj, widget_name, None)
if widget is None:
self._log(f"[SettingsManager] '{widget_name}' 위젯을 찾을 수 없습니다.", logging.WARNING)
continue
try:
# 체크박스, 토글, 커스텀토글 등
if hasattr(widget, 'isChecked'):
value = bool(widget.isChecked())
self.logger.log(f"[SettingsManager] bool 타입 저장: {key} 값 저장: {value}", level=logging.DEBUG)
self.settings.setValue(key, value)
# SpinBox/DoubleSpinBox 등
elif hasattr(widget, 'value'):
self.settings.setValue(key, widget.value())
self.logger.log(f"[SettingsManager] int/float 타입 저장: {key} 값 저장: {widget.value()}", level=logging.DEBUG)
# QLineEdit 등 (단일줄 텍스트)
elif hasattr(widget, 'text'):
self.settings.setValue(key, widget.text())
self.logger.log(f"[SettingsManager] str 타입 저장: {key} 값 저장: {widget.text()}", level=logging.DEBUG)
# QTextEdit 등 (여러줄 텍스트)
elif hasattr(widget, 'toPlainText'):
self.settings.setValue(key, widget.toPlainText())
self.logger.log(f"[SettingsManager] str 타입 저장: {key} 값 저장: {widget.toPlainText()}", level=logging.DEBUG)
else:
self._log(f"[SettingsManager] '{widget_name}'의 값을 저장하는 방법을 알 수 없습니다.", logging.WARNING)
except Exception as e:
self._log(f"[SettingsManager] '{widget_name}' 저장 중 오류: {e}", logging.ERROR, exc_info=True)
def save_value(self, key, value):
"""특정 키로 단일 값을 설정에 저장합니다. 타입별로 정확히 저장."""
try:
# bool
if isinstance(value, bool):
self.settings.setValue(key, value)
self.logger.log(f"[SettingsManager] bool 타입 저장: {key} 값 저장: {value}", level=logging.DEBUG)
# int/float
elif isinstance(value, (int, float)):
self.settings.setValue(key, value)
self.logger.log(f"[SettingsManager] int/float 타입 저장: {key} 값 저장: {value}", level=logging.DEBUG)
# 그 외(특히 str)
else:
self.settings.setValue(key, str(value))
self.logger.log(f"[SettingsManager] str 타입 저장: {key} 값 저장: {value}", level=logging.DEBUG)
self.settings.sync()
self._log(f"[SettingsManager] {key} 값을 저장했습니다: {value}")
except Exception as e:
self._log(f"[SettingsManager] {key} 값 저장 중 오류: {e}", level=logging.WARNING)
# 기타 값 직접 접근용
def get_value(self, key, default=None):
return self.settings.value(key, default)
def load_settings(self, widget_obj):
"""
QSettings에서 저장된 값을 위젯에 복원합니다.
:param widget_obj: 실제 위젯 객체(self )
"""
for widget_name, key in self.widget_map.items():
widget = getattr(widget_obj, widget_name, None)
if widget is None:
self._log(f"[SettingsManager] '{widget_name}' 위젯을 찾을 수 없습니다.", logging.WARNING)
continue
val = self.settings.value(key, None)
if val is None:
continue # 미저장 값
try:
# 체크박스/토글 등 (bool 변환)
if hasattr(widget, 'setChecked'):
# QSettings는 bool을 str로 저장할 수 있어 안전하게 변환
if isinstance(val, bool):
widget.setChecked(val)
elif isinstance(val, (int, float)):
widget.setChecked(bool(val))
elif isinstance(val, str):
widget.setChecked(val.lower() in ['true', '1', 'yes'])
# SpinBox류
elif hasattr(widget, 'setValue'):
if isinstance(val, (int, float)):
widget.setValue(val)
elif isinstance(val, str):
if '.' in val:
widget.setValue(float(val))
else:
widget.setValue(int(val))
# QLineEdit 등
elif hasattr(widget, 'setText'):
widget.setText(str(val))
# QTextEdit 등
elif hasattr(widget, 'setPlainText'):
widget.setPlainText(str(val))
else:
self._log(f"[SettingsManager] '{widget_name}'에 값을 복원할 수 없습니다.", logging.WARNING)
except Exception as e:
self._log(f"[SettingsManager] '{widget_name}' 불러오기 오류: {e}", logging.ERROR, exc_info=True)
def reset_settings(self):
"""
전체 QSettings 값을 초기화(삭제)합니다.
"""
try:
self.settings.clear()
self._log("[SettingsManager] 모든 설정이 초기화되었습니다.", logging.INFO)
except Exception as e:
self._log(f"[SettingsManager] 설정 초기화 실패: {e}", logging.ERROR, exc_info=True)
def remove_setting(self, key):
"""
특정 키의 설정만 삭제합니다.
:param key: 삭제할 설정 (str)
"""
try:
self.settings.remove(key)
self._log(f"[SettingsManager] '{key}' 설정이 삭제되었습니다.", logging.INFO)
except Exception as e:
self._log(f"[SettingsManager] '{key}' 삭제 실패: {e}", logging.ERROR, exc_info=True)
def debug_print(self):
"""
QSettings에 저장된 모든 값을 로그로 출력합니다.
"""
try:
self.settings.sync()
all_keys = self.settings.allKeys()
self.logger.info("[SettingsManager] 저장된 모든 설정값:")
for key in all_keys:
value = self.settings.value(key)
self.logger.info(f" {key}: {value}")
except Exception as e:
self._log(f"[SettingsManager] 설정값 출력 실패: {e}", logging.ERROR, exc_info=True)
def apply_settings_to_ui(self, widget_obj):
"""
종속 딕셔너리(toggle_dependencies) 따라,
토글/체크박스 상태에 따라 dependents/visible 위젯을 자동으로 활성/비활성, 표시/숨김 처리합니다.
:param widget_obj: 실제 위젯 객체(self )
"""
for toggle_name, deps in self.toggle_dependencies.items():
toggle_widget = getattr(widget_obj, toggle_name, None)
if toggle_widget is None or not hasattr(toggle_widget, "isChecked"):
continue
checked = toggle_widget.isChecked()
# 종속 위젯 enable/disable
for dep in deps.get("dependents", []):
dep_widget = getattr(widget_obj, dep, None)
if dep_widget and hasattr(dep_widget, "setEnabled"):
try:
dep_widget.setEnabled(checked)
except Exception as e:
self._log(f"[SettingsManager] {dep} setEnabled 실패: {e}", logging.ERROR, exc_info=True)
# 종속 위젯 visible
for vis in deps.get("visible", []):
vis_widget = getattr(widget_obj, vis, None)
if vis_widget and hasattr(vis_widget, "setVisible"):
try:
vis_widget.setVisible(checked)
except Exception as e:
self._log(f"[SettingsManager] {vis} setVisible 실패: {e}", logging.ERROR, exc_info=True)
# (필요 시) 확장: 토글에 따라 특정 값을 리셋, 콜백 트리거 등
def save_user_info(self, user_info: dict):
for key, value in user_info.items():
self.settings.setValue(f"user/{key}", value)
self.settings.sync()
def load_user_info(self) -> dict:
info = {}
for key in ["email", "password", "id", "membership_level", "name"]:
info[key] = self.settings.value(f"user/{key}", "")
return info

134
modules/test_img.py Normal file
View File

@ -0,0 +1,134 @@
import os
import sys
import asyncio
import shutil
from modules.image_processor2 import ImageProcessor
from modules.loggerModule import Logger1
from modules.gpt_client import GPTClient
import logging
import cv2
# 더미 Logger
class DummyLogger:
def log(self, msg, level=logging.INFO, exc_info=None):
print(f"[{logging.getLevelName(level)}] {msg}")
# 테스트용 치환단어
unwanted_texts = {
'크리스탈': '이미지삭제',
'세탁기': '세탁기는개뿔',
}
def get_image_list(img_dir):
files = [f for f in os.listdir(img_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.webp'))]
files.sort()
return [os.path.join(img_dir, f) for f in files]
def ensure_dir(path):
if not os.path.exists(path):
os.makedirs(path)
def save_image(image, path):
cv2.imwrite(path, image)
async def sequential_process(image_paths, processor, output_dir):
print("[순차처리] 시작")
results = []
for idx, img_path in enumerate(image_paths):
print(f"[{idx+1}] {img_path} 처리 중...")
# OCR, 번역, 치환, 인페인팅 등 전체 파이프라인 실행
# process_single_image는 내부적으로 모든 로직을 처리함
result = await processor.process_single_image(
page=None, # 실제 Playwright 객체 대신 None
original_image_url=img_path,
index=idx,
is_localServer=True,
delay=0.1,
file_prefix="seq",
use_inpainting=True
)
# 결과 파일명 결정
if isinstance(result, dict):
status = result.get('status', 'unknown')
path = result.get('path', img_path)
if status == 'failed':
out_name = f"{idx+1}_failed_{os.path.basename(img_path)}"
shutil.copy(img_path, os.path.join(output_dir, out_name))
elif status == 'exclude':
# 이미지삭제: 파일을 output에 저장하지 않음
print(f"[{idx+1}] 이미지삭제로 제외됨: {img_path}")
continue
else:
out_name = f"{idx+1}_{status}_{os.path.basename(img_path)}"
shutil.copy(path, os.path.join(output_dir, out_name))
else:
# result가 경로(str)라면 원본/번역된 이미지로 간주
out_name = f"{idx+1}_original_{os.path.basename(img_path)}"
shutil.copy(result, os.path.join(output_dir, out_name))
results.append(out_name)
print("[순차처리] 완료")
return results
async def parallel_process(image_paths, processor, output_dir):
print("[동시처리] 시작")
tasks = []
for idx, img_path in enumerate(image_paths):
tasks.append(processor.process_single_image(
page=None,
original_image_url=img_path,
index=idx,
is_localServer=True,
delay=0.1,
file_prefix="par",
use_inpainting=True
))
results = await asyncio.gather(*tasks)
for idx, (img_path, result) in enumerate(zip(image_paths, results)):
if isinstance(result, dict):
status = result.get('status', 'unknown')
path = result.get('path', img_path)
if status == 'failed':
out_name = f"{idx+1}_failed_{os.path.basename(img_path)}"
shutil.copy(img_path, os.path.join(output_dir, out_name))
elif status == 'exclude':
print(f"[{idx+1}] 이미지삭제로 제외됨: {img_path}")
continue
else:
out_name = f"{idx+1}_{status}_{os.path.basename(img_path)}"
shutil.copy(path, os.path.join(output_dir, out_name))
else:
out_name = f"{idx+1}_original_{os.path.basename(img_path)}"
shutil.copy(result, os.path.join(output_dir, out_name))
print("[동시처리] 완료")
return results
async def main():
base_dir = os.path.dirname(os.path.abspath(__file__))
img_dir = os.path.join(base_dir, 'img')
output_dir = os.path.join(base_dir, 'output')
ensure_dir(output_dir)
image_paths = get_image_list(img_dir)
print(f"테스트 이미지: {image_paths}")
# 더미 logger, gpt_client, toggle_states
logger = DummyLogger()
set_log = Logger1()
gpt_client = GPTClient()
toggle_states = {
'image_font_path': os.path.join(base_dir, "HakgyoansimDunggeunmisoTTFB.ttf"),
'TEMP_IMAGE_DIR': output_dir,
'ocr': True,
'watermark_text': '테스트워터마크',
}
processor = ImageProcessor(set_log, None, toggle_states, gpt_client, base_dir)
processor.update_unwanted_texts(unwanted_texts)
print("1. 순차처리 테스트")
await sequential_process(image_paths, processor, output_dir)
print("2. 동시처리 테스트")
await parallel_process(image_paths, processor, output_dir)
if __name__ == '__main__':
asyncio.run(main())

View File

@ -0,0 +1,22 @@
import requests
import json
API_URL = "http://127.0.0.1:7000/translate_image"
payload = {
"local_image_path": "d:/py/IT_Server/modules/img/6.jpg",
"file_prefix": "test",
"toggle_states": {"ocr": True},
"unwanted_texts": {"크리스탈": "크리미"},
"watermark_text": "테스트 워터마크",
"watermark_opacity": 0.5,
"watermark_font_size": 32
}
headers = {"Content-Type": "application/json"}
response = requests.post(API_URL, data=json.dumps(payload), headers=headers)
print("응답 결과:")
print(response.status_code)
print(response.json())

View File

@ -0,0 +1,32 @@
import requests
import json
import os
import time
API_URL = "http://127.0.0.1:7000/translate_images"
IMG_DIR = "d:/py/IT_Server/modules/img"
# img 폴더의 모든 이미지 파일 리스트업
image_files = [os.path.join(IMG_DIR, f) for f in os.listdir(IMG_DIR)
if f.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".gif", ".webp", ".tif", ".tiff"))]
payload = {
"local_image_paths": image_files,
"file_prefix": "multi",
"toggle_states": {"ocr": True},
"unwanted_texts": {"크리스탈": "크리미", "세탁기": "이미지삭제"},
"watermark_text": "테스트 워터마크",
"watermark_opacity": 0.5,
"watermark_font_size": 32
}
headers = {"Content-Type": "application/json"}
start = time.time()
response = requests.post(API_URL, data=json.dumps(payload), headers=headers)
elapsed = time.time() - start
print("응답 결과:")
print(response.status_code)
print(response.json())
print(f"총 소요 시간: {elapsed:.2f}")

View File

@ -0,0 +1,252 @@
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from typing import List, Dict, Any, Tuple, Optional
import os
import math
import logging
class TextRenderingModule:
def __init__(self, logger, font_path: Optional[str] = None):
self.logger = logger
self.font_path = font_path
self.default_font_size = 20
self.font_cache = {}
def get_font(self, size: int, font_path: Optional[str] = None) -> ImageFont.FreeTypeFont:
font_path = font_path or self.font_path
cache_key = f"{font_path}_{size}"
if cache_key not in self.font_cache:
try:
if font_path and os.path.exists(font_path):
font = ImageFont.truetype(font_path, size)
else:
font = ImageFont.load_default()
self.font_cache[cache_key] = font
except Exception as e:
print(f"폰트 로드 오류: {e}")
font = ImageFont.load_default()
self.font_cache[cache_key] = font
return self.font_cache[cache_key]
def estimate_text_size(self, text: str, font_size: int, font_path: Optional[str] = None) -> Tuple[int, int]:
font = self.get_font(font_size, font_path)
try:
bbox = font.getbbox(text)
width = bbox[2] - bbox[0]
height = bbox[3] - bbox[1]
except AttributeError:
width, height = font.getsize(text)
return width, height
def calculate_optimal_font_size(self, text: str, target_width: int, target_height: int, min_size: int = 8, max_size: int = 100, font_path: Optional[str] = None) -> int:
best_size = min_size
for size in range(min_size, max_size + 1):
width, height = self.estimate_text_size(text, size, font_path)
if width <= target_width and height <= target_height:
best_size = size
else:
break
return best_size
def _estimate_background_color(self, image: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> Tuple[int, int, int]:
margin = 5
y1_exp = max(0, y1 - margin)
y2_exp = min(image.shape[0], y2 + margin)
x1_exp = max(0, x1 - margin)
x2_exp = min(image.shape[1], x2 + margin)
region = image[y1_exp:y2_exp, x1_exp:x2_exp]
mean_color = np.mean(region, axis=(0, 1))
return (int(mean_color[2]), int(mean_color[1]), int(mean_color[0]))
def _get_contrasting_color(self, bg_color: Tuple[int, int, int]) -> Tuple[int, int, int]:
brightness = (bg_color[0] * 0.299 + bg_color[1] * 0.587 + bg_color[2] * 0.114)
if brightness > 128:
return (0, 0, 0)
else:
return (255, 255, 255)
def render_text(self, image: np.ndarray, ocr_results: List[Dict], translated_texts: List[str], font_path: Optional[str] = None) -> np.ndarray:
result_image = image.copy()
for i, (ocr_result, translated_text) in enumerate(zip(ocr_results, translated_texts)):
polygon = ocr_result['polygon']
polygon_array = np.array(polygon)
x_coords = polygon_array[:, 0]
y_coords = polygon_array[:, 1]
x_min, x_max = int(np.min(x_coords)), int(np.max(x_coords))
y_min, y_max = int(np.min(y_coords)), int(np.max(y_coords))
width = x_max - x_min
height = y_max - y_min
optimal_font_size = self.calculate_optimal_font_size(translated_text, width, height, font_path=font_path)
text_width, text_height = self.estimate_text_size(translated_text, optimal_font_size, font_path)
center_x = (x_min + x_max) // 2
center_y = (y_min + y_max) // 2
text_x = center_x - text_width // 2
text_y = center_y - text_height // 2
angle = 0
if len(polygon_array) >= 2:
dx = polygon_array[1][0] - polygon_array[0][0]
dy = polygon_array[1][1] - polygon_array[0][1]
angle = math.degrees(math.atan2(dy, dx))
bg_color = self._estimate_background_color(image, x_min, y_min, x_max, y_max)
text_color = self._get_contrasting_color(bg_color)
result_image = self.render_text_on_image(
result_image, translated_text, (text_x, text_y),
font_size=optimal_font_size,
font_path=font_path,
text_color=text_color,
background_color=None,
angle=angle
)
return result_image
def render_text_on_image(self, image: np.ndarray, text: str, position: Tuple[int, int], font_size: Optional[int] = None, font_path: Optional[str] = None, text_color: Tuple[int, int, int] = (0, 0, 0), background_color: Optional[Tuple[int, int, int]] = None, angle: float = 0) -> np.ndarray:
if font_size is None:
font_size = self.default_font_size
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_image)
font = self.get_font(font_size, font_path)
print(f"render_text_on_image font: {font}")
text_width, text_height = self.estimate_text_size(text, font_size, font_path)
if background_color is not None:
bg_x1 = position[0] - 2
bg_y1 = position[1] - 2
bg_x2 = position[0] + text_width + 2
bg_y2 = position[1] + text_height + 2
draw.rectangle([bg_x1, bg_y1, bg_x2, bg_y2], fill=background_color)
if angle != 0:
text_image = Image.new('RGBA', (text_width + 10, text_height + 10), (255, 255, 255, 0))
text_draw = ImageDraw.Draw(text_image)
text_draw.text((5, 5), text, font=font, fill=text_color + (255,))
rotated_text = text_image.rotate(angle, expand=True)
pil_image.paste(rotated_text, position, rotated_text)
else:
draw.text(position, text, font=font, fill=text_color)
result_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
return result_image
def create_text_styles(self) -> Dict[str, Dict[str, Any]]:
"""다양한 텍스트 스타일 정의"""
styles = {
'default': {
'color': (0, 0, 0),
'bg_color': None,
'outline': True,
'outline_color': (255, 255, 255),
'outline_width': 1
},
'bold': {
'color': (0, 0, 0),
'bg_color': (255, 255, 255),
'outline': True,
'outline_color': (128, 128, 128),
'outline_width': 2
},
'highlight': {
'color': (255, 255, 255),
'bg_color': (255, 0, 0),
'outline': False,
'outline_color': None,
'outline_width': 0
},
'subtle': {
'color': (128, 128, 128),
'bg_color': None,
'outline': True,
'outline_color': (255, 255, 255),
'outline_width': 1
}
}
return styles
def render_with_style(self, image: np.ndarray, ocr_results: List[Dict],
translated_texts: List[str], style_name: str = 'default') -> np.ndarray:
"""스타일을 적용한 텍스트 렌더링"""
styles = self.create_text_styles()
if style_name not in styles:
print(f"알 수 없는 스타일: {style_name}")
style_name = 'default'
style = styles[style_name]
# 기본 렌더링 후 스타일 적용
result = self.render_text(image, ocr_results, translated_texts)
# 추가 스타일 처리는 여기서 구현
# (예: 그림자, 글로우 효과 등)
return result
def adjust_text_for_space(self, text: str, max_width: int, max_height: int,
font_size: int) -> Tuple[str, int]:
"""
공간에 맞게 텍스트 조정
Args:
text (str): 원본 텍스트
max_width (int): 최대 너비
max_height (int): 최대 높이
font_size (int): 폰트 크기
Returns:
Tuple[str, int]: 조정된 텍스트와 폰트 크기
"""
# 텍스트가 너무 길면 줄바꿈 또는 생략
if len(text) > 20:
# 긴 텍스트는 줄바꿈
words = text.split(' ')
if len(words) > 1:
mid = len(words) // 2
text = ' '.join(words[:mid]) + '\n' + ' '.join(words[mid:])
else:
# 단어가 하나면 생략
text = text[:15] + '...'
# 폰트 크기 조정
adjusted_font_size = font_size
while adjusted_font_size > 8:
# 실제로는 텍스트 크기를 측정해서 비교
estimated_width = len(text) * adjusted_font_size * 0.6
if estimated_width <= max_width:
break
adjusted_font_size -= 2
return text, adjusted_font_size
def _create_style_comparison(self, images: List[np.ndarray], style_names: List[str]):
"""스타일 비교 이미지 생성"""
if not images:
return
# 이미지 크기 조정
target_width = 200
target_height = int(images[0].shape[0] * target_width / images[0].shape[1])
resized_images = []
for img in images:
resized = cv2.resize(img, (target_width, target_height))
resized_images.append(resized)
# 비교 이미지 생성
num_images = len(resized_images)
comparison_width = target_width * num_images
comparison_height = target_height + 30
comparison = np.ones((comparison_height, comparison_width, 3), dtype=np.uint8) * 255
# 원본 이미지
comparison[30:30+target_height, 0:target_width] = resized_images[0]
cv2.putText(comparison, "Original", (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
# 스타일 이미지들
for i, (img, style_name) in enumerate(zip(resized_images[1:], style_names)):
x_offset = target_width * (i + 1)
comparison[30:30+target_height, x_offset:x_offset+target_width] = img
cv2.putText(comparison, style_name, (x_offset + 10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
cv2.imwrite("test_output/text_style_comparison.jpg", comparison)
self.logger.log("스타일 비교 이미지 저장 완료", level=logging.INFO)