462 lines
16 KiB
Python
462 lines
16 KiB
Python
"""
|
|
리팩토링된 FastAPI 서버 (번역 / 배경제거 전용)
|
|
=================================================
|
|
요구사항 반영 요약
|
|
- 클라이언트는 **1회 호출**로 처리 완료를 시도(`sync=true&wait=60` 등).
|
|
- 지정한 `wait` 시간 내 Celery 작업이 끝나면 **바로 결과 이미지 스트리밍 반환**
|
|
- 시간 내 끝나지 않으면 `202 Accepted` + `task_id`만 반환 → 이후 폴링/다운로드
|
|
- 업로드는 **멀티파트 스트리밍**으로 메모리 사용 최소화
|
|
- 결과/상태 확인 및 파일 다운로드 엔드포인트 유지
|
|
- 인증은 개발 단계이므로 항상 True 반환
|
|
- 원격 Celery 워커 여러 대 사용을 전제로 `send_task` 그대로 사용
|
|
|
|
필요 엔드포인트
|
|
----------------
|
|
POST /translate # 번역 요청 (sync/async 선택 가능)
|
|
POST /remove_background # 배경제거 요청 (비동기 기본)
|
|
GET /tasks/{task_id} # 상태/결과 조회 (옵션: wait, download)
|
|
GET /files/{filename} # 저장 파일 스트리밍 다운로드
|
|
GET /health, / # 헬스체크/루트
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
import time
|
|
import json
|
|
import base64
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, AsyncGenerator
|
|
|
|
import aiofiles
|
|
from fastapi import (
|
|
FastAPI,
|
|
UploadFile,
|
|
File,
|
|
Depends,
|
|
Form,
|
|
HTTPException,
|
|
Query,
|
|
Response,
|
|
)
|
|
from fastapi.responses import StreamingResponse, JSONResponse
|
|
from pydantic import BaseModel
|
|
from pydantic_settings import BaseSettings
|
|
from celery.result import AsyncResult
|
|
from app.deps.auth import auth_dep
|
|
from app.celery_worker_for_main import celery_app # type: ignore
|
|
# from app.supabase_auth import check_user_permission # 실제 사용 시 주석 해제
|
|
from app.stream_utils import upload_to_b64
|
|
|
|
|
|
# =====================================================
|
|
# 설정 & 로깅
|
|
# =====================================================
|
|
class Settings(BaseSettings):
|
|
TEMP_STORAGE: str = "/app/temp_files"
|
|
MAIN_SERVER_URL: str = "http://fastapi:7890"
|
|
CHUNK_SIZE: int = 1024 * 1024 # 1MB
|
|
MAX_UPLOAD_MB: int = 10 # 업로드 크기 제한 (옵션)
|
|
|
|
class Config:
|
|
env_file = ".env"
|
|
env_file_encoding = "utf-8"
|
|
|
|
|
|
settings = Settings()
|
|
|
|
MAX_BYTES = settings.MAX_UPLOAD_MB * 1024 * 1024
|
|
|
|
# 로거 기본 설정 (필요시 logger_module.py로 분리)
|
|
logger = logging.getLogger("fastapi_app")
|
|
if not logger.handlers:
|
|
logger.setLevel(logging.INFO)
|
|
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
|
sh = logging.StreamHandler()
|
|
sh.setFormatter(fmt)
|
|
logger.addHandler(sh)
|
|
|
|
|
|
# =====================================================
|
|
# 파일 서비스 (스트리밍 저장/다운로드)
|
|
# =====================================================
|
|
class TempFileService:
|
|
def __init__(self, base_dir: str, chunk_size: int):
|
|
self.base_path = Path(base_dir)
|
|
self.base_path.mkdir(parents=True, exist_ok=True)
|
|
self.chunk_size = chunk_size
|
|
|
|
def _new_filename(self, user_id: str, ext: str = "png") -> Path:
|
|
fname = f"{user_id}_{uuid.uuid4().hex}_{int(time.time())}.{ext}"
|
|
return self.base_path / fname
|
|
|
|
async def save_upload(
|
|
self,
|
|
user_id: str,
|
|
upload: UploadFile,
|
|
max_mb: Optional[int] = None
|
|
) -> str:
|
|
"""업로드 파일을 스트리밍으로 저장"""
|
|
ext = (upload.filename or "png").split(".")[-1].lower()
|
|
path = self._new_filename(user_id, ext)
|
|
|
|
written = 0
|
|
limit = (max_mb or 0) * 1024 * 1024
|
|
|
|
async with aiofiles.open(path, "wb") as f:
|
|
while True:
|
|
chunk = await upload.read(self.chunk_size)
|
|
if not chunk:
|
|
break
|
|
written += len(chunk)
|
|
if limit and written > limit:
|
|
await f.close()
|
|
try:
|
|
path.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
raise HTTPException(status_code=413, detail="파일 크기가 너무 큽니다.")
|
|
await f.write(chunk)
|
|
|
|
logger.info(f"파일 업로드 완료: {path.name} (size={written} bytes)")
|
|
return path.name
|
|
|
|
def save_bytes(self, user_id: str, data: bytes, ext: str = "png") -> str:
|
|
"""바이너리 데이터를 직접 파일로 저장"""
|
|
path = self._new_filename(user_id, ext)
|
|
path.write_bytes(data)
|
|
logger.info(f"파일 저장 완료: {path.name} (size={len(data)} bytes)")
|
|
return path.name
|
|
|
|
async def stream(self, path: Path) -> AsyncGenerator[bytes, None]:
|
|
"""파일을 chunk 단위로 읽어 스트리밍"""
|
|
async with aiofiles.open(path, "rb") as f:
|
|
while True:
|
|
chunk = await f.read(self.chunk_size)
|
|
if not chunk:
|
|
break
|
|
yield chunk
|
|
|
|
def full_path(self, filename: str) -> Path:
|
|
return self.base_path / filename
|
|
|
|
|
|
file_service = TempFileService(settings.TEMP_STORAGE, settings.CHUNK_SIZE)
|
|
|
|
|
|
# =====================================================
|
|
# 태스크 서비스 (Celery 전송/조회)
|
|
# =====================================================
|
|
class TaskService:
|
|
def __init__(self, main_server_url: str):
|
|
self.main_server_url = main_server_url
|
|
|
|
def enqueue(self, task_name: str, **kwargs):
|
|
"""원격 워커로 태스크 전송"""
|
|
kwargs["main_server_url"] = self.main_server_url
|
|
task = celery_app.send_task(task_name, kwargs=kwargs)
|
|
logger.info(f"태스크 등록: {task_name} -> {task.id}")
|
|
return task
|
|
|
|
def result(self, task_id: str) -> AsyncResult:
|
|
return AsyncResult(task_id, app=celery_app)
|
|
|
|
|
|
task_service = TaskService(settings.MAIN_SERVER_URL)
|
|
|
|
|
|
# =====================================================
|
|
# 스키마
|
|
# =====================================================
|
|
class TaskStatusResponse(BaseModel):
|
|
task_id: str
|
|
status: str
|
|
result: Optional[Dict] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
# =====================================================
|
|
# 인증 (현재는 항상 통과)
|
|
# =====================================================
|
|
async def validate_user(user_id: str) -> None:
|
|
# 실제 구현 예:
|
|
# allowed = await check_user_permission(user_id)
|
|
# if not allowed:
|
|
# raise HTTPException(status_code=403, detail="권한이 없습니다.")
|
|
return None
|
|
|
|
|
|
# =====================================================
|
|
# FastAPI 앱 생성
|
|
# =====================================================
|
|
app = FastAPI(title="이미지 번역 메인 서버", version="1.1.0")
|
|
|
|
|
|
# -----------------------------------------------------
|
|
# 번역 엔드포인트 (1회 호출로 처리 시도: sync + wait)
|
|
# -----------------------------------------------------
|
|
@app.post("/translate")
|
|
async def translate(
|
|
user_id: str = Form(...),
|
|
toggle_states: str = Form(...), # JSON string
|
|
unwanted_texts: str = Form(...), # JSON string
|
|
ocr_method: str = Form("paddleocr"),
|
|
inpaint_method: str = Form("lama"),
|
|
file: UploadFile = File(...),
|
|
sync: bool = Query(True, description="True면 wait 동안 처리 완료 시 바로 결과 반환"),
|
|
wait: int = Query(90, description="sync=True일 때 최대 대기 초"),
|
|
_=Depends(auth_dep) # 여기서 인증
|
|
):
|
|
"""
|
|
이미지 번역 작업 요청 (스트리밍 방식 - 디스크 저장 없음)
|
|
1. 파일을 메모리로 스트리밍 읽기
|
|
2. base64 인코딩하여 Celery 태스크로 전송
|
|
3. sync=True면 wait초 동안 결과를 기다렸다가 완료 시 바로 스트리밍 반환
|
|
- 미완료면 202 + task_id 반환
|
|
4. sync=False면 즉시 task_id 반환
|
|
"""
|
|
await validate_user(user_id)
|
|
|
|
# ① 업로드 → base64 (RAM 중복 제거)
|
|
try:
|
|
image_b64 = await upload_to_b64(file, MAX_BYTES)
|
|
except ValueError as e:
|
|
raise HTTPException(413 if "large" in str(e) else 400, str(e))
|
|
|
|
# (2) JSON 파싱
|
|
try:
|
|
toggle_states_dict: Dict = json.loads(toggle_states)
|
|
unwanted_texts_dict: Dict = json.loads(unwanted_texts)
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(status_code=400, detail=f"JSON 파싱 오류: {e}")
|
|
|
|
# (3) 태스크 전송
|
|
task = task_service.enqueue(
|
|
"worker.translate_task",
|
|
user_id=user_id,
|
|
toggle_states=toggle_states_dict,
|
|
unwanted_texts=unwanted_texts_dict,
|
|
ocr_method=ocr_method,
|
|
inpaint_method = inpaint_method,
|
|
image_b64 = image_b64, # ✨
|
|
filename = file.filename or "image.png",
|
|
queue="translate",
|
|
)
|
|
|
|
if not sync:
|
|
return {
|
|
"task_id": task.id,
|
|
"message": "번역 작업이 시작되었습니다.",
|
|
}
|
|
|
|
# (4) sync=True -> wait 동안 결과 대기
|
|
result = task_service.result(task.id)
|
|
deadline = time.monotonic() + max(wait, 0)
|
|
|
|
while time.monotonic() < deadline:
|
|
if result.ready():
|
|
break
|
|
await asyncio.sleep(0.5)
|
|
|
|
if not result.ready():
|
|
# 제한 시간 내 완료 안됨
|
|
return JSONResponse(
|
|
status_code=202,
|
|
content={
|
|
"task_id": task.id,
|
|
"status": result.status,
|
|
"message": "아직 처리 중입니다. /tasks/{task_id} 로 조회하세요.",
|
|
},
|
|
)
|
|
|
|
if not result.successful():
|
|
raise HTTPException(status_code=400, detail=f"작업 실패: {result.result}")
|
|
|
|
data = result.result # dict 가정
|
|
|
|
# 결과가 base64 이미지인 경우 바로 스트리밍 응답
|
|
if isinstance(data, dict) and "result_image" in data:
|
|
image_bytes = base64.b64decode(data["result_image"])
|
|
return Response(
|
|
image_bytes,
|
|
media_type="image/png",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename=translated_{task.id}.png",
|
|
"Content-Length": str(len(image_bytes)),
|
|
},
|
|
)
|
|
|
|
# 다른 형태면 JSON 반환
|
|
return {"task_id": task.id, "status": "SUCCESS", "result": data}
|
|
|
|
|
|
# -----------------------------------------------------
|
|
# 배경제거 엔드포인트 (비동기 기본 / 필요시 sync 추가 가능)
|
|
# -----------------------------------------------------
|
|
@app.post("/remove_background")
|
|
async def remove_background_api(
|
|
file: UploadFile = File(...),
|
|
sync: bool = Query(False, description="True면 wait 동안 처리 완료 시 바로 결과 반환"),
|
|
wait: int = Query(60, description="sync=True일 때 최대 대기 초"),
|
|
):
|
|
"""배경제거 작업 요청 (스트리밍 방식 - 디스크 저장 없음)"""
|
|
user_id = "rembg_user" # 별도 사용자 처리
|
|
|
|
try:
|
|
# ① 업로드 → base64 (RAM 중복 제거)
|
|
try:
|
|
image_b64 = await upload_to_b64(file, MAX_BYTES)
|
|
except ValueError as e:
|
|
raise HTTPException(413 if "large" in str(e) else 400, str(e))
|
|
|
|
task = task_service.enqueue(
|
|
"worker.rembg_task",
|
|
user_id=user_id,
|
|
image_b64 = image_b64, # ✨
|
|
filename = file.filename or "image.png",
|
|
queue="rembg", # ★ 추가
|
|
|
|
)
|
|
|
|
if not sync:
|
|
return {"task_id": task.id, "message": "배경제거 작업 시작"}
|
|
|
|
# sync=True -> wait 동안 결과 대기
|
|
result = task_service.result(task.id)
|
|
deadline = time.monotonic() + max(wait, 0)
|
|
|
|
while time.monotonic() < deadline:
|
|
if result.ready():
|
|
break
|
|
await asyncio.sleep(0.5)
|
|
|
|
if not result.ready():
|
|
return JSONResponse(
|
|
status_code=202,
|
|
content={
|
|
"task_id": task.id,
|
|
"status": result.status,
|
|
"message": "아직 처리 중입니다. /tasks/{task_id} 로 조회하세요.",
|
|
},
|
|
)
|
|
|
|
if not result.successful():
|
|
raise HTTPException(status_code=400, detail=f"작업 실패: {result.result}")
|
|
|
|
data = result.result
|
|
|
|
# 결과가 base64 이미지인 경우 바로 스트리밍 응답
|
|
if isinstance(data, dict) and "result_image" in data:
|
|
image_bytes = base64.b64decode(data["result_image"])
|
|
return Response(
|
|
image_bytes,
|
|
media_type="image/png",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename=nobg_{task.id}.png",
|
|
"Content-Length": str(len(image_bytes)),
|
|
},
|
|
)
|
|
|
|
return {"task_id": task.id, "status": "SUCCESS", "result": data}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.exception("/remove_background 처리 중 오류")
|
|
raise HTTPException(status_code=400, detail=f"배경제거 처리 오류: {e}")
|
|
|
|
|
|
# -----------------------------------------------------
|
|
# 작업 상태/결과 조회 (옵션: wait, download)
|
|
# -----------------------------------------------------
|
|
@app.get("/tasks/{task_id}", response_model=TaskStatusResponse)
|
|
async def get_task_status(
|
|
task_id: str,
|
|
wait: int = Query(0, description="최대 대기 초 (long polling)"),
|
|
download: bool = Query(False, description="완료 시 곧바로 파일 스트리밍")
|
|
):
|
|
"""
|
|
- wait>0 : long polling (최대 wait초 대기 후 상태 반환)
|
|
- download=True : 완료 시 바로 파일(또는 base64)을 스트리밍 형태로 반환
|
|
"""
|
|
result = task_service.result(task_id)
|
|
|
|
deadline = time.monotonic() + max(wait, 0)
|
|
while wait and time.monotonic() < deadline:
|
|
if result.ready():
|
|
break
|
|
await asyncio.sleep(0.5)
|
|
|
|
if not result.ready():
|
|
# 아직 처리 중
|
|
raise HTTPException(status_code=202, detail={"task_id": task_id, "status": result.status})
|
|
|
|
if not result.successful():
|
|
raise HTTPException(status_code=400, detail=str(result.result))
|
|
|
|
data = result.result # dict 가정
|
|
|
|
if download:
|
|
# base64 이미지가 있으면 바이너리 반환
|
|
if isinstance(data, dict) and "result_image" in data:
|
|
image_bytes = base64.b64decode(data["result_image"])
|
|
return Response(
|
|
image_bytes,
|
|
media_type="image/png",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename=result_{task_id}.png",
|
|
"Content-Length": str(len(image_bytes)),
|
|
},
|
|
)
|
|
|
|
# download=False -> JSON 상태
|
|
resp = TaskStatusResponse(task_id=task_id, status="SUCCESS", result=data)
|
|
return resp
|
|
|
|
|
|
# -----------------------------------------------------
|
|
# 파일 다운로드 (워커/클라이언트 공용)
|
|
# -----------------------------------------------------
|
|
@app.get("/files/{filename}")
|
|
async def download_file(filename: str):
|
|
path = file_service.full_path(filename)
|
|
if not path.exists():
|
|
raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다.")
|
|
|
|
ext = path.suffix.lower()
|
|
if ext in {".png", ".jpg", ".jpeg"}:
|
|
media_type = f"image/{ext[1:]}"
|
|
else:
|
|
media_type = "application/octet-stream"
|
|
|
|
headers = {
|
|
"Content-Disposition": f"attachment; filename={filename}",
|
|
"Content-Length": str(path.stat().st_size),
|
|
}
|
|
|
|
return StreamingResponse(file_service.stream(path), media_type=media_type, headers=headers)
|
|
|
|
|
|
# -----------------------------------------------------
|
|
# 루트 & 헬스 체크
|
|
# -----------------------------------------------------
|
|
@app.get("/")
|
|
async def root():
|
|
return {
|
|
"message": "이미지 번역 메인 서버",
|
|
"status": "running",
|
|
"temp_storage": settings.TEMP_STORAGE,
|
|
}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
return {
|
|
"status": "healthy",
|
|
"timestamp": time.time(),
|
|
"temp_storage": settings.TEMP_STORAGE,
|
|
}
|