VOC_Monitor/app/services/timetable_service.py

268 lines
10 KiB
Python

import pandas as pd
from pathlib import Path
from datetime import datetime
import re
from utils.path_utils import get_base_dir
from utils.logger import get_logger
class TimetableService:
"""
열차 운행 시각표(Parquet) 데이터를 조회하고 분석하는 서비스 클래스.
"""
def __init__(self, parquet_path=None):
self.logger = get_logger("TimetableService")
self.base_dir = get_base_dir()
if parquet_path:
self.parquet_path = Path(parquet_path)
else:
self.parquet_path = self.base_dir / "data" / "line1_sp_timetable.parquet"
self._df = None
self.is_loaded = False
self._load_data()
def _load_data(self):
"""Parquet 파일을 로드합니다."""
try:
if self.parquet_path.exists():
self._df = pd.read_parquet(self.parquet_path)
self.is_loaded = True
self.logger.info(f"시각표 로드 완료: {len(self._df)} 행 (경로: {self.parquet_path})")
else:
self.logger.error(f"시각표 파일을 찾을 수 없음: {self.parquet_path}")
except Exception as e:
self.logger.error(f"시각표 로드 중 오류 발생: {e}")
# def _find_depot_info(self, duty_id, diagram_type):
# """
# [신규 추가] 특정 다이아(duty_id)의 입고(depot_in) 정보를 찾습니다.
# """
# if self.timetable._df is None:
# return None
# df = self.timetable._df
# # 1. 해당 다이아타입 & 다이아번호 필터링
# # 2. run_type이 'depot_in' 인 행 필터링
# condition = (
# (df['diagram_type'] == diagram_type) &
# (df['duty_id'] == duty_id) &
# (df['run_type'] == 'depot_in')
# )
# subset = df[condition]
# if not subset.empty:
# # 입고 행이 여러 개일 경우 마지막 행(최종 입고)을 반환
# return subset.iloc[-1].to_dict()
# return None
def search_train(self, station=None, direction=None, time_str=None, diagram_type=None, train_number=None):
"""
다양한 조건으로 열차 정보를 검색합니다.
Args:
station (str): 역 명칭 (부분 일치 가능)
direction (str): 'up' 또는 'down'
time_str (str): 'HH:MM' 또는 'HH:MM:SS' 형식
diagram_type (str): 'weekday', 'weekend', 'holiday'
train_number (int/str): 열차 번호
Returns:
list: 검색 결과 리스트 (dict 형태)
"""
if not self.is_loaded or self._df is None:
return []
df = self._df.copy()
if diagram_type:
df = df[df['diagram_type'] == diagram_type]
if train_number:
df = df[df['train_number'] == int(train_number)]
if direction:
# direction mapping: 상선->up, 하선->down (데이터 기준에 따름)
dir_query = direction.lower()
if '' in dir_query or 'up' in dir_query:
df = df[df['direction'] == 'up']
elif '' in dir_query or 'down' in dir_query:
df = df[df['direction'] == 'down']
if station:
# 역 이름 정규화 (예: 노포역 -> 노포)
clean_station = station.replace("", "").strip()
df = df[df['station'].str.contains(clean_station)]
if time_str:
# 시간 형식 정규화 (HH:MM -> HH:MM:SS)
if len(time_str) == 5:
time_query = f"{time_str}:00"
else:
time_query = time_str
# 시간 오차 허용 (예: +- 10분) - 여기서는 단순히 가까운 시간대 혹은 필터링
# 구체적인 '가장 가까운 열차' 로직은 별도 메서드로 분리 가능
pass
# 결과를 dict 리스트로 변환
results = df.sort_values(by=['diagram_type', 'train_number', 'seq']).to_dict('records')
return results
def train_exists(self, train_number, diagram_type=None):
"""
해당 열차번호가 시각표에 존재하는지 확인합니다.
(존재하지 않는 열번은 전화번호 등 오탐으로 걸러낼 때 사용)
Args:
train_number (int/str): 열차 번호
diagram_type (str): 'weekday', 'weekend', 'holiday' (None이면 전체)
Returns:
bool: 존재 여부
"""
if not self.is_loaded or self._df is None:
return False
try:
n = int(train_number)
mask = self._df['train_number'] == n
if diagram_type:
mask = mask & (self._df['diagram_type'] == diagram_type)
return self._df[mask].shape[0] > 0
except (ValueError, TypeError):
return False
def find_train_by_time(self, station, direction, time_str, diagram_type='weekday', window_minutes=10):
"""
특정 역에서 특정 시간에 가장 가까운 열차를 찾습니다.
"""
if not self.is_loaded or self._df is None:
return None
# 1. 필터링 (역, 방향, 타입)
clean_station = station.replace("", "").strip()
dir_val = 'up' if '' in direction or 'up' in direction.lower() else 'down'
mask = (self._df['station'].str.contains(clean_station)) & \
(self._df['direction'] == dir_val) & \
(self._df['diagram_type'] == diagram_type)
candidates = self._df[mask].copy()
if candidates.empty:
return None
# 2. 시간 차이 계산
try:
target_time = datetime.strptime(time_str[:5], "%H:%M")
def calculate_diff(row):
row_time = datetime.strptime(row['time'][:5], "%H:%M")
diff = abs((row_time - target_time).total_seconds()) / 60
return diff
candidates['time_diff'] = candidates.apply(calculate_diff, axis=1)
# window_minutes 이내의 가장 가까운 열차
closest = candidates[candidates['time_diff'] <= window_minutes].sort_values('time_diff')
if not closest.empty:
return closest.iloc[0].to_dict()
except Exception as e:
self.logger.error(f"시간 비교 중 오류: {e}")
return None
def get_trains_in_window(self, station, time_str, window_minutes=15, diagram_type='weekday', direction=None):
"""
특정 역, 특정 시간 기준 window_minutes (기본 15분) 전까지의 열차 리스트를 반환합니다.
(과거 시간 ~ 기준 시간)
Args:
station (str): 역 명칭
time_str (str): 기준 시간 (HH:MM)
window_minutes (int): 몇 분 전까지로 검색할지 (양수 입력, 예: 10 -> 10분 전 ~ 현재)
diagram_type (str): 다이어그램 종류
direction (str): 방향 필터 (None이면 양방향 모두)
Returns:
list: 열차 정보 딕셔너리 리스트
"""
if not self.is_loaded or self._df is None:
return []
clean_station = station.replace("", "").strip()
# 기본 필터: 역, 다이어그램 타입
mask = (self._df['station'].str.contains(clean_station)) & \
(self._df['diagram_type'] == diagram_type)
if direction:
dir_val = 'up' if '' in direction or 'up' in direction.lower() else 'down'
mask = mask & (self._df['direction'] == dir_val)
candidates = self._df[mask].copy()
if candidates.empty:
return []
results = []
try:
target_time = datetime.strptime(time_str[:5], "%H:%M")
# 비교를 위해 임의의 날짜(1900-01-01)가 붙으므로, 시간만 비교하기 위해 timedelta 사용
# 하지만 간단히 datetime 객체 간 차이로 계산 (같은 날짜 가정)
for _, row in candidates.iterrows():
try:
row_time_str = row['time'][:5]
row_time = datetime.strptime(row_time_str, "%H:%M")
# 시간 차이 (분) = 기준시간 - 도착시간
# 예: 기준 16:48, 도착 16:36 -> 12분 전 (OK)
# 예: 기준 16:48, 도착 16:50 -> -2분 (미포함, 미래)
diff_mins = (target_time - row_time).total_seconds() / 60
# window_minutes 전 ~ 0분(현재) 사이인 경우 (즉, 이미 지나간 열차)
if 0 <= diff_mins <= window_minutes:
item = row.to_dict()
item['diff_min'] = round(diff_mins, 1)
results.append(item)
except: continue
# 시간순 정렬 (오래된 것 -> 최신)
results.sort(key=lambda x: x['time'])
except Exception as e:
self.logger.error(f"구간 열차 조회 실패: {e}")
return results
def find_depot_info(self, duty_id, diagram_type):
"""
특정 다이아(duty_id)의 입고(depot_in) 정보를 찾습니다.
**시간순 정렬**을 추가하여 항상 도착(종착) 시간을 가져오도록 보장합니다.
"""
if not self.is_loaded or self._df is None:
return None
df = self._df
# 1. 해당 다이아타입 & 다이아번호 & 입고(depot_in) 필터링
condition = (
(df['diagram_type'] == diagram_type) &
(df['duty_id'] == duty_id) &
(df['run_type'] == 'depot_in')
)
subset = df[condition]
if not subset.empty:
# [핵심 수정] 시간(time) 순으로 오름차순 정렬 (00:00 -> 23:59)
# 이렇게 해야 마지막 행(iloc[-1])이 항상 '도착 시간'이 됩니다.
subset = subset.sort_values(by='time', ascending=True)
return subset.iloc[-1].to_dict()
return None