handOver2/database/crud.py

1964 lines
64 KiB
Python

# -*- coding: utf-8 -*-
"""
CRUD 연산 모듈
데이터베이스의 Create, Read, Update, Delete 연산을 제공합니다.
이 모듈은 모든 데이터베이스 입출력을 담당하며,
다른 모듈들은 이 모듈의 메서드를 통해서만 데이터베이스에 접근합니다.
"""
from datetime import datetime, date, time
from typing import Optional, List, Dict, Any, Type, TypeVar
from dataclasses import dataclass
from .db_manager import DatabaseManager, get_db
from .common_db_manager import CommonDatabaseManager
from .models import (
BaseModel,
User,
Team,
Instruction,
Fault,
Work,
Misc,
DailyInspection,
Todo,
Memo,
Setting,
TeamMember,
DutySchedule,
TrainSchedule,
Weather,
MODEL_REGISTRY,
)
from core.logger import get_logger
from core.exceptions import (
RecordNotFoundError,
DuplicateRecordError,
DatabaseQueryError,
)
from core.signals import get_signals
# 로거 설정
logger = get_logger(__name__)
# 타입 변수 정의
T = TypeVar('T', bound=BaseModel)
class CRUDManager:
"""
CRUD 연산 관리자 클래스
모든 데이터베이스 CRUD 연산을 제공합니다.
각 테이블별로 특화된 메서드와 공통 메서드를 제공합니다.
Attributes:
db: 데이터베이스 관리자 인스턴스
signals: 전역 시그널 인스턴스
Examples:
>>> crud = CRUDManager()
>>> instruction = crud.create_instruction(
... created_date=date.today(),
... created_team="A팀",
... instructor="팀장",
... instruction_content="지시 내용"
... )
>>> print(instruction.id)
"""
_instance: Optional['CRUDManager'] = None
def __new__(cls):
"""싱글톤 패턴 구현"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""CRUD 관리자 초기화"""
if self._initialized:
return
self.db = get_db()
self.signals = get_signals()
self._initialized = True
logger.info("CRUD 관리자 초기화 완료")
# ========================================================================
# 공통 CRUD 메서드
# ========================================================================
def _create(
self,
table_name: str,
model_class: Type[T],
data: Dict[str, Any]
) -> T:
"""
레코드를 생성합니다.
Args:
table_name: 테이블 이름
model_class: 모델 클래스
data: 생성할 데이터
Returns:
생성된 모델 인스턴스
"""
# id, created_at, updated_at 제외
data = {k: v for k, v in data.items()
if k not in ['id', 'created_at', 'updated_at'] and v is not None}
# 컬럼과 값 분리
columns = list(data.keys())
values = list(data.values())
placeholders = ','.join(['?' for _ in columns])
# SQL 예약어인 컬럼명은 따옴표로 감싸기
quoted_columns = [f'"{col}"' if col.lower() in ['order', 'group', 'select', 'table'] else col
for col in columns]
query = f"""
INSERT INTO {table_name} ({','.join(quoted_columns)})
VALUES ({placeholders})
"""
try:
cursor = self.db.execute(query, tuple(values))
record_id = cursor.lastrowid
# 생성된 레코드 조회
result = self._get_by_id(table_name, model_class, record_id)
# 시그널 발생
self.signals.record_created.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 생성: {table_name} ID={record_id}")
return result
except Exception as e:
logger.error(f"레코드 생성 실패: {table_name} - {e}")
raise DatabaseQueryError(f"레코드 생성 실패: {e}")
def _get_by_id(
self,
table_name: str,
model_class: Type[T],
record_id: int
) -> Optional[T]:
"""
ID로 레코드를 조회합니다.
Args:
table_name: 테이블 이름
model_class: 모델 클래스
record_id: 레코드 ID
Returns:
모델 인스턴스 또는 None
"""
query = f"SELECT * FROM {table_name} WHERE id = ?"
row = self.db.fetch_one(query, (record_id,))
if row:
return model_class(**row)
return None
def _get_all(
self,
table_name: str,
model_class: Type[T],
order_by: str = "id DESC",
limit: int = None,
offset: int = None,
**filters
) -> List[T]:
"""
모든 레코드를 조회합니다.
Args:
table_name: 테이블 이름
model_class: 모델 클래스
order_by: 정렬 기준
limit: 최대 개수
offset: 시작 위치
**filters: 필터 조건
Returns:
모델 인스턴스 리스트
"""
query = f"SELECT * FROM {table_name}"
params = []
# WHERE 절 추가
# 특수 파라미터는 필터에서 제외 (include_completed, include_continuous 등)
special_params = {'include_completed', 'include_continuous'}
if filters:
conditions = []
for key, value in filters.items():
# 특수 파라미터는 제외
if key in special_params:
continue
if value is not None:
# SQL 예약어인 컬럼명은 따옴표로 감싸기
quoted_key = f'"{key}"' if key.lower() in ['order', 'group', 'select', 'table'] else key
conditions.append(f"{quoted_key} = ?")
params.append(value)
if conditions:
query += " WHERE " + " AND ".join(conditions)
# ORDER BY 절 추가
if order_by:
query += f" ORDER BY {order_by}"
# LIMIT, OFFSET 추가
if limit:
query += f" LIMIT {limit}"
if offset:
query += f" OFFSET {offset}"
rows = self.db.fetch_all(query, tuple(params) if params else None)
return [model_class(**row) for row in rows]
def _update(
self,
table_name: str,
model_class: Type[T],
record_id: int,
data: Dict[str, Any]
) -> Optional[T]:
"""
레코드를 업데이트합니다.
Args:
table_name: 테이블 이름
model_class: 모델 클래스
record_id: 레코드 ID
data: 업데이트할 데이터
Returns:
업데이트된 모델 인스턴스
"""
# id, created_at 제외하고 updated_at 자동 설정
data = {k: v for k, v in data.items()
if k not in ['id', 'created_at']}
data['updated_at'] = datetime.now().isoformat()
# SET 절 생성 (SQL 예약어인 컬럼명은 따옴표로 감싸기)
set_clause = ', '.join([
f'"{k}" = ?' if k.lower() in ['order', 'group', 'select', 'table'] else f"{k} = ?"
for k in data.keys()
])
values = list(data.values()) + [record_id]
query = f"""
UPDATE {table_name}
SET {set_clause}
WHERE id = ?
"""
try:
self.db.execute(query, tuple(values))
# 업데이트된 레코드 조회
result = self._get_by_id(table_name, model_class, record_id)
# 시그널 발생
self.signals.record_updated.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 업데이트: {table_name} ID={record_id}")
return result
except Exception as e:
logger.error(f"레코드 업데이트 실패: {table_name} ID={record_id} - {e}")
raise DatabaseQueryError(f"레코드 업데이트 실패: {e}")
def _delete(self, table_name: str, record_id: int) -> bool:
"""
레코드를 삭제합니다.
Args:
table_name: 테이블 이름
record_id: 레코드 ID
Returns:
삭제 성공 여부
"""
query = f"DELETE FROM {table_name} WHERE id = ?"
try:
cursor = self.db.execute(query, (record_id,))
if cursor.rowcount > 0:
# 시그널 발생
self.signals.record_deleted.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 삭제: {table_name} ID={record_id}")
return True
return False
except Exception as e:
logger.error(f"레코드 삭제 실패: {table_name} ID={record_id} - {e}")
raise DatabaseQueryError(f"레코드 삭제 실패: {e}")
# ========================================================================
# 공통 데이터베이스용 헬퍼 메서드
# ========================================================================
def _create_with_db(
self,
db_manager,
table_name: str,
model_class: Type[T],
data: Dict[str, Any]
) -> T:
"""
공통 데이터베이스에 레코드를 생성합니다.
Args:
db_manager: 데이터베이스 관리자 (CommonDatabaseManager)
table_name: 테이블 이름
model_class: 모델 클래스
data: 생성할 데이터
Returns:
생성된 모델 인스턴스
"""
# id, created_at, updated_at 제외
data = {k: v for k, v in data.items()
if k not in ['id', 'created_at', 'updated_at'] and v is not None}
if not data:
raise ValueError("생성할 데이터가 없습니다.")
# 컬럼명과 값 분리
columns = list(data.keys())
placeholders = ', '.join(['?' for _ in columns])
values = [data[col] for col in columns]
query = f"""
INSERT INTO {table_name} ({', '.join(columns)})
VALUES ({placeholders})
"""
try:
cursor = db_manager.execute(query, tuple(values))
record_id = cursor.lastrowid
# 생성된 레코드 조회
result = self._get_by_id_with_db(db_manager, table_name, model_class, record_id)
# 시그널 발생
self.signals.record_created.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 생성: {table_name} ID={record_id}")
return result
except Exception as e:
logger.error(f"레코드 생성 실패: {table_name} - {e}")
raise DatabaseQueryError(f"레코드 생성 실패: {e}")
def _get_by_id_with_db(
self,
db_manager,
table_name: str,
model_class: Type[T],
record_id: int
) -> Optional[T]:
"""
공통 데이터베이스에서 ID로 레코드를 조회합니다.
Args:
db_manager: 데이터베이스 관리자 (CommonDatabaseManager)
table_name: 테이블 이름
model_class: 모델 클래스
record_id: 레코드 ID
Returns:
모델 인스턴스 또는 None
"""
query = f"SELECT * FROM {table_name} WHERE id = ?"
row = db_manager.fetch_one(query, (record_id,))
if row:
return model_class(**row)
return None
def _update_with_db(
self,
db_manager,
table_name: str,
model_class: Type[T],
record_id: int,
data: Dict[str, Any]
) -> Optional[T]:
"""
공통 데이터베이스의 레코드를 업데이트합니다.
Args:
db_manager: 데이터베이스 관리자 (CommonDatabaseManager)
table_name: 테이블 이름
model_class: 모델 클래스
record_id: 레코드 ID
data: 업데이트할 데이터
Returns:
업데이트된 모델 인스턴스
"""
# id, created_at 제외하고 updated_at 자동 설정
data = {k: v for k, v in data.items()
if k not in ['id', 'created_at']}
data['updated_at'] = datetime.now().isoformat()
# SET 절 생성 (SQL 예약어인 컬럼명은 따옴표로 감싸기)
set_clause = ', '.join([
f'"{k}" = ?' if k.lower() in ['order', 'group', 'select', 'table'] else f"{k} = ?"
for k in data.keys()
])
values = list(data.values()) + [record_id]
query = f"""
UPDATE {table_name}
SET {set_clause}
WHERE id = ?
"""
try:
db_manager.execute(query, tuple(values))
# 업데이트된 레코드 조회
result = self._get_by_id_with_db(db_manager, table_name, model_class, record_id)
# 시그널 발생
self.signals.record_updated.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 업데이트: {table_name} ID={record_id}")
return result
except Exception as e:
logger.error(f"레코드 업데이트 실패: {table_name} ID={record_id} - {e}")
raise DatabaseQueryError(f"레코드 업데이트 실패: {e}")
def _delete_with_db(
self,
db_manager,
table_name: str,
record_id: int
) -> bool:
"""
공통 데이터베이스의 레코드를 삭제합니다.
Args:
db_manager: 데이터베이스 관리자 (CommonDatabaseManager)
table_name: 테이블 이름
record_id: 레코드 ID
Returns:
삭제 성공 여부
"""
query = f"DELETE FROM {table_name} WHERE id = ?"
try:
cursor = db_manager.execute(query, (record_id,))
if cursor.rowcount > 0:
# 시그널 발생
self.signals.record_deleted.emit(table_name, record_id)
self.signals.data_changed.emit(table_name)
logger.info(f"레코드 삭제: {table_name} ID={record_id}")
return True
return False
except Exception as e:
logger.error(f"레코드 삭제 실패: {table_name} ID={record_id} - {e}")
raise DatabaseQueryError(f"레코드 삭제 실패: {e}")
def _search(
self,
table_name: str,
model_class: Type[T],
search_fields: List[str],
search_text: str,
**filters
) -> List[T]:
"""
텍스트 검색을 수행합니다.
Args:
table_name: 테이블 이름
model_class: 모델 클래스
search_fields: 검색할 필드 목록
search_text: 검색어
**filters: 추가 필터 조건
Returns:
모델 인스턴스 리스트
"""
conditions = []
params = []
# 검색 조건
search_conditions = [f"{field} LIKE ?" for field in search_fields]
conditions.append(f"({' OR '.join(search_conditions)})")
params.extend([f"%{search_text}%" for _ in search_fields])
# 필터 조건
for key, value in filters.items():
if value is not None:
conditions.append(f"{key} = ?")
params.append(value)
query = f"""
SELECT * FROM {table_name}
WHERE {' AND '.join(conditions)}
ORDER BY id DESC
"""
rows = self.db.fetch_all(query, tuple(params))
return [model_class(**row) for row in rows]
# ========================================================================
# User CRUD
# ========================================================================
def create_user(self, **data) -> User:
"""사용자 생성"""
return self._create("users", User, data)
def get_user(self, user_id: int) -> Optional[User]:
"""ID로 사용자 조회"""
return self._get_by_id("users", User, user_id)
def get_user_by_username(self, username: str) -> Optional[User]:
"""사용자명으로 사용자 조회"""
query = "SELECT * FROM users WHERE username = ?"
row = self.db.fetch_one(query, (username,))
return User(**row) if row else None
def get_all_users(self, **filters) -> List[User]:
"""모든 사용자 조회"""
return self._get_all("users", User, **filters)
def update_user(self, user_id: int, **data) -> Optional[User]:
"""사용자 업데이트"""
return self._update("users", User, user_id, data)
def delete_user(self, user_id: int) -> bool:
"""사용자 삭제"""
return self._delete("users", user_id)
# ========================================================================
# Instruction CRUD
# ========================================================================
def create_instruction(self, **data) -> Instruction:
"""지시 생성"""
return self._create("instructions", Instruction, data)
def get_instruction(self, instruction_id: int) -> Optional[Instruction]:
"""ID로 지시 조회"""
return self._get_by_id("instructions", Instruction, instruction_id)
def get_all_instructions(
self,
order_by: str = "created_date DESC, id DESC",
**filters
) -> List[Instruction]:
"""모든 지시 조회"""
return self._get_all("instructions", Instruction, order_by=order_by, **filters)
def get_instructions_by_date(
self,
target_date: date,
include_continuous: bool = True
) -> List[Instruction]:
"""날짜별 지시 조회 (지속 지시 포함 가능)"""
if include_continuous:
query = """
SELECT * FROM instructions
WHERE created_date = ? OR (is_continuous = 1 AND is_completed = 0)
ORDER BY created_date DESC, id DESC
"""
else:
query = """
SELECT * FROM instructions
WHERE created_date = ?
ORDER BY id DESC
"""
rows = self.db.fetch_all(query, (target_date.isoformat(),))
return [Instruction(**row) for row in rows]
def update_instruction(self, instruction_id: int, **data) -> Optional[Instruction]:
"""지시 업데이트"""
return self._update("instructions", Instruction, instruction_id, data)
def delete_instruction(self, instruction_id: int) -> bool:
"""지시 삭제"""
return self._delete("instructions", instruction_id)
# ========================================================================
# Fault CRUD
# ========================================================================
def create_fault(self, **data) -> Fault:
"""고장 생성"""
return self._create("faults", Fault, data)
def get_fault(self, fault_id: int) -> Optional[Fault]:
"""ID로 고장 조회"""
return self._get_by_id("faults", Fault, fault_id)
def get_all_faults(
self,
order_by: str = "occurrence_date DESC, id DESC",
**filters
) -> List[Fault]:
"""모든 고장 조회"""
return self._get_all("faults", Fault, order_by=order_by, **filters)
def get_faults_by_train(
self,
train_number: str,
limit: int = 10,
months_back: int = 3
) -> List[Fault]:
"""
편성번호별 고장 조회
편성 필드 위에 마우스를 가져갈 때 사용됩니다.
Args:
train_number: 편성번호
limit: 최대 개수
months_back: 몇 달 전까지 조회할지 (기본 3달)
Returns:
고장 리스트
"""
from datetime import timedelta
# 최근 N달 이내 데이터만 조회
end_date = date.today()
start_date = end_date - timedelta(days=months_back * 30)
query = """
SELECT * FROM faults
WHERE train_number = ?
AND occurrence_date >= ?
AND occurrence_date <= ?
ORDER BY occurrence_date DESC, id DESC
LIMIT ?
"""
rows = self.db.fetch_all(query, (
train_number,
start_date.isoformat(),
end_date.isoformat(),
limit
))
return [Fault(**row) for row in rows]
def get_faults_by_device(
self,
device_category: str,
limit: int = 10,
months_back: int = 3
) -> List[Fault]:
"""
장치분류별 고장 조회
장치분류 필드 위에 마우스를 가져갈 때 사용됩니다.
Args:
device_category: 장치분류
limit: 최대 개수
months_back: 몇 달 전까지 조회할지 (기본 3달)
Returns:
고장 리스트
"""
from datetime import timedelta
# 최근 N달 이내 데이터만 조회
end_date = date.today()
start_date = end_date - timedelta(days=months_back * 30)
query = """
SELECT * FROM faults
WHERE device_category = ?
AND occurrence_date >= ?
AND occurrence_date <= ?
ORDER BY occurrence_date DESC, id DESC
LIMIT ?
"""
rows = self.db.fetch_all(query, (
device_category,
start_date.isoformat(),
end_date.isoformat(),
limit
))
return [Fault(**row) for row in rows]
def get_faults_by_code(
self,
fault_code: str,
limit: int = 10,
months_back: int = 3
) -> List[Fault]:
"""
고장코드별 고장 조회
고장코드 필드 위에 마우스를 가져갈 때 사용됩니다.
Args:
fault_code: 고장코드
limit: 최대 개수
months_back: 몇 달 전까지 조회할지 (기본 3달)
Returns:
고장 리스트
"""
from datetime import timedelta
# 최근 N달 이내 데이터만 조회
end_date = date.today()
start_date = end_date - timedelta(days=months_back * 30)
query = """
SELECT * FROM faults
WHERE fault_code = ?
AND occurrence_date >= ?
AND occurrence_date <= ?
ORDER BY occurrence_date DESC, id DESC
LIMIT ?
"""
rows = self.db.fetch_all(query, (
fault_code,
start_date.isoformat(),
end_date.isoformat(),
limit
))
return [Fault(**row) for row in rows]
def get_faults_by_date_range(
self,
start_date: date,
end_date: date,
team: str = None
) -> List[Fault]:
"""
날짜 범위로 고장 조회
Args:
start_date: 시작 날짜
end_date: 종료 날짜
team: 팀 필터 (선택사항)
Returns:
고장 리스트
"""
query = """
SELECT * FROM faults
WHERE occurrence_date BETWEEN ? AND ?
"""
params = [start_date.isoformat(), end_date.isoformat()]
if team:
query += " AND created_team = ?"
params.append(team)
query += " ORDER BY occurrence_date DESC, id DESC"
rows = self.db.fetch_all(query, tuple(params))
return [Fault(**row) for row in rows]
def search_faults(self, search_text: str) -> List[Fault]:
"""고장 검색"""
return self._search(
"faults",
Fault,
["train_number", "fault_content", "action_content", "fault_code"],
search_text
)
def update_fault(self, fault_id: int, **data) -> Optional[Fault]:
"""고장 업데이트"""
return self._update("faults", Fault, fault_id, data)
def delete_fault(self, fault_id: int) -> bool:
"""고장 삭제"""
return self._delete("faults", fault_id)
# ========================================================================
# Work CRUD
# ========================================================================
def create_work(self, **data) -> Work:
"""작업 생성"""
return self._create("works", Work, data)
def get_work(self, work_id: int) -> Optional[Work]:
"""ID로 작업 조회"""
return self._get_by_id("works", Work, work_id)
def get_all_works(
self,
order_by: str = "work_date DESC, id DESC",
**filters
) -> List[Work]:
"""모든 작업 조회"""
return self._get_all("works", Work, order_by=order_by, **filters)
def get_works_by_train(self, train_number: str) -> List[Work]:
"""편성번호별 작업 조회"""
return self._get_all("works", Work, target_train=train_number)
def get_works_by_date(self, target_date: date) -> List[Work]:
"""날짜별 작업 조회"""
return self._get_all("works", Work, work_date=target_date.isoformat())
def get_works_by_date_range(
self,
start_date: date,
end_date: date,
team: str = None
) -> List[Work]:
"""
날짜 범위로 작업 조회
Args:
start_date: 시작 날짜
end_date: 종료 날짜
team: 팀 필터 (선택사항)
Returns:
작업 리스트
"""
query = """
SELECT * FROM works
WHERE work_date BETWEEN ? AND ?
"""
params = [start_date.isoformat(), end_date.isoformat()]
if team:
query += " AND created_team = ?"
params.append(team)
query += " ORDER BY work_date DESC, id DESC"
rows = self.db.fetch_all(query, tuple(params))
return [Work(**row) for row in rows]
def check_train_has_work(self, train_number: str, target_date: date) -> bool:
"""
해당 편성에 작업이 있는지 확인
일상검수 편성 표시에 사용됩니다.
Args:
train_number: 편성번호
target_date: 날짜
Returns:
작업 존재 여부
"""
query = """
SELECT COUNT(*) as count FROM works
WHERE target_train = ? AND work_date = ? AND is_completed = 0
"""
result = self.db.fetch_one(query, (train_number, target_date.isoformat()))
return result['count'] > 0 if result else False
def update_work(self, work_id: int, **data) -> Optional[Work]:
"""작업 업데이트"""
return self._update("works", Work, work_id, data)
def delete_work(self, work_id: int) -> bool:
"""작업 삭제"""
return self._delete("works", work_id)
# ========================================================================
# Misc CRUD
# ========================================================================
def create_misc(self, **data) -> Misc:
"""기타 생성"""
return self._create("miscs", Misc, data)
def get_misc(self, misc_id: int) -> Optional[Misc]:
"""ID로 기타 조회"""
return self._get_by_id("miscs", Misc, misc_id)
def get_all_miscs(
self,
order_by: str = "created_date DESC, id DESC",
**filters
) -> List[Misc]:
"""모든 기타 조회"""
return self._get_all("miscs", Misc, order_by=order_by, **filters)
def get_miscs_by_date_range(
self,
start_date: date,
end_date: date,
team: str = None
) -> List[Misc]:
"""
날짜 범위로 기타 조회
Args:
start_date: 시작 날짜
end_date: 종료 날짜
team: 팀 필터 (선택사항)
Returns:
기타 리스트
"""
query = """
SELECT * FROM miscs
WHERE created_date BETWEEN ? AND ?
"""
params = [start_date.isoformat(), end_date.isoformat()]
if team:
query += " AND created_team = ?"
params.append(team)
query += " ORDER BY created_date DESC, id DESC"
rows = self.db.fetch_all(query, tuple(params))
return [Misc(**row) for row in rows]
def update_misc(self, misc_id: int, **data) -> Optional[Misc]:
"""기타 업데이트"""
return self._update("miscs", Misc, misc_id, data)
def delete_misc(self, misc_id: int) -> bool:
"""기타 삭제"""
return self._delete("miscs", misc_id)
# ========================================================================
# DailyInspection CRUD
# ========================================================================
def create_daily_inspection(self, **data) -> DailyInspection:
"""일상검수 생성"""
return self._create("daily_inspections", DailyInspection, data)
def get_daily_inspection(self, inspection_id: int) -> Optional[DailyInspection]:
"""ID로 일상검수 조회"""
return self._get_by_id("daily_inspections", DailyInspection, inspection_id)
def get_daily_inspections_by_date(
self,
inspection_date: date,
shift_type: str = None
) -> List[DailyInspection]:
"""날짜별 일상검수 조회"""
filters = {"inspection_date": inspection_date.isoformat()}
if shift_type:
filters["shift_type"] = shift_type
return self._get_all(
"daily_inspections",
DailyInspection,
order_by="slot_number ASC",
**filters
)
def upsert_daily_inspection(
self,
inspection_date: date,
shift_type: str,
slot_number: int,
train_number: str,
cleaning_type: str = "없음",
has_work: bool = False,
created_by: int = None
) -> DailyInspection:
"""
일상검수 생성 또는 업데이트 (UPSERT)
Args:
inspection_date: 검수일자
shift_type: 근무유형
slot_number: 슬롯번호
train_number: 편성번호
cleaning_type: 청소유형
has_work: 작업여부
created_by: 생성자 ID
Returns:
생성/업데이트된 일상검수
"""
# 기존 레코드 확인
query = """
SELECT * FROM daily_inspections
WHERE inspection_date = ? AND shift_type = ? AND slot_number = ?
"""
row = self.db.fetch_one(
query,
(inspection_date.isoformat(), shift_type, slot_number)
)
data = {
"inspection_date": inspection_date.isoformat(),
"shift_type": shift_type,
"slot_number": slot_number,
"train_number": train_number,
"cleaning_type": cleaning_type,
"has_work": has_work,
"created_by": created_by,
}
if row:
# 업데이트
return self._update("daily_inspections", DailyInspection, row['id'], data)
else:
# 생성
return self._create("daily_inspections", DailyInspection, data)
def update_daily_inspection(self, inspection_id: int, **data) -> Optional[DailyInspection]:
"""일상검수 업데이트"""
return self._update("daily_inspections", DailyInspection, inspection_id, data)
def delete_daily_inspection(self, inspection_id: int) -> bool:
"""일상검수 삭제"""
return self._delete("daily_inspections", inspection_id)
# ========================================================================
# Todo CRUD
# ========================================================================
def create_todo(self, **data) -> Todo:
"""할일 생성"""
result = self._create("todos", Todo, data)
self.signals.todo_added.emit(result.id)
return result
def get_todo(self, todo_id: int) -> Optional[Todo]:
"""ID로 할일 조회"""
return self._get_by_id("todos", Todo, todo_id)
def get_todos_by_date(
self,
todo_date: date,
include_incomplete: bool = True
) -> List[Todo]:
"""날짜별 할일 조회"""
if include_incomplete:
query = """
SELECT * FROM todos
WHERE todo_date = ? OR (todo_date < ? AND is_completed = 0)
ORDER BY is_completed ASC, todo_date DESC, id DESC
"""
rows = self.db.fetch_all(
query,
(todo_date.isoformat(), todo_date.isoformat())
)
else:
query = """
SELECT * FROM todos
WHERE todo_date = ?
ORDER BY is_completed ASC, id DESC
"""
rows = self.db.fetch_all(query, (todo_date.isoformat(),))
return [Todo(**row) for row in rows]
def update_todo(self, todo_id: int, **data) -> Optional[Todo]:
"""할일 업데이트"""
result = self._update("todos", Todo, todo_id, data)
if result and 'is_completed' in data:
self.signals.todo_status_changed.emit(todo_id, data['is_completed'])
return result
def toggle_todo_complete(self, todo_id: int) -> Optional[Todo]:
"""할일 완료 상태 토글"""
todo = self.get_todo(todo_id)
if todo:
new_status = not todo.is_completed
completed_at = datetime.now().isoformat() if new_status else None
return self.update_todo(
todo_id,
is_completed=new_status,
completed_at=completed_at
)
return None
def delete_todo(self, todo_id: int) -> bool:
"""할일 삭제"""
return self._delete("todos", todo_id)
# ========================================================================
# Memo CRUD
# ========================================================================
def create_memo(self, **data) -> Memo:
"""메모 생성"""
return self._create("memos", Memo, data)
def get_memo(self, memo_id: int) -> Optional[Memo]:
"""ID로 메모 조회"""
return self._get_by_id("memos", Memo, memo_id)
def get_memos_by_date(self, memo_date: date) -> List[Memo]:
"""날짜별 메모 조회"""
return self._get_all("memos", Memo, memo_date=memo_date.isoformat())
def get_latest_memo(self, memo_date: date) -> Optional[Memo]:
"""최신 메모 조회"""
query = """
SELECT * FROM memos
WHERE memo_date = ?
ORDER BY updated_at DESC
LIMIT 1
"""
row = self.db.fetch_one(query, (memo_date.isoformat(),))
return Memo(**row) if row else None
def upsert_memo(
self,
memo_date: date,
content: str,
created_by: int = None
) -> Memo:
"""메모 생성 또는 업데이트"""
memo = self.get_latest_memo(memo_date)
if memo:
return self.update_memo(memo.id, content=content)
else:
return self.create_memo(
memo_date=memo_date.isoformat(),
content=content,
created_by=created_by
)
def update_memo(self, memo_id: int, **data) -> Optional[Memo]:
"""메모 업데이트"""
result = self._update("memos", Memo, memo_id, data)
if result:
self.signals.memo_changed.emit(memo_id)
return result
def delete_memo(self, memo_id: int) -> bool:
"""메모 삭제"""
return self._delete("memos", memo_id)
# ========================================================================
# Setting CRUD
# ========================================================================
def get_setting(self, key: str) -> Optional[str]:
"""설정 값 조회"""
query = "SELECT value FROM settings WHERE key = ?"
row = self.db.fetch_one(query, (key,))
return row['value'] if row else None
def set_setting(self, key: str, value: str) -> bool:
"""설정 값 저장"""
query = """
INSERT OR REPLACE INTO settings (key, value, updated_at)
VALUES (?, ?, ?)
"""
try:
self.db.execute(query, (key, value, datetime.now().isoformat()))
return True
except Exception as e:
logger.error(f"설정 저장 실패: {key} - {e}")
return False
def get_all_settings(self) -> Dict[str, str]:
"""모든 설정 조회"""
query = "SELECT key, value FROM settings"
rows = self.db.fetch_all(query)
return {row['key']: row['value'] for row in rows}
# ========================================================================
# TeamMember CRUD
# ========================================================================
def create_team_member(self, **data) -> TeamMember:
"""팀 인원 생성"""
return self._create("team_members", TeamMember, data)
def get_team_member(self, member_id: int) -> Optional[TeamMember]:
"""ID로 팀 인원 조회"""
return self._get_by_id("team_members", TeamMember, member_id)
def get_team_members_by_team(
self,
team: str,
position: str = None,
active_only: bool = True
) -> List[TeamMember]:
"""
팀별 인원 조회
Args:
team: 팀 (1팀, 2팀, 3팀, 4팀)
position: 직책 필터 (부팀장, 운용)
active_only: 활성화된 인원만
Returns:
팀 인원 리스트
"""
filters = {"team": team}
if position:
filters["position"] = position
if active_only:
filters["is_active"] = 1
return self._get_all(
"team_members",
TeamMember,
order_by='"order" ASC, id ASC',
**filters
)
def get_all_team_members(self, active_only: bool = True) -> List[TeamMember]:
"""모든 팀 인원 조회"""
if active_only:
return self._get_all("team_members", TeamMember, is_active=1)
return self._get_all("team_members", TeamMember)
def update_team_member(self, member_id: int, **data) -> Optional[TeamMember]:
"""팀 인원 업데이트"""
return self._update("team_members", TeamMember, member_id, data)
def delete_team_member(self, member_id: int) -> bool:
"""팀 인원 삭제 (비활성화)"""
return self._update("team_members", TeamMember, member_id, {"is_active": False}) is not None
def set_partner(self, member_id: int, partner_id: Optional[int] = None) -> bool:
"""짝궁 설정/해제"""
# 기존 파트너 해제
member = self.get_team_member(member_id)
if member and member.partner_id:
old_partner = self.get_team_member(member.partner_id)
if old_partner:
self._update("team_members", TeamMember, old_partner.id, {"partner_id": None})
# 새 파트너 설정
self._update("team_members", TeamMember, member_id, {"partner_id": partner_id})
if partner_id:
# 양방향 짝궁 설정
self._update("team_members", TeamMember, partner_id, {"partner_id": member_id})
return True
# ========================================================================
# DutySchedule CRUD
# ========================================================================
def create_duty_schedule(self, **data) -> DutySchedule:
"""당무 일정 생성"""
return self._create("duty_schedules", DutySchedule, data)
def get_duty_schedule(
self,
duty_date: date,
team: str,
shift_type: str
) -> Optional[DutySchedule]:
"""
당무 일정 조회
Args:
duty_date: 날짜
team: 팀
shift_type: 근무 유형
Returns:
당무 일정 또는 None
"""
query = """
SELECT * FROM duty_schedules
WHERE duty_date = ? AND team = ? AND shift_type = ?
"""
row = self.db.fetch_one(
query,
(duty_date.isoformat(), team, shift_type)
)
return DutySchedule(**row) if row else None
def upsert_duty_schedule(
self,
duty_date: date,
team: str,
shift_type: str,
vice_leader_id: int = None,
operator_id: int = None,
vice_leader_name: str = "",
operator_name: str = ""
) -> DutySchedule:
"""
당무 일정 생성 또는 업데이트
Args:
duty_date: 날짜
team: 팀
shift_type: 근무 유형
vice_leader_id: 부팀장 ID
operator_id: 운용 ID
vice_leader_name: 부팀장 이름
operator_name: 운용 이름
Returns:
생성/업데이트된 당무 일정
"""
existing = self.get_duty_schedule(duty_date, team, shift_type)
data = {
"duty_date": duty_date.isoformat(),
"team": team,
"shift_type": shift_type,
"vice_leader_id": vice_leader_id,
"operator_id": operator_id,
"vice_leader_name": vice_leader_name,
"operator_name": operator_name,
}
if existing:
return self._update("duty_schedules", DutySchedule, existing.id, data)
else:
return self._create("duty_schedules", DutySchedule, data)
def get_duty_schedules_by_date_range(
self,
start_date: date,
end_date: date,
team: str = None
) -> List[DutySchedule]:
"""날짜 범위로 당무 일정 조회"""
query = """
SELECT * FROM duty_schedules
WHERE duty_date BETWEEN ? AND ?
"""
params = [start_date.isoformat(), end_date.isoformat()]
if team:
query += " AND team = ?"
params.append(team)
query += " ORDER BY duty_date ASC"
rows = self.db.fetch_all(query, tuple(params))
return [DutySchedule(**row) for row in rows]
def get_next_duty_member(
self,
team: str,
position: str,
current_id: int = None
) -> Optional[TeamMember]:
"""
다음 당무자 조회 (순번 기준)
Args:
team: 팀
position: 직책
current_id: 현재 당무자 ID
Returns:
다음 당무자 또는 None
"""
members = self.get_team_members_by_team(team, position)
if not members:
return None
if current_id is None:
return members[0]
# 현재 당무자의 인덱스 찾기
current_idx = None
for idx, member in enumerate(members):
if member.id == current_id:
current_idx = idx
break
if current_idx is None:
return members[0]
# 다음 순번 (순환)
next_idx = (current_idx + 1) % len(members)
return members[next_idx]
# ========================================================================
# 팀 확인 관련 메서드
# ========================================================================
def update_team_confirmation(
self,
table_name: str,
record_id: int,
team: str,
confirmed: bool
) -> bool:
"""
팀 확인 상태를 업데이트합니다.
Args:
table_name: 테이블 이름 (instructions, faults, works, miscs)
record_id: 레코드 ID
team: 팀 이름
confirmed: 확인 여부
Returns:
업데이트 성공 여부
"""
model_class = MODEL_REGISTRY.get(table_name)
if not model_class:
return False
record = self._get_by_id(table_name, model_class, record_id)
if not record:
return False
# 팀 확인 상태 업데이트
record.set_team_confirmation(team, confirmed)
data = {
"team_confirmations": record.team_confirmations,
"is_completed": record.is_completed,
}
if record.is_completed and record.completed_at:
data["completed_at"] = record.completed_at.isoformat()
self._update(table_name, model_class, record_id, data)
return True
# ========================================================================
# 통계 메서드
# ========================================================================
# ========================================================================
# TrainSchedule CRUD
# ========================================================================
def create_train_schedule(self, **data) -> TrainSchedule:
"""열차 다이아 시각표 생성 (공통 데이터베이스 사용)"""
common_db = CommonDatabaseManager()
return self._create_with_db(common_db, "train_schedules", TrainSchedule, data)
def get_train_schedule(self, schedule_id: int) -> Optional[TrainSchedule]:
"""ID로 열차 다이아 시각표 조회 (공통 데이터베이스 사용)"""
common_db = CommonDatabaseManager()
return self._get_by_id_with_db(common_db, "train_schedules", TrainSchedule, schedule_id)
def get_train_schedule_by_column_station(
self,
column_number: str,
station: str,
is_weekday: bool = True
) -> Optional[TrainSchedule]:
"""
열번과 역명으로 시각표 조회 (공통 데이터베이스 사용)
Args:
column_number: 열번
station: 역명
is_weekday: 평일 여부
Returns:
TrainSchedule 또는 None
"""
common_db = CommonDatabaseManager()
query = """
SELECT * FROM train_schedules
WHERE column_number = ? AND station = ? AND is_weekday = ? AND is_active = 1
"""
row = common_db.fetch_one(query, (column_number, station, is_weekday))
return TrainSchedule(**row) if row else None
def get_schedules_by_column(
self,
column_number: str,
is_weekday: bool = True
) -> List[TrainSchedule]:
"""
열번으로 전체 경로 시각표 조회 (공통 데이터베이스 사용)
Args:
column_number: 열번
is_weekday: 평일 여부
Returns:
역 순서대로 정렬된 시각표 리스트
"""
common_db = CommonDatabaseManager()
query = """
SELECT * FROM train_schedules
WHERE column_number = ? AND is_weekday = ? AND is_active = 1
ORDER BY arrival_time ASC
"""
rows = common_db.fetch_all(query, (column_number, is_weekday))
return [TrainSchedule(**row) for row in rows]
def get_schedules_by_station(
self,
station: str,
is_weekday: bool = True
) -> List[TrainSchedule]:
"""
역명으로 해당 역 통과 열차 시각표 조회 (공통 데이터베이스 사용)
Args:
station: 역명
is_weekday: 평일 여부
Returns:
시간순 정렬된 시각표 리스트
"""
common_db = CommonDatabaseManager()
query = """
SELECT * FROM train_schedules
WHERE station = ? AND is_weekday = ? AND is_active = 1
ORDER BY arrival_time ASC
"""
rows = common_db.fetch_all(query, (station, is_weekday))
return [TrainSchedule(**row) for row in rows]
def upsert_train_schedule(
self,
column_number: str,
station: str,
arrival_time: time = None,
departure_time: time = None,
direction: str = "up",
is_weekday: bool = True
) -> TrainSchedule:
"""
열차 다이아 시각표 생성 또는 업데이트 (UPSERT, 공통 데이터베이스 사용)
Args:
column_number: 열번
station: 역명
arrival_time: 도착 시간
departure_time: 출발 시간
direction: 방향
is_weekday: 평일 여부
Returns:
생성/업데이트된 TrainSchedule
"""
existing = self.get_train_schedule_by_column_station(column_number, station, is_weekday)
common_db = CommonDatabaseManager()
data = {
"column_number": column_number,
"station": station,
"arrival_time": arrival_time.isoformat() if arrival_time else None,
"departure_time": departure_time.isoformat() if departure_time else None,
"direction": direction,
"is_weekday": is_weekday,
}
if existing:
return self._update_with_db(common_db, "train_schedules", TrainSchedule, existing.id, data)
else:
return self._create_with_db(common_db, "train_schedules", TrainSchedule, data)
def delete_train_schedule(self, schedule_id: int) -> bool:
"""열차 다이아 시각표 삭제 (공통 데이터베이스 사용)"""
common_db = CommonDatabaseManager()
return self._delete_with_db(common_db, "train_schedules", schedule_id)
def estimate_time_by_column_station(
self,
column_number: str,
station: str,
occurrence_date: date = None
) -> Optional[time]:
"""
열번과 역명으로 발생 시간 추정
Args:
column_number: 열번
station: 역명
occurrence_date: 발생일 (평일/주말 판단용)
Returns:
추정 시간 또는 None
"""
from datetime import datetime
# 평일/주말 판단
if occurrence_date:
is_weekday = occurrence_date.weekday() < 5 # 0~4: 월~금
else:
is_weekday = True
schedule = self.get_train_schedule_by_column_station(
column_number, station, is_weekday
)
if schedule:
return schedule.arrival_time or schedule.departure_time
return None
# ========================================================================
# 날씨 메서드
# ========================================================================
def upsert_weather(
self,
datetime: datetime,
location_name: str,
location_code: str,
temp: Optional[int] = None,
feels_like: Optional[int] = None,
humidity: Optional[int] = None,
wind_speed: str = "",
wind_direction: str = "",
precipitation_prob: Optional[int] = None,
weather_condition: str = "",
weather_icon: str = ""
) -> Weather:
"""
날씨 정보를 생성하거나 업데이트합니다.
Args:
datetime: 날씨 데이터 시각
location_name: 지역명
location_code: 지역코드
temp: 기온
feels_like: 체감온도
humidity: 습도
wind_speed: 풍속
wind_direction: 풍향
precipitation_prob: 강수확률
weather_condition: 날씨 상태
weather_icon: 날씨 아이콘
Returns:
Weather 객체
"""
try:
# 기존 데이터 확인
existing = self.db.fetch_one(
"""
SELECT id, created_at, updated_at FROM weather
WHERE datetime = ? AND location_code = ?
""",
(datetime.isoformat(), location_code)
)
now = datetime.now()
if existing:
# 업데이트
self.db.execute(
"""
UPDATE weather SET
location_name = ?, temp = ?, feels_like = ?, humidity = ?,
wind_speed = ?, wind_direction = ?, precipitation_prob = ?,
weather_condition = ?, weather_icon = ?, updated_at = ?
WHERE id = ?
""",
(
location_name, temp, feels_like, humidity,
wind_speed, wind_direction, precipitation_prob,
weather_condition, weather_icon, now.isoformat(),
existing['id']
)
)
weather_id = existing['id']
created_at = existing['created_at']
updated_at = now.isoformat()
else:
# 생성
result = self.db.execute(
"""
INSERT INTO weather (
datetime, location_name, location_code, temp, feels_like,
humidity, wind_speed, wind_direction, precipitation_prob,
weather_condition, weather_icon, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.isoformat(), location_name, location_code,
temp, feels_like, humidity, wind_speed, wind_direction,
precipitation_prob, weather_condition, weather_icon,
now.isoformat(), now.isoformat()
)
)
weather_id = result.lastrowid if result else None
created_at = now.isoformat()
updated_at = now.isoformat()
return Weather(
id=weather_id,
created_at=created_at,
updated_at=updated_at,
datetime=datetime,
location_name=location_name,
location_code=location_code,
temp=temp,
feels_like=feels_like,
humidity=humidity,
wind_speed=wind_speed,
wind_direction=wind_direction,
precipitation_prob=precipitation_prob,
weather_condition=weather_condition,
weather_icon=weather_icon
)
except Exception as e:
logger.error(f"날씨 데이터 upsert 실패: {e}")
raise DatabaseQueryError(f"Failed to upsert weather data: {e}")
def get_weather_by_datetime_range(
self,
start_datetime: datetime,
end_datetime: datetime,
location_code: str = ""
) -> List[Weather]:
"""
지정된 시간 범위의 날씨 데이터를 조회합니다.
Args:
start_datetime: 시작 시각
end_datetime: 종료 시각
location_code: 지역코드 (선택사항)
Returns:
Weather 객체 리스트
"""
try:
query = """
SELECT * FROM weather
WHERE datetime >= ? AND datetime <= ?
"""
params = [start_datetime.isoformat(), end_datetime.isoformat()]
if location_code:
query += " AND location_code = ?"
params.append(location_code)
query += " ORDER BY datetime ASC"
rows = self.db.fetch_all(query, tuple(params))
return [self._row_to_weather(row) for row in rows]
except Exception as e:
logger.error(f"날씨 데이터 조회 실패: {e}")
raise DatabaseQueryError(f"Failed to get weather data: {e}")
def get_weather_for_shift(
self,
shift_type: str,
current_date: date,
location_code: str = ""
) -> List[Weather]:
"""
근무 형태에 따른 날씨 데이터를 조회합니다.
Args:
shift_type: 근무 유형 ("주간" 또는 "야간")
current_date: 현재 날짜
location_code: 지역코드 (선택사항)
Returns:
Weather 객체 리스트
"""
from datetime import timedelta
if shift_type == "주간":
# 주간: 09:00 ~ 18:00
start_datetime = datetime.combine(current_date, time(9, 0))
end_datetime = datetime.combine(current_date, time(18, 0))
elif shift_type == "야간":
# 야간: 당일 18:00 ~ 다음날 09:00
start_datetime = datetime.combine(current_date, time(18, 0))
end_datetime = datetime.combine(current_date + timedelta(days=1), time(9, 0))
else:
# 기본값: 주간
start_datetime = datetime.combine(current_date, time(9, 0))
end_datetime = datetime.combine(current_date, time(18, 0))
return self.get_weather_by_datetime_range(start_datetime, end_datetime, location_code)
def get_weather_stats_for_shift(
self,
shift_type: str,
current_date: date,
location_code: str = ""
) -> Dict[str, Any]:
"""
근무 시간 동안의 날씨 통계를 반환합니다.
Args:
shift_type: 근무 유형 ("주간" 또는 "야간")
current_date: 현재 날짜
location_code: 지역코드 (선택사항)
Returns:
통계 데이터 딕셔너리
"""
weather_data = self.get_weather_for_shift(shift_type, current_date, location_code)
if not weather_data:
return {
"temp_min": None,
"temp_max": None,
"feels_like_min": None,
"feels_like_max": None,
"avg_temp": None,
"avg_feels_like": None,
"max_precipitation_prob": None,
"data_points": 0
}
temps = [w.temp for w in weather_data if w.temp is not None]
feels_likes = [w.feels_like for w in weather_data if w.feels_like is not None]
precip_probs = [w.precipitation_prob for w in weather_data if w.precipitation_prob is not None]
return {
"temp_min": min(temps) if temps else None,
"temp_max": max(temps) if temps else None,
"feels_like_min": min(feels_likes) if feels_likes else None,
"feels_like_max": max(feels_likes) if feels_likes else None,
"avg_temp": round(sum(temps) / len(temps)) if temps else None,
"avg_feels_like": round(sum(feels_likes) / len(feels_likes)) if feels_likes else None,
"max_precipitation_prob": max(precip_probs) if precip_probs else None,
"data_points": len(weather_data)
}
def cleanup_old_weather_data(self, days_to_keep: int = 7):
"""
오래된 날씨 데이터를 삭제합니다.
Args:
days_to_keep: 보관할 일수 (기본값: 7일)
"""
from datetime import timedelta
try:
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
result = self.db.execute(
"DELETE FROM weather WHERE datetime < ?",
(cutoff_date.isoformat(),)
)
deleted_count = result.rowcount if result else 0
logger.info(f"오래된 날씨 데이터 {deleted_count}개 삭제됨 (보관 기간: {days_to_keep}일)")
except Exception as e:
logger.error(f"날씨 데이터 정리 실패: {e}")
raise DatabaseQueryError(f"Failed to cleanup weather data: {e}")
def _row_to_weather(self, row: Dict[str, Any]) -> Weather:
"""데이터베이스 행을 Weather 객체로 변환"""
return Weather(
id=row.get('id'),
created_at=row.get('created_at'),
updated_at=row.get('updated_at'),
datetime=datetime.fromisoformat(row['datetime']) if row.get('datetime') else None,
location_name=row.get('location_name', ''),
location_code=row.get('location_code', ''),
temp=row.get('temp'),
feels_like=row.get('feels_like'),
humidity=row.get('humidity'),
wind_speed=row.get('wind_speed', ''),
wind_direction=row.get('wind_direction', ''),
precipitation_prob=row.get('precipitation_prob'),
weather_condition=row.get('weather_condition', ''),
weather_icon=row.get('weather_icon', '')
)
# ========================================================================
# 통계 메서드
# ========================================================================
def get_fault_statistics(
self,
start_date: date = None,
end_date: date = None
) -> Dict[str, Any]:
"""
고장 통계를 반환합니다.
Args:
start_date: 시작 날짜
end_date: 종료 날짜
Returns:
통계 데이터
"""
base_query = "FROM faults WHERE 1=1"
params = []
if start_date:
base_query += " AND occurrence_date >= ?"
params.append(start_date.isoformat())
if end_date:
base_query += " AND occurrence_date <= ?"
params.append(end_date.isoformat())
# 총 건수
total = self.db.fetch_one(f"SELECT COUNT(*) as count {base_query}", tuple(params))
# 장치별 통계
by_device = self.db.fetch_all(
f"""
SELECT device_category, COUNT(*) as count
{base_query}
GROUP BY device_category
ORDER BY count DESC
""",
tuple(params)
)
# 편성별 통계
by_train = self.db.fetch_all(
f"""
SELECT train_number, COUNT(*) as count
{base_query}
GROUP BY train_number
ORDER BY count DESC
LIMIT 10
""",
tuple(params)
)
return {
"total": total['count'] if total else 0,
"by_device": by_device,
"by_train": by_train,
}
# ============================================================================
# 모듈 레벨 편의 함수
# ============================================================================
def get_crud() -> CRUDManager:
"""
CRUD 관리자 인스턴스를 반환합니다.
Returns:
CRUDManager 인스턴스
"""
return CRUDManager()