# -*- coding: utf-8 -*- """ CRUD 연산 모듈 데이터베이스의 Create, Read, Update, Delete 연산을 제공합니다. 이 모듈은 모든 데이터베이스 입출력을 담당하며, 다른 모듈들은 이 모듈의 메서드를 통해서만 데이터베이스에 접근합니다. """ from datetime import datetime, date, time from typing import Optional, List, Dict, Any, Type, TypeVar from dataclasses import dataclass from .db_manager import DatabaseManager, get_db from .common_db_manager import CommonDatabaseManager from .models import ( BaseModel, User, Team, Instruction, Fault, Work, Misc, DailyInspection, Todo, Memo, Setting, TeamMember, DutySchedule, TrainSchedule, Weather, MODEL_REGISTRY, ) from core.logger import get_logger from core.exceptions import ( RecordNotFoundError, DuplicateRecordError, DatabaseQueryError, ) from core.signals import get_signals # 로거 설정 logger = get_logger(__name__) # 타입 변수 정의 T = TypeVar('T', bound=BaseModel) class CRUDManager: """ CRUD 연산 관리자 클래스 모든 데이터베이스 CRUD 연산을 제공합니다. 각 테이블별로 특화된 메서드와 공통 메서드를 제공합니다. Attributes: db: 데이터베이스 관리자 인스턴스 signals: 전역 시그널 인스턴스 Examples: >>> crud = CRUDManager() >>> instruction = crud.create_instruction( ... created_date=date.today(), ... created_team="A팀", ... instructor="팀장", ... instruction_content="지시 내용" ... ) >>> print(instruction.id) """ _instance: Optional['CRUDManager'] = None def __new__(cls): """싱글톤 패턴 구현""" if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): """CRUD 관리자 초기화""" if self._initialized: return self.db = get_db() self.signals = get_signals() self._initialized = True logger.info("CRUD 관리자 초기화 완료") # ======================================================================== # 공통 CRUD 메서드 # ======================================================================== def _create( self, table_name: str, model_class: Type[T], data: Dict[str, Any] ) -> T: """ 레코드를 생성합니다. Args: table_name: 테이블 이름 model_class: 모델 클래스 data: 생성할 데이터 Returns: 생성된 모델 인스턴스 """ # id, created_at, updated_at 제외 data = {k: v for k, v in data.items() if k not in ['id', 'created_at', 'updated_at'] and v is not None} # 컬럼과 값 분리 columns = list(data.keys()) values = list(data.values()) placeholders = ','.join(['?' for _ in columns]) # SQL 예약어인 컬럼명은 따옴표로 감싸기 quoted_columns = [f'"{col}"' if col.lower() in ['order', 'group', 'select', 'table'] else col for col in columns] query = f""" INSERT INTO {table_name} ({','.join(quoted_columns)}) VALUES ({placeholders}) """ try: cursor = self.db.execute(query, tuple(values)) record_id = cursor.lastrowid # 생성된 레코드 조회 result = self._get_by_id(table_name, model_class, record_id) # 시그널 발생 self.signals.record_created.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 생성: {table_name} ID={record_id}") return result except Exception as e: logger.error(f"레코드 생성 실패: {table_name} - {e}") raise DatabaseQueryError(f"레코드 생성 실패: {e}") def _get_by_id( self, table_name: str, model_class: Type[T], record_id: int ) -> Optional[T]: """ ID로 레코드를 조회합니다. Args: table_name: 테이블 이름 model_class: 모델 클래스 record_id: 레코드 ID Returns: 모델 인스턴스 또는 None """ query = f"SELECT * FROM {table_name} WHERE id = ?" row = self.db.fetch_one(query, (record_id,)) if row: return model_class(**row) return None def _get_all( self, table_name: str, model_class: Type[T], order_by: str = "id DESC", limit: int = None, offset: int = None, **filters ) -> List[T]: """ 모든 레코드를 조회합니다. Args: table_name: 테이블 이름 model_class: 모델 클래스 order_by: 정렬 기준 limit: 최대 개수 offset: 시작 위치 **filters: 필터 조건 Returns: 모델 인스턴스 리스트 """ query = f"SELECT * FROM {table_name}" params = [] # WHERE 절 추가 # 특수 파라미터는 필터에서 제외 (include_completed, include_continuous 등) special_params = {'include_completed', 'include_continuous'} if filters: conditions = [] for key, value in filters.items(): # 특수 파라미터는 제외 if key in special_params: continue if value is not None: # SQL 예약어인 컬럼명은 따옴표로 감싸기 quoted_key = f'"{key}"' if key.lower() in ['order', 'group', 'select', 'table'] else key conditions.append(f"{quoted_key} = ?") params.append(value) if conditions: query += " WHERE " + " AND ".join(conditions) # ORDER BY 절 추가 if order_by: query += f" ORDER BY {order_by}" # LIMIT, OFFSET 추가 if limit: query += f" LIMIT {limit}" if offset: query += f" OFFSET {offset}" rows = self.db.fetch_all(query, tuple(params) if params else None) return [model_class(**row) for row in rows] def _update( self, table_name: str, model_class: Type[T], record_id: int, data: Dict[str, Any] ) -> Optional[T]: """ 레코드를 업데이트합니다. Args: table_name: 테이블 이름 model_class: 모델 클래스 record_id: 레코드 ID data: 업데이트할 데이터 Returns: 업데이트된 모델 인스턴스 """ # id, created_at 제외하고 updated_at 자동 설정 data = {k: v for k, v in data.items() if k not in ['id', 'created_at']} data['updated_at'] = datetime.now().isoformat() # SET 절 생성 (SQL 예약어인 컬럼명은 따옴표로 감싸기) set_clause = ', '.join([ f'"{k}" = ?' if k.lower() in ['order', 'group', 'select', 'table'] else f"{k} = ?" for k in data.keys() ]) values = list(data.values()) + [record_id] query = f""" UPDATE {table_name} SET {set_clause} WHERE id = ? """ try: self.db.execute(query, tuple(values)) # 업데이트된 레코드 조회 result = self._get_by_id(table_name, model_class, record_id) # 시그널 발생 self.signals.record_updated.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 업데이트: {table_name} ID={record_id}") return result except Exception as e: logger.error(f"레코드 업데이트 실패: {table_name} ID={record_id} - {e}") raise DatabaseQueryError(f"레코드 업데이트 실패: {e}") def _delete(self, table_name: str, record_id: int) -> bool: """ 레코드를 삭제합니다. Args: table_name: 테이블 이름 record_id: 레코드 ID Returns: 삭제 성공 여부 """ query = f"DELETE FROM {table_name} WHERE id = ?" try: cursor = self.db.execute(query, (record_id,)) if cursor.rowcount > 0: # 시그널 발생 self.signals.record_deleted.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 삭제: {table_name} ID={record_id}") return True return False except Exception as e: logger.error(f"레코드 삭제 실패: {table_name} ID={record_id} - {e}") raise DatabaseQueryError(f"레코드 삭제 실패: {e}") # ======================================================================== # 공통 데이터베이스용 헬퍼 메서드 # ======================================================================== def _create_with_db( self, db_manager, table_name: str, model_class: Type[T], data: Dict[str, Any] ) -> T: """ 공통 데이터베이스에 레코드를 생성합니다. Args: db_manager: 데이터베이스 관리자 (CommonDatabaseManager) table_name: 테이블 이름 model_class: 모델 클래스 data: 생성할 데이터 Returns: 생성된 모델 인스턴스 """ # id, created_at, updated_at 제외 data = {k: v for k, v in data.items() if k not in ['id', 'created_at', 'updated_at'] and v is not None} if not data: raise ValueError("생성할 데이터가 없습니다.") # 컬럼명과 값 분리 columns = list(data.keys()) placeholders = ', '.join(['?' for _ in columns]) values = [data[col] for col in columns] query = f""" INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders}) """ try: cursor = db_manager.execute(query, tuple(values)) record_id = cursor.lastrowid # 생성된 레코드 조회 result = self._get_by_id_with_db(db_manager, table_name, model_class, record_id) # 시그널 발생 self.signals.record_created.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 생성: {table_name} ID={record_id}") return result except Exception as e: logger.error(f"레코드 생성 실패: {table_name} - {e}") raise DatabaseQueryError(f"레코드 생성 실패: {e}") def _get_by_id_with_db( self, db_manager, table_name: str, model_class: Type[T], record_id: int ) -> Optional[T]: """ 공통 데이터베이스에서 ID로 레코드를 조회합니다. Args: db_manager: 데이터베이스 관리자 (CommonDatabaseManager) table_name: 테이블 이름 model_class: 모델 클래스 record_id: 레코드 ID Returns: 모델 인스턴스 또는 None """ query = f"SELECT * FROM {table_name} WHERE id = ?" row = db_manager.fetch_one(query, (record_id,)) if row: return model_class(**row) return None def _update_with_db( self, db_manager, table_name: str, model_class: Type[T], record_id: int, data: Dict[str, Any] ) -> Optional[T]: """ 공통 데이터베이스의 레코드를 업데이트합니다. Args: db_manager: 데이터베이스 관리자 (CommonDatabaseManager) table_name: 테이블 이름 model_class: 모델 클래스 record_id: 레코드 ID data: 업데이트할 데이터 Returns: 업데이트된 모델 인스턴스 """ # id, created_at 제외하고 updated_at 자동 설정 data = {k: v for k, v in data.items() if k not in ['id', 'created_at']} data['updated_at'] = datetime.now().isoformat() # SET 절 생성 (SQL 예약어인 컬럼명은 따옴표로 감싸기) set_clause = ', '.join([ f'"{k}" = ?' if k.lower() in ['order', 'group', 'select', 'table'] else f"{k} = ?" for k in data.keys() ]) values = list(data.values()) + [record_id] query = f""" UPDATE {table_name} SET {set_clause} WHERE id = ? """ try: db_manager.execute(query, tuple(values)) # 업데이트된 레코드 조회 result = self._get_by_id_with_db(db_manager, table_name, model_class, record_id) # 시그널 발생 self.signals.record_updated.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 업데이트: {table_name} ID={record_id}") return result except Exception as e: logger.error(f"레코드 업데이트 실패: {table_name} ID={record_id} - {e}") raise DatabaseQueryError(f"레코드 업데이트 실패: {e}") def _delete_with_db( self, db_manager, table_name: str, record_id: int ) -> bool: """ 공통 데이터베이스의 레코드를 삭제합니다. Args: db_manager: 데이터베이스 관리자 (CommonDatabaseManager) table_name: 테이블 이름 record_id: 레코드 ID Returns: 삭제 성공 여부 """ query = f"DELETE FROM {table_name} WHERE id = ?" try: cursor = db_manager.execute(query, (record_id,)) if cursor.rowcount > 0: # 시그널 발생 self.signals.record_deleted.emit(table_name, record_id) self.signals.data_changed.emit(table_name) logger.info(f"레코드 삭제: {table_name} ID={record_id}") return True return False except Exception as e: logger.error(f"레코드 삭제 실패: {table_name} ID={record_id} - {e}") raise DatabaseQueryError(f"레코드 삭제 실패: {e}") def _search( self, table_name: str, model_class: Type[T], search_fields: List[str], search_text: str, **filters ) -> List[T]: """ 텍스트 검색을 수행합니다. Args: table_name: 테이블 이름 model_class: 모델 클래스 search_fields: 검색할 필드 목록 search_text: 검색어 **filters: 추가 필터 조건 Returns: 모델 인스턴스 리스트 """ conditions = [] params = [] # 검색 조건 search_conditions = [f"{field} LIKE ?" for field in search_fields] conditions.append(f"({' OR '.join(search_conditions)})") params.extend([f"%{search_text}%" for _ in search_fields]) # 필터 조건 for key, value in filters.items(): if value is not None: conditions.append(f"{key} = ?") params.append(value) query = f""" SELECT * FROM {table_name} WHERE {' AND '.join(conditions)} ORDER BY id DESC """ rows = self.db.fetch_all(query, tuple(params)) return [model_class(**row) for row in rows] # ======================================================================== # User CRUD # ======================================================================== def create_user(self, **data) -> User: """사용자 생성""" return self._create("users", User, data) def get_user(self, user_id: int) -> Optional[User]: """ID로 사용자 조회""" return self._get_by_id("users", User, user_id) def get_user_by_username(self, username: str) -> Optional[User]: """사용자명으로 사용자 조회""" query = "SELECT * FROM users WHERE username = ?" row = self.db.fetch_one(query, (username,)) return User(**row) if row else None def get_all_users(self, **filters) -> List[User]: """모든 사용자 조회""" return self._get_all("users", User, **filters) def update_user(self, user_id: int, **data) -> Optional[User]: """사용자 업데이트""" return self._update("users", User, user_id, data) def delete_user(self, user_id: int) -> bool: """사용자 삭제""" return self._delete("users", user_id) # ======================================================================== # Instruction CRUD # ======================================================================== def create_instruction(self, **data) -> Instruction: """지시 생성""" return self._create("instructions", Instruction, data) def get_instruction(self, instruction_id: int) -> Optional[Instruction]: """ID로 지시 조회""" return self._get_by_id("instructions", Instruction, instruction_id) def get_all_instructions( self, order_by: str = "created_date DESC, id DESC", **filters ) -> List[Instruction]: """모든 지시 조회""" return self._get_all("instructions", Instruction, order_by=order_by, **filters) def get_instructions_by_date( self, target_date: date, include_continuous: bool = True ) -> List[Instruction]: """날짜별 지시 조회 (지속 지시 포함 가능)""" if include_continuous: query = """ SELECT * FROM instructions WHERE created_date = ? OR (is_continuous = 1 AND is_completed = 0) ORDER BY created_date DESC, id DESC """ else: query = """ SELECT * FROM instructions WHERE created_date = ? ORDER BY id DESC """ rows = self.db.fetch_all(query, (target_date.isoformat(),)) return [Instruction(**row) for row in rows] def update_instruction(self, instruction_id: int, **data) -> Optional[Instruction]: """지시 업데이트""" return self._update("instructions", Instruction, instruction_id, data) def delete_instruction(self, instruction_id: int) -> bool: """지시 삭제""" return self._delete("instructions", instruction_id) # ======================================================================== # Fault CRUD # ======================================================================== def create_fault(self, **data) -> Fault: """고장 생성""" return self._create("faults", Fault, data) def get_fault(self, fault_id: int) -> Optional[Fault]: """ID로 고장 조회""" return self._get_by_id("faults", Fault, fault_id) def get_all_faults( self, order_by: str = "occurrence_date DESC, id DESC", **filters ) -> List[Fault]: """모든 고장 조회""" return self._get_all("faults", Fault, order_by=order_by, **filters) def get_faults_by_train( self, train_number: str, limit: int = 10, months_back: int = 3 ) -> List[Fault]: """ 편성번호별 고장 조회 편성 필드 위에 마우스를 가져갈 때 사용됩니다. Args: train_number: 편성번호 limit: 최대 개수 months_back: 몇 달 전까지 조회할지 (기본 3달) Returns: 고장 리스트 """ from datetime import timedelta # 최근 N달 이내 데이터만 조회 end_date = date.today() start_date = end_date - timedelta(days=months_back * 30) query = """ SELECT * FROM faults WHERE train_number = ? AND occurrence_date >= ? AND occurrence_date <= ? ORDER BY occurrence_date DESC, id DESC LIMIT ? """ rows = self.db.fetch_all(query, ( train_number, start_date.isoformat(), end_date.isoformat(), limit )) return [Fault(**row) for row in rows] def get_faults_by_device( self, device_category: str, limit: int = 10, months_back: int = 3 ) -> List[Fault]: """ 장치분류별 고장 조회 장치분류 필드 위에 마우스를 가져갈 때 사용됩니다. Args: device_category: 장치분류 limit: 최대 개수 months_back: 몇 달 전까지 조회할지 (기본 3달) Returns: 고장 리스트 """ from datetime import timedelta # 최근 N달 이내 데이터만 조회 end_date = date.today() start_date = end_date - timedelta(days=months_back * 30) query = """ SELECT * FROM faults WHERE device_category = ? AND occurrence_date >= ? AND occurrence_date <= ? ORDER BY occurrence_date DESC, id DESC LIMIT ? """ rows = self.db.fetch_all(query, ( device_category, start_date.isoformat(), end_date.isoformat(), limit )) return [Fault(**row) for row in rows] def get_faults_by_code( self, fault_code: str, limit: int = 10, months_back: int = 3 ) -> List[Fault]: """ 고장코드별 고장 조회 고장코드 필드 위에 마우스를 가져갈 때 사용됩니다. Args: fault_code: 고장코드 limit: 최대 개수 months_back: 몇 달 전까지 조회할지 (기본 3달) Returns: 고장 리스트 """ from datetime import timedelta # 최근 N달 이내 데이터만 조회 end_date = date.today() start_date = end_date - timedelta(days=months_back * 30) query = """ SELECT * FROM faults WHERE fault_code = ? AND occurrence_date >= ? AND occurrence_date <= ? ORDER BY occurrence_date DESC, id DESC LIMIT ? """ rows = self.db.fetch_all(query, ( fault_code, start_date.isoformat(), end_date.isoformat(), limit )) return [Fault(**row) for row in rows] def get_faults_by_date_range( self, start_date: date, end_date: date, team: str = None ) -> List[Fault]: """ 날짜 범위로 고장 조회 Args: start_date: 시작 날짜 end_date: 종료 날짜 team: 팀 필터 (선택사항) Returns: 고장 리스트 """ query = """ SELECT * FROM faults WHERE occurrence_date BETWEEN ? AND ? """ params = [start_date.isoformat(), end_date.isoformat()] if team: query += " AND created_team = ?" params.append(team) query += " ORDER BY occurrence_date DESC, id DESC" rows = self.db.fetch_all(query, tuple(params)) return [Fault(**row) for row in rows] def search_faults(self, search_text: str) -> List[Fault]: """고장 검색""" return self._search( "faults", Fault, ["train_number", "fault_content", "action_content", "fault_code"], search_text ) def update_fault(self, fault_id: int, **data) -> Optional[Fault]: """고장 업데이트""" return self._update("faults", Fault, fault_id, data) def delete_fault(self, fault_id: int) -> bool: """고장 삭제""" return self._delete("faults", fault_id) # ======================================================================== # Work CRUD # ======================================================================== def create_work(self, **data) -> Work: """작업 생성""" return self._create("works", Work, data) def get_work(self, work_id: int) -> Optional[Work]: """ID로 작업 조회""" return self._get_by_id("works", Work, work_id) def get_all_works( self, order_by: str = "work_date DESC, id DESC", **filters ) -> List[Work]: """모든 작업 조회""" return self._get_all("works", Work, order_by=order_by, **filters) def get_works_by_train(self, train_number: str) -> List[Work]: """편성번호별 작업 조회""" return self._get_all("works", Work, target_train=train_number) def get_works_by_date(self, target_date: date) -> List[Work]: """날짜별 작업 조회""" return self._get_all("works", Work, work_date=target_date.isoformat()) def get_works_by_date_range( self, start_date: date, end_date: date, team: str = None ) -> List[Work]: """ 날짜 범위로 작업 조회 Args: start_date: 시작 날짜 end_date: 종료 날짜 team: 팀 필터 (선택사항) Returns: 작업 리스트 """ query = """ SELECT * FROM works WHERE work_date BETWEEN ? AND ? """ params = [start_date.isoformat(), end_date.isoformat()] if team: query += " AND created_team = ?" params.append(team) query += " ORDER BY work_date DESC, id DESC" rows = self.db.fetch_all(query, tuple(params)) return [Work(**row) for row in rows] def check_train_has_work(self, train_number: str, target_date: date) -> bool: """ 해당 편성에 작업이 있는지 확인 일상검수 편성 표시에 사용됩니다. Args: train_number: 편성번호 target_date: 날짜 Returns: 작업 존재 여부 """ query = """ SELECT COUNT(*) as count FROM works WHERE target_train = ? AND work_date = ? AND is_completed = 0 """ result = self.db.fetch_one(query, (train_number, target_date.isoformat())) return result['count'] > 0 if result else False def update_work(self, work_id: int, **data) -> Optional[Work]: """작업 업데이트""" return self._update("works", Work, work_id, data) def delete_work(self, work_id: int) -> bool: """작업 삭제""" return self._delete("works", work_id) # ======================================================================== # Misc CRUD # ======================================================================== def create_misc(self, **data) -> Misc: """기타 생성""" return self._create("miscs", Misc, data) def get_misc(self, misc_id: int) -> Optional[Misc]: """ID로 기타 조회""" return self._get_by_id("miscs", Misc, misc_id) def get_all_miscs( self, order_by: str = "created_date DESC, id DESC", **filters ) -> List[Misc]: """모든 기타 조회""" return self._get_all("miscs", Misc, order_by=order_by, **filters) def get_miscs_by_date_range( self, start_date: date, end_date: date, team: str = None ) -> List[Misc]: """ 날짜 범위로 기타 조회 Args: start_date: 시작 날짜 end_date: 종료 날짜 team: 팀 필터 (선택사항) Returns: 기타 리스트 """ query = """ SELECT * FROM miscs WHERE created_date BETWEEN ? AND ? """ params = [start_date.isoformat(), end_date.isoformat()] if team: query += " AND created_team = ?" params.append(team) query += " ORDER BY created_date DESC, id DESC" rows = self.db.fetch_all(query, tuple(params)) return [Misc(**row) for row in rows] def update_misc(self, misc_id: int, **data) -> Optional[Misc]: """기타 업데이트""" return self._update("miscs", Misc, misc_id, data) def delete_misc(self, misc_id: int) -> bool: """기타 삭제""" return self._delete("miscs", misc_id) # ======================================================================== # DailyInspection CRUD # ======================================================================== def create_daily_inspection(self, **data) -> DailyInspection: """일상검수 생성""" return self._create("daily_inspections", DailyInspection, data) def get_daily_inspection(self, inspection_id: int) -> Optional[DailyInspection]: """ID로 일상검수 조회""" return self._get_by_id("daily_inspections", DailyInspection, inspection_id) def get_daily_inspections_by_date( self, inspection_date: date, shift_type: str = None ) -> List[DailyInspection]: """날짜별 일상검수 조회""" filters = {"inspection_date": inspection_date.isoformat()} if shift_type: filters["shift_type"] = shift_type return self._get_all( "daily_inspections", DailyInspection, order_by="slot_number ASC", **filters ) def upsert_daily_inspection( self, inspection_date: date, shift_type: str, slot_number: int, train_number: str, cleaning_type: str = "없음", has_work: bool = False, created_by: int = None ) -> DailyInspection: """ 일상검수 생성 또는 업데이트 (UPSERT) Args: inspection_date: 검수일자 shift_type: 근무유형 slot_number: 슬롯번호 train_number: 편성번호 cleaning_type: 청소유형 has_work: 작업여부 created_by: 생성자 ID Returns: 생성/업데이트된 일상검수 """ # 기존 레코드 확인 query = """ SELECT * FROM daily_inspections WHERE inspection_date = ? AND shift_type = ? AND slot_number = ? """ row = self.db.fetch_one( query, (inspection_date.isoformat(), shift_type, slot_number) ) data = { "inspection_date": inspection_date.isoformat(), "shift_type": shift_type, "slot_number": slot_number, "train_number": train_number, "cleaning_type": cleaning_type, "has_work": has_work, "created_by": created_by, } if row: # 업데이트 return self._update("daily_inspections", DailyInspection, row['id'], data) else: # 생성 return self._create("daily_inspections", DailyInspection, data) def update_daily_inspection(self, inspection_id: int, **data) -> Optional[DailyInspection]: """일상검수 업데이트""" return self._update("daily_inspections", DailyInspection, inspection_id, data) def delete_daily_inspection(self, inspection_id: int) -> bool: """일상검수 삭제""" return self._delete("daily_inspections", inspection_id) # ======================================================================== # Todo CRUD # ======================================================================== def create_todo(self, **data) -> Todo: """할일 생성""" result = self._create("todos", Todo, data) self.signals.todo_added.emit(result.id) return result def get_todo(self, todo_id: int) -> Optional[Todo]: """ID로 할일 조회""" return self._get_by_id("todos", Todo, todo_id) def get_todos_by_date( self, todo_date: date, include_incomplete: bool = True ) -> List[Todo]: """날짜별 할일 조회""" if include_incomplete: query = """ SELECT * FROM todos WHERE todo_date = ? OR (todo_date < ? AND is_completed = 0) ORDER BY is_completed ASC, todo_date DESC, id DESC """ rows = self.db.fetch_all( query, (todo_date.isoformat(), todo_date.isoformat()) ) else: query = """ SELECT * FROM todos WHERE todo_date = ? ORDER BY is_completed ASC, id DESC """ rows = self.db.fetch_all(query, (todo_date.isoformat(),)) return [Todo(**row) for row in rows] def update_todo(self, todo_id: int, **data) -> Optional[Todo]: """할일 업데이트""" result = self._update("todos", Todo, todo_id, data) if result and 'is_completed' in data: self.signals.todo_status_changed.emit(todo_id, data['is_completed']) return result def toggle_todo_complete(self, todo_id: int) -> Optional[Todo]: """할일 완료 상태 토글""" todo = self.get_todo(todo_id) if todo: new_status = not todo.is_completed completed_at = datetime.now().isoformat() if new_status else None return self.update_todo( todo_id, is_completed=new_status, completed_at=completed_at ) return None def delete_todo(self, todo_id: int) -> bool: """할일 삭제""" return self._delete("todos", todo_id) # ======================================================================== # Memo CRUD # ======================================================================== def create_memo(self, **data) -> Memo: """메모 생성""" return self._create("memos", Memo, data) def get_memo(self, memo_id: int) -> Optional[Memo]: """ID로 메모 조회""" return self._get_by_id("memos", Memo, memo_id) def get_memos_by_date(self, memo_date: date) -> List[Memo]: """날짜별 메모 조회""" return self._get_all("memos", Memo, memo_date=memo_date.isoformat()) def get_latest_memo(self, memo_date: date) -> Optional[Memo]: """최신 메모 조회""" query = """ SELECT * FROM memos WHERE memo_date = ? ORDER BY updated_at DESC LIMIT 1 """ row = self.db.fetch_one(query, (memo_date.isoformat(),)) return Memo(**row) if row else None def upsert_memo( self, memo_date: date, content: str, created_by: int = None ) -> Memo: """메모 생성 또는 업데이트""" memo = self.get_latest_memo(memo_date) if memo: return self.update_memo(memo.id, content=content) else: return self.create_memo( memo_date=memo_date.isoformat(), content=content, created_by=created_by ) def update_memo(self, memo_id: int, **data) -> Optional[Memo]: """메모 업데이트""" result = self._update("memos", Memo, memo_id, data) if result: self.signals.memo_changed.emit(memo_id) return result def delete_memo(self, memo_id: int) -> bool: """메모 삭제""" return self._delete("memos", memo_id) # ======================================================================== # Setting CRUD # ======================================================================== def get_setting(self, key: str) -> Optional[str]: """설정 값 조회""" query = "SELECT value FROM settings WHERE key = ?" row = self.db.fetch_one(query, (key,)) return row['value'] if row else None def set_setting(self, key: str, value: str) -> bool: """설정 값 저장""" query = """ INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES (?, ?, ?) """ try: self.db.execute(query, (key, value, datetime.now().isoformat())) return True except Exception as e: logger.error(f"설정 저장 실패: {key} - {e}") return False def get_all_settings(self) -> Dict[str, str]: """모든 설정 조회""" query = "SELECT key, value FROM settings" rows = self.db.fetch_all(query) return {row['key']: row['value'] for row in rows} # ======================================================================== # TeamMember CRUD # ======================================================================== def create_team_member(self, **data) -> TeamMember: """팀 인원 생성""" return self._create("team_members", TeamMember, data) def get_team_member(self, member_id: int) -> Optional[TeamMember]: """ID로 팀 인원 조회""" return self._get_by_id("team_members", TeamMember, member_id) def get_team_members_by_team( self, team: str, position: str = None, active_only: bool = True ) -> List[TeamMember]: """ 팀별 인원 조회 Args: team: 팀 (1팀, 2팀, 3팀, 4팀) position: 직책 필터 (부팀장, 운용) active_only: 활성화된 인원만 Returns: 팀 인원 리스트 """ filters = {"team": team} if position: filters["position"] = position if active_only: filters["is_active"] = 1 return self._get_all( "team_members", TeamMember, order_by='"order" ASC, id ASC', **filters ) def get_all_team_members(self, active_only: bool = True) -> List[TeamMember]: """모든 팀 인원 조회""" if active_only: return self._get_all("team_members", TeamMember, is_active=1) return self._get_all("team_members", TeamMember) def update_team_member(self, member_id: int, **data) -> Optional[TeamMember]: """팀 인원 업데이트""" return self._update("team_members", TeamMember, member_id, data) def delete_team_member(self, member_id: int) -> bool: """팀 인원 삭제 (비활성화)""" return self._update("team_members", TeamMember, member_id, {"is_active": False}) is not None def set_partner(self, member_id: int, partner_id: Optional[int] = None) -> bool: """짝궁 설정/해제""" # 기존 파트너 해제 member = self.get_team_member(member_id) if member and member.partner_id: old_partner = self.get_team_member(member.partner_id) if old_partner: self._update("team_members", TeamMember, old_partner.id, {"partner_id": None}) # 새 파트너 설정 self._update("team_members", TeamMember, member_id, {"partner_id": partner_id}) if partner_id: # 양방향 짝궁 설정 self._update("team_members", TeamMember, partner_id, {"partner_id": member_id}) return True # ======================================================================== # DutySchedule CRUD # ======================================================================== def create_duty_schedule(self, **data) -> DutySchedule: """당무 일정 생성""" return self._create("duty_schedules", DutySchedule, data) def get_duty_schedule( self, duty_date: date, team: str, shift_type: str ) -> Optional[DutySchedule]: """ 당무 일정 조회 Args: duty_date: 날짜 team: 팀 shift_type: 근무 유형 Returns: 당무 일정 또는 None """ query = """ SELECT * FROM duty_schedules WHERE duty_date = ? AND team = ? AND shift_type = ? """ row = self.db.fetch_one( query, (duty_date.isoformat(), team, shift_type) ) return DutySchedule(**row) if row else None def upsert_duty_schedule( self, duty_date: date, team: str, shift_type: str, vice_leader_id: int = None, operator_id: int = None, vice_leader_name: str = "", operator_name: str = "" ) -> DutySchedule: """ 당무 일정 생성 또는 업데이트 Args: duty_date: 날짜 team: 팀 shift_type: 근무 유형 vice_leader_id: 부팀장 ID operator_id: 운용 ID vice_leader_name: 부팀장 이름 operator_name: 운용 이름 Returns: 생성/업데이트된 당무 일정 """ existing = self.get_duty_schedule(duty_date, team, shift_type) data = { "duty_date": duty_date.isoformat(), "team": team, "shift_type": shift_type, "vice_leader_id": vice_leader_id, "operator_id": operator_id, "vice_leader_name": vice_leader_name, "operator_name": operator_name, } if existing: return self._update("duty_schedules", DutySchedule, existing.id, data) else: return self._create("duty_schedules", DutySchedule, data) def get_duty_schedules_by_date_range( self, start_date: date, end_date: date, team: str = None ) -> List[DutySchedule]: """날짜 범위로 당무 일정 조회""" query = """ SELECT * FROM duty_schedules WHERE duty_date BETWEEN ? AND ? """ params = [start_date.isoformat(), end_date.isoformat()] if team: query += " AND team = ?" params.append(team) query += " ORDER BY duty_date ASC" rows = self.db.fetch_all(query, tuple(params)) return [DutySchedule(**row) for row in rows] def get_next_duty_member( self, team: str, position: str, current_id: int = None ) -> Optional[TeamMember]: """ 다음 당무자 조회 (순번 기준) Args: team: 팀 position: 직책 current_id: 현재 당무자 ID Returns: 다음 당무자 또는 None """ members = self.get_team_members_by_team(team, position) if not members: return None if current_id is None: return members[0] # 현재 당무자의 인덱스 찾기 current_idx = None for idx, member in enumerate(members): if member.id == current_id: current_idx = idx break if current_idx is None: return members[0] # 다음 순번 (순환) next_idx = (current_idx + 1) % len(members) return members[next_idx] # ======================================================================== # 팀 확인 관련 메서드 # ======================================================================== def update_team_confirmation( self, table_name: str, record_id: int, team: str, confirmed: bool ) -> bool: """ 팀 확인 상태를 업데이트합니다. Args: table_name: 테이블 이름 (instructions, faults, works, miscs) record_id: 레코드 ID team: 팀 이름 confirmed: 확인 여부 Returns: 업데이트 성공 여부 """ model_class = MODEL_REGISTRY.get(table_name) if not model_class: return False record = self._get_by_id(table_name, model_class, record_id) if not record: return False # 팀 확인 상태 업데이트 record.set_team_confirmation(team, confirmed) data = { "team_confirmations": record.team_confirmations, "is_completed": record.is_completed, } if record.is_completed and record.completed_at: data["completed_at"] = record.completed_at.isoformat() self._update(table_name, model_class, record_id, data) return True # ======================================================================== # 통계 메서드 # ======================================================================== # ======================================================================== # TrainSchedule CRUD # ======================================================================== def create_train_schedule(self, **data) -> TrainSchedule: """열차 다이아 시각표 생성 (공통 데이터베이스 사용)""" common_db = CommonDatabaseManager() return self._create_with_db(common_db, "train_schedules", TrainSchedule, data) def get_train_schedule(self, schedule_id: int) -> Optional[TrainSchedule]: """ID로 열차 다이아 시각표 조회 (공통 데이터베이스 사용)""" common_db = CommonDatabaseManager() return self._get_by_id_with_db(common_db, "train_schedules", TrainSchedule, schedule_id) def get_train_schedule_by_column_station( self, column_number: str, station: str, is_weekday: bool = True ) -> Optional[TrainSchedule]: """ 열번과 역명으로 시각표 조회 (공통 데이터베이스 사용) Args: column_number: 열번 station: 역명 is_weekday: 평일 여부 Returns: TrainSchedule 또는 None """ common_db = CommonDatabaseManager() query = """ SELECT * FROM train_schedules WHERE column_number = ? AND station = ? AND is_weekday = ? AND is_active = 1 """ row = common_db.fetch_one(query, (column_number, station, is_weekday)) return TrainSchedule(**row) if row else None def get_schedules_by_column( self, column_number: str, is_weekday: bool = True ) -> List[TrainSchedule]: """ 열번으로 전체 경로 시각표 조회 (공통 데이터베이스 사용) Args: column_number: 열번 is_weekday: 평일 여부 Returns: 역 순서대로 정렬된 시각표 리스트 """ common_db = CommonDatabaseManager() query = """ SELECT * FROM train_schedules WHERE column_number = ? AND is_weekday = ? AND is_active = 1 ORDER BY arrival_time ASC """ rows = common_db.fetch_all(query, (column_number, is_weekday)) return [TrainSchedule(**row) for row in rows] def get_schedules_by_station( self, station: str, is_weekday: bool = True ) -> List[TrainSchedule]: """ 역명으로 해당 역 통과 열차 시각표 조회 (공통 데이터베이스 사용) Args: station: 역명 is_weekday: 평일 여부 Returns: 시간순 정렬된 시각표 리스트 """ common_db = CommonDatabaseManager() query = """ SELECT * FROM train_schedules WHERE station = ? AND is_weekday = ? AND is_active = 1 ORDER BY arrival_time ASC """ rows = common_db.fetch_all(query, (station, is_weekday)) return [TrainSchedule(**row) for row in rows] def upsert_train_schedule( self, column_number: str, station: str, arrival_time: time = None, departure_time: time = None, direction: str = "up", is_weekday: bool = True ) -> TrainSchedule: """ 열차 다이아 시각표 생성 또는 업데이트 (UPSERT, 공통 데이터베이스 사용) Args: column_number: 열번 station: 역명 arrival_time: 도착 시간 departure_time: 출발 시간 direction: 방향 is_weekday: 평일 여부 Returns: 생성/업데이트된 TrainSchedule """ existing = self.get_train_schedule_by_column_station(column_number, station, is_weekday) common_db = CommonDatabaseManager() data = { "column_number": column_number, "station": station, "arrival_time": arrival_time.isoformat() if arrival_time else None, "departure_time": departure_time.isoformat() if departure_time else None, "direction": direction, "is_weekday": is_weekday, } if existing: return self._update_with_db(common_db, "train_schedules", TrainSchedule, existing.id, data) else: return self._create_with_db(common_db, "train_schedules", TrainSchedule, data) def delete_train_schedule(self, schedule_id: int) -> bool: """열차 다이아 시각표 삭제 (공통 데이터베이스 사용)""" common_db = CommonDatabaseManager() return self._delete_with_db(common_db, "train_schedules", schedule_id) def estimate_time_by_column_station( self, column_number: str, station: str, occurrence_date: date = None ) -> Optional[time]: """ 열번과 역명으로 발생 시간 추정 Args: column_number: 열번 station: 역명 occurrence_date: 발생일 (평일/주말 판단용) Returns: 추정 시간 또는 None """ from datetime import datetime # 평일/주말 판단 if occurrence_date: is_weekday = occurrence_date.weekday() < 5 # 0~4: 월~금 else: is_weekday = True schedule = self.get_train_schedule_by_column_station( column_number, station, is_weekday ) if schedule: return schedule.arrival_time or schedule.departure_time return None # ======================================================================== # 날씨 메서드 # ======================================================================== def upsert_weather( self, datetime: datetime, location_name: str, location_code: str, temp: Optional[int] = None, feels_like: Optional[int] = None, humidity: Optional[int] = None, wind_speed: str = "", wind_direction: str = "", precipitation_prob: Optional[int] = None, weather_condition: str = "", weather_icon: str = "" ) -> Weather: """ 날씨 정보를 생성하거나 업데이트합니다. Args: datetime: 날씨 데이터 시각 location_name: 지역명 location_code: 지역코드 temp: 기온 feels_like: 체감온도 humidity: 습도 wind_speed: 풍속 wind_direction: 풍향 precipitation_prob: 강수확률 weather_condition: 날씨 상태 weather_icon: 날씨 아이콘 Returns: Weather 객체 """ try: # 기존 데이터 확인 existing = self.db.fetch_one( """ SELECT id, created_at, updated_at FROM weather WHERE datetime = ? AND location_code = ? """, (datetime.isoformat(), location_code) ) now = datetime.now() if existing: # 업데이트 self.db.execute( """ UPDATE weather SET location_name = ?, temp = ?, feels_like = ?, humidity = ?, wind_speed = ?, wind_direction = ?, precipitation_prob = ?, weather_condition = ?, weather_icon = ?, updated_at = ? WHERE id = ? """, ( location_name, temp, feels_like, humidity, wind_speed, wind_direction, precipitation_prob, weather_condition, weather_icon, now.isoformat(), existing['id'] ) ) weather_id = existing['id'] created_at = existing['created_at'] updated_at = now.isoformat() else: # 생성 result = self.db.execute( """ INSERT INTO weather ( datetime, location_name, location_code, temp, feels_like, humidity, wind_speed, wind_direction, precipitation_prob, weather_condition, weather_icon, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( datetime.isoformat(), location_name, location_code, temp, feels_like, humidity, wind_speed, wind_direction, precipitation_prob, weather_condition, weather_icon, now.isoformat(), now.isoformat() ) ) weather_id = result.lastrowid if result else None created_at = now.isoformat() updated_at = now.isoformat() return Weather( id=weather_id, created_at=created_at, updated_at=updated_at, datetime=datetime, location_name=location_name, location_code=location_code, temp=temp, feels_like=feels_like, humidity=humidity, wind_speed=wind_speed, wind_direction=wind_direction, precipitation_prob=precipitation_prob, weather_condition=weather_condition, weather_icon=weather_icon ) except Exception as e: logger.error(f"날씨 데이터 upsert 실패: {e}") raise DatabaseQueryError(f"Failed to upsert weather data: {e}") def get_weather_by_datetime_range( self, start_datetime: datetime, end_datetime: datetime, location_code: str = "" ) -> List[Weather]: """ 지정된 시간 범위의 날씨 데이터를 조회합니다. Args: start_datetime: 시작 시각 end_datetime: 종료 시각 location_code: 지역코드 (선택사항) Returns: Weather 객체 리스트 """ try: query = """ SELECT * FROM weather WHERE datetime >= ? AND datetime <= ? """ params = [start_datetime.isoformat(), end_datetime.isoformat()] if location_code: query += " AND location_code = ?" params.append(location_code) query += " ORDER BY datetime ASC" rows = self.db.fetch_all(query, tuple(params)) return [self._row_to_weather(row) for row in rows] except Exception as e: logger.error(f"날씨 데이터 조회 실패: {e}") raise DatabaseQueryError(f"Failed to get weather data: {e}") def get_weather_for_shift( self, shift_type: str, current_date: date, location_code: str = "" ) -> List[Weather]: """ 근무 형태에 따른 날씨 데이터를 조회합니다. Args: shift_type: 근무 유형 ("주간" 또는 "야간") current_date: 현재 날짜 location_code: 지역코드 (선택사항) Returns: Weather 객체 리스트 """ from datetime import timedelta if shift_type == "주간": # 주간: 09:00 ~ 18:00 start_datetime = datetime.combine(current_date, time(9, 0)) end_datetime = datetime.combine(current_date, time(18, 0)) elif shift_type == "야간": # 야간: 당일 18:00 ~ 다음날 09:00 start_datetime = datetime.combine(current_date, time(18, 0)) end_datetime = datetime.combine(current_date + timedelta(days=1), time(9, 0)) else: # 기본값: 주간 start_datetime = datetime.combine(current_date, time(9, 0)) end_datetime = datetime.combine(current_date, time(18, 0)) return self.get_weather_by_datetime_range(start_datetime, end_datetime, location_code) def get_weather_stats_for_shift( self, shift_type: str, current_date: date, location_code: str = "" ) -> Dict[str, Any]: """ 근무 시간 동안의 날씨 통계를 반환합니다. Args: shift_type: 근무 유형 ("주간" 또는 "야간") current_date: 현재 날짜 location_code: 지역코드 (선택사항) Returns: 통계 데이터 딕셔너리 """ weather_data = self.get_weather_for_shift(shift_type, current_date, location_code) if not weather_data: return { "temp_min": None, "temp_max": None, "feels_like_min": None, "feels_like_max": None, "avg_temp": None, "avg_feels_like": None, "max_precipitation_prob": None, "data_points": 0 } temps = [w.temp for w in weather_data if w.temp is not None] feels_likes = [w.feels_like for w in weather_data if w.feels_like is not None] precip_probs = [w.precipitation_prob for w in weather_data if w.precipitation_prob is not None] return { "temp_min": min(temps) if temps else None, "temp_max": max(temps) if temps else None, "feels_like_min": min(feels_likes) if feels_likes else None, "feels_like_max": max(feels_likes) if feels_likes else None, "avg_temp": round(sum(temps) / len(temps)) if temps else None, "avg_feels_like": round(sum(feels_likes) / len(feels_likes)) if feels_likes else None, "max_precipitation_prob": max(precip_probs) if precip_probs else None, "data_points": len(weather_data) } def cleanup_old_weather_data(self, days_to_keep: int = 7): """ 오래된 날씨 데이터를 삭제합니다. Args: days_to_keep: 보관할 일수 (기본값: 7일) """ from datetime import timedelta try: cutoff_date = datetime.now() - timedelta(days=days_to_keep) result = self.db.execute( "DELETE FROM weather WHERE datetime < ?", (cutoff_date.isoformat(),) ) deleted_count = result.rowcount if result else 0 logger.info(f"오래된 날씨 데이터 {deleted_count}개 삭제됨 (보관 기간: {days_to_keep}일)") except Exception as e: logger.error(f"날씨 데이터 정리 실패: {e}") raise DatabaseQueryError(f"Failed to cleanup weather data: {e}") def _row_to_weather(self, row: Dict[str, Any]) -> Weather: """데이터베이스 행을 Weather 객체로 변환""" return Weather( id=row.get('id'), created_at=row.get('created_at'), updated_at=row.get('updated_at'), datetime=datetime.fromisoformat(row['datetime']) if row.get('datetime') else None, location_name=row.get('location_name', ''), location_code=row.get('location_code', ''), temp=row.get('temp'), feels_like=row.get('feels_like'), humidity=row.get('humidity'), wind_speed=row.get('wind_speed', ''), wind_direction=row.get('wind_direction', ''), precipitation_prob=row.get('precipitation_prob'), weather_condition=row.get('weather_condition', ''), weather_icon=row.get('weather_icon', '') ) # ======================================================================== # 통계 메서드 # ======================================================================== def get_fault_statistics( self, start_date: date = None, end_date: date = None ) -> Dict[str, Any]: """ 고장 통계를 반환합니다. Args: start_date: 시작 날짜 end_date: 종료 날짜 Returns: 통계 데이터 """ base_query = "FROM faults WHERE 1=1" params = [] if start_date: base_query += " AND occurrence_date >= ?" params.append(start_date.isoformat()) if end_date: base_query += " AND occurrence_date <= ?" params.append(end_date.isoformat()) # 총 건수 total = self.db.fetch_one(f"SELECT COUNT(*) as count {base_query}", tuple(params)) # 장치별 통계 by_device = self.db.fetch_all( f""" SELECT device_category, COUNT(*) as count {base_query} GROUP BY device_category ORDER BY count DESC """, tuple(params) ) # 편성별 통계 by_train = self.db.fetch_all( f""" SELECT train_number, COUNT(*) as count {base_query} GROUP BY train_number ORDER BY count DESC LIMIT 10 """, tuple(params) ) return { "total": total['count'] if total else 0, "by_device": by_device, "by_train": by_train, } # ============================================================================ # 모듈 레벨 편의 함수 # ============================================================================ def get_crud() -> CRUDManager: """ CRUD 관리자 인스턴스를 반환합니다. Returns: CRUDManager 인스턴스 """ return CRUDManager()