""" 일일 통계 수집 및 관리 - 처리된 이미지 수 - 네트워크 전송량 - API 호출 통계 """ from __future__ import annotations import os import time import json from datetime import datetime, timedelta from typing import Dict, Any from threading import Lock from collections import defaultdict LOG_DIR = "logs" os.makedirs(LOG_DIR, exist_ok=True) DAILY_STATS_PATH = os.path.join(LOG_DIR, "daily_stats.json") class DailyStatsCollector: def __init__(self): self.lock = Lock() self.current_date = datetime.now().strftime("%Y-%m-%d") self.stats = self._load_or_create_today_stats() def _load_or_create_today_stats(self) -> Dict[str, Any]: """오늘의 통계를 로드하거나 새로 생성""" today = datetime.now().strftime("%Y-%m-%d") try: if os.path.exists(DAILY_STATS_PATH): with open(DAILY_STATS_PATH, "r", encoding="utf-8") as f: all_stats = json.load(f) # 오늘 날짜의 통계가 있으면 반환 if today in all_stats: return all_stats[today] except Exception: pass # 새로운 통계 생성 return { "date": today, "images_processed": { "inpaint": 0, "remove_bg": 0, "gen_image": 0, "total": 0 }, "network": { "bytes_uploaded": 0, "bytes_downloaded": 0, "requests_count": 0 }, "api_calls": { "total": 0, "success": 0, "failed": 0 }, "models_used": defaultdict(int), "peak_concurrent": 0, "start_time": time.time(), "last_update": time.time() } def _check_date_rollover(self): """날짜가 바뀌면 통계 저장 및 리셋""" today = datetime.now().strftime("%Y-%m-%d") if today != self.current_date: # 어제 통계 저장 self._save_stats() # 새로운 날짜로 리셋 self.current_date = today self.stats = self._load_or_create_today_stats() def record_image_processed(self, endpoint_type: str): """이미지 처리 기록""" with self.lock: self._check_date_rollover() if endpoint_type == "inpaint": self.stats["images_processed"]["inpaint"] += 1 elif endpoint_type == "remove_bg": self.stats["images_processed"]["remove_bg"] += 1 elif endpoint_type == "gen_image": self.stats["images_processed"]["gen_image"] += 1 self.stats["images_processed"]["total"] += 1 self.stats["last_update"] = time.time() def record_network_traffic(self, bytes_uploaded: int, bytes_downloaded: int): """네트워크 트래픽 기록""" with self.lock: self._check_date_rollover() self.stats["network"]["bytes_uploaded"] += bytes_uploaded self.stats["network"]["bytes_downloaded"] += bytes_downloaded self.stats["network"]["requests_count"] += 1 self.stats["last_update"] = time.time() def record_api_call(self, success: bool): """API 호출 기록""" with self.lock: self._check_date_rollover() self.stats["api_calls"]["total"] += 1 if success: self.stats["api_calls"]["success"] += 1 else: self.stats["api_calls"]["failed"] += 1 self.stats["last_update"] = time.time() def record_model_usage(self, model_name: str): """모델 사용 기록""" with self.lock: self._check_date_rollover() if "models_used" not in self.stats: self.stats["models_used"] = {} self.stats["models_used"][model_name] = self.stats["models_used"].get(model_name, 0) + 1 self.stats["last_update"] = time.time() def update_peak_concurrent(self, current_concurrent: int): """최대 동시 요청 수 업데이트""" with self.lock: self._check_date_rollover() if current_concurrent > self.stats["peak_concurrent"]: self.stats["peak_concurrent"] = current_concurrent self.stats["last_update"] = time.time() def get_today_stats(self) -> Dict[str, Any]: """오늘의 통계 반환""" with self.lock: self._check_date_rollover() # 읽기 전용 복사본 반환 stats_copy = dict(self.stats) # MB/GB 단위로 변환된 값 추가 stats_copy["network"]["mb_uploaded"] = stats_copy["network"]["bytes_uploaded"] / (1024 * 1024) stats_copy["network"]["mb_downloaded"] = stats_copy["network"]["bytes_downloaded"] / (1024 * 1024) stats_copy["network"]["gb_uploaded"] = stats_copy["network"]["bytes_uploaded"] / (1024 * 1024 * 1024) stats_copy["network"]["gb_downloaded"] = stats_copy["network"]["bytes_downloaded"] / (1024 * 1024 * 1024) return stats_copy def get_historical_stats(self, days: int = 7) -> Dict[str, Any]: """최근 N일간의 통계 반환""" try: if not os.path.exists(DAILY_STATS_PATH): return {} with open(DAILY_STATS_PATH, "r", encoding="utf-8") as f: all_stats = json.load(f) # 최근 N일 필터링 cutoff_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") recent_stats = { date: stats for date, stats in all_stats.items() if date >= cutoff_date } return recent_stats except Exception: return {} def _save_stats(self): """현재 통계를 파일에 저장""" try: # 기존 통계 로드 all_stats = {} if os.path.exists(DAILY_STATS_PATH): with open(DAILY_STATS_PATH, "r", encoding="utf-8") as f: all_stats = json.load(f) # 현재 날짜 통계 업데이트 all_stats[self.current_date] = self.stats # 30일 이상 된 통계 삭제 cutoff_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") all_stats = { date: stats for date, stats in all_stats.items() if date >= cutoff_date } # 파일에 저장 with open(DAILY_STATS_PATH, "w", encoding="utf-8") as f: json.dump(all_stats, f, indent=2, ensure_ascii=False) except Exception: pass def save(self): """수동 저장""" with self.lock: self._save_stats() # 글로벌 인스턴스 daily_stats = DailyStatsCollector()