405 lines
15 KiB
Python
405 lines
15 KiB
Python
import sqlite3
|
|
from datetime import datetime
|
|
import os
|
|
from utils.path_utils import get_data_dir
|
|
from utils.logger import get_logger
|
|
|
|
class VOCDatabase:
|
|
"""
|
|
VOC 데이터를 저장하고 조회하는 SQLite 데이터베이스 관리 클래스.
|
|
이전 버전의 스키마를 유지하면서 새로운 기능을 통합합니다.
|
|
"""
|
|
|
|
def __init__(self, db_name="voc.db"):
|
|
self.logger = get_logger("Database")
|
|
# 데이터베이스 경로 설정 (패킹 후 데이터 소실 방지)
|
|
self.db_path = str(get_data_dir() / db_name)
|
|
|
|
# 디렉토리 생성 확인
|
|
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
|
|
|
|
# 연결 공유 (check_same_thread=False로 GUI/백그라운드 스레드 간 공유 허용)
|
|
try:
|
|
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
|
|
self.conn.row_factory = sqlite3.Row
|
|
self.logger.info(f"DB 연결 성공: {self.db_path}")
|
|
except Exception as e:
|
|
self.logger.critical(f"DB 연결 실패: {e}")
|
|
raise e
|
|
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
"""데이터베이스 테이블 초기화 및 스키마 업데이트(마이그레이션)"""
|
|
cur = self.conn.cursor()
|
|
|
|
# 1. 테이블 생성 (모든 레거시 필드 포함)
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS posts (
|
|
id TEXT PRIMARY KEY,
|
|
title TEXT,
|
|
writer TEXT,
|
|
date TEXT,
|
|
department TEXT,
|
|
is_public INTEGER, -- 0:비공개, 1:공개
|
|
status TEXT,
|
|
content TEXT,
|
|
answer TEXT,
|
|
is_related INTEGER DEFAULT 0,
|
|
|
|
-- 상세 필드 (레거시 호환)
|
|
station TEXT, -- 역명
|
|
channel TEXT, -- 접수채널
|
|
attachment TEXT, -- 고객첨부파일
|
|
voc_type TEXT, -- VOC유형
|
|
response_type TEXT, -- 응답구분
|
|
summary TEXT, -- 요약
|
|
|
|
-- 타임스탬프 (이전 명칭 유지)
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP,
|
|
checked_at TIMESTAMP -- 사용자 확인 시점
|
|
)
|
|
""")
|
|
|
|
# 2. 마이그레이션: 컬럼 존재 여부 확인 및 추가
|
|
cur.execute("PRAGMA table_info(posts)")
|
|
existing_cols = [col[1] for col in cur.fetchall()]
|
|
|
|
# 새 버전에서 생성했던 컬럼명이 있다면 이전 명칭으로 마이그레이션 시도 (필요시)
|
|
if "crawled_at" in existing_cols and "created_at" not in existing_cols:
|
|
try: cur.execute("ALTER TABLE posts RENAME COLUMN crawled_at TO created_at")
|
|
except: pass
|
|
if "detail_crawled_at" in existing_cols and "updated_at" not in existing_cols:
|
|
try: cur.execute("ALTER TABLE posts RENAME COLUMN detail_crawled_at TO updated_at")
|
|
except: pass
|
|
|
|
# 모든 필수 컬럼이 있는지 확인 (누락된 경우 추가)
|
|
required_cols = {
|
|
"station": "TEXT",
|
|
"channel": "TEXT",
|
|
"attachment": "TEXT",
|
|
"voc_type": "TEXT",
|
|
"response_type": "TEXT",
|
|
"summary": "TEXT",
|
|
"checked_at": "TIMESTAMP",
|
|
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
|
|
"updated_at": "TIMESTAMP"
|
|
}
|
|
|
|
for col, col_type in required_cols.items():
|
|
if col not in existing_cols:
|
|
try:
|
|
cur.execute(f"ALTER TABLE posts ADD COLUMN {col} {col_type}")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
|
|
self.conn.commit()
|
|
|
|
def upsert_post(self, post_data):
|
|
"""기본 목록 정보를 저장하거나 업데이트 (is_new, is_updated 반환)"""
|
|
is_new = False
|
|
is_updated = False
|
|
now = datetime.now()
|
|
|
|
cur = self.conn.cursor()
|
|
|
|
# 1. 존재 여부 및 기존 상태 확인
|
|
cur.execute("SELECT status, title, department, is_public FROM posts WHERE id = ?", (post_data['id'],))
|
|
row = cur.fetchone()
|
|
|
|
if row is None:
|
|
# [신규]
|
|
cur.execute("""
|
|
INSERT INTO posts (
|
|
id, title, writer, date, department, is_public, status,
|
|
is_related, channel, created_at, updated_at
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
post_data['id'],
|
|
post_data['title'],
|
|
post_data['writer'],
|
|
post_data.get('date', ''),
|
|
post_data['department'],
|
|
post_data['is_public'],
|
|
post_data['status'],
|
|
post_data.get('is_related', 0),
|
|
post_data.get('channel', ''),
|
|
now, now
|
|
))
|
|
is_new = True
|
|
else:
|
|
# [업데이트] 상태 변경 또는 제목 확장 확인
|
|
old_status, old_title, old_dept, old_public = row
|
|
|
|
# 변경 사항 체크
|
|
if (old_status != post_data['status'] or
|
|
old_dept != post_data['department'] or
|
|
old_public != post_data['is_public']):
|
|
is_updated = True
|
|
|
|
# 제목이 '...'으로 끝나는 기존 제목보다 더 본 정보가 들어오면 업데이트
|
|
if len(post_data['title']) > len(old_title) and not post_data['title'].endswith('...'):
|
|
is_updated = True
|
|
|
|
if is_updated:
|
|
cur.execute("""
|
|
UPDATE posts
|
|
SET title=?, writer=?, date=?, department=?, is_public=?, status=?,
|
|
is_related=?, channel=?, updated_at=?
|
|
WHERE id=?
|
|
""", (
|
|
post_data['title'],
|
|
post_data['writer'],
|
|
post_data.get('date', ''),
|
|
post_data['department'],
|
|
post_data['is_public'],
|
|
post_data['status'],
|
|
post_data.get('is_related', 0),
|
|
post_data.get('channel', ''),
|
|
now,
|
|
post_data['id']
|
|
))
|
|
else:
|
|
# 상태 변경 없어도 확인 시점 등은 업데이트 가능 (레거시 코드 로직 유지)
|
|
cur.execute("UPDATE posts SET updated_at=? WHERE id=?", (now, post_data['id']))
|
|
|
|
self.conn.commit()
|
|
return is_new, is_updated
|
|
|
|
def update_detail(self, voc_id, detail_data):
|
|
"""상세 페이지에서 추출한 모든 정보를 업데이트"""
|
|
cur = self.conn.cursor()
|
|
now = datetime.now()
|
|
|
|
cur.execute('''
|
|
UPDATE posts SET
|
|
title = ?,
|
|
content = ?,
|
|
date = ?,
|
|
station = ?,
|
|
channel = ?,
|
|
attachment = ?,
|
|
answer = ?,
|
|
voc_type = ?,
|
|
response_type = ?,
|
|
summary = ?,
|
|
writer = ?,
|
|
updated_at = ?
|
|
WHERE id = ?
|
|
''', (
|
|
detail_data.get('title'),
|
|
detail_data.get('content'),
|
|
detail_data.get('date'),
|
|
detail_data.get('station'),
|
|
detail_data.get('channel'),
|
|
detail_data.get('attachment'),
|
|
detail_data.get('answer'),
|
|
detail_data.get('voc_type'),
|
|
detail_data.get('response_type'),
|
|
detail_data.get('summary'),
|
|
detail_data.get('writer'),
|
|
now, voc_id
|
|
))
|
|
self.conn.commit()
|
|
|
|
def mark_as_checked(self, voc_id):
|
|
"""해당 게시글을 '확인함' 처리합니다."""
|
|
cur = self.conn.cursor()
|
|
cur.execute("UPDATE posts SET checked_at = CURRENT_TIMESTAMP WHERE id = ?", (voc_id,))
|
|
self.conn.commit()
|
|
|
|
def get_new_posts_since(self, timestamp_str, related_only=True):
|
|
"""특정 시점 이후에 수집/업데이트된 신규 게시글 조회"""
|
|
cur = self.conn.cursor()
|
|
# created_at 또는 updated_at 기준으로 조회
|
|
if related_only:
|
|
cur.execute("""
|
|
SELECT * FROM posts
|
|
WHERE is_related = 1
|
|
AND (created_at > ? OR updated_at > ?)
|
|
ORDER BY created_at DESC
|
|
""", (timestamp_str, timestamp_str))
|
|
else:
|
|
cur.execute("""
|
|
SELECT * FROM posts
|
|
WHERE (created_at > ? OR updated_at > ?)
|
|
ORDER BY created_at DESC
|
|
""", (timestamp_str, timestamp_str))
|
|
return cur.fetchall()
|
|
|
|
def get_unchecked_related_posts(self, related_only=True):
|
|
"""확인하지 않은 게시글 조회 (미확인 알림용)"""
|
|
cur = self.conn.cursor()
|
|
if related_only:
|
|
cur.execute("""
|
|
SELECT * FROM posts
|
|
WHERE is_related = 1
|
|
AND checked_at IS NULL
|
|
ORDER BY created_at DESC
|
|
""")
|
|
else:
|
|
cur.execute("""
|
|
SELECT * FROM posts
|
|
WHERE checked_at IS NULL
|
|
ORDER BY created_at DESC
|
|
""")
|
|
return cur.fetchall()
|
|
|
|
def get_posts_needing_detail(self, recheck_hours=3):
|
|
"""상세 수집이 필요한 게시글 조회 (내용이 없거나 처리중인 최신글)"""
|
|
cur = self.conn.cursor()
|
|
# 내용이 없고 공개된 것 우선
|
|
cur.execute('''
|
|
SELECT id, title, is_related FROM posts
|
|
WHERE content IS NULL OR content = ''
|
|
ORDER BY is_public DESC, id DESC
|
|
LIMIT 10
|
|
''')
|
|
return cur.fetchall()
|
|
|
|
def get_all_posts(self, limit=500):
|
|
"""전체 목록 조회"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("SELECT * FROM posts ORDER BY id DESC LIMIT ?", (limit,))
|
|
return cur.fetchall()
|
|
|
|
def get_post_by_id(self, voc_id):
|
|
"""특정 ID 조회"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("SELECT * FROM posts WHERE id = ?", (voc_id,))
|
|
return cur.fetchone()
|
|
|
|
def close(self):
|
|
"""DB 연결 종료"""
|
|
if self.conn:
|
|
self.conn.close()
|
|
|
|
# ========================================================================
|
|
# 통계 쿼리 메서드 (Statistics Queries)
|
|
# ========================================================================
|
|
|
|
def get_stats_by_period(self, start_date: str, end_date: str):
|
|
"""
|
|
기간별 VOC 발생 건수 조회
|
|
|
|
Args:
|
|
start_date (str): 시작일 (YYYY-MM-DD)
|
|
end_date (str): 종료일 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
list[dict]: 날짜별 건수 [{"day": "2026-01-01", "count": 5}, ...]
|
|
"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("""
|
|
SELECT
|
|
DATE(date) as day,
|
|
COUNT(*) as count
|
|
FROM posts
|
|
WHERE date BETWEEN ? AND ?
|
|
GROUP BY DATE(date)
|
|
ORDER BY day
|
|
""", (start_date, end_date))
|
|
|
|
rows = cur.fetchall()
|
|
return [{"day": row["day"], "count": row["count"]} for row in rows]
|
|
|
|
def get_stats_by_department(self, start_date: str, end_date: str):
|
|
"""
|
|
부서별 VOC 분포 조회
|
|
|
|
Args:
|
|
start_date (str): 시작일 (YYYY-MM-DD)
|
|
end_date (str): 종료일 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
list[dict]: 부서별 건수 및 비율 [{"department": "차량", "count": 80, "percentage": 53.33}, ...]
|
|
"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("""
|
|
SELECT
|
|
department,
|
|
COUNT(*) as count,
|
|
ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM posts WHERE date BETWEEN ? AND ?), 2) as percentage
|
|
FROM posts
|
|
WHERE date BETWEEN ? AND ?
|
|
GROUP BY department
|
|
ORDER BY count DESC
|
|
""", (start_date, end_date, start_date, end_date))
|
|
|
|
rows = cur.fetchall()
|
|
return [{"department": row["department"], "count": row["count"], "percentage": row["percentage"]} for row in rows]
|
|
|
|
def get_stats_by_status(self, start_date: str, end_date: str):
|
|
"""
|
|
상태별 VOC 현황 조회
|
|
|
|
Args:
|
|
start_date (str): 시작일 (YYYY-MM-DD)
|
|
end_date (str): 종료일 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
list[dict]: 상태별 건수 및 비율 [{"status": "접수", "count": 30, "percentage": 20.0}, ...]
|
|
"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("""
|
|
SELECT
|
|
status,
|
|
COUNT(*) as count,
|
|
ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM posts WHERE date BETWEEN ? AND ?), 2) as percentage
|
|
FROM posts
|
|
WHERE date BETWEEN ? AND ?
|
|
GROUP BY status
|
|
ORDER BY count DESC
|
|
""", (start_date, end_date, start_date, end_date))
|
|
|
|
rows = cur.fetchall()
|
|
return [{"status": row["status"], "count": row["count"], "percentage": row["percentage"]} for row in rows]
|
|
|
|
def get_all_texts_in_period(self, start_date: str, end_date: str):
|
|
"""
|
|
키워드 분석용 텍스트 조회 (제목 + 내용)
|
|
|
|
Args:
|
|
start_date (str): 시작일 (YYYY-MM-DD)
|
|
end_date (str): 종료일 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
list[str]: 텍스트 리스트 (제목 + 내용)
|
|
"""
|
|
cur = self.conn.cursor()
|
|
cur.execute("""
|
|
SELECT title, content
|
|
FROM posts
|
|
WHERE date BETWEEN ? AND ?
|
|
""", (start_date, end_date))
|
|
|
|
rows = cur.fetchall()
|
|
texts = []
|
|
for row in rows:
|
|
text = row["title"] or ""
|
|
if row["content"]:
|
|
text += " " + row["content"]
|
|
texts.append(text)
|
|
|
|
return texts
|
|
|
|
def create_statistics_indexes(self):
|
|
"""
|
|
통계 쿼리 성능 향상을 위한 인덱스 생성
|
|
|
|
이미 존재하는 인덱스는 무시됩니다.
|
|
"""
|
|
cur = self.conn.cursor()
|
|
|
|
try:
|
|
cur.execute("CREATE INDEX IF NOT EXISTS idx_date ON posts(date)")
|
|
cur.execute("CREATE INDEX IF NOT EXISTS idx_department ON posts(department)")
|
|
cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON posts(status)")
|
|
cur.execute("CREATE INDEX IF NOT EXISTS idx_date_department ON posts(date, department)")
|
|
self.conn.commit()
|
|
self.logger.info("통계 인덱스 생성 완료")
|
|
except Exception as e:
|
|
self.logger.error(f"인덱스 생성 실패: {e}")
|