# -*- coding: utf-8 -*- """ 데이터베이스 연결 관리 모듈 SQLite 데이터베이스 연결 및 관리 기능을 제공합니다. 이 모듈은 다음 기능을 제공합니다: - 데이터베이스 연결 관리 - 테이블 생성 및 마이그레이션 - 트랜잭션 관리 - 연결 풀 관리 """ import sqlite3 import threading from pathlib import Path from typing import Optional, List, Dict, Any, Tuple from contextlib import contextmanager from datetime import datetime from core.constants import DB_FILE, DATA_DIR from core.logger import get_logger from core.exceptions import ( DatabaseConnectionError, DatabaseQueryError, ) # 로거 설정 logger = get_logger(__name__) # ============================================================================ # SQL 스키마 정의 # ============================================================================ CREATE_TABLES_SQL = """ -- 사용자 테이블 CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, name TEXT NOT NULL, department TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'viewer', is_active INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 팀 테이블 CREATE TABLE IF NOT EXISTS teams ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, shift_type TEXT, is_active INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 지시 섹션 테이블 CREATE TABLE IF NOT EXISTS instructions ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_date DATE NOT NULL, created_team TEXT NOT NULL, instructor TEXT, instruction_content TEXT NOT NULL, instruction_date DATE, is_continuous INTEGER DEFAULT 0, team_confirmations TEXT DEFAULT '{}', is_completed INTEGER DEFAULT 0, completed_at DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 고장 섹션 테이블 CREATE TABLE IF NOT EXISTS faults ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_date DATE NOT NULL, created_team TEXT NOT NULL, occurrence_date DATE, train_number TEXT, car_number TEXT, fault_code TEXT, device_category TEXT, occurrence_station TEXT, occurrence_time TIME, fault_content TEXT, action_content TEXT, action_team TEXT, team_confirmations TEXT DEFAULT '{}', is_completed INTEGER DEFAULT 0, completed_at DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 작업 섹션 테이블 CREATE TABLE IF NOT EXISTS works ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_date DATE NOT NULL, created_team TEXT NOT NULL, work_date DATE, work_entity TEXT, target_train TEXT, target_device TEXT, work_content TEXT, remarks TEXT, team_confirmations TEXT DEFAULT '{}', is_completed INTEGER DEFAULT 0, completed_at DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 기타 섹션 테이블 CREATE TABLE IF NOT EXISTS miscs ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_date DATE NOT NULL, created_team TEXT NOT NULL, reporter TEXT, report_content TEXT, remarks TEXT, related_document TEXT, team_confirmations TEXT DEFAULT '{}', is_completed INTEGER DEFAULT 0, completed_at DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 일상검수 테이블 CREATE TABLE IF NOT EXISTS daily_inspections ( id INTEGER PRIMARY KEY AUTOINCREMENT, inspection_date DATE NOT NULL, shift_type TEXT NOT NULL, slot_number INTEGER NOT NULL, train_number TEXT, cleaning_type TEXT DEFAULT '없음', has_work INTEGER DEFAULT 0, work_content TEXT, is_work_completed INTEGER DEFAULT 0, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id), UNIQUE(inspection_date, shift_type, slot_number) ); -- 할일 테이블 CREATE TABLE IF NOT EXISTS todos ( id INTEGER PRIMARY KEY AUTOINCREMENT, todo_date DATE NOT NULL, category TEXT DEFAULT '일반', target_train TEXT, schedule TEXT, content TEXT NOT NULL, is_completed INTEGER DEFAULT 0, completed_at TIMESTAMP, alarm_time TIMESTAMP, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 메모 테이블 CREATE TABLE IF NOT EXISTS memos ( id INTEGER PRIMARY KEY AUTOINCREMENT, memo_date DATE NOT NULL, content TEXT NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id) ); -- 설정 테이블 CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 팀 인원 테이블 CREATE TABLE IF NOT EXISTS team_members ( id INTEGER PRIMARY KEY AUTOINCREMENT, team TEXT NOT NULL, position TEXT NOT NULL, name TEXT NOT NULL, "order" INTEGER DEFAULT 0, partner_id INTEGER REFERENCES team_members(id), is_active INTEGER DEFAULT 1, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 당무 일정 테이블 CREATE TABLE IF NOT EXISTS duty_schedules ( id INTEGER PRIMARY KEY AUTOINCREMENT, duty_date DATE NOT NULL, team TEXT NOT NULL, shift_type TEXT NOT NULL, vice_leader_id INTEGER REFERENCES team_members(id), operator_id INTEGER REFERENCES team_members(id), vice_leader_name TEXT, operator_name TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, UNIQUE(duty_date, team, shift_type) ); -- 조치 단계 테이블 CREATE TABLE IF NOT EXISTS action_steps ( id INTEGER PRIMARY KEY AUTOINCREMENT, fault_id INTEGER NOT NULL REFERENCES faults(id) ON DELETE CASCADE, step_number INTEGER NOT NULL, action_content TEXT NOT NULL, action_team TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, created_by INTEGER REFERENCES users(id), UNIQUE(fault_id, step_number) ); -- 인덱스 생성 CREATE INDEX IF NOT EXISTS idx_instructions_created_date ON instructions(created_date); CREATE INDEX IF NOT EXISTS idx_instructions_is_completed ON instructions(is_completed); CREATE INDEX IF NOT EXISTS idx_faults_created_date ON faults(created_date); CREATE INDEX IF NOT EXISTS idx_faults_train_number ON faults(train_number); CREATE INDEX IF NOT EXISTS idx_faults_occurrence_date ON faults(occurrence_date); CREATE INDEX IF NOT EXISTS idx_works_work_date ON works(work_date); CREATE INDEX IF NOT EXISTS idx_works_target_train ON works(target_train); CREATE INDEX IF NOT EXISTS idx_daily_inspections_date ON daily_inspections(inspection_date); CREATE INDEX IF NOT EXISTS idx_todos_date ON todos(todo_date); CREATE INDEX IF NOT EXISTS idx_todos_is_completed ON todos(is_completed); CREATE INDEX IF NOT EXISTS idx_memos_date ON memos(memo_date); CREATE INDEX IF NOT EXISTS idx_action_steps_fault_id ON action_steps(fault_id); CREATE INDEX IF NOT EXISTS idx_action_steps_step_number ON action_steps(fault_id, step_number); """ # 기본 데이터 삽입 SQL INSERT_DEFAULT_DATA_SQL = """ -- 기본 팀 데이터 INSERT OR IGNORE INTO teams (name, shift_type, is_active) VALUES ('1팀', '주간', 1); INSERT OR IGNORE INTO teams (name, shift_type, is_active) VALUES ('2팀', '야간', 1); INSERT OR IGNORE INTO teams (name, shift_type, is_active) VALUES ('3팀', '주간', 1); INSERT OR IGNORE INTO teams (name, shift_type, is_active) VALUES ('4팀', '야간', 1); -- 기본 관리자 계정 (비밀번호: admin123) INSERT OR IGNORE INTO users (username, password_hash, name, department, role, is_active) VALUES ('admin', 'pbkdf2:sha256:260000$salt$hash', '관리자', '검수팀', 'admin', 1); """ # ============================================================================ # 데이터베이스 관리자 클래스 # ============================================================================ class DatabaseManager: """ 데이터베이스 관리자 클래스 싱글톤 패턴을 사용하여 애플리케이션 전역에서 하나의 인스턴스만 사용합니다. SQLite 데이터베이스 연결 및 기본 작업을 관리합니다. Attributes: db_path: 데이터베이스 파일 경로 connection: 현재 데이터베이스 연결 Examples: >>> db = DatabaseManager() >>> with db.get_connection() as conn: ... cursor = conn.execute("SELECT * FROM users") ... users = cursor.fetchall() """ _instance: Optional['DatabaseManager'] = None _lock = threading.Lock() def __new__(cls, db_path: Path = None): """싱글톤 패턴 구현""" with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, db_path: Path = None): """ 데이터베이스 관리자 초기화 Args: db_path: 데이터베이스 파일 경로 (기본값: DB_FILE) """ # 이미 초기화된 경우 건너뛰기 if self._initialized: return self.db_path = db_path or DB_FILE self._local = threading.local() # 데이터 디렉토리 생성 DATA_DIR.mkdir(parents=True, exist_ok=True) # 데이터베이스 초기화 self._initialize_database() self._initialized = True logger.info(f"데이터베이스 관리자 초기화 완료: {self.db_path}") def _initialize_database(self): """데이터베이스 초기화 (테이블 생성 및 마이그레이션)""" try: with self.get_connection() as conn: # 외래 키 활성화 conn.execute("PRAGMA foreign_keys = ON") # 테이블 생성 conn.executescript(CREATE_TABLES_SQL) # 기본 데이터 삽입 conn.executescript(INSERT_DEFAULT_DATA_SQL) # 마이그레이션: daily_inspections 테이블 컬럼 추가 try: conn.execute("ALTER TABLE daily_inspections ADD COLUMN work_content TEXT") conn.execute("ALTER TABLE daily_inspections ADD COLUMN is_work_completed INTEGER DEFAULT 0") except sqlite3.OperationalError: pass # todos 테이블에 alarm_time 컬럼 추가 try: conn.execute("ALTER TABLE todos ADD COLUMN alarm_time TIMESTAMP") except sqlite3.OperationalError: pass # todos 테이블에 category 컬럼 추가 try: conn.execute("ALTER TABLE todos ADD COLUMN category TEXT DEFAULT '일반'") except sqlite3.OperationalError: pass conn.commit() logger.info("데이터베이스 테이블 초기화 완료") except Exception as e: logger.error(f"데이터베이스 초기화 실패: {e}") raise DatabaseConnectionError(f"데이터베이스 초기화 실패: {e}") @contextmanager def get_connection(self): """ 데이터베이스 연결을 반환하는 컨텍스트 매니저 스레드별로 별도의 연결을 유지합니다. Yields: sqlite3.Connection: 데이터베이스 연결 Examples: >>> with db.get_connection() as conn: ... cursor = conn.execute("SELECT * FROM users") """ try: # 스레드별 연결 가져오기 또는 생성 if not hasattr(self._local, 'connection') or self._local.connection is None: self._local.connection = sqlite3.connect( self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES ) # Row 팩토리 설정 (딕셔너리처럼 접근 가능) self._local.connection.row_factory = sqlite3.Row # 외래 키 활성화 self._local.connection.execute("PRAGMA foreign_keys = ON") yield self._local.connection except sqlite3.Error as e: logger.error(f"데이터베이스 연결 오류: {e}") raise DatabaseConnectionError(f"데이터베이스 연결 실패: {e}") def execute( self, query: str, params: Tuple = None, commit: bool = True ) -> sqlite3.Cursor: """ SQL 쿼리를 실행합니다. Args: query: SQL 쿼리 params: 쿼리 파라미터 commit: 자동 커밋 여부 Returns: 실행 결과 커서 Examples: >>> cursor = db.execute( ... "INSERT INTO users (name) VALUES (?)", ... ("홍길동",) ... ) >>> print(cursor.lastrowid) """ try: with self.get_connection() as conn: if params: cursor = conn.execute(query, params) else: cursor = conn.execute(query) if commit: conn.commit() return cursor except sqlite3.Error as e: logger.error(f"쿼리 실행 오류: {query[:100]}... - {e}") raise DatabaseQueryError(f"쿼리 실행 실패: {e}", query) def execute_many( self, query: str, params_list: List[Tuple], commit: bool = True ) -> sqlite3.Cursor: """ 여러 SQL 쿼리를 일괄 실행합니다. Args: query: SQL 쿼리 params_list: 파라미터 리스트 commit: 자동 커밋 여부 Returns: 실행 결과 커서 """ try: with self.get_connection() as conn: cursor = conn.executemany(query, params_list) if commit: conn.commit() return cursor except sqlite3.Error as e: logger.error(f"일괄 쿼리 실행 오류: {e}") raise DatabaseQueryError(f"일괄 쿼리 실행 실패: {e}", query) def fetch_one( self, query: str, params: Tuple = None ) -> Optional[Dict[str, Any]]: """ 단일 레코드를 조회합니다. Args: query: SQL 쿼리 params: 쿼리 파라미터 Returns: 레코드 딕셔너리 또는 None Examples: >>> user = db.fetch_one( ... "SELECT * FROM users WHERE id = ?", ... (1,) ... ) """ try: with self.get_connection() as conn: if params: cursor = conn.execute(query, params) else: cursor = conn.execute(query) row = cursor.fetchone() return dict(row) if row else None except sqlite3.Error as e: logger.error(f"단일 조회 오류: {e}") raise DatabaseQueryError(f"조회 실패: {e}", query) def fetch_all( self, query: str, params: Tuple = None ) -> List[Dict[str, Any]]: """ 여러 레코드를 조회합니다. Args: query: SQL 쿼리 params: 쿼리 파라미터 Returns: 레코드 딕셔너리 리스트 Examples: >>> users = db.fetch_all("SELECT * FROM users WHERE is_active = 1") """ try: with self.get_connection() as conn: if params: cursor = conn.execute(query, params) else: cursor = conn.execute(query) rows = cursor.fetchall() return [dict(row) for row in rows] except sqlite3.Error as e: logger.error(f"다중 조회 오류: {e}") raise DatabaseQueryError(f"조회 실패: {e}", query) def table_exists(self, table_name: str) -> bool: """ 테이블 존재 여부를 확인합니다. Args: table_name: 테이블 이름 Returns: 테이블 존재 여부 """ query = """ SELECT name FROM sqlite_master WHERE type='table' AND name=? """ result = self.fetch_one(query, (table_name,)) return result is not None def get_table_columns(self, table_name: str) -> List[str]: """ 테이블의 컬럼 목록을 반환합니다. Args: table_name: 테이블 이름 Returns: 컬럼 이름 리스트 """ query = f"PRAGMA table_info({table_name})" rows = self.fetch_all(query) return [row['name'] for row in rows] def backup(self, backup_path: Path = None) -> bool: """ 데이터베이스를 백업합니다. Args: backup_path: 백업 파일 경로 Returns: 백업 성공 여부 """ try: if backup_path is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = DATA_DIR / f"backup_{timestamp}.db" with self.get_connection() as conn: backup_conn = sqlite3.connect(backup_path) conn.backup(backup_conn) backup_conn.close() logger.info(f"데이터베이스 백업 완료: {backup_path}") return True except Exception as e: logger.error(f"데이터베이스 백업 실패: {e}") return False def vacuum(self): """데이터베이스 최적화 (VACUUM)""" try: with self.get_connection() as conn: conn.execute("VACUUM") logger.info("데이터베이스 VACUUM 완료") except Exception as e: logger.error(f"데이터베이스 VACUUM 실패: {e}") def close(self): """현재 스레드의 연결을 닫습니다.""" if hasattr(self._local, 'connection') and self._local.connection: self._local.connection.close() self._local.connection = None logger.debug("데이터베이스 연결 종료") def close_all(self): """모든 연결을 닫습니다.""" self.close() DatabaseManager._instance = None logger.info("모든 데이터베이스 연결 종료") # ============================================================================ # 모듈 레벨 편의 함수 # ============================================================================ def get_db() -> DatabaseManager: """ 데이터베이스 관리자 인스턴스를 반환합니다. Returns: DatabaseManager 인스턴스 """ return DatabaseManager()