메인서버와 워커 시스템 분리

- 메인서버: 클라이언트 요청 중계만 담당
- 워커 시스템: 별도 디렉토리로 분리 (~work/worker-system)
- Docker Compose에서 celery worker 제거
- 태스크 전송을 외부 워커로 변경 (worker.translate_task 등)
- 작업 상태 확인 API 추가 (/task_status/{task_id})
- 워커 자동화 스크립트 및 Docker 설정 포함
This commit is contained in:
AGX 2025-07-17 10:31:39 +09:00
parent fd6815f11a
commit 078b3258f9
11 changed files with 618 additions and 57 deletions

Binary file not shown.

View File

@ -58,8 +58,9 @@ async def validate_user(user_id):
if not allowed:
raise HTTPException(status_code=403, detail="권한이 없습니다.")
# ==== 셀러리 태스크 ====
# ==== 셀러리 태스크 (워커 시스템으로 전송) ====
def start_celery_task(task_name, **kwargs):
"""워커 시스템으로 작업 전송"""
return celery_app.send_task(task_name, kwargs=kwargs)
# ==== 엔드포인트 ====
@ -67,23 +68,43 @@ def start_celery_task(task_name, **kwargs):
async def translate_me(req: TranslateRequest):
await validate_user(req.user_id)
filename = f"{uuid.uuid4().hex}_{int(time.time())}.png"
# (여기서 파일 경로 생성 및 저장)
task = start_celery_task("app.tasks.translate_task", **req.dict(), filename=filename)
logger.info(f"태스크 등록: {task.id}, 파일명: {filename}")
# 워커 시스템의 translate_task로 전송
task = start_celery_task("worker.translate_task", **req.dict(), filename=filename)
logger.info(f"번역 태스크 등록: {task.id}, 파일명: {filename}")
return {"task_id": task.id, "filename": filename}
@app.post("/inpaint_me")
async def inpaint_me(req: InpaintRequest):
await validate_user(req.user_id)
task = start_celery_task("app.tasks.inpaint_task", **req.dict())
# 워커 시스템의 inpaint_task로 전송
task = start_celery_task("worker.inpaint_task", **req.dict())
logger.info(f"인페인팅 태스크 등록: {task.id}")
return {"task_id": task.id}
@app.post("/ocr_me")
async def ocr_me(req: OCRRequest):
await validate_user(req.user_id)
task = start_celery_task("app.tasks.ocr_task", **req.dict())
# 워커 시스템의 ocr_task로 전송
task = start_celery_task("worker.ocr_task", **req.dict())
logger.info(f"OCR 태스크 등록: {task.id}")
return {"task_id": task.id}
@app.get("/task_status/{task_id}")
async def get_task_status(task_id: str):
"""작업 상태 확인"""
try:
result = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
except Exception as e:
raise HTTPException(status_code=404, detail=f"작업을 찾을 수 없습니다: {str(e)}")
@app.post("/upload_image")
async def upload_image(user_id: str = Form(...), file: UploadFile = File(...)):
original_dir = f"images/{user_id}/original/"
@ -104,3 +125,12 @@ def download_image(user_id: str, filename: str):
data = f.read()
os.remove(file_path) # 다운로드 직후 삭제
return Response(content=data, media_type="image/png")
@app.get("/")
async def root():
return {"message": "이미지 번역 메인 서버", "status": "running"}
@app.get("/health")
async def health_check():
"""헬스 체크"""
return {"status": "healthy", "timestamp": time.time()}

View File

@ -1,32 +0,0 @@
from app.celery_worker import celery_app
@celery_app.task(name="app.tasks.translate_task")
def translate_task(**kwargs):
# 실제 번역 처리 로직
return {
"ocr_texts": ["중국어1", "중국어2"],
"ocr_boxes": [
{"text": "중국어1", "box": [10, 20, 100, 120]},
{"text": "중국어2", "box": [110, 120, 200, 220]}
],
"translated_texts": ["한글1", "한글2"],
"inpainted_image": "base64string...."
}
@celery_app.task(name="app.tasks.inpaint_task")
def inpaint_task(**kwargs):
# 실제 인페인팅 처리 로직
return {
"inpainted_image": "base64string...."
}
@celery_app.task(name="app.tasks.ocr_task")
def ocr_task(**kwargs):
# 실제 OCR 처리 로직
return {
"ocr_texts": ["중국어1", "중국어2"],
"ocr_boxes": [
{"text": "중국어1", "box": [10, 20, 100, 120]},
{"text": "중국어2", "box": [110, 120, 200, 220]}
]
}

View File

@ -11,30 +11,13 @@ services:
- "7890:7890"
depends_on:
- redis
celery:
build: .
container_name: celery_worker
command: celery -A app.celery_worker worker --loglevel=info --concurrency=2
volumes:
- ./app:/app/app
depends_on:
- redis
celery-beat:
build: .
container_name: celery_beat
command: celery -A app.celery_worker beat --loglevel=info
volumes:
- ./app:/app/app
- ./images:/app/images
depends_on:
- redis
redis:
image: redis:6.2
container_name: redis_server
ports:
- "6379:6379"
flower:
build: .
container_name: flower_monitor

176
worker.py Normal file
View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
독립적인 Celery 워커 시스템
메인서버의 Redis와 연결하여 작업을 처리합니다.
"""
import os
import sys
import argparse
from celery import Celery
from modules.image_processor2 import ImageProcessor
from modules.loggerModule import Logger1
from modules.gpt_client import GPTClient
def get_base_dir():
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, 'lib')
if os.path.exists(internal_dir):
return internal_dir
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
# 환경 변수에서 Redis URL 가져오기 (기본값은 로컬 메인서버)
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MAIN_SERVER_HOST = os.getenv("MAIN_SERVER_HOST", "localhost")
# Celery 앱 생성
celery_app = Celery(
"image_worker",
broker=REDIS_URL,
backend=REDIS_URL
)
# Celery 설정
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Seoul',
enable_utc=True,
task_routes={
'worker.translate_task': {'queue': 'translate'},
'worker.inpaint_task': {'queue': 'inpaint'},
'worker.ocr_task': {'queue': 'ocr'},
}
)
# 글로벌 ImageProcessor 인스턴스
image_processor = None
def initialize_processor():
"""ImageProcessor 초기화"""
global image_processor
if image_processor is None:
logger = Logger1()
gpt_client = GPTClient()
base_dir = get_base_dir()
font_path = os.path.join(base_dir, "modules", "fonts", "HakgyoansimDunggeunmisoTTFB.ttf")
print(f"워커 초기화 중...")
print(f"Base directory: {base_dir}")
print(f"Font path: {font_path}")
print(f"Redis URL: {REDIS_URL}")
image_processor = ImageProcessor(logger, gpt_client, base_dir, font_path)
print("ImageProcessor 초기화 완료")
return image_processor
@celery_app.task(name="worker.translate_task", bind=True)
def translate_task(self, **kwargs):
"""번역 작업 처리"""
try:
processor = initialize_processor()
# 실제 번역 로직 호출
# TODO: ImageProcessor의 번역 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"ocr_texts": ["번역된 텍스트1", "번역된 텍스트2"],
"ocr_boxes": [
{"text": "번역된 텍스트1", "box": [10, 20, 100, 120]},
{"text": "번역된 텍스트2", "box": [110, 120, 200, 220]}
],
"translated_texts": ["한글 번역1", "한글 번역2"],
"inpainted_image": "base64_encoded_image_data"
}
print(f"번역 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"번역 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
@celery_app.task(name="worker.inpaint_task", bind=True)
def inpaint_task(self, **kwargs):
"""인페인팅 작업 처리"""
try:
processor = initialize_processor()
# 실제 인페인팅 로직 호출
# TODO: ImageProcessor의 인페인팅 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"inpainted_image": "base64_encoded_inpainted_image"
}
print(f"인페인팅 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"인페인팅 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
@celery_app.task(name="worker.ocr_task", bind=True)
def ocr_task(self, **kwargs):
"""OCR 작업 처리"""
try:
processor = initialize_processor()
# 실제 OCR 로직 호출
# TODO: ImageProcessor의 OCR 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"ocr_texts": ["감지된 텍스트1", "감지된 텍스트2"],
"ocr_boxes": [
{"text": "감지된 텍스트1", "box": [10, 20, 100, 120]},
{"text": "감지된 텍스트2", "box": [110, 120, 200, 220]}
]
}
print(f"OCR 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"OCR 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="이미지 처리 Celery 워커")
parser.add_argument('--concurrency', type=int, default=2, help='동시 처리 작업 수')
parser.add_argument('--redis-url', type=str, help='Redis 서버 URL')
parser.add_argument('--main-server', type=str, help='메인 서버 주소')
args = parser.parse_args()
# 명령행 인수로 Redis URL 오버라이드
if args.redis_url:
os.environ["REDIS_URL"] = args.redis_url
REDIS_URL = args.redis_url
if args.main_server:
os.environ["MAIN_SERVER_HOST"] = args.main_server
MAIN_SERVER_HOST = args.main_server
print(f"워커 시작 중...")
print(f"Redis URL: {REDIS_URL}")
print(f"메인 서버: {MAIN_SERVER_HOST}")
print(f"동시 처리 수: {args.concurrency}")
# 워커 실행
celery_app.worker_main([
'worker',
'--loglevel=info',
f'--concurrency={args.concurrency}',
'--queues=translate,inpaint,ocr'
])

View File

@ -0,0 +1,28 @@
FROM python:3.8-slim
# 시스템 패키지 업데이트 및 필요한 패키지 설치
RUN apt-get update && apt-get install -y \
libglib2.0-0 \
libsm6 \
libxext6 \
libxrender-dev \
libgomp1 \
libgl1-mesa-glx \
libglib2.0-0 \
wget \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /worker
# requirements.txt 먼저 복사하여 도커 캐시 활용
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 전체 소스 복사
COPY . .
# 워커 실행 스크립트에 실행 권한 부여
RUN chmod +x worker.py
# 기본 명령어
CMD ["python", "worker.py", "--concurrency", "2"]

View File

@ -0,0 +1,130 @@
# 이미지 처리 워커 시스템
메인서버와 분리된 독립적인 이미지 처리 워커 시스템입니다.
## 🏗️ 아키텍처
```
클라이언트 → 메인서버 (FastAPI) → Redis → 워커 시스템 (ImageProcessor)
```
- **메인서버**: 클라이언트 요청 중계, 인증, API 제공
- **워커**: 실제 OCR, 번역, 인페인팅 작업 처리
- **Redis**: 작업 큐 및 결과 저장소
## 🚀 실행 방법
### 1. Docker 모드 (권장)
```bash
cd ~/work/worker-system
WORKER_MODE=docker ./start_worker.sh
```
### 2. 로컬 모드
```bash
cd ~/work/worker-system
WORKER_MODE=local ./start_worker.sh
```
### 3. 원격 모드
```bash
cd ~/work/worker-system
REDIS_URL="redis://메인서버IP:6379/0" WORKER_MODE=remote ./start_worker.sh
```
## ⚙️ 환경 변수
| 변수 | 설명 | 기본값 |
|------|------|--------|
| `REDIS_URL` | Redis 서버 주소 | `redis://localhost:6379/0` |
| `MAIN_SERVER_HOST` | 메인 서버 주소 | `localhost` |
| `CONCURRENCY` | 동시 처리 작업 수 | `2` |
| `WORKER_MODE` | 실행 모드 | `docker` |
## 📋 사용 예시
### 로컬에서 실행
```bash
# 기본 설정으로 실행
./start_worker.sh
# 동시 처리 수 조정
CONCURRENCY=4 ./start_worker.sh
# 원격 메인서버에 연결
REDIS_URL="redis://192.168.1.100:6379/0" MAIN_SERVER_HOST="192.168.1.100" ./start_worker.sh
```
### 원격 서버에서 실행
```bash
# 원격 서버에 워커 시스템 배포
scp -r ~/work/worker-system user@remote-server:~/
# 원격 서버에서 실행
ssh user@remote-server
cd ~/worker-system
REDIS_URL="redis://메인서버IP:6379/0" WORKER_MODE=docker ./start_worker.sh
```
## 🔧 개발 모드
### 직접 실행
```bash
python worker.py --concurrency 2 --redis-url redis://localhost:6379/0
```
### 특정 큐만 처리
```bash
python worker.py --concurrency 1 --redis-url redis://localhost:6379/0
```
## 📊 모니터링
Flower를 통해 워커 상태를 모니터링할 수 있습니다:
- 메인서버의 Flower: http://메인서버:5555
## 🔄 자동 재시작
systemd를 사용한 자동 재시작 설정:
```bash
# /etc/systemd/system/image-worker.service
[Unit]
Description=Image Processing Worker
After=docker.service
[Service]
Type=forking
User=ckh08045
WorkingDirectory=/home/ckh08045/work/worker-system
Environment=WORKER_MODE=docker
Environment=REDIS_URL=redis://localhost:6379/0
ExecStart=/home/ckh08045/work/worker-system/start_worker.sh
Restart=always
[Install]
WantedBy=multi-user.target
```
```bash
sudo systemctl enable image-worker
sudo systemctl start image-worker
```
## 🐛 문제 해결
### Redis 연결 오류
- 메인서버의 Redis가 실행 중인지 확인
- 포트 6379가 열려있는지 확인
- 방화벽 설정 확인
### Docker 권한 오류
```bash
sudo usermod -aG docker $USER
newgrp docker
```
### 의존성 오류
```bash
pip install -r requirements.txt
```

0
~/work/worker-system/app Normal file
View File

View File

@ -0,0 +1,24 @@
version: "3.8"
services:
worker:
build: .
container_name: image_worker
command: python worker.py --concurrency 2 --redis-url redis://host.docker.internal:6379/0
volumes:
- ./modules:/worker/modules
- ./models:/worker/models
- ./temp:/worker/temp
environment:
- REDIS_URL=redis://host.docker.internal:6379/0
- MAIN_SERVER_HOST=host.docker.internal
extra_hosts:
- "host.docker.internal:host-gateway"
restart: unless-stopped
# 로컬에서 Redis가 필요한 경우 (메인서버와 분리된 환경)
# redis:
# image: redis:6.2
# container_name: worker_redis
# ports:
# - "6380:6379"

View File

@ -0,0 +1,46 @@
#!/bin/bash
# 워커 자동 시작 스크립트
echo "=== 이미지 처리 워커 시작 ==="
# 기본 설정
REDIS_URL=${REDIS_URL:-"redis://localhost:6379/0"}
MAIN_SERVER=${MAIN_SERVER:-"localhost"}
CONCURRENCY=${CONCURRENCY:-"2"}
WORKER_MODE=${WORKER_MODE:-"docker"}
echo "Redis URL: $REDIS_URL"
echo "메인 서버: $MAIN_SERVER"
echo "동시 처리 수: $CONCURRENCY"
echo "실행 모드: $WORKER_MODE"
case $WORKER_MODE in
"docker")
echo "Docker 모드로 워커 시작..."
docker-compose down
docker-compose build
docker-compose up -d
echo "워커 컨테이너 시작 완료!"
docker-compose logs -f worker
;;
"local")
echo "로컬 모드로 워커 시작..."
python worker.py --concurrency $CONCURRENCY --redis-url $REDIS_URL --main-server $MAIN_SERVER
;;
"remote")
echo "원격 모드로 워커 시작..."
# 원격 서버의 Redis에 연결
python worker.py --concurrency $CONCURRENCY --redis-url $REDIS_URL --main-server $MAIN_SERVER
;;
*)
echo "사용법: WORKER_MODE=docker|local|remote ./start_worker.sh"
echo "환경 변수:"
echo " REDIS_URL: Redis 서버 주소 (기본값: redis://localhost:6379/0)"
echo " MAIN_SERVER: 메인 서버 주소 (기본값: localhost)"
echo " CONCURRENCY: 동시 처리 수 (기본값: 2)"
exit 1
;;
esac

View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
독립적인 Celery 워커 시스템
메인서버의 Redis와 연결하여 작업을 처리합니다.
"""
import os
import sys
import argparse
from celery import Celery
from modules.image_processor2 import ImageProcessor
from modules.loggerModule import Logger1
from modules.gpt_client import GPTClient
def get_base_dir():
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
"""
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, 'lib')
if os.path.exists(internal_dir):
return internal_dir
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
# 환경 변수에서 Redis URL 가져오기 (기본값은 로컬 메인서버)
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
MAIN_SERVER_HOST = os.getenv("MAIN_SERVER_HOST", "localhost")
# Celery 앱 생성
celery_app = Celery(
"image_worker",
broker=REDIS_URL,
backend=REDIS_URL
)
# Celery 설정
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Seoul',
enable_utc=True,
task_routes={
'worker.translate_task': {'queue': 'translate'},
'worker.inpaint_task': {'queue': 'inpaint'},
'worker.ocr_task': {'queue': 'ocr'},
}
)
# 글로벌 ImageProcessor 인스턴스
image_processor = None
def initialize_processor():
"""ImageProcessor 초기화"""
global image_processor
if image_processor is None:
logger = Logger1()
gpt_client = GPTClient()
base_dir = get_base_dir()
font_path = os.path.join(base_dir, "modules", "fonts", "HakgyoansimDunggeunmisoTTFB.ttf")
print(f"워커 초기화 중...")
print(f"Base directory: {base_dir}")
print(f"Font path: {font_path}")
print(f"Redis URL: {REDIS_URL}")
image_processor = ImageProcessor(logger, gpt_client, base_dir, font_path)
print("ImageProcessor 초기화 완료")
return image_processor
@celery_app.task(name="worker.translate_task", bind=True)
def translate_task(self, **kwargs):
"""번역 작업 처리"""
try:
processor = initialize_processor()
# 실제 번역 로직 호출
# TODO: ImageProcessor의 번역 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"ocr_texts": ["번역된 텍스트1", "번역된 텍스트2"],
"ocr_boxes": [
{"text": "번역된 텍스트1", "box": [10, 20, 100, 120]},
{"text": "번역된 텍스트2", "box": [110, 120, 200, 220]}
],
"translated_texts": ["한글 번역1", "한글 번역2"],
"inpainted_image": "base64_encoded_image_data"
}
print(f"번역 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"번역 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
@celery_app.task(name="worker.inpaint_task", bind=True)
def inpaint_task(self, **kwargs):
"""인페인팅 작업 처리"""
try:
processor = initialize_processor()
# 실제 인페인팅 로직 호출
# TODO: ImageProcessor의 인페인팅 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"inpainted_image": "base64_encoded_inpainted_image"
}
print(f"인페인팅 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"인페인팅 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
@celery_app.task(name="worker.ocr_task", bind=True)
def ocr_task(self, **kwargs):
"""OCR 작업 처리"""
try:
processor = initialize_processor()
# 실제 OCR 로직 호출
# TODO: ImageProcessor의 OCR 메서드와 연결
result = {
"task_id": self.request.id,
"status": "completed",
"ocr_texts": ["감지된 텍스트1", "감지된 텍스트2"],
"ocr_boxes": [
{"text": "감지된 텍스트1", "box": [10, 20, 100, 120]},
{"text": "감지된 텍스트2", "box": [110, 120, 200, 220]}
]
}
print(f"OCR 작업 완료: {self.request.id}")
return result
except Exception as e:
print(f"OCR 작업 오류: {str(e)}")
raise self.retry(countdown=60, max_retries=3)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="이미지 처리 Celery 워커")
parser.add_argument('--concurrency', type=int, default=2, help='동시 처리 작업 수')
parser.add_argument('--redis-url', type=str, help='Redis 서버 URL')
parser.add_argument('--main-server', type=str, help='메인 서버 주소')
args = parser.parse_args()
# 명령행 인수로 Redis URL 오버라이드
if args.redis_url:
os.environ["REDIS_URL"] = args.redis_url
REDIS_URL = args.redis_url
if args.main_server:
os.environ["MAIN_SERVER_HOST"] = args.main_server
MAIN_SERVER_HOST = args.main_server
print(f"워커 시작 중...")
print(f"Redis URL: {REDIS_URL}")
print(f"메인 서버: {MAIN_SERVER_HOST}")
print(f"동시 처리 수: {args.concurrency}")
# 워커 실행
celery_app.worker_main([
'worker',
'--loglevel=info',
f'--concurrency={args.concurrency}',
'--queues=translate,inpaint,ocr'
])