#!/usr/bin/env python3 """ 독립적인 Celery 워커 시스템 메인서버의 Redis와 연결하여 작업을 처리합니다. """ import os import sys import argparse import socket from celery import Celery from modules.image_processor2 import ImageProcessor from modules.loggerModule import Logger1 from modules.gpt_client import GPTClient import asyncio import base64 def get_base_dir(): """ 실행 환경에 따라 base_dir을 설정하는 메서드. """ if getattr(sys, 'frozen', False): base_dir = os.path.dirname(sys.executable) internal_dir = os.path.join(base_dir, 'lib') if os.path.exists(internal_dir): return internal_dir else: base_dir = os.path.dirname(os.path.abspath(__file__)) return base_dir # 환경 변수에서 설정 가져오기 REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") MAIN_SERVER_HOST = os.getenv("MAIN_SERVER_HOST", "localhost") WORKER_NAME = os.getenv("WORKER_NAME", f"worker-{socket.gethostname()}-{os.getpid()}") SHARED_STORAGE = os.getenv("SHARED_STORAGE", "/app/images") # 메인서버와 동일한 경로 # Celery 앱 생성 celery_app = Celery( "image_worker", broker=REDIS_URL, backend=REDIS_URL ) # Celery 설정 celery_app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Seoul', enable_utc=True, worker_prefetch_multiplier=1, task_acks_late=True, task_routes={ 'worker.translate_task': {'queue': 'translate'}, 'worker.inpaint_task': {'queue': 'inpaint'}, 'worker.ocr_task': {'queue': 'ocr'}, } ) # 글로벌 ImageProcessor 인스턴스 image_processor = None def initialize_processor(): """ImageProcessor 초기화""" global image_processor if image_processor is None: logger = Logger1() gpt_client = GPTClient() base_dir = get_base_dir() font_path = os.path.join(base_dir, "modules", "fonts", "HakgyoansimDunggeunmisoTTFB.ttf") print(f"워커 초기화 중...") print(f"워커 이름: {WORKER_NAME}") print(f"Base directory: {base_dir}") print(f"Font path: {font_path}") print(f"Redis URL: {REDIS_URL}") print(f"공유 저장소: {SHARED_STORAGE}") image_processor = ImageProcessor(logger, gpt_client, base_dir, font_path) print("ImageProcessor 초기화 완료") return image_processor def get_shared_file_path(user_id: str, filename: str) -> str: """공유 저장소의 파일 전체 경로 반환""" return os.path.join(SHARED_STORAGE, user_id, filename) @celery_app.task(name="worker.translate_task", bind=True) def translate_task(self, **kwargs): """번역 작업 처리""" try: processor = initialize_processor() # 공유 저장소에서 파일 읽기 user_id = kwargs.get('user_id') filename = kwargs.get('filename') shared_storage = kwargs.get('shared_storage', SHARED_STORAGE) toggle_states = kwargs.get('toggle_states', {}) unwanted_texts = kwargs.get('unwanted_texts', {}) if not filename: raise ValueError("filename이 제공되지 않았습니다.") input_file_path = get_shared_file_path(user_id, filename) print(f"입력 파일 경로: {input_file_path}") # 파일 존재 확인 if not os.path.exists(input_file_path): raise FileNotFoundError(f"파일을 찾을 수 없습니다: {input_file_path}") # 실제 번역 로직 호출 import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # process_single_image 호출 translated_image_path = loop.run_until_complete( processor.process_single_image( toggle_states=toggle_states, unwanted_texts=unwanted_texts, local_image_path=input_file_path, index=0, file_prefix="" ) ) print(f"번역 완료: {translated_image_path}") # 결과 이미지를 base64로 인코딩 if translated_image_path and os.path.exists(translated_image_path): with open(translated_image_path, "rb") as img_file: image_data = img_file.read() inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8') else: # 원본 이미지를 반환 with open(input_file_path, "rb") as img_file: image_data = img_file.read() inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8') # OCR 결과 가져오기 (간단한 버전) ocr_results = processor.ocr_module.detect_text(input_file_path) ocr_texts = [result['text'] for result in ocr_results] ocr_boxes = [ { "text": result['text'], "box": result['bbox'] } for result in ocr_results ] result = { "task_id": self.request.id, "worker_name": WORKER_NAME, "status": "completed", "input_file": input_file_path, "translated_file": translated_image_path, "ocr_texts": ocr_texts, "ocr_boxes": ocr_boxes, "translated_texts": ocr_texts, # 실제로는 번역된 텍스트여야 함 "inpainted_image": inpainted_image_b64 } finally: loop.close() print(f"번역 작업 완료: {self.request.id} (워커: {WORKER_NAME})") return result except Exception as e: print(f"번역 작업 오류: {str(e)} (워커: {WORKER_NAME})") import traceback traceback.print_exc() raise self.retry(countdown=60, max_retries=3) @celery_app.task(name="worker.inpaint_task", bind=True) def inpaint_task(self, **kwargs): """인페인팅 작업 처리""" try: processor = initialize_processor() # 공유 저장소에서 파일들 읽기 user_id = kwargs.get('user_id') image_filename = kwargs.get('image_filename') mask_filename = kwargs.get('mask_filename') inpaint_method = kwargs.get('inpaint_method', 'lama') if not image_filename or not mask_filename: raise ValueError("image_filename 또는 mask_filename이 제공되지 않았습니다.") image_path = get_shared_file_path(user_id, image_filename) mask_path = get_shared_file_path(user_id, mask_filename) print(f"이미지 파일: {image_path}") print(f"마스크 파일: {mask_path}") # 파일 존재 확인 if not os.path.exists(image_path): raise FileNotFoundError(f"이미지 파일을 찾을 수 없습니다: {image_path}") if not os.path.exists(mask_path): raise FileNotFoundError(f"마스크 파일을 찾을 수 없습니다: {mask_path}") # 실제 인페인팅 로직 호출 try: if inpaint_method == 'lama': from modules.lama_inpaint import inpaint_with_simple_lama inpainted_image = inpaint_with_simple_lama(image_path, mask_path) else: # API 방식 인페인팅 inpainted_image = processor.call_inpaint_api(image_path, mask_path) # 결과를 base64로 인코딩 if inpainted_image is not None: import cv2 _, buffer = cv2.imencode('.png', inpainted_image) inpainted_image_b64 = base64.b64encode(buffer.tobytes()).decode('utf-8') else: # 실패 시 원본 이미지 반환 with open(image_path, "rb") as img_file: image_data = img_file.read() inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8') result = { "task_id": self.request.id, "worker_name": WORKER_NAME, "status": "completed", "inpainted_image": inpainted_image_b64 } except Exception as e: print(f"인페인팅 처리 중 오류: {str(e)}") # 오류 시 원본 이미지 반환 with open(image_path, "rb") as img_file: image_data = img_file.read() inpainted_image_b64 = base64.b64encode(image_data).decode('utf-8') result = { "task_id": self.request.id, "worker_name": WORKER_NAME, "status": "error", "error": str(e), "inpainted_image": inpainted_image_b64 } print(f"인페인팅 작업 완료: {self.request.id} (워커: {WORKER_NAME})") return result except Exception as e: print(f"인페인팅 작업 오류: {str(e)} (워커: {WORKER_NAME})") import traceback traceback.print_exc() raise self.retry(countdown=60, max_retries=3) @celery_app.task(name="worker.ocr_task", bind=True) def ocr_task(self, **kwargs): """OCR 작업 처리""" try: processor = initialize_processor() # 공유 저장소에서 파일 읽기 user_id = kwargs.get('user_id') filename = kwargs.get('filename') ocr_method = kwargs.get('ocr_method', 'paddleocr') if not filename: raise ValueError("filename이 제공되지 않았습니다.") input_file_path = get_shared_file_path(user_id, filename) print(f"OCR 입력 파일: {input_file_path}") # 파일 존재 확인 if not os.path.exists(input_file_path): raise FileNotFoundError(f"파일을 찾을 수 없습니다: {input_file_path}") # 실제 OCR 로직 호출 ocr_results = processor.ocr_module.detect_text(input_file_path) ocr_texts = [result['text'] for result in ocr_results] ocr_boxes = [ { "text": result['text'], "box": result['bbox'] } for result in ocr_results ] result = { "task_id": self.request.id, "worker_name": WORKER_NAME, "status": "completed", "ocr_texts": ocr_texts, "ocr_boxes": ocr_boxes } print(f"OCR 작업 완료: {self.request.id} (워커: {WORKER_NAME})") return result except Exception as e: print(f"OCR 작업 오류: {str(e)} (워커: {WORKER_NAME})") import traceback traceback.print_exc() raise self.retry(countdown=60, max_retries=3) if __name__ == "__main__": parser = argparse.ArgumentParser(description="이미지 처리 Celery 워커") parser.add_argument('--concurrency', type=int, default=2, help='동시 처리 작업 수') parser.add_argument('--redis-url', type=str, help='Redis 서버 URL') parser.add_argument('--main-server', type=str, help='메인 서버 주소') parser.add_argument('--worker-name', type=str, help='워커 식별명') parser.add_argument('--shared-storage', type=str, help='공유 저장소 경로') args = parser.parse_args() # 명령행 인수로 설정 오버라이드 if args.redis_url: os.environ["REDIS_URL"] = args.redis_url REDIS_URL = args.redis_url if args.main_server: os.environ["MAIN_SERVER_HOST"] = args.main_server MAIN_SERVER_HOST = args.main_server if args.worker_name: os.environ["WORKER_NAME"] = args.worker_name WORKER_NAME = args.worker_name if args.shared_storage: os.environ["SHARED_STORAGE"] = args.shared_storage SHARED_STORAGE = args.shared_storage print(f"워커 시작 중...") print(f"워커 이름: {WORKER_NAME}") print(f"Redis URL: {REDIS_URL}") print(f"메인 서버: {MAIN_SERVER_HOST}") print(f"동시 처리 수: {args.concurrency}") print(f"공유 저장소: {SHARED_STORAGE}") # 워커 실행 celery_app.worker_main([ 'worker', '--loglevel=info', f'--concurrency={args.concurrency}', '--queues=translate,inpaint,ocr', f'--hostname={WORKER_NAME}@%h' ])