TransWorker/test_worker_client.py

227 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
워커 시스템 테스트용 클라이언트
메인 서버에서 워커로 작업을 전송하고 결과를 받아오는 예제
"""
import os
import sys
import base64
import json
import time
from celery import Celery
# Redis 설정
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Celery 앱 생성 (클라이언트용)
celery_client = Celery(
"image_worker_client",
broker=REDIS_URL,
backend=REDIS_URL
)
# Celery 설정
celery_client.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Seoul',
enable_utc=True,
)
def load_image_as_base64(image_path):
"""이미지 파일을 base64로 인코딩"""
try:
with open(image_path, 'rb') as f:
image_data = f.read()
return base64.b64encode(image_data).decode('utf-8')
except Exception as e:
print(f"이미지 로딩 실패: {e}")
return None
def test_translate_me_task(image_path=None, image_data=None):
"""translate_me 작업 테스트"""
print("=== translate_me 작업 테스트 ===")
# 작업 파라미터 설정
task_kwargs = {
'toggle_states': {
'ocr': True,
'watermark_text': '번역완료'
},
'unwanted_texts': {
'광고': '이미지삭제',
'홍보': '공지'
},
'index': 0,
'file_prefix': 'test'
}
if image_path:
task_kwargs['image_path'] = image_path
print(f"이미지 파일 경로 사용: {image_path}")
elif image_data:
task_kwargs['image_data'] = image_data
print("base64 이미지 데이터 사용")
else:
print("이미지 경로 또는 데이터가 필요합니다")
return None
# 작업 전송
try:
print("작업을 워커에 전송 중...")
result = celery_client.send_task(
'worker.translate_me_task',
kwargs=task_kwargs
)
print(f"작업 ID: {result.id}")
print("결과 대기 중...")
# 결과 대기 (최대 300초)
task_result = result.get(timeout=300)
print("\n=== 작업 결과 ===")
print(f"상태: {task_result.get('status')}")
print(f"워커 이름: {task_result.get('worker_name')}")
print(f"원본 경로: {task_result.get('original_path')}")
print(f"결과 경로: {task_result.get('result_path')}")
print(f"메시지: {task_result.get('message')}")
# 결과 이미지 저장
output_image = task_result.get('output_image')
if output_image:
output_path = f"test_result_{int(time.time())}.png"
with open(output_path, 'wb') as f:
f.write(base64.b64decode(output_image))
print(f"결과 이미지 저장: {output_path}")
return task_result
except Exception as e:
print(f"작업 실행 오류: {e}")
return None
def test_ocr_task(image_path=None, image_data=None):
"""OCR 작업 테스트"""
print("\n=== OCR 작업 테스트 ===")
task_kwargs = {}
if image_path:
task_kwargs['image_path'] = image_path
print(f"이미지 파일 경로 사용: {image_path}")
elif image_data:
task_kwargs['image_data'] = image_data
print("base64 이미지 데이터 사용")
else:
print("이미지 경로 또는 데이터가 필요합니다")
return None
try:
print("OCR 작업을 워커에 전송 중...")
result = celery_client.send_task(
'worker.ocr_task',
kwargs=task_kwargs
)
print(f"작업 ID: {result.id}")
print("결과 대기 중...")
# 결과 대기 (최대 60초)
task_result = result.get(timeout=60)
print("\n=== OCR 결과 ===")
print(f"상태: {task_result.get('status')}")
print(f"워커 이름: {task_result.get('worker_name')}")
print(f"감지된 텍스트: {task_result.get('ocr_texts')}")
print(f"중국어 텍스트: {task_result.get('chinese_texts')}")
print(f"OCR 박스 수: {len(task_result.get('ocr_boxes', []))}")
return task_result
except Exception as e:
print(f"OCR 작업 실행 오류: {e}")
return None
def check_worker_status():
"""워커 상태 확인"""
print("=== 워커 상태 확인 ===")
try:
# 간단한 ping 작업 (없으면 OCR 작업으로 대체)
inspect = celery_client.control.inspect()
# 활성 워커 목록
active_workers = inspect.active()
if active_workers:
print(f"활성 워커: {list(active_workers.keys())}")
# 각 워커의 상태
for worker_name, tasks in active_workers.items():
print(f" {worker_name}: {len(tasks)}개 작업 실행 중")
else:
print("활성 워커가 없습니다")
# 등록된 작업 목록
registered_tasks = inspect.registered()
if registered_tasks:
print("\n등록된 작업:")
for worker_name, tasks in registered_tasks.items():
print(f" {worker_name}:")
for task in tasks:
if task.startswith('worker.'):
print(f" - {task}")
except Exception as e:
print(f"워커 상태 확인 오류: {e}")
def main():
"""메인 함수"""
import argparse
parser = argparse.ArgumentParser(description="워커 시스템 테스트 클라이언트")
parser.add_argument('--image', type=str, help='테스트할 이미지 파일 경로')
parser.add_argument('--redis-url', type=str, help='Redis 서버 URL')
parser.add_argument('--test-type', choices=['translate', 'ocr', 'status', 'all'],
default='all', help='테스트 유형')
args = parser.parse_args()
# Redis URL 설정
if args.redis_url:
global REDIS_URL
REDIS_URL = args.redis_url
celery_client.conf.broker_url = REDIS_URL
celery_client.conf.result_backend = REDIS_URL
print(f"Redis URL: {REDIS_URL}")
# 이미지 데이터 준비
image_data = None
if args.image:
if not os.path.exists(args.image):
print(f"이미지 파일이 존재하지 않습니다: {args.image}")
return
image_data = load_image_as_base64(args.image)
if not image_data:
return
# 테스트 실행
if args.test_type in ['status', 'all']:
check_worker_status()
if args.test_type in ['ocr', 'all'] and args.image:
test_ocr_task(image_path=args.image)
if args.test_type in ['translate', 'all'] and args.image:
test_translate_me_task(image_path=args.image)
if not args.image and args.test_type != 'status':
print("\n사용법:")
print(" python test_worker_client.py --image sample.jpg --test-type translate")
print(" python test_worker_client.py --test-type status")
if __name__ == "__main__":
main()