279 lines
9.0 KiB
Python
279 lines
9.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
데이터베이스 마이그레이션 모듈
|
|
데이터베이스 스키마 변경을 관리합니다.
|
|
|
|
이 모듈은 다음 기능을 제공합니다:
|
|
- 마이그레이션 버전 관리
|
|
- 스키마 업그레이드/다운그레이드
|
|
- 마이그레이션 이력 추적
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import List, Callable, Optional
|
|
from dataclasses import dataclass
|
|
|
|
from .db_manager import DatabaseManager, get_db
|
|
from core.logger import get_logger
|
|
|
|
# 로거 설정
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Migration:
|
|
"""
|
|
마이그레이션 정의 클래스
|
|
|
|
Attributes:
|
|
version: 마이그레이션 버전
|
|
description: 마이그레이션 설명
|
|
upgrade: 업그레이드 SQL 또는 함수
|
|
downgrade: 다운그레이드 SQL 또는 함수
|
|
"""
|
|
version: int
|
|
description: str
|
|
upgrade: str
|
|
downgrade: str = ""
|
|
|
|
|
|
class MigrationManager:
|
|
"""
|
|
마이그레이션 관리자 클래스
|
|
|
|
데이터베이스 스키마 마이그레이션을 관리합니다.
|
|
|
|
Attributes:
|
|
db: 데이터베이스 관리자
|
|
migrations: 마이그레이션 목록
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""마이그레이션 관리자 초기화"""
|
|
self.db = get_db()
|
|
self.migrations: List[Migration] = []
|
|
|
|
# 마이그레이션 테이블 생성
|
|
self._create_migration_table()
|
|
|
|
# 마이그레이션 정의
|
|
self._define_migrations()
|
|
|
|
def _create_migration_table(self):
|
|
"""마이그레이션 이력 테이블 생성"""
|
|
query = """
|
|
CREATE TABLE IF NOT EXISTS _migrations (
|
|
version INTEGER PRIMARY KEY,
|
|
description TEXT,
|
|
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
self.db.execute(query)
|
|
|
|
def _define_migrations(self):
|
|
"""마이그레이션 정의"""
|
|
# 마이그레이션 1: 초기 스키마 (db_manager에서 이미 생성)
|
|
self.migrations.append(Migration(
|
|
version=1,
|
|
description="Initial schema",
|
|
upgrade="-- Initial schema created in db_manager",
|
|
downgrade=""
|
|
))
|
|
|
|
# 마이그레이션 2: 고장 테이블에 심각도 필드 추가 (예시)
|
|
self.migrations.append(Migration(
|
|
version=2,
|
|
description="Add severity field to faults table",
|
|
upgrade="""
|
|
ALTER TABLE faults ADD COLUMN severity TEXT DEFAULT 'normal';
|
|
""",
|
|
downgrade="""
|
|
-- SQLite doesn't support DROP COLUMN directly
|
|
-- This would require table recreation
|
|
"""
|
|
))
|
|
|
|
# 마이그레이션 3: 사용자 테이블에 마지막 로그인 필드 추가 (예시)
|
|
self.migrations.append(Migration(
|
|
version=3,
|
|
description="Add last_login field to users table",
|
|
upgrade="""
|
|
ALTER TABLE users ADD COLUMN last_login DATETIME;
|
|
""",
|
|
downgrade=""
|
|
))
|
|
|
|
# 마이그레이션 4: todos 테이블에 category 필드 추가
|
|
self.migrations.append(Migration(
|
|
version=4,
|
|
description="Add category field to todos table",
|
|
upgrade="""
|
|
ALTER TABLE todos ADD COLUMN category TEXT DEFAULT '일반';
|
|
""",
|
|
downgrade=""
|
|
))
|
|
|
|
# 마이그레이션 5: faults 테이블에 column_number 필드 추가
|
|
self.migrations.append(Migration(
|
|
version=5,
|
|
description="Add column_number field to faults table",
|
|
upgrade="""
|
|
ALTER TABLE faults ADD COLUMN column_number TEXT;
|
|
""",
|
|
downgrade=""
|
|
))
|
|
|
|
# 마이그레이션 6: faults 테이블에 fault_source 필드 추가
|
|
self.migrations.append(Migration(
|
|
version=6,
|
|
description="Add fault_source field to faults table",
|
|
upgrade="""
|
|
ALTER TABLE faults ADD COLUMN fault_source TEXT;
|
|
""",
|
|
downgrade=""
|
|
))
|
|
|
|
# 마이그레이션 7: weather 테이블 생성
|
|
self.migrations.append(Migration(
|
|
version=7,
|
|
description="Create weather table",
|
|
upgrade="""
|
|
CREATE TABLE weather (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
datetime TEXT NOT NULL,
|
|
location_name TEXT NOT NULL,
|
|
location_code TEXT NOT NULL,
|
|
temp INTEGER,
|
|
feels_like INTEGER,
|
|
humidity INTEGER,
|
|
wind_speed TEXT,
|
|
wind_direction TEXT,
|
|
precipitation_prob INTEGER,
|
|
weather_condition TEXT,
|
|
weather_icon TEXT,
|
|
created_at TEXT,
|
|
updated_at TEXT,
|
|
UNIQUE(datetime, location_code)
|
|
);
|
|
CREATE INDEX idx_weather_datetime_location ON weather(datetime, location_code);
|
|
CREATE INDEX idx_weather_location_code ON weather(location_code);
|
|
""",
|
|
downgrade=""
|
|
))
|
|
|
|
def get_current_version(self) -> int:
|
|
"""
|
|
현재 마이그레이션 버전을 반환합니다.
|
|
|
|
Returns:
|
|
현재 버전 (마이그레이션이 없으면 0)
|
|
"""
|
|
query = "SELECT MAX(version) as version FROM _migrations"
|
|
result = self.db.fetch_one(query)
|
|
return result['version'] if result and result['version'] else 0
|
|
|
|
def get_pending_migrations(self) -> List[Migration]:
|
|
"""
|
|
적용되지 않은 마이그레이션 목록을 반환합니다.
|
|
|
|
Returns:
|
|
대기 중인 마이그레이션 목록
|
|
"""
|
|
current = self.get_current_version()
|
|
return [m for m in self.migrations if m.version > current]
|
|
|
|
def apply_migration(self, migration: Migration) -> bool:
|
|
"""
|
|
단일 마이그레이션을 적용합니다.
|
|
|
|
Args:
|
|
migration: 적용할 마이그레이션
|
|
|
|
Returns:
|
|
적용 성공 여부
|
|
"""
|
|
try:
|
|
# 업그레이드 SQL 실행
|
|
if migration.upgrade.strip():
|
|
with self.db.get_connection() as conn:
|
|
conn.executescript(migration.upgrade)
|
|
conn.commit()
|
|
|
|
# 마이그레이션 이력 기록
|
|
query = """
|
|
INSERT INTO _migrations (version, description, applied_at)
|
|
VALUES (?, ?, ?)
|
|
"""
|
|
self.db.execute(
|
|
query,
|
|
(migration.version, migration.description, datetime.now().isoformat())
|
|
)
|
|
|
|
logger.info(f"마이그레이션 적용: v{migration.version} - {migration.description}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"마이그레이션 실패: v{migration.version} - {e}")
|
|
return False
|
|
|
|
def migrate(self, target_version: int = None) -> bool:
|
|
"""
|
|
마이그레이션을 실행합니다.
|
|
|
|
Args:
|
|
target_version: 목표 버전 (None이면 최신 버전)
|
|
|
|
Returns:
|
|
마이그레이션 성공 여부
|
|
"""
|
|
if target_version is None:
|
|
target_version = max(m.version for m in self.migrations) if self.migrations else 0
|
|
|
|
current = self.get_current_version()
|
|
|
|
if current >= target_version:
|
|
logger.info(f"마이그레이션 불필요: 현재 v{current}")
|
|
return True
|
|
|
|
# 적용할 마이그레이션 필터링
|
|
to_apply = [m for m in self.migrations
|
|
if current < m.version <= target_version]
|
|
to_apply.sort(key=lambda m: m.version)
|
|
|
|
for migration in to_apply:
|
|
if not self.apply_migration(migration):
|
|
return False
|
|
|
|
logger.info(f"마이그레이션 완료: v{current} -> v{target_version}")
|
|
return True
|
|
|
|
def rollback(self, target_version: int) -> bool:
|
|
"""
|
|
마이그레이션을 롤백합니다.
|
|
|
|
Args:
|
|
target_version: 목표 버전
|
|
|
|
Returns:
|
|
롤백 성공 여부
|
|
|
|
Note:
|
|
SQLite의 제한으로 인해 실제 롤백은 제한적입니다.
|
|
"""
|
|
current = self.get_current_version()
|
|
|
|
if current <= target_version:
|
|
logger.info(f"롤백 불필요: 현재 v{current}")
|
|
return True
|
|
|
|
logger.warning("SQLite 롤백은 제한적입니다. 백업에서 복원을 권장합니다.")
|
|
return False
|
|
|
|
|
|
def run_migrations():
|
|
"""마이그레이션을 실행합니다."""
|
|
manager = MigrationManager()
|
|
return manager.migrate()
|
|
|
|
|