343 lines
12 KiB
Python
Executable File
343 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
독립적인 Celery 워커 시스템
|
|
메인서버의 Redis와 연결하여 작업을 처리합니다.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import socket
|
|
from celery import Celery
|
|
from modules.image_processor2 import ImageProcessor
|
|
from modules.loggerModule import Logger1
|
|
from modules.gpt_client import GPTClient
|
|
import asyncio
|
|
import base64
|
|
|
|
def get_base_dir():
|
|
"""
|
|
실행 환경에 따라 base_dir을 설정하는 메서드.
|
|
"""
|
|
if getattr(sys, 'frozen', False):
|
|
base_dir = os.path.dirname(sys.executable)
|
|
internal_dir = os.path.join(base_dir, 'lib')
|
|
if os.path.exists(internal_dir):
|
|
return internal_dir
|
|
else:
|
|
base_dir = os.path.dirname(os.path.abspath(__file__))
|
|
return base_dir
|
|
|
|
# 환경 변수에서 설정 가져오기
|
|
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
|
|
MAIN_SERVER_HOST = os.getenv("MAIN_SERVER_HOST", "localhost")
|
|
WORKER_NAME = os.getenv("WORKER_NAME", f"worker-{socket.gethostname()}-{os.getpid()}")
|
|
SHARED_STORAGE = os.getenv("SHARED_STORAGE", "/app/images") # 메인서버와 동일한 경로
|
|
|
|
# Celery 앱 생성
|
|
celery_app = Celery(
|
|
"image_worker",
|
|
broker=REDIS_URL,
|
|
backend=REDIS_URL
|
|
)
|
|
|
|
# Celery 설정
|
|
celery_app.conf.update(
|
|
task_serializer='json',
|
|
accept_content=['json'],
|
|
result_serializer='json',
|
|
timezone='Asia/Seoul',
|
|
enable_utc=True,
|
|
worker_prefetch_multiplier=1,
|
|
task_acks_late=True,
|
|
task_routes={
|
|
'worker.translate_task': {'queue': 'translate'},
|
|
'worker.inpaint_task': {'queue': 'inpaint'},
|
|
'worker.ocr_task': {'queue': 'ocr'},
|
|
}
|
|
)
|
|
|
|
# 글로벌 ImageProcessor 인스턴스
|
|
image_processor = None
|
|
|
|
def initialize_processor():
|
|
"""ImageProcessor 초기화"""
|
|
global image_processor
|
|
if image_processor is None:
|
|
logger = Logger1()
|
|
gpt_client = GPTClient()
|
|
base_dir = get_base_dir()
|
|
font_path = os.path.join(base_dir, "modules", "fonts", "HakgyoansimDunggeunmisoTTFB.ttf")
|
|
|
|
print(f"워커 초기화 중...")
|
|
print(f"워커 이름: {WORKER_NAME}")
|
|
print(f"Base directory: {base_dir}")
|
|
print(f"Font path: {font_path}")
|
|
print(f"Redis URL: {REDIS_URL}")
|
|
print(f"공유 저장소: {SHARED_STORAGE}")
|
|
|
|
image_processor = ImageProcessor(logger, gpt_client, base_dir, font_path)
|
|
print("ImageProcessor 초기화 완료")
|
|
|
|
return image_processor
|
|
|
|
def get_shared_file_path(user_id: str, filename: str) -> str:
|
|
"""공유 저장소의 파일 전체 경로 반환"""
|
|
return os.path.join(SHARED_STORAGE, user_id, filename)
|
|
|
|
@celery_app.task(name="worker.translate_task", bind=True)
|
|
def translate_task(self, **kwargs):
|
|
"""번역 작업 처리"""
|
|
try:
|
|
processor = initialize_processor()
|
|
|
|
# 공유 저장소에서 파일 읽기
|
|
user_id = kwargs.get('user_id')
|
|
filename = kwargs.get('filename')
|
|
shared_storage = kwargs.get('shared_storage', SHARED_STORAGE)
|
|
toggle_states = kwargs.get('toggle_states', {})
|
|
unwanted_texts = kwargs.get('unwanted_texts', {})
|
|
|
|
if not filename:
|
|
raise ValueError("filename이 제공되지 않았습니다.")
|
|
|
|
input_file_path = get_shared_file_path(user_id, filename)
|
|
print(f"입력 파일 경로: {input_file_path}")
|
|
|
|
# 파일 존재 확인
|
|
if not os.path.exists(input_file_path):
|
|
raise FileNotFoundError(f"파일을 찾을 수 없습니다: {input_file_path}")
|
|
|
|
# 실제 번역 로직 호출
|
|
import asyncio
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
# process_single_image 호출
|
|
translated_image_path = loop.run_until_complete(
|
|
processor.process_single_image(
|
|
toggle_states=toggle_states,
|
|
unwanted_texts=unwanted_texts,
|
|
local_image_path=input_file_path,
|
|
index=0,
|
|
file_prefix=""
|
|
)
|
|
)
|
|
|
|
print(f"번역 완료: {translated_image_path}")
|
|
|
|
# 결과 이미지를 base64로 인코딩
|
|
if translated_image_path and os.path.exists(translated_image_path):
|
|
with open(translated_image_path, "rb") as img_file:
|
|
image_data = img_file.read()
|
|
inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8')
|
|
else:
|
|
# 원본 이미지를 반환
|
|
with open(input_file_path, "rb") as img_file:
|
|
image_data = img_file.read()
|
|
inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8')
|
|
|
|
# OCR 결과 가져오기 (간단한 버전)
|
|
ocr_results = processor.ocr_module.detect_text(input_file_path)
|
|
ocr_texts = [result['text'] for result in ocr_results]
|
|
ocr_boxes = [
|
|
{
|
|
"text": result['text'],
|
|
"box": result['bbox']
|
|
} for result in ocr_results
|
|
]
|
|
|
|
result = {
|
|
"task_id": self.request.id,
|
|
"worker_name": WORKER_NAME,
|
|
"status": "completed",
|
|
"input_file": input_file_path,
|
|
"translated_file": translated_image_path,
|
|
"ocr_texts": ocr_texts,
|
|
"ocr_boxes": ocr_boxes,
|
|
"translated_texts": ocr_texts, # 실제로는 번역된 텍스트여야 함
|
|
"inpainted_image": inpainted_image_b64
|
|
}
|
|
|
|
finally:
|
|
loop.close()
|
|
|
|
print(f"번역 작업 완료: {self.request.id} (워커: {WORKER_NAME})")
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"번역 작업 오류: {str(e)} (워커: {WORKER_NAME})")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise self.retry(countdown=60, max_retries=3)
|
|
|
|
@celery_app.task(name="worker.inpaint_task", bind=True)
|
|
def inpaint_task(self, **kwargs):
|
|
"""인페인팅 작업 처리"""
|
|
try:
|
|
processor = initialize_processor()
|
|
|
|
# 공유 저장소에서 파일들 읽기
|
|
user_id = kwargs.get('user_id')
|
|
image_filename = kwargs.get('image_filename')
|
|
mask_filename = kwargs.get('mask_filename')
|
|
inpaint_method = kwargs.get('inpaint_method', 'lama')
|
|
|
|
if not image_filename or not mask_filename:
|
|
raise ValueError("image_filename 또는 mask_filename이 제공되지 않았습니다.")
|
|
|
|
image_path = get_shared_file_path(user_id, image_filename)
|
|
mask_path = get_shared_file_path(user_id, mask_filename)
|
|
|
|
print(f"이미지 파일: {image_path}")
|
|
print(f"마스크 파일: {mask_path}")
|
|
|
|
# 파일 존재 확인
|
|
if not os.path.exists(image_path):
|
|
raise FileNotFoundError(f"이미지 파일을 찾을 수 없습니다: {image_path}")
|
|
if not os.path.exists(mask_path):
|
|
raise FileNotFoundError(f"마스크 파일을 찾을 수 없습니다: {mask_path}")
|
|
|
|
# 실제 인페인팅 로직 호출
|
|
try:
|
|
if inpaint_method == 'lama':
|
|
from modules.lama_inpaint import inpaint_with_simple_lama
|
|
inpainted_image = inpaint_with_simple_lama(image_path, mask_path)
|
|
else:
|
|
# API 방식 인페인팅
|
|
inpainted_image = processor.call_inpaint_api(image_path, mask_path)
|
|
|
|
# 결과를 base64로 인코딩
|
|
if inpainted_image is not None:
|
|
import cv2
|
|
_, buffer = cv2.imencode('.png', inpainted_image)
|
|
inpainted_image_b64 = base64.b64encode(buffer.tobytes()).decode('utf-8')
|
|
else:
|
|
# 실패 시 원본 이미지 반환
|
|
with open(image_path, "rb") as img_file:
|
|
image_data = img_file.read()
|
|
inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8')
|
|
|
|
result = {
|
|
"task_id": self.request.id,
|
|
"worker_name": WORKER_NAME,
|
|
"status": "completed",
|
|
"inpainted_image": inpainted_image_b64
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"인페인팅 처리 중 오류: {str(e)}")
|
|
# 오류 시 원본 이미지 반환
|
|
with open(image_path, "rb") as img_file:
|
|
image_data = img_file.read()
|
|
inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8')
|
|
|
|
result = {
|
|
"task_id": self.request.id,
|
|
"worker_name": WORKER_NAME,
|
|
"status": "error",
|
|
"error": str(e),
|
|
"inpainted_image": inpainted_image_b64
|
|
}
|
|
|
|
print(f"인페인팅 작업 완료: {self.request.id} (워커: {WORKER_NAME})")
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"인페인팅 작업 오류: {str(e)} (워커: {WORKER_NAME})")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise self.retry(countdown=60, max_retries=3)
|
|
|
|
@celery_app.task(name="worker.ocr_task", bind=True)
|
|
def ocr_task(self, **kwargs):
|
|
"""OCR 작업 처리"""
|
|
try:
|
|
processor = initialize_processor()
|
|
|
|
# 공유 저장소에서 파일 읽기
|
|
user_id = kwargs.get('user_id')
|
|
filename = kwargs.get('filename')
|
|
ocr_method = kwargs.get('ocr_method', 'paddleocr')
|
|
|
|
if not filename:
|
|
raise ValueError("filename이 제공되지 않았습니다.")
|
|
|
|
input_file_path = get_shared_file_path(user_id, filename)
|
|
print(f"OCR 입력 파일: {input_file_path}")
|
|
|
|
# 파일 존재 확인
|
|
if not os.path.exists(input_file_path):
|
|
raise FileNotFoundError(f"파일을 찾을 수 없습니다: {input_file_path}")
|
|
|
|
# 실제 OCR 로직 호출
|
|
ocr_results = processor.ocr_module.detect_text(input_file_path)
|
|
|
|
ocr_texts = [result['text'] for result in ocr_results]
|
|
ocr_boxes = [
|
|
{
|
|
"text": result['text'],
|
|
"box": result['bbox']
|
|
} for result in ocr_results
|
|
]
|
|
|
|
result = {
|
|
"task_id": self.request.id,
|
|
"worker_name": WORKER_NAME,
|
|
"status": "completed",
|
|
"ocr_texts": ocr_texts,
|
|
"ocr_boxes": ocr_boxes
|
|
}
|
|
|
|
print(f"OCR 작업 완료: {self.request.id} (워커: {WORKER_NAME})")
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"OCR 작업 오류: {str(e)} (워커: {WORKER_NAME})")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise self.retry(countdown=60, max_retries=3)
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="이미지 처리 Celery 워커")
|
|
parser.add_argument('--concurrency', type=int, default=2, help='동시 처리 작업 수')
|
|
parser.add_argument('--redis-url', type=str, help='Redis 서버 URL')
|
|
parser.add_argument('--main-server', type=str, help='메인 서버 주소')
|
|
parser.add_argument('--worker-name', type=str, help='워커 식별명')
|
|
parser.add_argument('--shared-storage', type=str, help='공유 저장소 경로')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# 명령행 인수로 설정 오버라이드
|
|
if args.redis_url:
|
|
os.environ["REDIS_URL"] = args.redis_url
|
|
REDIS_URL = args.redis_url
|
|
|
|
if args.main_server:
|
|
os.environ["MAIN_SERVER_HOST"] = args.main_server
|
|
MAIN_SERVER_HOST = args.main_server
|
|
|
|
if args.worker_name:
|
|
os.environ["WORKER_NAME"] = args.worker_name
|
|
WORKER_NAME = args.worker_name
|
|
|
|
if args.shared_storage:
|
|
os.environ["SHARED_STORAGE"] = args.shared_storage
|
|
SHARED_STORAGE = args.shared_storage
|
|
|
|
print(f"워커 시작 중...")
|
|
print(f"워커 이름: {WORKER_NAME}")
|
|
print(f"Redis URL: {REDIS_URL}")
|
|
print(f"메인 서버: {MAIN_SERVER_HOST}")
|
|
print(f"동시 처리 수: {args.concurrency}")
|
|
print(f"공유 저장소: {SHARED_STORAGE}")
|
|
|
|
# 워커 실행
|
|
celery_app.worker_main([
|
|
'worker',
|
|
'--loglevel=info',
|
|
f'--concurrency={args.concurrency}',
|
|
'--queues=translate,inpaint,ocr',
|
|
f'--hostname={WORKER_NAME}@%h'
|
|
]) |