""" 리팩토링된 FastAPI 서버 (번역 / 배경제거 전용) ================================================= 요구사항 반영 요약 - 클라이언트는 **1회 호출**로 처리 완료를 시도(`sync=true&wait=60` 등). - 지정한 `wait` 시간 내 Celery 작업이 끝나면 **바로 결과 이미지 스트리밍 반환** - 시간 내 끝나지 않으면 `202 Accepted` + `task_id`만 반환 → 이후 폴링/다운로드 - 업로드는 **멀티파트 스트리밍**으로 메모리 사용 최소화 - 결과/상태 확인 및 파일 다운로드 엔드포인트 유지 - 인증은 개발 단계이므로 항상 True 반환 - 원격 Celery 워커 여러 대 사용을 전제로 `send_task` 그대로 사용 필요 엔드포인트 ---------------- POST /translate # 번역 요청 (sync/async 선택 가능) POST /remove_background # 배경제거 요청 (비동기 기본) GET /tasks/{task_id} # 상태/결과 조회 (옵션: wait, download) GET /files/{filename} # 저장 파일 스트리밍 다운로드 GET /health, / # 헬스체크/루트 """ from __future__ import annotations import os import uuid import time import json import base64 import asyncio import logging from pathlib import Path from typing import Dict, Optional, AsyncGenerator import aiofiles from fastapi import ( FastAPI, UploadFile, File, Depends, Form, HTTPException, Query, Response, ) from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel from pydantic_settings import BaseSettings from celery.result import AsyncResult from app.deps.auth import auth_dep from app.celery_worker_for_main import celery_app # type: ignore # from app.supabase_auth import check_user_permission # 실제 사용 시 주석 해제 from app.stream_utils import upload_to_b64 # ===================================================== # 설정 & 로깅 # ===================================================== class Settings(BaseSettings): TEMP_STORAGE: str = "/app/temp_files" MAIN_SERVER_URL: str = "http://fastapi:7890" CHUNK_SIZE: int = 1024 * 1024 # 1MB MAX_UPLOAD_MB: int = 10 # 업로드 크기 제한 (옵션) class Config: env_file = ".env" env_file_encoding = "utf-8" settings = Settings() MAX_BYTES = settings.MAX_UPLOAD_MB * 1024 * 1024 # 로거 기본 설정 (필요시 logger_module.py로 분리) logger = logging.getLogger("fastapi_app") if not logger.handlers: logger.setLevel(logging.INFO) fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") sh = logging.StreamHandler() sh.setFormatter(fmt) logger.addHandler(sh) # ===================================================== # 파일 서비스 (스트리밍 저장/다운로드) # ===================================================== class TempFileService: def __init__(self, base_dir: str, chunk_size: int): self.base_path = Path(base_dir) self.base_path.mkdir(parents=True, exist_ok=True) self.chunk_size = chunk_size def _new_filename(self, user_id: str, ext: str = "png") -> Path: fname = f"{user_id}_{uuid.uuid4().hex}_{int(time.time())}.{ext}" return self.base_path / fname async def save_upload( self, user_id: str, upload: UploadFile, max_mb: Optional[int] = None ) -> str: """업로드 파일을 스트리밍으로 저장""" ext = (upload.filename or "png").split(".")[-1].lower() path = self._new_filename(user_id, ext) written = 0 limit = (max_mb or 0) * 1024 * 1024 async with aiofiles.open(path, "wb") as f: while True: chunk = await upload.read(self.chunk_size) if not chunk: break written += len(chunk) if limit and written > limit: await f.close() try: path.unlink(missing_ok=True) except Exception: pass raise HTTPException(status_code=413, detail="파일 크기가 너무 큽니다.") await f.write(chunk) logger.info(f"파일 업로드 완료: {path.name} (size={written} bytes)") return path.name def save_bytes(self, user_id: str, data: bytes, ext: str = "png") -> str: """바이너리 데이터를 직접 파일로 저장""" path = self._new_filename(user_id, ext) path.write_bytes(data) logger.info(f"파일 저장 완료: {path.name} (size={len(data)} bytes)") return path.name async def stream(self, path: Path) -> AsyncGenerator[bytes, None]: """파일을 chunk 단위로 읽어 스트리밍""" async with aiofiles.open(path, "rb") as f: while True: chunk = await f.read(self.chunk_size) if not chunk: break yield chunk def full_path(self, filename: str) -> Path: return self.base_path / filename file_service = TempFileService(settings.TEMP_STORAGE, settings.CHUNK_SIZE) # ===================================================== # 태스크 서비스 (Celery 전송/조회) # ===================================================== class TaskService: def __init__(self, main_server_url: str): self.main_server_url = main_server_url def enqueue(self, task_name: str, **kwargs): """원격 워커로 태스크 전송""" kwargs["main_server_url"] = self.main_server_url task = celery_app.send_task(task_name, kwargs=kwargs) logger.info(f"태스크 등록: {task_name} -> {task.id}") return task def result(self, task_id: str) -> AsyncResult: return AsyncResult(task_id, app=celery_app) task_service = TaskService(settings.MAIN_SERVER_URL) # ===================================================== # 스키마 # ===================================================== class TaskStatusResponse(BaseModel): task_id: str status: str result: Optional[Dict] = None error: Optional[str] = None # ===================================================== # 인증 (현재는 항상 통과) # ===================================================== async def validate_user(user_id: str) -> None: # 실제 구현 예: # allowed = await check_user_permission(user_id) # if not allowed: # raise HTTPException(status_code=403, detail="권한이 없습니다.") return None # ===================================================== # FastAPI 앱 생성 # ===================================================== app = FastAPI(title="이미지 번역 메인 서버", version="1.1.0") # ----------------------------------------------------- # 번역 엔드포인트 (1회 호출로 처리 시도: sync + wait) # ----------------------------------------------------- @app.post("/translate") async def translate( user_id: str = Form(...), toggle_states: str = Form(...), # JSON string unwanted_texts: str = Form(...), # JSON string ocr_method: str = Form("paddleocr"), inpaint_method: str = Form("lama"), file: UploadFile = File(...), sync: bool = Query(True, description="True면 wait 동안 처리 완료 시 바로 결과 반환"), wait: int = Query(90, description="sync=True일 때 최대 대기 초"), _=Depends(auth_dep) # 여기서 인증 ): """ 이미지 번역 작업 요청 (스트리밍 방식 - 디스크 저장 없음) 1. 파일을 메모리로 스트리밍 읽기 2. base64 인코딩하여 Celery 태스크로 전송 3. sync=True면 wait초 동안 결과를 기다렸다가 완료 시 바로 스트리밍 반환 - 미완료면 202 + task_id 반환 4. sync=False면 즉시 task_id 반환 """ await validate_user(user_id) # ① 업로드 → base64 (RAM 중복 제거) try: image_b64 = await upload_to_b64(file, MAX_BYTES) except ValueError as e: raise HTTPException(413 if "large" in str(e) else 400, str(e)) # (2) JSON 파싱 try: toggle_states_dict: Dict = json.loads(toggle_states) unwanted_texts_dict: Dict = json.loads(unwanted_texts) except json.JSONDecodeError as e: raise HTTPException(status_code=400, detail=f"JSON 파싱 오류: {e}") # (3) 태스크 전송 task = task_service.enqueue( "worker.translate_task", user_id=user_id, toggle_states=toggle_states_dict, unwanted_texts=unwanted_texts_dict, ocr_method=ocr_method, inpaint_method = inpaint_method, image_b64 = image_b64, # ✨ filename = file.filename or "image.png", queue="translate", ) if not sync: return { "task_id": task.id, "message": "번역 작업이 시작되었습니다.", } # (4) sync=True -> wait 동안 결과 대기 result = task_service.result(task.id) deadline = time.monotonic() + max(wait, 0) while time.monotonic() < deadline: if result.ready(): break await asyncio.sleep(0.5) if not result.ready(): # 제한 시간 내 완료 안됨 return JSONResponse( status_code=202, content={ "task_id": task.id, "status": result.status, "message": "아직 처리 중입니다. /tasks/{task_id} 로 조회하세요.", }, ) if not result.successful(): raise HTTPException(status_code=400, detail=f"작업 실패: {result.result}") data = result.result # dict 가정 # 결과가 base64 이미지인 경우 바로 스트리밍 응답 if isinstance(data, dict) and "result_image" in data: image_bytes = base64.b64decode(data["result_image"]) return Response( image_bytes, media_type="image/png", headers={ "Content-Disposition": f"attachment; filename=translated_{task.id}.png", "Content-Length": str(len(image_bytes)), }, ) # 다른 형태면 JSON 반환 return {"task_id": task.id, "status": "SUCCESS", "result": data} # ----------------------------------------------------- # 배경제거 엔드포인트 (비동기 기본 / 필요시 sync 추가 가능) # ----------------------------------------------------- @app.post("/remove_background") async def remove_background_api( file: UploadFile = File(...), sync: bool = Query(False, description="True면 wait 동안 처리 완료 시 바로 결과 반환"), wait: int = Query(60, description="sync=True일 때 최대 대기 초"), ): """배경제거 작업 요청 (스트리밍 방식 - 디스크 저장 없음)""" user_id = "rembg_user" # 별도 사용자 처리 try: # ① 업로드 → base64 (RAM 중복 제거) try: image_b64 = await upload_to_b64(file, MAX_BYTES) except ValueError as e: raise HTTPException(413 if "large" in str(e) else 400, str(e)) task = task_service.enqueue( "worker.rembg_task", user_id=user_id, image_b64 = image_b64, # ✨ filename = file.filename or "image.png", queue="rembg", # ★ 추가 ) if not sync: return {"task_id": task.id, "message": "배경제거 작업 시작"} # sync=True -> wait 동안 결과 대기 result = task_service.result(task.id) deadline = time.monotonic() + max(wait, 0) while time.monotonic() < deadline: if result.ready(): break await asyncio.sleep(0.5) if not result.ready(): return JSONResponse( status_code=202, content={ "task_id": task.id, "status": result.status, "message": "아직 처리 중입니다. /tasks/{task_id} 로 조회하세요.", }, ) if not result.successful(): raise HTTPException(status_code=400, detail=f"작업 실패: {result.result}") data = result.result # 결과가 base64 이미지인 경우 바로 스트리밍 응답 if isinstance(data, dict) and "result_image" in data: image_bytes = base64.b64decode(data["result_image"]) return Response( image_bytes, media_type="image/png", headers={ "Content-Disposition": f"attachment; filename=nobg_{task.id}.png", "Content-Length": str(len(image_bytes)), }, ) return {"task_id": task.id, "status": "SUCCESS", "result": data} except HTTPException: raise except Exception as e: logger.exception("/remove_background 처리 중 오류") raise HTTPException(status_code=400, detail=f"배경제거 처리 오류: {e}") # ----------------------------------------------------- # 작업 상태/결과 조회 (옵션: wait, download) # ----------------------------------------------------- @app.get("/tasks/{task_id}", response_model=TaskStatusResponse) async def get_task_status( task_id: str, wait: int = Query(0, description="최대 대기 초 (long polling)"), download: bool = Query(False, description="완료 시 곧바로 파일 스트리밍") ): """ - wait>0 : long polling (최대 wait초 대기 후 상태 반환) - download=True : 완료 시 바로 파일(또는 base64)을 스트리밍 형태로 반환 """ result = task_service.result(task_id) deadline = time.monotonic() + max(wait, 0) while wait and time.monotonic() < deadline: if result.ready(): break await asyncio.sleep(0.5) if not result.ready(): # 아직 처리 중 raise HTTPException(status_code=202, detail={"task_id": task_id, "status": result.status}) if not result.successful(): raise HTTPException(status_code=400, detail=str(result.result)) data = result.result # dict 가정 if download: # base64 이미지가 있으면 바이너리 반환 if isinstance(data, dict) and "result_image" in data: image_bytes = base64.b64decode(data["result_image"]) return Response( image_bytes, media_type="image/png", headers={ "Content-Disposition": f"attachment; filename=result_{task_id}.png", "Content-Length": str(len(image_bytes)), }, ) # download=False -> JSON 상태 resp = TaskStatusResponse(task_id=task_id, status="SUCCESS", result=data) return resp # ----------------------------------------------------- # 파일 다운로드 (워커/클라이언트 공용) # ----------------------------------------------------- @app.get("/files/{filename}") async def download_file(filename: str): path = file_service.full_path(filename) if not path.exists(): raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다.") ext = path.suffix.lower() if ext in {".png", ".jpg", ".jpeg"}: media_type = f"image/{ext[1:]}" else: media_type = "application/octet-stream" headers = { "Content-Disposition": f"attachment; filename={filename}", "Content-Length": str(path.stat().st_size), } return StreamingResponse(file_service.stream(path), media_type=media_type, headers=headers) # ----------------------------------------------------- # 루트 & 헬스 체크 # ----------------------------------------------------- @app.get("/") async def root(): return { "message": "이미지 번역 메인 서버", "status": "running", "temp_storage": settings.TEMP_STORAGE, } @app.get("/health") async def health_check(): return { "status": "healthy", "timestamp": time.time(), "temp_storage": settings.TEMP_STORAGE, }