227 lines
7.1 KiB
Python
Executable File
227 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
워커 시스템 테스트용 클라이언트
|
|
메인 서버에서 워커로 작업을 전송하고 결과를 받아오는 예제
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import base64
|
|
import json
|
|
import time
|
|
from celery import Celery
|
|
|
|
# Redis 설정
|
|
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
|
|
|
|
# Celery 앱 생성 (클라이언트용)
|
|
celery_client = Celery(
|
|
"image_worker_client",
|
|
broker=REDIS_URL,
|
|
backend=REDIS_URL
|
|
)
|
|
|
|
# Celery 설정
|
|
celery_client.conf.update(
|
|
task_serializer='json',
|
|
accept_content=['json'],
|
|
result_serializer='json',
|
|
timezone='Asia/Seoul',
|
|
enable_utc=True,
|
|
)
|
|
|
|
def load_image_as_base64(image_path):
|
|
"""이미지 파일을 base64로 인코딩"""
|
|
try:
|
|
with open(image_path, 'rb') as f:
|
|
image_data = f.read()
|
|
return base64.b64encode(image_data).decode('utf-8')
|
|
except Exception as e:
|
|
print(f"이미지 로딩 실패: {e}")
|
|
return None
|
|
|
|
def test_translate_me_task(image_path=None, image_data=None):
|
|
"""translate_me 작업 테스트"""
|
|
print("=== translate_me 작업 테스트 ===")
|
|
|
|
# 작업 파라미터 설정
|
|
task_kwargs = {
|
|
'toggle_states': {
|
|
'ocr': True,
|
|
'watermark_text': '번역완료'
|
|
},
|
|
'unwanted_texts': {
|
|
'광고': '이미지삭제',
|
|
'홍보': '공지'
|
|
},
|
|
'index': 0,
|
|
'file_prefix': 'test'
|
|
}
|
|
|
|
if image_path:
|
|
task_kwargs['image_path'] = image_path
|
|
print(f"이미지 파일 경로 사용: {image_path}")
|
|
elif image_data:
|
|
task_kwargs['image_data'] = image_data
|
|
print("base64 이미지 데이터 사용")
|
|
else:
|
|
print("이미지 경로 또는 데이터가 필요합니다")
|
|
return None
|
|
|
|
# 작업 전송
|
|
try:
|
|
print("작업을 워커에 전송 중...")
|
|
result = celery_client.send_task(
|
|
'worker.translate_me_task',
|
|
kwargs=task_kwargs
|
|
)
|
|
|
|
print(f"작업 ID: {result.id}")
|
|
print("결과 대기 중...")
|
|
|
|
# 결과 대기 (최대 300초)
|
|
task_result = result.get(timeout=300)
|
|
|
|
print("\n=== 작업 결과 ===")
|
|
print(f"상태: {task_result.get('status')}")
|
|
print(f"워커 이름: {task_result.get('worker_name')}")
|
|
print(f"원본 경로: {task_result.get('original_path')}")
|
|
print(f"결과 경로: {task_result.get('result_path')}")
|
|
print(f"메시지: {task_result.get('message')}")
|
|
|
|
# 결과 이미지 저장
|
|
output_image = task_result.get('output_image')
|
|
if output_image:
|
|
output_path = f"test_result_{int(time.time())}.png"
|
|
with open(output_path, 'wb') as f:
|
|
f.write(base64.b64decode(output_image))
|
|
print(f"결과 이미지 저장: {output_path}")
|
|
|
|
return task_result
|
|
|
|
except Exception as e:
|
|
print(f"작업 실행 오류: {e}")
|
|
return None
|
|
|
|
def test_ocr_task(image_path=None, image_data=None):
|
|
"""OCR 작업 테스트"""
|
|
print("\n=== OCR 작업 테스트 ===")
|
|
|
|
task_kwargs = {}
|
|
|
|
if image_path:
|
|
task_kwargs['image_path'] = image_path
|
|
print(f"이미지 파일 경로 사용: {image_path}")
|
|
elif image_data:
|
|
task_kwargs['image_data'] = image_data
|
|
print("base64 이미지 데이터 사용")
|
|
else:
|
|
print("이미지 경로 또는 데이터가 필요합니다")
|
|
return None
|
|
|
|
try:
|
|
print("OCR 작업을 워커에 전송 중...")
|
|
result = celery_client.send_task(
|
|
'worker.ocr_task',
|
|
kwargs=task_kwargs
|
|
)
|
|
|
|
print(f"작업 ID: {result.id}")
|
|
print("결과 대기 중...")
|
|
|
|
# 결과 대기 (최대 60초)
|
|
task_result = result.get(timeout=60)
|
|
|
|
print("\n=== OCR 결과 ===")
|
|
print(f"상태: {task_result.get('status')}")
|
|
print(f"워커 이름: {task_result.get('worker_name')}")
|
|
print(f"감지된 텍스트: {task_result.get('ocr_texts')}")
|
|
print(f"중국어 텍스트: {task_result.get('chinese_texts')}")
|
|
print(f"OCR 박스 수: {len(task_result.get('ocr_boxes', []))}")
|
|
|
|
return task_result
|
|
|
|
except Exception as e:
|
|
print(f"OCR 작업 실행 오류: {e}")
|
|
return None
|
|
|
|
def check_worker_status():
|
|
"""워커 상태 확인"""
|
|
print("=== 워커 상태 확인 ===")
|
|
|
|
try:
|
|
# 간단한 ping 작업 (없으면 OCR 작업으로 대체)
|
|
inspect = celery_client.control.inspect()
|
|
|
|
# 활성 워커 목록
|
|
active_workers = inspect.active()
|
|
if active_workers:
|
|
print(f"활성 워커: {list(active_workers.keys())}")
|
|
|
|
# 각 워커의 상태
|
|
for worker_name, tasks in active_workers.items():
|
|
print(f" {worker_name}: {len(tasks)}개 작업 실행 중")
|
|
else:
|
|
print("활성 워커가 없습니다")
|
|
|
|
# 등록된 작업 목록
|
|
registered_tasks = inspect.registered()
|
|
if registered_tasks:
|
|
print("\n등록된 작업:")
|
|
for worker_name, tasks in registered_tasks.items():
|
|
print(f" {worker_name}:")
|
|
for task in tasks:
|
|
if task.startswith('worker.'):
|
|
print(f" - {task}")
|
|
|
|
except Exception as e:
|
|
print(f"워커 상태 확인 오류: {e}")
|
|
|
|
def main():
|
|
"""메인 함수"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="워커 시스템 테스트 클라이언트")
|
|
parser.add_argument('--image', type=str, help='테스트할 이미지 파일 경로')
|
|
parser.add_argument('--redis-url', type=str, help='Redis 서버 URL')
|
|
parser.add_argument('--test-type', choices=['translate', 'ocr', 'status', 'all'],
|
|
default='all', help='테스트 유형')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Redis URL 설정
|
|
if args.redis_url:
|
|
global REDIS_URL
|
|
REDIS_URL = args.redis_url
|
|
celery_client.conf.broker_url = REDIS_URL
|
|
celery_client.conf.result_backend = REDIS_URL
|
|
|
|
print(f"Redis URL: {REDIS_URL}")
|
|
|
|
# 이미지 데이터 준비
|
|
image_data = None
|
|
if args.image:
|
|
if not os.path.exists(args.image):
|
|
print(f"이미지 파일이 존재하지 않습니다: {args.image}")
|
|
return
|
|
image_data = load_image_as_base64(args.image)
|
|
if not image_data:
|
|
return
|
|
|
|
# 테스트 실행
|
|
if args.test_type in ['status', 'all']:
|
|
check_worker_status()
|
|
|
|
if args.test_type in ['ocr', 'all'] and args.image:
|
|
test_ocr_task(image_path=args.image)
|
|
|
|
if args.test_type in ['translate', 'all'] and args.image:
|
|
test_translate_me_task(image_path=args.image)
|
|
|
|
if not args.image and args.test_type != 'status':
|
|
print("\n사용법:")
|
|
print(" python test_worker_client.py --image sample.jpg --test-type translate")
|
|
print(" python test_worker_client.py --test-type status")
|
|
|
|
if __name__ == "__main__":
|
|
main() |