""" 회원 관리 및 인증 모듈 Supabase 이메일 인증을 사용하여 humetro.busan.kr 도메인만 허용 """ import os import re import secrets from typing import Optional, Dict, Any import httpx from flask import session, current_app from datetime import datetime, timedelta class AuthManager: """인증 및 회원 관리를 담당하는 클래스""" ALLOWED_EMAIL_DOMAIN = "humetro.busan.kr" def __init__(self, supabase_url: str, supabase_key: str): self.supabase_url = supabase_url.rstrip("/") self.supabase_key = supabase_key self.rest_base = f"{self.supabase_url}/rest/v1" self.auth_base = f"{self.supabase_url}/auth/v1" def _get_headers(self, auth_token: Optional[str] = None) -> Dict[str, str]: """API 요청을 위한 헤더 생성""" headers = { "apikey": self.supabase_key, "Content-Type": "application/json", "Accept-Profile": "public", "Content-Profile": "public", } if auth_token: headers["Authorization"] = f"Bearer {auth_token}" else: headers["Authorization"] = f"Bearer {self.supabase_key}" return headers def validate_email(self, email: str) -> tuple[bool, str]: """이메일 유효성 검증 - humetro.busan.kr 도메인만 허용""" email = email.strip().lower() # 이메일 형식 검증 email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): return False, "올바른 이메일 형식이 아닙니다." # 도메인 검증 domain = email.split('@')[1] if domain != self.ALLOWED_EMAIL_DOMAIN: return False, f"{self.ALLOWED_EMAIL_DOMAIN} 도메인의 이메일만 사용 가능합니다." return True, "OK" def get_departments(self) -> list[Dict[str, Any]]: """가입 가능한 부서 목록 조회""" with httpx.Client() as client: response = client.get( f"{self.rest_base}/departments", params={"select": "id,name,code", "order": "name.asc"}, headers=self._get_headers() ) response.raise_for_status() return response.json() or [] def check_email_exists(self, email: str) -> bool: """이메일 중복 체크""" with httpx.Client() as client: response = client.get( f"{self.rest_base}/users", params={"select": "id", "email": f"eq.{email.lower()}", "limit": "1"}, headers=self._get_headers() ) response.raise_for_status() users = response.json() or [] return len(users) > 0 def check_employee_id_exists(self, employee_id: str) -> bool: """사번 중복 체크""" with httpx.Client() as client: response = client.get( f"{self.rest_base}/users", params={"select": "id", "employee_id": f"eq.{employee_id}", "limit": "1"}, headers=self._get_headers() ) response.raise_for_status() users = response.json() or [] return len(users) > 0 def signup_user( self, email: str, password: str, employee_id: str, name: str, department_id: int ) -> tuple[bool, str, Optional[Dict]]: """ 회원가입 처리 1. Supabase Auth에 사용자 생성 2. users 테이블에 사용자 정보 저장 """ email = email.strip().lower() # 이메일 유효성 검증 is_valid, message = self.validate_email(email) if not is_valid: return False, message, None # 이메일 중복 체크 if self.check_email_exists(email): return False, "이미 등록된 이메일입니다.", None # 사번 중복 체크 if self.check_employee_id_exists(employee_id): return False, "이미 등록된 사번입니다.", None try: # 1. Supabase Auth에 사용자 생성 (이메일 확인 필요) with httpx.Client() as client: auth_response = client.post( f"{self.auth_base}/signup", json={ "email": email, "password": password, "email_confirm": False # 이메일 인증 링크 발송 }, headers=self._get_headers() ) if auth_response.status_code != 200: error_data = auth_response.json() return False, error_data.get("msg", "회원가입에 실패했습니다."), None auth_data = auth_response.json() user_id = auth_data.get("id") # 2. users 테이블에 사용자 정보 저장 user_response = client.post( f"{self.rest_base}/users", json={ "auth_id": user_id, "email": email, "employee_id": employee_id, "name": name, "department_id": department_id, "is_active": True }, headers=self._get_headers() ) if user_response.status_code not in [200, 201]: return False, "사용자 정보 저장에 실패했습니다.", None return True, "회원가입이 완료되었습니다. 이메일 인증을 완료해주세요.", auth_data except Exception as e: return False, f"회원가입 중 오류가 발생했습니다: {str(e)}", None def login_user_by_employee_id(self, employee_id: str, password: str) -> tuple[bool, str, Optional[Dict]]: """사번으로 로그인 처리""" employee_id = employee_id.strip() # 개발/테스트 계정 (123456/123456) if employee_id == "123456" and password == "123456": session_data = { "user_id": 0, "auth_id": "test-auth-id", "email": "test@humetro.busan.kr", "employee_id": "123456", "name": "테스트 사용자", "department_id": 1, "access_token": "test-token-123456" } return True, "로그인 성공 (개발 모드)", session_data try: with httpx.Client() as client: # 사번으로 사용자 조회 user_response = client.get( f"{self.rest_base}/users", params={ "select": "id,auth_id,email,employee_id,name,department_id,is_active", "employee_id": f"eq.{employee_id}", "limit": "1" }, headers=self._get_headers() ) if user_response.status_code != 200: return False, "사번 또는 비밀번호가 올바르지 않습니다.", None users = user_response.json() or [] if not users: return False, "사번 또는 비밀번호가 올바르지 않습니다.", None user_data = users[0] # 활성화된 사용자인지 확인 if not user_data.get("is_active", False): return False, "비활성화된 계정입니다. 관리자에게 문의하세요.", None # 이메일로 Supabase Auth 로그인 email = user_data["email"] auth_response = client.post( f"{self.auth_base}/token?grant_type=password", json={ "email": email, "password": password }, headers=self._get_headers() ) if auth_response.status_code != 200: return False, "사번 또는 비밀번호가 올바르지 않습니다.", None auth_data = auth_response.json() access_token = auth_data.get("access_token") # 세션에 저장할 데이터 session_data = { "user_id": user_data["id"], "auth_id": user_data["auth_id"], "email": user_data["email"], "employee_id": user_data["employee_id"], "name": user_data["name"], "department_id": user_data["department_id"], "access_token": access_token } return True, "로그인 성공", session_data except httpx.HTTPError as e: print(f"HTTP 오류: {str(e)}") return False, f"로그인 중 오류가 발생했습니다: {str(e)}", None except Exception as e: print(f"로그인 오류: {str(e)}") return False, f"로그인 중 오류가 발생했습니다: {str(e)}", None def login_user(self, email: str, password: str) -> tuple[bool, str, Optional[Dict]]: """로그인 처리 (이메일 기반, 하위 호환성 유지)""" email = email.strip().lower() try: with httpx.Client() as client: # Supabase Auth로 로그인 auth_response = client.post( f"{self.auth_base}/token?grant_type=password", json={ "email": email, "password": password }, headers=self._get_headers() ) if auth_response.status_code != 200: return False, "이메일 또는 비밀번호가 올바르지 않습니다.", None auth_data = auth_response.json() access_token = auth_data.get("access_token") user_auth_id = auth_data.get("user", {}).get("id") # users 테이블에서 사용자 정보 조회 user_response = client.get( f"{self.rest_base}/users", params={ "select": "id,auth_id,email,employee_id,name,department_id,is_active", "auth_id": f"eq.{user_auth_id}", "limit": "1" }, headers=self._get_headers(access_token) ) if user_response.status_code != 200: return False, "사용자 정보를 찾을 수 없습니다.", None users = user_response.json() or [] if not users: return False, "사용자 정보를 찾을 수 없습니다.", None user_data = users[0] # 활성화된 사용자인지 확인 if not user_data.get("is_active", False): return False, "비활성화된 계정입니다. 관리자에게 문의하세요.", None # 세션에 저장할 데이터 session_data = { "user_id": user_data["id"], "auth_id": user_data["auth_id"], "email": user_data["email"], "employee_id": user_data["employee_id"], "name": user_data["name"], "department_id": user_data["department_id"], "access_token": access_token } return True, "로그인 성공", session_data except Exception as e: return False, f"로그인 중 오류가 발생했습니다: {str(e)}", None def get_user_by_id(self, user_id: int, access_token: str) -> Optional[Dict]: """사용자 ID로 사용자 정보 조회""" try: with httpx.Client() as client: response = client.get( f"{self.rest_base}/users", params={ "select": "id,email,employee_id,name,department_id,departments(name),is_active", "id": f"eq.{user_id}", "limit": "1" }, headers=self._get_headers(access_token) ) response.raise_for_status() users = response.json() or [] return users[0] if users else None except Exception: return None def get_user_by_employee_id(self, employee_id: str) -> Optional[Dict]: """사번으로 사용자 정보 조회""" try: with httpx.Client() as client: response = client.get( f"{self.rest_base}/users", params={ "select": "id,auth_id,email,employee_id,name,department_id,is_active", "employee_id": f"eq.{employee_id}", "limit": "1" }, headers=self._get_headers() ) response.raise_for_status() users = response.json() or [] return users[0] if users else None except Exception: return None def reset_password(self, email: str, new_password: str) -> tuple[bool, str]: """비밀번호 재설정""" try: with httpx.Client() as client: # Supabase Auth Admin API를 사용하여 비밀번호 업데이트 # 참고: 실제 환경에서는 관리자 키 필요 response = client.put( f"{self.auth_base}/admin/users/{email}", json={ "password": new_password }, headers=self._get_headers() ) if response.status_code == 200: return True, "비밀번호가 성공적으로 재설정되었습니다." else: return False, "비밀번호 재설정에 실패했습니다." except Exception as e: print(f"비밀번호 재설정 오류: {str(e)}") return False, f"비밀번호 재설정 중 오류가 발생했습니다: {str(e)}" def logout_user(self, access_token: str) -> bool: """로그아웃 처리""" try: with httpx.Client() as client: response = client.post( f"{self.auth_base}/logout", headers=self._get_headers(access_token) ) return response.status_code == 204 except Exception: return False def get_current_user() -> Optional[Dict]: """현재 로그인한 사용자 정보 반환""" if "user_id" in session: return { "user_id": session.get("user_id"), "email": session.get("email"), "employee_id": session.get("employee_id"), "name": session.get("name"), "department_id": session.get("department_id"), "access_token": session.get("access_token") } return None def login_required(f): """로그인이 필요한 라우트를 위한 데코레이터""" from functools import wraps from flask import redirect, url_for @wraps(f) def decorated_function(*args, **kwargs): if get_current_user() is None: return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function