import os from typing import Optional, List, Dict import httpx import json from flask import Flask, g, request, abort, session, jsonify from urllib.parse import urlencode from dotenv import load_dotenv from flask_cors import CORS from functools import wraps from pywebpush import webpush, WebPushException from flask_sqlalchemy import SQLAlchemy from datetime import datetime APP_NAME = "1호선 고장코드" def create_app() -> Flask: app = Flask(__name__) app.config.update(TEMPLATES_AUTO_RELOAD=True) # 세션 설정 app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production") app.config["SESSION_COOKIE_HTTPONLY"] = True app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # SQLAlchemy 설정 app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{os.path.abspath('push_subscriptions.db')}" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) # 푸시 구독 모델 class PushSubscription(db.Model): __tablename__ = "push_subscriptions" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.String(255), unique=True, nullable=False) endpoint = db.Column(db.String(1000), nullable=False) p256dh = db.Column(db.String(500), nullable=False) auth = db.Column(db.String(500), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def to_dict(self): return { "endpoint": self.endpoint, "keys": { "p256dh": self.p256dh, "auth": self.auth } } # DB 테이블 생성 with app.app_context(): db.create_all() # CORS 설정 (Vue PWA에서 접근 허용) # 프로덕션: 특정 도메인만 허용 allowed_origins = [ "https://humetrain.me", "http://localhost:5173", # 개발 서버 "http://localhost:8080", # 로컬 nginx ] if app.debug: allowed_origins.append("*") # 개발 모드에서는 모든 도메인 허용 CORS(app, origins=allowed_origins if not app.debug else ["*"], supports_credentials=True, allow_headers=["Content-Type", "Authorization"], methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"]) # 환경변수 로드 및 Supabase 기본값 설정 load_dotenv() app.config.setdefault("SUPABASE_URL", os.environ.get("SUPABASE_URL", "http://localhost:8000")) app.config.setdefault("SUPABASE_ANON_KEY", os.environ.get("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I")) app.config.setdefault("SUPABASE_BASIC_USER", os.environ.get("SUPABASE_BASIC_USER", "")) app.config.setdefault("SUPABASE_BASIC_PASSWORD", os.environ.get("SUPABASE_BASIC_PASSWORD", "")) # PostgREST(REST) 클라이언트 빌더 def build_pg_client() -> httpx.Client: base = app.config["SUPABASE_URL"].rstrip("/") + "/rest/v1" basic_user = app.config.get("SUPABASE_BASIC_USER") or "" basic_pass = app.config.get("SUPABASE_BASIC_PASSWORD") or "" headers = { "apikey": app.config.get("SUPABASE_ANON_KEY", ""), "Accept-Profile": "public", "Content-Profile": "public", } auth = httpx.BasicAuth(basic_user, basic_pass) if basic_user else None if not auth and app.config.get("SUPABASE_ANON_KEY"): headers["Authorization"] = f"Bearer {app.config['SUPABASE_ANON_KEY']}" return httpx.Client(base_url=base, headers=headers, auth=auth, timeout=10) def pg_unique(col: str) -> List[str]: with build_pg_client() as c: r = c.get("/Fault_Code_Table", params={"select": col}) r.raise_for_status() vals = [row.get(col) for row in (r.json() or []) if row.get(col)] seen: Dict[str, bool] = {} out: List[str] = [] for v in vals: if v not in seen: seen[v] = True out.append(v) return out def pg_unique_from(table: str, col: str) -> List[str]: """지정 테이블에서 고유값 리스트를 반환한다.""" with build_pg_client() as c: r = c.get(f"/{table}", params={"select": col}) r.raise_for_status() vals = [row.get(col) for row in (r.json() or []) if row.get(col)] seen: Dict[str, bool] = {} out: List[str] = [] for v in vals: if v not in seen: seen[v] = True out.append(v) return out # 인증 헬퍼 함수 def login_required(f): """로그인이 필요한 라우트를 위한 데코레이터""" @wraps(f) def decorated_function(*args, **kwargs): if "user_id" not in session: return jsonify({"success": False, "error": "Authentication required"}), 401 return f(*args, **kwargs) return decorated_function def get_current_user() -> Optional[Dict]: """현재 로그인한 사용자 정보 반환""" if "user_id" in session: return { "user_id": session.get("user_id"), "email": session.get("email"), "employee_id": session.get("employee_id"), "name": session.get("name"), "department_id": session.get("department_id"), "access_token": session.get("access_token") } return None # ============= 인증 API ============= @app.route("/api/auth/login", methods=["POST"]) def api_login(): """로그인 API""" try: from auth import AuthManager data = request.json employee_id = data.get("employee_id", "").strip() password = data.get("password", "").strip() if not employee_id or not password: return jsonify({ "success": False, "error": "사번과 비밀번호를 모두 입력해주세요." }), 400 auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) success, message, session_data = auth_manager.login_user_by_employee_id(employee_id, password) if success and session_data: # 세션에 사용자 정보 저장 session["user_id"] = session_data["user_id"] session["auth_id"] = session_data["auth_id"] session["email"] = session_data["email"] session["employee_id"] = session_data["employee_id"] session["name"] = session_data["name"] session["department_id"] = session_data["department_id"] session["access_token"] = session_data["access_token"] session.permanent = True return jsonify({ "success": True, "user": { "user_id": session_data["user_id"], "email": session_data["email"], "employee_id": session_data["employee_id"], "name": session_data["name"], "department_id": session_data["department_id"] } }) else: return jsonify({ "success": False, "error": message }), 401 except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/auth/signup", methods=["POST"]) def api_signup(): """회원가입 API""" try: from auth import AuthManager data = request.json email = data.get("email", "").strip() password = data.get("password", "").strip() password_confirm = data.get("password_confirm", "").strip() employee_id = data.get("employee_id", "").strip() name = data.get("name", "").strip() department_id = data.get("department_id", "").strip() # 입력값 검증 if not all([email, password, password_confirm, employee_id, name, department_id]): return jsonify({ "success": False, "error": "모든 필드를 입력해주세요." }), 400 # 비밀번호 확인 if password != password_confirm: return jsonify({ "success": False, "error": "비밀번호가 일치하지 않습니다." }), 400 # 비밀번호 길이 검증 if len(password) < 8: return jsonify({ "success": False, "error": "비밀번호는 최소 8자 이상이어야 합니다." }), 400 auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) # 회원가입 처리 success, message, _ = auth_manager.signup_user( email=email, password=password, employee_id=employee_id, name=name, department_id=int(department_id) ) if success: return jsonify({ "success": True, "message": message }) else: return jsonify({ "success": False, "error": message }), 400 except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/auth/logout", methods=["POST"]) def api_logout(): """로그아웃 API""" try: from auth import AuthManager # 세션에서 access_token 가져오기 access_token = session.get("access_token") if access_token: auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) auth_manager.logout_user(access_token) # 세션 클리어 session.clear() return jsonify({ "success": True, "message": "로그아웃되었습니다." }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/auth/me") @login_required def api_current_user(): """현재 로그인한 사용자 정보 조회 API""" user = get_current_user() return jsonify({ "success": True, "user": user }) @app.route("/api/departments") def api_departments(): """부서 목록 조회 API""" try: from auth import AuthManager auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) departments = auth_manager.get_departments() return jsonify({ "success": True, "departments": departments }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 # ============= 이메일 코드 인증 API ============= @app.route("/api/email/send-code", methods=["POST"]) def send_email_code(): """이메일 인증 코드 발송""" try: import random import string from datetime import datetime, timedelta data = request.json email = data.get("email", "").strip().lower() code_type = data.get("type", "signup") # signup, password_reset # 이메일 도메인 검증 if not email.endswith("@humetro.busan.kr"): return jsonify({ "success": False, "error": "humetro.busan.kr 도메인의 이메일만 사용 가능합니다." }), 400 # 6자리 숫자 코드 생성 code = ''.join(random.choices(string.digits, k=6)) # 세션에 코드 저장 (5분간 유효) expiry = datetime.now() + timedelta(minutes=5) session[f"email_code_{email}_{code_type}"] = { "code": code, "expiry": expiry.timestamp(), "attempts": 0 } # TODO: 실제 이메일 발송 print(f"\n=== 이메일 인증 코드 ===") print(f"To: {email}") print(f"Code: {code}") print(f"Type: {code_type}") print(f"Expiry: {expiry}") print(f"=======================\n") return jsonify({ "success": True, "message": "인증 코드를 이메일로 전송했습니다.", "debug_code": code if app.debug else None }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/email/verify-code", methods=["POST"]) def verify_email_code(): """이메일 인증 코드 검증""" try: from datetime import datetime data = request.json email = data.get("email", "").strip().lower() code = data.get("code", "").strip() code_type = data.get("type", "signup") # 세션에서 코드 확인 session_key = f"email_code_{email}_{code_type}" stored_data = session.get(session_key) if not stored_data: return jsonify({ "success": False, "error": "인증 코드가 존재하지 않거나 만료되었습니다." }), 400 # 시도 횟수 체크 if stored_data["attempts"] >= 5: session.pop(session_key, None) return jsonify({ "success": False, "error": "인증 시도 횟수를 초과했습니다." }), 400 # 만료 시간 체크 if datetime.now().timestamp() > stored_data["expiry"]: session.pop(session_key, None) return jsonify({ "success": False, "error": "인증 코드가 만료되었습니다." }), 400 # 코드 검증 if stored_data["code"] != code: stored_data["attempts"] += 1 session[session_key] = stored_data return jsonify({ "success": False, "error": f"인증 코드가 올바르지 않습니다. (남은 시도: {5 - stored_data['attempts']}회)" }), 400 # 인증 성공 session[f"email_verified_{email}_{code_type}"] = { "verified_at": datetime.now().timestamp() } session.pop(session_key, None) return jsonify({ "success": True, "message": "인증이 완료되었습니다." }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 # ============= 생체인증 API ============= @app.route("/api/biometric/register-challenge", methods=["POST"]) def biometric_register_challenge(): """생체인증 등록을 위한 challenge 생성""" try: import secrets from datetime import datetime data = request.json employee_id = data.get("employeeId") challenge = secrets.token_urlsafe(32) session[f"biometric_challenge_{employee_id}"] = { "challenge": challenge, "timestamp": datetime.now().timestamp() } return jsonify({ "success": True, "challenge": challenge, "userId": employee_id }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/biometric/register", methods=["POST"]) def biometric_register(): """생체인증 credential 등록""" try: from datetime import datetime data = request.json employee_id = data.get("employeeId") credential = data.get("credential") with build_pg_client() as c: r = c.post( "/biometric_credentials", json={ "employee_id": employee_id, "credential_id": credential["id"], "credential_data": credential, "created_at": datetime.now().isoformat() } ) r.raise_for_status() session.pop(f"biometric_challenge_{employee_id}", None) return jsonify({ "success": True, "message": "생체인증이 등록되었습니다." }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/biometric/login-challenge", methods=["POST"]) def biometric_login_challenge(): """생체인증 로그인을 위한 challenge 생성""" try: import secrets from datetime import datetime data = request.json employee_id = data.get("employeeId") challenge = secrets.token_urlsafe(32) session[f"biometric_login_challenge_{employee_id}"] = { "challenge": challenge, "timestamp": datetime.now().timestamp() } return jsonify({ "success": True, "challenge": challenge }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/biometric/login", methods=["POST"]) def biometric_login(): """생체인증으로 로그인""" try: data = request.json employee_id = data.get("employeeId") assertion = data.get("assertion") with build_pg_client() as c: r = c.get( "/biometric_credentials", params={ "employee_id": f"eq.{employee_id}", "limit": "1" } ) r.raise_for_status() credentials = r.json() or [] if not credentials: return jsonify({ "success": False, "error": "등록된 생체인증 정보가 없습니다." }), 404 stored_credential = credentials[0] if stored_credential["credential_id"] != assertion["id"]: return jsonify({ "success": False, "error": "인증 실패" }), 401 # users 테이블에서 사용자 정보 조회 with build_pg_client() as c: r = c.get( "/users", params={ "employee_id": f"eq.{employee_id}", "limit": "1" } ) r.raise_for_status() users = r.json() or [] if not users: return jsonify({ "success": False, "error": "사용자를 찾을 수 없습니다." }), 404 user_data = users[0] # 세션에 사용자 정보 저장 session["user_id"] = user_data["id"] session["email"] = user_data["email"] session["employee_id"] = user_data["employee_id"] session["name"] = user_data["name"] session["department_id"] = user_data["department_id"] session.permanent = True session.pop(f"biometric_login_challenge_{employee_id}", None) return jsonify({ "success": True, "user": { "id": user_data["id"], "email": user_data["email"], "employee_id": user_data["employee_id"], "name": user_data["name"], "department_id": user_data["department_id"] } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/biometric/unregister", methods=["POST"]) def biometric_unregister(): """생체인증 해제""" try: data = request.json employee_id = data.get("employeeId") with build_pg_client() as c: r = c.delete( "/biometric_credentials", params={"employee_id": f"eq.{employee_id}"} ) r.raise_for_status() return jsonify({ "success": True, "message": "생체인증이 해제되었습니다." }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 # ============= 데이터 조회 API ============= @app.route("/health") def health(): return jsonify({"status": "ok"}) @app.route("/favicon.ico") def favicon(): return ("", 204, {"Content-Type": "image/x-icon"}) @app.route("/api/manufacturers") def api_manufacturers(): """제조사 목록을 JSON으로 반환""" try: with build_pg_client() as c: # Fault_Code_Table에서 제조사 r1 = c.get("/Fault_Code_Table", params={"select": "manufacturer"}) r1.raise_for_status() vals1 = [row.get("manufacturer") for row in (r1.json() or []) if row.get("manufacturer")] # Signals 테이블에서 제조사 vals2 = [] for path in ("/Signals", "/signals"): try: r2 = c.get(path, params={"select": "manufacturer"}) r2.raise_for_status() vals2 = [row.get("manufacturer") for row in (r2.json() or []) if row.get("manufacturer")] break except httpx.HTTPError: continue # MMI_Code 테이블에서 제조사 vals3 = [] try: r3 = c.get("/MMI_Code", params={"select": "manufacturer"}) r3.raise_for_status() vals3 = [row.get("manufacturer") for row in (r3.json() or []) if row.get("manufacturer")] except httpx.HTTPError: pass # 중복 제거 all_manufacturers = vals1 + vals2 + vals3 seen = set() unique_manufacturers = [] for v in all_manufacturers: if v and v not in seen: seen.add(v) unique_manufacturers.append(v) unique_manufacturers.sort() return jsonify({ "success": True, "data": { "manufacturers": unique_manufacturers } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/filters/fault") def api_fault_filters(): """고장코드 필터 옵션을 JSON으로 반환""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() devices = pg_unique("device") car_types = pg_unique("car_type") car_ids = pg_unique("car_id") all_alias = pg_unique("alias_name") def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] return jsonify({ "success": True, "data": { "devices": sorted(devices), "car_types": sorted(car_types), "car_ids": sorted(car_ids), "alias_names": sorted(alias_names) } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/filters/signal") def api_signal_filters(): """TCMS 신호 필터 옵션을 JSON으로 반환""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() all_alias = pg_unique_from("Signals", "alias_name") def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] classifications = pg_unique_from("Signals", "classification") return jsonify({ "success": True, "data": { "alias_names": sorted(alias_names), "classifications": sorted(classifications) } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/filters/abbreviation") def api_abbreviation_filters(): """약어 필터 옵션을 JSON으로 반환""" try: classifications = pg_unique_from("drawer_abbreviation", "classification") return jsonify({ "success": True, "data": { "classifications": sorted(classifications) } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/filters/mmi") def api_mmi_filters(): """MMI 코드 필터 옵션을 JSON으로 반환""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() with build_pg_client() as c: res = c.get("/MMI_Code", params={"select": "alias_name"}) res.raise_for_status() all_alias = sorted({row.get("alias_name") for row in (res.json() or []) if row.get("alias_name")}) def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] return jsonify({ "success": True, "data": { "alias_names": alias_names } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/mmi/list") def api_mmi_list(): """MMI 코드 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer", "order": "code_name.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(code_name.ilike.*{q}*,code_description.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/MMI_Code", params=params) r.raise_for_status() rows = r.json() or [] return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/faults/list") def api_faults_list(): """고장코드 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() device = request.args.get("device", "").strip() car_type = request.args.get("car_type", "").strip() car_id = request.args.get("car_id", "").strip() alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name", "order": "f_code.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if device: params["device"] = f"eq.{device}" if car_type: params["car_type"] = f"eq.{car_type}" if car_id: params["car_id"] = f"eq.{car_id}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(f_code.ilike.*{q}*,f_name.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] # 그룹핑 옵션 group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for row in rows: code = row.get("f_code") if code and code not in seen: seen[code] = True dedup.append(row) rows = dedup return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/faults/") def api_fault_detail(f_code: str): """고장코드 상세 정보를 JSON으로 반환""" try: params = { "select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer,alias_name", "f_code": f"eq.{f_code}", "limit": "1", } with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] if not rows: return jsonify({ "success": False, "error": "고장코드를 찾을 수 없습니다." }), 404 row = rows[0] return jsonify({ "success": True, "data": row }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/signals/list") def api_signals_list(): """TCMS 신호 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() classification = request.args.get("classification", "").strip() q = request.args.get("q", "").strip() alias_name = request.args.get("alias_name", "").strip() params: Dict[str, str] = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name", "order": "sig_num.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if classification: params["classification"] = f"eq.{classification}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(signal_abbreviation.ilike.*{q}*,signal_description.ilike.*{q}*)" with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for row in rows: code = row.get("id") if code and code not in seen: seen[code] = True dedup.append(row) rows = dedup return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/signals/") def api_signal_detail(item_id: str): """TCMS 신호 상세 정보를 JSON으로 반환""" try: params = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at", "id": f"eq.{item_id}", "limit": "1", } with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error if not rows: return jsonify({ "success": False, "error": "TCMS 신호를 찾을 수 없습니다." }), 404 row = rows[0] return jsonify({ "success": True, "data": row }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/abbreviations/list") def api_abbreviations_list(): """약어 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "id,manufacturer,abb,classification,Related_drawings,term", "order": "abb.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if q: params["or"] = f"(abb.ilike.*{q}*,term.ilike.*{q}*)" with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/drawer_abbreviation",): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/abbreviations") def api_abbreviations(): """약어 목록 조회 (페이지네이션 포함)""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "id,manufacturer,abb,classification,Related_drawings,term", "order": "abb.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if q: params["or"] = f"(abb.ilike.*{q}*,term.ilike.*{q}*)" with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/drawer_abbreviation",): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/abbreviations/") def api_abbreviation_detail(abb_id: str): """약어 상세 정보를 JSON으로 반환""" try: params = { "select": "id,manufacturer,abb,classification,Related_drawings,term", "id": f"eq.{abb_id}", "limit": "1", } with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/drawer_abbreviation",): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error if not rows: return jsonify({ "success": False, "error": "약어를 찾을 수 없습니다." }), 404 row = rows[0] return jsonify({ "success": True, "data": row }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 # ========== Inbox Items (고장기록) API ========== @app.route("/api/filters/inbox_items") def api_inbox_items_filters(): """고장기록 필터 옵션을 JSON으로 반환""" try: with build_pg_client() as c: # 편성, 호차, 상위장치, 하위장치, 부품 필터 옵션 조회 params = { "select": "train_set,car_no,upper_device,lower_device,component" } r = c.get("/inbox_items", params=params) r.raise_for_status() rows = r.json() or [] # 각 필터별로 고유한 값 추출 train_sets = sorted({str(row.get("train_set", "")).strip() for row in rows if row.get("train_set")}) car_nos = sorted({str(row.get("car_no", "")).strip() for row in rows if row.get("car_no")}) upper_devices = sorted({str(row.get("upper_device", "")).strip() for row in rows if row.get("upper_device")}) lower_devices = sorted({str(row.get("lower_device", "")).strip() for row in rows if row.get("lower_device")}) components = sorted({str(row.get("component", "")).strip() for row in rows if row.get("component")}) return jsonify({ "success": True, "data": { "train_sets": train_sets, "car_nos": car_nos, "upper_devices": upper_devices, "lower_devices": lower_devices, "components": components } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/inbox-items/list") def api_inbox_items_list(): """고장기록 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE # 필터 파라미터 train_set = request.args.get("train_set", "").strip() car_no = request.args.get("car_no", "").strip() upper_device = request.args.get("upper_device", "").strip() lower_device = request.args.get("lower_device", "").strip() component = request.args.get("component", "").strip() q = request.args.get("q", "").strip() # 조회할 필드 select_fields = ",".join([ "item_id", "system", "manufacturer", "occurred_date", "section_desc", "line_no", "service_no", "train_set", "car_no", "run_category", "delay_min", "severity", "severity2", "impact_category", "upper_device", "lower_device", "component", "title", "body", "action_date", "action_team", "labor_minutes", "labor_count", "action_type", "completion_flag", "approval_flag", "rams_flag", "progress_state" ]) params: Dict[str, str] = { "select": select_fields, "order": "occurred_date.desc", "limit": str(PAGE_SIZE), "offset": str(offset), } # 필터 조건 추가 if train_set: params["train_set"] = f"eq.{train_set}" if car_no: params["car_no"] = f"eq.{car_no}" if upper_device: params["upper_device"] = f"eq.{upper_device}" if lower_device: params["lower_device"] = f"eq.{lower_device}" if component: params["component"] = f"eq.{component}" if q: params["or"] = f"(title.ilike.*{q}*,body.ilike.*{q}*,component.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/inbox_items", params=params) r.raise_for_status() rows = r.json() or [] return jsonify({ "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/api/inbox-items/") def api_inbox_item_detail(item_id: str): """고장기록 상세 정보를 JSON으로 반환""" try: select_fields = ",".join([ "item_id", "system", "manufacturer", "occurred_date", "section_desc", "line_no", "service_no", "train_set", "car_no", "run_category", "delay_min", "severity", "severity2", "impact_category", "upper_device", "lower_device", "component", "title", "body", "action_date", "action_team", "labor_minutes", "labor_count", "action_type", "completion_flag", "approval_flag", "rams_flag", "progress_state" ]) params = { "select": select_fields, "item_id": f"eq.{item_id}", "limit": "1", } with build_pg_client() as c: r = c.get("/inbox_items", params=params) r.raise_for_status() rows = r.json() or [] if not rows: return jsonify({ "success": False, "error": "고장기록을 찾을 수 없습니다." }), 404 row = rows[0] return jsonify({ "success": True, "data": row }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route("/.well-known/assetlinks.json") def assetlinks(): """TWA Digital Asset Links 파일 제공""" return jsonify([ { "relation": ["delegate_permission/common.handle_all_urls"], "target": { "namespace": "android_app", "package_name": "me.humetrain.tr.twa", "sha256_cert_fingerprints": [ "EC:CA:CD:61:52:36:79:14:BC:76:06:11:BF:7C:CE:7A:0D:26:7C:F7:57:FA:CD:98:BA:08:36:ED:95:BB:ED:FC" ] } } ]) @app.route("/sb/health") def sb_health(): try: with build_pg_client() as c: r = c.get("/Fault_Code_Table", params={"select": "f_code", "limit": "1"}) r.raise_for_status() return jsonify({"sb": "ok", "url": app.config.get("SUPABASE_URL")}) except Exception as e: return jsonify({"sb": "error", "url": app.config.get("SUPABASE_URL"), "error": str(e)}) # ============= 웹 푸시 알림 설정 ============= app.config['VAPID_PUBLIC_KEY'] = os.environ.get('VAPID_PUBLIC_KEY') app.config['VAPID_PRIVATE_KEY'] = os.environ.get('VAPID_PRIVATE_KEY') app.config['VAPID_SUBJECT'] = os.environ.get('VAPID_SUBJECT', 'mailto:admin@humetrain.me') @app.route("/api/push/subscribe", methods=["POST"]) def subscribe_push(): """푸시 알림 구독""" try: data = request.json subscription = data.get('subscription') user_id = session.get('user_id', 'anonymous') if not subscription or not subscription.get('endpoint'): return jsonify({'success': False, 'error': '잘못된 구독 정보'}), 400 keys = subscription.get('keys', {}) # DB에 저장 (기존이면 업데이트) existing = PushSubscription.query.filter_by(user_id=user_id).first() if existing: existing.endpoint = subscription['endpoint'] existing.p256dh = keys.get('p256dh', '') existing.auth = keys.get('auth', '') existing.updated_at = datetime.utcnow() else: new_sub = PushSubscription( user_id=user_id, endpoint=subscription['endpoint'], p256dh=keys.get('p256dh', ''), auth=keys.get('auth', '') ) db.session.add(new_sub) db.session.commit() return jsonify({ 'success': True, 'message': '푸시 알림 구독되었습니다.' }) except Exception as e: db.session.rollback() return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route("/api/push/test", methods=["POST"]) def test_push(): """테스트 푸시 알림 발송""" try: user_id = session.get('user_id', 'anonymous') # DB에서 구독 정보 조회 push_sub = PushSubscription.query.filter_by(user_id=user_id).first() if not push_sub: return jsonify({ 'success': False, 'error': '해당 사용자의 푸시 구독 정보가 없습니다.' }), 404 subscription = push_sub.to_dict() subscription['endpoint'] = push_sub.endpoint endpoint = push_sub.endpoint payload = { 'title': '1호선 고장코드 테스트 알림', 'body': '웹 푸시 알림이 정상적으로 작동합니다!', 'icon': 'https://humetrain.me/icons/icon-192.svg', 'badge': 'https://humetrain.me/icons/icon-192.svg', 'tag': 'test-notification' } # 'aud' 클레임을 위해 endpoint에서 scheme://host[:port] 추출 from urllib.parse import urlparse parsed_url = urlparse(endpoint) aud = f"{parsed_url.scheme}://{parsed_url.netloc}" webpush( subscription_info=subscription, data=json.dumps(payload), vapid_private_key=app.config['VAPID_PRIVATE_KEY'], vapid_claims={ 'sub': app.config['VAPID_SUBJECT'], 'aud': aud } ) return jsonify({ 'success': True, 'message': '테스트 푸시 알림이 발송되었습니다.' }) except WebPushException as e: return jsonify({ 'success': False, 'error': f'푸시 발송 실패: {str(e)}' }), 500 except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route("/api/push/public-key", methods=["GET"]) def get_push_public_key(): """클라이언트가 사용할 VAPID 공개 키 반환""" return jsonify({ 'success': True, 'publicKey': app.config['VAPID_PUBLIC_KEY'] }) return app app = create_app() if __name__ == "__main__": port = int(os.environ.get("PORT", "5000")) app.run(host="0.0.0.0", port=port, debug=True)