565 lines
26 KiB
Python
565 lines
26 KiB
Python
from contextlib import asynccontextmanager
|
|
from fastapi import FastAPI, Depends, UploadFile, File, Form, HTTPException, BackgroundTasks, Request, WebSocket, WebSocketDisconnect
|
|
from fastapi.responses import JSONResponse, HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
import shutil
|
|
import tempfile
|
|
import os
|
|
import asyncio
|
|
import imageio_ffmpeg
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# ── 로깅 최우선 초기화 (Windows UTF-8 stdout, 파일 롤링 포함) ──────────────
|
|
from app.core.logging_config import setup_logging
|
|
setup_logging()
|
|
|
|
os.environ["PATH"] += os.pathsep + os.path.dirname(imageio_ffmpeg.get_ffmpeg_exe())
|
|
|
|
from pydub import AudioSegment
|
|
AudioSegment.converter = imageio_ffmpeg.get_ffmpeg_exe()
|
|
|
|
from app.core.config import settings
|
|
from app.core.exceptions import STTError, ModelNotFoundError, AudioFileNotFoundError
|
|
from app.models.stt import STTRequest, STTResponse
|
|
from app.services.stt_service import WhisperSTTService
|
|
from app.services.audio_listener import RadioListener
|
|
from app.db.database import init_db, SessionLocal
|
|
from app.db.models import TranscriptionRecord, TranscriptionSegment
|
|
from app.routers import records as records_router
|
|
|
|
logger = logging.getLogger("uvicorn.error")
|
|
|
|
# ─── 전역 감청 인스턴스 (선언만, lifespan에서 초기화)
|
|
_radio_listener: RadioListener | None = None
|
|
|
|
# ─── LLM 백그라운드 워커 큐 ─────────────────────────────────────────────────
|
|
# STT 완료 직후 즉시 큐에 넣고 응답을 반환 → 워커가 비동기로 화자 재분류
|
|
# asyncio.Queue는 서버 시작 후 event loop가 준비된 뒤 생성해야 하므로
|
|
# lifespan 내부에서 초기화됩니다.
|
|
llm_task_queue: asyncio.Queue | None = None
|
|
|
|
# ─── 앱 수명주기: DB 초기화 + RadioListener 구동 ─────────────────────────────
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
global _radio_listener, llm_task_queue
|
|
|
|
# (1) DB 초기화
|
|
logger.info("HUTAMS STT Service 시작 - DB 초기화 중...")
|
|
init_db()
|
|
logger.info("DB 초기화 완료 (whisper.db)")
|
|
|
|
# (2) 대규모 도메인 사전(CSV) 로드 (Lifespan 기동 중 안전 장치)
|
|
try:
|
|
from app.core.dictionary import load_terms_from_csv
|
|
load_terms_from_csv("bs_20240810.csv")
|
|
except Exception as e:
|
|
logger.warning(f"대규모 용어 사전 적재 실패 (서버 기동 유지): {e}")
|
|
|
|
# (3) LLM 워커 큐 초기화 (event loop가 준비된 뒤)
|
|
llm_task_queue = asyncio.Queue()
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────
|
|
# [생존형 LLM 백그라운드 워커]
|
|
# while True 루프 바깥 예외는 워커 자체를 죽이는데,
|
|
# while True 안쪽 try/except 가 모든 개별 작업 예외를 흡수하므로
|
|
# 특정 세그먼트 LLM 실패가 워커를 절대로 죽이지 않는다.
|
|
# ─────────────────────────────────────────────────────────────────────────
|
|
async def llm_worker_loop():
|
|
logger.info("[LLM Worker] 생존형 백그라운드 워커 시작 (대화 쓰레드 판별/화자 분류)")
|
|
while True:
|
|
task = None
|
|
try:
|
|
task = await llm_task_queue.get()
|
|
record_id: int = task.get("record_id")
|
|
seg_idx: int = task.get("seg_idx", 0)
|
|
text: str = task.get("text", "")
|
|
context: str = task.get("context", "")
|
|
|
|
# ── 1. 첫 번째 DB 세션: 필요한 데이터만 미리 조회 (LLM 중 블로킹 방지) ──
|
|
db = SessionLocal()
|
|
try:
|
|
from sqlalchemy import select as _select, asc as _asc, desc as _desc, func as _func
|
|
target_seg = db.scalars(
|
|
_select(TranscriptionSegment)
|
|
.where(TranscriptionSegment.record_id == record_id)
|
|
.order_by(_asc(TranscriptionSegment.id))
|
|
.offset(seg_idx).limit(1)
|
|
).first()
|
|
|
|
if not target_seg:
|
|
logger.debug(f"[LLM Worker] 스킵: record={record_id}[{seg_idx}] 미존재")
|
|
continue
|
|
|
|
t_seg_id = target_seg.id
|
|
t_text = target_seg.text or ""
|
|
t_speaker = target_seg.speaker
|
|
t_start_iso = target_seg.absolute_start_time
|
|
t_rec_id = target_seg.record_id
|
|
|
|
prev_seg = db.scalars(
|
|
_select(TranscriptionSegment)
|
|
.where(TranscriptionSegment.id < t_seg_id)
|
|
.order_by(_desc(TranscriptionSegment.id))
|
|
.limit(1)
|
|
).first()
|
|
|
|
p_seg_id = prev_seg.id if prev_seg else None
|
|
p_text = prev_seg.text or "" if prev_seg else ""
|
|
p_end_iso = prev_seg.absolute_end_time if prev_seg else None
|
|
p_rec_id = prev_seg.record_id if prev_seg else -1
|
|
finally:
|
|
db.close()
|
|
|
|
# ── 2. LLM 추론 구간 (DB 연결 해제 상태, 비동기 스레드 풀) ──
|
|
from app.services.speaker_classifier import classify_speaker
|
|
from app.services.llm_service import llm_service
|
|
|
|
# a) 화자 판별
|
|
speaker_result = await asyncio.get_event_loop().run_in_executor(
|
|
None, classify_speaker, text, context
|
|
)
|
|
|
|
# b) 쓰레드(맥락) 판별 (30초 하드 컷오프 포함)
|
|
is_continuation = False
|
|
if p_seg_id is not None:
|
|
def _parse_ts(iso_str):
|
|
if not iso_str: return 0.0
|
|
try:
|
|
# Python 3.11에서는 Z 접미사가 포함된 ISO 8601도 파싱되지만,
|
|
# 안전을 위해 replace 활용
|
|
return datetime.fromisoformat(iso_str.replace("Z", "+00:00")).timestamp()
|
|
except Exception:
|
|
return 0.0
|
|
|
|
gap = _parse_ts(t_start_iso) - _parse_ts(p_end_iso)
|
|
if gap >= 30.0:
|
|
logger.info(f"[LLM Thread] 30초 이상 Gap ({gap:.1f}s) → 무조건 단절(False)")
|
|
is_continuation = False
|
|
else:
|
|
is_continuation = await asyncio.get_event_loop().run_in_executor(
|
|
None, llm_service.check_thread_continuation, p_text, t_text
|
|
)
|
|
logger.info(f"[LLM Thread] LLM 이어짐 판별: {is_continuation} (gap={gap:.1f}s)")
|
|
|
|
# ── 3. 두 번째 DB 세션: 결과 갱신 및 상태 반영 ──
|
|
db = SessionLocal()
|
|
try:
|
|
target_seg = db.get(TranscriptionSegment, t_seg_id)
|
|
if not target_seg: continue
|
|
|
|
if target_seg.speaker in ("불명", None, ""):
|
|
target_seg.speaker = speaker_result
|
|
|
|
action = "new"
|
|
updated_record_id = target_seg.record_id
|
|
|
|
if is_continuation:
|
|
# 이전 대화로 병합 (Append)
|
|
target_seg.record_id = p_rec_id
|
|
parent_record = db.get(TranscriptionRecord, p_rec_id)
|
|
if parent_record:
|
|
parent_record.full_text = f"{parent_record.full_text} {t_text}".strip()
|
|
updated_record_id = p_rec_id
|
|
action = "append"
|
|
|
|
# 이전 Record가 완전히 비게 되었다면 껍데기 삭제 (Clean-up)
|
|
db.flush()
|
|
remaining = db.scalar(
|
|
_select(_func.count(TranscriptionSegment.id))
|
|
.where(TranscriptionSegment.record_id == t_rec_id)
|
|
)
|
|
if remaining == 0:
|
|
old_record = db.get(TranscriptionRecord, t_rec_id)
|
|
if old_record:
|
|
db.delete(old_record)
|
|
else:
|
|
# 독립 대화 (New)
|
|
# 이미 _save_to_db가 독립 레코드로 만들었지만,
|
|
# 한 STTResponse(같은 파일) 내에서 맥락이 단절된 경우엔 강제로 분리한다.
|
|
if target_seg.record_id == p_rec_id:
|
|
target_rec = db.get(TranscriptionRecord, t_rec_id)
|
|
new_record = TranscriptionRecord(
|
|
filename=target_rec.filename if target_rec else "thread_split",
|
|
full_text=t_text,
|
|
language="ko",
|
|
base_datetime=datetime.now(),
|
|
processing_time_sec=0, audio_duration_sec=0, peak_memory_mb=0, process_speed_x=0
|
|
)
|
|
db.add(new_record)
|
|
db.flush()
|
|
target_seg.record_id = new_record.id
|
|
updated_record_id = new_record.id
|
|
|
|
db.commit()
|
|
|
|
# ── 4. WebSocket Broadcast: 업데이트 내역 전송 ──
|
|
await ws_manager.broadcast({
|
|
"type": "thread_updated",
|
|
"data": {
|
|
"record_id": updated_record_id,
|
|
"action": action,
|
|
"segment_id": t_seg_id,
|
|
"speaker": speaker_result
|
|
}
|
|
})
|
|
# ── 5. 키워드 추출 및 지식 연결 (Context Provider) ──
|
|
from app.services.context_service import context_service
|
|
keywords = await asyncio.get_event_loop().run_in_executor(
|
|
None, llm_service.extract_keywords, t_text
|
|
)
|
|
|
|
if keywords:
|
|
logger.info(f"[LLM Worker] 키워드 추출 성공: {keywords}")
|
|
contexts = await asyncio.get_event_loop().run_in_executor(
|
|
None, context_service.get_extended_context, keywords
|
|
)
|
|
if contexts:
|
|
logger.info(f"[LLM Worker] 지식 발견: {len(contexts)}건")
|
|
await ws_manager.broadcast({
|
|
"type": "context_discovered",
|
|
"data": {
|
|
"record_id": updated_record_id,
|
|
"contexts": contexts
|
|
}
|
|
})
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as exc:
|
|
logger.error(f"[LLM Worker] 작업 처리 중 예외 발생 (생존 방어): {exc!r}")
|
|
finally:
|
|
if task is not None:
|
|
llm_task_queue.task_done()
|
|
|
|
# [Chapter 8.0] 심각도 기반 차등 보관 태스크 (24시간 주기 실행)
|
|
async def audio_cleanup_loop():
|
|
logger.info("[Cleanup Worker] 차등 삭제 스케줄러 기동 (3일 초과 일반 오디오 삭제)")
|
|
while True:
|
|
try:
|
|
db = SessionLocal()
|
|
try:
|
|
from datetime import datetime, timedelta
|
|
cutoff_time = datetime.now() - timedelta(days=3)
|
|
|
|
target_segments = db.query(TranscriptionSegment).join(TranscriptionRecord).filter(
|
|
TranscriptionRecord.created_at < cutoff_time,
|
|
TranscriptionRecord.urgency != "긴급",
|
|
TranscriptionSegment.audio_path.isnot(None)
|
|
).all()
|
|
|
|
for seg in target_segments:
|
|
if seg.audio_path and os.path.exists(seg.audio_path):
|
|
os.remove(seg.audio_path)
|
|
logger.info(f"[Cleanup] 파일 삭제됨: {seg.audio_path}")
|
|
seg.audio_path = None
|
|
|
|
if target_segments:
|
|
db.commit()
|
|
logger.info(f"[Cleanup] {len(target_segments)}개 오래된 일반 오디오 경로 DB NULL 처리 완료")
|
|
except Exception as e:
|
|
logger.error(f"[Cleanup] 스케줄러 오류: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
await asyncio.sleep(24 * 3600) # 24시간마다 실행
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"[Cleanup] 스케줄러 외부 오류: {e}")
|
|
await asyncio.sleep(3600)
|
|
|
|
# 워커 태스크 기동 (FastAPI lifespan 내에서 asyncio Task로 등록)
|
|
worker_task = asyncio.create_task(llm_worker_loop())
|
|
cleanup_task = asyncio.create_task(audio_cleanup_loop())
|
|
logger.info("[LLM Worker & Cleanup] asyncio Task 등록 완료")
|
|
|
|
# (3) 실시간 on_segment 콜백 정의
|
|
async def on_segment(wav_path: str):
|
|
"""RadioListener 가 wav 파일을 완성하면 호출되는 비동기 콜백."""
|
|
try:
|
|
svc = WhisperSTTService()
|
|
req = STTRequest(
|
|
audio_file_path=wav_path,
|
|
language="ko",
|
|
base_datetime=datetime.now()
|
|
)
|
|
resp = svc.transcribe(req)
|
|
|
|
# DB 저장: 모든 세그먼트 speaker="불명" 으로 즉시 저장
|
|
saved_filename = os.path.basename(wav_path)
|
|
record_id = _save_to_db_sync(saved_filename, resp, None)
|
|
|
|
# WebSocket broadcast (STT 완료 즉시, LLM 기다리지 않음)
|
|
broadcast_data = resp.model_dump()
|
|
broadcast_data["type"] = "stt_result"
|
|
broadcast_data["record_id"] = record_id # 워커(Thread Check) 결과 매핑용
|
|
|
|
# DB insert된 segment들의 id도 프론트엔드로 전달
|
|
# (_save_to_db_sync가 삽입한 target_seg.id를 알 수 없으나,
|
|
# 여기서는 단순화를 위해 STTResponse만 보냄. 프론트에서 segment 인덱스를 활용하거나 생략)
|
|
await ws_manager.broadcast(broadcast_data)
|
|
logger.info(f"WebSocket broadcast 완료 ({len(ws_manager.active_connections)}개 클라이언트)")
|
|
|
|
# ── LLM 화자 재분류 작업을 큐에 넣기 (put_nowait: 블로킹 없음) ──
|
|
if llm_task_queue is not None and record_id is not None:
|
|
text_parts = [seg.text or "" for seg in resp.segments]
|
|
for i, seg_data in enumerate(resp.segments):
|
|
context = " ".join(text_parts[max(0, i-2):i])
|
|
# seg_id를 얻기 위해 DB에서 record_id로 조회 (lazy)
|
|
llm_task_queue.put_nowait({
|
|
"seg_id": None, # 번호 모름 → 워커에서 record_id로 탐색
|
|
"record_id": record_id,
|
|
"seg_idx": i,
|
|
"text": seg_data.text or "",
|
|
"context": context,
|
|
})
|
|
logger.debug(f"[on_segment] LLM 큐 등록: {len(resp.segments)}건")
|
|
|
|
except Exception as e:
|
|
logger.error(f"on_segment 파이프라인 오류: {e}")
|
|
finally:
|
|
try:
|
|
if os.path.exists(wav_path):
|
|
os.remove(wav_path)
|
|
except OSError:
|
|
pass
|
|
|
|
# (4) 감청 모듈 분기 기동 (AUDIO_SOURCE 설정에 따라)
|
|
loop = asyncio.get_event_loop()
|
|
source = settings.AUDIO_SOURCE.strip().lower()
|
|
|
|
if source == "mock":
|
|
from app.services.mock_audio_listener import MockAudioListener
|
|
mock_path = settings.MOCK_AUDIO_PATH
|
|
logger.info(f"AUDIO_SOURCE=mock → MockAudioListener 기동 ({mock_path})")
|
|
_radio_listener = MockAudioListener(
|
|
on_segment=on_segment, loop=loop, wav_path=mock_path, loop_playback=True
|
|
)
|
|
else:
|
|
logger.info("AUDIO_SOURCE=mic → RadioListener(실마이크) 기동")
|
|
_radio_listener = RadioListener(on_segment=on_segment, loop=loop)
|
|
|
|
_radio_listener.start()
|
|
|
|
yield # ← 서버 서비스 제공 구간
|
|
|
|
# (5) 종료 시 안전 해제
|
|
if _radio_listener:
|
|
_radio_listener.stop()
|
|
worker_task.cancel() # 워커 태스크 취소
|
|
cleanup_task.cancel() # 정리 태스크 취소
|
|
logger.info("HUTAMS STT Service 종료")
|
|
|
|
# FastAPI Application 객체 생성
|
|
app = FastAPI(
|
|
title=settings.PROJECT_NAME,
|
|
description="현업 LTE-R 무전 감청 및 실시간 분석용 STT API",
|
|
version="1.0.0",
|
|
lifespan=lifespan
|
|
)
|
|
|
|
# DI를 위한 의존성 주입 객체
|
|
def get_stt_service() -> WhisperSTTService:
|
|
return WhisperSTTService()
|
|
|
|
# 정적 파일 및 템플릿 설정
|
|
_HERE = os.path.dirname(os.path.abspath(__file__))
|
|
templates = Jinja2Templates(directory=os.path.join(_HERE, "templates"))
|
|
app.mount("/static", StaticFiles(directory=os.path.join(_HERE, "static")), name="static")
|
|
|
|
# 라우터 등록
|
|
app.include_router(records_router.router)
|
|
|
|
# ─── 대시보드 페이지 ───────────────────────────────────────────────────────────
|
|
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
|
|
async def dashboard(request: Request):
|
|
return templates.TemplateResponse("index.html", {"request": request})
|
|
|
|
def _save_to_db(filename: str, response: STTResponse, base_datetime: Optional[datetime]) -> Optional[int]:
|
|
"""
|
|
STT 결과를 SQLite DB에 Insert.
|
|
반환: 저장된 record.id (실패 시 None) — on_segment 에서 LLM 큐 등록에 활용.
|
|
세그먼트 speaker 는 즉시 '불명' 으로 저장하고, 워커가 나중에 업데이트.
|
|
"""
|
|
db = None
|
|
try:
|
|
from app.services.llm_service import llm_service
|
|
meta = llm_service.generate_metadata(response.text)
|
|
|
|
db = SessionLocal()
|
|
record = TranscriptionRecord(
|
|
filename=filename,
|
|
full_text=response.text,
|
|
language=response.language,
|
|
base_datetime=base_datetime,
|
|
processing_time_sec=response.processing_time_sec,
|
|
audio_duration_sec=response.audio_duration_sec,
|
|
peak_memory_mb=response.peak_memory_mb,
|
|
process_speed_x=response.process_speed_x,
|
|
title=meta.title or None,
|
|
summary=meta.summary or None,
|
|
keywords=meta.keywords or None,
|
|
urgency=meta.urgency or None,
|
|
)
|
|
db.add(record)
|
|
db.flush() # record.id 확보
|
|
|
|
for seg in response.segments:
|
|
db.add(TranscriptionSegment(
|
|
record_id=record.id,
|
|
start_sec=seg.start_sec,
|
|
end_sec=seg.end_sec,
|
|
text=seg.text,
|
|
speaker="불명", # 즉시 저장 기본값 — 워커가 나중에 업데이트
|
|
absolute_start_time=seg.absolute_start_time,
|
|
absolute_end_time=seg.absolute_end_time,
|
|
audio_path=seg.audio_path,
|
|
))
|
|
|
|
db.commit()
|
|
|
|
# [Chapter 8.0] 생성된 세그먼트 ID를 응답 객체에 주입하여 프론트엔드 오디오 재생 뱃지와 매핑
|
|
for idx, seg in enumerate(response.segments):
|
|
if hasattr(TranscriptionSegment, 'id'):
|
|
# SQLAlchemy에 의해 방금 부여된 ID
|
|
db_seg = db.query(TranscriptionSegment).filter_by(record_id=record.id).order_by(TranscriptionSegment.id).offset(idx).first()
|
|
if db_seg:
|
|
# Pydantic 모델에 여분 필드(extra)로 id 주입
|
|
seg.id = db_seg.id
|
|
|
|
record_id = record.id
|
|
logger.info(f"DB 저장 완료 (record_id={record_id}, filename={filename})")
|
|
return record_id
|
|
|
|
except Exception as e:
|
|
logger.error(f"DB 저장 실패 (Soft-Fail): {e}")
|
|
try:
|
|
if db:
|
|
db.rollback()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
finally:
|
|
try:
|
|
if db:
|
|
db.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# 동기 별칭
|
|
_save_to_db_sync = _save_to_db
|
|
|
|
|
|
|
|
@app.post("/api/v1/transcribe", response_model=STTResponse, summary="오디오 파일 STT 변환")
|
|
async def transcribe_audio_file(
|
|
background_tasks: BackgroundTasks,
|
|
audio: UploadFile = File(..., description="분석할 무전 오디오 파일 (m4a, mp3, mp4, wav 등)"),
|
|
language: str = Form("ko", description="오디오 언어 코드 (기본: ko)"),
|
|
base_datetime: Optional[datetime] = Form(None, description="녹음 시작 절대 시간 (ISO 8601, 예: 2024-03-06T12:00:00+09:00)"),
|
|
stt_service: WhisperSTTService = Depends(get_stt_service)
|
|
):
|
|
"""
|
|
클라이언트로부터 오디오 파일을 업로드 받아 STT 엔진에 돌린 후,
|
|
Pydantic Model 통일 규격의 결과값 반환. 처리 후 결과는 DB에도 자동 적재됨.
|
|
"""
|
|
tmp_path = ""
|
|
saved_filename = ""
|
|
try:
|
|
import time
|
|
# 1. 파일을 고유한 이름으로 static/audio 폴더에 영구 저장 (timestamp 활용)
|
|
safe_name = audio.filename.replace(" ", "_")
|
|
timestamp = int(time.time())
|
|
saved_filename = f"{timestamp}_{safe_name}"
|
|
save_dir = os.path.join(_HERE, "static", "audio")
|
|
os.makedirs(save_dir, exist_ok=True)
|
|
|
|
tmp_path = os.path.join(save_dir, saved_filename)
|
|
with open(tmp_path, "wb") as buffer:
|
|
shutil.copyfileobj(audio.file, buffer)
|
|
|
|
# 2. Controller -> Model 데이터 생성
|
|
request_dto = STTRequest(
|
|
audio_file_path=tmp_path,
|
|
language=language,
|
|
base_datetime=base_datetime
|
|
)
|
|
|
|
# 3. 비즈니스 로직(Service) 호출
|
|
response_dto = stt_service.transcribe(request_dto)
|
|
|
|
# 4. 백그라운드로 DB 저장 (Soft-Fail: DB 에러가 응답을 막지 않음)
|
|
# 이제 임시 파일명이 아닌 실제 저장된 파일명을 DB에 저장합니다.
|
|
background_tasks.add_task(_save_to_db, saved_filename, response_dto, base_datetime)
|
|
|
|
return response_dto
|
|
|
|
except ModelNotFoundError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
except AudioFileNotFoundError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
except STTError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
# finally 블록에서 os.remove(tmp_path)를 호출하던 부분을 제거하여
|
|
# 오디오 파일이 static 폴더에 보존되도록 합니다.
|
|
|
|
# ─── WebSocket 매니저 및 라우터 (실시간 감청용) ───────────────────────────────
|
|
class ConnectionManager:
|
|
def __init__(self):
|
|
self.active_connections: list[WebSocket] = []
|
|
|
|
async def connect(self, websocket: WebSocket):
|
|
await websocket.accept()
|
|
self.active_connections.append(websocket)
|
|
|
|
def disconnect(self, websocket: WebSocket):
|
|
if websocket in self.active_connections:
|
|
self.active_connections.remove(websocket)
|
|
|
|
async def broadcast(self, message: dict):
|
|
for connection in self.active_connections:
|
|
try:
|
|
await connection.send_json(message)
|
|
except Exception:
|
|
pass
|
|
|
|
ws_manager = ConnectionManager()
|
|
|
|
# 마지막으로 broadcast된 데이터를 캐시 (뒤늦게 연결한 클라이언트에게 즉시 전송)
|
|
_last_broadcast: dict | None = None
|
|
|
|
@app.websocket("/api/v1/ws/live")
|
|
async def websocket_live_endpoint(websocket: WebSocket):
|
|
global _last_broadcast
|
|
await ws_manager.connect(websocket)
|
|
try:
|
|
# 연결 성공 알림
|
|
await websocket.send_json({"type": "info", "message": "WebSocket Connected."})
|
|
|
|
# Late-join 지원: 연결 즉시 DB에서 최근 레코드 1건을 push
|
|
# (사용자가 Live Mode를 켰을 때 화면이 공백으로 보이는 문제 해결)
|
|
try:
|
|
db = SessionLocal()
|
|
from sqlalchemy.orm import joinedload as _jl
|
|
latest = db.query(TranscriptionRecord).options(
|
|
_jl(TranscriptionRecord.segments)
|
|
).order_by(TranscriptionRecord.id.desc()).first()
|
|
if latest:
|
|
from app.models.record import RecordListResponse
|
|
payload = RecordListResponse.model_validate(latest).model_dump(mode="json")
|
|
payload["type"] = "stt_result"
|
|
payload["late_join"] = True
|
|
await websocket.send_json(payload)
|
|
db.close()
|
|
except Exception as e:
|
|
logger.warning(f"Late-join push 실패 (무시): {e}")
|
|
|
|
while True:
|
|
data = await websocket.receive_text()
|
|
except WebSocketDisconnect:
|
|
ws_manager.disconnect(websocket)
|