import os
import sys
import uuid
import time
import json
import signal
import atexit
import argparse
import shutil
import subprocess
import zipfile
import tempfile
import urllib.request
import ctypes
import socket
import queue as _queue
import threading
import multiprocessing as mp
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager

from dotenv import load_dotenv
import psutil
from fastapi import FastAPI, HTTPException
from fastapi import status as http_status
from pydantic import BaseModel, Field

from loggerModule import Logger
from modules.image_worker import worker_main
from updater.__version__ import __version__ as app_version


def _get_base_dir():
    """Auto-detect the resource base directory for the current run mode.

    Candidate order:
      1) the IMGWK_BASE_DIR environment variable, if it names an existing
         directory (returned as-is, without a resource check)
      2) frozen (packaged) build: <exe_dir>/lib/modules, <exe_dir>/modules, <exe_dir>
      3) source checkout: <this_dir>/modules, <this_dir>/lib/modules, <this_dir>

    The first candidate containing at least one known resource directory
    (onnx_ocr_module/models, migan_onnx, rembg_models, fonts) is returned;
    if none does, the first candidate is returned as a fallback.
    """
    try:
        env_base = os.environ.get('IMGWK_BASE_DIR')
        if env_base and os.path.isdir(env_base):
            return env_base
    except Exception:
        pass

    is_frozen = getattr(sys, 'frozen', False)
    curr_dir = os.path.abspath(os.path.dirname(__file__))
    exe_dir = os.path.dirname(sys.executable) if is_frozen else curr_dir

    if is_frozen:
        candidates = [
            os.path.join(exe_dir, 'lib', 'modules'),
            os.path.join(exe_dir, 'modules'),
            exe_dir,
        ]
    else:
        candidates = [
            os.path.join(curr_dir, 'modules'),
            os.path.join(curr_dir, 'lib', 'modules'),
            curr_dir,
        ]

    def has_resources(base: str) -> bool:
        try:
            return (
                os.path.isdir(os.path.join(base, 'onnx_ocr_module', 'models'))
                or os.path.isdir(os.path.join(base, 'migan_onnx'))
                or os.path.isdir(os.path.join(base, 'rembg_models'))
                or os.path.isdir(os.path.join(base, 'fonts'))
            )
        except Exception:
            return False

    for cand in candidates:
        if os.path.isdir(cand) and has_resources(cand):
            return cand
    # Last resort: the first candidate (it may not exist).
    return candidates[0] if candidates else exe_dir


def _should_start_tray() -> bool:
    """Return True if the system-tray icon should be started.

    False when IMGWK_NO_TRAY is truthy ("1"/"true"/"yes"/"on"), when running
    with ``--service``, on non-Windows platforms, or on any unexpected error.
    """
    try:
        # Can be force-disabled through the environment.
        flag = (os.environ.get("IMGWK_NO_TRAY", "0") or "0").strip().lower()
        if flag in ("1", "true", "yes", "on"):
            return False
        # Never show a tray icon when running as a service.
        if any(arg == "--service" for arg in sys.argv):
            return False
        # Enabled by default on Windows only.
        if os.name != "nt":
            return False
        return True
    except Exception:
        return False


# ------------------------------------------------------------
# Project / path configuration
# ------------------------------------------------------------
if getattr(sys, "frozen", False):
    # Packaged executable (e.g. cx_Freeze): use the executable's directory.
    ROOT_DIR = os.path.dirname(sys.executable)
else:
    ROOT_DIR = os.path.abspath(os.path.dirname(__file__))

load_dotenv(os.path.join(ROOT_DIR, ".env"))

# Passed to the worker (ImageProcessor3) as its base_dir.
BASE_DIR = _get_base_dir()

# Standardized work/output locations under ProgramData.
_PROGRAMDATA = os.environ.get("PROGRAMDATA") or os.path.join(os.environ.get("ALLUSERSPROFILE", "C:\\ProgramData"))
APP_DATA_DIR = os.path.join(_PROGRAMDATA, "ImgWorker")
TEMP_DIR = os.path.join(APP_DATA_DIR, "work")  # work / temporary files
OUTPUT_DIR = os.path.join(APP_DATA_DIR, "outputs")  # final outputs (when used)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
LOGS_DIR = os.path.join(APP_DATA_DIR, "logs")
os.makedirs(LOGS_DIR, exist_ok=True)

# Server discovery / single-instance bookkeeping files.
SERVER_INFO_PATH = os.path.join(APP_DATA_DIR, "server.json")
PID_FILE_PATH = os.path.join(APP_DATA_DIR, "imgworker.pid")

# ------------------------------------------------------------
# Logger
# ------------------------------------------------------------
logger = Logger(log_file=os.path.join(LOGS_DIR, "api_server.log"), logger_name="img_worker_api")

# ------------------------------------------------------------
# Single-instance / port utilities
# ------------------------------------------------------------
_MUTEX_HANDLE = None
_SERVER_BIND = {"host": "127.0.0.1", "port": 8009}
_SERVICE_NAME = "ImgWorker"


def _ensure_single_instance() -> bool:
    """Guarantee a single instance using a Windows global named mutex.

    Returns False if another instance already created the mutex. Returns True
    otherwise, including on non-Windows platforms and on any failure, where
    startup is allowed to proceed.
    """
    global _MUTEX_HANDLE
    try:
        if os.name != "nt":
            return True  # skipped on non-Windows OSes
        # use_last_error=True makes ctypes capture GetLastError() immediately
        # after the call, so it cannot be clobbered before we read it.
        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
        kernel32.CreateMutexW.restype = ctypes.c_void_p  # HANDLE is pointer-sized
        kernel32.CreateMutexW.argtypes = (ctypes.c_void_p, ctypes.c_int, ctypes.c_wchar_p)
        # "Global\\" namespace: shared across sessions (service and user desktop).
        handle = kernel32.CreateMutexW(None, False, "Global\\ImgWorkerSingletonMutex")
        if not handle:
            return True  # could not create the mutex: proceed anyway
        _MUTEX_HANDLE = handle
        ERROR_ALREADY_EXISTS = 183
        return ctypes.get_last_error() != ERROR_ALREADY_EXISTS
    except Exception:
        return True


def _release_single_instance():
    """Close the single-instance mutex handle, if any (idempotent, never raises)."""
    global _MUTEX_HANDLE
    try:
        if os.name == "nt" and _MUTEX_HANDLE:
            ctypes.windll.kernel32.CloseHandle(ctypes.c_void_p(_MUTEX_HANDLE))
    except Exception:
        pass
    finally:
        # Forget the handle so a second call cannot close it twice.
        _MUTEX_HANDLE = None


def _is_admin() -> bool:
    """Return True when running elevated on Windows (always True elsewhere or on error)."""
    try:
        if os.name != "nt":
            return True
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        return True


def _rerun_as_admin(extra_args: Optional[str] = None):
    """Relaunch this program elevated through the UAC "runas" verb (Windows only).

    In a frozen build sys.executable is the application itself, so only
    *extra_args* are passed. When running from source, the script path is
    passed first so the interpreter knows what to run.
    """
    if os.name != "nt":
        return
    try:
        if getattr(sys, "frozen", False):
            params = extra_args or ""
        else:
            params = "\"" + os.path.abspath(__file__) + "\""
            if extra_args:
                params += " " + extra_args
        ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 1)
    except Exception:
        pass


def _is_port_in_use(host: str, port: int) -> bool:
    """Return True if a TCP connect to (host, port) succeeds within 0.3 s."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.3)
            return s.connect_ex((host, port)) == 0
    except Exception:
        return False


def _find_free_port(start: int, end: int, host: str = "127.0.0.1") -> int:
    """Return the first port in [start, max(start, end)] that can be bound on *host*.

    Returns 0 (let the OS choose) when every port in the range is taken.
    """
    start = int(start)
    end = int(end)
    for p in range(start, max(start, end) + 1):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                # On Windows SO_REUSEADDR allows binding over a port that is
                # actively in use, which would make every busy port look free.
                # Only use it where it merely skips TIME_WAIT (POSIX).
                if os.name != "nt":
                    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((host, p))
                # Bindable means free; the probe socket is released on exit.
                return p
        except Exception:
            continue
    # Everything failed: fall back to 0 (kernel-assigned port).
    return 0


def _write_server_info(host: str, port: int):
    """Write the discovery file (server.json) atomically; best effort.

    The JSON is written to a temporary file and then os.replace()d into
    place, so readers never see a half-written document.
    """
    tmp_path = SERVER_INFO_PATH + ".tmp"
    try:
        info = {
            "host": host,
            "port": port,
            "base": f"http://{host}:{port}",
            "pid": os.getpid(),
            "started_at": time.time(),
        }
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(info, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, SERVER_INFO_PATH)
    except Exception:
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except Exception:
            pass


def _write_pid_file():
    """Write the current PID to PID_FILE_PATH; best effort."""
    try:
        with open(PID_FILE_PATH, "w", encoding="utf-8") as f:
            f.write(str(os.getpid()))
    except Exception:
        pass


def _cleanup_runtime_files():
    """Remove server.json and the PID file, if present; best effort."""
    for path in (SERVER_INFO_PATH, PID_FILE_PATH):
        try:
            if os.path.isfile(path):
                os.remove(path)
        except Exception:
            pass


def _kill_proc_tree(pid: int):
    """Forcefully kill (SIGKILL/TerminateProcess) a process and all its descendants."""
    try:
        if not pid:
            return
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
        # Kill the children first...
        for child in children:
            try:
                child.kill()
            except psutil.NoSuchProcess:
                pass
        # ...then the parent.
        try:
            parent.kill()
        except psutil.NoSuchProcess:
            pass
        # Give the OS a moment to reap everything.
        psutil.wait_procs(children + [parent], timeout=3)
    except psutil.NoSuchProcess:
        pass
    except Exception:
        pass


def _safe_rmtree_contents(target_dir: str, older_than_sec: int = 0):
    """Delete entries below *target_dir*; best effort, never raises.

    Only directories strictly inside APP_DATA_DIR are touched. The app-data
    root itself is refused because it also holds logs/, server.json and the
    PID file. With ``older_than_sec > 0`` only files whose mtime is at least
    that many seconds old are deleted. Sub-directories are removed once they
    are empty. *target_dir* is created if it does not exist.
    """
    try:
        if not target_dir:
            return
        # Safety net: only allow paths under ProgramData/ImgWorker.
        base = os.path.abspath(APP_DATA_DIR)
        td = os.path.abspath(target_dir)
        try:
            if os.path.commonpath([base, td]) != base:
                return
            if os.path.normcase(td) == os.path.normcase(base):
                return
        except Exception:
            return
        os.makedirs(td, exist_ok=True)
        now_ts = time.time()
        threshold = max(0, int(older_than_sec or 0))

        def _remove_file_if_old(fp: str):
            try:
                if threshold <= 0:
                    os.remove(fp)
                    return
                mt = os.path.getmtime(fp)
                if (now_ts - mt) >= threshold:
                    os.remove(fp)
            except Exception:
                pass

        def _cleanup_dir(dp: str):
            try:
                # Bottom-up walk: files are removed conditionally, emptied
                # sub-directories unconditionally (an empty dir is always safe).
                for root, dirs, files in os.walk(dp, topdown=False):
                    for fn in files:
                        _remove_file_if_old(os.path.join(root, fn))
                    for dn in dirs:
                        full = os.path.join(root, dn)
                        try:
                            if not os.listdir(full):
                                os.rmdir(full)
                        except Exception:
                            pass
                # Finally drop the top-level directory too if it became empty.
                try:
                    if os.path.isdir(dp) and not os.listdir(dp):
                        os.rmdir(dp)
                except Exception:
                    pass
            except Exception:
                pass

        for name in os.listdir(td):
            p = os.path.join(td, name)
            try:
                if os.path.isdir(p):
                    _cleanup_dir(p)
                else:
                    _remove_file_if_old(p)
            except Exception:
                pass
    except Exception:
        pass


def _purge_temp_dirs():
    """Clean the temporary folders under ProgramData/ImgWorker (incoming, work, output(s)).

    Files younger than IMGWK_CLEAN_OLDER_THAN_SEC seconds (default 300) are kept.
    """
    try:
        incoming_dir = os.path.join(APP_DATA_DIR, "incoming")
        work_dir = os.path.join(APP_DATA_DIR, "work")
        output_dir = os.path.join(APP_DATA_DIR, "output")
        outputs_dir = os.path.join(APP_DATA_DIR, "outputs")
        try:
            ttl = int(os.environ.get("IMGWK_CLEAN_OLDER_THAN_SEC", "300"))
        except Exception:
            ttl = 300
        for d in (incoming_dir, work_dir, output_dir, outputs_dir):
            _safe_rmtree_contents(d, older_than_sec=max(0, ttl))
    except Exception:
        pass


# ---------------------------- Service / sc.exe utilities ----------------------------
def _run_cmd(cmd: list, check: bool = False) -> subprocess.CompletedProcess:
    """Run *cmd* (argument list, no shell) and capture its text output.

    Undecodable bytes in the console output are replaced rather than raising.
    """
    return subprocess.run(cmd, capture_output=True, text=True, errors="replace", check=check, shell=False)


def _service_exists(name: str) -> bool:
    """Return True if ``sc query <name>`` reports the service."""
    try:
        r = _run_cmd(["sc", "query", name])
        return r.returncode == 0 and ("STATE" in (r.stdout or "") or (r.stderr or "") == "")
    except Exception:
        return False


def _build_service_bin_path(host: str, port: int) -> str:
    """Build the service binPath string for both frozen and script execution."""
    # The service receives the same --host/--port arguments so its bind is fixed.
    if getattr(sys, "frozen", False):
        exe = sys.executable
        return f'"{exe}" --service --host "{host}" --port {int(port)}'
    else:
        py = sys.executable
        entry = os.path.abspath(__file__)
        return f'"{py}" "{entry}" --service --host "{host}" --port {int(port)}'


def _install_service_via_sc(name: str):
    """Create, configure and start the Windows service *name* through sc.exe.

    IMGWK_HOST / IMGWK_PORT at install time are persisted into the binPath.

    Raises:
        RuntimeError: if ``sc create`` fails, so callers do not report success.
    """
    env_host = os.environ.get("IMGWK_HOST", "127.0.0.1")
    env_port = int(os.environ.get("IMGWK_PORT", "8009"))
    bin_path = _build_service_bin_path(env_host, env_port)
    r = _run_cmd(["sc", "create", name, "binPath=", bin_path, "start=", "delayed-auto"])
    if r.returncode != 0:
        detail = ((r.stdout or "") + (r.stderr or "")).strip()
        raise RuntimeError(f"sc create failed (rc={r.returncode}): {detail}")
    # Failure actions: restart after 60 s.
    _run_cmd(["sc", "failure", name, "reset=", "0", "actions=", "restart/60000"])
    _run_cmd(["sc", "description", name, "Image Worker API"])
    _run_cmd(["sc", "start", name])


def _uninstall_service(name: str):
    """Stop and delete service *name* if it exists; best effort."""
    if _service_exists(name):
        try:
            _run_cmd(["sc", "stop", name])
            _run_cmd(["sc", "delete", name])
        except Exception:
            pass


def _install_service():
    """Install (or reinstall) the ImgWorker service; returns a process exit code.

    Relaunches itself elevated when not running as administrator.
    """
    if os.name != "nt":
        print("윈도우가 아닌 환경에서는 서비스 설치를 지원하지 않습니다.")
        return 1
    if not _is_admin():
        # Relaunch with administrator rights.
        _rerun_as_admin("--install-service")
        return 0
    if _service_exists(_SERVICE_NAME):
        # Remove the existing service first so new settings take effect.
        _uninstall_service(_SERVICE_NAME)
        # "sc delete" only marks the service for deletion; wait briefly so the
        # following "sc create" does not fail with "marked for deletion".
        for _ in range(20):
            if not _service_exists(_SERVICE_NAME):
                break
            time.sleep(0.5)
    try:
        _install_service_via_sc(_SERVICE_NAME)
        print("ImgWorker 서비스 설치/시작 완료")
        return 0
    except Exception as e:
        print(f"서비스 설치 실패: {e}")
        return 1


def _remove_service():
    """Remove the ImgWorker service; returns a process exit code."""
    if os.name != "nt":
        print("윈도우가 아닌 환경에서는 서비스 제거를 지원하지 않습니다.")
        return 1
    if not _is_admin():
        _rerun_as_admin("--uninstall-service")
        return 0
    try:
        _uninstall_service(_SERVICE_NAME)
        print("ImgWorker 서비스 제거 완료")
        return 0
    except Exception as e:
        print(f"서비스 제거 실패: {e}")
        return 1
------------------------------------------------------------ # 잡/상태 모델 # ------------------------------------------------------------ class ProcessImageRequest(BaseModel): file_path: str = Field(..., description="로컬 이미지 경로") index: int = Field(0, description="이미지 인덱스(표시/로깅용)") file_prefix: Optional[str] = Field("", description="detail|option|thumb 등 접두 구분") toggle_overrides: Optional[Dict[str, Any]] = Field(None, description="처리 시 일시적 토글 오버라이드") group_id: Optional[str] = Field(None, description="클라이언트 측 그룹 식별자") seq: Optional[int] = Field(None, description="클라이언트 측 순번") ocr: Optional[bool] = Field(None, description="OCR 전용 모드 플래그(True면 OCR+인페인팅, False면 전체 번역)") class RemoveBackgroundRequest(BaseModel): file_path: str = Field(..., description="로컬 이미지 경로") file_prefix: Optional[str] = Field("", description="detail|option|thumb 등 접두 구분") toggle_overrides: Optional[Dict[str, Any]] = Field(None, description="처리 시 일시적 토글 오버라이드") class JobInfo(BaseModel): job_id: str status: str # queued|done|error created_at: float updated_at: float kind: str # process_image|remove_background|ping request: Dict[str, Any] result: Optional[Dict[str, Any]] = None error: Optional[str] = None gen: Optional[int] = None # 워커 세대(롤링 구분) timeout_at: Optional[float] = None # 잡 타임아웃 시각 # ------------------------------------------------------------ # 워커 관리자 # ------------------------------------------------------------ class WorkerManager: def __init__(self, base_dir: str, temp_dir: str, logger: Logger): self.base_dir = base_dir self.temp_dir = temp_dir self.logger = logger self._ctx = mp.get_context("spawn") self._task_q: mp.Queue = self._ctx.Queue() self._result_q: mp.Queue = self._ctx.Queue() self._log_q: mp.Queue = self._ctx.Queue() self._proc: Optional[mp.Process] = None self._running = threading.Event() self._ready = threading.Event() # 잡 관리 self._jobs_lock = threading.Lock() self._jobs: Dict[str, JobInfo] = {} # 리스너 self._result_thread: Optional[threading.Thread] = None self._log_thread: 
Optional[threading.Thread] = None self._job_watch_thread: Optional[threading.Thread] = None # 롤링/버퍼링 self._roll_lock = threading.Lock() self._rolling = False self._buffered_tasks = [] # 롤링 중 신규 작업 임시 보관 # 백프레셔(대기열 제한) self._max_pending_jobs = int(os.environ.get("IMGWK_MAX_PENDING", "200")) # 롤링 임계치 self.ROLL_MAX_RSS_MB = int(os.environ.get("IMGWK_ROLL_MAX_RSS_MB", "1800")) self.ROLL_MAX_JOBS = int(os.environ.get("IMGWK_ROLL_MAX_JOBS", "500")) self.ROLL_MAX_UPTIME_SEC = int(os.environ.get("IMGWK_ROLL_MAX_UPTIME_SEC", str(2*60*60))) if os.environ.get("IMGWK_TEST_ROLL_EVERY_2", "0") == "1": self.ROLL_MAX_JOBS = 2 self.JOB_TIMEOUT_SEC = int(os.environ.get("IMGWK_JOB_TIMEOUT_SEC", "600")) # 워커 세대(롤링 구분) self._generation = 0 self._jobs_processed_in_current_worker = 0 self._worker_started_at = 0.0 self._mem_thread: Optional[threading.Thread] = None # 실행 상태/성능 지표 self._running_jobs = 0 self._recent_total_ms = [] # 최근 작업 total_ms 집계 # 최근 추론 장치(DML/GPU/CPU) 요약 self._last_device = None # 기본 토글/설정(필요 시 요청에서 오버라이드) self._toggle_states: Dict[str, Any] = { "TEMP_IMAGE_DIR": self.temp_dir, "output_image_format": "webp", # 배경제거: 자체(rembg) 기본값 사용 "use_local_rembg": True, "local_rembg_model_path": os.path.join(self.base_dir, "rembg_models"), "local_model_name": "birefnet-general-lite", # 인페인트: MIGAN 기본값(GPU → DirectML/CPU 폴백) "optionIMGTrans_type": "GPU", "detail_IMGTrans_type": "GPU", "thumb_trans_type": "GPU", "migan_use_accel": True, "migan_onnx_path": os.path.join(self.base_dir, "migan_onnx", "migan_pipeline_v2.onnx"), # 프로바이더 강제 설정( auto|dml|cpu ) "ocr_provider_override": os.environ.get("IMGWK_OCR_PROVIDER", "auto"), "migan_provider_override": os.environ.get("IMGWK_MIGAN_PROVIDER", "auto"), "rembg_provider_override": os.environ.get("IMGWK_REMBG_PROVIDER", "auto"), } # 전역 CPU 강제 플래그(1|true|yes|on) → 모든 가속/프로바이더를 CPU로 고정 force_cpu_flag = os.environ.get("IMGWK_FORCE_CPU", "0").strip().lower() if force_cpu_flag in ("1", "true", "yes", "on"): 
self._toggle_states["optionIMGTrans_type"] = "CPU" self._toggle_states["detail_IMGTrans_type"] = "CPU" self._toggle_states["thumb_trans_type"] = "CPU" self._toggle_states["migan_use_accel"] = False self._toggle_states["ocr_provider_override"] = "cpu" self._toggle_states["migan_provider_override"] = "cpu" self._toggle_states["rembg_provider_override"] = "cpu" # 호환용(use_cuda) 키도 함께 내려 CPU 강제 사실을 명시 self._toggle_states["use_cuda"] = False # 텍스트 치환 규칙(없으면 빈 dict) self._unwanted_words: Dict[str, str] = {} @property def is_ready(self) -> bool: return self._ready.is_set() and self._proc is not None and self._proc.is_alive() @property def pid(self) -> Optional[int]: return self._proc.pid if self._proc is not None else None def start(self): if self._proc is not None and self._proc.is_alive(): return self._running.set() log_path = os.path.join(LOGS_DIR, "worker.log") self._generation += 1 self._proc = self._ctx.Process( target=worker_main, args=( self._task_q, self._result_q, self._log_q, log_path, self.base_dir, self._toggle_states, self._unwanted_words, False, # authenticated_by_admin ), name="ImageWorkerProcess", daemon=True, ) self._proc.start() self.logger.info(f"워커 프로세스 기동: PID={self._proc.pid}") # 리스너 스레드 시작 if self._result_thread is None or not self._result_thread.is_alive(): self._result_thread = threading.Thread(target=self._result_listener, name="ResultListener", daemon=True) self._result_thread.start() if self._log_thread is None or not self._log_thread.is_alive(): self._log_thread = threading.Thread(target=self._log_listener, name="LogListener", daemon=True) self._log_thread.start() # 워커 READY 대기(기본 180초, 환경변수로 조정) try: wait_sec = int(os.environ.get("IMGWK_WORKER_READY_TIMEOUT_SEC", "120")) except Exception: wait_sec = 180 if not self._ready.wait(timeout=max(1, wait_sec)): self.logger.warning("워커 READY 타임아웃") # 메모리/업타임 모니터 시작 self._worker_started_at = time.time() if self._mem_thread is None or not self._mem_thread.is_alive(): self._mem_thread = 
threading.Thread(target=self._memory_monitor, name="MemMonitor", daemon=True) self._mem_thread.start() if self._job_watch_thread is None or not self._job_watch_thread.is_alive(): self._job_watch_thread = threading.Thread(target=self._job_watchdog, name="JobWatchdog", daemon=True) self._job_watch_thread_start_at = time.time() self._job_watch_thread.start() def stop(self): try: if self._proc is None: return self._running.clear() # PID 백업 (join 후 객체 상태와 무관하게 정리하기 위함) pid = self._proc.pid # 워커에게 종료 신호(None) try: self._task_q.put(None, timeout=1) except Exception: pass # 프로세스 종료 대기 self._proc.join(timeout=10) # 프로세스 트리 전체 확실한 정리 if pid: try: _kill_proc_tree(pid) except Exception: pass self.logger.info("워커 프로세스 종료") finally: self._proc = None self._ready.clear() def _result_listener(self): while self._running.is_set(): try: msg = self._result_q.get(timeout=1) except _queue.Empty: continue except Exception: continue if not isinstance(msg, dict): continue msg_id = msg.get("id") # READY 시그널 if msg_id == "__READY__": self._ready.set() self.logger.info("워커 READY 수신") continue # 일반 잡 결과 with self._jobs_lock: job = self._jobs.get(msg_id) if job is None: continue # 이미 최종 상태면(에러/취소/완료) 늦은 결과는 무시 if job.status in ("error", "done", "cancelled"): continue job.updated_at = time.time() if "error" in msg and msg["error"]: job.status = "error" job.error = msg.get("error") else: job.status = "done" job.result = msg.get("data") if isinstance(msg.get("data"), dict) else {"data": msg.get("data")} self._jobs[msg_id] = job # 잡 처리 카운트 및 롤링 조건 검사(ping 제외) try: if job and job.kind not in ("ping",): self._jobs_processed_in_current_worker += 1 if self._jobs_processed_in_current_worker >= self.ROLL_MAX_JOBS: self._schedule_roll("job-count-threshold") # running count 감소 및 성능 집계 if job and job.kind not in ("ping",): try: if self._running_jobs > 0: self._running_jobs -= 1 except Exception: self._running_jobs = 0 try: timings = (job.result or {}).get("timings") if isinstance(job.result, dict) else None 
total_ms = float(timings.get("total_ms", 0.0)) if isinstance(timings, dict) else 0.0 if total_ms > 0: self._recent_total_ms.append(total_ms) if len(self._recent_total_ms) > 100: self._recent_total_ms = self._recent_total_ms[-100:] # 최근 장치 정보 업데이트 try: rr = job.result if isinstance(job.result, dict) else {} dev = (rr.get("inpaint_device") or rr.get("device") or rr.get("provider") or "").lower() if dev: if ("dml" in dev) or ("directml" in dev) or ("gpu" in dev): self._last_device = "dml" elif "cpu" in dev: self._last_device = "cpu" except Exception: pass except Exception: pass except Exception: pass def _log_listener(self): # 워커에서 넘어온 로그를 api 로거로 전달 level_map = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR", 50: "CRITICAL", } while self._running.is_set(): try: rec = self._log_q.get(timeout=1) except _queue.Empty: continue except Exception: continue if not isinstance(rec, dict): continue level = rec.get("level", 20) message = rec.get("message", "") # loggerModule의 표준 인터페이스 사용 if level >= 50: self.logger.critical(message) elif level >= 40: self.logger.error(message) elif level >= 30: self.logger.warning(message) elif level >= 10: self.logger.debug(message) else: self.logger.info(message) # -------------------------- 잡 제출 API -------------------------- def _register_job(self, kind: str, req: Dict[str, Any]) -> str: job_id = str(uuid.uuid4()) now = time.time() now = time.time() info = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind=kind, request=req, gen=self._generation, timeout_at=(now + self.JOB_TIMEOUT_SEC) if self.JOB_TIMEOUT_SEC > 0 else None, ) with self._jobs_lock: self._jobs[job_id] = info return job_id def get_job(self, job_id: str) -> Optional[JobInfo]: with self._jobs_lock: return self._jobs.get(job_id) def cancel_job(self, job_id: str) -> bool: # 단순 구현: 큐에서 제거는 구현 난이도가 있으므로, 아직 시작 안한 잡만 취소 플래그 처리 with self._jobs_lock: job = self._jobs.get(job_id) if not job: return False if job.status != "queued": return False job.status 
= "cancelled" job.error = "cancelled" job.updated_at = time.time() self._jobs[job_id] = job return True def pending_jobs_count(self) -> int: with self._jobs_lock: return sum(1 for j in self._jobs.values() if j.status == "queued") def _enqueue_or_buffer(self, task: Dict[str, Any]): # 롤링 중이면 버퍼, 아니면 즉시 큐에 투입 job_id = task.get("id") if self._rolling: self._buffered_tasks.append(task) else: try: self._task_q.put(task) # 상태를 running 으로 변경 if job_id: with self._jobs_lock: job = self._jobs.get(job_id) if job and job.status == "queued": job.status = "running" job.updated_at = time.time() self._jobs[job_id] = job try: self._running_jobs += 1 except Exception: self._running_jobs = 1 except Exception: if job_id: with self._jobs_lock: job = self._jobs.get(job_id) if job: job.status = "error" job.error = "enqueue-failed" job.updated_at = time.time() self._jobs[job_id] = job def _is_http(self, path: str) -> bool: try: return isinstance(path, str) and (path.startswith("http://") or path.startswith("https://")) except Exception: return False def _validate_file_readable(self, path: str): if self._is_http(path): return # 원격 URL은 로컬 파일 검증 생략 if not os.path.isfile(path): raise FileNotFoundError(f"파일을 찾을 수 없습니다: {path}") if not os.access(path, os.R_OK): raise PermissionError(f"파일 읽기 권한이 없습니다: {path}") def submit_process_image(self, *, file_path: str, index: int, file_prefix: str = "", toggle_overrides: Optional[Dict[str, Any]] = None, group_id: Optional[str] = None, seq: Optional[int] = None) -> str: self._validate_file_readable(file_path) if self.pending_jobs_count() >= self._max_pending_jobs: raise RuntimeError("queue-full") req = { "file_path": file_path, "index": index, "file_prefix": file_prefix, "group_id": group_id, "seq": seq, "toggle_overrides": toggle_overrides or {}, } job_id = self._register_job("process_image", req) kwargs = { "original_image_url": file_path, "index": index, "delay": 0.0, "file_prefix": file_prefix or "", "_toggle_states": 
self._merge_toggle_overrides(toggle_overrides), } task = {"id": job_id, "cmd": "process_single_image", "kwargs": kwargs} self._enqueue_or_buffer(task) return job_id def submit_remove_background(self, *, file_path: str, file_prefix: str = "", toggle_overrides: Optional[Dict[str, Any]] = None) -> str: self._validate_file_readable(file_path) if self.pending_jobs_count() >= self._max_pending_jobs: raise RuntimeError("queue-full") req = { "file_path": file_path, "file_prefix": file_prefix, "toggle_overrides": toggle_overrides or {}, } job_id = self._register_job("remove_background", req) kwargs = { "original_image_url": file_path, "file_prefix": file_prefix or "", "_toggle_states": self._merge_toggle_overrides(toggle_overrides), } task = {"id": job_id, "cmd": "remove_background", "kwargs": kwargs} self._enqueue_or_buffer(task) return job_id def ping(self, timeout: float = 5.0) -> bool: job_id = str(uuid.uuid4()) with self._jobs_lock: now = time.time() self._jobs[job_id] = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind="ping", request={}, ) self._task_q.put({"id": job_id, "cmd": "__PING__", "kwargs": {}}) # 간단한 동기 확인(폴링) end = time.time() + timeout while time.time() < end: info = self.get_job(job_id) if info and info.status in ("done", "error"): return info.status == "done" and ((info.result or {}).get("data") == "__PONG__" or (info.result or {}).get("data") == "__PONG__") time.sleep(0.05) return False def _merge_toggle_overrides(self, overrides: Optional[Dict[str, Any]]) -> Dict[str, Any]: merged = dict(self._toggle_states) if overrides: merged.update(overrides) # 경로 보정 merged["TEMP_IMAGE_DIR"] = self.temp_dir os.makedirs(merged["TEMP_IMAGE_DIR"], exist_ok=True) return merged # -------------------------- 제어 작업(동기) -------------------------- def reinit_ocr(self, provider_override: Optional[str] = None, timeout: float = 10.0) -> bool: if provider_override: self._toggle_states["ocr_provider_override"] = provider_override job_id = 
str(uuid.uuid4()) with self._jobs_lock: now = time.time() self._jobs[job_id] = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind="reinit_ocr", request={"provider_override": provider_override}, gen=self._generation, timeout_at=now + 15, ) task = {"id": job_id, "cmd": "reinit_ocr", "kwargs": {"_toggle_states": self._toggle_states}} self._enqueue_or_buffer(task) end = time.time() + timeout while time.time() < end: info = self.get_job(job_id) if info and info.status in ("done", "error"): return bool((info.result or {}).get("ok", False)) and info.status == "done" time.sleep(0.05) return False # REMBG 재초기화/재설정 (호환 목적: 동일 시맨틱 유지) def reinit_rembg(self, provider: Optional[str] = None, timeout: float = 10.0) -> bool: if provider: self._toggle_states["rembg_provider_override"] = provider job_id = str(uuid.uuid4()) with self._jobs_lock: now = time.time() self._jobs[job_id] = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind="reinit_rembg", request={"provider": provider}, gen=self._generation, timeout_at=now + 15, ) task = {"id": job_id, "cmd": "reinit_rembg", "kwargs": {"_toggle_states": self._toggle_states, "provider": provider}} self._enqueue_or_buffer(task) end = time.time() + timeout while time.time() < end: info = self.get_job(job_id) if info and info.status in ("done", "error"): return bool((info.result or {}).get("ok", False)) and info.status == "done" time.sleep(0.05) return False def reset_rembg(self, provider: Optional[str] = None, timeout: float = 10.0) -> bool: # 현재 구현은 reinit과 동일 동작(세션 재생성). 
향후 차별화 가능 return self.reinit_rembg(provider=provider, timeout=timeout) def reset_migan(self, use_cuda: Optional[bool] = None, provider: Optional[str] = None, timeout: float = 10.0) -> bool: if use_cuda is not None: self._toggle_states["migan_use_accel"] = bool(use_cuda) if provider: self._toggle_states["migan_provider_override"] = provider job_id = str(uuid.uuid4()) with self._jobs_lock: now = time.time() self._jobs[job_id] = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind="reset_migan", request={"use_cuda": use_cuda, "provider": provider}, gen=self._generation, timeout_at=now + 15, ) task = {"id": job_id, "cmd": "reset_migan", "kwargs": {"_toggle_states": self._toggle_states, "use_cuda": use_cuda, "provider": provider}} self._enqueue_or_buffer(task) end = time.time() + timeout while time.time() < end: info = self.get_job(job_id) if info and info.status in ("done", "error"): return bool((info.result or {}).get("ok", False)) and info.status == "done" time.sleep(0.05) return False def update_toggle_states(self, updates: Dict[str, Any], timeout: float = 10.0) -> bool: # 로컬 상태 업데이트 self._toggle_states.update(updates) # 경로 보정 (TEMP_IMAGE_DIR 등) self._toggle_states = self._merge_toggle_overrides(None) job_id = str(uuid.uuid4()) with self._jobs_lock: now = time.time() self._jobs[job_id] = JobInfo( job_id=job_id, status="queued", created_at=now, updated_at=now, kind="update_toggle_states", request=updates, gen=self._generation, timeout_at=now + 15, ) # 워커에게 전체 토글 상태 전송 task = { "id": job_id, "cmd": "update_toggle_states", "kwargs": {"_toggle_states": self._toggle_states} } self._enqueue_or_buffer(task) end = time.time() + timeout while time.time() < end: info = self.get_job(job_id) if info and info.status in ("done", "error"): return bool((info.result or {}).get("ok", False)) and info.status == "done" time.sleep(0.05) return False # -------------------------- 롤링 로직 -------------------------- def _memory_monitor(self): # 주기적으로 RSS/업타임 확인 while 
self._running.is_set(): try: if self._proc is None: time.sleep(1.0) continue pid = self._proc.pid if not pid: time.sleep(1.0) continue rss_mb = 0 try: p = psutil.Process(pid) rss_mb = int(p.memory_info().rss / (1024 * 1024)) except Exception: pass if rss_mb >= self.ROLL_MAX_RSS_MB: self._schedule_roll("memory-threshold") uptime = time.time() - self._worker_started_at if uptime >= self.ROLL_MAX_UPTIME_SEC: self._schedule_roll("uptime-threshold") finally: time.sleep(3.0) def _schedule_roll(self, reason: str): with self._roll_lock: if self._rolling: return self._rolling = True self.logger.warning(f"워커 롤링 스케줄: reason={reason}") t = threading.Thread(target=self._do_roll, args=(reason,), name="WorkerRoller", daemon=True) t.start() def _do_roll(self, reason: str): # 1) 기존 워커 종료 요청(None 전송) try: self.logger.warning(f"워커 롤링 시작: {reason}") try: self._task_q.put(None, timeout=1) except Exception: pass # 2) 종료 대기 및 강제 정리 if self._proc is not None: pid = self._proc.pid self._proc.join(timeout=30) if pid: try: # 롤링 시에도 확실하게 트리 정리 _kill_proc_tree(pid) except Exception: pass except Exception as e: self.logger.error(f"롤링 중 종료 단계 오류: {e}") # 3) 상태 초기화 self._ready.clear() self._jobs_processed_in_current_worker = 0 # 4) 새 워커 기동 try: self.start() except Exception as e: self.logger.error(f"롤링 중 새 워커 기동 실패: {e}") # 실패 시 롤링 종료 플래그는 내려서 요청이 막히지 않게 한다 self._rolling = False return # 5) 버퍼링된 작업 플러시 try: if self._buffered_tasks: self.logger.info(f"버퍼 작업 재개: {len(self._buffered_tasks)}건") for task in self._buffered_tasks: self._task_q.put(task) finally: self._buffered_tasks.clear() self._rolling = False # -------------------------- 잡 타임아웃 -------------------------- def _job_watchdog(self): while self._running.is_set(): now = time.time() to_mark = [] with self._jobs_lock: for job in self._jobs.values(): if job.status in ("done", "error", "cancelled"): continue if job.timeout_at and now >= job.timeout_at: to_mark.append(job.job_id) for job_id in to_mark: with self._jobs_lock: job = 
self._jobs.get(job_id) if not job or job.status in ("done", "error", "cancelled"): continue job.status = "error" job.error = "timeout" job.updated_at = time.time() self._jobs[job_id] = job time.sleep(0.5) # ------------------------------------------------------------ # FastAPI 앱 # ------------------------------------------------------------ _worker: Optional[WorkerManager] = None @asynccontextmanager async def lifespan(app: FastAPI): # startup global _worker logger.info("API 서버 시작") # 임시 폴더 정리 _purge_temp_dirs() _worker = WorkerManager(base_dir=BASE_DIR, temp_dir=TEMP_DIR, logger=logger) _worker.start() # 헬스 핑 ok = _worker.ping(timeout=10.0) if not ok: logger.warning("워커 핑 실패") # 서버 디스커버리 파일 기록(uvicorn run에서 설정된 바인드 정보 사용) try: _write_server_info(_SERVER_BIND.get("host", "127.0.0.1"), int(_SERVER_BIND.get("port", 8009))) except Exception: pass try: yield finally: # shutdown logger.info("API 서버 종료") # 임시 폴더 정리 _purge_temp_dirs() try: if _worker: _worker.stop() except Exception as e: logger.error(f"워커 종료 중 오류: {e}") # 런타임 파일 정리 _cleanup_runtime_files() app = FastAPI(title="Image Worker API", version=app_version, lifespan=lifespan) @app.get("/health") def health(): ready = _worker.is_ready if _worker else False return {"status": "ok" if ready else "starting", "ready": ready, "pid": _worker.pid if _worker else None} @app.get("/info") def info(): return { "app": "Image Worker API", "version": app.version, "cwd": os.getcwd(), "base_dir": BASE_DIR, "temp_dir": TEMP_DIR, "outputs_dir": OUTPUT_DIR, "worker_pid": _worker.pid if _worker else None, "ready": _worker.is_ready if _worker else False, } @app.get("/v1/worker/status") def worker_status(): ready = _worker.is_ready if _worker else False avg_sec = None active = False provider = None running_jobs = 0 pending_jobs = 0 try: if _worker is not None: running_jobs = int(getattr(_worker, "_running_jobs", 0) or 0) try: pending_jobs = int(_worker.pending_jobs_count()) except Exception: pending_jobs = 0 # 대기중이거나 실행중이면 ACTIVE 
active = bool((running_jobs + pending_jobs) > 0) arr = getattr(_worker, "_recent_total_ms", []) if arr: avg_sec = (sum(arr) / len(arr)) / 1000.0 # 최근 장치 표시(dml|cpu) provider = getattr(_worker, "_last_device", None) except Exception: pass return { "ready": ready, "pid": _worker.pid if _worker else None, "active": active, "avg_sec_per_image": avg_sec, "provider": provider, "running_jobs": running_jobs, "pending_jobs": pending_jobs, } @app.post("/v1/worker/start") def worker_start(): global _worker if _worker is None: _worker = WorkerManager(base_dir=BASE_DIR, temp_dir=TEMP_DIR, logger=logger) if not _worker.is_ready: _worker.start() ok = _worker.ping(timeout=10.0) if not ok: logger.warning("워커 핑 실패") return {"ok": True, "ready": _worker.is_ready, "pid": _worker.pid} @app.post("/v1/worker/stop") def worker_stop(): global _worker if _worker is None: return {"ok": True, "ready": False, "pid": None} _worker.stop() return {"ok": True, "ready": False, "pid": None} @app.post("/v1/server/shutdown") def server_shutdown(): import threading def _exit_later(): try: time.sleep(0.3) os._exit(0) except Exception: os._exit(0) threading.Thread(target=_exit_later, daemon=True).start() return {"ok": True} @app.post("/v1/process-image", status_code=http_status.HTTP_202_ACCEPTED) def process_image(req: ProcessImageRequest): # 워커가 초기화만 되어 있다면 READY 전이라도 잡을 큐/버퍼에 적재하여 202 반환 if _worker is None: raise HTTPException(status_code=503, detail="Worker not initialized") # 원격 URL 허용: 로컬 파일이 아니면 존재/권한 검사는 생략하고 워커에서 처리 if not (req.file_path.startswith("http://") or req.file_path.startswith("https://")): if not os.path.isfile(req.file_path): raise HTTPException(status_code=400, detail="file_path not found") try: # ocr 플래그가 별도로 전달되면 토글에 병합하여 전달 _toggles = dict(req.toggle_overrides or {}) if req.ocr is not None: _toggles["ocr"] = bool(req.ocr) job_id = _worker.submit_process_image( file_path=req.file_path, index=req.index or (req.seq or 0), file_prefix=req.file_prefix or "", toggle_overrides=_toggles, 
group_id=req.group_id, seq=req.seq, ) return {"accepted": True, "job_id": job_id} except RuntimeError as e: if str(e) == "queue-full": # 429 Too Many Requests 와 유사 동작 raise HTTPException(status_code=429, detail="queue full, retry later") raise except FileNotFoundError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.exception("process-image 요청 처리 중 오류") raise HTTPException(status_code=500, detail="internal error") @app.post("/v1/remove-background", status_code=http_status.HTTP_202_ACCEPTED) def remove_background(req: RemoveBackgroundRequest): # 워커가 초기화만 되어 있다면 READY 전이라도 큐에 적재 if _worker is None: raise HTTPException(status_code=503, detail="Worker not initialized") if not (req.file_path.startswith("http://") or req.file_path.startswith("https://")): if not os.path.isfile(req.file_path): raise HTTPException(status_code=400, detail="file_path not found") try: job_id = _worker.submit_remove_background( file_path=req.file_path, file_prefix=req.file_prefix or "", toggle_overrides=req.toggle_overrides, ) return {"accepted": True, "job_id": job_id} except RuntimeError as e: if str(e) == "queue-full": raise HTTPException(status_code=429, detail="queue full, retry later") raise except FileNotFoundError as e: raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.exception("remove-background 요청 처리 중 오류") raise HTTPException(status_code=500, detail="internal error") @app.get("/v1/jobs/{job_id}") def get_job(job_id: str): if _worker is None: raise HTTPException(status_code=503, detail="Worker not ready") info = _worker.get_job(job_id) if info is None: raise HTTPException(status_code=404, detail="job not found") return json.loads(info.json()) class UpdateToggleRequest(BaseModel): updates: Dict[str, Any] = Field(..., description="업데이트할 토글 상태 딕셔너리") @app.post("/v1/toggle/update") def update_toggle(req: UpdateToggleRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, 
detail="Worker not ready") ok = _worker.update_toggle_states(req.updates) if not ok: raise HTTPException(status_code=500, detail="update toggle failed") return {"ok": True} class ReinitOCRRequest(BaseModel): provider: Optional[str] = Field(None, description="ocr provider override: auto|dml|cpu") @app.post("/v1/ocr/reinit") def reinit_ocr(req: ReinitOCRRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reinit_ocr(provider_override=req.provider) if not ok: raise HTTPException(status_code=500, detail="reinit ocr failed") return {"ok": True} class ResetOCRRequest(BaseModel): provider: Optional[str] = Field(None, description="ocr provider override: auto|dml|cpu") @app.post("/v1/ocr/reset") def reset_ocr(req: ResetOCRRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reinit_ocr(provider_override=req.provider) if not ok: raise HTTPException(status_code=500, detail="reset ocr failed") return {"ok": True} class ResetMIGANRequest(BaseModel): use_cuda: Optional[bool] = Field(None, description="DirectML 사용 여부(True=시도)") provider: Optional[str] = Field(None, description="migan provider override: auto|dml|cpu") @app.post("/v1/migan/reset") def reset_migan(req: ResetMIGANRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reset_migan(use_cuda=req.use_cuda, provider=req.provider) if not ok: raise HTTPException(status_code=500, detail="reset migan failed") return {"ok": True} @app.post("/v1/migan/reinit") def reinit_migan(req: ResetMIGANRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reset_migan(use_cuda=req.use_cuda, provider=req.provider) if not ok: raise HTTPException(status_code=500, detail="reinit migan failed") return {"ok": True} class 
ReinitREMBGRequest(BaseModel): provider: Optional[str] = Field(None, description="rembg provider override: auto|dml|cpu") @app.post("/v1/rembg/reinit") def reinit_rembg(req: ReinitREMBGRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reinit_rembg(provider=req.provider) if not ok: raise HTTPException(status_code=500, detail="reinit rembg failed") return {"ok": True} class ResetREMBGRequest(BaseModel): provider: Optional[str] = Field(None, description="rembg provider override: auto|dml|cpu") @app.post("/v1/rembg/reset") def reset_rembg(req: ResetREMBGRequest): if _worker is None or not _worker.is_ready: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.reset_rembg(provider=req.provider) if not ok: raise HTTPException(status_code=500, detail="reset rembg failed") return {"ok": True} @app.delete("/v1/jobs/{job_id}", status_code=http_status.HTTP_202_ACCEPTED) def cancel_job(job_id: str): if _worker is None: raise HTTPException(status_code=503, detail="Worker not ready") ok = _worker.cancel_job(job_id) if not ok: raise HTTPException(status_code=409, detail="cannot cancel (already running or done)") return {"cancelled": True, "job_id": job_id} if __name__ == "__main__": import uvicorn # multiprocessing - frozen executable 지원 try: mp.freeze_support() except Exception: pass # 명령행 인자 처리 parser = argparse.ArgumentParser(description="Image Worker API") parser.add_argument("--install-service", action="store_true", help="윈도우 서비스 설치") parser.add_argument("--uninstall-service", action="store_true", help="윈도우 서비스 제거") parser.add_argument("--ensure-service", action="store_true", help="서비스가 없으면 설치") parser.add_argument("--console", action="store_true", help="디버그 콘솔 활성화(Win32GUI 빌드에서 유효)") args, _ = parser.parse_known_args() if args.install_service: sys.exit(_install_service()) if args.uninstall_service: sys.exit(_remove_service()) if args.ensure_service: # 있으면 그대로 성공, 없으면 설치 
시도 if _service_exists(_SERVICE_NAME): print("ImgWorker 서비스가 이미 설치되어 있습니다.") sys.exit(0) sys.exit(_install_service()) # 단일 인스턴스 보장(이미 실행 중이면 종료) if not _ensure_single_instance(): print("ImgWorker 이미 실행 중입니다. 기존 인스턴스를 사용하세요.") sys.exit(0) # 종료 시 리소스 해제 atexit.register(_release_single_instance) atexit.register(_cleanup_runtime_files) # 바인드 호스트/포트 결정(포트 충돌 자동 회피) bind_host = os.environ.get("IMGWK_HOST", "127.0.0.1") desired_port = int(os.environ.get("IMGWK_PORT", "8009")) start_port = int(os.environ.get("IMGWK_PORT_START", str(desired_port))) end_port = int(os.environ.get("IMGWK_PORT_END", str(start_port + 20))) if _is_port_in_use(bind_host, desired_port): selected_port = _find_free_port(start_port, end_port, bind_host) else: selected_port = desired_port # 바인드 정보 설정 및 PID/디스커버리 파일 기록 _SERVER_BIND = {"host": bind_host, "port": selected_port} _write_pid_file() _write_server_info(bind_host, selected_port) # 콘솔 플래그 처리 use_console = bool(getattr(args, "console", False)) if use_console and os.name == "nt": try: # 부모 콘솔에 부착 시도, 실패하면 새 콘솔 할당 try: ctypes.windll.kernel32.AttachConsole(-1) except Exception: pass ctypes.windll.kernel32.AllocConsole() try: sys.stdout = open("CONOUT$", "w", buffering=1, encoding="utf-8", errors="replace") sys.stderr = open("CONOUT$", "w", buffering=1, encoding="utf-8", errors="replace") sys.stdin = open("CONIN$", "r", encoding="utf-8", errors="replace") except Exception: pass print("[ImgWorker] 디버그 콘솔 활성화") except Exception: pass # GUI(Win32GUI) 기반 실행에서 콘솔이 없으면 안전한 더미 스트림으로 보호 if not use_console: class _SilentStream: def write(self, *args, **kwargs): pass def flush(self): pass def isatty(self): return False try: if sys.stdout is None: sys.stdout = _SilentStream() if sys.stderr is None: sys.stderr = _SilentStream() except Exception: pass # 콘솔 스트림 상태에 맞춰 로거를 재초기화하여 StreamHandler가 올바른 스트림을 사용하도록 함 try: logger = Logger(log_file=os.path.join(LOGS_DIR, "api_server.log"), logger_name="img_worker_api") except Exception: pass # uvicorn 실행: 콘솔이 있으면 기본 로깅 활성, 
없으면 비활성화 uvicorn_kwargs = dict(host=bind_host, port=selected_port, reload=False, log_level="info") if not use_console: uvicorn_kwargs["log_config"] = None # 기본으로 트레이를 백그라운드에서 함께 실행(서비스나 IMGWK_NO_TRAY=1이면 비활성) if _should_start_tray(): try: from modules.tray_app import TrayController def _run_tray(): try: TrayController().run() except Exception as e: logger.error(f"트레이 스레드 오류: {e}") t = threading.Thread(target=_run_tray, name="TrayThread", daemon=True) t.start() except Exception as e: # 내장 트레이 import 실패 시, 외부 트레이 실행 파일로 폴백 logger.warning(f"트레이 시작 실패(모듈 누락?): {e}") try: if getattr(sys, "frozen", False): exe_dir = os.path.dirname(sys.executable) tray_path = os.path.join(exe_dir, f"{_SERVICE_NAME}Tray.exe") if os.path.isfile(tray_path): env = os.environ.copy() if "IMGWK_API_BASE" not in env: # server.json을 트레이가 읽어가므로 기본값 유지 pass subprocess.Popen([tray_path], close_fds=True) else: logger.warning("외부 트레이 실행 파일을 찾을 수 없습니다") except Exception as e2: logger.error(f"외부 트레이 실행 실패: {e2}") uvicorn.run(app, **uvicorn_kwargs)