handOver2/database/migrations.py

279 lines
9.0 KiB
Python

# -*- coding: utf-8 -*-
"""
데이터베이스 마이그레이션 모듈
데이터베이스 스키마 변경을 관리합니다.
이 모듈은 다음 기능을 제공합니다:
- 마이그레이션 버전 관리
- 스키마 업그레이드/다운그레이드
- 마이그레이션 이력 추적
"""
from datetime import datetime
from typing import List, Callable, Optional
from dataclasses import dataclass
from .db_manager import DatabaseManager, get_db
from core.logger import get_logger
# 로거 설정
logger = get_logger(__name__)
@dataclass
class Migration:
"""
마이그레이션 정의 클래스
Attributes:
version: 마이그레이션 버전
description: 마이그레이션 설명
upgrade: 업그레이드 SQL 또는 함수
downgrade: 다운그레이드 SQL 또는 함수
"""
version: int
description: str
upgrade: str
downgrade: str = ""
class MigrationManager:
"""
마이그레이션 관리자 클래스
데이터베이스 스키마 마이그레이션을 관리합니다.
Attributes:
db: 데이터베이스 관리자
migrations: 마이그레이션 목록
"""
def __init__(self):
"""마이그레이션 관리자 초기화"""
self.db = get_db()
self.migrations: List[Migration] = []
# 마이그레이션 테이블 생성
self._create_migration_table()
# 마이그레이션 정의
self._define_migrations()
def _create_migration_table(self):
"""마이그레이션 이력 테이블 생성"""
query = """
CREATE TABLE IF NOT EXISTS _migrations (
version INTEGER PRIMARY KEY,
description TEXT,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
self.db.execute(query)
def _define_migrations(self):
"""마이그레이션 정의"""
# 마이그레이션 1: 초기 스키마 (db_manager에서 이미 생성)
self.migrations.append(Migration(
version=1,
description="Initial schema",
upgrade="-- Initial schema created in db_manager",
downgrade=""
))
# 마이그레이션 2: 고장 테이블에 심각도 필드 추가 (예시)
self.migrations.append(Migration(
version=2,
description="Add severity field to faults table",
upgrade="""
ALTER TABLE faults ADD COLUMN severity TEXT DEFAULT 'normal';
""",
downgrade="""
-- SQLite doesn't support DROP COLUMN directly
-- This would require table recreation
"""
))
# 마이그레이션 3: 사용자 테이블에 마지막 로그인 필드 추가 (예시)
self.migrations.append(Migration(
version=3,
description="Add last_login field to users table",
upgrade="""
ALTER TABLE users ADD COLUMN last_login DATETIME;
""",
downgrade=""
))
# 마이그레이션 4: todos 테이블에 category 필드 추가
self.migrations.append(Migration(
version=4,
description="Add category field to todos table",
upgrade="""
ALTER TABLE todos ADD COLUMN category TEXT DEFAULT '일반';
""",
downgrade=""
))
# 마이그레이션 5: faults 테이블에 column_number 필드 추가
self.migrations.append(Migration(
version=5,
description="Add column_number field to faults table",
upgrade="""
ALTER TABLE faults ADD COLUMN column_number TEXT;
""",
downgrade=""
))
# 마이그레이션 6: faults 테이블에 fault_source 필드 추가
self.migrations.append(Migration(
version=6,
description="Add fault_source field to faults table",
upgrade="""
ALTER TABLE faults ADD COLUMN fault_source TEXT;
""",
downgrade=""
))
# 마이그레이션 7: weather 테이블 생성
self.migrations.append(Migration(
version=7,
description="Create weather table",
upgrade="""
CREATE TABLE weather (
id INTEGER PRIMARY KEY AUTOINCREMENT,
datetime TEXT NOT NULL,
location_name TEXT NOT NULL,
location_code TEXT NOT NULL,
temp INTEGER,
feels_like INTEGER,
humidity INTEGER,
wind_speed TEXT,
wind_direction TEXT,
precipitation_prob INTEGER,
weather_condition TEXT,
weather_icon TEXT,
created_at TEXT,
updated_at TEXT,
UNIQUE(datetime, location_code)
);
CREATE INDEX idx_weather_datetime_location ON weather(datetime, location_code);
CREATE INDEX idx_weather_location_code ON weather(location_code);
""",
downgrade=""
))
def get_current_version(self) -> int:
"""
현재 마이그레이션 버전을 반환합니다.
Returns:
현재 버전 (마이그레이션이 없으면 0)
"""
query = "SELECT MAX(version) as version FROM _migrations"
result = self.db.fetch_one(query)
return result['version'] if result and result['version'] else 0
def get_pending_migrations(self) -> List[Migration]:
"""
적용되지 않은 마이그레이션 목록을 반환합니다.
Returns:
대기 중인 마이그레이션 목록
"""
current = self.get_current_version()
return [m for m in self.migrations if m.version > current]
def apply_migration(self, migration: Migration) -> bool:
"""
단일 마이그레이션을 적용합니다.
Args:
migration: 적용할 마이그레이션
Returns:
적용 성공 여부
"""
try:
# 업그레이드 SQL 실행
if migration.upgrade.strip():
with self.db.get_connection() as conn:
conn.executescript(migration.upgrade)
conn.commit()
# 마이그레이션 이력 기록
query = """
INSERT INTO _migrations (version, description, applied_at)
VALUES (?, ?, ?)
"""
self.db.execute(
query,
(migration.version, migration.description, datetime.now().isoformat())
)
logger.info(f"마이그레이션 적용: v{migration.version} - {migration.description}")
return True
except Exception as e:
logger.error(f"마이그레이션 실패: v{migration.version} - {e}")
return False
def migrate(self, target_version: int = None) -> bool:
"""
마이그레이션을 실행합니다.
Args:
target_version: 목표 버전 (None이면 최신 버전)
Returns:
마이그레이션 성공 여부
"""
if target_version is None:
target_version = max(m.version for m in self.migrations) if self.migrations else 0
current = self.get_current_version()
if current >= target_version:
logger.info(f"마이그레이션 불필요: 현재 v{current}")
return True
# 적용할 마이그레이션 필터링
to_apply = [m for m in self.migrations
if current < m.version <= target_version]
to_apply.sort(key=lambda m: m.version)
for migration in to_apply:
if not self.apply_migration(migration):
return False
logger.info(f"마이그레이션 완료: v{current} -> v{target_version}")
return True
def rollback(self, target_version: int) -> bool:
"""
마이그레이션을 롤백합니다.
Args:
target_version: 목표 버전
Returns:
롤백 성공 여부
Note:
SQLite의 제한으로 인해 실제 롤백은 제한적입니다.
"""
current = self.get_current_version()
if current <= target_version:
logger.info(f"롤백 불필요: 현재 v{current}")
return True
logger.warning("SQLite 롤백은 제한적입니다. 백업에서 복원을 권장합니다.")
return False
def run_migrations():
"""마이그레이션을 실행합니다."""
manager = MigrationManager()
return manager.migrate()