TransWorker/ImageTransWorker/image_translate_server.py

178 lines
7.5 KiB
Python

import random
import socket
import uvicorn
from fastapi import FastAPI, Query, Body
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor
from modules.image_processor2 import ImageProcessor
import os
import base64
# 포트 범위 설정
PORT_RANGE = (7000, 7000)
# 사용 가능한 포트 찾기
def find_free_port():
for _ in range(20):
port = random.randint(*PORT_RANGE)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
raise RuntimeError("사용 가능한 포트를 찾을 수 없습니다.")
# 요청 모델 정의
class ImageRequest(BaseModel):
local_image_path: Optional[str] = None
image_data: Optional[str] = None # base64 인코딩된 이미지 데이터
file_prefix: Optional[str] = ""
use_inpainting: Optional[bool] = False
toggle_states: Optional[dict] = None
unwanted_texts: Optional[dict] = None
watermark_text: Optional[str] = None
watermark_opacity: Optional[float] = None
class ImagesRequest(BaseModel):
local_image_paths: Optional[List[str]] = None
image_data_list: Optional[List[str]] = None # base64 인코딩된 이미지 데이터 리스트
file_prefix: Optional[str] = ""
use_inpainting: Optional[bool] = False
toggle_states: Optional[dict] = None
unwanted_texts: Optional[dict] = None
watermark_text: Optional[str] = None
watermark_opacity: Optional[float] = None
# FastAPI 앱 생성
def create_app(image_processor: ImageProcessor, max_workers: int = 2):
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=max_workers)
@app.post("/translate_image")
async def translate_image(req: ImageRequest):
# 워터마크 관련 옵션을 toggle_states에 병합
toggle_states = req.toggle_states.copy() if req.toggle_states else {}
if req.watermark_text is not None:
toggle_states["watermark_text"] = req.watermark_text
if req.watermark_opacity is not None:
toggle_states["watermark_opacity"] = req.watermark_opacity
# 이미지 경로 또는 데이터 검증
if not req.local_image_path and not req.image_data:
return {"error": "local_image_path 또는 image_data 중 하나는 반드시 제공되어야 합니다."}
# 이미지 경로가 없고 데이터만 있는 경우, 임시 파일로 저장
image_path = req.local_image_path
if not image_path and req.image_data:
try:
image_path = await image_processor.save_base64_to_temp_file(req.image_data)
if not image_path:
return {"error": "이미지 데이터를 임시 파일로 저장하는데 실패했습니다."}
except Exception as e:
return {"error": f"이미지 데이터 처리 중 오류: {str(e)}"}
# 파일 존재 확인
if not os.path.exists(image_path):
return {"error": f"이미지 파일을 찾을 수 없습니다: {image_path}"}
# 단일 이미지 번역
result = await image_processor.process_single_image(
toggle_states, req.unwanted_texts or {}, image_path, 0, req.file_prefix
)
# 결과를 base64로 변환하여 반환
if isinstance(result, dict):
result_path = result.get("path", None)
else:
result_path = result
if result_path and os.path.exists(result_path):
try:
# 처리된 이미지를 base64로 변환
with open(result_path, "rb") as f:
image_data = f.read()
result_base64 = base64.b64encode(image_data).decode('utf-8')
return {"result": result_base64, "format": "base64"}
except Exception as e:
return {"error": f"이미지를 base64로 변환하는 중 오류: {str(e)}"}
else:
return {"error": "처리된 이미지 파일을 찾을 수 없습니다."}
@app.post("/translate_images")
async def translate_images(req: ImagesRequest):
# 워터마크 관련 옵션을 toggle_states에 병합
toggle_states = req.toggle_states.copy() if req.toggle_states else {}
if req.watermark_text is not None:
toggle_states["watermark_text"] = req.watermark_text
if req.watermark_opacity is not None:
toggle_states["watermark_opacity"] = req.watermark_opacity
# 이미지 경로 리스트 또는 데이터 리스트 검증
if not req.local_image_paths and not req.image_data_list:
return {"error": "local_image_paths 또는 image_data_list 중 하나는 반드시 제공되어야 합니다."}
image_paths = []
# 이미지 경로가 있는 경우 그대로 사용
if req.local_image_paths:
image_paths.extend(req.local_image_paths)
# 이미지 데이터가 있는 경우 임시 파일로 저장
if req.image_data_list:
for idx, image_data in enumerate(req.image_data_list):
try:
temp_path = await image_processor.save_base64_to_temp_file(image_data, suffix=f"_data_{idx}")
if temp_path:
image_paths.append(temp_path)
except Exception as e:
return {"error": f"이미지 데이터 {idx} 처리 중 오류: {str(e)}"}
# 여러 이미지 병렬 번역
loop = asyncio.get_event_loop()
tasks = []
sem = asyncio.Semaphore(max_workers)
async def sem_task(idx, path):
async with sem:
return await image_processor.process_single_image(
toggle_states, req.unwanted_texts or {}, path, idx, req.file_prefix
)
for idx, path in enumerate(image_paths):
tasks.append(sem_task(idx, path))
results = await asyncio.gather(*tasks)
# 결과들을 base64로 변환하여 반환
base64_results = []
for result in results:
if isinstance(result, dict):
result_path = result.get("path", None)
else:
result_path = result
if result_path and os.path.exists(result_path):
try:
# 처리된 이미지를 base64로 변환
with open(result_path, "rb") as f:
image_data = f.read()
result_base64 = base64.b64encode(image_data).decode('utf-8')
base64_results.append(result_base64)
except Exception as e:
base64_results.append(None) # 변환 실패시 None
else:
base64_results.append(None) # 파일이 없으면 None
return {"results": base64_results, "format": "base64"}
return app
# 서버 실행 함수
def run_server(image_processor, max_workers=2):
port = find_free_port()
app = create_app(image_processor, max_workers)
uvicorn.run(app, host="0.0.0.0", port=port, workers=1)
# FastAPI의 workers는 프로세스 수이므로, 내부 병렬은 ThreadPoolExecutor로 제어
# 실제 워커 수는 process_single_image 병렬 호출로 제한
# 서버 실행 후 포트 정보 반환 가능
return port