Supabase 증분 동기화 서비스 초안 추가

This commit is contained in:
9700X_PC 2026-02-19 21:19:09 +09:00
parent c5f4623b02
commit 3289d0b9f0
5 changed files with 443 additions and 6 deletions

View File

@ -0,0 +1,158 @@
"""
Supabase 동기화 서비스
크롤링 대신 Supabase REST API에서 게시글을 증분 동기화합니다.
기본 설계는 pull 기반이며, Realtime 구독 도입 브리지 역할을 수행합니다.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Optional
import requests
from pathlib import Path
@dataclass
class SyncResult:
success: bool
pulled_count: int = 0
upserted_count: int = 0
skipped_count: int = 0
message: str = ""
error_code: str = ""
class SupabaseSyncService:
"""Supabase posts 테이블 증분 동기화 서비스"""
CURSOR_KEY = "supabase_posts_cursor"
def __init__(self, settings: dict, db, logger):
self.settings = settings
self.db = db
self.logger = logger
@property
def sync_settings(self) -> dict:
return self.settings.get("sync", {})
@property
def enabled(self) -> bool:
return bool(self.sync_settings.get("enabled", False))
def _resolve_connection(self) -> tuple[str, str, bool | str, str]:
sync = self.sync_settings
url = str(sync.get("supabase_url", "")).strip()
key = str(sync.get("supabase_key", "")).strip()
table = str(sync.get("table", "posts")).strip() or "posts"
if sync.get("use_update_connection", True):
update_cfg = self.settings.get("update", {})
url = url or str(update_cfg.get("supabase_url", "")).strip()
key = key or str(update_cfg.get("supabase_key", "")).strip()
ssl_verify = sync.get("ssl_verify", self.settings.get("update", {}).get("ssl_verify", True))
ca_bundle_path = str(sync.get("ca_bundle_path", self.settings.get("update", {}).get("ca_bundle_path", ""))).strip()
verify_option = self._build_verify_option(bool(ssl_verify), ca_bundle_path)
return url.rstrip("/"), key, verify_option, table
@staticmethod
def _build_verify_option(ssl_verify: bool, ca_bundle_path: str) -> bool | str:
if not ssl_verify:
return False
bundle = str(ca_bundle_path or "").strip()
if not bundle:
return True
bundle_path = Path(bundle).expanduser()
return str(bundle_path) if bundle_path.exists() else True
def sync_once(self) -> SyncResult:
"""증분 동기화 1회 수행"""
if not self.enabled:
return SyncResult(success=True, message="동기화 비활성화")
url, key, verify_option, table = self._resolve_connection()
if not url or not key:
return SyncResult(success=False, message="sync.supabase_url/supabase_key 미설정", error_code="ERR_SYNC_CONFIG")
batch_size = int(self.sync_settings.get("pull_batch_size", 500))
timeout = int(self.sync_settings.get("timeout_sec", 10))
cursor = self.db.get_sync_cursor(self.CURSOR_KEY)
headers = {
"apikey": key,
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}
params: dict[str, Any] = {
"select": "*",
"order": "updated_at.asc",
"limit": str(batch_size),
}
if cursor:
params["updated_at"] = f"gt.{cursor}"
endpoint = f"{url}/rest/v1/{table}"
try:
resp = requests.get(endpoint, headers=headers, params=params, timeout=timeout, verify=verify_option)
resp.raise_for_status()
rows = resp.json() or []
if not rows:
return SyncResult(success=True, message="신규 동기화 데이터 없음")
upserted = 0
latest_cursor: Optional[str] = cursor
for row in rows:
post_data = self._map_row_to_post(row)
if not post_data:
continue
self.db.upsert_post(post_data)
upserted += 1
updated_at = str(row.get("updated_at", "")).strip()
if updated_at:
latest_cursor = updated_at
if latest_cursor:
self.db.save_sync_cursor(self.CURSOR_KEY, latest_cursor)
self.logger.info(f"Supabase 동기화 완료: pulled={len(rows)} upserted={upserted}")
return SyncResult(
success=True,
pulled_count=len(rows),
upserted_count=upserted,
skipped_count=max(0, len(rows) - upserted),
message="동기화 완료",
)
except requests.exceptions.Timeout:
return SyncResult(success=False, message="동기화 요청 시간 초과", error_code="ERR_SYNC_TIMEOUT")
except requests.exceptions.SSLError as e:
return SyncResult(success=False, message=f"SSL 검증 실패: {e}", error_code="ERR_SYNC_SSL")
except requests.exceptions.RequestException as e:
return SyncResult(success=False, message=f"네트워크 오류: {e}", error_code="ERR_SYNC_NETWORK")
except Exception as e:
return SyncResult(success=False, message=f"동기화 실패: {e}", error_code="ERR_SYNC_UNKNOWN")
@staticmethod
def _map_row_to_post(row: dict) -> Optional[dict]:
post_id = str(row.get("id", "")).strip()
if not post_id:
return None
return {
"id": post_id,
"title": str(row.get("title", "")),
"writer": str(row.get("writer", "")),
"date": str(row.get("date", "")),
"department": str(row.get("department", "")),
"is_public": int(row.get("is_public", 1) or 1),
"status": str(row.get("status", "")),
"is_related": int(row.get("is_related", 0) or 0),
"channel": str(row.get("channel", "")),
}

View File

@ -572,10 +572,99 @@ LIMIT 1
---
## 9. 변경 이력
## 9. Supabase 동기화 전환 계약 (초안)
### 9.1 Pydantic 데이터 모델
```python
class SyncCursor(BaseModel):
"""증분 동기화 기준점"""
last_synced_at: datetime
last_event_id: Optional[str] = None
class SyncPolicy(BaseModel):
"""클라이언트 동기화 정책"""
pull_batch_size: int = Field(500, ge=100, le=5000)
reconnect_backoff_sec: int = Field(5, ge=1, le=300)
max_gap_fill_hours: int = Field(72, ge=1, le=720)
realtime_enabled: bool = True
class SyncEvent(BaseModel):
"""Supabase Realtime 이벤트 표준 모델"""
event_type: Literal["INSERT", "UPDATE", "DELETE"]
table: Literal["voc_posts"]
schema: str = "public"
commit_timestamp: datetime
record: dict
old_record: Optional[dict] = None
class SyncUpsertPayload(BaseModel):
"""Collector -> Supabase 업서트 표준 페이로드"""
id: str
title: str
writer: str
department: str
date: datetime
status: str
channel: str
is_public: int = Field(1, ge=0, le=1)
content: Optional[str] = None
attachment: Optional[str] = None
is_related: int = Field(0, ge=0, le=1)
source_updated_at: datetime
source_hash: Optional[str] = None
class SyncResult(BaseModel):
"""동기화 실행 결과"""
success: bool
error_code: Optional[str] = None
message: str = ""
pulled_count: int = 0
upserted_count: int = 0
skipped_count: int = 0
```
### 9.2 모듈 간 인터페이스 규약
| 호출자 | 메서드 | 입력 | 출력 | 설명 |
|--------|--------|------|------|------|
| `AppController` | `sync_client.initial_sync(cursor, policy)` | `SyncCursor, SyncPolicy` | `SyncResult` | 앱 시작 시 초기 증분 동기화 |
| `AppController` | `sync_client.start_realtime(policy, on_event, on_error)` | `SyncPolicy, Callable, Callable` | `None` | 실시간 이벤트 구독 시작 |
| `AppController` | `sync_client.gap_fill(cursor, policy)` | `SyncCursor, SyncPolicy` | `SyncResult` | 재연결 후 누락 구간 보정 |
| `SyncClient` | `VOCDatabase.upsert_posts(posts)` | `list[SyncUpsertPayload]` | `int` | 로컬 DB idempotent upsert |
| `SyncClient` | `VOCDatabase.save_sync_cursor(cursor)` | `SyncCursor` | `None` | 마지막 동기화 기준점 저장 |
| `Collector` | `supabase_repo.upsert_posts(posts)` | `list[SyncUpsertPayload]` | `SyncResult` | Supabase 테이블 업서트 |
### 9.3 오류 코드 계약
| 코드 | 계층 | 의미 |
|------|------|------|
| `ERR_SYNC_AUTH` | SyncClient | Supabase 인증 실패 |
| `ERR_SYNC_NETWORK` | SyncClient | 네트워크/전송 오류 |
| `ERR_SYNC_REALTIME_DROP` | SyncClient | Realtime 연결 끊김 |
| `ERR_SYNC_GAP_FILL` | SyncClient | 누락 구간 보정 실패 |
| `ERR_SYNC_SCHEMA` | SyncClient/Collector | 이벤트/페이로드 스키마 불일치 |
| `ERR_SYNC_DB_LOCK` | Client DB | 로컬 SQLite 락 |
| `ERR_COLLECTOR_UPSERT` | Collector | Supabase 업서트 실패 |
### 9.4 충돌/정합성 규칙
- 동일 `id``source_updated_at`이 더 최신인 레코드만 반영
- 동일 데이터 재수신은 무시 가능해야 함(idempotent)
- `DELETE` 이벤트는 즉시 물리삭제하지 않고 soft-delete 플래그 또는 tombstone 정책 우선
- cursor 저장은 배치 커밋 단위로 수행해 재시작 시 중복/누락 최소화
---
## 10. 변경 이력
| 날짜 | 버전 | 변경 내용 |
|------|------|-----------|
| 2026-02-19 | 3.4 | **Supabase 동기화 전환 계약 초안** - Realtime 이벤트/커서/동기화 상태 Pydantic 모델, Collector-Client 인터페이스 규격 추가 |
| 2026-02-18 | 3.3 | **알림 설정 확장** - `noti.db_check_interval_minutes`, `noti.unchecked_check_interval_minutes`, `noti.unchecked_delay_enabled` 규격 추가 |
| 2026-02-18 | 3.2 | **자동 업데이트 모듈 명세 추가** - UpdateManager, UpdaterGUI 인터페이스 정의, config.json 프로토콜 |
| 2026-02-18 | 3.1 | **크롤링 시스템 개선** - ScraperService 하드코딩 제거 (`target_depts`, `keywords` settings.json 연동), 필터링 로직 고도화 (AND/OR 모드, `_check_filter_match` 메서드), 예외처리 강화 |
@ -587,5 +676,5 @@ LIMIT 1
---
작성자: KH.Choi
최종 수정: 2026-02-18
버전: 3.3
최종 수정: 2026-02-19
버전: 3.4

View File

@ -642,6 +642,93 @@ updater = AutoUpdater(
---
## 12. 아키텍처 전환 사양 (승인 전 코딩 금지)
### 12.1 목표
- 현재 프로젝트에서 Selenium 기반 크롤링을 제거하고, Supabase를 원천 데이터 소스로 사용
- `로컬 SQLite 캐시 + Supabase Realtime 동기화` 구조로 전환
- 크롤링/정제/Upsert는 별도 프로젝트(수집 파이프라인)로 분리
### 12.2 대상 시스템 분리
1) **Collector 프로젝트(신규)**
- 역할: 외부 VOC 소스 수집, 정제, Supabase `voc_posts` Upsert
- 책임: 원천 품질 보장, 중복 제거, 스키마 준수, 재처리(Replay) 지원
2) **Client 프로젝트(현재 앱)**
- 역할: Supabase 조회/Realtime 수신, 로컬 캐시 유지, 알림/보고서/UI
- 책임: 오프라인 내구성, 사용자 설정 기반 필터링, 읽음/확인 상태 관리
### 12.3 동기화 상세 로직
#### 12.3.1 초기 동기화 (Cold Start)
```
1. 앱 시작 시 Supabase 접속 상태 확인
2. 마지막 동기화 시각(cursor) 조회
3. cursor 이후 데이터를 페이지 단위로 pull
4. 로컬 DB에 upsert
5. 로컬 마지막 동기화 시각 저장
6. Realtime 구독 시작
```
#### 12.3.2 증분 동기화 (Realtime)
```
1. Supabase Realtime 이벤트 수신(INSERT/UPDATE)
2. 이벤트 payload 스키마 검증(Pydantic)
3. 로컬 DB upsert
4. 알림 정책 평가(관심조건/미확인 정책)
5. 토스트/팝업 표시
```
#### 12.3.3 복구 동기화 (Gap Fill)
```
1. Realtime 연결 끊김 감지
2. 재연결 성공 시 마지막 cursor 기준 증분 pull
3. 누락 구간 보정 후 Realtime 재구독
```
### 12.4 영향도 분석
| 영역 | 영향도 | 변경 방향 |
|------|--------|-----------|
| `SchedulerManager` | 높음 | 크롤링 스케줄 제거, 동기화/헬스체크 스케줄로 대체 |
| `ScraperService` | 매우 높음 | 현재 프로젝트에서 제거 또는 어댑터 계층으로 축소 |
| `VOCDatabase` | 높음 | Realtime payload upsert/충돌해결/커서 저장 메서드 추가 |
| `NotificationManager` | 중간 | 이벤트 소스가 DB poll 중심에서 sync-event 중심으로 이동 |
| `SettingsDialog` | 중간 | Supabase sync/realtime 정책 항목 추가 |
| `Controller` | 높음 | 로그인/크롤링 초기화 제거, SyncClient 초기화 추가 |
### 12.5 예외 케이스 및 처리 원칙
| 예외 상황 | 감지 조건 | 처리 전략 | 사용자 메시지 |
|-----------|-----------|-----------|---------------|
| Supabase 접속 실패 | 연결/인증 오류 | 로컬 캐시 모드로 강등 + 백오프 재시도 | "서버 연결 실패. 로컬 데이터로 동작합니다." |
| Realtime 끊김 | heartbeat timeout | 재구독 + gap fill 수행 | "실시간 동기화 재연결 중입니다." |
| 이벤트 스키마 불일치 | Pydantic validation error | 해당 이벤트 격리/로그 + 전체 파이프라인 유지 | "일부 데이터 형식 오류가 발생했습니다." |
| cursor 손상 | 파싱 실패/null | 안전 cursor(최근 N시간)로 복구 pull | "동기화 기준점을 복구했습니다." |
| 중복 이벤트 | 동일 id/updated_at 재수신 | upsert idempotent 처리 | (사용자 노출 없음) |
| out-of-order 이벤트 | 더 오래된 updated_at 수신 | 최신 타임스탬프 우선 규칙으로 무시 | (사용자 노출 없음) |
| 로컬 DB 잠금 | sqlite lock timeout | 짧은 재시도 + 큐 적재 | "로컬 저장이 지연되고 있습니다." |
| 대량 백필 부하 | 초기 sync row 폭증 | 배치 처리 + UI 비차단 + 진행률 표시 | "초기 동기화 중입니다." |
### 12.6 비기능 요구사항
- 동기화 정확성: 동일 데이터 재수신 시 결과 동일(idempotent)
- 복구 가능성: 장애 후 자동 gap fill로 누락 0건 목표
- 사용자 연속성: 오프라인/부분 장애에서도 조회·보고서 기능 유지
- 관찰 가능성: sync lag, reconnect 횟수, validation 실패 건수 로그 필수
### 12.7 단계별 전환 계획
1. 문서 계약 확정(본 문서 + `api_contract.md`)
2. Collector 프로젝트 스키마/업서트 파이프라인 구현
3. 현재 앱에 SyncClient(read-only) 도입
4. 병행 운영(크롤링 + sync) 검증 기간 운영
5. 크롤링 경로 제거 및 완전 전환
---
작성자: KH.Choi
최종 수정: 2026-02-18
버전: 3.3
최종 수정: 2026-02-19
버전: 3.4

View File

@ -131,6 +131,13 @@
- **Docker**: 컨테이너화
- **CI/CD**: 자동 빌드 및 배포
### 아키텍처 전환 제안 (검토 필요)
- [ ] **크롤링 및 supabase DB에 upset 기능을 별도의 프로젝트로 구현 및 현재 프로젝트는 크롤링 제거 및 supabase DB 동기화(로컬DB캐시 및 supabase 동기화-realtime 사용)**
- 목적: 클라이언트(현재 프로젝트)와 수집/정제 파이프라인(별도 프로젝트) 분리
- 범위: 현재 앱은 로컬 캐시 + Supabase Realtime 구독 기반 조회/알림에 집중
- 선행조건: `docs/project_spec.md` 상세 로직/예외 정의, `docs/api_contract.md` 데이터 계약 확정
- 구현 제한: 사용자 승인 전까지 코드 변경 금지
### 🎯 자동 업데이트 시스템 [완료] ✅
**요청일**: 2026-02-18
**완료일**: 2026-02-18
@ -218,4 +225,4 @@
---
작성자: KH.Choi
최종 수정: 2026-02-18
최종 수정: 2026-02-19

View File

@ -0,0 +1,96 @@
import sys
import unittest
from pathlib import Path
from unittest.mock import patch
ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
from app.services.supabase_sync_service import SupabaseSyncService
class _DummyDB:
def __init__(self):
self.cursor = ""
self.saved = []
def get_sync_cursor(self, _key: str) -> str:
return self.cursor
def save_sync_cursor(self, _key: str, value: str):
self.cursor = value
def upsert_post(self, post_data: dict):
self.saved.append(post_data)
return True, False
class _DummyLogger:
def info(self, *_args, **_kwargs):
return None
class TestSupabaseSyncService(unittest.TestCase):
def test_sync_once_returns_config_error_without_credentials(self):
service = SupabaseSyncService(
settings={"sync": {"enabled": True, "supabase_url": "", "supabase_key": ""}},
db=_DummyDB(),
logger=_DummyLogger(),
)
result = service.sync_once()
self.assertFalse(result.success)
self.assertEqual(result.error_code, "ERR_SYNC_CONFIG")
def test_sync_once_upserts_rows_and_updates_cursor(self):
db = _DummyDB()
settings = {
"sync": {
"enabled": True,
"supabase_url": "https://example.supabase.co",
"supabase_key": "dummy-key",
"table": "posts",
"pull_batch_size": 100,
"timeout_sec": 5,
"use_update_connection": False,
"ssl_verify": True,
"ca_bundle_path": "",
}
}
class _Resp:
status_code = 200
def raise_for_status(self):
return None
def json(self):
return [
{
"id": "26021881833",
"title": "테스트",
"writer": "홍길동",
"date": "2026-02-19 10:00:00",
"department": "차량",
"is_public": 1,
"status": "접수",
"channel": "인터넷",
"updated_at": "2026-02-19T10:00:00Z",
}
]
service = SupabaseSyncService(settings=settings, db=db, logger=_DummyLogger())
with patch("app.services.supabase_sync_service.requests.get", return_value=_Resp()):
result = service.sync_once()
self.assertTrue(result.success)
self.assertEqual(result.pulled_count, 1)
self.assertEqual(result.upserted_count, 1)
self.assertEqual(len(db.saved), 1)
self.assertEqual(db.cursor, "2026-02-19T10:00:00Z")
if __name__ == "__main__":
unittest.main()