#!/usr/bin/env python3 """ 워커 시스템 테스트용 클라이언트 메인 서버에서 워커로 작업을 전송하고 결과를 받아오는 예제 """ import os import sys import base64 import json import time from celery import Celery # Redis 설정 REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") # Celery 앱 생성 (클라이언트용) celery_client = Celery( "image_worker_client", broker=REDIS_URL, backend=REDIS_URL ) # Celery 설정 celery_client.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Seoul', enable_utc=True, ) def load_image_as_base64(image_path): """이미지 파일을 base64로 인코딩""" try: with open(image_path, 'rb') as f: image_data = f.read() return base64.b64encode(image_data).decode('utf-8') except Exception as e: print(f"이미지 로딩 실패: {e}") return None def test_translate_me_task(image_path=None, image_data=None): """translate_me 작업 테스트""" print("=== translate_me 작업 테스트 ===") # 작업 파라미터 설정 task_kwargs = { 'toggle_states': { 'ocr': True, 'watermark_text': '번역완료' }, 'unwanted_texts': { '광고': '이미지삭제', '홍보': '공지' }, 'index': 0, 'file_prefix': 'test' } if image_path: task_kwargs['image_path'] = image_path print(f"이미지 파일 경로 사용: {image_path}") elif image_data: task_kwargs['image_data'] = image_data print("base64 이미지 데이터 사용") else: print("이미지 경로 또는 데이터가 필요합니다") return None # 작업 전송 try: print("작업을 워커에 전송 중...") result = celery_client.send_task( 'worker.translate_me_task', kwargs=task_kwargs ) print(f"작업 ID: {result.id}") print("결과 대기 중...") # 결과 대기 (최대 300초) task_result = result.get(timeout=300) print("\n=== 작업 결과 ===") print(f"상태: {task_result.get('status')}") print(f"워커 이름: {task_result.get('worker_name')}") print(f"원본 경로: {task_result.get('original_path')}") print(f"결과 경로: {task_result.get('result_path')}") print(f"메시지: {task_result.get('message')}") # 결과 이미지 저장 output_image = task_result.get('output_image') if output_image: output_path = f"test_result_{int(time.time())}.png" with open(output_path, 'wb') as f: f.write(base64.b64decode(output_image)) print(f"결과 이미지 저장: {output_path}") return task_result except Exception as e: print(f"작업 실행 오류: {e}") return None def test_ocr_task(image_path=None, image_data=None): """OCR 작업 테스트""" print("\n=== OCR 작업 테스트 ===") task_kwargs = {} if image_path: task_kwargs['image_path'] = image_path print(f"이미지 파일 경로 사용: {image_path}") elif image_data: task_kwargs['image_data'] = image_data print("base64 이미지 데이터 사용") else: print("이미지 경로 또는 데이터가 필요합니다") return None try: print("OCR 작업을 워커에 전송 중...") result = celery_client.send_task( 'worker.ocr_task', kwargs=task_kwargs ) print(f"작업 ID: {result.id}") print("결과 대기 중...") # 결과 대기 (최대 60초) task_result = result.get(timeout=60) print("\n=== OCR 결과 ===") print(f"상태: {task_result.get('status')}") print(f"워커 이름: {task_result.get('worker_name')}") print(f"감지된 텍스트: {task_result.get('ocr_texts')}") print(f"중국어 텍스트: {task_result.get('chinese_texts')}") print(f"OCR 박스 수: {len(task_result.get('ocr_boxes', []))}") return task_result except Exception as e: print(f"OCR 작업 실행 오류: {e}") return None def check_worker_status(): """워커 상태 확인""" print("=== 워커 상태 확인 ===") try: # 간단한 ping 작업 (없으면 OCR 작업으로 대체) inspect = celery_client.control.inspect() # 활성 워커 목록 active_workers = inspect.active() if active_workers: print(f"활성 워커: {list(active_workers.keys())}") # 각 워커의 상태 for worker_name, tasks in active_workers.items(): print(f" {worker_name}: {len(tasks)}개 작업 실행 중") else: print("활성 워커가 없습니다") # 등록된 작업 목록 registered_tasks = inspect.registered() if registered_tasks: print("\n등록된 작업:") for worker_name, tasks in registered_tasks.items(): print(f" {worker_name}:") for task in tasks: if task.startswith('worker.'): print(f" - {task}") except Exception as e: print(f"워커 상태 확인 오류: {e}") def main(): """메인 함수""" import argparse parser = argparse.ArgumentParser(description="워커 시스템 테스트 클라이언트") parser.add_argument('--image', type=str, help='테스트할 이미지 파일 경로') parser.add_argument('--redis-url', type=str, help='Redis 서버 URL') parser.add_argument('--test-type', choices=['translate', 'ocr', 'status', 'all'], default='all', help='테스트 유형') args = parser.parse_args() # Redis URL 설정 if args.redis_url: global REDIS_URL REDIS_URL = args.redis_url celery_client.conf.broker_url = REDIS_URL celery_client.conf.result_backend = REDIS_URL print(f"Redis URL: {REDIS_URL}") # 이미지 데이터 준비 image_data = None if args.image: if not os.path.exists(args.image): print(f"이미지 파일이 존재하지 않습니다: {args.image}") return image_data = load_image_as_base64(args.image) if not image_data: return # 테스트 실행 if args.test_type in ['status', 'all']: check_worker_status() if args.test_type in ['ocr', 'all'] and args.image: test_ocr_task(image_path=args.image) if args.test_type in ['translate', 'all'] and args.image: test_translate_me_task(image_path=args.image) if not args.image and args.test_type != 'status': print("\n사용법:") print(" python test_worker_client.py --image sample.jpg --test-type translate") print(" python test_worker_client.py --test-type status") if __name__ == "__main__": main()