516 lines
19 KiB
Python
516 lines
19 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
공통 데이터베이스 관리 모듈
|
|
변경이 거의 없는 공통 데이터(편성, 시각표, 고장코드, MMI코드, 시그널, 역명, 도면약어 등)를 관리합니다.
|
|
"""
|
|
|
|
import sqlite3
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
from contextlib import contextmanager
|
|
|
|
from core.constants import DATA_DIR
|
|
from core.logger import get_logger
|
|
from core.exceptions import DatabaseConnectionError, DatabaseQueryError
|
|
|
|
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)

# Path of the common database file (lives directly under DATA_DIR).
COMMON_DB_FILE = DATA_DIR / "common_data.db"
|
|
|
|
|
|
# ============================================================================
# SQL schema for the common-data tables
# ============================================================================
#
# The script is executed with ``Connection.executescript``. Every statement
# uses ``IF NOT EXISTS`` so running it against an already-initialised database
# is a no-op. The ``--`` comments inside the literal are part of the SQL text
# and are intentionally left untouched.

CREATE_COMMON_TABLES_SQL = """
-- 전동차 편성 테이블
CREATE TABLE IF NOT EXISTS train_formations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    train_number TEXT UNIQUE NOT NULL,
    is_new_train INTEGER DEFAULT 1,
    manufacturer TEXT,
    introduction_date DATE,
    depot TEXT,
    alias TEXT,
    introduction_stage TEXT,
    introduction_count INTEGER DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 열차 다이아 시각표 테이블
CREATE TABLE IF NOT EXISTS train_schedules (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    column_number TEXT NOT NULL,
    station TEXT NOT NULL,
    arrival_time TIME,
    departure_time TIME,
    direction TEXT NOT NULL DEFAULT 'up',
    is_weekday INTEGER DEFAULT 1,
    is_active INTEGER DEFAULT 1,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(column_number, station, direction, is_weekday)
);

-- 고장코드 테이블
CREATE TABLE IF NOT EXISTS fault_codes (
    id TEXT PRIMARY KEY,
    f_code TEXT NOT NULL,
    f_code_num TEXT,
    f_name TEXT NOT NULL,
    car_type TEXT,
    f_class TEXT,
    fault_name TEXT,
    grade TEXT,
    device TEXT,
    fault_detail TEXT,
    fault_reaction TEXT,
    fault_detection TEXT,
    fault_clear TEXT,
    fault_action TEXT,
    fault_schematics TEXT,
    car_id TEXT,
    alias_name TEXT,
    manufacturer TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- MMI 코드 테이블
CREATE TABLE IF NOT EXISTS mmi_codes (
    id TEXT PRIMARY KEY,
    code_name TEXT NOT NULL,
    code_description TEXT,
    data_type TEXT,
    car_id TEXT,
    alias_name TEXT,
    manufacturer TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 시그널 코드 테이블
CREATE TABLE IF NOT EXISTS signals (
    id TEXT PRIMARY KEY,
    sig_num TEXT NOT NULL,
    signal_abbreviation TEXT NOT NULL,
    signal_description TEXT,
    status_value TEXT,
    manufacturer TEXT,
    classification TEXT,
    original_data TEXT,
    alias_name TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 역명 테이블
CREATE TABLE IF NOT EXISTS stations (
    id TEXT PRIMARY KEY,
    line_number TEXT NOT NULL,
    station_id TEXT NOT NULL,
    station_name TEXT NOT NULL,
    station_map TEXT,
    is_underground INTEGER DEFAULT 0,
    is_island INTEGER DEFAULT 0,
    is_exchange INTEGER DEFAULT 0,
    is_end INTEGER DEFAULT 0,
    has_siding_track INTEGER DEFAULT 0,
    has_signal_room INTEGER DEFAULT 0,
    etc1 TEXT,
    etc2 TEXT,
    etc3 TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(line_number, station_id)
);

-- 도면약어 테이블
CREATE TABLE IF NOT EXISTS drawer_abbreviations (
    id TEXT PRIMARY KEY,
    abb TEXT NOT NULL,
    classification TEXT,
    related_drawings TEXT,
    drawing_id TEXT,
    manufacturer TEXT,
    term TEXT,
    pages TEXT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 인덱스 생성
CREATE INDEX IF NOT EXISTS idx_fault_codes_f_code ON fault_codes(f_code);
CREATE INDEX IF NOT EXISTS idx_fault_codes_device ON fault_codes(device);
CREATE INDEX IF NOT EXISTS idx_mmi_codes_code_name ON mmi_codes(code_name);
CREATE INDEX IF NOT EXISTS idx_signals_sig_num ON signals(sig_num);
CREATE INDEX IF NOT EXISTS idx_signals_abbreviation ON signals(signal_abbreviation);
CREATE INDEX IF NOT EXISTS idx_stations_station_name ON stations(station_name);
CREATE INDEX IF NOT EXISTS idx_drawer_abbreviations_abb ON drawer_abbreviations(abb);
"""
|
|
|
|
|
|
# ============================================================================
# Common database manager class
# ============================================================================

class CommonDatabaseManager:
    """Process-wide singleton wrapping the common-data SQLite database.

    The database holds reference data that rarely changes (train formations,
    schedules, fault codes, MMI codes, signals, station names and drawing
    abbreviations). One ``sqlite3`` connection is kept per thread in a
    ``threading.local`` and reused by every call made from that thread.

    Only the first construction initialises the instance; later calls return
    the same object and ignore their ``db_path`` argument.
    """

    _instance = None
    _lock = threading.Lock()
    _initialized = False

    # Textual spellings treated as boolean "true" when importing station rows.
    # 'true' is what the original implementation accepted; 't' and '1' are
    # additionally accepted so other common boolean encodings are not read as 0.
    _TRUE_TOKENS = frozenset({"true", "t", "1"})

    def __new__(cls, db_path: Path = None):
        # Check, lock, check again, so only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, db_path: Path = None):
        """Initialise the manager (effective on the first construction only).

        Args:
            db_path: Location of the SQLite file. Falls back to
                ``COMMON_DB_FILE`` when omitted.

        Raises:
            DatabaseConnectionError: If the schema cannot be created.
        """
        if self._initialized:
            return

        self.db_path = db_path or COMMON_DB_FILE
        self._local = threading.local()

        # Create the data directory (original behaviour) ...
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        # ... and also the directory of a custom db_path, which may live
        # somewhere other than DATA_DIR and would otherwise fail to open.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        self._initialize_database()

        self._initialized = True
        logger.info(f"공통 데이터베이스 관리자 초기화 완료: {self.db_path}")

    def _initialize_database(self):
        """Create all tables and indexes if they do not exist yet.

        Raises:
            DatabaseConnectionError: On any failure; the original exception is
                attached as ``__cause__``.
        """
        try:
            with self.get_connection() as conn:
                conn.executescript(CREATE_COMMON_TABLES_SQL)
                conn.commit()

            logger.info("공통 데이터베이스 테이블 초기화 완료")
        except Exception as e:
            logger.error(f"공통 데이터베이스 초기화 실패: {e}")
            raise DatabaseConnectionError(f"공통 데이터베이스 초기화 실패: {e}") from e

    @contextmanager
    def get_connection(self):
        """Yield the calling thread's connection, opening it on first use.

        The connection is intentionally left open when the ``with`` block
        exits so that it can be reused by later calls from the same thread.

        Yields:
            sqlite3.Connection: Connection whose ``row_factory`` is
            ``sqlite3.Row``.

        Raises:
            DatabaseConnectionError: If the connection cannot be opened.
            DatabaseQueryError: If the ``with`` body raises ``sqlite3.Error``.
        """
        conn = getattr(self._local, 'connection', None)
        if conn is None:
            try:
                conn = sqlite3.connect(
                    str(self.db_path),
                    check_same_thread=False,
                    timeout=30.0
                )
                conn.row_factory = sqlite3.Row
                # foreign_keys is a per-connection setting, so it has to be
                # enabled on every new connection, not just the first one.
                conn.execute("PRAGMA foreign_keys = ON")
            except sqlite3.Error as e:
                if conn is not None:
                    conn.close()
                logger.error(f"공통 데이터베이스 연결 실패: {e}")
                raise DatabaseConnectionError(f"공통 데이터베이스 연결 실패: {e}") from e
            self._local.connection = conn
            logger.debug(f"공통 데이터베이스 연결 생성: {self.db_path}")

        try:
            yield conn
        except sqlite3.Error as e:
            logger.error(f"공통 데이터베이스 쿼리 오류: {e}")
            raise DatabaseQueryError(f"공통 데이터베이스 쿼리 오류: {e}") from e

    def execute(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
        """Execute one statement and return its cursor (no commit)."""
        with self.get_connection() as conn:
            return conn.execute(sql, params or ())

    def executemany(self, sql: str, params_list: List[tuple]) -> sqlite3.Cursor:
        """Execute one statement once per parameter tuple (no commit)."""
        with self.get_connection() as conn:
            return conn.executemany(sql, params_list)

    def fetch_one(self, sql: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Return the first result row as a dict, or ``None`` if there is none."""
        with self.get_connection() as conn:
            row = conn.execute(sql, params or ()).fetchone()
            return dict(row) if row else None

    def fetch_all(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Return every result row as a list of dicts."""
        with self.get_connection() as conn:
            cursor = conn.execute(sql, params or ())
            return [dict(row) for row in cursor.fetchall()]

    def commit(self):
        """Commit the calling thread's pending transaction."""
        with self.get_connection() as conn:
            conn.commit()

    def load_data_from_sql_files(self, sql_dir: Path = None):
        """Import the known ``*_rows.sql`` dump files into their tables.

        Each file is handled independently: a missing file or a failure while
        loading one file is logged and the remaining files are still
        processed.

        Args:
            sql_dir: Directory containing the dump files. Defaults to the
                ``ori_data`` directory two levels above this module.
        """
        from database.sql_loader import load_sql_file

        if sql_dir is None:
            sql_dir = Path(__file__).parent.parent / "ori_data"

        if not sql_dir.exists():
            logger.warning(f"SQL 파일 디렉토리가 없습니다: {sql_dir}")
            return

        # Dump file name -> loader that inserts its parsed records.
        sql_files = {
            "Fault_Code_Table_rows.sql": self._load_fault_codes,
            "MMI_Code_rows.sql": self._load_mmi_codes,
            "Signals_rows.sql": self._load_signals,
            "Stations_rows.sql": self._load_stations,
            "drawer_abbreviation_rows.sql": self._load_drawer_abbreviations,
        }

        for filename, loader_func in sql_files.items():
            sql_file = sql_dir / filename
            if not sql_file.exists():
                logger.warning(f"SQL 파일을 찾을 수 없습니다: {sql_file}")
                continue

            try:
                logger.info(f"데이터 로드 중: {filename}")
                records = load_sql_file(sql_file)
                if records:
                    loader_func(records)
                    logger.info(f"데이터 로드 완료: {filename} ({len(records)}개 레코드)")
                else:
                    logger.warning(f"파싱된 레코드가 없습니다: {filename}")
            except Exception as e:
                # Best effort by design: one bad file must not stop the rest.
                logger.error(f"데이터 로드 실패 ({filename}): {e}", exc_info=True)

    def _timestamps(self, record: Dict[str, Any]) -> tuple:
        """Return the normalised ``(created_at, updated_at)`` pair of a record."""
        return (
            self._parse_datetime(record.get('created_at')),
            self._parse_datetime(record.get('updated_at')),
        )

    @staticmethod
    def _to_flag(value: Any) -> int:
        """Convert a boolean-like value to 1/0 for an INTEGER flag column."""
        token = str(value).strip().lower()
        return 1 if token in CommonDatabaseManager._TRUE_TOKENS else 0

    def _insert_rows(self, insert_sql: str, params_list: List[tuple], label: str):
        """Insert a batch atomically, commit it and log the row count.

        If any row fails, the whole batch is rolled back so that a partially
        applied batch cannot be committed later by an unrelated ``commit()``.
        The error then surfaces as ``DatabaseQueryError`` via
        ``get_connection``.
        """
        with self.get_connection() as conn:
            try:
                conn.executemany(insert_sql, params_list)
                conn.commit()
            except sqlite3.Error:
                conn.rollback()
                raise
        logger.info(f"{label} 데이터 삽입 완료: {len(params_list)}개")

    def _load_fault_codes(self, records: List[Dict[str, Any]]):
        """Insert or replace fault-code rows from parsed dump records."""
        if not records:
            return

        insert_sql = """
            INSERT OR REPLACE INTO fault_codes
            (id, f_code, f_code_num, f_name, car_type, f_class, fault_name, grade, device,
             fault_detail, fault_reaction, fault_detection, fault_clear, fault_action,
             fault_schematics, car_id, alias_name, manufacturer, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        # Record keys in the same order as the column list above.
        keys = (
            'id', 'f_code', 'f_code_num', 'f_name', 'car_type', 'f_class',
            'fault_name', 'grade', 'device', 'fault_detail', 'fault_reaction',
            'fault_detection', 'fault_clear', 'fault_action',
            'fault_schematics', 'car_id', 'alias_name', 'manufacturer',
        )
        params_list = [
            tuple(record.get(key) for key in keys) + self._timestamps(record)
            for record in records
        ]

        self._insert_rows(insert_sql, params_list, "고장코드")

    def _load_mmi_codes(self, records: List[Dict[str, Any]]):
        """Insert or replace MMI-code rows from parsed dump records."""
        if not records:
            return

        insert_sql = """
            INSERT OR REPLACE INTO mmi_codes
            (id, code_name, code_description, data_type, car_id, alias_name, manufacturer, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        keys = (
            'id', 'code_name', 'code_description', 'data_type', 'car_id',
            'alias_name', 'manufacturer',
        )
        params_list = [
            tuple(record.get(key) for key in keys) + self._timestamps(record)
            for record in records
        ]

        self._insert_rows(insert_sql, params_list, "MMI 코드")

    def _load_signals(self, records: List[Dict[str, Any]]):
        """Insert or replace signal-code rows from parsed dump records."""
        if not records:
            return

        insert_sql = """
            INSERT OR REPLACE INTO signals
            (id, sig_num, signal_abbreviation, signal_description, status_value,
             manufacturer, classification, original_data, alias_name, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        keys = (
            'id', 'sig_num', 'signal_abbreviation', 'signal_description',
            'status_value', 'manufacturer', 'classification', 'original_data',
            'alias_name',
        )
        params_list = [
            tuple(record.get(key) for key in keys) + self._timestamps(record)
            for record in records
        ]

        self._insert_rows(insert_sql, params_list, "시그널 코드")

    def _load_stations(self, records: List[Dict[str, Any]]):
        """Insert or replace station rows from parsed dump records.

        Boolean columns arrive as text and are stored as 0/1 integers.
        """
        if not records:
            return

        insert_sql = """
            INSERT OR REPLACE INTO stations
            (id, line_number, station_id, station_name, station_map,
             is_underground, is_island, is_exchange, is_end,
             has_siding_track, has_signal_room, etc1, etc2, etc3, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        text_keys = ('id', 'line_number', 'station_id', 'station_name', 'station_map')
        flag_keys = (
            'is_underground', 'is_island', 'is_exchange', 'is_end',
            'has_siding_track', 'has_signal_room',
        )
        extra_keys = ('etc1', 'etc2', 'etc3')

        params_list = [
            tuple(record.get(key) for key in text_keys)
            # A missing flag defaults to 'false', i.e. 0.
            + tuple(self._to_flag(record.get(key, 'false')) for key in flag_keys)
            + tuple(record.get(key) for key in extra_keys)
            + self._timestamps(record)
            for record in records
        ]

        self._insert_rows(insert_sql, params_list, "역명")

    def _load_drawer_abbreviations(self, records: List[Dict[str, Any]]):
        """Insert or replace drawing-abbreviation rows from parsed dump records."""
        if not records:
            return

        insert_sql = """
            INSERT OR REPLACE INTO drawer_abbreviations
            (id, abb, classification, related_drawings, drawing_id,
             manufacturer, term, pages, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        params_list = []
        for record in records:
            # The source dump spells this column 'Related_drawings'; fall back
            # to the lower-case spelling in case the parser normalises keys.
            related = record.get('Related_drawings')
            if related is None:
                related = record.get('related_drawings')

            params_list.append((
                record.get('id'),
                record.get('abb'),
                record.get('classification'),
                related,
                record.get('drawing_id'),
                record.get('manufacturer'),
                record.get('term'),
                record.get('pages'),
            ) + self._timestamps(record))

        self._insert_rows(insert_sql, params_list, "도면약어")

    def _parse_datetime(self, dt_str: Optional[str]) -> Optional[str]:
        """Normalise a PostgreSQL-style timestamp string for storage in SQLite.

        Fractional seconds and any UTC-offset suffix (``+00``, ``-05:30``,
        ``Z``) are removed, e.g. ``'2025-10-12 15:03:22.31555+00'`` becomes
        ``'2025-10-12 15:03:22'``. Date-only strings are returned unchanged.

        Args:
            dt_str: Timestamp text, or ``None``.

        Returns:
            The cleaned string, or ``None`` for empty or non-string input.
        """
        if not dt_str or not isinstance(dt_str, str):
            return None

        # Drop a positive offset, then the fractional seconds (which also
        # removes anything that follows them).
        value = dt_str.strip().split('+', 1)[0].split('.', 1)[0]

        colon = value.find(':')
        if colon != -1:
            # After the time part a '-' can only start a negative UTC offset;
            # the date's own dashes all come before the first ':'.
            minus = value.find('-', colon)
            if minus != -1:
                value = value[:minus]
            if value.endswith(('Z', 'z')):
                value = value[:-1]

        value = value.rstrip()
        return value or None
|
|
|