465 lines
21 KiB
Python
465 lines
21 KiB
Python
import numpy as np
|
|
import os
|
|
import tempfile
|
|
import openai
|
|
import configparser
|
|
import wave
|
|
import struct
|
|
import threading
|
|
import queue
|
|
import time
|
|
import requests
|
|
import json
|
|
from vosk import Model, KaldiRecognizer
|
|
from typing import Optional, Callable
|
|
|
|
class SpeechRecognizer:
|
|
def __init__(self, config_path="config.ini"):
|
|
self.config = configparser.ConfigParser()
|
|
|
|
if os.path.exists(config_path):
|
|
self.config.read(config_path)
|
|
|
|
# OpenAI API 키
|
|
self.api_key = self.config.get("api", "openai_api_key", fallback="")
|
|
|
|
# Hugging Face API 키
|
|
self.hf_api_key = self.config.get("api", "huggingface_api_key", fallback="")
|
|
|
|
# 모델 선택
|
|
self.model_provider = self.config.get("model", "provider", fallback="huggingface")
|
|
self.model_name = self.config.get("model", "name", fallback="facebook/wav2vec2-base-960h")
|
|
|
|
# 한국어 모델 (기본값으로 사용 가능한 한국어 음성인식 모델들)
|
|
self.korean_models = [
|
|
"kresnik/wav2vec2-large-xlsr-korean", # 한국어 특화 모델
|
|
"openai/whisper-small", # 한국어 특화 Whisper 모델
|
|
"openai/whisper-medium", # 더 큰 한국어 특화 Whisper 모델
|
|
"openai/whisper-large-v3", # 다국어 모델 (한국어 포함)
|
|
"facebook/wav2vec2-base-960h" # 영어 기본 모델 (참고용)
|
|
]
|
|
|
|
# API 키가 설정되지 않았을 때 경고
|
|
if not self.api_key and not self.hf_api_key:
|
|
print("경고: API 키가 설정되지 않았습니다. config.ini 파일에 OpenAI API 키 또는 Hugging Face API 키를 설정하세요.")
|
|
|
|
# OpenAI 설정 (백업용)
|
|
openai.api_key = self.api_key
|
|
|
|
# 실시간 처리를 위한 설정
|
|
self.is_processing = False
|
|
self.audio_queue = queue.Queue()
|
|
self.process_thread = None
|
|
self.callback = None
|
|
|
|
# VOSK 모델 오프라인 초기화
|
|
if self.model_provider.lower() == "vosk":
|
|
try:
|
|
self.vosk_model = Model(self.model_name)
|
|
except Exception as e:
|
|
print(f"VOSK 모델 로드 오류: {e}")
|
|
self.vosk_model = None
|
|
else:
|
|
self.vosk_model = None
|
|
|
|
def recognize(self, audio_data: np.ndarray) -> Optional[str]:
|
|
"""오디오 데이터를 텍스트로 변환합니다."""
|
|
if len(audio_data) == 0:
|
|
return None
|
|
|
|
temp_file_path = None
|
|
try:
|
|
# 임시 파일 생성
|
|
fd, temp_file_path = tempfile.mkstemp(suffix=".wav")
|
|
os.close(fd) # 파일 디스크립터 즉시 닫기
|
|
|
|
# WAV 파일 작성 (wave 모듈 사용)
|
|
# 쓰기 모드('wb')로 열었으므로 Wave_write 객체가 반환됩니다
|
|
wf = wave.open(temp_file_path, 'wb')
|
|
try:
|
|
wf.setnchannels(1) # 모노 채널
|
|
wf.setsampwidth(2) # 16-bit
|
|
wf.setframerate(16000) # 샘플레이트
|
|
|
|
# float32 데이터를 int16으로 변환
|
|
audio_data_int = (audio_data * 32767).astype(np.int16)
|
|
wf.writeframes(audio_data_int.tobytes())
|
|
finally:
|
|
wf.close() # 명시적으로 닫아줌
|
|
|
|
if self.model_provider.lower() == "vosk":
|
|
# VOSK 오프라인 모델 사용
|
|
if self.vosk_model is None:
|
|
print("VOSK 모델이 로드되지 않았습니다.")
|
|
return None
|
|
return self._recognize_with_vosk(temp_file_path)
|
|
elif self.model_provider.lower() == "huggingface" and self.hf_api_key:
|
|
# Hugging Face 모델을 사용하여 음성 인식
|
|
return self._recognize_with_huggingface(temp_file_path)
|
|
elif self.api_key:
|
|
# OpenAI Whisper API를 백업으로 사용
|
|
return self._recognize_with_openai(temp_file_path)
|
|
else:
|
|
print("사용 가능한 API 키가 없습니다.")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"음성 인식 오류: {e}")
|
|
return None
|
|
finally:
|
|
# 임시 파일 삭제 시도 (try-finally로 보장)
|
|
if temp_file_path and os.path.exists(temp_file_path):
|
|
try:
|
|
os.unlink(temp_file_path)
|
|
except Exception as e:
|
|
print(f"임시 파일 삭제 오류: {e}")
|
|
|
|
def _recognize_with_huggingface(self, file_path: str) -> Optional[str]:
|
|
"""Hugging Face API를 사용하여 음성 인식을 수행합니다."""
|
|
try:
|
|
# 사용할 모델 결정
|
|
model_to_use = self.model_name
|
|
max_retries = 5 # 최대 재시도 횟수를 5회로 늘림
|
|
|
|
# 파일 확장자에 따라 Content-Type 결정
|
|
content_type = "audio/wav" # 기본값은 WAV
|
|
if file_path.lower().endswith(".mp3"):
|
|
content_type = "audio/mpeg"
|
|
elif file_path.lower().endswith(".flac"):
|
|
content_type = "audio/flac"
|
|
|
|
# API 요청 URL (모델 로딩 대기 파라미터 추가)
|
|
api_url = f"https://api-inference.huggingface.co/models/{model_to_use}?wait_for_model=true"
|
|
|
|
# 파일 크기 확인 및 로그 출력
|
|
file_size = os.path.getsize(file_path)
|
|
print(f"[INFO] 파일 크기: {file_size} bytes, 파일 형식: {content_type}")
|
|
|
|
# 파일 로드
|
|
with open(file_path, "rb") as f:
|
|
audio_bytes = f.read()
|
|
|
|
# API 호출용 헤더
|
|
headers = {
|
|
"Authorization": f"Bearer {self.hf_api_key}",
|
|
"Content-Type": content_type # 올바른 Content-Type 설정
|
|
}
|
|
|
|
print(f"[INFO] 모델 {model_to_use}로 인식 시도")
|
|
|
|
# 첫 번째 시도: 설정된 모델 (Exponential Backoff 적용)
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
# Exponential Backoff 대기 시간 계산 (첫 시도는 0초)
|
|
backoff_time = 0 if attempt == 1 else 2 ** (attempt - 2)
|
|
|
|
print(f"[INFO] 요청 {attempt}/{max_retries}: {model_to_use}" +
|
|
(f" (Backoff: {backoff_time}초 대기 후)" if backoff_time > 0 else ""))
|
|
|
|
# 첫 시도가 아니면 Exponential Backoff 대기
|
|
if backoff_time > 0:
|
|
time.sleep(backoff_time)
|
|
|
|
# 120초 타임아웃으로 요청 (모델 로딩 시간 고려)
|
|
response = requests.post(api_url, headers=headers, data=audio_bytes, timeout=120)
|
|
|
|
# 응답 상태 코드 확인
|
|
print(f"응답 상태 코드: {response.status_code}")
|
|
|
|
# 503 Service Unavailable - 모델 로딩 중 또는 서버 과부하
|
|
if response.status_code == 503:
|
|
print(f"[WARN] 서버 과부하 (503 Service Unavailable)")
|
|
print(f"응답 내용: {response.content[:200]}")
|
|
if attempt < max_retries:
|
|
print(f"Exponential Backoff: {2 ** (attempt - 1)}초 후 재시도...")
|
|
continue # 위에서 계산된 다음 Backoff 시간 적용
|
|
break
|
|
|
|
# 기타 HTTP 오류
|
|
elif response.status_code != 200:
|
|
print(f"[WARN] HTTP 오류: {response.status_code}")
|
|
print(f"응답 내용: {response.content[:200]}")
|
|
if attempt < max_retries:
|
|
print(f"{backoff_time}초 후 재시도...")
|
|
continue
|
|
break
|
|
|
|
# JSON 응답 파싱 시도
|
|
try:
|
|
result = response.json()
|
|
|
|
# 결과 확인
|
|
if isinstance(result, dict) and "text" in result:
|
|
return result["text"]
|
|
|
|
# text 키가 없는 경우 다른 형식 확인
|
|
if isinstance(result, list) and len(result) > 0:
|
|
if "generated_text" in result[0]:
|
|
return result[0]["generated_text"]
|
|
elif "text" in result[0]:
|
|
return result[0]["text"]
|
|
|
|
# 다른 결과 형식을 처리하기 위한 로그
|
|
print(f"알 수 없는 응답 구조: {result}")
|
|
|
|
except json.JSONDecodeError:
|
|
# JSON 디코딩 오류 (빈 응답이나 HTML이 온 경우)
|
|
print(f"[WARN] JSON 파싱 실패: HTTP {response.status_code}, 내용 길이={len(response.content)}")
|
|
print(f"응답 내용 일부: {response.content[:200]}")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"[WARN] 요청 실패: {e}")
|
|
|
|
# 마지막 시도가 아니고 다른 오류인 경우 짧은 대기 후 재시도
|
|
if attempt < max_retries and response.status_code != 503:
|
|
print(f"1초 후 재시도...")
|
|
time.sleep(1)
|
|
|
|
print("설정된 모델로 모든 시도 실패")
|
|
|
|
# 두 번째 시도: 다른 한국어 모델을 하나씩 시도
|
|
for korean_model in self.korean_models:
|
|
if korean_model == model_to_use:
|
|
continue # 이미 시도한 모델은 건너뜀
|
|
|
|
print(f"[INFO] 대체 모델 {korean_model}로 시도")
|
|
|
|
# 대체 모델 API URL
|
|
alt_api_url = f"https://api-inference.huggingface.co/models/{korean_model}?wait_for_model=true"
|
|
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
# Exponential Backoff 대기 시간 계산 (첫 시도는 0초)
|
|
backoff_time = 0 if attempt == 1 else 2 ** (attempt - 2)
|
|
|
|
print(f"[INFO] 대체 모델 시도 {attempt}/{max_retries}: {korean_model}" +
|
|
(f" (Backoff: {backoff_time}초 대기 후)" if backoff_time > 0 else ""))
|
|
|
|
# 첫 시도가 아니면 Exponential Backoff 대기
|
|
if backoff_time > 0:
|
|
time.sleep(backoff_time)
|
|
|
|
# 120초 타임아웃으로 요청
|
|
response = requests.post(alt_api_url, headers=headers, data=audio_bytes, timeout=120)
|
|
|
|
# 응답 상태 코드 확인
|
|
print(f"응답 상태 코드: {response.status_code}")
|
|
|
|
# 503 Service Unavailable - 모델 로딩 중 또는 서버 과부하
|
|
if response.status_code == 503:
|
|
print(f"[WARN] 서버 과부하 (503 Service Unavailable)")
|
|
print(f"응답 내용: {response.content[:200]}")
|
|
if attempt < max_retries:
|
|
print(f"Exponential Backoff: {2 ** (attempt - 1)}초 후 재시도...")
|
|
continue # 위에서 계산된 다음 Backoff 시간 적용
|
|
break
|
|
|
|
# 기타 HTTP 오류
|
|
elif response.status_code != 200:
|
|
print(f"[WARN] HTTP 오류: {response.status_code}")
|
|
print(f"응답 내용: {response.content[:200]}")
|
|
if attempt < max_retries:
|
|
print(f"{backoff_time}초 후 재시도...")
|
|
continue
|
|
break
|
|
|
|
# JSON 응답 파싱 시도
|
|
try:
|
|
result = response.json()
|
|
|
|
# 결과 확인
|
|
if isinstance(result, dict) and "text" in result:
|
|
return result["text"]
|
|
|
|
if isinstance(result, list) and len(result) > 0:
|
|
if "generated_text" in result[0]:
|
|
return result[0]["generated_text"]
|
|
elif "text" in result[0]:
|
|
return result[0]["text"]
|
|
|
|
except json.JSONDecodeError:
|
|
print(f"[WARN] 대체 모델 JSON 파싱 실패: HTTP {response.status_code}")
|
|
print(f"응답 내용 일부: {response.content[:200]}")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"[WARN] 대체 모델 요청 실패: {e}")
|
|
|
|
# 마지막 시도가 아니고 다른 오류인 경우 짧은 대기 후 재시도
|
|
if attempt < max_retries and response.status_code != 503:
|
|
print(f"1초 후 재시도...")
|
|
time.sleep(1)
|
|
|
|
print("모든 모델 시도 실패")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"Hugging Face API 호출 오류: {e}")
|
|
return None
|
|
|
|
def _recognize_with_vosk(self, file_path: str) -> Optional[str]:
|
|
"""VOSK 오프라인 모델로 음성 인식을 수행합니다."""
|
|
try:
|
|
wf = wave.open(file_path, "rb")
|
|
except Exception as e:
|
|
print(f"VOSK 오디오 파일 열기 오류: {e}")
|
|
return None
|
|
if not self.vosk_model:
|
|
print("VOSK 모델이 로드되지 않았습니다.")
|
|
return None
|
|
rec = KaldiRecognizer(self.vosk_model, wf.getframerate())
|
|
text = ""
|
|
while True:
|
|
data = wf.readframes(4000)
|
|
if len(data) == 0:
|
|
break
|
|
if rec.AcceptWaveform(data):
|
|
res = json.loads(rec.Result())
|
|
text += res.get("text", "")
|
|
final_res = json.loads(rec.FinalResult())
|
|
text += final_res.get("text", "")
|
|
wf.close()
|
|
return text if text else None
|
|
|
|
def _recognize_with_openai(self, file_path: str) -> Optional[str]:
|
|
"""OpenAI Whisper API를 사용하여 음성 인식을 수행합니다. (백업 방식)"""
|
|
try:
|
|
with open(file_path, "rb") as audio_file:
|
|
try:
|
|
client = openai.OpenAI(api_key=self.api_key)
|
|
result = client.audio.transcriptions.create(
|
|
model="whisper-1", # 기본 모델
|
|
file=audio_file,
|
|
language="ko",
|
|
response_format="text"
|
|
)
|
|
text_result = result if isinstance(result, str) else result.text
|
|
return text_result
|
|
except Exception as api_error:
|
|
print(f"OpenAI API 호출 오류: {api_error}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"OpenAI API 파일 로드 오류: {e}")
|
|
return None
|
|
|
|
def recognize_file(self, file_path: str) -> Optional[str]:
|
|
"""오디오 파일을 텍스트로 변환합니다."""
|
|
if not os.path.exists(file_path):
|
|
print(f"파일이 존재하지 않습니다: {file_path}")
|
|
return None
|
|
|
|
try:
|
|
if self.model_provider.lower() == "vosk":
|
|
# VOSK 오프라인 모델 사용
|
|
if self.vosk_model is None:
|
|
print("VOSK 모델이 로드되지 않았습니다.")
|
|
return None
|
|
return self._recognize_with_vosk(file_path)
|
|
elif self.model_provider.lower() == "huggingface" and self.hf_api_key:
|
|
# Hugging Face 모델을 사용하여 음성 인식
|
|
return self._recognize_with_huggingface(file_path)
|
|
elif self.api_key:
|
|
# OpenAI Whisper API를 백업으로 사용
|
|
return self._recognize_with_openai(file_path)
|
|
else:
|
|
print("사용 가능한 API 키가 없습니다.")
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"파일 음성 인식 오류: {e}")
|
|
return None
|
|
|
|
def start_realtime_recognition(self, result_callback: Callable[[str], None]):
|
|
"""실시간 음성 인식을 시작합니다."""
|
|
if self.is_processing:
|
|
return
|
|
|
|
self.is_processing = True
|
|
self.callback = result_callback
|
|
|
|
# 오디오 처리 스레드 시작
|
|
self.process_thread = threading.Thread(target=self._process_audio_queue)
|
|
self.process_thread.daemon = True
|
|
self.process_thread.start()
|
|
|
|
def stop_realtime_recognition(self):
|
|
"""실시간 음성 인식을 중지합니다."""
|
|
self.is_processing = False
|
|
|
|
# 큐 비우기
|
|
while not self.audio_queue.empty():
|
|
try:
|
|
self.audio_queue.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
self.process_thread = None
|
|
|
|
def add_audio_data(self, audio_data: np.ndarray):
|
|
"""오디오 큐에 데이터를 추가합니다."""
|
|
if self.is_processing:
|
|
self.audio_queue.put(audio_data)
|
|
|
|
def _process_audio_queue(self):
|
|
"""백그라운드에서 오디오 큐의 데이터를 처리합니다."""
|
|
while self.is_processing:
|
|
try:
|
|
# 큐에서 오디오 데이터 가져오기 (1초 타임아웃)
|
|
audio_data = self.audio_queue.get(timeout=1.0)
|
|
|
|
# 오디오 데이터 변환
|
|
result = self.recognize(audio_data)
|
|
|
|
# 결과가 있으면 콜백 호출
|
|
if result and self.callback:
|
|
self.callback(result)
|
|
|
|
except queue.Empty:
|
|
# 타임아웃 - 계속 진행
|
|
time.sleep(0.1)
|
|
continue
|
|
except Exception as e:
|
|
print(f"오디오 처리 오류: {e}")
|
|
time.sleep(0.5) # 오류 시 잠시 대기
|
|
|
|
def set_api_key(self, api_key: str):
|
|
"""OpenAI API 키를 설정합니다."""
|
|
self.api_key = api_key
|
|
openai.api_key = api_key
|
|
|
|
# config.ini 파일 업데이트
|
|
self.config.set("api", "openai_api_key", api_key)
|
|
try:
|
|
with open("config.ini", "w") as config_file:
|
|
self.config.write(config_file)
|
|
except Exception as e:
|
|
print(f"설정 파일 저장 오류: {e}")
|
|
|
|
def set_huggingface_api_key(self, api_key: str):
|
|
"""Hugging Face API 키를 설정합니다."""
|
|
self.hf_api_key = api_key
|
|
|
|
# config.ini 파일 업데이트
|
|
self.config.set("api", "huggingface_api_key", api_key)
|
|
try:
|
|
with open("config.ini", "w") as config_file:
|
|
self.config.write(config_file)
|
|
except Exception as e:
|
|
print(f"설정 파일 저장 오류: {e}")
|
|
|
|
def set_model(self, provider: str, model_name: str):
|
|
"""음성 인식 모델을 설정합니다."""
|
|
self.model_provider = provider
|
|
self.model_name = model_name
|
|
|
|
# config.ini 파일 업데이트
|
|
if not self.config.has_section("model"):
|
|
self.config.add_section("model")
|
|
|
|
self.config.set("model", "provider", provider)
|
|
self.config.set("model", "name", model_name)
|
|
|
|
try:
|
|
with open("config.ini", "w") as config_file:
|
|
self.config.write(config_file)
|
|
except Exception as e:
|
|
print(f"설정 파일 저장 오류: {e}") |