ImageProcessor_MainServer/worker/session_pool_integration.py

298 lines
11 KiB
Python

"""
세션풀 동적 관리 통합 모듈
- 기존 워커 시스템에 동적 세션풀 관리 기능 통합
- 런타임 중 메모리 모니터링 및 세션풀 조정
"""
import logging
import threading
import time
from typing import Dict, Any, Optional
from contextlib import contextmanager
from .dynamic_session_pool_manager import (
session_pool_manager,
get_optimal_session_config,
SessionPoolConfig
)
class SessionPoolIntegrator:
"""세션풀 통합 관리자"""
def __init__(self, logger: Optional[logging.Logger] = None):
self.logger = logger or logging.getLogger(__name__)
self._config: Optional[SessionPoolConfig] = None
self._monitoring_thread: Optional[threading.Thread] = None
self._monitoring_active = False
self._session_pools = {}
self._lock = threading.Lock()
def initialize_dynamic_pools(self,
lama_migan_ratio: tuple = (4, 6),
worker_session_ratio: float = 3.0) -> SessionPoolConfig:
"""동적 세션풀 초기화"""
self.logger.info("🚀 동적 세션풀 시스템 초기화 시작...")
try:
# GPU 정보 감지
memory_info = session_pool_manager.detect_gpu_info()
# 최적 구성 계산
config = get_optimal_session_config(
lama_migan_ratio=lama_migan_ratio,
worker_session_ratio=worker_session_ratio
)
self._config = config
# 구성 요약 로깅
self._log_configuration()
# 모니터링 시작
self.start_monitoring()
self.logger.info("✅ 동적 세션풀 시스템 초기화 완료")
return config
except Exception as e:
self.logger.error(f"❌ 동적 세션풀 초기화 실패: {e}")
raise
def _log_configuration(self):
"""구성 정보 로깅"""
if not self._config:
return
config = self._config
memory_info = session_pool_manager._memory_info
self.logger.info("🎯 동적 세션풀 구성:")
self.logger.info(f" GPU: {memory_info.gpu_name} ({memory_info.gpu_tier.value})")
self.logger.info(f" VRAM: {memory_info.total_vram_mb}MB (사용가능: {memory_info.available_vram_mb}MB)")
self.logger.info(f" 세션풀: MIGAN {config.migan_sessions}개, LAMA {config.lama_sessions}개, OCR {config.ocr_sessions}")
self.logger.info(f" 워커: {config.workers}개 (비율 {config.worker_session_ratio:.1f}:1)")
self.logger.info(f" 메모리 사용: {config.total_vram_usage_mb}MB ({config.total_vram_usage_mb/memory_info.total_vram_mb*100:.1f}%)")
self.logger.info(f" LAMA:MIGAN = {config.lama_migan_ratio}")
self.logger.info(f" 예상 동시 클라이언트: {config.expected_concurrent_clients}")
def start_monitoring(self):
"""메모리 모니터링 시작"""
if self._monitoring_active:
return
self._monitoring_active = True
self._monitoring_thread = threading.Thread(
target=self._monitoring_worker,
daemon=True,
name="SessionPoolMonitor"
)
self._monitoring_thread.start()
self.logger.info("📊 메모리 모니터링 시작")
def stop_monitoring(self):
"""메모리 모니터링 중지"""
self._monitoring_active = False
if self._monitoring_thread:
self._monitoring_thread.join(timeout=5)
self.logger.info("📊 메모리 모니터링 중지")
def _monitoring_worker(self):
"""모니터링 워커 스레드"""
while self._monitoring_active:
try:
# 메모리 상태 체크 및 조정
adjusted = session_pool_manager.monitor_and_adjust()
if adjusted:
self.logger.warning("⚠️ 메모리 조정이 수행되었습니다")
# 메모리 상태 로깅
memory_status = session_pool_manager.get_current_memory_status()
if "error" not in memory_status:
self.logger.info(f" 현재 VRAM 사용률: {memory_status['utilization_pct']}%")
self.logger.info(f" 사용량: {memory_status['allocated_mb']}MB / {memory_status['total_mb']}MB")
# 30초 대기
time.sleep(30)
except Exception as e:
self.logger.error(f"❌ 메모리 모니터링 오류: {e}")
time.sleep(60) # 오류 시 더 긴 대기
def get_session_pool_config(self) -> Optional[SessionPoolConfig]:
"""현재 세션풀 구성 반환"""
return self._config
def get_memory_status(self) -> Dict[str, Any]:
"""현재 메모리 상태 반환"""
return session_pool_manager.get_current_memory_status()
def register_session_pool(self, pool_type: str, pool_instance):
"""세션풀 인스턴스 등록"""
with self._lock:
self._session_pools[pool_type] = pool_instance
self.logger.info(f"{pool_type} 세션풀 등록 완료")
def get_session_pool(self, pool_type: str):
"""세션풀 인스턴스 반환"""
with self._lock:
return self._session_pools.get(pool_type)
@contextmanager
def session_context(self, pool_type: str):
"""세션 컨텍스트 매니저"""
pool = self.get_session_pool(pool_type)
if not pool:
raise RuntimeError(f"세션풀 '{pool_type}'이 등록되지 않았습니다")
session = None
try:
# 세션 획득 (구현에 따라 다름)
session = pool.get_session() if hasattr(pool, 'get_session') else pool
yield session
finally:
# 세션 반환 (구현에 따라 다름)
if session and hasattr(pool, 'return_session'):
pool.return_session(session)
def generate_celery_config(self) -> Dict[str, Any]:
"""Celery 설정 생성"""
if not self._config:
raise RuntimeError("세션풀 구성이 초기화되지 않았습니다")
config = self._config
celery_config = {
# 워커 설정
"worker_concurrency": config.workers,
"worker_prefetch_multiplier": 1,
# 세션풀 설정
"session_pools": {
"migan": {
"size": config.migan_sessions,
"timeout": 30,
"retry_attempts": 3
},
"lama": {
"size": config.lama_sessions,
"timeout": 30,
"retry_attempts": 3
},
"ocr": {
"size": config.ocr_sessions,
"timeout": 15,
"retry_attempts": 2
}
},
# 메모리 관리
"memory_management": {
"max_vram_usage_mb": config.total_vram_usage_mb,
"safety_margin_mb": config.safety_margin_mb,
"monitoring_interval": 30,
"gc_threshold": 0.90 # 90% 사용률에서 GC 트리거
},
# 라우팅 전략
"routing": {
"lama_migan_ratio": config.lama_migan_ratio,
"default_method": "migan" if config.migan_sessions >= config.lama_sessions else "lama",
"fallback_method": "lama" if config.migan_sessions >= config.lama_sessions else "migan"
}
}
return celery_config
def print_status(self):
"""현재 상태 출력"""
if not self._config:
print("❌ 세션풀이 초기화되지 않았습니다")
return
config = self._config
memory_info = session_pool_manager._memory_info
memory_status = self.get_memory_status()
print(f"""
🚀 동적 세션풀 상태
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🔧 GPU: {memory_info.gpu_name} ({memory_info.gpu_tier.value})
💾 VRAM: {memory_info.total_vram_mb}MB 총용량
📊 현재 사용률: {memory_status.get('utilization_pct', 'N/A')}%
🎯 세션풀 구성:
- MIGAN: {config.migan_sessions}개 ({config.migan_sessions * 1200}MB)
- LAMA: {config.lama_sessions}개 ({config.lama_sessions * 500}MB)
- OCR: {config.ocr_sessions}개 ({config.ocr_sessions * 400}MB)
- 워커: {config.workers}
📈 성능 지표:
- LAMA:MIGAN 비율: {config.lama_migan_ratio}
- 워커:세션 비율: {config.worker_session_ratio:.1f}:1
- 예상 동시 클라이언트: {config.expected_concurrent_clients}
- 메모리 사용률: {config.total_vram_usage_mb/memory_info.total_vram_mb*100:.1f}%
🔄 모니터링: {'활성' if self._monitoring_active else '비활성'}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
""")
# 전역 통합 관리자 인스턴스
session_integrator = SessionPoolIntegrator()
def initialize_dynamic_session_pools(lama_migan_ratio: tuple = (4, 6),
worker_session_ratio: float = 3.0,
logger: Optional[logging.Logger] = None) -> SessionPoolConfig:
"""동적 세션풀 초기화 헬퍼 함수"""
if logger:
session_integrator.logger = logger
return session_integrator.initialize_dynamic_pools(
lama_migan_ratio=lama_migan_ratio,
worker_session_ratio=worker_session_ratio
)
def get_celery_config() -> Dict[str, Any]:
"""Celery 설정 반환 헬퍼 함수"""
return session_integrator.generate_celery_config()
def print_session_status():
"""세션풀 상태 출력 헬퍼 함수"""
session_integrator.print_status()
if __name__ == "__main__":
# 테스트 실행
import time
logging.basicConfig(level=logging.INFO)
print("🔍 동적 세션풀 통합 테스트...")
# 4:6 비율로 초기화
config = initialize_dynamic_session_pools(lama_migan_ratio=(4, 6), worker_session_ratio=3.0)
# 상태 출력
print_session_status()
# Celery 설정 생성
celery_config = get_celery_config()
print("\n📋 Celery 설정:")
import json
print(json.dumps(celery_config, indent=2, ensure_ascii=False))
# 짧은 모니터링 테스트
print("\n⏱️ 10초간 모니터링 테스트...")
time.sleep(10)
# 정리
session_integrator.stop_monitoring()
print("✅ 테스트 완료")