1052 lines
38 KiB
Plaintext
1052 lines
38 KiB
Plaintext
import os
|
|
from typing import Optional, List, Dict
|
|
import httpx
|
|
|
|
from flask import Flask, g, request, abort, session, jsonify
|
|
from urllib.parse import urlencode
|
|
from dotenv import load_dotenv
|
|
from flask_cors import CORS
|
|
from functools import wraps
|
|
|
|
|
|
APP_NAME = "1호선 고장코드"
|
|
|
|
|
|
def create_app() -> Flask:
|
|
app = Flask(__name__)
|
|
app.config.update(TEMPLATES_AUTO_RELOAD=True)
|
|
|
|
# 세션 설정
|
|
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
|
|
app.config["SESSION_COOKIE_HTTPONLY"] = True
|
|
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
|
|
|
|
# CORS 설정 (Vue PWA에서 접근 허용)
|
|
# 프로덕션: 특정 도메인만 허용
|
|
allowed_origins = [
|
|
"https://humetrain.me",
|
|
"http://localhost:5173", # 개발 서버
|
|
"http://127.0.0.1:5173",
|
|
"http://localhost:8080", # 로컬 nginx
|
|
"http://127.0.0.1:8080",
|
|
]
|
|
|
|
# 개발 모드에서는 모든 도메인 허용
|
|
CORS(app,
|
|
origins=["*"] if app.debug else allowed_origins,
|
|
supports_credentials=True,
|
|
allow_headers=["Content-Type", "Authorization", "apikey", "Accept-Profile", "Content-Profile"],
|
|
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"])
|
|
|
|
# 환경변수 로드 및 Supabase 기본값 설정
|
|
load_dotenv()
|
|
app.config.setdefault("SUPABASE_URL", os.environ.get("SUPABASE_URL", "http://localhost:8000"))
|
|
app.config.setdefault("SUPABASE_ANON_KEY", os.environ.get("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I"))
|
|
app.config.setdefault("SUPABASE_BASIC_USER", os.environ.get("SUPABASE_BASIC_USER", ""))
|
|
app.config.setdefault("SUPABASE_BASIC_PASSWORD", os.environ.get("SUPABASE_BASIC_PASSWORD", ""))
|
|
|
|
# PostgREST(REST) 클라이언트 빌더
|
|
def build_pg_client() -> httpx.Client:
|
|
base = app.config["SUPABASE_URL"].rstrip("/") + "/rest/v1"
|
|
basic_user = app.config.get("SUPABASE_BASIC_USER") or ""
|
|
basic_pass = app.config.get("SUPABASE_BASIC_PASSWORD") or ""
|
|
headers = {
|
|
"apikey": app.config.get("SUPABASE_ANON_KEY", ""),
|
|
"Accept-Profile": "public",
|
|
"Content-Profile": "public",
|
|
}
|
|
auth = httpx.BasicAuth(basic_user, basic_pass) if basic_user else None
|
|
if not auth and app.config.get("SUPABASE_ANON_KEY"):
|
|
headers["Authorization"] = f"Bearer {app.config['SUPABASE_ANON_KEY']}"
|
|
return httpx.Client(base_url=base, headers=headers, auth=auth, timeout=10)
|
|
|
|
def pg_unique(col: str) -> List[str]:
|
|
with build_pg_client() as c:
|
|
r = c.get("/Fault_Code_Table", params={"select": col})
|
|
r.raise_for_status()
|
|
vals = [row.get(col) for row in (r.json() or []) if row.get(col)]
|
|
seen: Dict[str, bool] = {}
|
|
out: List[str] = []
|
|
for v in vals:
|
|
if v not in seen:
|
|
seen[v] = True
|
|
out.append(v)
|
|
return out
|
|
|
|
def pg_unique_from(table: str, col: str) -> List[str]:
|
|
"""지정 테이블에서 고유값 리스트를 반환한다."""
|
|
with build_pg_client() as c:
|
|
r = c.get(f"/{table}", params={"select": col})
|
|
r.raise_for_status()
|
|
vals = [row.get(col) for row in (r.json() or []) if row.get(col)]
|
|
seen: Dict[str, bool] = {}
|
|
out: List[str] = []
|
|
for v in vals:
|
|
if v not in seen:
|
|
seen[v] = True
|
|
out.append(v)
|
|
return out
|
|
|
|
# 인증 헬퍼 함수
|
|
def login_required(f):
|
|
"""로그인이 필요한 라우트를 위한 데코레이터"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if "user_id" not in session:
|
|
return jsonify({"success": False, "error": "Authentication required"}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def get_current_user() -> Optional[Dict]:
|
|
"""현재 로그인한 사용자 정보 반환"""
|
|
if "user_id" in session:
|
|
return {
|
|
"user_id": session.get("user_id"),
|
|
"email": session.get("email"),
|
|
"employee_id": session.get("employee_id"),
|
|
"name": session.get("name"),
|
|
"department_id": session.get("department_id"),
|
|
"access_token": session.get("access_token")
|
|
}
|
|
return None
|
|
|
|
# ============= 인증 API =============
|
|
|
|
@app.route("/api/auth/login", methods=["POST"])
|
|
def api_login():
|
|
"""로그인 API"""
|
|
try:
|
|
from auth import AuthManager
|
|
|
|
data = request.json
|
|
employee_id = data.get("employee_id", "").strip()
|
|
password = data.get("password", "").strip()
|
|
|
|
if not employee_id or not password:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "사번과 비밀번호를 모두 입력해주세요."
|
|
}), 400
|
|
|
|
auth_manager = AuthManager(
|
|
app.config["SUPABASE_URL"],
|
|
app.config["SUPABASE_ANON_KEY"]
|
|
)
|
|
|
|
success, message, session_data = auth_manager.login_user_by_employee_id(employee_id, password)
|
|
|
|
if success and session_data:
|
|
# 세션에 사용자 정보 저장
|
|
session["user_id"] = session_data["user_id"]
|
|
session["auth_id"] = session_data["auth_id"]
|
|
session["email"] = session_data["email"]
|
|
session["employee_id"] = session_data["employee_id"]
|
|
session["name"] = session_data["name"]
|
|
session["department_id"] = session_data["department_id"]
|
|
session["access_token"] = session_data["access_token"]
|
|
session.permanent = True
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"user": {
|
|
"user_id": session_data["user_id"],
|
|
"email": session_data["email"],
|
|
"employee_id": session_data["employee_id"],
|
|
"name": session_data["name"],
|
|
"department_id": session_data["department_id"]
|
|
}
|
|
})
|
|
else:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": message
|
|
}), 401
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/auth/signup", methods=["POST"])
|
|
def api_signup():
|
|
"""회원가입 API"""
|
|
try:
|
|
from auth import AuthManager
|
|
|
|
data = request.json
|
|
email = data.get("email", "").strip()
|
|
password = data.get("password", "").strip()
|
|
password_confirm = data.get("password_confirm", "").strip()
|
|
employee_id = data.get("employee_id", "").strip()
|
|
name = data.get("name", "").strip()
|
|
department_id = data.get("department_id", "").strip()
|
|
|
|
# 입력값 검증
|
|
if not all([email, password, password_confirm, employee_id, name, department_id]):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "모든 필드를 입력해주세요."
|
|
}), 400
|
|
|
|
# 비밀번호 확인
|
|
if password != password_confirm:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "비밀번호가 일치하지 않습니다."
|
|
}), 400
|
|
|
|
# 비밀번호 길이 검증
|
|
if len(password) < 8:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "비밀번호는 최소 8자 이상이어야 합니다."
|
|
}), 400
|
|
|
|
auth_manager = AuthManager(
|
|
app.config["SUPABASE_URL"],
|
|
app.config["SUPABASE_ANON_KEY"]
|
|
)
|
|
|
|
# 회원가입 처리
|
|
success, message, _ = auth_manager.signup_user(
|
|
email=email,
|
|
password=password,
|
|
employee_id=employee_id,
|
|
name=name,
|
|
department_id=int(department_id)
|
|
)
|
|
|
|
if success:
|
|
return jsonify({
|
|
"success": True,
|
|
"message": message
|
|
})
|
|
else:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": message
|
|
}), 400
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/auth/logout", methods=["POST"])
|
|
def api_logout():
|
|
"""로그아웃 API"""
|
|
try:
|
|
from auth import AuthManager
|
|
|
|
# 세션에서 access_token 가져오기
|
|
access_token = session.get("access_token")
|
|
|
|
if access_token:
|
|
auth_manager = AuthManager(
|
|
app.config["SUPABASE_URL"],
|
|
app.config["SUPABASE_ANON_KEY"]
|
|
)
|
|
auth_manager.logout_user(access_token)
|
|
|
|
# 세션 클리어
|
|
session.clear()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "로그아웃되었습니다."
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/auth/me")
|
|
@login_required
|
|
def api_current_user():
|
|
"""현재 로그인한 사용자 정보 조회 API"""
|
|
user = get_current_user()
|
|
return jsonify({
|
|
"success": True,
|
|
"user": user
|
|
})
|
|
|
|
@app.route("/api/departments")
|
|
def api_departments():
|
|
"""부서 목록 조회 API"""
|
|
try:
|
|
from auth import AuthManager
|
|
|
|
auth_manager = AuthManager(
|
|
app.config["SUPABASE_URL"],
|
|
app.config["SUPABASE_ANON_KEY"]
|
|
)
|
|
|
|
departments = auth_manager.get_departments()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"departments": departments
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
# ============= 이메일 코드 인증 API =============
|
|
|
|
@app.route("/api/email/send-code", methods=["POST"])
|
|
def send_email_code():
|
|
"""이메일 인증 코드 발송"""
|
|
try:
|
|
import random
|
|
import string
|
|
from datetime import datetime, timedelta
|
|
|
|
data = request.json
|
|
email = data.get("email", "").strip().lower()
|
|
code_type = data.get("type", "signup") # signup, password_reset
|
|
|
|
# 이메일 도메인 검증
|
|
if not email.endswith("@humetro.busan.kr"):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "humetro.busan.kr 도메인의 이메일만 사용 가능합니다."
|
|
}), 400
|
|
|
|
# 6자리 숫자 코드 생성
|
|
code = ''.join(random.choices(string.digits, k=6))
|
|
|
|
# 세션에 코드 저장 (5분간 유효)
|
|
expiry = datetime.now() + timedelta(minutes=5)
|
|
session[f"email_code_{email}_{code_type}"] = {
|
|
"code": code,
|
|
"expiry": expiry.timestamp(),
|
|
"attempts": 0
|
|
}
|
|
|
|
# TODO: 실제 이메일 발송
|
|
print(f"\n=== 이메일 인증 코드 ===")
|
|
print(f"To: {email}")
|
|
print(f"Code: {code}")
|
|
print(f"Type: {code_type}")
|
|
print(f"Expiry: {expiry}")
|
|
print(f"=======================\n")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "인증 코드를 이메일로 전송했습니다.",
|
|
"debug_code": code if app.debug else None
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/email/verify-code", methods=["POST"])
|
|
def verify_email_code():
|
|
"""이메일 인증 코드 검증"""
|
|
try:
|
|
from datetime import datetime
|
|
|
|
data = request.json
|
|
email = data.get("email", "").strip().lower()
|
|
code = data.get("code", "").strip()
|
|
code_type = data.get("type", "signup")
|
|
|
|
# 세션에서 코드 확인
|
|
session_key = f"email_code_{email}_{code_type}"
|
|
stored_data = session.get(session_key)
|
|
|
|
if not stored_data:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "인증 코드가 존재하지 않거나 만료되었습니다."
|
|
}), 400
|
|
|
|
# 시도 횟수 체크
|
|
if stored_data["attempts"] >= 5:
|
|
session.pop(session_key, None)
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "인증 시도 횟수를 초과했습니다."
|
|
}), 400
|
|
|
|
# 만료 시간 체크
|
|
if datetime.now().timestamp() > stored_data["expiry"]:
|
|
session.pop(session_key, None)
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "인증 코드가 만료되었습니다."
|
|
}), 400
|
|
|
|
# 코드 검증
|
|
if stored_data["code"] != code:
|
|
stored_data["attempts"] += 1
|
|
session[session_key] = stored_data
|
|
return jsonify({
|
|
"success": False,
|
|
"error": f"인증 코드가 올바르지 않습니다. (남은 시도: {5 - stored_data['attempts']}회)"
|
|
}), 400
|
|
|
|
# 인증 성공
|
|
session[f"email_verified_{email}_{code_type}"] = {
|
|
"verified_at": datetime.now().timestamp()
|
|
}
|
|
session.pop(session_key, None)
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "인증이 완료되었습니다."
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
# ============= 생체인증 API =============
|
|
|
|
@app.route("/api/biometric/register-challenge", methods=["POST"])
|
|
def biometric_register_challenge():
|
|
"""생체인증 등록을 위한 challenge 생성"""
|
|
try:
|
|
import secrets
|
|
from datetime import datetime
|
|
|
|
data = request.json
|
|
employee_id = data.get("employeeId")
|
|
|
|
challenge = secrets.token_urlsafe(32)
|
|
session[f"biometric_challenge_{employee_id}"] = {
|
|
"challenge": challenge,
|
|
"timestamp": datetime.now().timestamp()
|
|
}
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"challenge": challenge,
|
|
"userId": employee_id
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/biometric/register", methods=["POST"])
|
|
def biometric_register():
|
|
"""생체인증 credential 등록"""
|
|
try:
|
|
from datetime import datetime
|
|
|
|
data = request.json
|
|
employee_id = data.get("employeeId")
|
|
credential = data.get("credential")
|
|
|
|
with build_pg_client() as c:
|
|
r = c.post(
|
|
"/biometric_credentials",
|
|
json={
|
|
"employee_id": employee_id,
|
|
"credential_id": credential["id"],
|
|
"credential_data": credential,
|
|
"created_at": datetime.now().isoformat()
|
|
}
|
|
)
|
|
r.raise_for_status()
|
|
|
|
session.pop(f"biometric_challenge_{employee_id}", None)
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "생체인증이 등록되었습니다."
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/biometric/login-challenge", methods=["POST"])
|
|
def biometric_login_challenge():
|
|
"""생체인증 로그인을 위한 challenge 생성"""
|
|
try:
|
|
import secrets
|
|
from datetime import datetime
|
|
|
|
data = request.json
|
|
employee_id = data.get("employeeId")
|
|
|
|
challenge = secrets.token_urlsafe(32)
|
|
session[f"biometric_login_challenge_{employee_id}"] = {
|
|
"challenge": challenge,
|
|
"timestamp": datetime.now().timestamp()
|
|
}
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"challenge": challenge
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/biometric/login", methods=["POST"])
|
|
def biometric_login():
|
|
"""생체인증으로 로그인"""
|
|
try:
|
|
data = request.json
|
|
employee_id = data.get("employeeId")
|
|
assertion = data.get("assertion")
|
|
|
|
with build_pg_client() as c:
|
|
r = c.get(
|
|
"/biometric_credentials",
|
|
params={
|
|
"employee_id": f"eq.{employee_id}",
|
|
"limit": "1"
|
|
}
|
|
)
|
|
r.raise_for_status()
|
|
credentials = r.json() or []
|
|
|
|
if not credentials:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "등록된 생체인증 정보가 없습니다."
|
|
}), 404
|
|
|
|
stored_credential = credentials[0]
|
|
if stored_credential["credential_id"] != assertion["id"]:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "인증 실패"
|
|
}), 401
|
|
|
|
# users 테이블에서 사용자 정보 조회
|
|
with build_pg_client() as c:
|
|
r = c.get(
|
|
"/users",
|
|
params={
|
|
"employee_id": f"eq.{employee_id}",
|
|
"limit": "1"
|
|
}
|
|
)
|
|
r.raise_for_status()
|
|
users = r.json() or []
|
|
|
|
if not users:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "사용자를 찾을 수 없습니다."
|
|
}), 404
|
|
|
|
user_data = users[0]
|
|
|
|
# 세션에 사용자 정보 저장
|
|
session["user_id"] = user_data["id"]
|
|
session["email"] = user_data["email"]
|
|
session["employee_id"] = user_data["employee_id"]
|
|
session["name"] = user_data["name"]
|
|
session["department_id"] = user_data["department_id"]
|
|
session.permanent = True
|
|
|
|
session.pop(f"biometric_login_challenge_{employee_id}", None)
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"user": {
|
|
"id": user_data["id"],
|
|
"email": user_data["email"],
|
|
"employee_id": user_data["employee_id"],
|
|
"name": user_data["name"],
|
|
"department_id": user_data["department_id"]
|
|
}
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/biometric/unregister", methods=["POST"])
|
|
def biometric_unregister():
|
|
"""생체인증 해제"""
|
|
try:
|
|
data = request.json
|
|
employee_id = data.get("employeeId")
|
|
|
|
with build_pg_client() as c:
|
|
r = c.delete(
|
|
"/biometric_credentials",
|
|
params={"employee_id": f"eq.{employee_id}"}
|
|
)
|
|
r.raise_for_status()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "생체인증이 해제되었습니다."
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
# ============= 데이터 조회 API =============
|
|
|
|
@app.route("/health")
|
|
def health():
|
|
return jsonify({"status": "ok"})
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon():
|
|
return ("", 204, {"Content-Type": "image/x-icon"})
|
|
|
|
@app.route("/api/manufacturers")
|
|
def api_manufacturers():
|
|
"""제조사 목록을 JSON으로 반환"""
|
|
try:
|
|
with build_pg_client() as c:
|
|
# Fault_Code_Table에서 제조사
|
|
r1 = c.get("/Fault_Code_Table", params={"select": "manufacturer"})
|
|
r1.raise_for_status()
|
|
vals1 = [row.get("manufacturer") for row in (r1.json() or []) if row.get("manufacturer")]
|
|
|
|
# Signals 테이블에서 제조사
|
|
vals2 = []
|
|
for path in ("/Signals", "/signals"):
|
|
try:
|
|
r2 = c.get(path, params={"select": "manufacturer"})
|
|
r2.raise_for_status()
|
|
vals2 = [row.get("manufacturer") for row in (r2.json() or []) if row.get("manufacturer")]
|
|
break
|
|
except httpx.HTTPError:
|
|
continue
|
|
|
|
# MMI_Code 테이블에서 제조사
|
|
vals3 = []
|
|
try:
|
|
r3 = c.get("/MMI_Code", params={"select": "manufacturer"})
|
|
r3.raise_for_status()
|
|
vals3 = [row.get("manufacturer") for row in (r3.json() or []) if row.get("manufacturer")]
|
|
except httpx.HTTPError:
|
|
pass
|
|
|
|
# 중복 제거
|
|
all_manufacturers = vals1 + vals2 + vals3
|
|
seen = set()
|
|
unique_manufacturers = []
|
|
for v in all_manufacturers:
|
|
if v and v not in seen:
|
|
seen.add(v)
|
|
unique_manufacturers.append(v)
|
|
|
|
unique_manufacturers.sort()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"manufacturers": unique_manufacturers
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/filters/fault")
|
|
def api_fault_filters():
|
|
"""고장코드 필터 옵션을 JSON으로 반환"""
|
|
try:
|
|
selected_manufacturer = request.args.get("manufacturer", "").strip()
|
|
|
|
devices = pg_unique("device")
|
|
car_types = pg_unique("car_type")
|
|
car_ids = pg_unique("car_id")
|
|
|
|
all_alias = pg_unique("alias_name")
|
|
def alias_allowed(name: str) -> bool:
|
|
if not name:
|
|
return False
|
|
low = name.lower()
|
|
if selected_manufacturer.lower() == "woojin":
|
|
return ("우진" in name) or ("woojin" in low)
|
|
if selected_manufacturer.lower() == "rotem":
|
|
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
|
|
return True
|
|
alias_names = [a for a in all_alias if alias_allowed(a)]
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"devices": sorted(devices),
|
|
"car_types": sorted(car_types),
|
|
"car_ids": sorted(car_ids),
|
|
"alias_names": sorted(alias_names)
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/filters/signal")
|
|
def api_signal_filters():
|
|
"""TCMS 신호 필터 옵션을 JSON으로 반환"""
|
|
try:
|
|
selected_manufacturer = request.args.get("manufacturer", "").strip()
|
|
|
|
all_alias = pg_unique_from("Signals", "alias_name")
|
|
def alias_allowed(name: str) -> bool:
|
|
if not name:
|
|
return False
|
|
low = name.lower()
|
|
if selected_manufacturer.lower() == "woojin":
|
|
return ("우진" in name) or ("woojin" in low)
|
|
if selected_manufacturer.lower() == "rotem":
|
|
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
|
|
return True
|
|
alias_names = [a for a in all_alias if alias_allowed(a)]
|
|
classifications = pg_unique_from("Signals", "classification")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"alias_names": sorted(alias_names),
|
|
"classifications": sorted(classifications)
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/filters/mmi")
|
|
def api_mmi_filters():
|
|
"""MMI 코드 필터 옵션을 JSON으로 반환"""
|
|
try:
|
|
selected_manufacturer = request.args.get("manufacturer", "").strip()
|
|
|
|
with build_pg_client() as c:
|
|
res = c.get("/MMI_Code", params={"select": "alias_name"})
|
|
res.raise_for_status()
|
|
all_alias = sorted({row.get("alias_name") for row in (res.json() or []) if row.get("alias_name")})
|
|
|
|
def alias_allowed(name: str) -> bool:
|
|
if not name:
|
|
return False
|
|
low = name.lower()
|
|
if selected_manufacturer.lower() == "woojin":
|
|
return ("우진" in name) or ("woojin" in low)
|
|
if selected_manufacturer.lower() == "rotem":
|
|
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
|
|
return True
|
|
alias_names = [a for a in all_alias if alias_allowed(a)]
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"alias_names": alias_names
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/mmi/list")
|
|
def api_mmi_list():
|
|
"""MMI 코드 목록을 JSON으로 반환"""
|
|
try:
|
|
PAGE_SIZE = 50
|
|
page = int(request.args.get("page", "0"))
|
|
offset = page * PAGE_SIZE
|
|
|
|
manufacturer = request.args.get("manufacturer", "").strip()
|
|
alias_name = request.args.get("alias_name", "").strip()
|
|
q = request.args.get("q", "").strip()
|
|
|
|
params: Dict[str, str] = {
|
|
"select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer",
|
|
"order": "code_name.asc",
|
|
"limit": str(PAGE_SIZE),
|
|
"offset": str(offset),
|
|
}
|
|
|
|
if manufacturer:
|
|
params["manufacturer"] = f"eq.{manufacturer}"
|
|
if alias_name:
|
|
params["alias_name"] = f"eq.{alias_name}"
|
|
if q:
|
|
params["or"] = f"(code_name.ilike.*{q}*,code_description.ilike.*{q}*)"
|
|
|
|
with build_pg_client() as c:
|
|
r = c.get("/MMI_Code", params=params)
|
|
r.raise_for_status()
|
|
rows = r.json() or []
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"rows": rows,
|
|
"total": len(rows),
|
|
"page": page,
|
|
"page_size": PAGE_SIZE,
|
|
"has_next": len(rows) == PAGE_SIZE,
|
|
"has_prev": page > 0
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/faults/list")
|
|
def api_faults_list():
|
|
"""고장코드 목록을 JSON으로 반환"""
|
|
try:
|
|
PAGE_SIZE = 50
|
|
page = int(request.args.get("page", "0"))
|
|
offset = page * PAGE_SIZE
|
|
|
|
manufacturer = request.args.get("manufacturer", "").strip()
|
|
device = request.args.get("device", "").strip()
|
|
car_type = request.args.get("car_type", "").strip()
|
|
car_id = request.args.get("car_id", "").strip()
|
|
alias_name = request.args.get("alias_name", "").strip()
|
|
q = request.args.get("q", "").strip()
|
|
|
|
params: Dict[str, str] = {
|
|
"select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name",
|
|
"order": "f_code.asc",
|
|
"limit": str(PAGE_SIZE),
|
|
"offset": str(offset),
|
|
}
|
|
|
|
if manufacturer:
|
|
params["manufacturer"] = f"eq.{manufacturer}"
|
|
if device:
|
|
params["device"] = f"eq.{device}"
|
|
if car_type:
|
|
params["car_type"] = f"eq.{car_type}"
|
|
if car_id:
|
|
params["car_id"] = f"eq.{car_id}"
|
|
if alias_name:
|
|
params["alias_name"] = f"eq.{alias_name}"
|
|
if q:
|
|
params["or"] = f"(f_code.ilike.*{q}*,f_name.ilike.*{q}*)"
|
|
|
|
with build_pg_client() as c:
|
|
r = c.get("/Fault_Code_Table", params=params)
|
|
r.raise_for_status()
|
|
rows = r.json() or []
|
|
|
|
# 그룹핑 옵션
|
|
group_code = request.args.get("group_code", "").strip().lower()
|
|
if group_code == "on":
|
|
seen: Dict[str, bool] = {}
|
|
dedup: List[Dict] = []
|
|
for row in rows:
|
|
code = row.get("f_code")
|
|
if code and code not in seen:
|
|
seen[code] = True
|
|
dedup.append(row)
|
|
rows = dedup
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"rows": rows,
|
|
"total": len(rows),
|
|
"page": page,
|
|
"page_size": PAGE_SIZE,
|
|
"has_next": len(rows) == PAGE_SIZE,
|
|
"has_prev": page > 0
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/faults/<string:f_code>")
|
|
def api_fault_detail(f_code: str):
|
|
"""고장코드 상세 정보를 JSON으로 반환"""
|
|
try:
|
|
params = {
|
|
"select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer,alias_name",
|
|
"f_code": f"eq.{f_code}",
|
|
"limit": "1",
|
|
}
|
|
with build_pg_client() as c:
|
|
r = c.get("/Fault_Code_Table", params=params)
|
|
r.raise_for_status()
|
|
rows = r.json() or []
|
|
if not rows:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "고장코드를 찾을 수 없습니다."
|
|
}), 404
|
|
row = rows[0]
|
|
return jsonify({
|
|
"success": True,
|
|
"data": row
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/signals/list")
|
|
def api_signals_list():
|
|
"""TCMS 신호 목록을 JSON으로 반환"""
|
|
try:
|
|
PAGE_SIZE = 50
|
|
page = int(request.args.get("page", "0"))
|
|
offset = page * PAGE_SIZE
|
|
|
|
manufacturer = request.args.get("manufacturer", "").strip()
|
|
classification = request.args.get("classification", "").strip()
|
|
q = request.args.get("q", "").strip()
|
|
alias_name = request.args.get("alias_name", "").strip()
|
|
|
|
params: Dict[str, str] = {
|
|
"select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name",
|
|
"order": "sig_num.asc",
|
|
"limit": str(PAGE_SIZE),
|
|
"offset": str(offset),
|
|
}
|
|
if manufacturer:
|
|
params["manufacturer"] = f"eq.{manufacturer}"
|
|
if classification:
|
|
params["classification"] = f"eq.{classification}"
|
|
if alias_name:
|
|
params["alias_name"] = f"eq.{alias_name}"
|
|
if q:
|
|
params["or"] = f"(signal_abbreviation.ilike.*{q}*,signal_description.ilike.*{q}*)"
|
|
|
|
with build_pg_client() as c:
|
|
rows = []
|
|
last_error: Exception | None = None
|
|
for path in ("/Signals", "/signals"):
|
|
try:
|
|
r = c.get(path, params=params)
|
|
r.raise_for_status()
|
|
rows = r.json() or []
|
|
last_error = None
|
|
break
|
|
except Exception as e:
|
|
last_error = e
|
|
rows = []
|
|
continue
|
|
if last_error and not rows:
|
|
raise last_error
|
|
|
|
group_code = request.args.get("group_code", "").strip().lower()
|
|
if group_code == "on":
|
|
seen: Dict[str, bool] = {}
|
|
dedup: List[Dict] = []
|
|
for row in rows:
|
|
code = row.get("id")
|
|
if code and code not in seen:
|
|
seen[code] = True
|
|
dedup.append(row)
|
|
rows = dedup
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"rows": rows,
|
|
"total": len(rows),
|
|
"page": page,
|
|
"page_size": PAGE_SIZE,
|
|
"has_next": len(rows) == PAGE_SIZE,
|
|
"has_prev": page > 0
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/api/signals/<string:item_id>")
|
|
def api_signal_detail(item_id: str):
|
|
"""TCMS 신호 상세 정보를 JSON으로 반환"""
|
|
try:
|
|
params = {
|
|
"select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at",
|
|
"id": f"eq.{item_id}",
|
|
"limit": "1",
|
|
}
|
|
with build_pg_client() as c:
|
|
rows = []
|
|
last_error: Exception | None = None
|
|
for path in ("/Signals", "/signals"):
|
|
try:
|
|
r = c.get(path, params=params)
|
|
r.raise_for_status()
|
|
rows = r.json() or []
|
|
last_error = None
|
|
break
|
|
except Exception as e:
|
|
last_error = e
|
|
rows = []
|
|
continue
|
|
if last_error and not rows:
|
|
raise last_error
|
|
if not rows:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "TCMS 신호를 찾을 수 없습니다."
|
|
}), 404
|
|
row = rows[0]
|
|
return jsonify({
|
|
"success": True,
|
|
"data": row
|
|
})
|
|
except Exception as e:
|
|
return jsonify({
|
|
"success": False,
|
|
"error": str(e)
|
|
}), 500
|
|
|
|
@app.route("/sb/health")
|
|
def sb_health():
|
|
try:
|
|
with build_pg_client() as c:
|
|
r = c.get("/Fault_Code_Table", params={"select": "f_code", "limit": "1"})
|
|
r.raise_for_status()
|
|
return jsonify({"sb": "ok", "url": app.config.get("SUPABASE_URL")})
|
|
except Exception as e:
|
|
return jsonify({"sb": "error", "url": app.config.get("SUPABASE_URL"), "error": str(e)})
|
|
|
|
return app
|
|
|
|
|
|
app = create_app()
|
|
if __name__ == "__main__":
|
|
port = int(os.environ.get("PORT", "5000"))
|
|
app.run(host="0.0.0.0", port=port, debug=True)
|
|
|