# -*- coding: utf-8 -*- """ Supabase 클라이언트 모듈 Supabase REST API를 통해 마스터 데이터를 동기화합니다. 지원 테이블: - Trains: 편성 정보 - Stations: 역 정보 - Manufacturer: 제조사 정보 - Car_Identity: 차량 식별 정보 - Fault_Code_Table: 고장 코드 테이블 """ import requests from typing import Optional, List, Dict, Any from dataclasses import dataclass from datetime import datetime import json from .logger import get_logger from .settings_manager import ( get_settings_manager, TrainInfo, StationInfo, ManufacturerInfo, FaultCodeInfo ) logger = get_logger(__name__) # ============================================================================ # Supabase 설정 # ============================================================================ SUPABASE_URL = "http://122.35.47.72:8000" # Kong API Gateway (기본 포트) # 대체 포트: 3000 (PostgREST 직접), 54321 (Supabase Studio 기본) SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I" # ============================================================================ # 데이터 클래스 (Supabase 테이블 매핑) # ============================================================================ @dataclass class SupabaseTrainInfo: """Supabase Trains 테이블 매핑""" id: int train_set: Optional[int] = None # 편성번호 (예: 1, 2, 3...) train_id: Optional[str] = None # 편성 ID (예: "101") car_num: Optional[int] = None # 호차 수 car_type: Optional[str] = None # 차량 타입 manufacturer: Optional[str] = None # 제조사 is_new: Optional[bool] = None # 신형 여부 date_of_Commercial_Service: Optional[str] = None # 영업 개시일 date_of_Introduction: Optional[str] = None # 도입일 introduction_stage: Optional[str] = None # 도입 단계 service_nickname: Optional[str] = None # 서비스 별명 @dataclass class SupabaseStationInfo: """Supabase Stations 테이블 매핑""" id: str line_number: Optional[int] = None # 호선 station_id: Optional[float] = None # 역 ID station_name: Optional[str] = None # 역명 station_map: Optional[str] = None # 역 지도 is_underground: Optional[bool] = None # 지하 여부 is_island: Optional[bool] = None # 섬식 승강장 is_exchange: Optional[bool] = None # 환승역 is_end: Optional[bool] = None # 종착역 has_siding_track: Optional[bool] = None # 측선 여부 has_signal_room: Optional[bool] = None # 신호실 여부 @dataclass class ManufacturerInfo: """Supabase Manufacturer 테이블 매핑""" id: int manufact: Optional[str] = None # 제조사명 @dataclass class FaultCodeInfo: """Supabase Fault_Code_Table 테이블 매핑""" id: str f_code: Optional[str] = None # 고장 코드 f_code_num: Optional[int] = None # 고장 코드 번호 f_name: Optional[str] = None # 고장명 car_type: Optional[str] = None # 차량 타입 f_class: Optional[str] = None # 고장 분류 fault_name: Optional[str] = None # 고장 상세명 grade: Optional[str] = None # 등급 device: Optional[str] = None # 장치 fault_detail: Optional[str] = None # 고장 상세 fault_reaction: Optional[str] = None # 고장 반응 fault_action: Optional[str] = None # 조치 방법 alias_name: Optional[str] = None # 별칭 manufacturer: Optional[str] = None # 제조사 # ============================================================================ # Supabase 클라이언트 # ============================================================================ class SupabaseClient: """ Supabase REST API 클라이언트 Docker로 운영 중인 Supabase에서 마스터 데이터를 조회합니다. Examples: >>> client = SupabaseClient() >>> trains = client.get_trains() >>> stations = client.get_stations() """ _instance: Optional['SupabaseClient'] = None def __new__(cls): """싱글톤 패턴""" if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): """초기화""" if self._initialized: return self.base_url = SUPABASE_URL self.anon_key = SUPABASE_ANON_KEY self.headers = { "apikey": self.anon_key, "Authorization": f"Bearer {self.anon_key}", "Content-Type": "application/json", "Prefer": "return=representation" } self._initialized = True logger.info(f"Supabase 클라이언트 초기화: {self.base_url}") def _request( self, method: str, endpoint: str, params: Dict[str, Any] = None, data: Dict[str, Any] = None, timeout: int = 30 ) -> Optional[List[Dict[str, Any]]]: """ Supabase REST API 요청 Args: method: HTTP 메서드 (GET, POST, PATCH, DELETE) endpoint: API 엔드포인트 (테이블명) params: 쿼리 파라미터 data: 요청 바디 timeout: 타임아웃 (초) Returns: 응답 데이터 리스트 (실패 시 None) """ url = f"{self.base_url}/rest/v1/{endpoint}" try: logger.debug(f"Supabase 요청: {method} {url}") logger.debug(f" Headers: apikey=***{self.anon_key[-10:]}") logger.debug(f" Params: {params}") response = requests.request( method=method, url=url, headers=self.headers, params=params, json=data, timeout=timeout ) logger.debug(f" 응답 상태: {response.status_code}") if response.status_code == 200: result = response.json() logger.debug(f" 응답 데이터 수: {len(result) if isinstance(result, list) else 'N/A'}") return result else: logger.error(f"Supabase 요청 실패: {response.status_code} - {response.text[:200]}") return None except requests.exceptions.Timeout: logger.error(f"Supabase 요청 타임아웃: {endpoint}") return None except requests.exceptions.ConnectionError as e: logger.error(f"Supabase 연결 실패: {endpoint} - {e}") return None except Exception as e: logger.error(f"Supabase 요청 오류: {e}") return None # ======================================================================== # 편성 정보 (Trains) # ======================================================================== def get_trains(self) -> List[SupabaseTrainInfo]: """ Supabase에서 편성 정보 조회 Returns: 편성 정보 리스트 """ data = self._request("GET", "Trains", params={"order": "train_set.asc"}) if not data: return [] trains = [] for row in data: train = SupabaseTrainInfo( id=row.get("id"), train_set=row.get("train_set"), train_id=row.get("train_id"), car_num=row.get("car_num"), car_type=row.get("car_type"), manufacturer=row.get("manufacturer"), is_new=row.get("is_new"), date_of_Commercial_Service=row.get("date_of_Commercial_Service"), date_of_Introduction=row.get("date_of_Introduction"), introduction_stage=row.get("introduction_stage"), service_nickname=row.get("service_nickname") ) trains.append(train) logger.info(f"Supabase에서 {len(trains)}개 편성 정보 조회 완료") return trains # ======================================================================== # 역 정보 (Stations) # ======================================================================== def get_stations(self, line_number: int = None) -> List[SupabaseStationInfo]: """ Supabase에서 역 정보 조회 Args: line_number: 호선 (None이면 전체) Returns: 역 정보 리스트 """ params = {"order": "station_id.asc"} if line_number: params["line_number"] = f"eq.{line_number}" data = self._request("GET", "Stations", params=params) if not data: return [] stations = [] for row in data: station = SupabaseStationInfo( id=row.get("id"), line_number=row.get("line_number"), station_id=row.get("station_id"), station_name=row.get("station_name"), station_map=row.get("station_map"), is_underground=row.get("is_underground"), is_island=row.get("is_island"), is_exchange=row.get("is_exchange"), is_end=row.get("is_end"), has_siding_track=row.get("has_siding_track"), has_signal_room=row.get("has_signal_room") ) stations.append(station) logger.info(f"Supabase에서 {len(stations)}개 역 정보 조회 완료") return stations # ======================================================================== # 제조사 정보 (Manufacturer) # ======================================================================== def get_manufacturers(self) -> List[ManufacturerInfo]: """ Supabase에서 제조사 정보 조회 Returns: 제조사 정보 리스트 """ data = self._request("GET", "Manufacturer", params={"order": "id.asc"}) if not data: return [] manufacturers = [] for row in data: mfr = ManufacturerInfo( id=row.get("id"), manufact=row.get("manufact") ) manufacturers.append(mfr) logger.info(f"Supabase에서 {len(manufacturers)}개 제조사 정보 조회 완료") return manufacturers # ======================================================================== # 고장 코드 정보 (Fault_Code_Table) # ======================================================================== def get_fault_codes(self, car_type: str = None, manufacturer: str = None) -> List[FaultCodeInfo]: """ Supabase에서 고장 코드 조회 Args: car_type: 차량 타입 필터 manufacturer: 제조사 필터 Returns: 고장 코드 리스트 """ params = {"order": "f_code_num.asc"} if car_type: params["car_type"] = f"eq.{car_type}" if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" data = self._request("GET", "Fault_Code_Table", params=params) if not data: return [] codes = [] for row in data: code = FaultCodeInfo( id=row.get("id"), f_code=row.get("f_code"), f_code_num=row.get("f_code_num"), f_name=row.get("f_name"), car_type=row.get("car_type"), f_class=row.get("f_class"), fault_name=row.get("fault_name"), grade=row.get("grade"), device=row.get("device"), fault_detail=row.get("fault_detail"), fault_reaction=row.get("fault_reaction"), fault_action=row.get("fault_action"), alias_name=row.get("alias_name"), manufacturer=row.get("manufacturer") ) codes.append(code) logger.info(f"Supabase에서 {len(codes)}개 고장 코드 조회 완료") return codes # ======================================================================== # 연결 테스트 # ======================================================================== def test_connection(self) -> bool: """ Supabase 연결 테스트 Returns: 연결 성공 여부 """ try: # 간단한 쿼리로 연결 테스트 data = self._request("GET", "Manufacturer", params={"limit": "1"}) if data is not None: logger.info("Supabase 연결 테스트 성공") return True return False except Exception as e: logger.error(f"Supabase 연결 테스트 실패: {e}") return False # ============================================================================ # 동기화 매니저 # ============================================================================ class SupabaseSyncManager: """ Supabase 데이터 동기화 매니저 Supabase에서 마스터 데이터를 가져와 로컬 SQLite DB에 저장합니다. """ def __init__(self): self.client = SupabaseClient() self.settings = get_settings_manager() def sync_trains(self) -> int: """ 편성 정보 동기화 Returns: 동기화된 레코드 수 """ logger.info("편성 정보 동기화 시작...") try: # Supabase에서 데이터 조회 supabase_trains = self.client.get_trains() if not supabase_trains: logger.warning("Supabase에서 편성 정보를 가져오지 못했습니다.") return 0 # 로컬 DB에 저장 count = 0 for train in supabase_trains: # 편성번호 생성 (train_set 또는 train_id 사용) train_number = str(train.train_set) if train.train_set else train.train_id if not train_number: continue # TrainInfo 객체 생성 train_info = TrainInfo( train_number=train_number, train_type="B" if train.is_new else "A", # 신형=B, 구형=A manufacturer=train.manufacturer or "", manufacture_year=0, # 도입일에서 추출 가능 depot="", # Supabase에 없음 is_active=True ) # 도입일에서 년도 추출 if train.date_of_Introduction: try: year = int(train.date_of_Introduction[:4]) train_info.manufacture_year = year except (ValueError, TypeError): pass self.settings.save_train_info(train_info) count += 1 # 동기화 상태 업데이트 self.settings.update_sync_status("train_info", "synced") logger.info(f"편성 정보 동기화 완료: {count}개") return count except Exception as e: logger.error(f"편성 정보 동기화 실패: {e}") self.settings.update_sync_status("train_info", "error") return 0 def sync_stations(self, line_number: int = 1) -> int: """ 역 정보 동기화 Args: line_number: 호선 (기본값: 1호선) Returns: 동기화된 레코드 수 """ logger.info(f"{line_number}호선 역 정보 동기화 시작...") try: # Supabase에서 데이터 조회 supabase_stations = self.client.get_stations(line_number) if not supabase_stations: logger.warning("Supabase에서 역 정보를 가져오지 못했습니다.") return 0 # 로컬 DB에 저장 count = 0 for station in supabase_stations: if not station.station_name: continue # StationInfo 객체 생성 station_info = StationInfo( station_code=str(int(station.station_id)) if station.station_id else "", station_name=station.station_name, line_number=station.line_number or line_number, order=int(station.station_id) if station.station_id else 0, is_active=True ) self.settings.save_station_info(station_info) count += 1 # 동기화 상태 업데이트 self.settings.update_sync_status("station_info", "synced") logger.info(f"역 정보 동기화 완료: {count}개") return count except Exception as e: logger.error(f"역 정보 동기화 실패: {e}") self.settings.update_sync_status("station_info", "error") return 0 def sync_manufacturers(self) -> int: """ 제조사 정보 동기화 Returns: 동기화된 레코드 수 """ logger.info("제조사 정보 동기화 시작...") try: # Supabase에서 데이터 조회 supabase_manufacturers = self.client.get_manufacturers() if not supabase_manufacturers: logger.warning("Supabase에서 제조사 정보를 가져오지 못했습니다.") return 0 # 로컬 DB에 저장 count = 0 for mfr in supabase_manufacturers: if not mfr.manufact: continue # ManufacturerInfo 객체 생성 manufacturer_info = ManufacturerInfo( id=mfr.id, name=mfr.manufact ) self.settings.save_manufacturer_info(manufacturer_info) count += 1 # 동기화 상태 업데이트 self.settings.update_sync_status("manufacturer_info", "synced") logger.info(f"제조사 정보 동기화 완료: {count}개") return count except Exception as e: logger.error(f"제조사 정보 동기화 실패: {e}") self.settings.update_sync_status("manufacturer_info", "error") return 0 def sync_fault_codes(self, car_type: str = None, manufacturer: str = None) -> int: """ 고장 코드 동기화 Args: car_type: 차량 타입 필터 manufacturer: 제조사 필터 Returns: 동기화된 레코드 수 """ logger.info("고장 코드 동기화 시작...") try: # Supabase에서 데이터 조회 supabase_codes = self.client.get_fault_codes(car_type, manufacturer) if not supabase_codes: logger.warning("Supabase에서 고장 코드를 가져오지 못했습니다.") return 0 # 로컬 DB에 저장 count = 0 for code in supabase_codes: if not code.f_code: continue # FaultCodeInfo 객체 생성 fault_code_info = FaultCodeInfo( f_code=code.f_code, f_code_num=code.f_code_num, f_name=code.f_name, car_type=code.car_type, f_class=code.f_class, fault_name=code.fault_name, grade=code.grade, device=code.device, fault_detail=code.fault_detail, fault_action=code.fault_action, manufacturer=code.manufacturer ) self.settings.save_fault_code_info(fault_code_info) count += 1 # 동기화 상태 업데이트 self.settings.update_sync_status("fault_code_info", "synced") logger.info(f"고장 코드 동기화 완료: {count}개") return count except Exception as e: logger.error(f"고장 코드 동기화 실패: {e}") self.settings.update_sync_status("fault_code_info", "error") return 0 def sync_all(self) -> Dict[str, int]: """ 모든 마스터 데이터 동기화 Returns: 테이블별 동기화 결과 {테이블명: 레코드수} """ results = {} # 편성 정보 results["trains"] = self.sync_trains() # 역 정보 (1호선) results["stations"] = self.sync_stations(line_number=1) # 제조사 정보 results["manufacturers"] = self.sync_manufacturers() # 고장 코드 (많으면 시간이 걸리므로 선택적으로 호출) # results["fault_codes"] = self.sync_fault_codes() return results # ============================================================================ # 모듈 레벨 함수 # ============================================================================ def get_supabase_client() -> SupabaseClient: """Supabase 클라이언트 인스턴스 반환""" return SupabaseClient() def sync_master_data() -> Dict[str, int]: """마스터 데이터 동기화 실행""" sync_manager = SupabaseSyncManager() return sync_manager.sync_all()