1593 lines
59 KiB
Python
1593 lines
59 KiB
Python
import os
|
|
import sys
|
|
import uuid
|
|
import time
|
|
import json
|
|
import signal
|
|
import atexit
|
|
import argparse
|
|
import shutil
|
|
import subprocess
|
|
import zipfile
|
|
import tempfile
|
|
import urllib.request
|
|
import ctypes
|
|
import socket
|
|
import queue as _queue
|
|
import threading
|
|
import multiprocessing as mp
|
|
from typing import Optional, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
from dotenv import load_dotenv
|
|
import psutil
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi import status as http_status
|
|
from pydantic import BaseModel, Field
|
|
|
|
from loggerModule import Logger
|
|
from modules.image_worker import worker_main
|
|
from updater.__version__ import __version__ as app_version
|
|
|
|
|
|
|
|
def _get_base_dir():
    """Locate the directory that holds the worker's bundled resources.

    Resolution order:
      1. The ``IMGWK_BASE_DIR`` environment variable, if it names an existing directory.
      2. Frozen (packaged) build: ``<exe>/lib/modules`` -> ``<exe>/modules`` -> ``<exe>``.
      3. Source checkout: ``<project>/modules`` -> ``<project>/lib/modules`` -> ``<project>``.

    The first candidate that exists and contains at least one resource folder
    (``onnx_ocr_module/models``, ``migan_onnx``, ``rembg_models`` or ``fonts``)
    is returned; if none qualifies, the first candidate is used as a fallback.
    """
    try:
        override = os.environ.get('IMGWK_BASE_DIR')
        if override and os.path.isdir(override):
            return override
    except Exception:
        pass

    frozen = getattr(sys, 'frozen', False)
    here = os.path.abspath(os.path.dirname(__file__))
    root = os.path.dirname(sys.executable) if frozen else here

    if frozen:
        search_order = [os.path.join(root, 'lib', 'modules'), os.path.join(root, 'modules'), root]
    else:
        search_order = [os.path.join(here, 'modules'), os.path.join(here, 'lib', 'modules'), here]

    # Sub-paths whose presence marks a directory as a resource root.
    resource_markers = (
        ('onnx_ocr_module', 'models'),
        ('migan_onnx',),
        ('rembg_models',),
        ('fonts',),
    )

    def _looks_like_resource_dir(path: str) -> bool:
        try:
            return any(os.path.isdir(os.path.join(path, *parts)) for parts in resource_markers)
        except Exception:
            return False

    found = next(
        (cand for cand in search_order if os.path.isdir(cand) and _looks_like_resource_dir(cand)),
        None,
    )
    if found is not None:
        return found
    # Last resort: the first candidate, even though it has no resources.
    return search_order[0] if search_order else root
|
|
|
|
|
|
def _should_start_tray() -> bool:
    """Decide whether the system-tray icon should be started.

    The tray stays off when ``IMGWK_NO_TRAY`` is a truthy flag
    (``1``/``true``/``yes``/``on``), when the process runs with ``--service``,
    or on any OS other than Windows. Unexpected errors also disable it.
    """
    try:
        raw_flag = os.environ.get("IMGWK_NO_TRAY", "0") or "0"
        disabled_by_env = raw_flag.strip().lower() in ("1", "true", "yes", "on")
        running_as_service = "--service" in sys.argv
        return not (disabled_by_env or running_as_service) and os.name == "nt"
    except Exception:
        return False
|
|
|
|
|
|
# ------------------------------------------------------------
# Project paths and runtime directories
# ------------------------------------------------------------
# Frozen builds (e.g. cx_Freeze) live next to the executable; source runs next to this file.
ROOT_DIR = (
    os.path.dirname(sys.executable)
    if getattr(sys, "frozen", False)
    else os.path.abspath(os.path.dirname(__file__))
)
load_dotenv(os.path.join(ROOT_DIR, ".env"))

# Resource base directory handed to the image worker. It is resolved after
# .env is loaded so IMGWK_BASE_DIR may come from that file.
BASE_DIR = _get_base_dir()

# Writable state is standardised under ProgramData\ImgWorker.
_PROGRAMDATA = os.environ.get("PROGRAMDATA") or os.environ.get("ALLUSERSPROFILE", "C:\\ProgramData")
APP_DATA_DIR = os.path.join(_PROGRAMDATA, "ImgWorker")
TEMP_DIR = os.path.join(APP_DATA_DIR, "work")       # scratch / intermediate files
OUTPUT_DIR = os.path.join(APP_DATA_DIR, "outputs")  # final artifacts (when used)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

LOGS_DIR = os.path.join(APP_DATA_DIR, "logs")
os.makedirs(LOGS_DIR, exist_ok=True)

# Server discovery file and single-instance PID file.
SERVER_INFO_PATH = os.path.join(APP_DATA_DIR, "server.json")
PID_FILE_PATH = os.path.join(APP_DATA_DIR, "imgworker.pid")
|
|
|
|
|
|
# ------------------------------------------------------------
# Logger
# ------------------------------------------------------------
logger = Logger(
    log_file=os.path.join(LOGS_DIR, "api_server.log"),
    logger_name="img_worker_api",
)

# ------------------------------------------------------------
# Single-instance / port bookkeeping
# ------------------------------------------------------------
_MUTEX_HANDLE = None  # Windows named-mutex handle held by this process, if any
_SERVER_BIND = dict(host="127.0.0.1", port=8009)  # default bind address
_SERVICE_NAME = "ImgWorker"  # Windows service name used with sc.exe
|
|
|
|
def _ensure_single_instance() -> bool:
    """Guarantee a single running instance via a Windows named mutex.

    Returns ``True`` when this process may continue. That covers three cases:
    it created the mutex, the platform is not Windows, or the mutex API is
    unavailable (fail open). Returns ``False`` when another instance already
    holds the mutex.

    The mutex lives in the ``Global\\`` namespace so services and interactive
    sessions share it. When the existing mutex was created by a different
    security context (e.g. the service running as LocalSystem), ``CreateMutexW``
    fails with ``ERROR_ACCESS_DENIED``. That also means another instance is
    running, so it is reported as ``False`` rather than ignored.
    """
    global _MUTEX_HANDLE
    if os.name != "nt":
        return True  # non-Windows platforms are not guarded
    ERROR_ACCESS_DENIED = 5
    ERROR_ALREADY_EXISTS = 183
    try:
        # use_last_error=True makes ctypes capture GetLastError() right after the
        # call, so the value cannot be clobbered by other API calls in between.
        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
        handle = kernel32.CreateMutexW(None, False, "Global\\ImgWorkerSingletonMutex")
        last_error = ctypes.get_last_error()
    except Exception:
        return True  # mutex machinery unavailable: proceed
    if not handle:
        # Creation failed. Access denied means the mutex exists but belongs to
        # another security context, i.e. another instance is running.
        return last_error != ERROR_ACCESS_DENIED
    _MUTEX_HANDLE = handle
    return last_error != ERROR_ALREADY_EXISTS
|
|
|
|
def _release_single_instance():
    """Close the single-instance mutex handle if this process holds one.

    Safe to call more than once. The global is cleared before closing, so a
    repeated call (e.g. explicit shutdown plus an atexit hook) cannot close an
    unrelated handle that reused the same numeric value.
    """
    global _MUTEX_HANDLE
    if os.name != "nt":
        return
    handle = _MUTEX_HANDLE
    if not handle:
        return
    _MUTEX_HANDLE = None
    try:
        ctypes.windll.kernel32.CloseHandle(handle)
    except Exception:
        pass  # best effort during shutdown
|
|
|
|
def _is_admin() -> bool:
    """Report whether the process runs with administrator rights.

    Non-Windows platforms and any failure to query the shell API count as
    "admin" (``True``).
    """
    if os.name == "nt":
        try:
            return bool(ctypes.windll.shell32.IsUserAnAdmin())
        except Exception:
            return True
    return True
|
|
|
|
def _rerun_as_admin(extra_args: Optional[str] = None) -> bool:
    """Relaunch the application elevated through the UAC ``runas`` verb.

    How the command line is built depends on the launch mode:
    - Source checkout: the interpreter is started with this script's path.
    - Frozen build: ``sys.executable`` *is* the application, so only
      ``extra_args`` are forwarded. Passing ``__file__`` there would hand the
      executable a bogus first argument. This mirrors ``_build_service_bin_path``.

    Args:
        extra_args: Raw command-line string appended for the relaunched process.

    Returns:
        True if the elevated process was launched. False off Windows, if the
        prompt was declined, or on error. Existing callers ignore the value.
    """
    if os.name != "nt":
        return False
    if getattr(sys, "frozen", False):
        params = extra_args or ""
    else:
        params = '"' + os.path.abspath(__file__) + '"'
        if extra_args:
            params += " " + extra_args
    try:
        rc = ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 1)
    except Exception:
        return False
    # ShellExecuteW returns a value greater than 32 on success.
    return rc > 32
|
|
|
|
def _is_port_in_use(host: str, port: int) -> bool:
    """Return True if something accepts TCP connections on ``host:port``.

    Uses a 0.3 s connect probe; any socket error is reported as "not in use".
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.3)
            outcome = probe.connect_ex((host, port))
    except Exception:
        return False
    return outcome == 0
|
|
|
|
def _find_free_port(start: int, end: int, host: str = "127.0.0.1") -> int:
    """Return the first port in ``[start, end]`` that can be bound on *host*.

    The probe socket deliberately does NOT set ``SO_REUSEADDR``. On Windows that
    option lets a socket bind to a port another process is actively listening
    on, which made occupied ports look free.

    If ``end < start``, only ``start`` is tried. Returns ``0`` (let the OS pick)
    when no port in the range can be bound.

    Note: the probe socket is closed before returning, so another process could
    still take the port before the caller binds it (inherent race).
    """
    first = int(start)
    last = max(first, int(end))
    for port in range(first, last + 1):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind((host, port))
        except (OSError, OverflowError, ValueError):
            continue  # in use, out of range, or otherwise not bindable
        return port
    return 0
|
|
|
|
def _write_server_info(host: str, port: int, path: Optional[str] = None):
    """Publish the server address for client discovery (best effort).

    Writes ``{"host", "port", "base", "pid", "started_at"}`` as JSON.

    The document is first written to a temporary sibling file and then moved
    into place with ``os.replace``, so a client polling the file never sees a
    half-written document. If the replace fails (e.g. a reader holds the file
    open on Windows), it falls back to an in-place write. All errors are
    swallowed so the server keeps running.

    Args:
        host: Bound host name / IP.
        port: Bound TCP port.
        path: Target file; defaults to ``SERVER_INFO_PATH``.
    """
    target = path or SERVER_INFO_PATH
    tmp_path = None
    try:
        info = {
            "host": host,
            "port": port,
            "base": f"http://{host}:{port}",
            "pid": os.getpid(),
            "started_at": time.time(),
        }
        directory = os.path.dirname(os.path.abspath(target))
        fd, tmp_path = tempfile.mkstemp(prefix=".server-", suffix=".tmp", dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(info, f, ensure_ascii=False, indent=2)
        try:
            os.replace(tmp_path, target)
            tmp_path = None  # moved into place; nothing left to clean up
        except OSError:
            with open(target, "w", encoding="utf-8") as f:
                json.dump(info, f, ensure_ascii=False, indent=2)
    except Exception:
        pass
    finally:
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
|
|
|
|
def _write_pid_file():
    """Record this process's PID in ``PID_FILE_PATH`` (best effort, errors ignored)."""
    pid_text = str(os.getpid())
    try:
        with open(PID_FILE_PATH, "w", encoding="utf-8") as fh:
            fh.write(pid_text)
    except Exception:
        pass
|
|
|
|
def _cleanup_runtime_files():
    """Delete the server discovery file if it exists (errors are ignored)."""
    try:
        if not os.path.isfile(SERVER_INFO_PATH):
            return
        os.remove(SERVER_INFO_PATH)
    except Exception:
        pass
|
|
|
|
|
|
def _kill_proc_tree(pid: int):
    """Forcefully kill a process and all of its descendants (best effort).

    The sequence is:
    1. Kill the children.
    2. Kill the parent.
    3. Wait up to 3 seconds for them to exit.

    A process that has vanished or cannot be killed (any ``psutil.Error``, e.g.
    ``NoSuchProcess`` or ``AccessDenied``) is skipped, so the remaining
    processes are still killed. Previously one ``AccessDenied`` aborted the loop
    and left the parent alive.

    Args:
        pid: Root process id; falsy values are ignored.
    """
    if not pid:
        return
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
    except psutil.Error:
        return  # already gone or not inspectable
    except Exception:
        return

    # Children first so they cannot be re-parented/respawned by the parent.
    for proc in (*children, parent):
        try:
            proc.kill()
        except psutil.Error:
            pass

    try:
        psutil.wait_procs([*children, parent], timeout=3)
    except Exception:
        pass
|
|
|
|
|
|
def _safe_rmtree_contents(target_dir: str, older_than_sec: int = 0):
    """Delete the entries below *target_dir*; the directory itself is kept.

    Safety rules:
    - Only directories strictly *inside* ``APP_DATA_DIR`` are processed.
    - The containment check runs on fully resolved paths
      (``os.path.realpath``), so a target that is itself a link pointing
      elsewhere is refused.
    - ``APP_DATA_DIR`` itself is refused, because it also holds logs and
      runtime files.
    - Symlinks and Windows junctions/reparse points found in the tree are never
      followed; only the link entry itself is removed. Following them would let a
      planted link make this (possibly privileged) cleanup delete files outside
      the application directory.

    Args:
        target_dir: Directory whose contents should be cleaned; created if missing.
        older_than_sec: If > 0, only entries whose mtime is at least this many
            seconds old are deleted; 0 deletes everything. Sub-directories are
            removed once they are empty.
    """
    import stat

    try:
        if not target_dir:
            return
        base = os.path.realpath(os.path.abspath(APP_DATA_DIR))
        td = os.path.realpath(os.path.abspath(target_dir))
        try:
            if td == base or os.path.commonpath([base, td]) != base:
                return
        except ValueError:
            return  # e.g. different drives on Windows
        os.makedirs(td, exist_ok=True)

        now_ts = time.time()
        threshold = max(0, int(older_than_sec or 0))

        def _is_old(mtime: float) -> bool:
            # A threshold of 0 means "delete regardless of age".
            return threshold <= 0 or (now_ts - mtime) >= threshold

        def _is_link_like(path: str) -> bool:
            # Symlinks, plus Windows junctions / other reparse points, which
            # older Pythons do not report through os.path.islink().
            try:
                st = os.lstat(path)
            except OSError:
                return False
            if stat.S_ISLNK(st.st_mode):
                return True
            return bool(getattr(st, "st_file_attributes", 0) & stat.FILE_ATTRIBUTE_REPARSE_POINT)

        def _remove_link_if_old(path: str):
            # Removes the link entry itself and never touches its target.
            try:
                if not _is_old(os.lstat(path).st_mtime):
                    return
                try:
                    os.unlink(path)
                except OSError:
                    os.rmdir(path)  # directory junctions on some Windows versions
            except OSError:
                pass

        def _remove_file_if_old(path: str):
            try:
                if _is_old(os.path.getmtime(path)):
                    os.remove(path)
            except OSError:
                pass

        def _cleanup_entry(path: str):
            if _is_link_like(path):
                _remove_link_if_old(path)
            elif os.path.isdir(path):
                _cleanup_dir(path)
            else:
                _remove_file_if_old(path)

        def _cleanup_dir(dp: str):
            # Depth-first: clean the children, then drop dp if it ended up empty.
            try:
                with os.scandir(dp) as it:
                    children = [entry.path for entry in it]
            except OSError:
                return
            for child in children:
                try:
                    _cleanup_entry(child)
                except Exception:
                    pass
            try:
                if not os.listdir(dp):
                    os.rmdir(dp)
            except OSError:
                pass

        for name in os.listdir(td):
            try:
                _cleanup_entry(os.path.join(td, name))
            except Exception:
                pass
    except Exception:
        pass
|
|
|
|
|
|
def _purge_temp_dirs():
    """Clean the scratch folders under ProgramData\\ImgWorker.

    Covers ``incoming``, ``work``, ``output`` and ``outputs``. Only entries older
    than ``IMGWK_CLEAN_OLDER_THAN_SEC`` seconds are removed. The default is 300;
    unparsable values fall back to 300 and negative values are clamped to 0.
    """
    try:
        try:
            ttl_sec = int(os.environ.get("IMGWK_CLEAN_OLDER_THAN_SEC", "300"))
        except Exception:
            ttl_sec = 300
        ttl_sec = max(0, ttl_sec)
        for sub in ("incoming", "work", "output", "outputs"):
            _safe_rmtree_contents(os.path.join(APP_DATA_DIR, sub), older_than_sec=ttl_sec)
    except Exception:
        pass
|
|
|
|
# ---------------------------- 서비스/NSSM 유틸 ----------------------------
|
|
def _run_cmd(cmd: list, check: bool = False) -> subprocess.CompletedProcess:
    """Run *cmd* (an argument list, never through a shell) and capture its text output."""
    options = dict(capture_output=True, text=True, check=check, shell=False)
    return subprocess.run(cmd, **options)
|
|
|
|
def _service_exists(name: str) -> bool:
    """Return True if ``sc query <name>`` reports the service.

    Success requires exit code 0 plus either a ``STATE`` line on stdout or an
    empty stderr. Any failure to run ``sc`` yields False.
    """
    try:
        result = _run_cmd(["sc", "query", name])
        if result.returncode != 0:
            return False
        stdout_text = result.stdout or ""
        stderr_text = result.stderr or ""
        return "STATE" in stdout_text or stderr_text == ""
    except Exception:
        return False
|
|
|
|
def _build_service_bin_path(host: str, port: int) -> str:
    """Build the ``binPath`` command line for the Windows service.

    Frozen builds run the executable directly; source runs launch the
    interpreter with this script. Host and port are baked into the arguments
    so the service always binds the same address.
    """
    port_num = int(port)
    tail = f'--service --host "{host}" --port {port_num}'
    if getattr(sys, "frozen", False):
        launcher = f'"{sys.executable}"'
    else:
        launcher = f'"{sys.executable}" "{os.path.abspath(__file__)}"'
    return f"{launcher} {tail}"
|
|
|
|
def _install_service_via_sc(name: str):
    """Create, configure and start the Windows service *name* using ``sc.exe``.

    The host/port in effect at install time (``IMGWK_HOST`` / ``IMGWK_PORT``)
    are baked into the service ``binPath`` so they persist. The service is
    created as delayed-auto start and is restarted 60 s after a failure.

    Raises:
        RuntimeError: ``sc create`` or ``sc start`` returned a non-zero exit
            code. These codes were previously ignored, so a failed install was
            reported as success.
        ValueError: ``IMGWK_PORT`` is not an integer.
    """
    env_host = os.environ.get("IMGWK_HOST", "127.0.0.1")
    env_port = int(os.environ.get("IMGWK_PORT", "8009"))
    bin_path = _build_service_bin_path(env_host, env_port)

    def _require_ok(step: str, result) -> None:
        # sc.exe prints its diagnostics on stdout; fall back to stderr.
        if result.returncode != 0:
            detail = (result.stdout or "").strip() or (result.stderr or "").strip()
            raise RuntimeError(f"sc {step} failed (exit {result.returncode}): {detail}")

    _require_ok("create", _run_cmd(["sc", "create", name, "binPath=", bin_path, "start=", "delayed-auto"]))
    # Failure actions and description are secondary settings; their results are not fatal.
    _run_cmd(["sc", "failure", name, "reset=", "0", "actions=", "restart/60000"])
    _run_cmd(["sc", "description", name, "Image Worker API"])
    _require_ok("start", _run_cmd(["sc", "start", name]))
|
|
|
|
def _uninstall_service(name: str):
    """Stop and delete the Windows service *name* if it exists (errors are ignored)."""
    if not _service_exists(name):
        return
    try:
        for verb in ("stop", "delete"):
            _run_cmd(["sc", verb, name])
    except Exception:
        pass
|
|
|
|
def _install_service():
    """CLI handler for ``--install-service``: (re)install and start the service.

    Returns a process exit code:
    - 1 on non-Windows platforms or when the install fails.
    - 0 on success, or after handing off to an elevated copy of the program.

    Any existing service is removed first so the new settings take effect.
    """
    if os.name != "nt":
        print("윈도우가 아닌 환경에서는 서비스 설치를 지원하지 않습니다.")
        return 1
    if not _is_admin():
        # The elevated copy performs the actual installation.
        _rerun_as_admin("--install-service")
        return 0
    if _service_exists(_SERVICE_NAME):
        _uninstall_service(_SERVICE_NAME)
    try:
        _install_service_via_sc(_SERVICE_NAME)
        print("ImgWorker 서비스 설치/시작 완료")
    except Exception as e:
        print(f"서비스 설치 실패: {e}")
        return 1
    return 0
|
|
|
|
def _remove_service():
    """CLI handler for ``--uninstall-service``: stop and delete the service.

    Returns a process exit code:
    - 1 on non-Windows platforms or when removal fails.
    - 0 on success, or after handing off to an elevated copy.

    After a successful removal the now-stale PID file is deleted as well.
    """
    if os.name != "nt":
        print("윈도우가 아닌 환경에서는 서비스 제거를 지원하지 않습니다.")
        return 1
    if not _is_admin():
        _rerun_as_admin("--uninstall-service")
        return 0
    try:
        _uninstall_service(_SERVICE_NAME)
        print("ImgWorker 서비스 제거 완료")
    except Exception as e:
        print(f"서비스 제거 실패: {e}")
        return 1
    # NOTE(review): in the original this PID-file cleanup followed a try/except
    # that returns on every path, so it could never run. It now runs after a
    # successful removal.
    try:
        if os.path.isfile(PID_FILE_PATH):
            os.remove(PID_FILE_PATH)
    except Exception:
        pass
    return 0
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# 잡/상태 모델
|
|
# ------------------------------------------------------------
|
|
# Request body for a full image-processing job. A class docstring is
# deliberately omitted: pydantic would publish it in the OpenAPI schema.
class ProcessImageRequest(BaseModel):
    file_path: str = Field(..., description="로컬 이미지 경로")
    index: int = Field(default=0, description="이미지 인덱스(표시/로깅용)")
    file_prefix: Optional[str] = Field(default="", description="detail|option|thumb 등 접두 구분")
    toggle_overrides: Optional[Dict[str, Any]] = Field(default=None, description="처리 시 일시적 토글 오버라이드")
    group_id: Optional[str] = Field(default=None, description="클라이언트 측 그룹 식별자")
    seq: Optional[int] = Field(default=None, description="클라이언트 측 순번")
    ocr: Optional[bool] = Field(
        default=None,
        description="OCR 전용 모드 플래그(True면 OCR+인페인팅, False면 전체 번역)",
    )
|
|
|
|
|
|
# Request body for a background-removal job. A class docstring is deliberately
# omitted: pydantic would publish it in the OpenAPI schema.
class RemoveBackgroundRequest(BaseModel):
    file_path: str = Field(..., description="로컬 이미지 경로")
    file_prefix: Optional[str] = Field(default="", description="detail|option|thumb 등 접두 구분")
    toggle_overrides: Optional[Dict[str, Any]] = Field(default=None, description="처리 시 일시적 토글 오버라이드")
|
|
|
|
|
|
# Bookkeeping record for one job handed to (or buffered for) the worker process.
class JobInfo(BaseModel):
    job_id: str
    # Status values used in this module:
    #   "queued"    registered, or buffered during a worker roll
    #   "running"   put on the worker task queue
    #   "done" / "error"   terminal states
    #   "cancelled" set by cancel_job() for a queued job
    # Code outside this view may set other values.
    status: str
    created_at: float  # time.time() at registration
    updated_at: float  # time.time() of the most recent change
    # process_image | remove_background | ping | reinit_ocr | reinit_rembg |
    # reset_migan | update_toggle_states
    kind: str
    request: Dict[str, Any]  # submission parameters as recorded by the API
    result: Optional[Dict[str, Any]] = Field(default=None)  # worker "data" (non-dicts wrapped as {"data": ...})
    error: Optional[str] = Field(default=None)
    gen: Optional[int] = Field(default=None)  # worker generation at submission (distinguishes rolls)
    timeout_at: Optional[float] = Field(default=None)  # absolute deadline, if any
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# 워커 관리자
|
|
# ------------------------------------------------------------
|
|
class WorkerManager:
|
|
def __init__(self, base_dir: str, temp_dir: str, logger: "Logger"):
    """Set up queues, job bookkeeping and the default worker settings.

    The worker process itself is not started here; call ``start()``.

    Args:
        base_dir: Resource directory handed to the worker (models, fonts, ...).
        temp_dir: Scratch directory for intermediate images (``TEMP_IMAGE_DIR``).
        logger: API-side logger; worker log records are forwarded to it.

    Integer settings read from the environment (``IMGWK_MAX_PENDING``,
    ``IMGWK_ROLL_MAX_*``, ``IMGWK_JOB_TIMEOUT_SEC``) fall back to their defaults
    when the value is not an integer, instead of aborting construction.
    """
    def _env_int(key: str, default: int) -> int:
        # Same lenient policy the module applies to its other integer env settings.
        try:
            return int(os.environ.get(key, str(default)))
        except (TypeError, ValueError):
            return default

    self.base_dir = base_dir
    self.temp_dir = temp_dir
    self.logger = logger

    # IPC with the worker process; "spawn" starts it in a fresh interpreter.
    self._ctx = mp.get_context("spawn")
    self._task_q: mp.Queue = self._ctx.Queue()    # commands to the worker (None = shut down)
    self._result_q: mp.Queue = self._ctx.Queue()  # results and the __READY__ signal
    self._log_q: mp.Queue = self._ctx.Queue()     # log records from the worker

    self._proc: Optional[mp.Process] = None
    self._running = threading.Event()  # cleared to stop the listener/monitor loops
    self._ready = threading.Event()    # set when the worker reports __READY__

    # Job table (guarded by _jobs_lock)
    self._jobs_lock = threading.Lock()
    self._jobs: Dict[str, "JobInfo"] = {}

    # Background threads
    self._result_thread: Optional[threading.Thread] = None
    self._log_thread: Optional[threading.Thread] = None
    self._job_watch_thread: Optional[threading.Thread] = None

    # Rolling-restart state
    self._roll_lock = threading.Lock()
    self._rolling = False
    self._buffered_tasks = []  # tasks submitted while a roll is in progress

    # Back-pressure: submissions fail with "queue-full" beyond this many queued jobs
    self._max_pending_jobs = _env_int("IMGWK_MAX_PENDING", 200)

    # Roll thresholds
    self.ROLL_MAX_RSS_MB = _env_int("IMGWK_ROLL_MAX_RSS_MB", 1800)
    self.ROLL_MAX_JOBS = _env_int("IMGWK_ROLL_MAX_JOBS", 500)
    self.ROLL_MAX_UPTIME_SEC = _env_int("IMGWK_ROLL_MAX_UPTIME_SEC", 2 * 60 * 60)
    if os.environ.get("IMGWK_TEST_ROLL_EVERY_2", "0") == "1":
        self.ROLL_MAX_JOBS = 2  # test hook: roll after every two jobs
    self.JOB_TIMEOUT_SEC = _env_int("IMGWK_JOB_TIMEOUT_SEC", 600)

    # Worker generation, incremented by start() (distinguishes rolls)
    self._generation = 0

    self._jobs_processed_in_current_worker = 0
    self._worker_started_at = 0.0
    self._mem_thread: Optional[threading.Thread] = None
    # Runtime metrics
    self._running_jobs = 0
    self._recent_total_ms = []  # latest (up to 100) per-job total_ms timings
    # Inference device inferred from the latest result ("dml" | "cpu")
    self._last_device = None

    # Default toggles/settings (requests may override them per job)
    self._toggle_states: Dict[str, Any] = {
        "TEMP_IMAGE_DIR": self.temp_dir,
        "output_image_format": "webp",
        # Background removal: bundled rembg defaults
        "use_local_rembg": True,
        "local_rembg_model_path": os.path.join(self.base_dir, "rembg_models"),
        "local_model_name": "birefnet-general-lite",
        # Inpainting: MIGAN defaults (GPU -> DirectML/CPU fallback)
        "optionIMGTrans_type": "GPU",
        "detail_IMGTrans_type": "GPU",
        "thumb_trans_type": "GPU",
        "migan_use_accel": True,
        "migan_onnx_path": os.path.join(self.base_dir, "migan_onnx", "migan_pipeline_v2.onnx"),
        # Provider overrides (auto|dml|cpu)
        "ocr_provider_override": os.environ.get("IMGWK_OCR_PROVIDER", "auto"),
        "migan_provider_override": os.environ.get("IMGWK_MIGAN_PROVIDER", "auto"),
        "rembg_provider_override": os.environ.get("IMGWK_REMBG_PROVIDER", "auto"),
    }

    # Global CPU-only switch (1|true|yes|on) pins every accelerator/provider to CPU
    force_cpu_flag = os.environ.get("IMGWK_FORCE_CPU", "0").strip().lower()
    if force_cpu_flag in ("1", "true", "yes", "on"):
        self._toggle_states["optionIMGTrans_type"] = "CPU"
        self._toggle_states["detail_IMGTrans_type"] = "CPU"
        self._toggle_states["thumb_trans_type"] = "CPU"
        self._toggle_states["migan_use_accel"] = False
        self._toggle_states["ocr_provider_override"] = "cpu"
        self._toggle_states["migan_provider_override"] = "cpu"
        self._toggle_states["rembg_provider_override"] = "cpu"
        # Legacy key, also cleared so consumers see that CPU is forced
        self._toggle_states["use_cuda"] = False

    # Text replacement rules (empty by default)
    self._unwanted_words: Dict[str, str] = {}
|
|
|
|
@property
def is_ready(self) -> bool:
    """True once the worker has reported READY and its process is still alive."""
    proc = self._proc
    if proc is None or not self._ready.is_set():
        return False
    return proc.is_alive()
|
|
|
|
@property
def pid(self) -> Optional[int]:
    """PID of the current worker process, or None when no worker object exists."""
    proc = self._proc
    return None if proc is None else proc.pid
|
|
|
|
def start(self):
    """Spawn the worker process (if none is alive) and its helper threads.

    Blocks until the worker reports READY, or until
    ``IMGWK_WORKER_READY_TIMEOUT_SEC`` seconds elapse (default 120; the same
    value is used when the setting is unparsable). A timeout is only logged.
    Also ensures the result/log listeners, the memory monitor and the job
    watchdog threads are running.
    """
    if self._proc is not None and self._proc.is_alive():
        return

    self._running.set()
    # Forget any READY from a previous worker. Otherwise a restart after a crash
    # would return immediately without waiting for the new worker.
    self._ready.clear()
    self._jobs_processed_in_current_worker = 0
    log_path = os.path.join(LOGS_DIR, "worker.log")

    self._generation += 1
    self._proc = self._ctx.Process(
        target=worker_main,
        args=(
            self._task_q,
            self._result_q,
            self._log_q,
            log_path,
            self.base_dir,
            self._toggle_states,
            self._unwanted_words,
            False,  # authenticated_by_admin
        ),
        name="ImageWorkerProcess",
        daemon=True,
    )
    self._proc.start()
    self.logger.info(f"워커 프로세스 기동: PID={self._proc.pid}")

    # Listener threads
    if self._result_thread is None or not self._result_thread.is_alive():
        self._result_thread = threading.Thread(target=self._result_listener, name="ResultListener", daemon=True)
        self._result_thread.start()

    if self._log_thread is None or not self._log_thread.is_alive():
        self._log_thread = threading.Thread(target=self._log_listener, name="LogListener", daemon=True)
        self._log_thread.start()

    # Wait for READY (default 120 s, configurable via environment)
    try:
        wait_sec = int(os.environ.get("IMGWK_WORKER_READY_TIMEOUT_SEC", "120"))
    except Exception:
        wait_sec = 120
    if not self._ready.wait(timeout=max(1, wait_sec)):
        self.logger.warning("워커 READY 타임아웃")

    # Memory/uptime monitor
    self._worker_started_at = time.time()
    if self._mem_thread is None or not self._mem_thread.is_alive():
        self._mem_thread = threading.Thread(target=self._memory_monitor, name="MemMonitor", daemon=True)
        self._mem_thread.start()

    if self._job_watch_thread is None or not self._job_watch_thread.is_alive():
        self._job_watch_thread = threading.Thread(target=self._job_watchdog, name="JobWatchdog", daemon=True)
        self._job_watch_thread_start_at = time.time()
        self._job_watch_thread.start()
|
|
|
|
def stop(self):
    """Shut the worker down: graceful sentinel first, forced kill only if needed.

    Steps:
    1. Clear the running flag, which also ends the listener/monitor loops.
    2. Send the ``None`` shutdown sentinel.
    3. Wait up to 10 s for the worker to exit.
    4. Only if it is still alive, kill it together with its children.

    Killing by PID after the process has already exited is avoided, because
    the OS may have reused that PID for an unrelated process. ``_proc`` is
    always reset and ``_ready`` cleared.
    """
    try:
        proc = self._proc
        if proc is None:
            return
        self._running.clear()

        # Ask the worker to exit on its own
        try:
            self._task_q.put(None, timeout=1)
        except Exception:
            pass
        proc.join(timeout=10)

        if proc.is_alive():
            try:
                _kill_proc_tree(proc.pid)
            except Exception:
                pass
            try:
                proc.join(timeout=5)  # reap the killed process
            except Exception:
                pass

        self.logger.info("워커 프로세스 종료")
    finally:
        self._proc = None
        self._ready.clear()
|
|
|
|
def _result_listener(self):
    """Consume worker results and update the job table (runs on its own thread).

    Loops until ``self._running`` is cleared. Messages are handled as follows:
    - ``{"id": "__READY__"}`` sets ``self._ready``.
    - Any other dict is matched to a job by its string ``id``. Unknown jobs, and
      jobs already in a final state (done/error/cancelled), are ignored, so late
      results never overwrite a timeout or a cancellation.
    - A truthy ``error`` marks the job ``error``. Otherwise the job becomes
      ``done``, with ``data`` as its result (non-dict data is wrapped as
      ``{"data": ...}``).

    For every finished non-ping job:
    - ``_running_jobs`` is decremented under ``_jobs_lock``.
    - The per-worker job counter is advanced and may schedule a roll.
    - ``timings.total_ms`` is added to the last-100 window.
    - The inference device is inferred from ``inpaint_device`` / ``device`` /
      ``provider``.

    Each of these steps is guarded separately, so one malformed field does not
    skip the others.
    """
    while self._running.is_set():
        try:
            msg = self._result_q.get(timeout=1)
        except Exception:  # includes queue.Empty on timeout
            continue
        if not isinstance(msg, dict):
            continue

        msg_id = msg.get("id")
        if msg_id == "__READY__":
            self._ready.set()
            self.logger.info("워커 READY 수신")
            continue

        with self._jobs_lock:
            # Job ids are strings; anything else (e.g. unhashable) is ignored.
            job = self._jobs.get(msg_id) if isinstance(msg_id, str) else None
            if job is None or job.status in ("error", "done", "cancelled"):
                continue
            job.updated_at = time.time()
            if msg.get("error"):
                job.status = "error"
                job.error = msg.get("error")
            else:
                job.status = "done"
                data = msg.get("data")
                job.result = data if isinstance(data, dict) else {"data": data}
            counted = job.kind not in ("ping",)
            if counted and self._running_jobs > 0:
                self._running_jobs -= 1
            result = job.result if isinstance(job.result, dict) else {}

        if not counted:
            continue

        # Per-worker job count -> roll when the threshold is reached
        try:
            self._jobs_processed_in_current_worker += 1
            if self._jobs_processed_in_current_worker >= self.ROLL_MAX_JOBS:
                self._schedule_roll("job-count-threshold")
        except Exception:
            pass

        # Timing statistics (last 100 jobs)
        try:
            timings = result.get("timings")
            total_ms = float(timings.get("total_ms", 0.0)) if isinstance(timings, dict) else 0.0
            if total_ms > 0:
                self._recent_total_ms.append(total_ms)
                if len(self._recent_total_ms) > 100:
                    self._recent_total_ms = self._recent_total_ms[-100:]
        except Exception:
            pass

        # Most recent inference device
        try:
            dev = (result.get("inpaint_device") or result.get("device") or result.get("provider") or "").lower()
            if dev:
                if ("dml" in dev) or ("directml" in dev) or ("gpu" in dev):
                    self._last_device = "dml"
                elif "cpu" in dev:
                    self._last_device = "cpu"
        except Exception:
            pass
|
|
|
|
def _log_listener(self):
    """Forward log records produced by the worker process to ``self.logger``.

    Each record is expected to be a dict with ``level`` and ``message``.
    ``level`` may be a ``logging`` level number or a level name; other values
    are ignored. Runs until ``self._running`` is cleared.

    Levels map to the logger method at or below them:
    - >= 50: critical
    - >= 40: error
    - >= 30: warning
    - >= 20: info
    - >= 10: debug
    - below 10, or unparsable: info

    Before this change the ``>= 10`` test came first, which sent every INFO
    record to ``debug``.
    """
    level_names = {
        "DEBUG": 10,
        "INFO": 20,
        "WARNING": 30,
        "WARN": 30,
        "ERROR": 40,
        "CRITICAL": 50,
        "FATAL": 50,
    }

    def _to_level(value) -> int:
        # A non-numeric level used to raise TypeError and kill this thread.
        if isinstance(value, str):
            name = value.strip().upper()
            if name in level_names:
                return level_names[name]
        try:
            return int(value)
        except (TypeError, ValueError):
            return 20

    while self._running.is_set():
        try:
            rec = self._log_q.get(timeout=1)
        except Exception:  # includes queue.Empty on timeout
            continue
        if not isinstance(rec, dict):
            continue

        level = _to_level(rec.get("level", 20))
        message = rec.get("message", "")
        try:
            if level >= 50:
                self.logger.critical(message)
            elif level >= 40:
                self.logger.error(message)
            elif level >= 30:
                self.logger.warning(message)
            elif level >= 20:
                self.logger.info(message)
            elif level >= 10:
                self.logger.debug(message)
            else:
                self.logger.info(message)
        except Exception:
            pass  # a failing log call must not stop forwarding
|
|
|
|
# -------------------------- 잡 제출 API --------------------------
|
|
def _register_job(self, kind: str, req: Dict[str, Any]) -> str:
    """Create a ``queued`` JobInfo for *kind* / *req* and return its new id.

    The record stores the current worker generation. When ``JOB_TIMEOUT_SEC``
    is > 0 it also stores an absolute ``timeout_at`` deadline; otherwise
    ``timeout_at`` is None.
    """
    job_id = str(uuid.uuid4())
    now = time.time()  # one timestamp shared by created_at, updated_at and timeout_at
    deadline = (now + self.JOB_TIMEOUT_SEC) if self.JOB_TIMEOUT_SEC > 0 else None
    info = JobInfo(
        job_id=job_id,
        status="queued",
        created_at=now,
        updated_at=now,
        kind=kind,
        request=req,
        gen=self._generation,
        timeout_at=deadline,
    )
    with self._jobs_lock:
        self._jobs[job_id] = info
    return job_id
|
|
|
|
def get_job(self, job_id: str) -> Optional["JobInfo"]:
    """Return the JobInfo registered under *job_id*, or None if it is unknown.

    The lookup happens while holding ``_jobs_lock``.
    """
    self._jobs_lock.acquire()
    try:
        return self._jobs.get(job_id)
    finally:
        self._jobs_lock.release()
|
|
|
|
def cancel_job(self, job_id: str) -> bool:
    """Mark a still-``queued`` job as cancelled.

    Only jobs that have not yet been handed to the worker can be cancelled;
    tasks already placed on the multiprocessing queue cannot be recalled.
    Returns True if the job was cancelled, and False if it is unknown or past
    the queued state.
    """
    with self._jobs_lock:
        job = self._jobs.get(job_id)
        if not job or job.status != "queued":
            return False
        job.status = "cancelled"
        job.error = "cancelled"
        job.updated_at = time.time()
        self._jobs[job_id] = job
        return True
|
|
|
|
def pending_jobs_count(self) -> int:
    """Count the jobs whose status is still ``queued``."""
    with self._jobs_lock:
        statuses = [job.status for job in self._jobs.values()]
    return statuses.count("queued")
|
|
|
|
def _enqueue_or_buffer(self, task: Dict[str, Any]):
    """Send *task* to the worker, or park it while the worker is being rolled.

    The rolling check and the buffer append happen together under
    ``_roll_lock``. This is the lock the roll scheduler uses to set
    ``_rolling``, so a task is never appended on a stale reading of the flag.

    For an immediate dispatch, the job is moved from ``queued`` to ``running``
    and ``_running_jobs`` is incremented *before* the ``put``, both under
    ``_jobs_lock``. This way a very fast result cannot arrive while the job
    still looks queued. The counter is only bumped for that transition, so it
    stays paired with the decrement made when the result arrives.

    If ``put`` fails, the job is marked ``error`` (``enqueue-failed``) and the
    increment is undone.
    """
    job_id = task.get("id")
    with self._roll_lock:
        if self._rolling:
            self._buffered_tasks.append(task)
            return

    marked_running = False
    if job_id:
        with self._jobs_lock:
            job = self._jobs.get(job_id)
            if job and job.status == "queued":
                job.status = "running"
                job.updated_at = time.time()
                self._running_jobs += 1
                marked_running = True

    try:
        self._task_q.put(task)
    except Exception:
        if job_id:
            with self._jobs_lock:
                job = self._jobs.get(job_id)
                if job:
                    job.status = "error"
                    job.error = "enqueue-failed"
                    job.updated_at = time.time()
                if marked_running and self._running_jobs > 0:
                    self._running_jobs -= 1
|
|
|
|
def _is_http(self, path: str) -> bool:
    """True if *path* is a string starting with ``http://`` or ``https://``."""
    try:
        return isinstance(path, str) and path.startswith(("http://", "https://"))
    except Exception:
        return False
|
|
|
|
def _validate_file_readable(self, path: str):
    """Ensure a local input path exists and is readable; http(s) URLs are skipped.

    Raises:
        FileNotFoundError: *path* is not an existing regular file.
        PermissionError: the file exists but is not readable.
    """
    if self._is_http(path):
        return  # remote URL: nothing to check locally
    if not os.path.isfile(path):
        raise FileNotFoundError(f"파일을 찾을 수 없습니다: {path}")
    if not os.access(path, os.R_OK):
        raise PermissionError(f"파일 읽기 권한이 없습니다: {path}")
|
|
|
|
def submit_process_image(self, *, file_path: str, index: int, file_prefix: str = "", toggle_overrides: Optional[Dict[str, Any]] = None, group_id: Optional[str] = None, seq: Optional[int] = None) -> str:
    """Submit a full image-processing job and return its job id.

    Raises:
        FileNotFoundError / PermissionError: the local *file_path* is not readable.
        RuntimeError("queue-full"): the number of queued jobs has reached
            ``_max_pending_jobs``.
    """
    self._validate_file_readable(file_path)
    if self.pending_jobs_count() >= self._max_pending_jobs:
        raise RuntimeError("queue-full")

    overrides = toggle_overrides or {}
    job_id = self._register_job(
        "process_image",
        {
            "file_path": file_path,
            "index": index,
            "file_prefix": file_prefix,
            "group_id": group_id,
            "seq": seq,
            "toggle_overrides": overrides,
        },
    )

    worker_kwargs = dict(
        original_image_url=file_path,
        index=index,
        delay=0.0,
        file_prefix=file_prefix or "",
        _toggle_states=self._merge_toggle_overrides(toggle_overrides),
    )
    self._enqueue_or_buffer({"id": job_id, "cmd": "process_single_image", "kwargs": worker_kwargs})
    return job_id
|
|
|
|
def submit_remove_background(self, *, file_path: str, file_prefix: str = "", toggle_overrides: Optional[Dict[str, Any]] = None) -> str:
    """Submit a background-removal job and return its job id.

    Raises:
        FileNotFoundError / PermissionError: the local *file_path* is not readable.
        RuntimeError("queue-full"): the number of queued jobs has reached
            ``_max_pending_jobs``.
    """
    self._validate_file_readable(file_path)
    if self.pending_jobs_count() >= self._max_pending_jobs:
        raise RuntimeError("queue-full")

    recorded_request = {
        "file_path": file_path,
        "file_prefix": file_prefix,
        "toggle_overrides": toggle_overrides or {},
    }
    job_id = self._register_job("remove_background", recorded_request)

    worker_kwargs = dict(
        original_image_url=file_path,
        file_prefix=file_prefix or "",
        _toggle_states=self._merge_toggle_overrides(toggle_overrides),
    )
    self._enqueue_or_buffer({"id": job_id, "cmd": "remove_background", "kwargs": worker_kwargs})
    return job_id
|
|
|
|
def ping(self, timeout: float = 5.0) -> bool:
    """Round-trip a ``__PING__`` command through the worker.

    Returns True if the worker answers with ``__PONG__`` data within *timeout*
    seconds. The ping bypasses the rolling buffer and goes straight onto the
    shared task queue.

    The temporary bookkeeping entry is removed from ``_jobs`` when the call
    finishes, including on timeout. Previously a ping that was never answered
    stayed ``queued`` forever: it leaked memory and was counted by
    ``pending_jobs_count()`` towards the back-pressure limit. A late answer for
    a removed entry is ignored by the result listener (unknown job id).
    """
    job_id = str(uuid.uuid4())
    with self._jobs_lock:
        now = time.time()
        self._jobs[job_id] = JobInfo(
            job_id=job_id,
            status="queued",
            created_at=now,
            updated_at=now,
            kind="ping",
            request={},
        )
    try:
        self._task_q.put({"id": job_id, "cmd": "__PING__", "kwargs": {}})

        # Simple synchronous poll
        deadline = time.time() + timeout
        while time.time() < deadline:
            info = self.get_job(job_id)
            if info and info.status in ("done", "error"):
                return info.status == "done" and (info.result or {}).get("data") == "__PONG__"
            time.sleep(0.05)
        return False
    finally:
        with self._jobs_lock:
            self._jobs.pop(job_id, None)
|
|
|
|
def _merge_toggle_overrides(self, overrides: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a fresh copy of the default toggles with *overrides* applied.

    ``TEMP_IMAGE_DIR`` is always forced back to ``self.temp_dir``, so overrides
    cannot redirect it, and that directory is created if missing. The stored
    defaults are not modified.
    """
    combined = dict(self._toggle_states)
    combined.update(overrides or {})
    combined["TEMP_IMAGE_DIR"] = self.temp_dir
    os.makedirs(self.temp_dir, exist_ok=True)
    return combined
|
|
|
|
# -------------------------- 제어 작업(동기) --------------------------
|
|
def reinit_ocr(self, provider_override: Optional[str] = None, timeout: float = 10.0) -> bool:
    """Ask the worker to rebuild its OCR engine and wait for the outcome.

    Args:
        provider_override: If given, stored as ``ocr_provider_override`` in the
            default toggles before the request is sent.
        timeout: Seconds to poll for the result.

    Returns:
        True only if the job finished ``done`` with a truthy ``ok``; False on
        error or timeout.

    The toggles are sent as a snapshot copy. ``multiprocessing.Queue`` pickles
    objects later, in its feeder thread, and buffered tasks wait even longer
    during a roll. Passing the live dict therefore let later toggle changes
    leak into this request.
    """
    if provider_override:
        self._toggle_states["ocr_provider_override"] = provider_override
    job_id = str(uuid.uuid4())
    with self._jobs_lock:
        now = time.time()
        self._jobs[job_id] = JobInfo(
            job_id=job_id,
            status="queued",
            created_at=now,
            updated_at=now,
            kind="reinit_ocr",
            request={"provider_override": provider_override},
            gen=self._generation,
            timeout_at=now + 15,
        )
    task = {"id": job_id, "cmd": "reinit_ocr", "kwargs": {"_toggle_states": dict(self._toggle_states)}}
    self._enqueue_or_buffer(task)

    deadline = time.time() + timeout
    while time.time() < deadline:
        info = self.get_job(job_id)
        if info and info.status in ("done", "error"):
            return info.status == "done" and bool((info.result or {}).get("ok", False))
        time.sleep(0.05)
    return False
|
|
|
|
# REMBG 재초기화/재설정 (호환 목적: 동일 시맨틱 유지)
|
|
def reinit_rembg(self, provider: Optional[str] = None, timeout: float = 10.0) -> bool:
    """Ask the worker to recreate its background-removal (rembg) session.

    Args:
        provider: If given, stored as ``rembg_provider_override`` in the default
            toggles and also forwarded to the worker command.
        timeout: Seconds to poll for the result.

    Returns:
        True only if the job finished ``done`` with a truthy ``ok``; False on
        error or timeout.

    The toggles are sent as a snapshot copy, because ``multiprocessing.Queue``
    pickles objects later in its feeder thread (and buffered tasks wait longer
    still during a roll). Passing the live dict let later changes leak in.
    """
    if provider:
        self._toggle_states["rembg_provider_override"] = provider
    job_id = str(uuid.uuid4())
    with self._jobs_lock:
        now = time.time()
        self._jobs[job_id] = JobInfo(
            job_id=job_id,
            status="queued",
            created_at=now,
            updated_at=now,
            kind="reinit_rembg",
            request={"provider": provider},
            gen=self._generation,
            timeout_at=now + 15,
        )
    task = {
        "id": job_id,
        "cmd": "reinit_rembg",
        "kwargs": {"_toggle_states": dict(self._toggle_states), "provider": provider},
    }
    self._enqueue_or_buffer(task)

    deadline = time.time() + timeout
    while time.time() < deadline:
        info = self.get_job(job_id)
        if info and info.status in ("done", "error"):
            return info.status == "done" and bool((info.result or {}).get("ok", False))
        time.sleep(0.05)
    return False
|
|
|
|
def reset_rembg(self, provider: Optional[str] = None, timeout: float = 10.0) -> bool:
    """Reset the background-removal session.

    Currently this is the same operation as ``reinit_rembg`` (the session is
    recreated). It is kept as a separate entry point so the two can diverge later.
    """
    return self.reinit_rembg(timeout=timeout, provider=provider)
|
|
|
|
def reset_migan(self, use_cuda: Optional[bool] = None, provider: Optional[str] = None, timeout: float = 10.0) -> bool:
    """Ask the worker to rebuild the MIGAN inpainting session.

    Args:
        use_cuda: If not None, stored (as bool) in ``migan_use_accel``.
        provider: If given, stored as ``migan_provider_override``.
        timeout: Seconds to poll for the result.

    Returns:
        True only if the job finished ``done`` with a truthy ``ok``; False on
        error or timeout.

    The toggles are sent as a snapshot copy, because ``multiprocessing.Queue``
    pickles objects later in its feeder thread. Passing the live dict let
    later changes leak into this request.
    """
    if use_cuda is not None:
        self._toggle_states["migan_use_accel"] = bool(use_cuda)
    if provider:
        self._toggle_states["migan_provider_override"] = provider
    job_id = str(uuid.uuid4())
    with self._jobs_lock:
        now = time.time()
        self._jobs[job_id] = JobInfo(
            job_id=job_id,
            status="queued",
            created_at=now,
            updated_at=now,
            kind="reset_migan",
            request={"use_cuda": use_cuda, "provider": provider},
            gen=self._generation,
            timeout_at=now + 15,
        )
    task = {
        "id": job_id,
        "cmd": "reset_migan",
        "kwargs": {"_toggle_states": dict(self._toggle_states), "use_cuda": use_cuda, "provider": provider},
    }
    self._enqueue_or_buffer(task)

    deadline = time.time() + timeout
    while time.time() < deadline:
        info = self.get_job(job_id)
        if info and info.status in ("done", "error"):
            return info.status == "done" and bool((info.result or {}).get("ok", False))
        time.sleep(0.05)
    return False
|
|
|
|
def update_toggle_states(self, updates: Dict[str, Any], timeout: float = 10.0) -> bool:
    """Apply *updates* to the default toggles and push the full set to the worker.

    ``TEMP_IMAGE_DIR`` is re-pinned to ``self.temp_dir`` after the update.

    Returns:
        True only if the worker acknowledged with ``done`` and a truthy ``ok``;
        False on error or timeout.

    The worker receives a snapshot copy of the toggles. ``multiprocessing.Queue``
    pickles objects later in its feeder thread, and the next update mutates the
    stored dict in place, so sending the live dict could deliver the wrong
    settings.
    """
    # Update local state, then re-apply path fix-ups (TEMP_IMAGE_DIR, ...)
    self._toggle_states.update(updates)
    self._toggle_states = self._merge_toggle_overrides(None)

    job_id = str(uuid.uuid4())
    with self._jobs_lock:
        now = time.time()
        self._jobs[job_id] = JobInfo(
            job_id=job_id,
            status="queued",
            created_at=now,
            updated_at=now,
            kind="update_toggle_states",
            request=updates,
            gen=self._generation,
            timeout_at=now + 15,
        )

    # Send the complete toggle set to the worker
    task = {
        "id": job_id,
        "cmd": "update_toggle_states",
        "kwargs": {"_toggle_states": dict(self._toggle_states)},
    }
    self._enqueue_or_buffer(task)

    deadline = time.time() + timeout
    while time.time() < deadline:
        info = self.get_job(job_id)
        if info and info.status in ("done", "error"):
            return info.status == "done" and bool((info.result or {}).get("ok", False))
        time.sleep(0.05)
    return False
|
|
|
|
# -------------------------- 롤링 로직 --------------------------
|
|
def _memory_monitor(self):
    """Watch the worker's memory and uptime and schedule a roll when either is exceeded.

    Loops while ``self._running`` is set. Each pass does the following:
    - reads the worker process RSS (MiB) via psutil;
    - reads the time elapsed since ``self._worker_started_at``;
    - calls ``self._schedule_roll`` when RSS >= ``ROLL_MAX_RSS_MB`` or uptime
      >= ``ROLL_MAX_UPTIME_SEC``.

    It polls every ~3 s, or ~1 s while no worker process/pid exists yet.
    Unexpected errors are logged and polling continues.
    """
    while self._running.is_set():
        delay = 3.0
        try:
            # Snapshot once: a roll or stop on another thread may swap _proc.
            proc = self._proc
            pid = proc.pid if proc is not None else None
            if not pid:
                delay = 1.0  # no worker yet; re-check sooner (finally does the sleep)
                continue

            rss_mb = 0
            try:
                rss_mb = int(psutil.Process(pid).memory_info().rss / (1024 * 1024))
            except Exception:
                # Best effort: the process may have exited or be inaccessible.
                pass

            if rss_mb >= self.ROLL_MAX_RSS_MB:
                self._schedule_roll("memory-threshold")

            uptime = time.time() - self._worker_started_at
            if uptime >= self.ROLL_MAX_UPTIME_SEC:
                self._schedule_roll("uptime-threshold")
        except Exception as e:
            # Log and keep polling so a single unexpected error cannot end this thread.
            self.logger.error(f"메모리 모니터 오류: {e}")
        finally:
            time.sleep(delay)
|
|
|
|
def _schedule_roll(self, reason: str):
    """Start a background worker roll unless one is already underway.

    ``_rolling`` is tested and set under ``_roll_lock``, so concurrent triggers
    start at most one ``_do_roll`` thread. That thread is a daemon named
    "WorkerRoller" and receives ``reason``.
    """
    with self._roll_lock:
        should_start = not self._rolling
        if should_start:
            self._rolling = True
    if not should_start:
        return

    self.logger.warning(f"워커 롤링 스케줄: reason={reason}")
    threading.Thread(
        target=self._do_roll,
        args=(reason,),
        name="WorkerRoller",
        daemon=True,
    ).start()
|
|
|
|
def _do_roll(self, reason: str):
    """Replace the running worker process with a fresh one.

    Sequence:
      1. put ``None`` on the task queue to ask the worker to exit (1 s put
         timeout, failures ignored);
      2. if a process exists, join it for up to 30 s, then kill its process
         tree by pid (kill failures ignored);
      3. clear the ready event and reset the per-worker job counter;
      4. call ``self.start()``; on failure, log, clear ``_rolling`` and return
         without flushing buffered tasks;
      5. re-enqueue every buffered task, then always clear the buffer and
         ``_rolling``.
    An exception anywhere in steps 1-2 is logged and the roll carries on.
    """
    # Steps 1-2: retire the current worker.
    try:
        self.logger.warning(f"워커 롤링 시작: {reason}")
        try:
            self._task_q.put(None, timeout=1)
        except Exception:
            pass  # best effort; the process is joined/killed below regardless

        if self._proc is not None:
            old_pid = self._proc.pid
            self._proc.join(timeout=30)
            if old_pid:
                try:
                    # Kill the whole tree so nothing survives the roll.
                    _kill_proc_tree(old_pid)
                except Exception:
                    pass
    except Exception as exc:
        self.logger.error(f"롤링 중 종료 단계 오류: {exc}")

    # Step 3: forget the old worker's state.
    self._ready.clear()
    self._jobs_processed_in_current_worker = 0

    # Step 4: launch the replacement.
    try:
        self.start()
    except Exception as exc:
        self.logger.error(f"롤링 중 새 워커 기동 실패: {exc}")
        # Lower the rolling flag so incoming requests are not blocked.
        self._rolling = False
        return

    # Step 5: hand buffered work to the new worker.
    try:
        backlog = self._buffered_tasks
        if backlog:
            self.logger.info(f"버퍼 작업 재개: {len(backlog)}건")
            for queued_task in backlog:
                self._task_q.put(queued_task)
    finally:
        # NOTE(review): a task appended to the buffer after the loop above but
        # before clear() would be dropped; confirm _enqueue_or_buffer cannot
        # append while this flush runs.
        self._buffered_tasks.clear()
        self._rolling = False
|
|
|
|
# -------------------------- 잡 타임아웃 --------------------------
|
|
def _job_watchdog(self):
    """Fail jobs whose deadline has passed.

    Every ~0.5 s while ``self._running`` is set, it looks for jobs that meet
    all of these:
    - not in a terminal state ("done", "error", "cancelled");
    - have a truthy ``timeout_at``;
    - ``timeout_at`` has passed.

    Such jobs get status "error", error "timeout", and a fresh
    ``updated_at``. Candidates are gathered under ``_jobs_lock`` first, then
    each is re-checked under the lock before being marked. A job that
    finished in between is therefore left alone.
    """
    terminal = ("done", "error", "cancelled")
    while self._running.is_set():
        checked_at = time.time()
        with self._jobs_lock:
            overdue = [
                job.job_id
                for job in self._jobs.values()
                if job.status not in terminal
                and job.timeout_at
                and checked_at >= job.timeout_at
            ]

        for overdue_id in overdue:
            with self._jobs_lock:
                job = self._jobs.get(overdue_id)
                if not job or job.status in terminal:
                    continue
                job.status = "error"
                job.error = "timeout"
                job.updated_at = time.time()
                self._jobs[overdue_id] = job

        time.sleep(0.5)
|
|
|
|
|
|
# ------------------------------------------------------------
|
|
# FastAPI 앱
|
|
# ------------------------------------------------------------
|
|
# Process-wide WorkerManager shared by every endpoint. It is created in
# lifespan() on startup, or by /v1/worker/start if still None; until then it is None.
_worker: Optional[WorkerManager] = None
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: start the worker on startup, tear everything down on shutdown.

    Startup:
    - purges temp dirs;
    - creates and starts the global WorkerManager;
    - pings it (a failed ping is only logged);
    - writes the server discovery file from ``_SERVER_BIND``.

    Shutdown stops the worker *first*, so it cannot keep writing into temp
    dirs, and only then purges temp dirs and runtime files. Each step is
    isolated so one failure cannot skip the others (e.g. leave the worker
    process running).
    """
    global _worker
    logger.info("API 서버 시작")
    # Clean temp folders left over from a previous run.
    _purge_temp_dirs()
    _worker = WorkerManager(base_dir=BASE_DIR, temp_dir=TEMP_DIR, logger=logger)
    _worker.start()
    # Health ping
    if not _worker.ping(timeout=10.0):
        logger.warning("워커 핑 실패")
    # Discovery file uses the bind info set by the __main__ launcher; best effort.
    try:
        _write_server_info(_SERVER_BIND.get("host", "127.0.0.1"), int(_SERVER_BIND.get("port", 8009)))
    except Exception as e:
        logger.warning(f"서버 정보 파일 기록 실패: {e}")

    try:
        yield
    finally:
        logger.info("API 서버 종료")
        try:
            if _worker:
                _worker.stop()
        except Exception as e:
            logger.error(f"워커 종료 중 오류: {e}")
        try:
            _purge_temp_dirs()
        except Exception as e:
            logger.error(f"임시 폴더 정리 중 오류: {e}")
        try:
            _cleanup_runtime_files()
        except Exception as e:
            logger.error(f"런타임 파일 정리 중 오류: {e}")
|
|
|
|
|
|
# The ASGI application; worker startup/shutdown is driven by lifespan().
app = FastAPI(
    lifespan=lifespan,
    title="Image Worker API",
    version=app_version,
)
|
|
|
|
|
|
@app.get("/health")
def health():
    """Liveness probe: "ok" once the worker is ready, otherwise "starting"."""
    if _worker:
        ready = _worker.is_ready
        pid = _worker.pid
    else:
        ready = False
        pid = None
    return {"status": "ok" if ready else "starting", "ready": ready, "pid": pid}
|
|
|
|
|
|
@app.get("/info")
def info():
    """Describe this server: version, working/resource directories and worker state."""
    has_worker = bool(_worker)
    payload = {
        "app": "Image Worker API",
        "version": app.version,
        "cwd": os.getcwd(),
        "base_dir": BASE_DIR,
        "temp_dir": TEMP_DIR,
        "outputs_dir": OUTPUT_DIR,
    }
    payload["worker_pid"] = _worker.pid if has_worker else None
    payload["ready"] = _worker.is_ready if has_worker else False
    return payload
|
|
|
|
|
|
@app.get("/v1/worker/status")
def worker_status():
    """Report worker readiness, activity, average seconds per image and last device.

    ``active`` is true while any job is running or pending. Statistics are
    best-effort: if reading them fails, the defaults are returned.
    """
    ready = _worker.is_ready if _worker else False
    running_jobs, pending_jobs = 0, 0
    active, avg_sec, provider = False, None, None

    try:
        if _worker is not None:
            running_jobs = int(getattr(_worker, "_running_jobs", 0) or 0)
            try:
                pending_jobs = int(_worker.pending_jobs_count())
            except Exception:
                pending_jobs = 0
            # Pending or running work means ACTIVE.
            active = (running_jobs + pending_jobs) > 0
            durations_ms = getattr(_worker, "_recent_total_ms", [])
            if durations_ms:
                avg_sec = sum(durations_ms) / len(durations_ms) / 1000.0
            # Most recent execution device (dml|cpu).
            provider = getattr(_worker, "_last_device", None)
    except Exception:
        pass

    return {
        "ready": ready,
        "pid": _worker.pid if _worker else None,
        "active": active,
        "avg_sec_per_image": avg_sec,
        "provider": provider,
        "running_jobs": running_jobs,
        "pending_jobs": pending_jobs,
    }
|
|
|
|
|
|
@app.post("/v1/worker/start")
def worker_start():
    """Create the worker if needed and start it if it is not ready yet.

    A failed ping after starting is only logged; the response always has
    ``ok: true`` plus the current readiness and pid.
    """
    global _worker
    if _worker is None:
        _worker = WorkerManager(base_dir=BASE_DIR, temp_dir=TEMP_DIR, logger=logger)
    if not _worker.is_ready:
        _worker.start()
        if not _worker.ping(timeout=10.0):
            logger.warning("워커 핑 실패")
    return {"ok": True, "ready": _worker.is_ready, "pid": _worker.pid}
|
|
|
|
|
|
@app.post("/v1/worker/stop")
def worker_stop():
    """Stop the worker if one exists; the response always reports it stopped."""
    # Read-only access to the module-level _worker, so no `global` is needed.
    if _worker is not None:
        _worker.stop()
    return {"ok": True, "ready": False, "pid": None}
|
|
|
|
|
|
@app.post("/v1/server/shutdown")
def server_shutdown():
    """Acknowledge the request, then terminate the server process.

    The exit runs on a background daemon thread after a short delay, so this
    response can still be sent. ``os._exit`` bypasses the lifespan shutdown
    and atexit handlers. The worker is therefore stopped and temp/runtime
    files are cleaned up explicitly first; otherwise the worker process and
    runtime files could be left behind. A 10 s timer forces the exit even if
    a cleanup step hangs.
    """

    def _exit_later():
        try:
            # Let uvicorn deliver the response first.
            time.sleep(0.3)
            hard_stop = threading.Timer(10.0, os._exit, args=(0,))
            hard_stop.daemon = True
            hard_stop.start()
            try:
                if _worker is not None:
                    _worker.stop()
            except Exception as e:
                logger.error(f"워커 종료 중 오류: {e}")
            for cleanup in (_purge_temp_dirs, _cleanup_runtime_files):
                try:
                    cleanup()
                except Exception as e:
                    logger.error(f"종료 정리 중 오류: {e}")
        finally:
            os._exit(0)

    threading.Thread(target=_exit_later, daemon=True).start()
    return {"ok": True}
|
|
|
|
|
|
@app.post("/v1/process-image", status_code=http_status.HTTP_202_ACCEPTED)
def process_image(req: ProcessImageRequest):
    """Queue an image-processing job and return its id (202 Accepted).

    Jobs are accepted as soon as the worker object exists, even before it
    reports ready. http(s) URLs are passed through for the worker to fetch;
    local paths must exist.

    Error responses:
    - 503: no worker exists;
    - 400: a local path is missing or submission raises FileNotFoundError;
    - 429: the job queue is full;
    - 500: any other error.
    """
    if _worker is None:
        raise HTTPException(status_code=503, detail="Worker not initialized")

    is_remote = req.file_path.startswith(("http://", "https://"))
    if not is_remote and not os.path.isfile(req.file_path):
        raise HTTPException(status_code=400, detail="file_path not found")

    try:
        # A separately supplied `ocr` flag overrides the matching toggle.
        toggles = dict(req.toggle_overrides or {})
        if req.ocr is not None:
            toggles["ocr"] = bool(req.ocr)

        job_id = _worker.submit_process_image(
            file_path=req.file_path,
            index=req.index or (req.seq or 0),
            file_prefix=req.file_prefix or "",
            toggle_overrides=toggles,
            group_id=req.group_id,
            seq=req.seq,
        )
    except RuntimeError as exc:
        if str(exc) != "queue-full":
            raise
        # Behaves like 429 Too Many Requests.
        raise HTTPException(status_code=429, detail="queue full, retry later")
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception:
        logger.exception("process-image 요청 처리 중 오류")
        raise HTTPException(status_code=500, detail="internal error")

    return {"accepted": True, "job_id": job_id}
|
|
|
|
|
|
@app.post("/v1/remove-background", status_code=http_status.HTTP_202_ACCEPTED)
def remove_background(req: RemoveBackgroundRequest):
    """Queue a background-removal job and return its id (202 Accepted).

    Jobs are accepted as soon as the worker object exists, even before it
    reports ready. http(s) URLs are passed through; local paths must exist.

    Error responses:
    - 503: no worker exists;
    - 400: a local path is missing or submission raises FileNotFoundError;
    - 429: the job queue is full;
    - 500: any other error.
    """
    if _worker is None:
        raise HTTPException(status_code=503, detail="Worker not initialized")

    is_remote = req.file_path.startswith(("http://", "https://"))
    if not is_remote and not os.path.isfile(req.file_path):
        raise HTTPException(status_code=400, detail="file_path not found")

    try:
        job_id = _worker.submit_remove_background(
            file_path=req.file_path,
            file_prefix=req.file_prefix or "",
            toggle_overrides=req.toggle_overrides,
        )
    except RuntimeError as exc:
        if str(exc) != "queue-full":
            raise
        raise HTTPException(status_code=429, detail="queue full, retry later")
    except FileNotFoundError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception:
        logger.exception("remove-background 요청 처리 중 오류")
        raise HTTPException(status_code=500, detail="internal error")

    return {"accepted": True, "job_id": job_id}
|
|
|
|
|
|
@app.get("/v1/jobs/{job_id}")
def get_job(job_id: str):
    """Return the current record of a job (503 without a worker, 404 if unknown)."""
    if _worker is None:
        raise HTTPException(status_code=503, detail="Worker not ready")
    job = _worker.get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="job not found")
    # Round-trip through the model's JSON so the payload is plain JSON types.
    serialized = job.json()
    return json.loads(serialized)
|
|
|
|
|
|
class UpdateToggleRequest(BaseModel):
    """Body of POST /v1/toggle/update."""

    # Required mapping of toggle names to new values.
    updates: Dict[str, Any] = Field(default=..., description="업데이트할 토글 상태 딕셔너리")
|
|
|
|
|
|
@app.post("/v1/toggle/update")
def update_toggle(req: UpdateToggleRequest):
    """Merge ``updates`` into the worker's toggle state.

    503 if the worker is missing or not ready; 500 if the worker did not
    confirm the update.
    """
    worker_ready = _worker is not None and _worker.is_ready
    if not worker_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    if _worker.update_toggle_states(req.updates):
        return {"ok": True}
    raise HTTPException(status_code=500, detail="update toggle failed")
|
|
|
|
|
|
class ReinitOCRRequest(BaseModel):
    """Body of POST /v1/ocr/reinit."""

    # Optional provider override; None keeps the worker's own choice.
    provider: Optional[str] = Field(default=None, description="ocr provider override: auto|dml|cpu")
|
|
|
|
|
|
@app.post("/v1/ocr/reinit")
def reinit_ocr(req: ReinitOCRRequest):
    """Re-initialise the OCR engine, optionally forcing a provider (auto|dml|cpu).

    503 if the worker is missing or not ready; 500 if re-initialisation failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reinit_ocr(provider_override=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reinit ocr failed")
|
|
|
|
|
|
class ResetOCRRequest(BaseModel):
    """Body of POST /v1/ocr/reset."""

    # Optional provider override; None keeps the worker's own choice.
    provider: Optional[str] = Field(default=None, description="ocr provider override: auto|dml|cpu")
|
|
|
|
|
|
@app.post("/v1/ocr/reset")
def reset_ocr(req: ResetOCRRequest):
    """Reset the OCR engine (same worker call as /v1/ocr/reinit).

    503 if the worker is missing or not ready; 500 if the reset failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reinit_ocr(provider_override=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reset ocr failed")
|
|
|
|
|
|
class ResetMIGANRequest(BaseModel):
    """Body of POST /v1/migan/reset and /v1/migan/reinit."""

    # Whether to try GPU acceleration; None leaves the current setting.
    use_cuda: Optional[bool] = Field(default=None, description="DirectML 사용 여부(True=시도)")
    # Optional provider override; None keeps the worker's own choice.
    provider: Optional[str] = Field(default=None, description="migan provider override: auto|dml|cpu")
|
|
|
|
|
|
@app.post("/v1/migan/reset")
def reset_migan(req: ResetMIGANRequest):
    """Reset the MI-GAN inpainting engine with optional acceleration/provider overrides.

    503 if the worker is missing or not ready; 500 if the reset failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reset_migan(use_cuda=req.use_cuda, provider=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reset migan failed")
|
|
|
|
|
|
@app.post("/v1/migan/reinit")
def reinit_migan(req: ResetMIGANRequest):
    """Re-initialise MI-GAN (same worker call as /v1/migan/reset).

    503 if the worker is missing or not ready; 500 if re-initialisation failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reset_migan(use_cuda=req.use_cuda, provider=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reinit migan failed")
|
|
|
|
|
|
class ReinitREMBGRequest(BaseModel):
    """Body of POST /v1/rembg/reinit."""

    # Optional provider override; None keeps the worker's own choice.
    provider: Optional[str] = Field(default=None, description="rembg provider override: auto|dml|cpu")
|
|
|
|
|
|
@app.post("/v1/rembg/reinit")
def reinit_rembg(req: ReinitREMBGRequest):
    """Re-initialise the background-removal engine, optionally forcing a provider.

    503 if the worker is missing or not ready; 500 if re-initialisation failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reinit_rembg(provider=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reinit rembg failed")
|
|
|
|
|
|
class ResetREMBGRequest(BaseModel):
    """Body of POST /v1/rembg/reset."""

    # Optional provider override; None keeps the worker's own choice.
    provider: Optional[str] = Field(default=None, description="rembg provider override: auto|dml|cpu")
|
|
|
|
|
|
@app.post("/v1/rembg/reset")
def reset_rembg(req: ResetREMBGRequest):
    """Reset the background-removal engine, optionally forcing a provider.

    503 if the worker is missing or not ready; 500 if the reset failed.
    """
    if _worker is None or not _worker.is_ready:
        raise HTTPException(status_code=503, detail="Worker not ready")
    succeeded = _worker.reset_rembg(provider=req.provider)
    if succeeded:
        return {"ok": True}
    raise HTTPException(status_code=500, detail="reset rembg failed")
|
|
|
|
|
|
@app.delete("/v1/jobs/{job_id}", status_code=http_status.HTTP_202_ACCEPTED)
def cancel_job(job_id: str):
    """Cancel a job; 409 when the worker refuses (already running or done)."""
    if _worker is None:
        raise HTTPException(status_code=503, detail="Worker not ready")
    cancelled = _worker.cancel_job(job_id)
    if not cancelled:
        raise HTTPException(status_code=409, detail="cannot cancel (already running or done)")
    return {"cancelled": True, "job_id": job_id}
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # multiprocessing: support for frozen executables.
    try:
        mp.freeze_support()
    except Exception:
        pass

    # Command-line options; unknown arguments are ignored.
    parser = argparse.ArgumentParser(description="Image Worker API")
    parser.add_argument("--install-service", action="store_true", help="윈도우 서비스 설치")
    parser.add_argument("--uninstall-service", action="store_true", help="윈도우 서비스 제거")
    parser.add_argument("--ensure-service", action="store_true", help="서비스가 없으면 설치")
    parser.add_argument("--console", action="store_true", help="디버그 콘솔 활성화(Win32GUI 빌드에서 유효)")
    args, _ = parser.parse_known_args()

    # Service-management commands run and exit without starting the server.
    if args.install_service:
        sys.exit(_install_service())
    if args.uninstall_service:
        sys.exit(_remove_service())
    if args.ensure_service:
        # Succeed if the service already exists, otherwise try to install it.
        if _service_exists(_SERVICE_NAME):
            print("ImgWorker 서비스가 이미 설치되어 있습니다.")
            sys.exit(0)
        sys.exit(_install_service())

    # Single-instance guard: exit if another instance is already running.
    if not _ensure_single_instance():
        print("ImgWorker 이미 실행 중입니다. 기존 인스턴스를 사용하세요.")
        sys.exit(0)

    # Release resources on normal interpreter exit.
    atexit.register(_release_single_instance)
    atexit.register(_cleanup_runtime_files)

    # Bind host/port. If the desired port is busy, pick a free one between
    # IMGWK_PORT_START and IMGWK_PORT_END.
    bind_host = os.environ.get("IMGWK_HOST", "127.0.0.1")
    desired_port = int(os.environ.get("IMGWK_PORT", "8009"))
    start_port = int(os.environ.get("IMGWK_PORT_START", str(desired_port)))
    end_port = int(os.environ.get("IMGWK_PORT_END", str(start_port + 20)))

    if _is_port_in_use(bind_host, desired_port):
        selected_port = _find_free_port(start_port, end_port, bind_host)
    else:
        selected_port = desired_port

    # Publish bind info (lifespan() reads _SERVER_BIND) and write PID/discovery files.
    _SERVER_BIND = {"host": bind_host, "port": selected_port}
    _write_pid_file()
    _write_server_info(bind_host, selected_port)

    # --console: attach to the parent console, or allocate a new one if that
    # fails, then point the std streams at it.
    use_console = bool(getattr(args, "console", False))
    if use_console and os.name == "nt":
        try:
            kernel32 = ctypes.windll.kernel32
            # Both Win32 calls signal failure through a zero return value, not an exception.
            if not kernel32.AttachConsole(-1):
                kernel32.AllocConsole()
            try:
                sys.stdout = open("CONOUT$", "w", buffering=1, encoding="utf-8", errors="replace")
                sys.stderr = open("CONOUT$", "w", buffering=1, encoding="utf-8", errors="replace")
                sys.stdin = open("CONIN$", "r", encoding="utf-8", errors="replace")
            except Exception:
                pass
            print("[ImgWorker] 디버그 콘솔 활성화")
        except Exception:
            pass

    # Without a console (e.g. a Win32GUI build), sys.stdout/stderr may be None;
    # replace them with no-op streams so writes do not fail.
    if not use_console:
        class _SilentStream:
            def write(self, *args, **kwargs):
                pass

            def flush(self):
                pass

            def isatty(self):
                return False

        try:
            if sys.stdout is None:
                sys.stdout = _SilentStream()
            if sys.stderr is None:
                sys.stderr = _SilentStream()
        except Exception:
            pass

    # Re-create the logger now that the std streams are final, so its
    # StreamHandler writes to the right stream.
    try:
        logger = Logger(log_file=os.path.join(LOGS_DIR, "api_server.log"), logger_name="img_worker_api")
    except Exception:
        pass

    # Keep uvicorn's default logging only when a console is available.
    uvicorn_kwargs = dict(host=bind_host, port=selected_port, reload=False, log_level="info")
    if not use_console:
        uvicorn_kwargs["log_config"] = None

    # Run the tray icon in the background unless _should_start_tray() says no
    # (per the original note: service mode or IMGWK_NO_TRAY=1).
    if _should_start_tray():
        try:
            from modules.tray_app import TrayController

            def _run_tray():
                try:
                    TrayController().run()
                except Exception as e:
                    logger.error(f"트레이 스레드 오류: {e}")

            t = threading.Thread(target=_run_tray, name="TrayThread", daemon=True)
            t.start()
        except Exception as e:
            # Embedded tray import failed: fall back to the standalone tray
            # executable next to the frozen exe (it reads server.json itself).
            logger.warning(f"트레이 시작 실패(모듈 누락?): {e}")
            try:
                if getattr(sys, "frozen", False):
                    exe_dir = os.path.dirname(sys.executable)
                    tray_path = os.path.join(exe_dir, f"{_SERVICE_NAME}Tray.exe")
                    if os.path.isfile(tray_path):
                        subprocess.Popen([tray_path], close_fds=True)
                    else:
                        logger.warning("외부 트레이 실행 파일을 찾을 수 없습니다")
            except Exception as e2:
                logger.error(f"외부 트레이 실행 실패: {e2}")

    uvicorn.run(app, **uvicorn_kwargs)
|
|
|
|
|