VOC_Monitor/app/controllers/scheduler_manager.py

561 lines
22 KiB
Python

"""
스케줄링 관리자
백그라운드 작업 스케줄링을 전담하는 모듈입니다.
크롤링 주기, 알림 체크 주기 등을 관리하며,
schedule 라이브러리를 사용하여 주기적 작업을 실행합니다.
주요 기능:
- 크롤링 사이클 실행
- DB 체크 사이클 실행
- 스케줄 업데이트 (설정 변경 시)
- 스케줄러 루프 관리
의존성:
- schedule: 주기적 작업 스케줄링
- threading: 백그라운드 스레드 실행
- VOCScraper: 웹 크롤링
- VOCDatabase: 데이터베이스 작업
작성자: KH.Choi
최종 수정: 2026-02-17
버전: 1.0 (Controller에서 분리)
"""
import time
import threading
import schedule
from datetime import datetime, timedelta
from typing import Callable, Optional
from core.exceptions import (
ScraperError,
LoginFailedError,
SessionExpiredError,
DatabaseError
)
from utils.logger import get_logger
class SchedulerManager:
"""
스케줄링 관리자
백그라운드에서 주기적으로 실행되는 작업들을 관리합니다.
크롤링, DB 체크 등의 작업을 설정된 주기에 따라 자동 실행합니다.
Attributes:
logger: 로거 인스턴스
settings (dict): 설정 정보
model (VOCScraper): 크롤러 인스턴스
db (VOCDatabase): 데이터베이스 인스턴스
running (bool): 스케줄러 실행 상태
last_check_time (str): 마지막 DB 체크 시간
is_related_callback (Callable): 관심글 판별 콜백
notify_callback (Callable): 알림 발생 콜백
주요 메서드:
start: 스케줄러 시작
stop: 스케줄러 중지
update_schedule: 스케줄 재설정
run_crawling_cycle: 크롤링 사이클 실행
run_db_check_cycle: DB 체크 사이클 실행
사용 예시:
>>> scheduler = SchedulerManager(settings, model, db, logger)
>>> scheduler.set_callbacks(
... is_related_func=controller.is_related_post,
... notify_func=controller.show_popup_notification
... )
>>> scheduler.start()
"""
def __init__(self, settings: dict, model, db, logger=None):
"""
스케줄러 초기화
Args:
settings: 설정 딕셔너리
model: VOCScraper 인스턴스
db: VOCDatabase 인스턴스
logger: 로거 인스턴스 (선택)
"""
self.logger = logger or get_logger("SchedulerManager")
self.settings = settings
self.model = model
self.db = db
self.running = False
# 마지막 DB 체크 시간 (초기값: 현재 - 1일)
self.last_check_time = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
# 영속화된 체크 시간 로드
self._load_last_check_time()
# 중복 알림 방지용 세트
self.notified_post_ids: set = set()
# 콜백 함수 (Controller에서 주입)
self.is_related_callback: Optional[Callable] = None
self.notify_callback: Optional[Callable] = None
def set_callbacks(self, is_related_func: Callable, notify_func: Callable):
"""
콜백 함수 설정 (의존성 주입)
Controller의 메서드를 콜백으로 등록합니다.
Args:
is_related_func: 관심글 판별 함수 (title, dept) -> int
notify_func: 알림 발생 함수 (title, msg, voc_id) -> None
"""
self.is_related_callback = is_related_func
self.notify_callback = notify_func
def start(self):
"""
스케줄러 시작
백그라운드 스레드에서 스케줄러 루프를 실행합니다.
초기 스케줄을 설정하고, 첫 크롤링을 즉시 실행합니다.
"""
if self.running:
self.logger.warning("스케줄러가 이미 실행 중입니다.")
return
self.running = True
self.update_schedule()
# 백그라운드 스레드에서 스케줄러 루프 실행
threading.Thread(target=self._scheduler_loop, daemon=True).start()
# 첫 크롤링 즉시 실행
threading.Thread(target=self.run_crawling_cycle, daemon=True).start()
self.logger.info("스케줄러 시작됨")
def stop(self):
"""스케줄러 중지"""
self.running = False
schedule.clear()
self.logger.info("스케줄러 중지됨")
def update_schedule(self):
"""
스케줄 업데이트
설정 변경 사항을 반영하여 스케줄을 재설정합니다.
기존 스케줄을 모두 삭제하고 새로 등록합니다.
"""
schedule.clear()
# 크롤링 주기 (기본: 10분)
crawl_interval = self.settings.get('crawling', {}).get('interval_minutes', 10)
schedule.every(crawl_interval).minutes.do(self.run_crawling_cycle)
# DB 체크 주기 (설정값, 기본: 5분)
noti_interval = self.settings.get('noti', {}).get('db_check_interval_minutes', 5)
schedule.every(noti_interval).minutes.do(self.run_db_check_cycle)
# 미확인 글 체크 주기 (설정값, 기본: 30분)
unchecked_interval = self.settings.get('noti', {}).get('unchecked_check_interval_minutes', 30)
schedule.every(unchecked_interval).minutes.do(self.run_unchecked_check_cycle)
self.logger.info(
f"스케줄 업데이트됨: 크롤링 {crawl_interval}분 / 신규 알림 {noti_interval}분 / 미확인 체크 {unchecked_interval}"
)
def _scheduler_loop(self):
"""
스케줄러 루프 (내부 메서드)
백그라운드 스레드에서 실행되며,
등록된 스케줄을 주기적으로 확인하고 실행합니다.
"""
while self.running:
schedule.run_pending()
time.sleep(1)
def run_crawling_cycle(self):
"""
크롤링 사이클 실행
웹사이트에서 VOC 데이터를 수집하여 DB에 저장합니다.
로그인 세션 관리, 목록 수집, 상세 내용 수집을 순차적으로 수행합니다.
에러 처리:
- LoginFailedError: 로그인 실패 시 다음 사이클에 재시도
- SessionExpiredError: 세션 만료 시 재로그인 후 재시도
- DatabaseError: DB 오류 시 로그 기록 후 계속
"""
self.logger.info("크롤링 사이클 시작...")
try:
# 1. 로그인 상태 확인 및 재로그인
if not self.model.is_logged_in():
self.logger.info("로그인 세션 만료. 재로그인 시도 중...")
login_success = self.model.login(
self.settings['login']['id'],
self.settings['login']['pw']
)
if not login_success:
raise LoginFailedError("재로그인 실패. 다음 사이클에 재시도합니다.")
# 2. 목록 수집 (Metadata)
max_pg = self.settings['crawling'].get('max_pages', 2)
keywords = self.settings['crawling']['keywords']
target_depts = self.settings['crawling']['target_depts']
list_result = self.model.fetch_list_pages(max_pg, keywords, target_depts)
# 세션 만료 시 재로그인 후 재시도
if list_result.get('status') == 'session_expired':
self.logger.warning("목록 수집 중 세션 만료. 재로그인 후 재시도...")
self.model.login(self.settings['login']['id'], self.settings['login']['pw'])
list_result = self.model.fetch_list_pages(max_pg, keywords, target_depts)
# 3. 수집된 데이터 처리
if list_result.get('status') == 'success':
self._process_collected_posts(list_result['data'])
else:
self.logger.error(f"목록 수집 실패: {list_result.get('error', '알 수 없는 오류')}")
# 4. 상세 내용 수집 (조건부)
self._collect_detail_content()
self.logger.info("크롤링 사이클 완료.")
except LoginFailedError as e:
self.logger.error(f"로그인 실패: {e.message}")
except SessionExpiredError as e:
self.logger.warning(f"세션 만료: {e.message}")
except DatabaseError as e:
self.logger.error(f"DB 오류: {e.message}")
except Exception as e:
self.logger.error(f"크롤링 중 예상치 못한 오류: {e}", exc_info=True)
def _process_collected_posts(self, posts_data: list):
"""
수집된 게시글 데이터 처리 (내부 메서드)
Pydantic 모델로 검증 후 DB에 저장합니다.
Args:
posts_data: 크롤링 결과 딕셔너리 리스트
"""
from models.model import VOCPost
from pydantic import ValidationError
for post_dict in posts_data:
try:
# Pydantic 모델을 통한 데이터 검증
post_obj = VOCPost(**post_dict)
except ValidationError as e:
self.logger.error(
f"데이터 검증 실패 (ID: {post_dict.get('id', 'Unknown')}): {e}"
)
continue
# 관심글 여부 판별 (Controller 콜백 사용)
if self.is_related_callback:
post_obj.is_related = self.is_related_callback(post_obj.title, post_obj.department)
# DB 저장 (Pydantic v2)
save_data = post_obj.model_dump()
try:
self.db.upsert_post(save_data)
except DatabaseError as e:
self.logger.error(f"DB 저장 실패 (ID: {post_obj.id}): {e.message}")
def _collect_detail_content(self):
"""
상세 내용 수집 (내부 메서드)
내용이 없거나 최근 글인 경우 상세 파싱을 시도합니다.
"""
recheck_hours = self.settings['crawling']['recheck_hours']
targets = self.db.get_posts_needing_detail(recheck_hours)
for tgt in targets:
voc_id, title, is_related = tgt
# 비공개 글 체크
post_check = self.db.get_post_by_id(voc_id)
if post_check and post_check['is_public'] == 0:
continue
# 상세 내용 수집
detail_data = self.model.fetch_detail_content(voc_id)
if detail_data:
try:
self.db.update_detail(voc_id, detail_data)
if is_related:
self.logger.info(f"상세 내용 업데이트됨: {title}")
except DatabaseError as e:
self.logger.error(f"상세 내용 저장 실패 (ID: {voc_id}): {e.message}")
time.sleep(1.2) # 서버 부하 방지용 딜레이
def run_db_check_cycle(self):
"""
DB 체크 사이클 실행
마지막 체크 이후 신규 데이터를 조회하여 알림을 발생시킵니다.
모든 관심글(is_related=1)에 대해 알림을 볃니다.
"""
self.logger.info("신규 DB 데이터 확인 중...")
try:
# 1. 마지막 체크 이후 수집된 신규글 조회
noti_settings = self.settings.get('noti', {})
use_related_filter = noti_settings.get('use_related_filter', True)
raw_posts = self.db.get_new_posts_since(
self.last_check_time,
related_only=use_related_filter
)
self.last_check_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._save_last_check_time() # 영속화
if not raw_posts:
mode_text = "관심글" if use_related_filter else "전체글"
self.logger.debug(f"신규 알림 대상 없음: 마지막 체크 이후 {mode_text} 0건")
return
# sqlite3.Row 객체를 dict로 변환
new_posts = [dict(row) for row in raw_posts]
# 2. 알림 설정 로드
use_sound = noti_settings.get('sound', True)
# 3. 중복 알림 방지 필터링
valid_posts = self._filter_duplicate_notifications(new_posts)
if not valid_posts:
self.logger.debug("신규 알림 대상 없음: 중복 필터링 후 0건")
return
self.logger.info(
f"신규 알림 발송: {len(valid_posts)}건 / 필터모드={'관심글' if use_related_filter else '전체글'} "
f"(대상 ID: {[p.get('id') for p in valid_posts]})"
)
# 4. 소리 재생 (설정 시)
if use_sound:
self._play_notification_sound()
# 5. 알림창 표시 (Controller 콜백 사용)
if self.notify_callback:
self._show_notifications(valid_posts)
except DatabaseError as e:
self.logger.error(f"DB 체크 중 오류: {e.message}")
except Exception as e:
self.logger.error(f"DB 체크 중 예상치 못한 오류: {e}", exc_info=True)
def _filter_duplicate_notifications(self, posts: list) -> list:
"""
중복 알림 방지 필터링 (낺부 메서드)
이미 알림을 볂은 게시글은 제외합니다.
Args:
posts: 게시글 리스트
Returns:
list: 중복 제거된 게시글 리스트
"""
valid_posts = []
for post in posts:
post_id = post.get('id')
# 이미 알림 볂은 글인지 체크
if post_id not in self.notified_post_ids:
valid_posts.append(post)
self.notified_post_ids.add(post_id)
# 메모리 관리를 위해 최근 1000개만 유지
if len(self.notified_post_ids) > 1000:
self.notified_post_ids.pop()
return valid_posts
def _load_last_check_time(self):
"""마지막 체크 시간 로드 (영속화)"""
try:
from utils.path_utils import get_data_dir
import json
state_file = get_data_dir() / "scheduler_state.json"
if state_file.exists():
with open(state_file, 'r', encoding='utf-8') as f:
state = json.load(f)
saved_time = state.get('last_check_time', '')
if saved_time:
self.last_check_time = saved_time
self.logger.info(f"마지막 체크 시간 복원: {saved_time}")
except Exception as e:
self.logger.warning(f"마지막 체크 시간 로드 실패: {e}")
def _save_last_check_time(self):
"""마지막 체크 시간 저장 (영속화)"""
try:
from utils.path_utils import get_data_dir
import json
state_file = get_data_dir() / "scheduler_state.json"
state = {'last_check_time': self.last_check_time}
with open(state_file, 'w', encoding='utf-8') as f:
json.dump(state, f, ensure_ascii=False)
except Exception as e:
self.logger.warning(f"마지막 체크 시간 저장 실패: {e}")
def _play_notification_sound(self):
"""알림 소리 재생 (내부 메서드)"""
try:
import winsound
winsound.MessageBeep()
except Exception as e:
self.logger.warning(f"알림 소리 재생 실패: {e}")
def run_unchecked_check_cycle(self):
"""
미확인 글 체크 사이클 실행
확인하지 않은 관심글을 주기적으로 체크하여 알림을 발생시킵니다.
설정값(`unchecked_check_interval_minutes`)마다 실행됩니다.
"""
try:
noti_settings = self.settings.get('noti', {})
use_related_filter = noti_settings.get('use_related_filter', True)
use_delay_filter = noti_settings.get('unchecked_delay_enabled', True)
unchecked_interval = noti_settings.get('unchecked_check_interval_minutes', 30)
self.logger.info(
f"미확인 글 DB 확인 중... (주기 {unchecked_interval}분 / 대상 {'관심글' if use_related_filter else '전체글'} "
f"/ 30분 지연조건 {'ON' if use_delay_filter else 'OFF'})"
)
# 1. 확인하지 않은 게시글 조회
unchecked_posts = self.db.get_unchecked_related_posts(related_only=use_related_filter)
if not unchecked_posts:
mode_text = "관심글" if use_related_filter else "전체글"
self.logger.debug(f"미확인 알림 대상 없음: {mode_text} 미확인 0건")
return
# sqlite3.Row 객체를 dict로 변환
posts = [dict(row) for row in unchecked_posts]
# 2. 30분 경과 조건(ON/OFF)에 따라 필터링
from datetime import datetime, timedelta
now = datetime.now()
valid_posts = []
if use_delay_filter:
for post in posts:
created_at = post.get('created_at', '')
if created_at:
try:
post_time = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
# 30분 이상 지난 글만 알림
if now - post_time >= timedelta(minutes=30):
valid_posts.append(post)
except Exception:
# 날짜 파싱 실패 시 포함
valid_posts.append(post)
else:
# 생성 시간이 없으면 포함
valid_posts.append(post)
else:
# 지연 조건 OFF: 미확인 글 전체 알림
valid_posts = posts
if not valid_posts:
self.logger.debug("미확인 알림 대상 없음: 조건 필터링 후 0건")
return
self.logger.info(
f"미확인 알림 발송: 원본 {len(posts)}건 중 {len(valid_posts)}건 (대상 ID: {[p.get('id') for p in valid_posts]})"
)
# 3. 알림 설정 로드
use_sound = noti_settings.get('sound', True)
# 4. 소리 재생 (설정 시) - 미확인 알림은 부드러운 알림
if use_sound:
self._play_notification_sound()
# 5. 알림창 표시
if self.notify_callback:
self._show_unchecked_notifications(valid_posts)
except DatabaseError as e:
self.logger.error(f"미확인 글 체크 중 오류: {e.message}")
except Exception as e:
self.logger.error(f"미확인 글 체크 중 예상치 못한 오류: {e}", exc_info=True)
def _show_unchecked_notifications(self, posts: list):
"""
미확인 글 알림창 표시 (낺부 메서드)
Args:
posts: 미확인 게시글 리스트
"""
if not self.notify_callback:
return
if len(posts) == 1:
# 단건 알림
post = posts[0]
title = f"⚠️ 미확인 VOC [{post['department']}]"
use_delay_filter = self.settings.get('noti', {}).get('unchecked_delay_enabled', True)
suffix = "(30분 이상 미확인)" if use_delay_filter else "(미확인 글)"
msg = f"{post['title']}\n\n{suffix}"
self.notify_callback(title, msg, post['id'])
else:
# 다건 알림
summary_title = f"⚠️ 미확인 VOC {len(posts)}"
lines = ["다음 글들을 아직 확인하지 않았습니다:"]
for p in posts[:5]: # 최대 5개만 표시
time_val = p['date'].split(' ')[1][:5] if ' ' in p['date'] else "시간미상"
lines.append(f"• [{time_val}] {p['title']}")
if len(posts) > 5:
lines.append(f"... 외 {len(posts) - 5}")
combined_msg = "\n".join(lines)
self.logger.debug(
f"미확인 다건 알림 내용: {[p.get('title', '') for p in posts[:5]]}"
)
self.notify_callback(summary_title, combined_msg, None)
def _show_notifications(self, posts: list):
"""
알림창 표시 (낺부 메서드)
1건이면 단건 알림, 다건이면 요약 알림을 표시합니다.
Args:
posts: 알림 대상 게시글 리스트
"""
if not self.notify_callback:
return
if len(posts) == 1:
# 단건 알림
post = posts[0]
title = f"신규 VOC [{post['department']}]"
msg = post['title']
self.notify_callback(title, msg, post['id'])
else:
# 다건 알림 (리스트 형태)
summary_title = f"신규 VOC {len(posts)}건 발생"
lines = []
for p in posts:
# 시간 정보 추출 (YYYY-MM-DD HH:MM:SS -> HH:MM)
time_val = p['date'].split(' ')[1][:5] if ' ' in p['date'] else "시간미상"
lines.append(f"• [{time_val}] {p['title']}")
combined_msg = "\n".join(lines)
self.notify_callback(summary_title, combined_msg, None)