""" 경량 모니터링 이벤트(JSONL) 기록 및 읽기 유틸 - worker 스케일 업/다운 - 세션 생성/회수 """ from __future__ import annotations import os import time import json from typing import Dict, Any, List LOG_DIR = "logs" os.makedirs(LOG_DIR, exist_ok=True) EVENT_LOG_PATH = os.path.join(LOG_DIR, "scale_events.jsonl") MAX_BYTES = 10 * 1024 * 1024 # 10MB BACKUP = 10 def _rotate_if_needed(): try: if os.path.exists(EVENT_LOG_PATH) and os.path.getsize(EVENT_LOG_PATH) > MAX_BYTES: ts = time.strftime("%Y%m%d-%H%M%S") os.replace(EVENT_LOG_PATH, os.path.join(LOG_DIR, f"scale_events_{ts}.jsonl")) rotated = [os.path.join(LOG_DIR, f) for f in os.listdir(LOG_DIR) if f.startswith("scale_events_")] rotated.sort(key=lambda p: os.path.getmtime(p), reverse=True) for p in rotated[BACKUP:]: try: os.remove(p) except Exception: pass except Exception: pass def append_event(event: Dict[str, Any]) -> None: try: _rotate_if_needed() if "timestamp" not in event: event["timestamp"] = time.time() with open(EVENT_LOG_PATH, "a", encoding="utf-8") as f: f.write(json.dumps(event, ensure_ascii=False) + "\n") except Exception: pass def read_recent_events(limit: int = 300) -> List[Dict[str, Any]]: try: if not os.path.exists(EVENT_LOG_PATH): return [] events: List[Dict[str, Any]] = [] with open(EVENT_LOG_PATH, "r", encoding="utf-8") as f: # 간단히 끝에서 limit줄만 읽기 (파일이 크지 않다고 가정) lines = f.readlines()[-limit:] for line in lines: line = line.strip() if not line: continue try: events.append(json.loads(line)) except Exception: continue return events except Exception: return []