handOver2/core/supabase_client.py

635 lines
22 KiB
Python

# -*- coding: utf-8 -*-
"""
Supabase 클라이언트 모듈
Supabase REST API를 통해 마스터 데이터를 동기화합니다.
지원 테이블:
- Trains: 편성 정보
- Stations: 역 정보
- Manufacturer: 제조사 정보
- Car_Identity: 차량 식별 정보
- Fault_Code_Table: 고장 코드 테이블
"""
import requests
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import json
from .logger import get_logger
from .settings_manager import (
get_settings_manager, TrainInfo, StationInfo,
ManufacturerInfo, FaultCodeInfo
)
logger = get_logger(__name__)
# ============================================================================
# Supabase 설정
# ============================================================================
SUPABASE_URL = "http://122.35.47.72:8000" # Kong API Gateway (기본 포트)
# 대체 포트: 3000 (PostgREST 직접), 54321 (Supabase Studio 기본)
SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I"
# ============================================================================
# 데이터 클래스 (Supabase 테이블 매핑)
# ============================================================================
@dataclass
class SupabaseTrainInfo:
"""Supabase Trains 테이블 매핑"""
id: int
train_set: Optional[int] = None # 편성번호 (예: 1, 2, 3...)
train_id: Optional[str] = None # 편성 ID (예: "101")
car_num: Optional[int] = None # 호차 수
car_type: Optional[str] = None # 차량 타입
manufacturer: Optional[str] = None # 제조사
is_new: Optional[bool] = None # 신형 여부
date_of_Commercial_Service: Optional[str] = None # 영업 개시일
date_of_Introduction: Optional[str] = None # 도입일
introduction_stage: Optional[str] = None # 도입 단계
service_nickname: Optional[str] = None # 서비스 별명
@dataclass
class SupabaseStationInfo:
"""Supabase Stations 테이블 매핑"""
id: str
line_number: Optional[int] = None # 호선
station_id: Optional[float] = None # 역 ID
station_name: Optional[str] = None # 역명
station_map: Optional[str] = None # 역 지도
is_underground: Optional[bool] = None # 지하 여부
is_island: Optional[bool] = None # 섬식 승강장
is_exchange: Optional[bool] = None # 환승역
is_end: Optional[bool] = None # 종착역
has_siding_track: Optional[bool] = None # 측선 여부
has_signal_room: Optional[bool] = None # 신호실 여부
@dataclass
class ManufacturerInfo:
"""Supabase Manufacturer 테이블 매핑"""
id: int
manufact: Optional[str] = None # 제조사명
@dataclass
class FaultCodeInfo:
"""Supabase Fault_Code_Table 테이블 매핑"""
id: str
f_code: Optional[str] = None # 고장 코드
f_code_num: Optional[int] = None # 고장 코드 번호
f_name: Optional[str] = None # 고장명
car_type: Optional[str] = None # 차량 타입
f_class: Optional[str] = None # 고장 분류
fault_name: Optional[str] = None # 고장 상세명
grade: Optional[str] = None # 등급
device: Optional[str] = None # 장치
fault_detail: Optional[str] = None # 고장 상세
fault_reaction: Optional[str] = None # 고장 반응
fault_action: Optional[str] = None # 조치 방법
alias_name: Optional[str] = None # 별칭
manufacturer: Optional[str] = None # 제조사
# ============================================================================
# Supabase 클라이언트
# ============================================================================
class SupabaseClient:
"""
Supabase REST API 클라이언트
Docker로 운영 중인 Supabase에서 마스터 데이터를 조회합니다.
Examples:
>>> client = SupabaseClient()
>>> trains = client.get_trains()
>>> stations = client.get_stations()
"""
_instance: Optional['SupabaseClient'] = None
def __new__(cls):
"""싱글톤 패턴"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""초기화"""
if self._initialized:
return
self.base_url = SUPABASE_URL
self.anon_key = SUPABASE_ANON_KEY
self.headers = {
"apikey": self.anon_key,
"Authorization": f"Bearer {self.anon_key}",
"Content-Type": "application/json",
"Prefer": "return=representation"
}
self._initialized = True
logger.info(f"Supabase 클라이언트 초기화: {self.base_url}")
def _request(
self,
method: str,
endpoint: str,
params: Dict[str, Any] = None,
data: Dict[str, Any] = None,
timeout: int = 30
) -> Optional[List[Dict[str, Any]]]:
"""
Supabase REST API 요청
Args:
method: HTTP 메서드 (GET, POST, PATCH, DELETE)
endpoint: API 엔드포인트 (테이블명)
params: 쿼리 파라미터
data: 요청 바디
timeout: 타임아웃 (초)
Returns:
응답 데이터 리스트 (실패 시 None)
"""
url = f"{self.base_url}/rest/v1/{endpoint}"
try:
logger.debug(f"Supabase 요청: {method} {url}")
logger.debug(f" Headers: apikey=***{self.anon_key[-10:]}")
logger.debug(f" Params: {params}")
response = requests.request(
method=method,
url=url,
headers=self.headers,
params=params,
json=data,
timeout=timeout
)
logger.debug(f" 응답 상태: {response.status_code}")
if response.status_code == 200:
result = response.json()
logger.debug(f" 응답 데이터 수: {len(result) if isinstance(result, list) else 'N/A'}")
return result
else:
logger.error(f"Supabase 요청 실패: {response.status_code} - {response.text[:200]}")
return None
except requests.exceptions.Timeout:
logger.error(f"Supabase 요청 타임아웃: {endpoint}")
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"Supabase 연결 실패: {endpoint} - {e}")
return None
except Exception as e:
logger.error(f"Supabase 요청 오류: {e}")
return None
# ========================================================================
# 편성 정보 (Trains)
# ========================================================================
def get_trains(self) -> List[SupabaseTrainInfo]:
"""
Supabase에서 편성 정보 조회
Returns:
편성 정보 리스트
"""
data = self._request("GET", "Trains", params={"order": "train_set.asc"})
if not data:
return []
trains = []
for row in data:
train = SupabaseTrainInfo(
id=row.get("id"),
train_set=row.get("train_set"),
train_id=row.get("train_id"),
car_num=row.get("car_num"),
car_type=row.get("car_type"),
manufacturer=row.get("manufacturer"),
is_new=row.get("is_new"),
date_of_Commercial_Service=row.get("date_of_Commercial_Service"),
date_of_Introduction=row.get("date_of_Introduction"),
introduction_stage=row.get("introduction_stage"),
service_nickname=row.get("service_nickname")
)
trains.append(train)
logger.info(f"Supabase에서 {len(trains)}개 편성 정보 조회 완료")
return trains
# ========================================================================
# 역 정보 (Stations)
# ========================================================================
def get_stations(self, line_number: int = None) -> List[SupabaseStationInfo]:
"""
Supabase에서 역 정보 조회
Args:
line_number: 호선 (None이면 전체)
Returns:
역 정보 리스트
"""
params = {"order": "station_id.asc"}
if line_number:
params["line_number"] = f"eq.{line_number}"
data = self._request("GET", "Stations", params=params)
if not data:
return []
stations = []
for row in data:
station = SupabaseStationInfo(
id=row.get("id"),
line_number=row.get("line_number"),
station_id=row.get("station_id"),
station_name=row.get("station_name"),
station_map=row.get("station_map"),
is_underground=row.get("is_underground"),
is_island=row.get("is_island"),
is_exchange=row.get("is_exchange"),
is_end=row.get("is_end"),
has_siding_track=row.get("has_siding_track"),
has_signal_room=row.get("has_signal_room")
)
stations.append(station)
logger.info(f"Supabase에서 {len(stations)}개 역 정보 조회 완료")
return stations
# ========================================================================
# 제조사 정보 (Manufacturer)
# ========================================================================
def get_manufacturers(self) -> List[ManufacturerInfo]:
"""
Supabase에서 제조사 정보 조회
Returns:
제조사 정보 리스트
"""
data = self._request("GET", "Manufacturer", params={"order": "id.asc"})
if not data:
return []
manufacturers = []
for row in data:
mfr = ManufacturerInfo(
id=row.get("id"),
manufact=row.get("manufact")
)
manufacturers.append(mfr)
logger.info(f"Supabase에서 {len(manufacturers)}개 제조사 정보 조회 완료")
return manufacturers
# ========================================================================
# 고장 코드 정보 (Fault_Code_Table)
# ========================================================================
def get_fault_codes(self, car_type: str = None, manufacturer: str = None) -> List[FaultCodeInfo]:
"""
Supabase에서 고장 코드 조회
Args:
car_type: 차량 타입 필터
manufacturer: 제조사 필터
Returns:
고장 코드 리스트
"""
params = {"order": "f_code_num.asc"}
if car_type:
params["car_type"] = f"eq.{car_type}"
if manufacturer:
params["manufacturer"] = f"eq.{manufacturer}"
data = self._request("GET", "Fault_Code_Table", params=params)
if not data:
return []
codes = []
for row in data:
code = FaultCodeInfo(
id=row.get("id"),
f_code=row.get("f_code"),
f_code_num=row.get("f_code_num"),
f_name=row.get("f_name"),
car_type=row.get("car_type"),
f_class=row.get("f_class"),
fault_name=row.get("fault_name"),
grade=row.get("grade"),
device=row.get("device"),
fault_detail=row.get("fault_detail"),
fault_reaction=row.get("fault_reaction"),
fault_action=row.get("fault_action"),
alias_name=row.get("alias_name"),
manufacturer=row.get("manufacturer")
)
codes.append(code)
logger.info(f"Supabase에서 {len(codes)}개 고장 코드 조회 완료")
return codes
# ========================================================================
# 연결 테스트
# ========================================================================
def test_connection(self) -> bool:
"""
Supabase 연결 테스트
Returns:
연결 성공 여부
"""
try:
# 간단한 쿼리로 연결 테스트
data = self._request("GET", "Manufacturer", params={"limit": "1"})
if data is not None:
logger.info("Supabase 연결 테스트 성공")
return True
return False
except Exception as e:
logger.error(f"Supabase 연결 테스트 실패: {e}")
return False
# ============================================================================
# 동기화 매니저
# ============================================================================
class SupabaseSyncManager:
"""
Supabase 데이터 동기화 매니저
Supabase에서 마스터 데이터를 가져와 로컬 SQLite DB에 저장합니다.
"""
def __init__(self):
self.client = SupabaseClient()
self.settings = get_settings_manager()
def sync_trains(self) -> int:
"""
편성 정보 동기화
Returns:
동기화된 레코드 수
"""
logger.info("편성 정보 동기화 시작...")
try:
# Supabase에서 데이터 조회
supabase_trains = self.client.get_trains()
if not supabase_trains:
logger.warning("Supabase에서 편성 정보를 가져오지 못했습니다.")
return 0
# 로컬 DB에 저장
count = 0
for train in supabase_trains:
# 편성번호 생성 (train_set 또는 train_id 사용)
train_number = str(train.train_set) if train.train_set else train.train_id
if not train_number:
continue
# TrainInfo 객체 생성
train_info = TrainInfo(
train_number=train_number,
train_type="B" if train.is_new else "A", # 신형=B, 구형=A
manufacturer=train.manufacturer or "",
manufacture_year=0, # 도입일에서 추출 가능
depot="", # Supabase에 없음
is_active=True
)
# 도입일에서 년도 추출
if train.date_of_Introduction:
try:
year = int(train.date_of_Introduction[:4])
train_info.manufacture_year = year
except (ValueError, TypeError):
pass
self.settings.save_train_info(train_info)
count += 1
# 동기화 상태 업데이트
self.settings.update_sync_status("train_info", "synced")
logger.info(f"편성 정보 동기화 완료: {count}")
return count
except Exception as e:
logger.error(f"편성 정보 동기화 실패: {e}")
self.settings.update_sync_status("train_info", "error")
return 0
def sync_stations(self, line_number: int = 1) -> int:
"""
역 정보 동기화
Args:
line_number: 호선 (기본값: 1호선)
Returns:
동기화된 레코드 수
"""
logger.info(f"{line_number}호선 역 정보 동기화 시작...")
try:
# Supabase에서 데이터 조회
supabase_stations = self.client.get_stations(line_number)
if not supabase_stations:
logger.warning("Supabase에서 역 정보를 가져오지 못했습니다.")
return 0
# 로컬 DB에 저장
count = 0
for station in supabase_stations:
if not station.station_name:
continue
# StationInfo 객체 생성
station_info = StationInfo(
station_code=str(int(station.station_id)) if station.station_id else "",
station_name=station.station_name,
line_number=station.line_number or line_number,
order=int(station.station_id) if station.station_id else 0,
is_active=True
)
self.settings.save_station_info(station_info)
count += 1
# 동기화 상태 업데이트
self.settings.update_sync_status("station_info", "synced")
logger.info(f"역 정보 동기화 완료: {count}")
return count
except Exception as e:
logger.error(f"역 정보 동기화 실패: {e}")
self.settings.update_sync_status("station_info", "error")
return 0
def sync_manufacturers(self) -> int:
"""
제조사 정보 동기화
Returns:
동기화된 레코드 수
"""
logger.info("제조사 정보 동기화 시작...")
try:
# Supabase에서 데이터 조회
supabase_manufacturers = self.client.get_manufacturers()
if not supabase_manufacturers:
logger.warning("Supabase에서 제조사 정보를 가져오지 못했습니다.")
return 0
# 로컬 DB에 저장
count = 0
for mfr in supabase_manufacturers:
if not mfr.manufact:
continue
# ManufacturerInfo 객체 생성
manufacturer_info = ManufacturerInfo(
id=mfr.id,
name=mfr.manufact
)
self.settings.save_manufacturer_info(manufacturer_info)
count += 1
# 동기화 상태 업데이트
self.settings.update_sync_status("manufacturer_info", "synced")
logger.info(f"제조사 정보 동기화 완료: {count}")
return count
except Exception as e:
logger.error(f"제조사 정보 동기화 실패: {e}")
self.settings.update_sync_status("manufacturer_info", "error")
return 0
def sync_fault_codes(self, car_type: str = None, manufacturer: str = None) -> int:
"""
고장 코드 동기화
Args:
car_type: 차량 타입 필터
manufacturer: 제조사 필터
Returns:
동기화된 레코드 수
"""
logger.info("고장 코드 동기화 시작...")
try:
# Supabase에서 데이터 조회
supabase_codes = self.client.get_fault_codes(car_type, manufacturer)
if not supabase_codes:
logger.warning("Supabase에서 고장 코드를 가져오지 못했습니다.")
return 0
# 로컬 DB에 저장
count = 0
for code in supabase_codes:
if not code.f_code:
continue
# FaultCodeInfo 객체 생성
fault_code_info = FaultCodeInfo(
f_code=code.f_code,
f_code_num=code.f_code_num,
f_name=code.f_name,
car_type=code.car_type,
f_class=code.f_class,
fault_name=code.fault_name,
grade=code.grade,
device=code.device,
fault_detail=code.fault_detail,
fault_action=code.fault_action,
manufacturer=code.manufacturer
)
self.settings.save_fault_code_info(fault_code_info)
count += 1
# 동기화 상태 업데이트
self.settings.update_sync_status("fault_code_info", "synced")
logger.info(f"고장 코드 동기화 완료: {count}")
return count
except Exception as e:
logger.error(f"고장 코드 동기화 실패: {e}")
self.settings.update_sync_status("fault_code_info", "error")
return 0
def sync_all(self) -> Dict[str, int]:
"""
모든 마스터 데이터 동기화
Returns:
테이블별 동기화 결과 {테이블명: 레코드수}
"""
results = {}
# 편성 정보
results["trains"] = self.sync_trains()
# 역 정보 (1호선)
results["stations"] = self.sync_stations(line_number=1)
# 제조사 정보
results["manufacturers"] = self.sync_manufacturers()
# 고장 코드 (많으면 시간이 걸리므로 선택적으로 호출)
# results["fault_codes"] = self.sync_fault_codes()
return results
# ============================================================================
# 모듈 레벨 함수
# ============================================================================
def get_supabase_client() -> SupabaseClient:
"""Supabase 클라이언트 인스턴스 반환"""
return SupabaseClient()
def sync_master_data() -> Dict[str, int]:
"""마스터 데이터 동기화 실행"""
sync_manager = SupabaseSyncManager()
return sync_manager.sync_all()