Tr_Code/app.py.backup

1052 lines
38 KiB
Plaintext

import os
from typing import Optional, List, Dict
import httpx
from flask import Flask, g, request, abort, session, jsonify
from urllib.parse import urlencode
from dotenv import load_dotenv
from flask_cors import CORS
from functools import wraps
APP_NAME = "1호선 고장코드"
def create_app() -> Flask:
app = Flask(__name__)
app.config.update(TEMPLATES_AUTO_RELOAD=True)
# 세션 설정
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# CORS 설정 (Vue PWA에서 접근 허용)
# 프로덕션: 특정 도메인만 허용
allowed_origins = [
"https://humetrain.me",
"http://localhost:5173", # 개발 서버
"http://127.0.0.1:5173",
"http://localhost:8080", # 로컬 nginx
"http://127.0.0.1:8080",
]
# 개발 모드에서는 모든 도메인 허용
CORS(app,
origins=["*"] if app.debug else allowed_origins,
supports_credentials=True,
allow_headers=["Content-Type", "Authorization", "apikey", "Accept-Profile", "Content-Profile"],
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"])
# 환경변수 로드 및 Supabase 기본값 설정
load_dotenv()
app.config.setdefault("SUPABASE_URL", os.environ.get("SUPABASE_URL", "http://localhost:8000"))
app.config.setdefault("SUPABASE_ANON_KEY", os.environ.get("SUPABASE_ANON_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlzcyI6InN1cGFiYXNlIiwiaWF0IjoxNzU4NTUxNjY2LCJleHAiOjQxMDI0NDQ4MDB9.jMCGL3Q-N2o_l7JQE_HrO7Uoct86CMgLsVxpabisG4I"))
app.config.setdefault("SUPABASE_BASIC_USER", os.environ.get("SUPABASE_BASIC_USER", ""))
app.config.setdefault("SUPABASE_BASIC_PASSWORD", os.environ.get("SUPABASE_BASIC_PASSWORD", ""))
# PostgREST(REST) 클라이언트 빌더
def build_pg_client() -> httpx.Client:
base = app.config["SUPABASE_URL"].rstrip("/") + "/rest/v1"
basic_user = app.config.get("SUPABASE_BASIC_USER") or ""
basic_pass = app.config.get("SUPABASE_BASIC_PASSWORD") or ""
headers = {
"apikey": app.config.get("SUPABASE_ANON_KEY", ""),
"Accept-Profile": "public",
"Content-Profile": "public",
}
auth = httpx.BasicAuth(basic_user, basic_pass) if basic_user else None
if not auth and app.config.get("SUPABASE_ANON_KEY"):
headers["Authorization"] = f"Bearer {app.config['SUPABASE_ANON_KEY']}"
return httpx.Client(base_url=base, headers=headers, auth=auth, timeout=10)
def pg_unique(col: str) -> List[str]:
with build_pg_client() as c:
r = c.get("/Fault_Code_Table", params={"select": col})
r.raise_for_status()
vals = [row.get(col) for row in (r.json() or []) if row.get(col)]
seen: Dict[str, bool] = {}
out: List[str] = []
for v in vals:
if v not in seen:
seen[v] = True
out.append(v)
return out
def pg_unique_from(table: str, col: str) -> List[str]:
"""지정 테이블에서 고유값 리스트를 반환한다."""
with build_pg_client() as c:
r = c.get(f"/{table}", params={"select": col})
r.raise_for_status()
vals = [row.get(col) for row in (r.json() or []) if row.get(col)]
seen: Dict[str, bool] = {}
out: List[str] = []
for v in vals:
if v not in seen:
seen[v] = True
out.append(v)
return out
# 인증 헬퍼 함수
def login_required(f):
"""로그인이 필요한 라우트를 위한 데코레이터"""
@wraps(f)
def decorated_function(*args, **kwargs):
if "user_id" not in session:
return jsonify({"success": False, "error": "Authentication required"}), 401
return f(*args, **kwargs)
return decorated_function
def get_current_user() -> Optional[Dict]:
"""현재 로그인한 사용자 정보 반환"""
if "user_id" in session:
return {
"user_id": session.get("user_id"),
"email": session.get("email"),
"employee_id": session.get("employee_id"),
"name": session.get("name"),
"department_id": session.get("department_id"),
"access_token": session.get("access_token")
}
return None
# ============= 인증 API =============
@app.route("/api/auth/login", methods=["POST"])
def api_login():
"""로그인 API"""
try:
from auth import AuthManager
data = request.json
employee_id = data.get("employee_id", "").strip()
password = data.get("password", "").strip()
if not employee_id or not password:
return jsonify({
"success": False,
"error": "사번과 비밀번호를 모두 입력해주세요."
}), 400
auth_manager = AuthManager(
app.config["SUPABASE_URL"],
app.config["SUPABASE_ANON_KEY"]
)
success, message, session_data = auth_manager.login_user_by_employee_id(employee_id, password)
if success and session_data:
# 세션에 사용자 정보 저장
session["user_id"] = session_data["user_id"]
session["auth_id"] = session_data["auth_id"]
session["email"] = session_data["email"]
session["employee_id"] = session_data["employee_id"]
session["name"] = session_data["name"]
session["department_id"] = session_data["department_id"]
session["access_token"] = session_data["access_token"]
session.permanent = True
return jsonify({
"success": True,
"user": {
"user_id": session_data["user_id"],
"email": session_data["email"],
"employee_id": session_data["employee_id"],
"name": session_data["name"],
"department_id": session_data["department_id"]
}
})
else:
return jsonify({
"success": False,
"error": message
}), 401
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/auth/signup", methods=["POST"])
def api_signup():
"""회원가입 API"""
try:
from auth import AuthManager
data = request.json
email = data.get("email", "").strip()
password = data.get("password", "").strip()
password_confirm = data.get("password_confirm", "").strip()
employee_id = data.get("employee_id", "").strip()
name = data.get("name", "").strip()
department_id = data.get("department_id", "").strip()
# 입력값 검증
if not all([email, password, password_confirm, employee_id, name, department_id]):
return jsonify({
"success": False,
"error": "모든 필드를 입력해주세요."
}), 400
# 비밀번호 확인
if password != password_confirm:
return jsonify({
"success": False,
"error": "비밀번호가 일치하지 않습니다."
}), 400
# 비밀번호 길이 검증
if len(password) < 8:
return jsonify({
"success": False,
"error": "비밀번호는 최소 8자 이상이어야 합니다."
}), 400
auth_manager = AuthManager(
app.config["SUPABASE_URL"],
app.config["SUPABASE_ANON_KEY"]
)
# 회원가입 처리
success, message, _ = auth_manager.signup_user(
email=email,
password=password,
employee_id=employee_id,
name=name,
department_id=int(department_id)
)
if success:
return jsonify({
"success": True,
"message": message
})
else:
return jsonify({
"success": False,
"error": message
}), 400
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/auth/logout", methods=["POST"])
def api_logout():
"""로그아웃 API"""
try:
from auth import AuthManager
# 세션에서 access_token 가져오기
access_token = session.get("access_token")
if access_token:
auth_manager = AuthManager(
app.config["SUPABASE_URL"],
app.config["SUPABASE_ANON_KEY"]
)
auth_manager.logout_user(access_token)
# 세션 클리어
session.clear()
return jsonify({
"success": True,
"message": "로그아웃되었습니다."
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/auth/me")
@login_required
def api_current_user():
"""현재 로그인한 사용자 정보 조회 API"""
user = get_current_user()
return jsonify({
"success": True,
"user": user
})
@app.route("/api/departments")
def api_departments():
"""부서 목록 조회 API"""
try:
from auth import AuthManager
auth_manager = AuthManager(
app.config["SUPABASE_URL"],
app.config["SUPABASE_ANON_KEY"]
)
departments = auth_manager.get_departments()
return jsonify({
"success": True,
"departments": departments
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
# ============= 이메일 코드 인증 API =============
@app.route("/api/email/send-code", methods=["POST"])
def send_email_code():
"""이메일 인증 코드 발송"""
try:
import random
import string
from datetime import datetime, timedelta
data = request.json
email = data.get("email", "").strip().lower()
code_type = data.get("type", "signup") # signup, password_reset
# 이메일 도메인 검증
if not email.endswith("@humetro.busan.kr"):
return jsonify({
"success": False,
"error": "humetro.busan.kr 도메인의 이메일만 사용 가능합니다."
}), 400
# 6자리 숫자 코드 생성
code = ''.join(random.choices(string.digits, k=6))
# 세션에 코드 저장 (5분간 유효)
expiry = datetime.now() + timedelta(minutes=5)
session[f"email_code_{email}_{code_type}"] = {
"code": code,
"expiry": expiry.timestamp(),
"attempts": 0
}
# TODO: 실제 이메일 발송
print(f"\n=== 이메일 인증 코드 ===")
print(f"To: {email}")
print(f"Code: {code}")
print(f"Type: {code_type}")
print(f"Expiry: {expiry}")
print(f"=======================\n")
return jsonify({
"success": True,
"message": "인증 코드를 이메일로 전송했습니다.",
"debug_code": code if app.debug else None
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/email/verify-code", methods=["POST"])
def verify_email_code():
"""이메일 인증 코드 검증"""
try:
from datetime import datetime
data = request.json
email = data.get("email", "").strip().lower()
code = data.get("code", "").strip()
code_type = data.get("type", "signup")
# 세션에서 코드 확인
session_key = f"email_code_{email}_{code_type}"
stored_data = session.get(session_key)
if not stored_data:
return jsonify({
"success": False,
"error": "인증 코드가 존재하지 않거나 만료되었습니다."
}), 400
# 시도 횟수 체크
if stored_data["attempts"] >= 5:
session.pop(session_key, None)
return jsonify({
"success": False,
"error": "인증 시도 횟수를 초과했습니다."
}), 400
# 만료 시간 체크
if datetime.now().timestamp() > stored_data["expiry"]:
session.pop(session_key, None)
return jsonify({
"success": False,
"error": "인증 코드가 만료되었습니다."
}), 400
# 코드 검증
if stored_data["code"] != code:
stored_data["attempts"] += 1
session[session_key] = stored_data
return jsonify({
"success": False,
"error": f"인증 코드가 올바르지 않습니다. (남은 시도: {5 - stored_data['attempts']}회)"
}), 400
# 인증 성공
session[f"email_verified_{email}_{code_type}"] = {
"verified_at": datetime.now().timestamp()
}
session.pop(session_key, None)
return jsonify({
"success": True,
"message": "인증이 완료되었습니다."
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
# ============= 생체인증 API =============
@app.route("/api/biometric/register-challenge", methods=["POST"])
def biometric_register_challenge():
"""생체인증 등록을 위한 challenge 생성"""
try:
import secrets
from datetime import datetime
data = request.json
employee_id = data.get("employeeId")
challenge = secrets.token_urlsafe(32)
session[f"biometric_challenge_{employee_id}"] = {
"challenge": challenge,
"timestamp": datetime.now().timestamp()
}
return jsonify({
"success": True,
"challenge": challenge,
"userId": employee_id
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/biometric/register", methods=["POST"])
def biometric_register():
"""생체인증 credential 등록"""
try:
from datetime import datetime
data = request.json
employee_id = data.get("employeeId")
credential = data.get("credential")
with build_pg_client() as c:
r = c.post(
"/biometric_credentials",
json={
"employee_id": employee_id,
"credential_id": credential["id"],
"credential_data": credential,
"created_at": datetime.now().isoformat()
}
)
r.raise_for_status()
session.pop(f"biometric_challenge_{employee_id}", None)
return jsonify({
"success": True,
"message": "생체인증이 등록되었습니다."
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/biometric/login-challenge", methods=["POST"])
def biometric_login_challenge():
"""생체인증 로그인을 위한 challenge 생성"""
try:
import secrets
from datetime import datetime
data = request.json
employee_id = data.get("employeeId")
challenge = secrets.token_urlsafe(32)
session[f"biometric_login_challenge_{employee_id}"] = {
"challenge": challenge,
"timestamp": datetime.now().timestamp()
}
return jsonify({
"success": True,
"challenge": challenge
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/biometric/login", methods=["POST"])
def biometric_login():
"""생체인증으로 로그인"""
try:
data = request.json
employee_id = data.get("employeeId")
assertion = data.get("assertion")
with build_pg_client() as c:
r = c.get(
"/biometric_credentials",
params={
"employee_id": f"eq.{employee_id}",
"limit": "1"
}
)
r.raise_for_status()
credentials = r.json() or []
if not credentials:
return jsonify({
"success": False,
"error": "등록된 생체인증 정보가 없습니다."
}), 404
stored_credential = credentials[0]
if stored_credential["credential_id"] != assertion["id"]:
return jsonify({
"success": False,
"error": "인증 실패"
}), 401
# users 테이블에서 사용자 정보 조회
with build_pg_client() as c:
r = c.get(
"/users",
params={
"employee_id": f"eq.{employee_id}",
"limit": "1"
}
)
r.raise_for_status()
users = r.json() or []
if not users:
return jsonify({
"success": False,
"error": "사용자를 찾을 수 없습니다."
}), 404
user_data = users[0]
# 세션에 사용자 정보 저장
session["user_id"] = user_data["id"]
session["email"] = user_data["email"]
session["employee_id"] = user_data["employee_id"]
session["name"] = user_data["name"]
session["department_id"] = user_data["department_id"]
session.permanent = True
session.pop(f"biometric_login_challenge_{employee_id}", None)
return jsonify({
"success": True,
"user": {
"id": user_data["id"],
"email": user_data["email"],
"employee_id": user_data["employee_id"],
"name": user_data["name"],
"department_id": user_data["department_id"]
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/biometric/unregister", methods=["POST"])
def biometric_unregister():
"""생체인증 해제"""
try:
data = request.json
employee_id = data.get("employeeId")
with build_pg_client() as c:
r = c.delete(
"/biometric_credentials",
params={"employee_id": f"eq.{employee_id}"}
)
r.raise_for_status()
return jsonify({
"success": True,
"message": "생체인증이 해제되었습니다."
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
# ============= 데이터 조회 API =============
@app.route("/health")
def health():
return jsonify({"status": "ok"})
@app.route("/favicon.ico")
def favicon():
return ("", 204, {"Content-Type": "image/x-icon"})
@app.route("/api/manufacturers")
def api_manufacturers():
"""제조사 목록을 JSON으로 반환"""
try:
with build_pg_client() as c:
# Fault_Code_Table에서 제조사
r1 = c.get("/Fault_Code_Table", params={"select": "manufacturer"})
r1.raise_for_status()
vals1 = [row.get("manufacturer") for row in (r1.json() or []) if row.get("manufacturer")]
# Signals 테이블에서 제조사
vals2 = []
for path in ("/Signals", "/signals"):
try:
r2 = c.get(path, params={"select": "manufacturer"})
r2.raise_for_status()
vals2 = [row.get("manufacturer") for row in (r2.json() or []) if row.get("manufacturer")]
break
except httpx.HTTPError:
continue
# MMI_Code 테이블에서 제조사
vals3 = []
try:
r3 = c.get("/MMI_Code", params={"select": "manufacturer"})
r3.raise_for_status()
vals3 = [row.get("manufacturer") for row in (r3.json() or []) if row.get("manufacturer")]
except httpx.HTTPError:
pass
# 중복 제거
all_manufacturers = vals1 + vals2 + vals3
seen = set()
unique_manufacturers = []
for v in all_manufacturers:
if v and v not in seen:
seen.add(v)
unique_manufacturers.append(v)
unique_manufacturers.sort()
return jsonify({
"success": True,
"data": {
"manufacturers": unique_manufacturers
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/filters/fault")
def api_fault_filters():
"""고장코드 필터 옵션을 JSON으로 반환"""
try:
selected_manufacturer = request.args.get("manufacturer", "").strip()
devices = pg_unique("device")
car_types = pg_unique("car_type")
car_ids = pg_unique("car_id")
all_alias = pg_unique("alias_name")
def alias_allowed(name: str) -> bool:
if not name:
return False
low = name.lower()
if selected_manufacturer.lower() == "woojin":
return ("우진" in name) or ("woojin" in low)
if selected_manufacturer.lower() == "rotem":
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
return True
alias_names = [a for a in all_alias if alias_allowed(a)]
return jsonify({
"success": True,
"data": {
"devices": sorted(devices),
"car_types": sorted(car_types),
"car_ids": sorted(car_ids),
"alias_names": sorted(alias_names)
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/filters/signal")
def api_signal_filters():
"""TCMS 신호 필터 옵션을 JSON으로 반환"""
try:
selected_manufacturer = request.args.get("manufacturer", "").strip()
all_alias = pg_unique_from("Signals", "alias_name")
def alias_allowed(name: str) -> bool:
if not name:
return False
low = name.lower()
if selected_manufacturer.lower() == "woojin":
return ("우진" in name) or ("woojin" in low)
if selected_manufacturer.lower() == "rotem":
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
return True
alias_names = [a for a in all_alias if alias_allowed(a)]
classifications = pg_unique_from("Signals", "classification")
return jsonify({
"success": True,
"data": {
"alias_names": sorted(alias_names),
"classifications": sorted(classifications)
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/filters/mmi")
def api_mmi_filters():
"""MMI 코드 필터 옵션을 JSON으로 반환"""
try:
selected_manufacturer = request.args.get("manufacturer", "").strip()
with build_pg_client() as c:
res = c.get("/MMI_Code", params={"select": "alias_name"})
res.raise_for_status()
all_alias = sorted({row.get("alias_name") for row in (res.json() or []) if row.get("alias_name")})
def alias_allowed(name: str) -> bool:
if not name:
return False
low = name.lower()
if selected_manufacturer.lower() == "woojin":
return ("우진" in name) or ("woojin" in low)
if selected_manufacturer.lower() == "rotem":
return ("로템" in name) or ("다대" in name) or ("rotem" in low)
return True
alias_names = [a for a in all_alias if alias_allowed(a)]
return jsonify({
"success": True,
"data": {
"alias_names": alias_names
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/mmi/list")
def api_mmi_list():
"""MMI 코드 목록을 JSON으로 반환"""
try:
PAGE_SIZE = 50
page = int(request.args.get("page", "0"))
offset = page * PAGE_SIZE
manufacturer = request.args.get("manufacturer", "").strip()
alias_name = request.args.get("alias_name", "").strip()
q = request.args.get("q", "").strip()
params: Dict[str, str] = {
"select": "id,code_name,code_description,data_type,car_id,alias_name,manufacturer",
"order": "code_name.asc",
"limit": str(PAGE_SIZE),
"offset": str(offset),
}
if manufacturer:
params["manufacturer"] = f"eq.{manufacturer}"
if alias_name:
params["alias_name"] = f"eq.{alias_name}"
if q:
params["or"] = f"(code_name.ilike.*{q}*,code_description.ilike.*{q}*)"
with build_pg_client() as c:
r = c.get("/MMI_Code", params=params)
r.raise_for_status()
rows = r.json() or []
return jsonify({
"success": True,
"data": {
"rows": rows,
"total": len(rows),
"page": page,
"page_size": PAGE_SIZE,
"has_next": len(rows) == PAGE_SIZE,
"has_prev": page > 0
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/faults/list")
def api_faults_list():
"""고장코드 목록을 JSON으로 반환"""
try:
PAGE_SIZE = 50
page = int(request.args.get("page", "0"))
offset = page * PAGE_SIZE
manufacturer = request.args.get("manufacturer", "").strip()
device = request.args.get("device", "").strip()
car_type = request.args.get("car_type", "").strip()
car_id = request.args.get("car_id", "").strip()
alias_name = request.args.get("alias_name", "").strip()
q = request.args.get("q", "").strip()
params: Dict[str, str] = {
"select": "f_code,f_code_num,f_name,manufacturer,device,car_type,car_id,fault_detail,alias_name",
"order": "f_code.asc",
"limit": str(PAGE_SIZE),
"offset": str(offset),
}
if manufacturer:
params["manufacturer"] = f"eq.{manufacturer}"
if device:
params["device"] = f"eq.{device}"
if car_type:
params["car_type"] = f"eq.{car_type}"
if car_id:
params["car_id"] = f"eq.{car_id}"
if alias_name:
params["alias_name"] = f"eq.{alias_name}"
if q:
params["or"] = f"(f_code.ilike.*{q}*,f_name.ilike.*{q}*)"
with build_pg_client() as c:
r = c.get("/Fault_Code_Table", params=params)
r.raise_for_status()
rows = r.json() or []
# 그룹핑 옵션
group_code = request.args.get("group_code", "").strip().lower()
if group_code == "on":
seen: Dict[str, bool] = {}
dedup: List[Dict] = []
for row in rows:
code = row.get("f_code")
if code and code not in seen:
seen[code] = True
dedup.append(row)
rows = dedup
return jsonify({
"success": True,
"data": {
"rows": rows,
"total": len(rows),
"page": page,
"page_size": PAGE_SIZE,
"has_next": len(rows) == PAGE_SIZE,
"has_prev": page > 0
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/faults/<string:f_code>")
def api_fault_detail(f_code: str):
"""고장코드 상세 정보를 JSON으로 반환"""
try:
params = {
"select": "f_code,f_code_num,f_name,car_type,f_class,grade,device,fault_detail,fault_reaction,fault_detection,fault_clear,fault_action,fault_schematics,car_id,manufacturer,alias_name",
"f_code": f"eq.{f_code}",
"limit": "1",
}
with build_pg_client() as c:
r = c.get("/Fault_Code_Table", params=params)
r.raise_for_status()
rows = r.json() or []
if not rows:
return jsonify({
"success": False,
"error": "고장코드를 찾을 수 없습니다."
}), 404
row = rows[0]
return jsonify({
"success": True,
"data": row
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/signals/list")
def api_signals_list():
"""TCMS 신호 목록을 JSON으로 반환"""
try:
PAGE_SIZE = 50
page = int(request.args.get("page", "0"))
offset = page * PAGE_SIZE
manufacturer = request.args.get("manufacturer", "").strip()
classification = request.args.get("classification", "").strip()
q = request.args.get("q", "").strip()
alias_name = request.args.get("alias_name", "").strip()
params: Dict[str, str] = {
"select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name",
"order": "sig_num.asc",
"limit": str(PAGE_SIZE),
"offset": str(offset),
}
if manufacturer:
params["manufacturer"] = f"eq.{manufacturer}"
if classification:
params["classification"] = f"eq.{classification}"
if alias_name:
params["alias_name"] = f"eq.{alias_name}"
if q:
params["or"] = f"(signal_abbreviation.ilike.*{q}*,signal_description.ilike.*{q}*)"
with build_pg_client() as c:
rows = []
last_error: Exception | None = None
for path in ("/Signals", "/signals"):
try:
r = c.get(path, params=params)
r.raise_for_status()
rows = r.json() or []
last_error = None
break
except Exception as e:
last_error = e
rows = []
continue
if last_error and not rows:
raise last_error
group_code = request.args.get("group_code", "").strip().lower()
if group_code == "on":
seen: Dict[str, bool] = {}
dedup: List[Dict] = []
for row in rows:
code = row.get("id")
if code and code not in seen:
seen[code] = True
dedup.append(row)
rows = dedup
return jsonify({
"success": True,
"data": {
"rows": rows,
"total": len(rows),
"page": page,
"page_size": PAGE_SIZE,
"has_next": len(rows) == PAGE_SIZE,
"has_prev": page > 0
}
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/signals/<string:item_id>")
def api_signal_detail(item_id: str):
"""TCMS 신호 상세 정보를 JSON으로 반환"""
try:
params = {
"select": "id,sig_num,signal_abbreviation,signal_description,status_value,manufacturer,classification,alias_name,original_data,created_at,updated_at",
"id": f"eq.{item_id}",
"limit": "1",
}
with build_pg_client() as c:
rows = []
last_error: Exception | None = None
for path in ("/Signals", "/signals"):
try:
r = c.get(path, params=params)
r.raise_for_status()
rows = r.json() or []
last_error = None
break
except Exception as e:
last_error = e
rows = []
continue
if last_error and not rows:
raise last_error
if not rows:
return jsonify({
"success": False,
"error": "TCMS 신호를 찾을 수 없습니다."
}), 404
row = rows[0]
return jsonify({
"success": True,
"data": row
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/sb/health")
def sb_health():
try:
with build_pg_client() as c:
r = c.get("/Fault_Code_Table", params={"select": "f_code", "limit": "1"})
r.raise_for_status()
return jsonify({"sb": "ok", "url": app.config.get("SUPABASE_URL")})
except Exception as e:
return jsonify({"sb": "error", "url": app.config.get("SUPABASE_URL"), "error": str(e)})
return app
app = create_app()
if __name__ == "__main__":
port = int(os.environ.get("PORT", "5000"))
app.run(host="0.0.0.0", port=port, debug=True)