# Tr_Code/app_htmx.py.old
#
# 1731 lines
# 66 KiB
# Python

import os
from typing import Optional, List, Dict
import httpx
from flask import Flask, g, render_template, request, abort, redirect, url_for, session, flash
from urllib.parse import urlencode
from dotenv import load_dotenv
from flask_cors import CORS
from functools import wraps
APP_NAME = "1호선 고장코드"
def create_app() -> Flask:
# Load .env before any os.environ lookup below. Previously this ran after
# SECRET_KEY had already been read, so a SECRET_KEY defined only in .env was
# ignored and the hard-coded development key was used instead.
# load_dotenv() does not override variables that are already set.
load_dotenv()
app = Flask(__name__)
app.config.update(TEMPLATES_AUTO_RELOAD=True)
# Session settings.
# NOTE(review): the fallback secret is a publicly known constant; deployments
# must provide SECRET_KEY, otherwise session cookies can be forged.
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# CORS settings. The allowed origins can be restricted with a comma-separated
# CORS_ORIGINS environment variable; without it every origin is allowed
# (development default, same as before).
# NOTE(review): a wildcard origin together with supports_credentials=True is
# unsafe outside development - set CORS_ORIGINS in production.
cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "*").split(",")
    if origin.strip()
]
CORS(app, origins=cors_origins or ["*"], supports_credentials=True)
# Supabase defaults, taken from the environment.
# Default URL: the Kong proxy (port 8000) unless the user specifies another one.
app.config.setdefault("SUPABASE_URL", os.environ.get("SUPABASE_URL", "http://localhost:8000"))
app.config.setdefault("SUPABASE_ANON_KEY", os.environ.get("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I"))
# Kong Basic Auth (optional)
app.config.setdefault("SUPABASE_BASIC_USER", os.environ.get("SUPABASE_BASIC_USER", ""))
app.config.setdefault("SUPABASE_BASIC_PASSWORD", os.environ.get("SUPABASE_BASIC_PASSWORD", ""))
# SQLite initialisation/connection is no longer used (Supabase only).
# PostgREST(REST) 클라이언트 빌더
def build_pg_client() -> httpx.Client:
    """Create an httpx client bound to the Supabase PostgREST endpoint.

    The base URL is ``SUPABASE_URL`` + ``/rest/v1``. When a basic-auth user is
    configured the client authenticates with HTTP Basic auth; otherwise, if an
    anon key is configured, it is sent as a Bearer token. The anon key is
    always sent in the ``apikey`` header. Requests time out after 10 seconds.
    """
    cfg = app.config
    rest_base = cfg["SUPABASE_URL"].rstrip("/") + "/rest/v1"
    request_headers = {
        "apikey": cfg.get("SUPABASE_ANON_KEY", ""),
        "Accept-Profile": "public",
        "Content-Profile": "public",
    }
    user_name = cfg.get("SUPABASE_BASIC_USER") or ""
    user_secret = cfg.get("SUPABASE_BASIC_PASSWORD") or ""
    if user_name:
        credentials = httpx.BasicAuth(user_name, user_secret)
    else:
        credentials = None
        # Bearer token is only used when Basic auth is not in play.
        if cfg.get("SUPABASE_ANON_KEY"):
            request_headers["Authorization"] = f"Bearer {cfg['SUPABASE_ANON_KEY']}"
    return httpx.Client(
        base_url=rest_base,
        headers=request_headers,
        auth=credentials,
        timeout=10,
    )
def pg_unique(col: str) -> List[str]:
    """Return the distinct non-empty values of ``col`` in Fault_Code_Table.

    Values keep the order in which they first appear in the response.
    Raises ``httpx.HTTPStatusError`` when the request fails.
    """
    with build_pg_client() as client:
        response = client.get("/Fault_Code_Table", params={"select": col})
        response.raise_for_status()
        payload = response.json() or []
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(row.get(col) for row in payload if row.get(col)))
def pg_unique_from(table: str, col: str) -> List[str]:
    """Return the distinct non-empty values of ``col`` in the given table.

    Values keep the order in which they first appear in the response.
    Raises ``httpx.HTTPStatusError`` when the request fails.
    """
    with build_pg_client() as client:
        response = client.get(f"/{table}", params={"select": col})
        response.raise_for_status()
        payload = response.json() or []
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(row.get(col) for row in payload if row.get(col)))
# 인증 헬퍼 함수
def login_required(f):
    """Decorator: redirect to the login page unless a user is in the session."""

    @wraps(f)
    def decorated_function(*args, **kwargs):
        logged_in = "user_id" in session
        if logged_in:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return decorated_function
def get_current_user() -> Optional[Dict]:
    """Return the logged-in user's session fields, or None when not logged in."""
    if "user_id" not in session:
        return None
    session_fields = (
        "user_id",
        "email",
        "employee_id",
        "name",
        "department_id",
        "access_token",
    )
    return {field: session.get(field) for field in session_fields}
# 인증 라우트
@app.route("/auth/login", methods=["GET", "POST"])
def login():
    """Show the login page (GET) or authenticate by employee ID (POST).

    A user who is already logged in is redirected to the main page. On a
    successful POST the values returned by AuthManager are copied into the
    session and the user is redirected to the main page; otherwise the login
    page is re-rendered with an error.
    """
    if get_current_user():
        return redirect(url_for('index'))

    if request.method != "POST":
        # GET: plain login page, optionally with a success banner.
        return render_template(
            "login.html",
            app_name=APP_NAME,
            success=request.args.get("success", ""),
        )

    from auth import AuthManager

    employee_id = request.form.get("employee_id", "").strip()
    password = request.form.get("password", "").strip()

    def render_failure(error_text):
        """Re-render the login form with an error, keeping the employee ID."""
        return render_template(
            "login.html",
            app_name=APP_NAME,
            error=error_text,
            employee_id=employee_id,
        )

    if not (employee_id and password):
        return render_failure("사번과 비밀번호를 모두 입력해주세요.")

    manager = AuthManager(
        app.config["SUPABASE_URL"],
        app.config["SUPABASE_ANON_KEY"],
    )
    success, message, session_data = manager.login_user_by_employee_id(employee_id, password)
    if not (success and session_data):
        return render_failure(message)

    # Persist the authenticated user's data in the session.
    for key in (
        "user_id",
        "auth_id",
        "email",
        "employee_id",
        "name",
        "department_id",
        "access_token",
    ):
        session[key] = session_data[key]
    session.permanent = True
    return redirect(url_for('index'))
@app.route("/auth/signup", methods=["GET", "POST"])
def signup():
    """Show the sign-up page (GET) or create an account (POST).

    A user who is already logged in is redirected to the main page. The POST
    handler validates that every field is present, that both passwords match,
    that the password has at least 8 characters and that the department ID is
    an integer, then delegates to ``AuthManager.signup_user``. On success the
    login page is rendered with a success message; on any failure the form is
    re-rendered with the error and the previously entered values.
    """
    if get_current_user():
        return redirect(url_for('index'))

    from auth import AuthManager

    auth_manager = AuthManager(
        app.config["SUPABASE_URL"],
        app.config["SUPABASE_ANON_KEY"]
    )
    # Department list for the select box.
    departments = auth_manager.get_departments()

    if request.method != "POST":
        return render_template(
            "signup.html",
            app_name=APP_NAME,
            departments=departments
        )

    email = request.form.get("email", "").strip()
    password = request.form.get("password", "").strip()
    password_confirm = request.form.get("password_confirm", "").strip()
    employee_id = request.form.get("employee_id", "").strip()
    name = request.form.get("name", "").strip()
    department_id = request.form.get("department_id", "").strip()

    def render_form_error(error_text):
        """Re-render the form with an error, keeping the entered values."""
        return render_template(
            "signup.html",
            app_name=APP_NAME,
            error=error_text,
            departments=departments,
            email=email,
            employee_id=employee_id,
            name=name,
            department_id=department_id
        )

    if not all([email, password, password_confirm, employee_id, name, department_id]):
        return render_form_error("모든 필드를 입력해주세요.")
    if password != password_confirm:
        return render_form_error("비밀번호가 일치하지 않습니다.")
    if len(password) < 8:
        return render_form_error("비밀번호는 최소 8자 이상이어야 합니다.")

    # A tampered or malformed department value used to raise ValueError and
    # surface as an HTTP 500; report it as a normal form error instead.
    try:
        department_number = int(department_id)
    except ValueError:
        return render_form_error("올바른 부서를 선택해주세요.")

    success, message, _ = auth_manager.signup_user(
        email=email,
        password=password,
        employee_id=employee_id,
        name=name,
        department_id=department_number
    )
    if not success:
        return render_form_error(message)

    # Sign-up succeeded: show the login page with a success banner.
    return render_template(
        "login.html",
        app_name=APP_NAME,
        success=message + " 로그인해주세요."
    )
@app.route("/auth/logout")
def logout():
    """Log the user out and redirect to the login page.

    If the session holds an access token, the remote session is revoked via
    ``AuthManager.logout_user``. A failure of that remote call is logged but
    never prevents the local session from being cleared.
    """
    from auth import AuthManager

    access_token = session.get("access_token")
    if access_token:
        try:
            auth_manager = AuthManager(
                app.config["SUPABASE_URL"],
                app.config["SUPABASE_ANON_KEY"]
            )
            auth_manager.logout_user(access_token)
        except Exception:
            # Best effort: the user must still be logged out locally.
            app.logger.exception("Remote logout failed; clearing local session anyway")
    session.clear()
    return redirect(url_for('login'))
@app.route("/auth/forgot-password", methods=["GET", "POST"])
def forgot_password():
    """Show the forgot-password page (GET) or start a password reset (POST).

    E-mail delivery of the reset link is not implemented yet. Only when the
    app runs in debug mode is the development shortcut used: a one-hour token
    is stored in the session and the browser is redirected straight to the
    reset page. Outside debug mode the same generic success message is shown
    whether or not the employee ID exists, so the endpoint can neither be used
    to reset someone else's password nor to enumerate employee IDs.
    """
    if request.method != "POST":
        return render_template(
            "forgot_password.html",
            app_name=APP_NAME
        )

    from auth import AuthManager
    import secrets
    from datetime import datetime, timedelta

    employee_id = request.form.get("employee_id", "").strip()
    if not employee_id:
        return render_template(
            "forgot_password.html",
            app_name=APP_NAME,
            error="사번을 입력해주세요.",
            employee_id=employee_id
        )

    auth_manager = AuthManager(
        app.config["SUPABASE_URL"],
        app.config["SUPABASE_ANON_KEY"]
    )
    user = auth_manager.get_user_by_employee_id(employee_id)

    if user and app.debug:
        # Development shortcut: skip the e-mail and go to the reset page.
        reset_token = secrets.token_urlsafe(32)
        session[f"reset_token_{reset_token}"] = {
            "employee_id": employee_id,
            "email": user["email"],
            "expires": (datetime.now() + timedelta(hours=1)).isoformat()
        }
        return redirect(url_for('reset_password', token=reset_token))

    if user:
        # TODO: send the reset link by e-mail once delivery is implemented.
        app.logger.warning(
            "Password reset requested but e-mail delivery is not implemented; "
            "no reset link was issued."
        )
    # Same response for known and unknown employee IDs.
    return render_template(
        "forgot_password.html",
        app_name=APP_NAME,
        success="등록된 이메일로 비밀번호 재설정 링크를 전송했습니다."
    )
@app.route("/auth/reset-password", methods=["GET", "POST"])
def reset_password():
    """Show the reset-password page (GET) or set a new password (POST).

    The token comes from the query string or the form and must match an
    unexpired ``reset_token_<token>`` entry in the session. On a successful
    reset the token is discarded and the user is redirected to the login page
    with the success message as a properly URL-encoded query parameter.
    """
    token = request.args.get("token") or request.form.get("token")
    if not token:
        return redirect(url_for('login'))

    def render_error(error_text):
        """Render the reset page with an error message."""
        return render_template(
            "reset_password.html",
            app_name=APP_NAME,
            token=token,
            error=error_text
        )

    session_key = f"reset_token_{token}"
    token_data = session.get(session_key)
    if not token_data:
        return render_error("유효하지 않거나 만료된 링크입니다.")

    from datetime import datetime

    if datetime.now() > datetime.fromisoformat(token_data["expires"]):
        session.pop(session_key, None)
        return render_error("링크가 만료되었습니다. 다시 요청해주세요.")

    if request.method != "POST":
        return render_template(
            "reset_password.html",
            app_name=APP_NAME,
            token=token
        )

    from auth import AuthManager

    password = request.form.get("password", "").strip()
    password_confirm = request.form.get("password_confirm", "").strip()
    if not password or not password_confirm:
        return render_error("모든 필드를 입력해주세요.")
    if password != password_confirm:
        return render_error("비밀번호가 일치하지 않습니다.")
    if len(password) < 8:
        return render_error("비밀번호는 최소 8자 이상이어야 합니다.")

    auth_manager = AuthManager(
        app.config["SUPABASE_URL"],
        app.config["SUPABASE_ANON_KEY"]
    )
    success, message = auth_manager.reset_password(
        token_data["email"],
        password
    )
    if not success:
        return render_error(message)

    session.pop(session_key, None)
    # Let url_for build the query string so the message (non-ASCII text,
    # spaces, '&', '#') is percent-encoded instead of concatenated raw.
    return redirect(url_for('login', success=message))
@app.route("/api/departments")
def api_departments():
    """Return the department list as JSON (used by the sign-up form).

    Any exception is reported as ``{"success": False, "error": ...}`` with
    HTTP status 500.
    """
    try:
        from auth import AuthManager

        manager = AuthManager(
            app.config["SUPABASE_URL"],
            app.config["SUPABASE_ANON_KEY"],
        )
        payload = {"success": True, "departments": manager.get_departments()}
    except Exception as exc:
        return {"success": False, "error": str(exc)}, 500
    return payload
# ============= 이메일 코드 인증 API =============
@app.route("/api/email/send-code", methods=["POST"])
def send_email_code():
    """Generate a 6-digit e-mail verification code and store it in the session.

    Expects a JSON object with ``email`` and optional ``type`` (``signup`` or
    ``password_reset``; defaults to ``signup``). Only addresses ending in
    ``@humetro.busan.kr`` are accepted (HTTP 400 otherwise). The code is valid
    for five minutes. Real e-mail delivery is not implemented yet; the code is
    printed to stdout and, in debug mode only, echoed in the response.

    NOTE(review): with Flask's default cookie-based session the stored code is
    signed but not encrypted, so the client could read it from its own cookie -
    confirm a server-side session store is used, or keep codes server-side.
    """
    try:
        import secrets
        import string
        from datetime import datetime, timedelta

        # A missing or non-object JSON body is treated as an empty request
        # instead of crashing with AttributeError.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = str(data.get("email") or "").strip().lower()
        code_type = data.get("type", "signup")  # signup, password_reset

        # E-mail domain check.
        if not email.endswith("@humetro.busan.kr"):
            return {
                "success": False,
                "error": "humetro.busan.kr 도메인의 이메일만 사용 가능합니다."
            }, 400

        # Verification codes are security tokens: use the CSPRNG-backed
        # secrets module rather than random.
        code = ''.join(secrets.choice(string.digits) for _ in range(6))

        # Store the code in the session (valid for 5 minutes).
        expiry = datetime.now() + timedelta(minutes=5)
        session[f"email_code_{email}_{code_type}"] = {
            "code": code,
            "expiry": expiry.timestamp(),
            "attempts": 0
        }

        # TODO: send a real e-mail (currently the code is only printed).
        print("\n=== 이메일 인증 코드 ===")
        print(f"To: {email}")
        print(f"Code: {code}")
        print(f"Type: {code_type}")
        print(f"Expiry: {expiry}")
        print("=======================\n")

        return {
            "success": True,
            "message": "인증 코드를 이메일로 전송했습니다.",
            "debug_code": code if app.debug else None  # debug mode only
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/email/verify-code", methods=["POST"])
def verify_email_code():
    """Check an e-mail verification code against the one stored in the session.

    Expects a JSON object with ``email``, ``code`` and optional ``type``
    (defaults to ``signup``). At most five wrong attempts are allowed and the
    code expires at the stored ``expiry`` timestamp. On success an
    ``email_verified_<email>_<type>`` marker is written to the session and the
    used code is removed. Validation failures return HTTP 400.
    """
    try:
        import hmac
        from datetime import datetime

        # A missing or non-object JSON body is treated as an empty request.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = str(data.get("email") or "").strip().lower()
        # Clients may send the code as a JSON number; normalise to text.
        code = str(data.get("code") or "").strip()
        code_type = data.get("type", "signup")

        session_key = f"email_code_{email}_{code_type}"
        stored_data = session.get(session_key)
        if not stored_data:
            return {
                "success": False,
                "error": "인증 코드가 존재하지 않거나 만료되었습니다."
            }, 400

        # Attempt limit (5 tries).
        if stored_data["attempts"] >= 5:
            session.pop(session_key, None)
            return {
                "success": False,
                "error": "인증 시도 횟수를 초과했습니다. 코드를 재전송해주세요."
            }, 400

        # Expiry check.
        if datetime.now().timestamp() > stored_data["expiry"]:
            session.pop(session_key, None)
            return {
                "success": False,
                "error": "인증 코드가 만료되었습니다. 코드를 재전송해주세요."
            }, 400

        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str input.
        codes_match = hmac.compare_digest(
            str(stored_data["code"]).encode("utf-8"),
            code.encode("utf-8"),
        )
        if not codes_match:
            stored_data["attempts"] += 1
            # Re-assign so the session registers the nested change.
            session[session_key] = stored_data
            return {
                "success": False,
                "error": f"인증 코드가 올바르지 않습니다. (남은 시도: {5 - stored_data['attempts']}회)"
            }, 400

        # Success: mark the address as verified and discard the used code.
        session[f"email_verified_{email}_{code_type}"] = {
            "verified_at": datetime.now().timestamp()
        }
        session.pop(session_key, None)
        return {
            "success": True,
            "message": "인증이 완료되었습니다."
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/auth/reset-password", methods=["POST"])
def reset_password_api():
    """Reset a password after e-mail verification (JSON API).

    Expects a JSON object with ``email`` and ``password``. The session must
    hold an ``email_verified_<email>_password_reset`` marker that is at most
    10 minutes old. The new password must have at least 8 characters, the same
    rule the HTML reset form applies. The password is changed through
    ``AuthManager.reset_password``, the same call the HTML reset flow uses;
    previously this endpoint reported success without changing anything.
    """
    try:
        from datetime import datetime
        from auth import AuthManager

        # A missing or non-object JSON body is treated as an empty request.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = str(data.get("email") or "").strip().lower()
        new_password = str(data.get("password") or "").strip()

        # The e-mail address must have been verified in this session.
        verification_key = f"email_verified_{email}_password_reset"
        verified_data = session.get(verification_key)
        if not verified_data:
            return {
                "success": False,
                "error": "이메일 인증이 필요합니다."
            }, 400

        # The verification must be at most 10 minutes old.
        verified_at = verified_data.get("verified_at", 0)
        if datetime.now().timestamp() - verified_at > 600:
            session.pop(verification_key, None)
            return {
                "success": False,
                "error": "인증 시간이 만료되었습니다. 다시 시도해주세요."
            }, 400

        if len(new_password) < 8:
            return {
                "success": False,
                "error": "비밀번호는 최소 8자 이상이어야 합니다."
            }, 400

        # Make sure the account exists before attempting the reset.
        with build_pg_client() as c:
            r = c.get(
                "/users",
                params={
                    "email": f"eq.{email}",
                    "limit": "1"
                }
            )
            r.raise_for_status()
            users = r.json() or []
        if not users:
            return {
                "success": False,
                "error": "사용자를 찾을 수 없습니다."
            }, 404

        auth_manager = AuthManager(
            app.config["SUPABASE_URL"],
            app.config["SUPABASE_ANON_KEY"]
        )
        success, message = auth_manager.reset_password(email, new_password)
        if not success:
            # Keep the verification marker so the user can retry in time.
            return {
                "success": False,
                "error": message
            }, 400

        # The verification is single-use.
        session.pop(verification_key, None)
        return {
            "success": True,
            "message": "비밀번호가 성공적으로 변경되었습니다."
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
# ============= 생체인증 API =============
@app.route("/api/biometric/register-challenge", methods=["POST"])
def biometric_register_challenge():
    """Create a challenge for biometric credential registration.

    Expects a JSON object with ``employeeId``. A random URL-safe challenge is
    stored in the session under ``biometric_challenge_<employeeId>`` together
    with its creation timestamp, and returned to the client. A missing
    ``employeeId`` yields HTTP 400.
    """
    try:
        import secrets
        # datetime was never imported in this scope, so the endpoint used to
        # fail with NameError on every call.
        from datetime import datetime

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        employee_id = data.get("employeeId")
        if not employee_id:
            return {
                "success": False,
                "error": "사번이 필요합니다."
            }, 400

        # 32 random bytes, URL-safe encoded.
        challenge = secrets.token_urlsafe(32)
        session[f"biometric_challenge_{employee_id}"] = {
            "challenge": challenge,
            "timestamp": datetime.now().timestamp()
        }
        return {
            "success": True,
            "challenge": challenge,
            "userId": employee_id
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/biometric/register", methods=["POST"])
def biometric_register():
    """Store a biometric credential for the logged-in employee.

    Expects a JSON object with ``employeeId`` and ``credential`` (an object
    with an ``id``). The caller must be logged in (401 otherwise) and may only
    register a credential for their own employee ID (403 otherwise); without
    this check anyone could attach a credential to any account and then use
    biometric login to impersonate it. The credential is inserted into the
    ``biometric_credentials`` table and the pending registration challenge is
    removed from the session.
    """
    try:
        # datetime was never imported in this scope (NameError on every call).
        from datetime import datetime

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        employee_id = data.get("employeeId")
        credential = data.get("credential")
        if not employee_id or not isinstance(credential, dict) or not credential.get("id"):
            return {
                "success": False,
                "error": "잘못된 요청입니다."
            }, 400

        if "user_id" not in session:
            return {
                "success": False,
                "error": "로그인이 필요합니다."
            }, 401
        if str(session.get("employee_id")) != str(employee_id):
            return {
                "success": False,
                "error": "권한이 없습니다."
            }, 403

        with build_pg_client() as c:
            r = c.post(
                "/biometric_credentials",
                json={
                    "employee_id": employee_id,
                    "credential_id": credential["id"],
                    "credential_data": credential,
                    "created_at": datetime.now().isoformat()
                }
            )
            r.raise_for_status()

        # The registration challenge is single-use.
        session.pop(f"biometric_challenge_{employee_id}", None)
        return {
            "success": True,
            "message": "생체인증이 등록되었습니다."
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/biometric/login-challenge", methods=["POST"])
def biometric_login_challenge():
    """Create a challenge for biometric login.

    Expects a JSON object with ``employeeId``. A random URL-safe challenge is
    stored in the session under ``biometric_login_challenge_<employeeId>``
    together with its creation timestamp, and returned to the client. A
    missing ``employeeId`` yields HTTP 400.
    """
    try:
        import secrets
        # datetime was never imported in this scope, so the endpoint used to
        # fail with NameError on every call.
        from datetime import datetime

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        employee_id = data.get("employeeId")
        if not employee_id:
            return {
                "success": False,
                "error": "사번이 필요합니다."
            }, 400

        challenge = secrets.token_urlsafe(32)
        session[f"biometric_login_challenge_{employee_id}"] = {
            "challenge": challenge,
            "timestamp": datetime.now().timestamp()
        }
        return {
            "success": True,
            "challenge": challenge
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/biometric/login", methods=["POST"])
def biometric_login():
    """Log a user in with a previously registered biometric credential.

    Expects a JSON object with ``employeeId`` and ``assertion`` (an object
    with an ``id``). Malformed requests return 400, a missing or mismatching
    credential returns 401, an unknown user returns 404 and unexpected errors
    return 500. On success the user's data is stored in a fresh session.

    NOTE(review): the assertion is NOT cryptographically verified - only the
    credential ID is compared, and credential IDs are not secret. Until real
    WebAuthn signature/challenge verification is implemented this endpoint
    must not be treated as a secure authentication factor.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        employee_id = data.get("employeeId")
        assertion = data.get("assertion")
        if not employee_id or not isinstance(assertion, dict) or not assertion.get("id"):
            return {
                "success": False,
                "error": "잘못된 요청입니다."
            }, 400

        with build_pg_client() as c:
            # Look up the stored credential.
            r = c.get(
                "/biometric_credentials",
                params={
                    "employee_id": f"eq.{employee_id}",
                    "limit": "1"
                }
            )
            r.raise_for_status()
            credentials = r.json() or []
            if not credentials:
                return {
                    "success": False,
                    "error": "등록된 생체인증 정보가 없습니다."
                }, 401

            # TODO: verify the assertion signature against the stored key.
            if credentials[0]["credential_id"] != assertion["id"]:
                return {
                    "success": False,
                    "error": "인증 실패"
                }, 401

            # Load the user record.
            r = c.get(
                "/users",
                params={
                    "employee_id": f"eq.{employee_id}",
                    "limit": "1"
                }
            )
            r.raise_for_status()
            users = r.json() or []
        if not users:
            return {
                "success": False,
                "error": "사용자를 찾을 수 없습니다."
            }, 404
        user_data = users[0]

        # Start from a clean session so no state from a previous user or a
        # pending challenge/verification survives the login.
        session.clear()
        session["user_id"] = user_data["id"]
        session["email"] = user_data["email"]
        session["employee_id"] = user_data["employee_id"]
        session["name"] = user_data["name"]
        session["department_id"] = user_data["department_id"]
        session.permanent = True

        return {
            "success": True,
            "user": {
                "id": user_data["id"],
                "email": user_data["email"],
                "employee_id": user_data["employee_id"],
                "name": user_data["name"],
                "department_id": user_data["department_id"]
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/biometric/unregister", methods=["POST"])
def biometric_unregister():
    """Delete the logged-in employee's biometric credentials.

    Expects a JSON object with ``employeeId``. The caller must be logged in
    (401 otherwise) and may only remove credentials for their own employee ID
    (403 otherwise); previously any unauthenticated caller could delete any
    employee's credentials.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        employee_id = data.get("employeeId")
        if not employee_id:
            return {
                "success": False,
                "error": "사번이 필요합니다."
            }, 400

        if "user_id" not in session:
            return {
                "success": False,
                "error": "로그인이 필요합니다."
            }, 401
        if str(session.get("employee_id")) != str(employee_id):
            return {
                "success": False,
                "error": "권한이 없습니다."
            }, 403

        with build_pg_client() as c:
            r = c.delete(
                "/biometric_credentials",
                params={"employee_id": f"eq.{employee_id}"}
            )
            r.raise_for_status()
        return {
            "success": True,
            "message": "생체인증이 해제되었습니다."
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/")
@login_required
def index():
    """Render the main page for the logged-in user."""
    template_context = {"app_name": APP_NAME, "user": get_current_user()}
    return render_template("index.html", **template_context)
# 기존 SQLite 기반 라우트 제거됨
@app.route("/modal/close")
def modal_close():
    """Return an empty body (used to clear a modal container)."""
    empty_body = ""
    return empty_body
@app.route("/health")
def health():
    """Liveness probe: always reports status ``ok``."""
    return dict(status="ok")
# 빈 파비콘 응답으로 404 제거
@app.route("/favicon.ico")
def favicon():
    """Answer favicon requests with an empty 204 so browsers do not log a 404."""
    response_headers = {"Content-Type": "image/x-icon"}
    return ("", 204, response_headers)
# 브라우저/툴 호환을 위해 루트 경로에도 매니페스트 노출 (정적 파일 경로 사용 권장)
# 더 이상 /api/* 동기화 엔드포인트 제공하지 않음
# ----------------- Supabase 기반 라우트 -----------------
@app.route("/sb/tabs")
def sb_tabs():
    """Render the manufacturer tabs partial.

    Manufacturers are collected from Fault_Code_Table and from the signals
    table (``/Signals`` first, ``/signals`` as a fallback), de-duplicated in
    first-seen order. Any failure renders the error partial instead.
    """

    def manufacturers_in(payload):
        """Extract the non-empty ``manufacturer`` values from response rows."""
        return [row.get("manufacturer") for row in (payload or []) if row.get("manufacturer")]

    try:
        with build_pg_client() as client:
            fault_resp = client.get("/Fault_Code_Table", params={"select": "manufacturer"})
            fault_resp.raise_for_status()
            fault_values = manufacturers_in(fault_resp.json())

            signal_values: List[str] = []
            for table_path in ("/Signals", "/signals"):
                try:
                    signal_resp = client.get(table_path, params={"select": "manufacturer"})
                    signal_resp.raise_for_status()
                    signal_values = manufacturers_in(signal_resp.json())
                except httpx.HTTPError:
                    # Try the next spelling of the table name.
                    continue
                break

        manufacturers: List[str] = list(dict.fromkeys(fault_values + signal_values))
        return render_template("partials/sb_tabs.html", manufacturers=manufacturers)
    except Exception as exc:
        return render_template(
            "partials/sb_error.html",
            error_message=str(exc),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
@app.route("/api/manufacturers")
def api_manufacturers():
    """Return the sorted, de-duplicated manufacturer list as JSON (TWA app).

    Sources: Fault_Code_Table (required), the signals table (``/Signals`` then
    ``/signals``) and MMI_Code; HTTP errors on the latter two are ignored.
    Other failures return ``{"success": False, ...}`` with HTTP 500.
    """

    def manufacturers_in(payload):
        """Extract the non-empty ``manufacturer`` values from response rows."""
        return [row.get("manufacturer") for row in (payload or []) if row.get("manufacturer")]

    try:
        with build_pg_client() as client:
            # Fault_Code_Table is mandatory.
            fault_resp = client.get("/Fault_Code_Table", params={"select": "manufacturer"})
            fault_resp.raise_for_status()
            fault_values = manufacturers_in(fault_resp.json())

            # Signals table: first spelling that answers wins.
            signal_values = []
            for table_path in ("/Signals", "/signals"):
                try:
                    signal_resp = client.get(table_path, params={"select": "manufacturer"})
                    signal_resp.raise_for_status()
                    signal_values = manufacturers_in(signal_resp.json())
                except httpx.HTTPError:
                    continue
                break

            # MMI_Code is optional.
            mmi_values = []
            try:
                mmi_resp = client.get("/MMI_Code", params={"select": "manufacturer"})
                mmi_resp.raise_for_status()
                mmi_values = manufacturers_in(mmi_resp.json())
            except httpx.HTTPError:
                pass

        # Merge, drop duplicates and sort.
        unique_manufacturers = sorted(set(fault_values + signal_values + mmi_values))
        return {
            "success": True,
            "data": {
                "manufacturers": unique_manufacturers
            }
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc)
        }, 500
@app.route("/api/filters/fault")
def api_fault_filters():
    """Return the fault-code filter options as JSON (TWA app).

    The alias-name list is narrowed by the ``manufacturer`` query parameter:
    ``woojin`` and ``rotem`` (case-insensitive) keep only aliases containing
    their keywords; any other value keeps every alias. Failures return
    ``{"success": False, ...}`` with HTTP 500.
    """
    try:
        maker = request.args.get("manufacturer", "").strip().lower()

        devices = pg_unique("device")
        car_types = pg_unique("car_type")
        car_ids = pg_unique("car_id")
        every_alias = pg_unique("alias_name")

        # (keywords matched against the raw name, keywords matched against
        # the lower-cased name) per restricted manufacturer.
        keyword_rules = {
            "woojin": (("우진",), ("woojin",)),
            "rotem": (("로템", "다대"), ("rotem",)),
        }
        rule = keyword_rules.get(maker)

        def alias_allowed(name: str) -> bool:
            if not name:
                return False
            if rule is None:
                return True
            raw_keywords, lowered_keywords = rule
            lowered = name.lower()
            return any(k in name for k in raw_keywords) or any(k in lowered for k in lowered_keywords)

        visible_aliases = list(filter(alias_allowed, every_alias))
        return {
            "success": True,
            "data": {
                "devices": sorted(devices),
                "car_types": sorted(car_types),
                "car_ids": sorted(car_ids),
                "alias_names": sorted(visible_aliases)
            }
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc)
        }, 500
@app.route("/api/filters/signal")
def api_signal_filters():
    """Return the TCMS signal filter options as JSON (TWA app).

    Alias names come from the ``Signals`` table and are narrowed by the
    ``manufacturer`` query parameter (``woojin`` / ``rotem`` keyword match,
    case-insensitive; any other value keeps every alias). Classifications are
    returned unfiltered. Failures return HTTP 500 with an error payload.
    """
    try:
        maker = request.args.get("manufacturer", "").strip().lower()
        every_alias = pg_unique_from("Signals", "alias_name")

        # (keywords matched against the raw name, keywords matched against
        # the lower-cased name) per restricted manufacturer.
        keyword_rules = {
            "woojin": (("우진",), ("woojin",)),
            "rotem": (("로템", "다대"), ("rotem",)),
        }
        rule = keyword_rules.get(maker)

        def alias_allowed(name: str) -> bool:
            if not name:
                return False
            if rule is None:
                return True
            raw_keywords, lowered_keywords = rule
            lowered = name.lower()
            return any(k in name for k in raw_keywords) or any(k in lowered for k in lowered_keywords)

        visible_aliases = list(filter(alias_allowed, every_alias))
        classifications = pg_unique_from("Signals", "classification")
        return {
            "success": True,
            "data": {
                "alias_names": sorted(visible_aliases),
                "classifications": sorted(classifications)
            }
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc)
        }, 500
@app.route("/api/filters/mmi")
def api_mmi_filters():
    """Return the MMI code filter options as JSON (TWA app).

    Alias names come from ``MMI_Code``, sorted and de-duplicated, then
    narrowed by the ``manufacturer`` query parameter (``woojin`` / ``rotem``
    keyword match, case-insensitive; any other value keeps every alias).
    Failures return HTTP 500 with an error payload.
    """
    try:
        maker = request.args.get("manufacturer", "").strip().lower()

        with build_pg_client() as client:
            response = client.get("/MMI_Code", params={"select": "alias_name"})
            response.raise_for_status()
            distinct_aliases = set()
            for row in (response.json() or []):
                if row.get("alias_name"):
                    distinct_aliases.add(row.get("alias_name"))
            every_alias = sorted(distinct_aliases)

        # (keywords matched against the raw name, keywords matched against
        # the lower-cased name) per restricted manufacturer.
        keyword_rules = {
            "woojin": (("우진",), ("woojin",)),
            "rotem": (("로템", "다대"), ("rotem",)),
        }
        rule = keyword_rules.get(maker)

        def alias_allowed(name: str) -> bool:
            if not name:
                return False
            if rule is None:
                return True
            raw_keywords, lowered_keywords = rule
            lowered = name.lower()
            return any(k in name for k in raw_keywords) or any(k in lowered for k in lowered_keywords)

        return {
            "success": True,
            "data": {
                "alias_names": list(filter(alias_allowed, every_alias))
            }
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc)
        }, 500
@app.route("/sb")
def sb_home():
    """Render the Supabase-backed home partial.

    With ``section=mmicode`` the MMI code list partial is rendered (50 rows
    per page, optional manufacturer / alias / free-text filters and optional
    de-duplication by ``code_name`` when ``group_code=on``). For every other
    section the manufacturer/filter partial for fault codes and TCMS signals
    is rendered. Any failure renders the error partial instead.
    """
    try:
        # ---- MMI code branch ----
        if request.args.get("section", "fault").strip() == "mmicode":
            PAGE_SIZE = 50
            # Invalid page values fall back to 0 and negatives are clamped,
            # so a bad query string no longer produces an error page.
            page = max(request.args.get("page", default=0, type=int), 0)
            offset = page * PAGE_SIZE

            selected_manufacturer = request.args.get("manufacturer", "").strip()
            selected_alias_name = request.args.get("alias_name", "").strip()
            q = request.args.get("q", "").strip()
            group_code = request.args.get("group_code", "").strip().lower() == "on"

            params = {
                "select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer",
                "order": "code_name.asc",
                "limit": str(PAGE_SIZE),
                "offset": str(offset),
            }
            if selected_manufacturer:
                params["manufacturer"] = f"eq.{selected_manufacturer}"
            if selected_alias_name:
                params["alias_name"] = f"eq.{selected_alias_name}"
            if q:
                # Double-quote the user text inside the or=(...) logic tree so
                # PostgREST reserved characters (',', '(', ')', '.') cannot
                # break or extend the filter; backslash-escape '\' and '"'.
                escaped_q = q.replace("\\", "\\\\").replace('"', '\\"')
                pattern = f'"*{escaped_q}*"'
                params["or"] = (
                    f"(code_name.ilike.{pattern},"
                    f"code_description.ilike.{pattern},"
                    f"alias_name.ilike.{pattern})"
                )

            with build_pg_client() as c:
                # One request supplies both select-box value sets.
                res = c.get("/MMI_Code", params={"select": "manufacturer,alias_name"})
                res.raise_for_status()
                option_rows = res.json() or []
                # The actual page of MMI rows.
                res = c.get("/MMI_Code", params=params)
                res.raise_for_status()
                rows = res.json() or []

            manufacturers = sorted({row.get("manufacturer") for row in option_rows if row.get("manufacturer")})
            alias_names = sorted({row.get("alias_name") for row in option_rows if row.get("alias_name")})

            if group_code:
                # Keep only the first row for each code_name.
                seen = set()
                dedup = []
                for row in rows:
                    code = row.get("code_name")
                    if code and code not in seen:
                        seen.add(code)
                        dedup.append(row)
                rows = dedup

            # Query string without "page", for the pagination links.
            query_params = request.args.to_dict()
            query_params.pop("page", None)
            query_params_string = urlencode(query_params)

            return render_template(
                "partials/sb_mmi_list.html",
                rows=rows,
                manufacturers=manufacturers,
                alias_names=alias_names,
                selected_manufacturer=selected_manufacturer,
                selected_alias_name=selected_alias_name,
                q=q,
                group_code="on" if group_code else "off",
                section="mmicode",
                page=page,
                page_size=PAGE_SIZE,
                query_params_string=query_params_string,
            )

        # ---- Fault code / TCMS branch ----
        mf_fault = pg_unique("manufacturer")
        mf_signal = pg_unique_from("Signals", "manufacturer")
        # Merge both lists, keeping first-seen order.
        manufacturers: List[str] = list(dict.fromkeys(v for v in (mf_fault + mf_signal) if v))
        devices = pg_unique("device")
        car_types = pg_unique("car_type")
        car_ids = pg_unique("car_id")

        # The alias-name options are narrowed by the manufacturer given in the
        # query string (woojin / rotem keyword match).
        # NOTE(review): when no manufacturer is given the list is unfiltered,
        # even though the first manufacturer is pre-selected below - confirm
        # this is intended.
        all_alias = pg_unique("alias_name")
        selected_manufacturer = request.args.get("manufacturer", "").strip()

        def alias_allowed(name: str) -> bool:
            if not name:
                return False
            low = name.lower()
            if selected_manufacturer.lower() == "woojin":
                return ("우진" in name) or ("woojin" in low)
            if selected_manufacturer.lower() == "rotem":
                return ("로템" in name) or ("다대" in name) or ("rotem" in low)
            return True

        alias_names = [a for a in all_alias if alias_allowed(a)]
        signal_classifications = pg_unique_from("Signals", "classification")

        # If the query string carries a manufacturer key, keep its value even
        # when empty; otherwise default to the first known manufacturer.
        if "manufacturer" in request.args:
            selected_manufacturer = (request.args.get("manufacturer") or "").strip()
        else:
            selected_manufacturer = manufacturers[0] if manufacturers else ""

        # Preserve the current selection of the other filters.
        selected_device = request.args.get("device", "").strip()
        selected_car_type = request.args.get("car_type", "").strip()
        selected_alias_name = request.args.get("alias_name", "").strip()
        selected_classification = request.args.get("classification", "").strip()
        q = request.args.get("q", "").strip()
        section = request.args.get("section", "fault").strip() or "fault"
        return render_template(
            "partials/sb_manufacturer.html",
            manufacturers=manufacturers,
            devices=devices,
            car_types=car_types,
            car_ids=car_ids,
            alias_names=alias_names,
            signal_classifications=signal_classifications,
            selected_manufacturer=selected_manufacturer,
            selected_device=selected_device,
            selected_car_type=selected_car_type,
            selected_alias_name=selected_alias_name,
            selected_classification=selected_classification,
            q=q,
            section=section,
        )
    except Exception as e:
        return render_template(
            "partials/sb_error.html",
            error_message=str(e),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
@app.route("/api/mmi/list")
def api_mmi_list():
    """Return one page (50 rows) of MMI codes as JSON.

    Query parameters: ``page`` (0-based; invalid or negative values are
    treated as 0), ``manufacturer``, ``alias_name`` and ``q`` (substring
    search over ``code_name`` and ``code_description``). ``total`` is the
    number of rows in this page. Failures return HTTP 500 with an error
    payload.
    """
    try:
        PAGE_SIZE = 50
        page = max(request.args.get("page", default=0, type=int), 0)
        offset = page * PAGE_SIZE
        manufacturer = request.args.get("manufacturer", "").strip()
        alias_name = request.args.get("alias_name", "").strip()
        q = request.args.get("q", "").strip()

        params: Dict[str, str] = {
            "select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer",
            "order": "code_name.asc",
            "limit": str(PAGE_SIZE),
            "offset": str(offset),
        }
        if manufacturer:
            params["manufacturer"] = f"eq.{manufacturer}"
        if alias_name:
            params["alias_name"] = f"eq.{alias_name}"
        if q:
            # Double-quote the user text inside the or=(...) logic tree so
            # PostgREST reserved characters (',', '(', ')', '.') cannot break
            # or extend the filter; backslash-escape '\' and '"'.
            escaped_q = q.replace("\\", "\\\\").replace('"', '\\"')
            pattern = f'"*{escaped_q}*"'
            params["or"] = f"(code_name.ilike.{pattern},code_description.ilike.{pattern})"

        with build_pg_client() as c:
            r = c.get("/MMI_Code", params=params)
            r.raise_for_status()
            rows = r.json() or []

        return {
            "success": True,
            "data": {
                "rows": rows,
                "total": len(rows),
                "page": page,
                "page_size": PAGE_SIZE,
                # A full page suggests there may be another one.
                "has_next": len(rows) == PAGE_SIZE,
                "has_prev": page > 0
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/api/faults/list")
def api_faults_list():
    """Return one page (50 rows) of fault codes as JSON.

    Query parameters: ``page`` (0-based; invalid or negative values are
    treated as 0), ``manufacturer``, ``device``, ``car_type``, ``car_id``,
    ``alias_name``, ``q`` (substring search over ``f_code`` and ``f_name``)
    and ``group_code`` (``on`` keeps only the first row per ``f_code``).
    ``total`` is the number of rows returned after grouping. Failures return
    HTTP 500 with an error payload.
    """
    try:
        PAGE_SIZE = 50
        page = max(request.args.get("page", default=0, type=int), 0)
        offset = page * PAGE_SIZE

        params: Dict[str, str] = {
            "select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name",
            "order": "f_code.asc",
            "limit": str(PAGE_SIZE),
            "offset": str(offset),
        }
        # Exact-match filters, applied only when a value was supplied.
        for column in ("manufacturer", "device", "car_type", "car_id", "alias_name"):
            value = request.args.get(column, "").strip()
            if value:
                params[column] = f"eq.{value}"

        q = request.args.get("q", "").strip()
        if q:
            # Double-quote the user text inside the or=(...) logic tree so
            # PostgREST reserved characters (',', '(', ')', '.') cannot break
            # or extend the filter; backslash-escape '\' and '"'.
            escaped_q = q.replace("\\", "\\\\").replace('"', '\\"')
            pattern = f'"*{escaped_q}*"'
            params["or"] = f"(f_code.ilike.{pattern},f_name.ilike.{pattern})"

        with build_pg_client() as c:
            r = c.get("/Fault_Code_Table", params=params)
            r.raise_for_status()
            rows = r.json() or []

        # Decide has_next from the number of rows the database returned;
        # grouping below may shrink the page and would otherwise hide
        # further pages.
        fetched_count = len(rows)

        # Grouping option: keep only the first row for each f_code.
        group_code = request.args.get("group_code", "").strip().lower()
        if group_code == "on":
            seen = set()
            dedup: List[Dict] = []
            for row in rows:
                code = row.get("f_code")
                if code and code not in seen:
                    seen.add(code)
                    dedup.append(row)
            rows = dedup

        return {
            "success": True,
            "data": {
                "rows": rows,
                "total": len(rows),
                "page": page,
                "page_size": PAGE_SIZE,
                "has_next": fetched_count == PAGE_SIZE,
                "has_prev": page > 0
            }
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }, 500
@app.route("/sb/faults/list")
def sb_faults_list():
    """Render one page (50 rows) of fault codes as an HTML partial.

    Accepts the same filters as the JSON list API: ``page`` (0-based; invalid
    or negative values are treated as 0), ``manufacturer``, ``device``,
    ``car_type``, ``car_id``, ``alias_name``, ``q`` and ``group_code``. The
    template also receives the query string without ``page`` for building
    pagination links. Any failure renders the error partial instead.
    """
    try:
        PAGE_SIZE = 50
        page = max(request.args.get("page", default=0, type=int), 0)
        offset = page * PAGE_SIZE

        params: Dict[str, str] = {
            "select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name",
            "order": "f_code.asc",
            "limit": str(PAGE_SIZE),
            "offset": str(offset),
        }
        # Exact-match filters, applied only when a value was supplied.
        for column in ("manufacturer", "device", "car_type", "car_id", "alias_name"):
            value = request.args.get(column, "").strip()
            if value:
                params[column] = f"eq.{value}"

        q = request.args.get("q", "").strip()
        if q:
            # Double-quote the user text inside the or=(...) logic tree so
            # PostgREST reserved characters (',', '(', ')', '.') cannot break
            # or extend the filter; backslash-escape '\' and '"'.
            escaped_q = q.replace("\\", "\\\\").replace('"', '\\"')
            pattern = f'"*{escaped_q}*"'
            params["or"] = f"(f_code.ilike.{pattern},f_name.ilike.{pattern})"

        with build_pg_client() as c:
            response = c.get("/Fault_Code_Table", params=params)
            response.raise_for_status()
            rows = response.json() or []

        # Grouping option: keep only the first row for each f_code.
        group_code = request.args.get("group_code", "").strip().lower()
        if group_code == "on":
            seen = set()
            dedup: List[Dict] = []
            for row in rows:
                code = row.get("f_code")
                if code and code not in seen:
                    seen.add(code)
                    dedup.append(row)
            rows = dedup

        # Query string without "page", for the pagination links.
        query_params = request.args.to_dict()
        query_params.pop("page", None)
        query_params_string = urlencode(query_params)

        return render_template(
            "partials/sb_fault_list.html",
            rows=rows,
            page=page,
            page_size=PAGE_SIZE,
            query_params_string=query_params_string,
        )
    except Exception as e:
        return render_template(
            "partials/sb_error.html",
            error_message=str(e),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
@app.route("/api/faults/<string:f_code>")
def api_fault_detail(f_code: str):
    """Return the detail record for one fault code as JSON (new API).

    Args:
        f_code: Value matched exactly against the ``f_code`` column.

    Returns:
        ``{"success": True, "data": row}`` when a row is found,
        ``({"success": False, "error": ...}, 404)`` when none matches, and
        ``({"success": False, "error": ...}, 500)`` when anything raises.
    """
    try:
        lookup = {
            "select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer,alias_name",
            "f_code": f"eq.{f_code}",
            "limit": "1",
        }
        with build_pg_client() as client:
            response = client.get("/Fault_Code_Table", params=lookup)
            response.raise_for_status()
            matches = response.json() or []

        if matches:
            return {"success": True, "data": matches[0]}
        return {"success": False, "error": "고장코드를 찾을 수 없습니다."}, 404
    except Exception as exc:
        return {"success": False, "error": str(exc)}, 500
@app.route("/sb/faults/<string:f_code>")
def sb_fault_detail(f_code: str):
    """Render the detail partial for one fault code (kept for compatibility).

    Args:
        f_code: Value matched exactly against the ``f_code`` column.

    Returns:
        The rendered ``partials/sb_fault_detail.html`` template, or
        ``partials/sb_error.html`` when the lookup or rendering raises.

    Raises:
        A 404 HTTP error (via ``abort(404)``) when no row matches.
    """
    try:
        params = {
            "select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer",
            "f_code": f"eq.{f_code}",
            "limit": "1",
        }
        with build_pg_client() as c:
            r = c.get("/Fault_Code_Table", params=params)
            r.raise_for_status()
            rows = r.json() or []
        if rows:
            return render_template("partials/sb_fault_detail.html", row=rows[0])
    except Exception as e:
        return render_template(
            "partials/sb_error.html",
            error_message=str(e),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
    # Reached only when the lookup succeeded but matched nothing. The abort is
    # deliberately outside the try block: the HTTP exception it raises is an
    # Exception subclass, so the broad handler above would otherwise swallow it
    # and answer with the error partial instead of a real 404.
    abort(404)
# ----------------- Signals (TCMS) -----------------
@app.route("/api/signals/list")
def api_signals_list():
    """Return one page of TCMS signals as JSON (new API).

    Query parameters (all optional):
        page: Zero-based page index. Negative or non-numeric values fall back
            to the first page instead of producing a negative offset.
        manufacturer, classification, alias_name: Exact-match (``eq``)
            filters applied to the column of the same name.
        q: Case-insensitive substring matched against
            ``signal_abbreviation`` or ``signal_description``.
        group_code: When ``on``, only the first row for each ``id`` on the
            fetched page is kept.

    Returns:
        ``{"success": True, "data": {...}}`` with ``rows``, ``total`` (number
        of rows returned for this page), ``page``, ``page_size``, ``has_next``
        and ``has_prev``; or ``({"success": False, "error": ...}, 500)`` when
        anything raises.
    """
    try:
        PAGE_SIZE = 50
        try:
            page = max(int(request.args.get("page", "0")), 0)
        except ValueError:
            page = 0
        offset = page * PAGE_SIZE

        params: Dict[str, str] = {
            "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name",
            "order": "sig_num.asc",
            "limit": str(PAGE_SIZE),
            "offset": str(offset),
        }

        # Only non-empty filter values are forwarded to PostgREST.
        for column in ("manufacturer", "classification", "alias_name"):
            value = request.args.get(column, "").strip()
            if value:
                params[column] = f"eq.{value}"

        q = request.args.get("q", "").strip()
        if q:
            pattern = f"*{q}*"
            # Inside or=(...) a bare comma or parenthesis is parsed as filter
            # syntax, so such a search term would break (or extend) the filter
            # list. Quote the pattern in that case; plain terms are sent
            # exactly as before.
            if any(ch in pattern for ch in ",()"):
                escaped = pattern.replace("\\", "\\\\").replace('"', '\\"')
                pattern = f'"{escaped}"'
            params["or"] = (
                f"(signal_abbreviation.ilike.{pattern},"
                f"signal_description.ilike.{pattern})"
            )

        with build_pg_client() as c:
            rows = []
            last_error: Exception | None = None
            # Try both spellings of the table path and use the first one that
            # answers successfully.
            for path in ("/Signals", "/signals"):
                try:
                    r = c.get(path, params=params)
                    r.raise_for_status()
                    rows = r.json() or []
                    last_error = None
                    break
                except Exception as e:
                    last_error = e
                    rows = []
        if last_error and not rows:
            raise last_error

        # Decide has_next from the number of rows actually fetched; measuring
        # after the dedup below would hide the next page whenever grouping
        # removes a row from a full page.
        has_next = len(rows) == PAGE_SIZE

        # Grouping option: keep only the first row for each id.
        group_code = request.args.get("group_code", "").strip().lower()
        if group_code == "on":
            seen = set()
            dedup: List[Dict] = []
            for row in rows:
                code = row.get("id")
                if code and code not in seen:
                    seen.add(code)
                    dedup.append(row)
            rows = dedup

        return {
            "success": True,
            "data": {
                "rows": rows,
                "total": len(rows),
                "page": page,
                "page_size": PAGE_SIZE,
                "has_next": has_next,
                "has_prev": page > 0,
            },
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
        }, 500
@app.route("/sb/signals/list")
def sb_signals_list():
    """Render one page of TCMS signals as an HTML partial (kept for compatibility).

    Query parameters (all optional):
        page: Zero-based page index. Negative or non-numeric values fall back
            to the first page instead of producing a negative offset.
        manufacturer, classification, alias_name: Exact-match (``eq``)
            filters applied to the column of the same name.
        q: Case-insensitive substring matched against
            ``signal_abbreviation`` or ``signal_description``.
        group_code: When ``on``, only the first row for each ``id`` on the
            fetched page is kept.

    Returns:
        The rendered ``partials/sb_signal_list.html`` template, or
        ``partials/sb_error.html`` when any step raises.
    """
    try:
        PAGE_SIZE = 50
        try:
            page = max(int(request.args.get("page", "0")), 0)
        except ValueError:
            page = 0
        offset = page * PAGE_SIZE

        params: Dict[str, str] = {
            "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name",
            "order": "sig_num.asc",
            "limit": str(PAGE_SIZE),
            "offset": str(offset),
        }

        # Only non-empty filter values are forwarded to PostgREST.
        for column in ("manufacturer", "classification", "alias_name"):
            value = request.args.get(column, "").strip()
            if value:
                params[column] = f"eq.{value}"

        q = request.args.get("q", "").strip()
        if q:
            pattern = f"*{q}*"
            # Inside or=(...) a bare comma or parenthesis is parsed as filter
            # syntax, so such a search term would break (or extend) the filter
            # list. Quote the pattern in that case; plain terms are sent
            # exactly as before.
            if any(ch in pattern for ch in ",()"):
                escaped = pattern.replace("\\", "\\\\").replace('"', '\\"')
                pattern = f'"{escaped}"'
            params["or"] = (
                f"(signal_abbreviation.ilike.{pattern},"
                f"signal_description.ilike.{pattern})"
            )

        with build_pg_client() as c:
            rows = []
            last_error: Exception | None = None
            # Try both spellings of the table path and use the first one that
            # answers successfully.
            for path in ("/Signals", "/signals"):
                try:
                    r = c.get(path, params=params)
                    r.raise_for_status()
                    rows = r.json() or []
                    last_error = None
                    break
                except Exception as e:
                    last_error = e
                    rows = []
        if last_error and not rows:
            raise last_error

        # Grouping option: keep only the first row for each id.
        group_code = request.args.get("group_code", "").strip().lower()
        if group_code == "on":
            seen = set()
            dedup: List[Dict] = []
            for row in rows:
                code = row.get("id")
                if code and code not in seen:
                    seen.add(code)
                    dedup.append(row)
            rows = dedup

        # Pagination links reuse every current parameter except the page index.
        query_params = request.args.to_dict()
        query_params.pop("page", None)
        query_params_string = urlencode(query_params)

        return render_template(
            "partials/sb_signal_list.html",
            rows=rows,
            page=page,
            page_size=PAGE_SIZE,
            query_params_string=query_params_string,
        )
    except Exception as e:
        return render_template(
            "partials/sb_error.html",
            error_message=str(e),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
@app.route("/api/signals/<string:item_id>")
def api_signal_detail(item_id: str):
    """Return the detail record for one TCMS signal as JSON (new API).

    Args:
        item_id: Value matched exactly against the ``id`` column.

    Returns:
        ``{"success": True, "data": row}`` when a row is found,
        ``({"success": False, "error": ...}, 404)`` when none matches, and
        ``({"success": False, "error": ...}, 500)`` when anything raises.
    """
    try:
        lookup = {
            "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at",
            "id": f"eq.{item_id}",
            "limit": "1",
        }
        with build_pg_client() as client:
            matches = []
            failure: Exception | None = None
            # Both spellings of the table path are tried in order; the first
            # successful response wins.
            for table_path in ("/Signals", "/signals"):
                try:
                    response = client.get(table_path, params=lookup)
                    response.raise_for_status()
                    matches = response.json() or []
                except Exception as exc:
                    failure = exc
                    matches = []
                else:
                    failure = None
                    break
            if failure and not matches:
                raise failure

        if matches:
            return {"success": True, "data": matches[0]}
        return {"success": False, "error": "TCMS 신호를 찾을 수 없습니다."}, 404
    except Exception as exc:
        return {"success": False, "error": str(exc)}, 500
@app.route("/sb/signals/<string:item_id>")
def sb_signal_detail(item_id: str):
    """Render the detail partial for one TCMS signal (kept for compatibility).

    Args:
        item_id: Value matched exactly against the ``id`` column.

    Returns:
        The rendered ``partials/sb_signal_detail.html`` template, or
        ``partials/sb_error.html`` when the lookup or rendering raises.

    Raises:
        A 404 HTTP error (via ``abort(404)``) when no row matches.
    """
    try:
        params = {
            "select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at",
            "id": f"eq.{item_id}",
            "limit": "1",
        }
        with build_pg_client() as c:
            rows = []
            last_error: Exception | None = None
            # Try both spellings of the table path and use the first one that
            # answers successfully.
            for path in ("/Signals", "/signals"):
                try:
                    r = c.get(path, params=params)
                    r.raise_for_status()
                    rows = r.json() or []
                    last_error = None
                    break
                except Exception as e:
                    last_error = e
                    rows = []
        if last_error and not rows:
            raise last_error
        if rows:
            return render_template("partials/sb_signal_detail.html", row=rows[0])
    except Exception as e:
        return render_template(
            "partials/sb_error.html",
            error_message=str(e),
            supabase_url=app.config.get("SUPABASE_URL"),
        )
    # Reached only when the lookup succeeded but matched nothing. The abort is
    # deliberately outside the try block: the HTTP exception it raises is an
    # Exception subclass, so the broad handler above would otherwise swallow it
    # and answer with the error partial instead of a real 404.
    abort(404)
@app.route("/sb/health")
def sb_health():
    """Check Supabase reachability by requesting one ``f_code`` row.

    Returns:
        ``{"sb": "ok", "url": ...}`` when the request succeeds, otherwise
        ``{"sb": "error", "url": ..., "error": ...}``. No explicit status
        code is set in either case.
    """
    probe = {"select": "f_code", "limit": "1"}
    try:
        with build_pg_client() as client:
            response = client.get("/Fault_Code_Table", params=probe)
            response.raise_for_status()
    except Exception as exc:
        return {
            "sb": "error",
            "url": app.config.get("SUPABASE_URL"),
            "error": str(exc),
        }
    return {"sb": "ok", "url": app.config.get("SUPABASE_URL")}
# Simple Signals test endpoint: returns the first 5 records as JSON.
@app.route("/sb/signals/test")
def sb_signals_test():
    """Fetch up to five signal rows and report which table path answered.

    Returns:
        ``{"ok": True, "url", "path", "rows"}`` on success; otherwise an
        ``({"ok": False, ...}, 500)`` pair carrying the error text (and the
        list of paths tried when every path failed).
    """
    candidates = ("/Signals", "/signals")
    try:
        # NOTE(review): this selects "uuid" while the other signal endpoints
        # select "id" -- confirm which column the table actually exposes.
        query = {
            "select": "uuid,sig_num,signal_abbreviation,manufacturer,classification,alias_name",
            "order": "sig_num.asc",
            "limit": "5",
        }
        with build_pg_client() as client:
            rows = []
            answered_by = None
            failure = None
            for candidate in candidates:
                try:
                    response = client.get(candidate, params=query)
                    response.raise_for_status()
                    rows = response.json() or []
                except Exception as exc:
                    # Keep only the message of the most recent failure.
                    failure = str(exc)
                    rows = []
                else:
                    answered_by = candidate
                    failure = None
                    break

        if failure and not rows:
            return {
                "ok": False,
                "url": app.config.get("SUPABASE_URL"),
                "tried": list(candidates),
                "error": failure,
            }, 500
        return {
            "ok": True,
            "url": app.config.get("SUPABASE_URL"),
            "path": answered_by,
            "rows": rows,
        }
    except Exception as exc:
        return {
            "ok": False,
            "url": app.config.get("SUPABASE_URL"),
            "error": str(exc),
        }, 500
# Detailed debug endpoint: returns the status/body of each of the two paths as-is.
@app.route("/sb/signals/debug")
def sb_signals_debug():
    """Query both signal table paths and return each raw outcome.

    Returns:
        ``{"ok": True, "url", "results"}`` where ``results`` holds one entry
        per path: either ``path``/``status``/``content_type``/``body`` or,
        for an ``httpx.HTTPError``, ``path``/``error``. Any other exception
        yields ``({"ok": False, "url", "error"}, 500)``.
    """

    def probe(client, table_path):
        """Request one path and summarise the response or the HTTP error."""
        try:
            response = client.get(table_path, params={"select": "*", "limit": "5"})
            content_type = response.headers.get("content-type", "")
            # Prefer the decoded JSON body; fall back to the raw text.
            try:
                payload = response.json()
            except Exception:
                payload = response.text
            return {
                "path": table_path,
                "status": response.status_code,
                "content_type": content_type,
                "body": payload,
            }
        except httpx.HTTPError as exc:
            return {"path": table_path, "error": str(exc)}

    try:
        with build_pg_client() as client:
            results = [probe(client, table_path) for table_path in ("/Signals", "/signals")]
        return {"ok": True, "url": app.config.get("SUPABASE_URL"), "results": results}
    except Exception as exc:
        return {"ok": False, "url": app.config.get("SUPABASE_URL"), "error": str(exc)}, 500
return app
# Module-level application instance.
app = create_app()

if __name__ == "__main__":
    port = int(os.environ.get("PORT", "5000"))
    # The server binds to every interface, and the Werkzeug debugger lets a
    # client execute arbitrary code, so debug mode is opt-in through
    # FLASK_DEBUG (1/true/yes/on) instead of being always enabled.
    debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=port, debug=debug)