522 lines
22 KiB
Python
522 lines
22 KiB
Python
|
||
|
||
# src/modules/image_worker.py
|
||
"""
|
||
ImageWorker 프로세스 – 별도 프로세스로 구동
|
||
단 한 번의 READY("OK") 신호만 보내며, OCR 모델 Warm‑up 후 작업 루프에 진입한다.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import multiprocessing
|
||
import os
|
||
|
||
import logging
|
||
import asyncio
|
||
import traceback
|
||
import queue
|
||
import time
|
||
import cv2
|
||
import numpy as np
|
||
|
||
from modules.image_processor3 import ImageProcessor3
|
||
from modules.log_bridge import ImageWorkerLogger
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# 로깅 유틸리티 #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
class QueueLogger:
|
||
"""큐를 통해 메인 프로세스의 로거로 전송하는 로거"""
|
||
def __init__(self, log_queue, process_name):
|
||
self.log_queue = log_queue
|
||
self.process_name = process_name
|
||
|
||
def log(self, message, level=logging.INFO, exc_info=False):
|
||
try:
|
||
log_record = {
|
||
'process_name': self.process_name,
|
||
'level': level,
|
||
'message': message,
|
||
'exc_info': exc_info
|
||
}
|
||
self.log_queue.put(log_record)
|
||
except Exception:
|
||
pass # 로그 전송 실패 시 무시
|
||
|
||
def debug(self, msg, *a, **kw): self.log(msg, logging.DEBUG)
|
||
def info(self, msg, *a, **kw): self.log(msg, logging.INFO)
|
||
def warning(self, msg, *a, **kw): self.log(msg, logging.WARNING)
|
||
def error(self, msg, *a, **kw): self.log(msg, logging.ERROR)
|
||
def critical(self, msg, *a, **kw): self.log(msg, logging.CRITICAL)
|
||
|
||
class CompatLogger:
|
||
"""커스텀 Logger 인터페이스를 표준 logging.Logger 로 매핑"""
|
||
def __init__(self, py_logger: logging.Logger):
|
||
self._l = py_logger
|
||
|
||
def log(self, message, level=logging.INFO, exc_info=False):
|
||
self._l.log(level, message, exc_info=exc_info)
|
||
|
||
# 편의 메서드
|
||
def debug(self, msg, *a, **kw): self.log(msg, logging.DEBUG)
|
||
def info(self, msg, *a, **kw): self.log(msg, logging.INFO)
|
||
def warning(self, msg, *a, **kw): self.log(msg, logging.WARNING)
|
||
def error(self, msg, *a, **kw): self.log(msg, logging.ERROR)
|
||
def critical(self, msg, *a, **kw): self.log(msg, logging.CRITICAL)
|
||
|
||
|
||
def _setup_logging(log_path: str) -> logging.Logger:
|
||
"""별도 프로세스용 파일 로거 설정"""
|
||
root = logging.getLogger()
|
||
root.handlers.clear()
|
||
root.setLevel(logging.DEBUG)
|
||
|
||
fh = logging.FileHandler(log_path, encoding="utf-8")
|
||
fmt = logging.Formatter(
|
||
"[%(asctime)s] [%(processName)s] [%(levelname)s] "
|
||
"[%(module)s:%(funcName)s:%(lineno)d] %(message)s"
|
||
)
|
||
fh.setFormatter(fmt)
|
||
root.addHandler(fh)
|
||
return root
|
||
|
||
# ------------------------------------------------------------------ #
|
||
# 워커 메인 함수 #
|
||
# ------------------------------------------------------------------ #
|
||
|
||
def worker_main(
|
||
task_q,
|
||
result_q,
|
||
log_queue, # 추가: 로그 큐
|
||
log_path: str,
|
||
base_dir: str,
|
||
toggle_states: dict,
|
||
unwanted_words: list[str],
|
||
authenticated_by_admin: bool = False,
|
||
):
|
||
# ── 로깅 초기화 ────────────────────────────────────────────
|
||
# 큐 로거 사용 (메인 프로세스로 로그 전송)
|
||
if log_queue:
|
||
logger = ImageWorkerLogger(log_queue, f"ImageWorker-{os.getpid()}")
|
||
print(f"[DEBUG] ImageWorker {os.getpid()}: LogQueue 사용됨")
|
||
else:
|
||
# 폴백: 기존 파일 로거
|
||
py_logger = _setup_logging(log_path)
|
||
logger = CompatLogger(py_logger)
|
||
print(f"[DEBUG] ImageWorker {os.getpid()}: 파일 로거 사용됨")
|
||
|
||
logger.info(
|
||
f"ImageWorker 프로세스 기동 "
|
||
f"(PID={os.getpid()}, Name={multiprocessing.current_process().name})"
|
||
)
|
||
|
||
# 워커 기동 시에도 ProgramData/ImgWorker 하위 임시 폴더를 한 번 더 정리
|
||
try:
|
||
base_program = os.environ.get("PROGRAMDATA", r"C:\\ProgramData")
|
||
app_data_dir = os.path.join(base_program, "ImgWorker")
|
||
def _safe_rmtree_contents(target_dir: str, older_than_sec: int = 300):
|
||
try:
|
||
if not target_dir:
|
||
return
|
||
base = os.path.abspath(app_data_dir)
|
||
td = os.path.abspath(target_dir)
|
||
if os.path.commonpath([base, td]) != base:
|
||
return
|
||
os.makedirs(td, exist_ok=True)
|
||
now_ts = time.time()
|
||
thr = max(0, int(older_than_sec or 0))
|
||
|
||
def _remove_if_old(fp: str):
|
||
try:
|
||
if thr <= 0:
|
||
os.remove(fp)
|
||
return
|
||
mt = os.path.getmtime(fp)
|
||
if (now_ts - mt) >= thr:
|
||
os.remove(fp)
|
||
except Exception:
|
||
pass
|
||
|
||
for root, dirs, files in os.walk(td, topdown=False):
|
||
for fn in files:
|
||
_remove_if_old(os.path.join(root, fn))
|
||
for dn in dirs:
|
||
full = os.path.join(root, dn)
|
||
try:
|
||
if not os.listdir(full):
|
||
os.rmdir(full)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
for d in (os.path.join(app_data_dir, "incoming"), os.path.join(app_data_dir, "work"), os.path.join(app_data_dir, "output"), os.path.join(app_data_dir, "outputs")):
|
||
_safe_rmtree_contents(d, older_than_sec=int(os.environ.get("IMGWK_CLEAN_OLDER_THAN_SEC", "300")))
|
||
except Exception:
|
||
pass
|
||
|
||
# READY는 모델 로딩 완료 후에만 전송
|
||
|
||
# ── ImageProcessor 초기화 및 Warm‑up ──────────────────────
|
||
processor = None
|
||
try:
|
||
# 안전한 초기화를 위한 메모리 정리
|
||
import gc
|
||
gc.collect()
|
||
|
||
logger.info("🔧 ImageProcessor3 초기화 시작...")
|
||
processor = ImageProcessor3(
|
||
logger=logger,
|
||
page=None,
|
||
toggle_states=toggle_states,
|
||
unwanted_words=unwanted_words,
|
||
authenticated_by_admin=authenticated_by_admin,
|
||
base_dir=base_dir,
|
||
papago_translator=None,
|
||
)
|
||
|
||
# OCR 모델 안전한 Warm-up
|
||
if processor and processor.ocr_module:
|
||
try:
|
||
logger.info("🔰 OCR 모듈 Warm-up 시작...")
|
||
dummy_path = os.path.join(base_dir, "_imgproc_warmup.png")
|
||
tmp = np.zeros((100, 100, 3), dtype=np.uint8) # 더 현실적인 크기
|
||
cv2.imwrite(dummy_path, tmp)
|
||
|
||
# 타임아웃 설정으로 무한 대기 방지
|
||
import threading
|
||
import time
|
||
|
||
warmup_success = [False]
|
||
warmup_error = [None]
|
||
|
||
def warmup_ocr():
|
||
try:
|
||
processor.ocr_module.detect_text(dummy_path)
|
||
warmup_success[0] = True
|
||
except Exception as e:
|
||
warmup_error[0] = e
|
||
|
||
warmup_thread = threading.Thread(target=warmup_ocr)
|
||
warmup_thread.daemon = True
|
||
warmup_thread.start()
|
||
warmup_thread.join(timeout=30) # 30초 타임아웃
|
||
|
||
if warmup_success[0]:
|
||
logger.info("✅ OCR 모듈 Warm-up 성공")
|
||
elif warmup_error[0]:
|
||
logger.warning(f"⚠️ OCR 모듈 Warm-up 실패: {warmup_error[0]}")
|
||
else:
|
||
logger.warning("⚠️ OCR 모듈 Warm-up 타임아웃")
|
||
|
||
try:
|
||
os.remove(dummy_path)
|
||
except Exception:
|
||
pass
|
||
|
||
except Exception as e:
|
||
logger.warning(f"OCR Warm-up 실패: {e}")
|
||
else:
|
||
logger.warning("OCR 모듈이 초기화되지 않아 Warm-up 건너뜀")
|
||
|
||
logger.info("🔰 ImageProcessor Warm‑up 완료")
|
||
|
||
except Exception as e:
|
||
logger.error(f"ImageProcessor 초기화 실패: {e}", exc_info=True)
|
||
|
||
# 초기화 실패 시에도 기본적인 처리가 가능하도록 최소한의 processor 생성 시도
|
||
try:
|
||
logger.info("🔄 안전 모드로 재초기화 시도...")
|
||
|
||
# GPU 설정을 CPU로 강제 변경
|
||
safe_toggle_states = toggle_states.copy()
|
||
safe_toggle_states['use_cuda'] = False
|
||
safe_toggle_states['optionIMGTrans_type'] = 'CPU'
|
||
safe_toggle_states['detail_IMGTrans_type'] = 'CPU'
|
||
safe_toggle_states['thumb_trans_type'] = 'CPU'
|
||
safe_toggle_states['migan_use_cuda'] = False
|
||
|
||
processor = ImageProcessor3(
|
||
logger=logger,
|
||
page=None,
|
||
toggle_states=safe_toggle_states,
|
||
unwanted_words=unwanted_words,
|
||
authenticated_by_admin=authenticated_by_admin,
|
||
base_dir=base_dir,
|
||
papago_translator=None,
|
||
)
|
||
logger.info("✅ 안전 모드로 ImageProcessor 초기화 성공")
|
||
|
||
except Exception as e2:
|
||
logger.error(f"안전 모드 초기화도 실패: {e2}", exc_info=True)
|
||
processor = None
|
||
|
||
# ── READY(OK) 신호 전송 ──────────────────────────────────
|
||
try:
|
||
result_q.put({"id": "__READY__", "data": "OK"})
|
||
logger.info("워커 READY 신호 전송")
|
||
except Exception:
|
||
logger.error("READY 신호 전송 실패", exc_info=True)
|
||
|
||
# ── rembg 세션 비동기 로딩 ──────────────────────────────
|
||
def preload_rembg_sessions():
|
||
"""백그라운드에서 rembg 세션을 미리 로딩"""
|
||
try:
|
||
if processor and hasattr(processor, 'background_removal_module'):
|
||
logger.info("🔄 rembg 세션 백그라운드 로딩 시작...")
|
||
# 기본 세션들을 미리 로딩
|
||
processor.background_removal_module._preload_sessions()
|
||
logger.info("✅ rembg 세션 백그라운드 로딩 완료")
|
||
except Exception as e:
|
||
logger.warning(f"rembg 세션 백그라운드 로딩 실패: {e}")
|
||
|
||
# rembg 세션 로딩을 별도 Thread에서 실행
|
||
import threading
|
||
preload_thread = threading.Thread(target=preload_rembg_sessions, daemon=True)
|
||
preload_thread.start()
|
||
|
||
# ── 작업 루프 ─────────────────────────────────────────────
|
||
# 컨트롤러가 즉시 pick-up 할 수 있도록 READY 신호를 추가 전송
|
||
try:
|
||
result_q.put({"id": "__READY__", "cmd": "__READY__", "kwargs": {}})
|
||
logger.info("📡 추가 READY 신호 전송 완료")
|
||
except Exception as e:
|
||
logger.error(f"추가 READY 신호 전송 실패: {e}")
|
||
|
||
# ── 작업 루프 (비동기) ────────────────────────────────────────
|
||
|
||
async def process_task_async(task):
|
||
uid = task.get("id")
|
||
cmd = task.get("cmd")
|
||
kwargs = task.get("kwargs", {})
|
||
|
||
try:
|
||
logger.info(f"🚀 작업 처리 시작: cmd={cmd}, uid={uid}")
|
||
|
||
# 메타 파라미터 제거 및 실시간 값 반영
|
||
new_toggle = kwargs.pop("_toggle_states", None)
|
||
if new_toggle and processor:
|
||
processor.update_toggle_states(new_toggle)
|
||
|
||
_ = kwargs.pop("_base_dir", None)
|
||
upd_unwanted = kwargs.pop("_update_unwanted_texts", None)
|
||
if upd_unwanted and processor:
|
||
processor.update_unwanted_texts(upd_unwanted)
|
||
|
||
logger.debug(f"작업 실행 직전: cmd={cmd}")
|
||
|
||
data = None
|
||
if cmd == "process_single_image":
|
||
logger.debug("process_single_image 호출 직전")
|
||
# asyncio.run 제거 -> await 직접 호출
|
||
data = await processor.process_single_image(**kwargs)
|
||
|
||
try:
|
||
timings = getattr(processor, '_last_timings', None)
|
||
if isinstance(data, dict) and timings:
|
||
data['timings'] = timings
|
||
except Exception:
|
||
pass
|
||
logger.debug("process_single_image 호출 완료")
|
||
|
||
elif cmd == "remove_background":
|
||
logger.debug("remove_background 호출 직전")
|
||
data = await processor.remove_background(**kwargs)
|
||
logger.debug("remove_background 호출 완료")
|
||
|
||
elif cmd == "reinit_ocr":
|
||
# Blocking 작업이므로 run_in_executor 권장되나, 짧으면 그냥 실행
|
||
ok = processor.reset_ocr_module()
|
||
data = {"ok": bool(ok)}
|
||
|
||
elif cmd == "reinit_rembg":
|
||
try:
|
||
if hasattr(processor, 'background_removal_module'):
|
||
try:
|
||
del processor.background_removal_module
|
||
except Exception:
|
||
processor.background_removal_module = None
|
||
|
||
from modules.bria_background_removal_module import BriaBackgroundRemovalModule
|
||
model_path = processor.toggle_states.get('local_rembg_model_path')
|
||
processor.background_removal_module = BriaBackgroundRemovalModule(
|
||
logger=logger,
|
||
default_model=processor.toggle_states.get('local_model_name', 'bria-rmbg-1.4'),
|
||
gpu_manager=getattr(processor, 'gpu_manager', None),
|
||
local_rembg_model_path=model_path,
|
||
)
|
||
data = {"ok": True}
|
||
except Exception as e:
|
||
logger.error(f"REMBG 재초기화 실패: {e}")
|
||
data = {"ok": False, "error": str(e)}
|
||
|
||
elif cmd == "reset_migan":
|
||
try:
|
||
from modules.migan_module import build_migan_from_toggle
|
||
enhanced_toggle_states = processor.toggle_states.copy()
|
||
if 'migan_use_cuda' in enhanced_toggle_states and 'migan_use_accel' not in enhanced_toggle_states:
|
||
enhanced_toggle_states['migan_use_accel'] = enhanced_toggle_states['migan_use_cuda']
|
||
prov = kwargs.get('provider')
|
||
if prov:
|
||
enhanced_toggle_states['migan_provider_override'] = prov
|
||
|
||
processor.migan = build_migan_from_toggle(enhanced_toggle_states, logger=logger, gpu_manager=getattr(processor, 'gpu_manager', None))
|
||
data = {"ok": bool(processor.migan is not None)}
|
||
except Exception as mm_err:
|
||
logger.error(f"MIGAN 재설정 실패: {mm_err}")
|
||
data = {"ok": False, "error": str(mm_err)}
|
||
|
||
elif cmd == "__PING__":
|
||
data = "__PONG__"
|
||
|
||
elif cmd == "update_toggle_states":
|
||
data = {"ok": True}
|
||
|
||
else:
|
||
raise ValueError(f"unknown cmd: {cmd}")
|
||
|
||
logger.debug(f"작업 결과 반환 중: uid={uid}")
|
||
result_q.put({"id": uid, "data": data})
|
||
logger.debug(f"작업 결과 반환 완료: uid={uid}")
|
||
|
||
except Exception:
|
||
logger.error(f"작업 처리 중 오류: cmd={cmd}, uid={uid}")
|
||
logger.error("작업 처리 중 오류", exc_info=True)
|
||
result_q.put({"id": uid, "error": traceback.format_exc()})
|
||
|
||
async def main_loop():
|
||
active_tasks = set()
|
||
idle_log_last = 0.0
|
||
|
||
logger.info("🚀 Async Worker Loop 시작")
|
||
|
||
last_status_log = 0.0
|
||
|
||
while True:
|
||
now_ts = time.time()
|
||
|
||
# 1. 완료된 태스크 정리
|
||
if active_tasks:
|
||
done, active_tasks = await asyncio.wait(active_tasks, timeout=0.01)
|
||
# 예외 처리는 process_task_async 내부에서 수행하므로 여기선 done 확인만 함
|
||
|
||
# --- [Watchdog & Status Logging] ---
|
||
if active_tasks:
|
||
# 30초마다 상태 로그
|
||
if now_ts - last_status_log >= 30.0:
|
||
last_status_log = now_ts
|
||
try:
|
||
waiting_info = []
|
||
for t in active_tasks:
|
||
t_info = getattr(t, "_task_info", {})
|
||
t_uid = t_info.get("id", "unknown")
|
||
start_t = t_info.get("_started_at", now_ts)
|
||
elapsed = now_ts - start_t
|
||
waiting_info.append(f"{t_uid}({elapsed:.1f}s)")
|
||
|
||
# 200초 이상 경과 시 경고 (DirectML 등 행 의심)
|
||
if elapsed > 200:
|
||
logger.warning(f"⚠️ 태스크 {t_uid}가 {elapsed:.1f}초째 실행 중입니다. (행 의심)")
|
||
|
||
logger.info(f"⚡ 실행 중인 작업({len(active_tasks)}): {', '.join(waiting_info)}")
|
||
except Exception:
|
||
pass
|
||
# -----------------------------------
|
||
|
||
# 2. 동시성 제한 확인
|
||
limit = 1
|
||
if processor and processor.toggle_states:
|
||
limit = int(processor.toggle_states.get("detail_concurrency_limit", 1))
|
||
if limit < 1: limit = 1
|
||
|
||
# 실행 중인 태스크가 제한보다 많으면 대기 (단, 즉시 처리해야 할 시스템 태스크 고려 필요? -> 큐에서 꺼내봐야 아므로 일단 대기)
|
||
if len(active_tasks) >= limit:
|
||
await asyncio.sleep(0.1)
|
||
continue
|
||
|
||
# 3. 큐 폴링 (Non-blocking)
|
||
try:
|
||
task = task_q.get_nowait()
|
||
# 유휴 로그 리셋
|
||
# logger.info(f"🔥 작업 수신 성공")
|
||
except queue.Empty:
|
||
now_ts = time.time()
|
||
if now_ts - idle_log_last >= 600:
|
||
idle_log_last = now_ts
|
||
logger.info("대기 중(유휴)")
|
||
await asyncio.sleep(0.1)
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"작업 수신 중 오류: {e}", exc_info=True)
|
||
await asyncio.sleep(1.0)
|
||
continue
|
||
|
||
if task is None:
|
||
logger.info("Shutdown signal 수신 → 종료 대기")
|
||
if active_tasks:
|
||
await asyncio.wait(active_tasks)
|
||
break
|
||
|
||
# 4. 태스크 실행
|
||
cmd = task.get("cmd")
|
||
# 병렬 처리 허용 커맨드
|
||
if cmd in ("process_single_image", "remove_background"):
|
||
# 태스크 시작 시간 기록 (Watchdog용)
|
||
task["_started_at"] = time.time()
|
||
t = asyncio.create_task(process_task_async(task))
|
||
# 태스크 객체에 메타데이터 저장 (파이썬 3.8+ name 속성 활용 또는 커스텀 속성)
|
||
setattr(t, "_task_info", task)
|
||
active_tasks.add(t)
|
||
else:
|
||
# 설정 변경 등은 안전을 위해 기존 작업 완료 후 실행
|
||
if active_tasks:
|
||
logger.info(f"설정 변경 명령({cmd}) 감지 - 기존 작업({len(active_tasks)}) 완료 대기...")
|
||
await asyncio.wait(active_tasks)
|
||
active_tasks.clear()
|
||
|
||
# 동기 실행 (await)
|
||
await process_task_async(task)
|
||
|
||
# 종료 처리 (임시 파일 정리 등)
|
||
logger.info("Worker Loop 종료 및 정리")
|
||
try:
|
||
base_program = os.environ.get("PROGRAMDATA", r"C:\\ProgramData")
|
||
app_data_dir = os.path.join(base_program, "ImgWorker")
|
||
def _cleanup_dir(dp: str, older_than_sec: int = 300):
|
||
try:
|
||
now_ts = time.time()
|
||
thr = max(0, int(older_than_sec or 0))
|
||
for root, dirs, files in os.walk(dp, topdown=False):
|
||
for fn in files:
|
||
fp = os.path.join(root, fn)
|
||
try:
|
||
if thr <= 0:
|
||
os.remove(fp)
|
||
else:
|
||
mt = os.path.getmtime(fp)
|
||
if (now_ts - mt) >= thr:
|
||
os.remove(fp)
|
||
except Exception:
|
||
pass
|
||
for dn in dirs:
|
||
full = os.path.join(root, dn)
|
||
try:
|
||
if not os.listdir(full):
|
||
os.rmdir(full)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
ttl = int(os.environ.get("IMGWK_CLEAN_OLDER_THAN_SEC", "300"))
|
||
for d in (os.path.join(app_data_dir, "incoming"), os.path.join(app_data_dir, "work"), os.path.join(app_data_dir, "output"), os.path.join(app_data_dir, "outputs")):
|
||
_cleanup_dir(d, older_than_sec=ttl)
|
||
except Exception:
|
||
pass
|
||
|
||
# Async Loop 실행
|
||
try:
|
||
asyncio.run(main_loop())
|
||
except KeyboardInterrupt:
|
||
pass
|
||
except Exception as e:
|
||
logger.error(f"Main Loop 치명적 오류: {e}", exc_info=True)
|