VOC_Monitor/app/utils/database.py

455 lines
17 KiB
Python

import sqlite3
from datetime import datetime
import os
from utils.path_utils import get_data_dir
from utils.logger import get_logger
class VOCDatabase:
"""
VOC 데이터를 저장하고 조회하는 SQLite 데이터베이스 관리 클래스.
이전 버전의 스키마를 유지하면서 새로운 기능을 통합합니다.
"""
def __init__(self, db_name="voc.db"):
self.logger = get_logger("Database")
# 데이터베이스 경로 설정 (패킹 후 데이터 소실 방지)
self.db_path = str(get_data_dir() / db_name)
# 디렉토리 생성 확인
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
# 연결 공유 (check_same_thread=False로 GUI/백그라운드 스레드 간 공유 허용)
try:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.logger.info(f"DB 연결 성공: {self.db_path}")
except Exception as e:
self.logger.critical(f"DB 연결 실패: {e}")
raise e
self._init_db()
def _init_db(self):
"""데이터베이스 테이블 초기화 및 스키마 업데이트(마이그레이션)"""
cur = self.conn.cursor()
# 1. 테이블 생성 (모든 레거시 필드 포함)
cur.execute("""
CREATE TABLE IF NOT EXISTS posts (
id TEXT PRIMARY KEY,
title TEXT,
writer TEXT,
date TEXT,
department TEXT,
is_public INTEGER, -- 0:비공개, 1:공개
status TEXT,
content TEXT,
answer TEXT,
is_related INTEGER DEFAULT 0,
-- 상세 필드 (레거시 호환)
station TEXT, -- 역명
channel TEXT, -- 접수채널
attachment TEXT, -- 고객첨부파일
voc_type TEXT, -- VOC유형
response_type TEXT, -- 응답구분
summary TEXT, -- 요약
-- 타임스탬프 (이전 명칭 유지)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP,
checked_at TIMESTAMP -- 사용자 확인 시점
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS sync_state (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 2. 마이그레이션: 컬럼 존재 여부 확인 및 추가
cur.execute("PRAGMA table_info(posts)")
existing_cols = [col[1] for col in cur.fetchall()]
# 새 버전에서 생성했던 컬럼명이 있다면 이전 명칭으로 마이그레이션 시도 (필요시)
if "crawled_at" in existing_cols and "created_at" not in existing_cols:
try: cur.execute("ALTER TABLE posts RENAME COLUMN crawled_at TO created_at")
except: pass
if "detail_crawled_at" in existing_cols and "updated_at" not in existing_cols:
try: cur.execute("ALTER TABLE posts RENAME COLUMN detail_crawled_at TO updated_at")
except: pass
# 모든 필수 컬럼이 있는지 확인 (누락된 경우 추가)
required_cols = {
"station": "TEXT",
"channel": "TEXT",
"attachment": "TEXT",
"voc_type": "TEXT",
"response_type": "TEXT",
"summary": "TEXT",
"checked_at": "TIMESTAMP",
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
"updated_at": "TIMESTAMP"
}
for col, col_type in required_cols.items():
if col not in existing_cols:
try:
cur.execute(f"ALTER TABLE posts ADD COLUMN {col} {col_type}")
except sqlite3.OperationalError:
pass
self.conn.commit()
def upsert_post(self, post_data):
"""기본 목록 정보를 저장하거나 업데이트 (is_new, is_updated 반환)"""
is_new = False
is_updated = False
now = datetime.now()
cur = self.conn.cursor()
# 1. 존재 여부 및 기존 상태 확인
cur.execute("SELECT status, title, department, is_public FROM posts WHERE id = ?", (post_data['id'],))
row = cur.fetchone()
if row is None:
# [신규]
cur.execute("""
INSERT INTO posts (
id, title, writer, date, department, is_public, status,
is_related, channel, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
post_data['id'],
post_data['title'],
post_data['writer'],
post_data.get('date', ''),
post_data['department'],
post_data['is_public'],
post_data['status'],
post_data.get('is_related', 0),
post_data.get('channel', ''),
now, now
))
is_new = True
else:
# [업데이트] 상태 변경 또는 제목 확장 확인
old_status, old_title, old_dept, old_public = row
# 변경 사항 체크
if (old_status != post_data['status'] or
old_dept != post_data['department'] or
old_public != post_data['is_public']):
is_updated = True
# 제목이 '...'으로 끝나는 기존 제목보다 더 본 정보가 들어오면 업데이트
if len(post_data['title']) > len(old_title) and not post_data['title'].endswith('...'):
is_updated = True
if is_updated:
cur.execute("""
UPDATE posts
SET title=?, writer=?, date=?, department=?, is_public=?, status=?,
is_related=?, channel=?, updated_at=?
WHERE id=?
""", (
post_data['title'],
post_data['writer'],
post_data.get('date', ''),
post_data['department'],
post_data['is_public'],
post_data['status'],
post_data.get('is_related', 0),
post_data.get('channel', ''),
now,
post_data['id']
))
else:
# 상태 변경 없어도 확인 시점 등은 업데이트 가능 (레거시 코드 로직 유지)
cur.execute("UPDATE posts SET updated_at=? WHERE id=?", (now, post_data['id']))
self.conn.commit()
return is_new, is_updated
def update_detail(self, voc_id, detail_data):
"""상세 페이지에서 추출한 모든 정보를 업데이트"""
cur = self.conn.cursor()
now = datetime.now()
cur.execute('''
UPDATE posts SET
title = ?,
content = ?,
date = ?,
station = ?,
channel = ?,
attachment = ?,
answer = ?,
voc_type = ?,
response_type = ?,
summary = ?,
writer = ?,
updated_at = ?
WHERE id = ?
''', (
detail_data.get('title'),
detail_data.get('content'),
detail_data.get('date'),
detail_data.get('station'),
detail_data.get('channel'),
detail_data.get('attachment'),
detail_data.get('answer'),
detail_data.get('voc_type'),
detail_data.get('response_type'),
detail_data.get('summary'),
detail_data.get('writer'),
now, voc_id
))
self.conn.commit()
def mark_as_checked(self, voc_id):
"""해당 게시글을 '확인함' 처리합니다."""
cur = self.conn.cursor()
cur.execute("UPDATE posts SET checked_at = CURRENT_TIMESTAMP WHERE id = ?", (voc_id,))
self.conn.commit()
def get_new_posts_since(self, timestamp_str, related_only=True):
"""특정 시점 이후에 수집/업데이트된 신규 게시글 조회"""
cur = self.conn.cursor()
# created_at 또는 updated_at 기준으로 조회
if related_only:
cur.execute("""
SELECT * FROM posts
WHERE is_related = 1
AND (created_at > ? OR updated_at > ?)
ORDER BY created_at DESC
""", (timestamp_str, timestamp_str))
else:
cur.execute("""
SELECT * FROM posts
WHERE (created_at > ? OR updated_at > ?)
ORDER BY created_at DESC
""", (timestamp_str, timestamp_str))
return cur.fetchall()
def get_unchecked_related_posts(self, related_only=True):
"""확인하지 않은 게시글 조회 (미확인 알림용)"""
cur = self.conn.cursor()
if related_only:
cur.execute("""
SELECT * FROM posts
WHERE is_related = 1
AND checked_at IS NULL
ORDER BY created_at DESC
""")
else:
cur.execute("""
SELECT * FROM posts
WHERE checked_at IS NULL
ORDER BY created_at DESC
""")
return cur.fetchall()
def get_posts_needing_detail(self, recheck_hours=3):
"""
상세 수집이 필요한 게시글 조회
조건:
1. 내용이 없거나 (content IS NULL OR content = '')
2. 또는 마지막 업데이트가 recheck_hours 이상 경과 (updated_at 시간 체크)
우선순위: is_related DESC, id DESC (최신글/관심글 우선)
Args:
recheck_hours: 재검사 간격(시간 단위). 기본값: 3시간
Returns:
list: (id, title, is_related) 튜플 리스트
"""
cur = self.conn.cursor()
# recheck 간격 계산
from datetime import datetime, timedelta
threshold_time = (datetime.now() - timedelta(hours=recheck_hours)).strftime("%Y-%m-%d %H:%M:%S")
cur.execute('''
SELECT id, title, is_related FROM posts
WHERE
(content IS NULL OR content = '')
OR (updated_at IS NOT NULL AND updated_at < ?)
ORDER BY is_related DESC, id DESC
LIMIT 10
''', (threshold_time,))
return cur.fetchall()
def get_all_posts(self, limit=500):
"""전체 목록 조회"""
cur = self.conn.cursor()
cur.execute("SELECT * FROM posts ORDER BY id DESC LIMIT ?", (limit,))
return cur.fetchall()
def get_post_by_id(self, voc_id):
"""특정 ID 조회"""
cur = self.conn.cursor()
cur.execute("SELECT * FROM posts WHERE id = ?", (voc_id,))
return cur.fetchone()
def close(self):
"""DB 연결 종료"""
if self.conn:
self.conn.close()
def get_sync_cursor(self, key: str) -> str:
"""동기화 커서 조회"""
cur = self.conn.cursor()
cur.execute("SELECT value FROM sync_state WHERE key = ?", (key,))
row = cur.fetchone()
return str(row["value"]) if row and row["value"] else ""
def save_sync_cursor(self, key: str, value: str):
"""동기화 커서 저장 (upsert)"""
cur = self.conn.cursor()
cur.execute(
"""
INSERT INTO sync_state (key, value, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET
value=excluded.value,
updated_at=CURRENT_TIMESTAMP
""",
(key, value),
)
self.conn.commit()
# ========================================================================
# 통계 쿼리 메서드 (Statistics Queries)
# ========================================================================
def get_stats_by_period(self, start_date: str, end_date: str):
"""
기간별 VOC 발생 건수 조회
Args:
start_date (str): 시작일 (YYYY-MM-DD)
end_date (str): 종료일 (YYYY-MM-DD)
Returns:
list[dict]: 날짜별 건수 [{"day": "2026-01-01", "count": 5}, ...]
"""
cur = self.conn.cursor()
cur.execute("""
SELECT
DATE(date) as day,
COUNT(*) as count
FROM posts
WHERE date BETWEEN ? AND ?
GROUP BY DATE(date)
ORDER BY day
""", (start_date, end_date))
rows = cur.fetchall()
return [{"day": row["day"], "count": row["count"]} for row in rows]
def get_stats_by_department(self, start_date: str, end_date: str):
"""
부서별 VOC 분포 조회
Args:
start_date (str): 시작일 (YYYY-MM-DD)
end_date (str): 종료일 (YYYY-MM-DD)
Returns:
list[dict]: 부서별 건수 및 비율 [{"department": "차량", "count": 80, "percentage": 53.33}, ...]
"""
cur = self.conn.cursor()
cur.execute("""
SELECT
department,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM posts WHERE date BETWEEN ? AND ?), 2) as percentage
FROM posts
WHERE date BETWEEN ? AND ?
GROUP BY department
ORDER BY count DESC
""", (start_date, end_date, start_date, end_date))
rows = cur.fetchall()
return [{"department": row["department"], "count": row["count"], "percentage": row["percentage"]} for row in rows]
def get_stats_by_status(self, start_date: str, end_date: str):
"""
상태별 VOC 현황 조회
Args:
start_date (str): 시작일 (YYYY-MM-DD)
end_date (str): 종료일 (YYYY-MM-DD)
Returns:
list[dict]: 상태별 건수 및 비율 [{"status": "접수", "count": 30, "percentage": 20.0}, ...]
"""
cur = self.conn.cursor()
cur.execute("""
SELECT
status,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM posts WHERE date BETWEEN ? AND ?), 2) as percentage
FROM posts
WHERE date BETWEEN ? AND ?
GROUP BY status
ORDER BY count DESC
""", (start_date, end_date, start_date, end_date))
rows = cur.fetchall()
return [{"status": row["status"], "count": row["count"], "percentage": row["percentage"]} for row in rows]
def get_all_texts_in_period(self, start_date: str, end_date: str):
"""
키워드 분석용 텍스트 조회 (제목 + 내용)
Args:
start_date (str): 시작일 (YYYY-MM-DD)
end_date (str): 종료일 (YYYY-MM-DD)
Returns:
list[str]: 텍스트 리스트 (제목 + 내용)
"""
cur = self.conn.cursor()
cur.execute("""
SELECT title, content
FROM posts
WHERE date BETWEEN ? AND ?
""", (start_date, end_date))
rows = cur.fetchall()
texts = []
for row in rows:
text = row["title"] or ""
if row["content"]:
text += " " + row["content"]
texts.append(text)
return texts
def create_statistics_indexes(self):
"""
통계 쿼리 성능 향상을 위한 인덱스 생성
이미 존재하는 인덱스는 무시됩니다.
"""
cur = self.conn.cursor()
try:
cur.execute("CREATE INDEX IF NOT EXISTS idx_date ON posts(date)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_department ON posts(department)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON posts(status)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_date_department ON posts(date, department)")
self.conn.commit()
self.logger.info("통계 인덱스 생성 완료")
except Exception as e:
self.logger.error(f"인덱스 생성 실패: {e}")