HUTAMS_AUDIO/app/main.py

565 lines
26 KiB
Python

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, UploadFile, File, Form, HTTPException, BackgroundTasks, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import shutil
import tempfile
import os
import asyncio
import imageio_ffmpeg
from typing import Optional
from datetime import datetime
import logging
# ── 로깅 최우선 초기화 (Windows UTF-8 stdout, 파일 롤링 포함) ──────────────
from app.core.logging_config import setup_logging
setup_logging()
os.environ["PATH"] += os.pathsep + os.path.dirname(imageio_ffmpeg.get_ffmpeg_exe())
from pydub import AudioSegment
AudioSegment.converter = imageio_ffmpeg.get_ffmpeg_exe()
from app.core.config import settings
from app.core.exceptions import STTError, ModelNotFoundError, AudioFileNotFoundError
from app.models.stt import STTRequest, STTResponse
from app.services.stt_service import WhisperSTTService
from app.services.audio_listener import RadioListener
from app.db.database import init_db, SessionLocal
from app.db.models import TranscriptionRecord, TranscriptionSegment
from app.routers import records as records_router
logger = logging.getLogger("uvicorn.error")
# ─── 전역 감청 인스턴스 (선언만, lifespan에서 초기화)
_radio_listener: RadioListener | None = None
# ─── LLM 백그라운드 워커 큐 ─────────────────────────────────────────────────
# STT 완료 직후 즉시 큐에 넣고 응답을 반환 → 워커가 비동기로 화자 재분류
# asyncio.Queue는 서버 시작 후 event loop가 준비된 뒤 생성해야 하므로
# lifespan 내부에서 초기화됩니다.
llm_task_queue: asyncio.Queue | None = None
# ─── 앱 수명주기: DB 초기화 + RadioListener 구동 ─────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
global _radio_listener, llm_task_queue
# (1) DB 초기화
logger.info("HUTAMS STT Service 시작 - DB 초기화 중...")
init_db()
logger.info("DB 초기화 완료 (whisper.db)")
# (2) 대규모 도메인 사전(CSV) 로드 (Lifespan 기동 중 안전 장치)
try:
from app.core.dictionary import load_terms_from_csv
load_terms_from_csv("bs_20240810.csv")
except Exception as e:
logger.warning(f"대규모 용어 사전 적재 실패 (서버 기동 유지): {e}")
# (3) LLM 워커 큐 초기화 (event loop가 준비된 뒤)
llm_task_queue = asyncio.Queue()
# ─────────────────────────────────────────────────────────────────────────
# [생존형 LLM 백그라운드 워커]
# while True 루프 바깥 예외는 워커 자체를 죽이는데,
# while True 안쪽 try/except 가 모든 개별 작업 예외를 흡수하므로
# 특정 세그먼트 LLM 실패가 워커를 절대로 죽이지 않는다.
# ─────────────────────────────────────────────────────────────────────────
async def llm_worker_loop():
logger.info("[LLM Worker] 생존형 백그라운드 워커 시작 (대화 쓰레드 판별/화자 분류)")
while True:
task = None
try:
task = await llm_task_queue.get()
record_id: int = task.get("record_id")
seg_idx: int = task.get("seg_idx", 0)
text: str = task.get("text", "")
context: str = task.get("context", "")
# ── 1. 첫 번째 DB 세션: 필요한 데이터만 미리 조회 (LLM 중 블로킹 방지) ──
db = SessionLocal()
try:
from sqlalchemy import select as _select, asc as _asc, desc as _desc, func as _func
target_seg = db.scalars(
_select(TranscriptionSegment)
.where(TranscriptionSegment.record_id == record_id)
.order_by(_asc(TranscriptionSegment.id))
.offset(seg_idx).limit(1)
).first()
if not target_seg:
logger.debug(f"[LLM Worker] 스킵: record={record_id}[{seg_idx}] 미존재")
continue
t_seg_id = target_seg.id
t_text = target_seg.text or ""
t_speaker = target_seg.speaker
t_start_iso = target_seg.absolute_start_time
t_rec_id = target_seg.record_id
prev_seg = db.scalars(
_select(TranscriptionSegment)
.where(TranscriptionSegment.id < t_seg_id)
.order_by(_desc(TranscriptionSegment.id))
.limit(1)
).first()
p_seg_id = prev_seg.id if prev_seg else None
p_text = prev_seg.text or "" if prev_seg else ""
p_end_iso = prev_seg.absolute_end_time if prev_seg else None
p_rec_id = prev_seg.record_id if prev_seg else -1
finally:
db.close()
# ── 2. LLM 추론 구간 (DB 연결 해제 상태, 비동기 스레드 풀) ──
from app.services.speaker_classifier import classify_speaker
from app.services.llm_service import llm_service
# a) 화자 판별
speaker_result = await asyncio.get_event_loop().run_in_executor(
None, classify_speaker, text, context
)
# b) 쓰레드(맥락) 판별 (30초 하드 컷오프 포함)
is_continuation = False
if p_seg_id is not None:
def _parse_ts(iso_str):
if not iso_str: return 0.0
try:
# Python 3.11에서는 Z 접미사가 포함된 ISO 8601도 파싱되지만,
# 안전을 위해 replace 활용
return datetime.fromisoformat(iso_str.replace("Z", "+00:00")).timestamp()
except Exception:
return 0.0
gap = _parse_ts(t_start_iso) - _parse_ts(p_end_iso)
if gap >= 30.0:
logger.info(f"[LLM Thread] 30초 이상 Gap ({gap:.1f}s) → 무조건 단절(False)")
is_continuation = False
else:
is_continuation = await asyncio.get_event_loop().run_in_executor(
None, llm_service.check_thread_continuation, p_text, t_text
)
logger.info(f"[LLM Thread] LLM 이어짐 판별: {is_continuation} (gap={gap:.1f}s)")
# ── 3. 두 번째 DB 세션: 결과 갱신 및 상태 반영 ──
db = SessionLocal()
try:
target_seg = db.get(TranscriptionSegment, t_seg_id)
if not target_seg: continue
if target_seg.speaker in ("불명", None, ""):
target_seg.speaker = speaker_result
action = "new"
updated_record_id = target_seg.record_id
if is_continuation:
# 이전 대화로 병합 (Append)
target_seg.record_id = p_rec_id
parent_record = db.get(TranscriptionRecord, p_rec_id)
if parent_record:
parent_record.full_text = f"{parent_record.full_text} {t_text}".strip()
updated_record_id = p_rec_id
action = "append"
# 이전 Record가 완전히 비게 되었다면 껍데기 삭제 (Clean-up)
db.flush()
remaining = db.scalar(
_select(_func.count(TranscriptionSegment.id))
.where(TranscriptionSegment.record_id == t_rec_id)
)
if remaining == 0:
old_record = db.get(TranscriptionRecord, t_rec_id)
if old_record:
db.delete(old_record)
else:
# 독립 대화 (New)
# 이미 _save_to_db가 독립 레코드로 만들었지만,
# 한 STTResponse(같은 파일) 내에서 맥락이 단절된 경우엔 강제로 분리한다.
if target_seg.record_id == p_rec_id:
target_rec = db.get(TranscriptionRecord, t_rec_id)
new_record = TranscriptionRecord(
filename=target_rec.filename if target_rec else "thread_split",
full_text=t_text,
language="ko",
base_datetime=datetime.now(),
processing_time_sec=0, audio_duration_sec=0, peak_memory_mb=0, process_speed_x=0
)
db.add(new_record)
db.flush()
target_seg.record_id = new_record.id
updated_record_id = new_record.id
db.commit()
# ── 4. WebSocket Broadcast: 업데이트 내역 전송 ──
await ws_manager.broadcast({
"type": "thread_updated",
"data": {
"record_id": updated_record_id,
"action": action,
"segment_id": t_seg_id,
"speaker": speaker_result
}
})
# ── 5. 키워드 추출 및 지식 연결 (Context Provider) ──
from app.services.context_service import context_service
keywords = await asyncio.get_event_loop().run_in_executor(
None, llm_service.extract_keywords, t_text
)
if keywords:
logger.info(f"[LLM Worker] 키워드 추출 성공: {keywords}")
contexts = await asyncio.get_event_loop().run_in_executor(
None, context_service.get_extended_context, keywords
)
if contexts:
logger.info(f"[LLM Worker] 지식 발견: {len(contexts)}")
await ws_manager.broadcast({
"type": "context_discovered",
"data": {
"record_id": updated_record_id,
"contexts": contexts
}
})
finally:
db.close()
except Exception as exc:
logger.error(f"[LLM Worker] 작업 처리 중 예외 발생 (생존 방어): {exc!r}")
finally:
if task is not None:
llm_task_queue.task_done()
# [Chapter 8.0] 심각도 기반 차등 보관 태스크 (24시간 주기 실행)
async def audio_cleanup_loop():
logger.info("[Cleanup Worker] 차등 삭제 스케줄러 기동 (3일 초과 일반 오디오 삭제)")
while True:
try:
db = SessionLocal()
try:
from datetime import datetime, timedelta
cutoff_time = datetime.now() - timedelta(days=3)
target_segments = db.query(TranscriptionSegment).join(TranscriptionRecord).filter(
TranscriptionRecord.created_at < cutoff_time,
TranscriptionRecord.urgency != "긴급",
TranscriptionSegment.audio_path.isnot(None)
).all()
for seg in target_segments:
if seg.audio_path and os.path.exists(seg.audio_path):
os.remove(seg.audio_path)
logger.info(f"[Cleanup] 파일 삭제됨: {seg.audio_path}")
seg.audio_path = None
if target_segments:
db.commit()
logger.info(f"[Cleanup] {len(target_segments)}개 오래된 일반 오디오 경로 DB NULL 처리 완료")
except Exception as e:
logger.error(f"[Cleanup] 스케줄러 오류: {e}")
db.rollback()
finally:
db.close()
await asyncio.sleep(24 * 3600) # 24시간마다 실행
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"[Cleanup] 스케줄러 외부 오류: {e}")
await asyncio.sleep(3600)
# 워커 태스크 기동 (FastAPI lifespan 내에서 asyncio Task로 등록)
worker_task = asyncio.create_task(llm_worker_loop())
cleanup_task = asyncio.create_task(audio_cleanup_loop())
logger.info("[LLM Worker & Cleanup] asyncio Task 등록 완료")
# (3) 실시간 on_segment 콜백 정의
async def on_segment(wav_path: str):
"""RadioListener 가 wav 파일을 완성하면 호출되는 비동기 콜백."""
try:
svc = WhisperSTTService()
req = STTRequest(
audio_file_path=wav_path,
language="ko",
base_datetime=datetime.now()
)
resp = svc.transcribe(req)
# DB 저장: 모든 세그먼트 speaker="불명" 으로 즉시 저장
saved_filename = os.path.basename(wav_path)
record_id = _save_to_db_sync(saved_filename, resp, None)
# WebSocket broadcast (STT 완료 즉시, LLM 기다리지 않음)
broadcast_data = resp.model_dump()
broadcast_data["type"] = "stt_result"
broadcast_data["record_id"] = record_id # 워커(Thread Check) 결과 매핑용
# DB insert된 segment들의 id도 프론트엔드로 전달
# (_save_to_db_sync가 삽입한 target_seg.id를 알 수 없으나,
# 여기서는 단순화를 위해 STTResponse만 보냄. 프론트에서 segment 인덱스를 활용하거나 생략)
await ws_manager.broadcast(broadcast_data)
logger.info(f"WebSocket broadcast 완료 ({len(ws_manager.active_connections)}개 클라이언트)")
# ── LLM 화자 재분류 작업을 큐에 넣기 (put_nowait: 블로킹 없음) ──
if llm_task_queue is not None and record_id is not None:
text_parts = [seg.text or "" for seg in resp.segments]
for i, seg_data in enumerate(resp.segments):
context = " ".join(text_parts[max(0, i-2):i])
# seg_id를 얻기 위해 DB에서 record_id로 조회 (lazy)
llm_task_queue.put_nowait({
"seg_id": None, # 번호 모름 → 워커에서 record_id로 탐색
"record_id": record_id,
"seg_idx": i,
"text": seg_data.text or "",
"context": context,
})
logger.debug(f"[on_segment] LLM 큐 등록: {len(resp.segments)}")
except Exception as e:
logger.error(f"on_segment 파이프라인 오류: {e}")
finally:
try:
if os.path.exists(wav_path):
os.remove(wav_path)
except OSError:
pass
# (4) 감청 모듈 분기 기동 (AUDIO_SOURCE 설정에 따라)
loop = asyncio.get_event_loop()
source = settings.AUDIO_SOURCE.strip().lower()
if source == "mock":
from app.services.mock_audio_listener import MockAudioListener
mock_path = settings.MOCK_AUDIO_PATH
logger.info(f"AUDIO_SOURCE=mock → MockAudioListener 기동 ({mock_path})")
_radio_listener = MockAudioListener(
on_segment=on_segment, loop=loop, wav_path=mock_path, loop_playback=True
)
else:
logger.info("AUDIO_SOURCE=mic → RadioListener(실마이크) 기동")
_radio_listener = RadioListener(on_segment=on_segment, loop=loop)
_radio_listener.start()
yield # ← 서버 서비스 제공 구간
# (5) 종료 시 안전 해제
if _radio_listener:
_radio_listener.stop()
worker_task.cancel() # 워커 태스크 취소
cleanup_task.cancel() # 정리 태스크 취소
logger.info("HUTAMS STT Service 종료")
# FastAPI Application 객체 생성
app = FastAPI(
title=settings.PROJECT_NAME,
description="현업 LTE-R 무전 감청 및 실시간 분석용 STT API",
version="1.0.0",
lifespan=lifespan
)
# DI를 위한 의존성 주입 객체
def get_stt_service() -> WhisperSTTService:
return WhisperSTTService()
# 정적 파일 및 템플릿 설정
_HERE = os.path.dirname(os.path.abspath(__file__))
templates = Jinja2Templates(directory=os.path.join(_HERE, "templates"))
app.mount("/static", StaticFiles(directory=os.path.join(_HERE, "static")), name="static")
# 라우터 등록
app.include_router(records_router.router)
# ─── 대시보드 페이지 ───────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def dashboard(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
def _save_to_db(filename: str, response: STTResponse, base_datetime: Optional[datetime]) -> Optional[int]:
"""
STT 결과를 SQLite DB에 Insert.
반환: 저장된 record.id (실패 시 None) — on_segment 에서 LLM 큐 등록에 활용.
세그먼트 speaker 는 즉시 '불명' 으로 저장하고, 워커가 나중에 업데이트.
"""
db = None
try:
from app.services.llm_service import llm_service
meta = llm_service.generate_metadata(response.text)
db = SessionLocal()
record = TranscriptionRecord(
filename=filename,
full_text=response.text,
language=response.language,
base_datetime=base_datetime,
processing_time_sec=response.processing_time_sec,
audio_duration_sec=response.audio_duration_sec,
peak_memory_mb=response.peak_memory_mb,
process_speed_x=response.process_speed_x,
title=meta.title or None,
summary=meta.summary or None,
keywords=meta.keywords or None,
urgency=meta.urgency or None,
)
db.add(record)
db.flush() # record.id 확보
for seg in response.segments:
db.add(TranscriptionSegment(
record_id=record.id,
start_sec=seg.start_sec,
end_sec=seg.end_sec,
text=seg.text,
speaker="불명", # 즉시 저장 기본값 — 워커가 나중에 업데이트
absolute_start_time=seg.absolute_start_time,
absolute_end_time=seg.absolute_end_time,
audio_path=seg.audio_path,
))
db.commit()
# [Chapter 8.0] 생성된 세그먼트 ID를 응답 객체에 주입하여 프론트엔드 오디오 재생 뱃지와 매핑
for idx, seg in enumerate(response.segments):
if hasattr(TranscriptionSegment, 'id'):
# SQLAlchemy에 의해 방금 부여된 ID
db_seg = db.query(TranscriptionSegment).filter_by(record_id=record.id).order_by(TranscriptionSegment.id).offset(idx).first()
if db_seg:
# Pydantic 모델에 여분 필드(extra)로 id 주입
seg.id = db_seg.id
record_id = record.id
logger.info(f"DB 저장 완료 (record_id={record_id}, filename={filename})")
return record_id
except Exception as e:
logger.error(f"DB 저장 실패 (Soft-Fail): {e}")
try:
if db:
db.rollback()
except Exception:
pass
return None
finally:
try:
if db:
db.close()
except Exception:
pass
# 동기 별칭
_save_to_db_sync = _save_to_db
@app.post("/api/v1/transcribe", response_model=STTResponse, summary="오디오 파일 STT 변환")
async def transcribe_audio_file(
background_tasks: BackgroundTasks,
audio: UploadFile = File(..., description="분석할 무전 오디오 파일 (m4a, mp3, mp4, wav 등)"),
language: str = Form("ko", description="오디오 언어 코드 (기본: ko)"),
base_datetime: Optional[datetime] = Form(None, description="녹음 시작 절대 시간 (ISO 8601, 예: 2024-03-06T12:00:00+09:00)"),
stt_service: WhisperSTTService = Depends(get_stt_service)
):
"""
클라이언트로부터 오디오 파일을 업로드 받아 STT 엔진에 돌린 후,
Pydantic Model 통일 규격의 결과값 반환. 처리 후 결과는 DB에도 자동 적재됨.
"""
tmp_path = ""
saved_filename = ""
try:
import time
# 1. 파일을 고유한 이름으로 static/audio 폴더에 영구 저장 (timestamp 활용)
safe_name = audio.filename.replace(" ", "_")
timestamp = int(time.time())
saved_filename = f"{timestamp}_{safe_name}"
save_dir = os.path.join(_HERE, "static", "audio")
os.makedirs(save_dir, exist_ok=True)
tmp_path = os.path.join(save_dir, saved_filename)
with open(tmp_path, "wb") as buffer:
shutil.copyfileobj(audio.file, buffer)
# 2. Controller -> Model 데이터 생성
request_dto = STTRequest(
audio_file_path=tmp_path,
language=language,
base_datetime=base_datetime
)
# 3. 비즈니스 로직(Service) 호출
response_dto = stt_service.transcribe(request_dto)
# 4. 백그라운드로 DB 저장 (Soft-Fail: DB 에러가 응답을 막지 않음)
# 이제 임시 파일명이 아닌 실제 저장된 파일명을 DB에 저장합니다.
background_tasks.add_task(_save_to_db, saved_filename, response_dto, base_datetime)
return response_dto
except ModelNotFoundError as e:
raise HTTPException(status_code=500, detail=str(e))
except AudioFileNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e))
except STTError as e:
raise HTTPException(status_code=500, detail=str(e))
# finally 블록에서 os.remove(tmp_path)를 호출하던 부분을 제거하여
# 오디오 파일이 static 폴더에 보존되도록 합니다.
# ─── WebSocket 매니저 및 라우터 (실시간 감청용) ───────────────────────────────
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
async def broadcast(self, message: dict):
for connection in self.active_connections:
try:
await connection.send_json(message)
except Exception:
pass
ws_manager = ConnectionManager()
# 마지막으로 broadcast된 데이터를 캐시 (뒤늦게 연결한 클라이언트에게 즉시 전송)
_last_broadcast: dict | None = None
@app.websocket("/api/v1/ws/live")
async def websocket_live_endpoint(websocket: WebSocket):
global _last_broadcast
await ws_manager.connect(websocket)
try:
# 연결 성공 알림
await websocket.send_json({"type": "info", "message": "WebSocket Connected."})
# Late-join 지원: 연결 즉시 DB에서 최근 레코드 1건을 push
# (사용자가 Live Mode를 켰을 때 화면이 공백으로 보이는 문제 해결)
try:
db = SessionLocal()
from sqlalchemy.orm import joinedload as _jl
latest = db.query(TranscriptionRecord).options(
_jl(TranscriptionRecord.segments)
).order_by(TranscriptionRecord.id.desc()).first()
if latest:
from app.models.record import RecordListResponse
payload = RecordListResponse.model_validate(latest).model_dump(mode="json")
payload["type"] = "stt_result"
payload["late_join"] = True
await websocket.send_json(payload)
db.close()
except Exception as e:
logger.warning(f"Late-join push 실패 (무시): {e}")
while True:
data = await websocket.receive_text()
except WebSocketDisconnect:
ws_manager.disconnect(websocket)