""" 세션풀 동적 관리 통합 모듈 - 기존 워커 시스템에 동적 세션풀 관리 기능 통합 - 런타임 중 메모리 모니터링 및 세션풀 조정 """ import logging import threading import time from typing import Dict, Any, Optional from contextlib import contextmanager from .dynamic_session_pool_manager import ( session_pool_manager, get_optimal_session_config, SessionPoolConfig ) class SessionPoolIntegrator: """세션풀 통합 관리자""" def __init__(self, logger: Optional[logging.Logger] = None): self.logger = logger or logging.getLogger(__name__) self._config: Optional[SessionPoolConfig] = None self._monitoring_thread: Optional[threading.Thread] = None self._monitoring_active = False self._session_pools = {} self._lock = threading.Lock() def initialize_dynamic_pools(self, lama_migan_ratio: tuple = (4, 6), worker_session_ratio: float = 3.0) -> SessionPoolConfig: """동적 세션풀 초기화""" self.logger.info("🚀 동적 세션풀 시스템 초기화 시작...") try: # GPU 정보 감지 memory_info = session_pool_manager.detect_gpu_info() # 최적 구성 계산 config = get_optimal_session_config( lama_migan_ratio=lama_migan_ratio, worker_session_ratio=worker_session_ratio ) self._config = config # 구성 요약 로깅 self._log_configuration() # 모니터링 시작 self.start_monitoring() self.logger.info("✅ 동적 세션풀 시스템 초기화 완료") return config except Exception as e: self.logger.error(f"❌ 동적 세션풀 초기화 실패: {e}") raise def _log_configuration(self): """구성 정보 로깅""" if not self._config: return config = self._config memory_info = session_pool_manager._memory_info self.logger.info("🎯 동적 세션풀 구성:") self.logger.info(f" GPU: {memory_info.gpu_name} ({memory_info.gpu_tier.value})") self.logger.info(f" VRAM: {memory_info.total_vram_mb}MB (사용가능: {memory_info.available_vram_mb}MB)") self.logger.info(f" 세션풀: MIGAN {config.migan_sessions}개, LAMA {config.lama_sessions}개, OCR {config.ocr_sessions}개") self.logger.info(f" 워커: {config.workers}개 (비율 {config.worker_session_ratio:.1f}:1)") self.logger.info(f" 메모리 사용: {config.total_vram_usage_mb}MB ({config.total_vram_usage_mb/memory_info.total_vram_mb*100:.1f}%)") self.logger.info(f" LAMA:MIGAN = {config.lama_migan_ratio}") self.logger.info(f" 예상 동시 클라이언트: {config.expected_concurrent_clients}명") def start_monitoring(self): """메모리 모니터링 시작""" if self._monitoring_active: return self._monitoring_active = True self._monitoring_thread = threading.Thread( target=self._monitoring_worker, daemon=True, name="SessionPoolMonitor" ) self._monitoring_thread.start() self.logger.info("📊 메모리 모니터링 시작") def stop_monitoring(self): """메모리 모니터링 중지""" self._monitoring_active = False if self._monitoring_thread: self._monitoring_thread.join(timeout=5) self.logger.info("📊 메모리 모니터링 중지") def _monitoring_worker(self): """모니터링 워커 스레드""" while self._monitoring_active: try: # 메모리 상태 체크 및 조정 adjusted = session_pool_manager.monitor_and_adjust() if adjusted: self.logger.warning("⚠️ 메모리 조정이 수행되었습니다") # 메모리 상태 로깅 memory_status = session_pool_manager.get_current_memory_status() if "error" not in memory_status: self.logger.info(f" 현재 VRAM 사용률: {memory_status['utilization_pct']}%") self.logger.info(f" 사용량: {memory_status['allocated_mb']}MB / {memory_status['total_mb']}MB") # 30초 대기 time.sleep(30) except Exception as e: self.logger.error(f"❌ 메모리 모니터링 오류: {e}") time.sleep(60) # 오류 시 더 긴 대기 def get_session_pool_config(self) -> Optional[SessionPoolConfig]: """현재 세션풀 구성 반환""" return self._config def get_memory_status(self) -> Dict[str, Any]: """현재 메모리 상태 반환""" return session_pool_manager.get_current_memory_status() def register_session_pool(self, pool_type: str, pool_instance): """세션풀 인스턴스 등록""" with self._lock: self._session_pools[pool_type] = pool_instance self.logger.info(f"✅ {pool_type} 세션풀 등록 완료") def get_session_pool(self, pool_type: str): """세션풀 인스턴스 반환""" with self._lock: return self._session_pools.get(pool_type) @contextmanager def session_context(self, pool_type: str): """세션 컨텍스트 매니저""" pool = self.get_session_pool(pool_type) if not pool: raise RuntimeError(f"세션풀 '{pool_type}'이 등록되지 않았습니다") session = None try: # 세션 획득 (구현에 따라 다름) session = pool.get_session() if hasattr(pool, 'get_session') else pool yield session finally: # 세션 반환 (구현에 따라 다름) if session and hasattr(pool, 'return_session'): pool.return_session(session) def generate_celery_config(self) -> Dict[str, Any]: """Celery 설정 생성""" if not self._config: raise RuntimeError("세션풀 구성이 초기화되지 않았습니다") config = self._config celery_config = { # 워커 설정 "worker_concurrency": config.workers, "worker_prefetch_multiplier": 1, # 세션풀 설정 "session_pools": { "migan": { "size": config.migan_sessions, "timeout": 30, "retry_attempts": 3 }, "lama": { "size": config.lama_sessions, "timeout": 30, "retry_attempts": 3 }, "ocr": { "size": config.ocr_sessions, "timeout": 15, "retry_attempts": 2 } }, # 메모리 관리 "memory_management": { "max_vram_usage_mb": config.total_vram_usage_mb, "safety_margin_mb": config.safety_margin_mb, "monitoring_interval": 30, "gc_threshold": 0.90 # 90% 사용률에서 GC 트리거 }, # 라우팅 전략 "routing": { "lama_migan_ratio": config.lama_migan_ratio, "default_method": "migan" if config.migan_sessions >= config.lama_sessions else "lama", "fallback_method": "lama" if config.migan_sessions >= config.lama_sessions else "migan" } } return celery_config def print_status(self): """현재 상태 출력""" if not self._config: print("❌ 세션풀이 초기화되지 않았습니다") return config = self._config memory_info = session_pool_manager._memory_info memory_status = self.get_memory_status() print(f""" 🚀 동적 세션풀 상태 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 🔧 GPU: {memory_info.gpu_name} ({memory_info.gpu_tier.value}) 💾 VRAM: {memory_info.total_vram_mb}MB 총용량 📊 현재 사용률: {memory_status.get('utilization_pct', 'N/A')}% 🎯 세션풀 구성: - MIGAN: {config.migan_sessions}개 ({config.migan_sessions * 1200}MB) - LAMA: {config.lama_sessions}개 ({config.lama_sessions * 500}MB) - OCR: {config.ocr_sessions}개 ({config.ocr_sessions * 400}MB) - 워커: {config.workers}개 📈 성능 지표: - LAMA:MIGAN 비율: {config.lama_migan_ratio} - 워커:세션 비율: {config.worker_session_ratio:.1f}:1 - 예상 동시 클라이언트: {config.expected_concurrent_clients}명 - 메모리 사용률: {config.total_vram_usage_mb/memory_info.total_vram_mb*100:.1f}% 🔄 모니터링: {'활성' if self._monitoring_active else '비활성'} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """) # 전역 통합 관리자 인스턴스 session_integrator = SessionPoolIntegrator() def initialize_dynamic_session_pools(lama_migan_ratio: tuple = (4, 6), worker_session_ratio: float = 3.0, logger: Optional[logging.Logger] = None) -> SessionPoolConfig: """동적 세션풀 초기화 헬퍼 함수""" if logger: session_integrator.logger = logger return session_integrator.initialize_dynamic_pools( lama_migan_ratio=lama_migan_ratio, worker_session_ratio=worker_session_ratio ) def get_celery_config() -> Dict[str, Any]: """Celery 설정 반환 헬퍼 함수""" return session_integrator.generate_celery_config() def print_session_status(): """세션풀 상태 출력 헬퍼 함수""" session_integrator.print_status() if __name__ == "__main__": # 테스트 실행 import time logging.basicConfig(level=logging.INFO) print("🔍 동적 세션풀 통합 테스트...") # 4:6 비율로 초기화 config = initialize_dynamic_session_pools(lama_migan_ratio=(4, 6), worker_session_ratio=3.0) # 상태 출력 print_session_status() # Celery 설정 생성 celery_config = get_celery_config() print("\n📋 Celery 설정:") import json print(json.dumps(celery_config, indent=2, ensure_ascii=False)) # 짧은 모니터링 테스트 print("\n⏱️ 10초간 모니터링 테스트...") time.sleep(10) # 정리 session_integrator.stop_monitoring() print("✅ 테스트 완료")