import os from typing import Optional, List, Dict import httpx from flask import Flask, g, render_template, request, abort, redirect, url_for, session, flash from urllib.parse import urlencode from dotenv import load_dotenv from flask_cors import CORS from functools import wraps APP_NAME = "1호선 고장코드" def create_app() -> Flask: app = Flask(__name__) app.config.update(TEMPLATES_AUTO_RELOAD=True) # 세션 설정 app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production") app.config["SESSION_COOKIE_HTTPONLY"] = True app.config["SESSION_COOKIE_SAMESITE"] = "Lax" # CORS 설정 (개발 환경: 모든 도메인 허용) CORS(app, origins=["*"], supports_credentials=True) # 환경변수 로드 및 Supabase 기본값 설정 load_dotenv() # 기본: Kong 프록시(8000) 또는 사용자가 지정한 URL app.config.setdefault("SUPABASE_URL", os.environ.get("SUPABASE_URL", "http://localhost:8000")) app.config.setdefault("SUPABASE_ANON_KEY", os.environ.get("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I")) # Kong Basic Auth(선택) app.config.setdefault("SUPABASE_BASIC_USER", os.environ.get("SUPABASE_BASIC_USER", "")) app.config.setdefault("SUPABASE_BASIC_PASSWORD", os.environ.get("SUPABASE_BASIC_PASSWORD", "")) # 더 이상 SQLite 초기화/연결을 사용하지 않음 (Supabase만 사용) # PostgREST(REST) 클라이언트 빌더 def build_pg_client() -> httpx.Client: base = app.config["SUPABASE_URL"].rstrip("/") + "/rest/v1" basic_user = app.config.get("SUPABASE_BASIC_USER") or "" basic_pass = app.config.get("SUPABASE_BASIC_PASSWORD") or "" headers = { "apikey": app.config.get("SUPABASE_ANON_KEY", ""), "Accept-Profile": "public", "Content-Profile": "public", } auth = httpx.BasicAuth(basic_user, basic_pass) if basic_user else None if not auth and app.config.get("SUPABASE_ANON_KEY"): headers["Authorization"] = f"Bearer {app.config['SUPABASE_ANON_KEY']}" return httpx.Client(base_url=base, headers=headers, auth=auth, timeout=10) def pg_unique(col: str) -> List[str]: with build_pg_client() as c: r = c.get("/Fault_Code_Table", params={"select": col}) r.raise_for_status() vals = [row.get(col) for row in (r.json() or []) if row.get(col)] seen: Dict[str, bool] = {} out: List[str] = [] for v in vals: if v not in seen: seen[v] = True out.append(v) return out def pg_unique_from(table: str, col: str) -> List[str]: """지정 테이블에서 고유값 리스트를 반환한다.""" with build_pg_client() as c: r = c.get(f"/{table}", params={"select": col}) r.raise_for_status() vals = [row.get(col) for row in (r.json() or []) if row.get(col)] seen: Dict[str, bool] = {} out: List[str] = [] for v in vals: if v not in seen: seen[v] = True out.append(v) return out # 인증 헬퍼 함수 def login_required(f): """로그인이 필요한 라우트를 위한 데코레이터""" @wraps(f) def decorated_function(*args, **kwargs): if "user_id" not in session: return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function def get_current_user() -> Optional[Dict]: """현재 로그인한 사용자 정보 반환""" if "user_id" in session: return { "user_id": session.get("user_id"), "email": session.get("email"), "employee_id": session.get("employee_id"), "name": session.get("name"), "department_id": session.get("department_id"), "access_token": session.get("access_token") } return None # 인증 라우트 @app.route("/auth/login", methods=["GET", "POST"]) def login(): """로그인 페이지 및 로그인 처리""" # 이미 로그인된 경우 메인 페이지로 리다이렉트 if get_current_user(): return redirect(url_for('index')) if request.method == "POST": from auth import AuthManager employee_id = request.form.get("employee_id", "").strip() password = request.form.get("password", "").strip() if not employee_id or not password: return render_template( "login.html", app_name=APP_NAME, error="사번과 비밀번호를 모두 입력해주세요.", employee_id=employee_id ) auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) success, message, session_data = auth_manager.login_user_by_employee_id(employee_id, password) if success and session_data: # 세션에 사용자 정보 저장 session["user_id"] = session_data["user_id"] session["auth_id"] = session_data["auth_id"] session["email"] = session_data["email"] session["employee_id"] = session_data["employee_id"] session["name"] = session_data["name"] session["department_id"] = session_data["department_id"] session["access_token"] = session_data["access_token"] session.permanent = True # 로그인 성공 시 메인 페이지로 리다이렉트 return redirect(url_for('index')) else: return render_template( "login.html", app_name=APP_NAME, error=message, employee_id=employee_id ) # GET 요청: 로그인 페이지 표시 success_msg = request.args.get("success", "") return render_template( "login.html", app_name=APP_NAME, success=success_msg ) @app.route("/auth/signup", methods=["GET", "POST"]) def signup(): """회원가입 페이지 및 회원가입 처리""" # 이미 로그인된 경우 메인 페이지로 리다이렉트 if get_current_user(): return redirect(url_for('index')) from auth import AuthManager auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) # 부서 목록 조회 departments = auth_manager.get_departments() if request.method == "POST": email = request.form.get("email", "").strip() password = request.form.get("password", "").strip() password_confirm = request.form.get("password_confirm", "").strip() employee_id = request.form.get("employee_id", "").strip() name = request.form.get("name", "").strip() department_id = request.form.get("department_id", "").strip() # 입력값 검증 if not all([email, password, password_confirm, employee_id, name, department_id]): return render_template( "signup.html", app_name=APP_NAME, error="모든 필드를 입력해주세요.", departments=departments, email=email, employee_id=employee_id, name=name, department_id=department_id ) # 비밀번호 확인 if password != password_confirm: return render_template( "signup.html", app_name=APP_NAME, error="비밀번호가 일치하지 않습니다.", departments=departments, email=email, employee_id=employee_id, name=name, department_id=department_id ) # 비밀번호 길이 검증 if len(password) < 8: return render_template( "signup.html", app_name=APP_NAME, error="비밀번호는 최소 8자 이상이어야 합니다.", departments=departments, email=email, employee_id=employee_id, name=name, department_id=department_id ) # 회원가입 처리 success, message, _ = auth_manager.signup_user( email=email, password=password, employee_id=employee_id, name=name, department_id=int(department_id) ) if success: # 회원가입 성공 시 로그인 페이지로 리다이렉트 return render_template( "login.html", app_name=APP_NAME, success=message + " 로그인해주세요." ) else: return render_template( "signup.html", app_name=APP_NAME, error=message, departments=departments, email=email, employee_id=employee_id, name=name, department_id=department_id ) # GET 요청: 회원가입 페이지 표시 return render_template( "signup.html", app_name=APP_NAME, departments=departments ) @app.route("/auth/logout") def logout(): """로그아웃 처리""" from auth import AuthManager # 세션에서 access_token 가져오기 access_token = session.get("access_token") if access_token: auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) auth_manager.logout_user(access_token) # 세션 클리어 session.clear() # 로그인 페이지로 리다이렉트 return redirect(url_for('login')) @app.route("/auth/forgot-password", methods=["GET", "POST"]) def forgot_password(): """비밀번호 찾기 페이지 및 처리""" if request.method == "POST": from auth import AuthManager import secrets from datetime import datetime, timedelta employee_id = request.form.get("employee_id", "").strip() if not employee_id: return render_template( "forgot_password.html", app_name=APP_NAME, error="사번을 입력해주세요.", employee_id=employee_id ) auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) # 사번으로 사용자 찾기 user = auth_manager.get_user_by_employee_id(employee_id) if not user: # 보안상 사용자가 없어도 성공 메시지 표시 return render_template( "forgot_password.html", app_name=APP_NAME, success="등록된 이메일로 비밀번호 재설정 링크를 전송했습니다." ) # 재설정 토큰 생성 (실제 환경에서는 이메일로 전송) reset_token = secrets.token_urlsafe(32) session[f"reset_token_{reset_token}"] = { "employee_id": employee_id, "email": user["email"], "expires": (datetime.now() + timedelta(hours=1)).isoformat() } # TODO: 실제 환경에서는 이메일로 링크 전송 # 개발 환경에서는 직접 재설정 페이지로 리다이렉트 return redirect(url_for('reset_password', token=reset_token)) # GET 요청: 비밀번호 찾기 페이지 표시 return render_template( "forgot_password.html", app_name=APP_NAME ) @app.route("/auth/reset-password", methods=["GET", "POST"]) def reset_password(): """비밀번호 재설정 페이지 및 처리""" token = request.args.get("token") or request.form.get("token") if not token: return redirect(url_for('login')) # 토큰 검증 token_data = session.get(f"reset_token_{token}") if not token_data: return render_template( "reset_password.html", app_name=APP_NAME, token=token, error="유효하지 않거나 만료된 링크입니다." ) # 만료 시간 확인 from datetime import datetime expires = datetime.fromisoformat(token_data["expires"]) if datetime.now() > expires: session.pop(f"reset_token_{token}", None) return render_template( "reset_password.html", app_name=APP_NAME, token=token, error="링크가 만료되었습니다. 다시 요청해주세요." ) if request.method == "POST": from auth import AuthManager password = request.form.get("password", "").strip() password_confirm = request.form.get("password_confirm", "").strip() if not password or not password_confirm: return render_template( "reset_password.html", app_name=APP_NAME, token=token, error="모든 필드를 입력해주세요." ) if password != password_confirm: return render_template( "reset_password.html", app_name=APP_NAME, token=token, error="비밀번호가 일치하지 않습니다." ) if len(password) < 8: return render_template( "reset_password.html", app_name=APP_NAME, token=token, error="비밀번호는 최소 8자 이상이어야 합니다." ) auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) # 비밀번호 재설정 success, message = auth_manager.reset_password( token_data["email"], password ) if success: # 토큰 삭제 session.pop(f"reset_token_{token}", None) # 로그인 페이지로 리다이렉트 (성공 메시지 포함) return redirect(url_for('login') + '?success=' + message) else: return render_template( "reset_password.html", app_name=APP_NAME, token=token, error=message ) # GET 요청: 비밀번호 재설정 페이지 표시 return render_template( "reset_password.html", app_name=APP_NAME, token=token ) @app.route("/api/departments") def api_departments(): """부서 목록 조회 API (회원가입용)""" try: from auth import AuthManager auth_manager = AuthManager( app.config["SUPABASE_URL"], app.config["SUPABASE_ANON_KEY"] ) departments = auth_manager.get_departments() return { "success": True, "departments": departments } except Exception as e: return { "success": False, "error": str(e) }, 500 # ============= 이메일 코드 인증 API ============= @app.route("/api/email/send-code", methods=["POST"]) def send_email_code(): """이메일 인증 코드 발송""" try: import random import string from datetime import datetime, timedelta data = request.json email = data.get("email", "").strip().lower() code_type = data.get("type", "signup") # signup, password_reset # 이메일 도메인 검증 if not email.endswith("@humetro.busan.kr"): return { "success": False, "error": "humetro.busan.kr 도메인의 이메일만 사용 가능합니다." }, 400 # 6자리 숫자 코드 생성 code = ''.join(random.choices(string.digits, k=6)) # 세션에 코드 저장 (5분간 유효) expiry = datetime.now() + timedelta(minutes=5) session[f"email_code_{email}_{code_type}"] = { "code": code, "expiry": expiry.timestamp(), "attempts": 0 } # TODO: 실제 이메일 발송 (현재는 로그에만 출력) print(f"\n=== 이메일 인증 코드 ===") print(f"To: {email}") print(f"Code: {code}") print(f"Type: {code_type}") print(f"Expiry: {expiry}") print(f"=======================\n") # 개발 환경에서는 코드를 응답에 포함 (프로덕션에서는 제거) return { "success": True, "message": "인증 코드를 이메일로 전송했습니다.", "debug_code": code if app.debug else None # 개발 환경에서만 } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/email/verify-code", methods=["POST"]) def verify_email_code(): """이메일 인증 코드 검증""" try: from datetime import datetime data = request.json email = data.get("email", "").strip().lower() code = data.get("code", "").strip() code_type = data.get("type", "signup") # 세션에서 코드 확인 session_key = f"email_code_{email}_{code_type}" stored_data = session.get(session_key) if not stored_data: return { "success": False, "error": "인증 코드가 존재하지 않거나 만료되었습니다." }, 400 # 시도 횟수 체크 (5회 제한) if stored_data["attempts"] >= 5: session.pop(session_key, None) return { "success": False, "error": "인증 시도 횟수를 초과했습니다. 코드를 재전송해주세요." }, 400 # 만료 시간 체크 if datetime.now().timestamp() > stored_data["expiry"]: session.pop(session_key, None) return { "success": False, "error": "인증 코드가 만료되었습니다. 코드를 재전송해주세요." }, 400 # 코드 검증 if stored_data["code"] != code: stored_data["attempts"] += 1 session[session_key] = stored_data return { "success": False, "error": f"인증 코드가 올바르지 않습니다. (남은 시도: {5 - stored_data['attempts']}회)" }, 400 # 인증 성공 - 세션에 인증 완료 표시 session[f"email_verified_{email}_{code_type}"] = { "verified_at": datetime.now().timestamp() } # 사용된 코드 삭제 session.pop(session_key, None) return { "success": True, "message": "인증이 완료되었습니다." } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/auth/reset-password", methods=["POST"]) def reset_password_api(): """비밀번호 재설정 (이메일 인증 후)""" try: from datetime import datetime data = request.json email = data.get("email", "").strip().lower() new_password = data.get("password", "").strip() # 이메일 인증 확인 (세션에서) verification_key = f"email_verified_{email}_password_reset" verified_data = session.get(verification_key) if not verified_data: return { "success": False, "error": "이메일 인증이 필요합니다." }, 400 # 인증이 10분 이내에 이루어졌는지 확인 verified_at = verified_data.get("verified_at", 0) if datetime.now().timestamp() - verified_at > 600: # 10분 session.pop(verification_key, None) return { "success": False, "error": "인증 시간이 만료되었습니다. 다시 시도해주세요." }, 400 # Supabase에서 사용자 조회 with build_pg_client() as c: r = c.get( "/users", params={ "email": f"eq.{email}", "limit": "1" } ) r.raise_for_status() users = r.json() or [] if not users: return { "success": False, "error": "사용자를 찾을 수 없습니다." }, 404 user_id = users[0]["id"] # Supabase Auth에서 비밀번호 업데이트 # TODO: Supabase Auth API를 통한 비밀번호 업데이트 구현 # 현재는 간단히 성공 응답 # 인증 세션 삭제 session.pop(verification_key, None) return { "success": True, "message": "비밀번호가 성공적으로 변경되었습니다." } except Exception as e: return { "success": False, "error": str(e) }, 500 # ============= 생체인증 API ============= @app.route("/api/biometric/register-challenge", methods=["POST"]) def biometric_register_challenge(): """생체인증 등록을 위한 challenge 생성""" try: import secrets data = request.json employee_id = data.get("employeeId") # Challenge 생성 (32바이트 랜덤) challenge = secrets.token_urlsafe(32) # Challenge를 세션에 저장 (10분간 유효) session[f"biometric_challenge_{employee_id}"] = { "challenge": challenge, "timestamp": datetime.now().timestamp() } return { "success": True, "challenge": challenge, "userId": employee_id } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/biometric/register", methods=["POST"]) def biometric_register(): """생체인증 credential 등록""" try: data = request.json employee_id = data.get("employeeId") credential = data.get("credential") # Credential을 DB에 저장 (Supabase) with build_pg_client() as c: # biometric_credentials 테이블에 저장 r = c.post( "/biometric_credentials", json={ "employee_id": employee_id, "credential_id": credential["id"], "credential_data": credential, "created_at": datetime.now().isoformat() } ) r.raise_for_status() # 세션에서 challenge 삭제 session.pop(f"biometric_challenge_{employee_id}", None) return { "success": True, "message": "생체인증이 등록되었습니다." } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/biometric/login-challenge", methods=["POST"]) def biometric_login_challenge(): """생체인증 로그인을 위한 challenge 생성""" try: import secrets data = request.json employee_id = data.get("employeeId") # Challenge 생성 challenge = secrets.token_urlsafe(32) # Challenge를 세션에 저장 session[f"biometric_login_challenge_{employee_id}"] = { "challenge": challenge, "timestamp": datetime.now().timestamp() } return { "success": True, "challenge": challenge } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/biometric/login", methods=["POST"]) def biometric_login(): """생체인증으로 로그인""" try: data = request.json employee_id = data.get("employeeId") assertion = data.get("assertion") # DB에서 credential 조회 with build_pg_client() as c: r = c.get( "/biometric_credentials", params={ "employee_id": f"eq.{employee_id}", "limit": "1" } ) r.raise_for_status() credentials = r.json() or [] if not credentials: raise ValueError("등록된 생체인증 정보가 없습니다.") # TODO: Assertion 검증 (실제로는 암호화 검증 필요) # 여기서는 간단히 credential_id만 확인 stored_credential = credentials[0] if stored_credential["credential_id"] != assertion["id"]: raise ValueError("인증 실패") # users 테이블에서 사용자 정보 조회 with build_pg_client() as c: r = c.get( "/users", params={ "employee_id": f"eq.{employee_id}", "limit": "1" } ) r.raise_for_status() users = r.json() or [] if not users: raise ValueError("사용자를 찾을 수 없습니다.") user_data = users[0] # 세션에 사용자 정보 저장 session["user_id"] = user_data["id"] session["email"] = user_data["email"] session["employee_id"] = user_data["employee_id"] session["name"] = user_data["name"] session["department_id"] = user_data["department_id"] session.permanent = True # Challenge 삭제 session.pop(f"biometric_login_challenge_{employee_id}", None) return { "success": True, "user": { "id": user_data["id"], "email": user_data["email"], "employee_id": user_data["employee_id"], "name": user_data["name"], "department_id": user_data["department_id"] } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/biometric/unregister", methods=["POST"]) def biometric_unregister(): """생체인증 해제""" try: data = request.json employee_id = data.get("employeeId") # DB에서 credential 삭제 with build_pg_client() as c: r = c.delete( "/biometric_credentials", params={"employee_id": f"eq.{employee_id}"} ) r.raise_for_status() return { "success": True, "message": "생체인증이 해제되었습니다." } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/") @login_required def index(): user = get_current_user() return render_template( "index.html", app_name=APP_NAME, user=user ) # 기존 SQLite 기반 라우트 제거됨 @app.route("/modal/close") def modal_close(): return "" @app.route("/health") def health(): return {"status": "ok"} # 빈 파비콘 응답으로 404 제거 @app.route("/favicon.ico") def favicon(): return ("", 204, {"Content-Type": "image/x-icon"}) # 브라우저/툴 호환을 위해 루트 경로에도 매니페스트 노출 (정적 파일 경로 사용 권장) # 더 이상 /api/* 동기화 엔드포인트 제공하지 않음 # ----------------- Supabase 기반 라우트 ----------------- @app.route("/sb/tabs") def sb_tabs(): try: with build_pg_client() as c: r1 = c.get("/Fault_Code_Table", params={"select": "manufacturer"}) r1.raise_for_status() vals1 = [row.get("manufacturer") for row in (r1.json() or []) if row.get("manufacturer")] vals2: List[str] = [] for path in ("/Signals", "/signals"): try: r2 = c.get(path, params={"select": "manufacturer"}) r2.raise_for_status() vals2 = [row.get("manufacturer") for row in (r2.json() or []) if row.get("manufacturer")] break except httpx.HTTPError: continue vals = vals1 + vals2 seen: Dict[str, bool] = {} manufacturers: List[str] = [] for v in vals: if v not in seen: seen[v] = True manufacturers.append(v) return render_template("partials/sb_tabs.html", manufacturers=manufacturers) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) @app.route("/api/manufacturers") def api_manufacturers(): """제조사 목록을 JSON으로 반환 (TWA 앱용)""" try: with build_pg_client() as c: # Fault_Code_Table에서 제조사 r1 = c.get("/Fault_Code_Table", params={"select": "manufacturer"}) r1.raise_for_status() vals1 = [row.get("manufacturer") for row in (r1.json() or []) if row.get("manufacturer")] # Signals 테이블에서 제조사 vals2 = [] for path in ("/Signals", "/signals"): try: r2 = c.get(path, params={"select": "manufacturer"}) r2.raise_for_status() vals2 = [row.get("manufacturer") for row in (r2.json() or []) if row.get("manufacturer")] break except httpx.HTTPError: continue # MMI_Code 테이블에서 제조사 vals3 = [] try: r3 = c.get("/MMI_Code", params={"select": "manufacturer"}) r3.raise_for_status() vals3 = [row.get("manufacturer") for row in (r3.json() or []) if row.get("manufacturer")] except httpx.HTTPError: pass # 모든 제조사 목록 합치기 및 중복 제거 all_manufacturers = vals1 + vals2 + vals3 seen = set() unique_manufacturers = [] for v in all_manufacturers: if v and v not in seen: seen.add(v) unique_manufacturers.append(v) # 정렬 unique_manufacturers.sort() return { "success": True, "data": { "manufacturers": unique_manufacturers } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/filters/fault") def api_fault_filters(): """고장코드 필터 옵션을 JSON으로 반환 (TWA 앱용)""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() # devices, car_types, car_ids, alias_names 가져오기 devices = pg_unique("device") car_types = pg_unique("car_type") car_ids = pg_unique("car_id") # alias_name 필터는 제작사에 따라 값 제한 all_alias = pg_unique("alias_name") def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] return { "success": True, "data": { "devices": sorted(devices), "car_types": sorted(car_types), "car_ids": sorted(car_ids), "alias_names": sorted(alias_names) } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/filters/signal") def api_signal_filters(): """TCMS 신호 필터 옵션을 JSON으로 반환 (TWA 앱용)""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() # alias_names, classifications 가져오기 all_alias = pg_unique_from("Signals", "alias_name") def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] classifications = pg_unique_from("Signals", "classification") return { "success": True, "data": { "alias_names": sorted(alias_names), "classifications": sorted(classifications) } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/filters/mmi") def api_mmi_filters(): """MMI 코드 필터 옵션을 JSON으로 반환 (TWA 앱용)""" try: selected_manufacturer = request.args.get("manufacturer", "").strip() with build_pg_client() as c: # alias_names 가져오기 res = c.get("/MMI_Code", params={"select": "alias_name"}) res.raise_for_status() all_alias = sorted({row.get("alias_name") for row in (res.json() or []) if row.get("alias_name")}) # 제조사별 필터링 def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] return { "success": True, "data": { "alias_names": alias_names } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/sb") def sb_home(): try: # ---- MMI 코드 분기 ---- if request.args.get("section", "fault").strip() == "mmicode": PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE # 셀렉트박스/검색 필드 with build_pg_client() as c: # 제조사 res = c.get("/MMI_Code", params={"select": "manufacturer"}) res.raise_for_status() manufacturers = sorted({row.get("manufacturer") for row in (res.json() or []) if row.get("manufacturer")}) # 차량분류(alias_name) res2 = c.get("/MMI_Code", params={"select": "alias_name"}) res2.raise_for_status() alias_names = sorted({row.get("alias_name") for row in (res2.json() or []) if row.get("alias_name")}) selected_manufacturer = request.args.get("manufacturer", "").strip() selected_alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() group_code = request.args.get("group_code", "").strip().lower() == "on" # MMI 쿼리 params = { "select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer", "order": "code_name.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if selected_manufacturer: params["manufacturer"] = f"eq.{selected_manufacturer}" if selected_alias_name: params["alias_name"] = f"eq.{selected_alias_name}" if q: # 여러 컬럼에 대해 ilike filter params["or"] = f"(code_name.ilike.*{q}*,code_description.ilike.*{q}*,alias_name.ilike.*{q}*)" with build_pg_client() as c: res = c.get("/MMI_Code", params=params) res.raise_for_status() rows = res.json() or [] # dedup if group_code on if group_code: seen = set() dedup = [] for r in rows: code = r.get("code_name") if code and code not in seen: seen.add(code) dedup.append(r) rows = dedup query_params = request.args.to_dict() if "page" in query_params: del query_params["page"] query_params_string = urlencode(query_params) # 템플릿 렌더 및 선택 파라미터 전달 return render_template( "partials/sb_mmi_list.html", rows=rows, manufacturers=manufacturers, alias_names=alias_names, selected_manufacturer=selected_manufacturer, selected_alias_name=selected_alias_name, q=q, group_code="on" if group_code else "off", section="mmicode", page=page, page_size=PAGE_SIZE, query_params_string=query_params_string, ) # ---- 기존(고장코드/TCMS) ---- mf_fault = pg_unique("manufacturer") mf_signal = pg_unique_from("Signals", "manufacturer") seen_mf: Dict[str, bool] = {} manufacturers: List[str] = [] for v in (mf_fault + mf_signal): if v and v not in seen_mf: seen_mf[v] = True manufacturers.append(v) devices = pg_unique("device") car_types = pg_unique("car_type") car_ids = pg_unique("car_id") # alias_name 필터는 제작사에 따라 값 제한: 우진/로템/다대 포함 all_alias = pg_unique("alias_name") selected_manufacturer = request.args.get("manufacturer", "").strip() def alias_allowed(name: str) -> bool: if not name: return False low = name.lower() if selected_manufacturer.lower() == "woojin": return ("우진" in name) or ("woojin" in low) if selected_manufacturer.lower() == "rotem": return ("로템" in name) or ("다대" in name) or ("rotem" in low) return True alias_names = [a for a in all_alias if alias_allowed(a)] signal_classifications = pg_unique_from("Signals", "classification") # 선택값 처리 (쿼리스트링에 manufacturer 키가 있으면 빈 문자열이라도 그대로 유지) if "manufacturer" in request.args: selected_manufacturer = (request.args.get("manufacturer") or "").strip() else: selected_manufacturer = manufacturers[0] if manufacturers else "" # 다른 필터의 현재 선택값 유지 selected_device = request.args.get("device", "").strip() selected_car_type = request.args.get("car_type", "").strip() selected_alias_name = request.args.get("alias_name", "").strip() selected_classification = request.args.get("classification", "").strip() q = request.args.get("q", "").strip() section = request.args.get("section", "fault").strip() or "fault" return render_template( "partials/sb_manufacturer.html", manufacturers=manufacturers, devices=devices, car_types=car_types, car_ids=car_ids, alias_names=alias_names, signal_classifications=signal_classifications, selected_manufacturer=selected_manufacturer, selected_device=selected_device, selected_car_type=selected_car_type, selected_alias_name=selected_alias_name, selected_classification=selected_classification, q=q, section=section, ) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) @app.route("/api/mmi/list") def api_mmi_list(): """MMI 코드 목록을 JSON으로 반환""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer", "order": "code_name.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(code_name.ilike.*{q}*,code_description.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/MMI_Code", params=params) r.raise_for_status() rows = r.json() or [] return { "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/api/faults/list") def api_faults_list(): """고장코드 목록을 JSON으로 반환 (새로운 API)""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() device = request.args.get("device", "").strip() car_type = request.args.get("car_type", "").strip() car_id = request.args.get("car_id", "").strip() alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() params: Dict[str, str] = { "select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name", "order": "f_code.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if device: params["device"] = f"eq.{device}" if car_type: params["car_type"] = f"eq.{car_type}" if car_id: params["car_id"] = f"eq.{car_id}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(f_code.ilike.*{q}*,f_name.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] # 그룹핑 옵션: 같은 f_code를 첫 항목만 남김 group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for row in rows: code = row.get("f_code") if code and code not in seen: seen[code] = True dedup.append(row) rows = dedup return { "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/sb/faults/list") def sb_faults_list(): """기존 HTML 템플릿 반환 (호환성 유지)""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() device = request.args.get("device", "").strip() car_type = request.args.get("car_type", "").strip() car_id = request.args.get("car_id", "").strip() alias_name = request.args.get("alias_name", "").strip() q = request.args.get("q", "").strip() section = request.args.get("section", "fault").strip() params: Dict[str, str] = { "select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name", "order": "f_code.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if device: params["device"] = f"eq.{device}" if car_type: params["car_type"] = f"eq.{car_type}" if car_id: params["car_id"] = f"eq.{car_id}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(f_code.ilike.*{q}*,f_name.ilike.*{q}*)" with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] # 그룹핑 옵션: 같은 f_code를 첫 항목만 남김 group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for r in rows: code = r.get("f_code") if code and code not in seen: seen[code] = True dedup.append(r) rows = dedup query_params = request.args.to_dict() if "page" in query_params: del query_params["page"] query_params_string = urlencode(query_params) return render_template( "partials/sb_fault_list.html", rows=rows, page=page, page_size=PAGE_SIZE, query_params_string=query_params_string, ) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) @app.route("/api/faults/") def api_fault_detail(f_code: str): """고장코드 상세 정보를 JSON으로 반환 (새로운 API)""" try: params = { "select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer,alias_name", "f_code": f"eq.{f_code}", "limit": "1", } with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] if not rows: return { "success": False, "error": "고장코드를 찾을 수 없습니다." }, 404 row = rows[0] return { "success": True, "data": row } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/sb/faults/") def sb_fault_detail(f_code: str): """기존 HTML 템플릿 반환 (호환성 유지)""" try: params = { "select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer", "f_code": f"eq.{f_code}", "limit": "1", } with build_pg_client() as c: r = c.get("/Fault_Code_Table", params=params) r.raise_for_status() rows = r.json() or [] if not rows: abort(404) row = rows[0] return render_template("partials/sb_fault_detail.html", row=row) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) # ----------------- Signals (TCMS) ----------------- @app.route("/api/signals/list") def api_signals_list(): """TCMS 신호 목록을 JSON으로 반환 (새로운 API)""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() classification = request.args.get("classification", "").strip() q = request.args.get("q", "").strip() alias_name = request.args.get("alias_name", "").strip() params: Dict[str, str] = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name", "order": "sig_num.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if classification: params["classification"] = f"eq.{classification}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(signal_abbreviation.ilike.*{q}*,signal_description.ilike.*{q}*)" with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error # TCMS도 group_code=on이면 sig_num/dedup(또는 id?) group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for row in rows: code = row.get("id") if code and code not in seen: seen[code] = True dedup.append(row) rows = dedup return { "success": True, "data": { "rows": rows, "total": len(rows), "page": page, "page_size": PAGE_SIZE, "has_next": len(rows) == PAGE_SIZE, "has_prev": page > 0 } } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/sb/signals/list") def sb_signals_list(): """기존 HTML 템플릿 반환 (호환성 유지)""" try: PAGE_SIZE = 50 page = int(request.args.get("page", "0")) offset = page * PAGE_SIZE manufacturer = request.args.get("manufacturer", "").strip() classification = request.args.get("classification", "").strip() q = request.args.get("q", "").strip() alias_name = request.args.get("alias_name", "").strip() params: Dict[str, str] = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name", "order": "sig_num.asc", "limit": str(PAGE_SIZE), "offset": str(offset), } if manufacturer: params["manufacturer"] = f"eq.{manufacturer}" if classification: params["classification"] = f"eq.{classification}" if alias_name: params["alias_name"] = f"eq.{alias_name}" if q: params["or"] = f"(signal_abbreviation.ilike.*{q}*,signal_description.ilike.*{q}*)" with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error # TCMS도 group_code=on이면 sig_num/dedup(또는 id?) group_code = request.args.get("group_code", "").strip().lower() if group_code == "on": seen: Dict[str, bool] = {} dedup: List[Dict] = [] for r in rows: code = r.get("id") if code and code not in seen: seen[code] = True dedup.append(r) rows = dedup query_params = request.args.to_dict() if "page" in query_params: del query_params["page"] query_params_string = urlencode(query_params) return render_template( "partials/sb_signal_list.html", rows=rows, page=page, page_size=PAGE_SIZE, query_params_string=query_params_string, ) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) @app.route("/api/signals/") def api_signal_detail(item_id: str): """TCMS 신호 상세 정보를 JSON으로 반환 (새로운 API)""" try: params = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at", "id": f"eq.{item_id}", "limit": "1", } with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error if not rows: return { "success": False, "error": "TCMS 신호를 찾을 수 없습니다." }, 404 row = rows[0] return { "success": True, "data": row } except Exception as e: return { "success": False, "error": str(e) }, 500 @app.route("/sb/signals/") def sb_signal_detail(item_id: str): """기존 HTML 템플릿 반환 (호환성 유지)""" try: params = { "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at", "id": f"eq.{item_id}", "limit": "1", } with build_pg_client() as c: rows = [] last_error: Exception | None = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] last_error = None break except Exception as e: last_error = e rows = [] continue if last_error and not rows: raise last_error if not rows: abort(404) row = rows[0] return render_template("partials/sb_signal_detail.html", row=row) except Exception as e: return render_template( "partials/sb_error.html", error_message=str(e), supabase_url=app.config.get("SUPABASE_URL"), ) @app.route("/sb/health") def sb_health(): try: with build_pg_client() as c: r = c.get("/Fault_Code_Table", params={"select": "f_code", "limit": "1"}) r.raise_for_status() return {"sb": "ok", "url": app.config.get("SUPABASE_URL")} except Exception as e: return {"sb": "error", "url": app.config.get("SUPABASE_URL"), "error": str(e)} # 간단한 Signals 테스트 엔드포인트: 상위 5개 레코드를 JSON으로 반환 @app.route("/sb/signals/test") def sb_signals_test(): try: params = { "select": "uuid,sig_num,signal_abbreviation,manufacturer,classification,alias_name", "order": "sig_num.asc", "limit": "5", } with build_pg_client() as c: rows = [] used_path = None last_error = None for path in ("/Signals", "/signals"): try: r = c.get(path, params=params) r.raise_for_status() rows = r.json() or [] used_path = path last_error = None break except Exception as e: last_error = str(e) rows = [] continue if last_error and not rows: return {"ok": False, "url": app.config.get("SUPABASE_URL"), "tried": ["/Signals", "/signals"], "error": last_error}, 500 return {"ok": True, "url": app.config.get("SUPABASE_URL"), "path": used_path, "rows": rows} except Exception as e: return {"ok": False, "url": app.config.get("SUPABASE_URL"), "error": str(e)}, 500 # 상세 디버그: 두 경로 각각의 상태/본문을 그대로 반환 @app.route("/sb/signals/debug") def sb_signals_debug(): try: out = [] with build_pg_client() as c: for path in ("/Signals", "/signals"): try: r = c.get(path, params={"select": "*", "limit": "5"}) ct = r.headers.get("content-type", "") body = None try: body = r.json() except Exception: body = r.text out.append({ "path": path, "status": r.status_code, "content_type": ct, "body": body, }) except httpx.HTTPError as e: out.append({"path": path, "error": str(e)}) return {"ok": True, "url": app.config.get("SUPABASE_URL"), "results": out} except Exception as e: return {"ok": False, "url": app.config.get("SUPABASE_URL"), "error": str(e)}, 500 return app app = create_app() if __name__ == "__main__": port = int(os.environ.get("PORT", "5000")) app.run(host="0.0.0.0", port=port, debug=True)