first commit

This commit is contained in:
Envy_PC 2025-05-02 16:30:53 +09:00
commit 6f4309a616
15 changed files with 3347 additions and 0 deletions

13
.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
.venv/
__pycache__/
*.pyc
*.pyo
*.pyd
*.pyw
*.pyz
*.log
*.log.*
*.log.*.*
*.log.*.*.*
*.log.*.*.*.*
*.log.*.*.*.*.*

BIN
Pline1.db Normal file

Binary file not shown.

82
README.md Normal file
View File

@ -0,0 +1,82 @@
# TRNote - 철도 음성인식 시스템
TRNote는 철도 관제사와 기관사의 음성 대화를 인식하고 분석하는 시스템입니다.
## 주요 기능
- 다양한 음원 소스(윈도우 사운드 출력, 스피커, 마이크, LINE-IN 등)에서 오디오 캡처
- OpenAI Whisper API를 사용한 음성-텍스트 변환
- 관제사/기관사 화자 구분 및 대화 분석
- 전동차 편성 정보 및 고장 이력 조회
- 대화 내용을 기반으로 관련 데이터 검색 및 표시
## 시스템 요구사항
- Windows 10 이상
- Python 3.8 이상
- OpenAI API 키
- Poetry (패키지 관리)
## 설치 방법
### Poetry 설치
```
pip install poetry
```
### 프로젝트 설치
1. 저장소 클론
```
git clone https://github.com/username/TRNote.git
cd TRNote
```
2. Poetry를 사용하여 의존성 설치
```
poetry install
```
3. OpenAI API 키 설정
```
# Windows
set OPENAI_API_KEY=your_api_key_here
# Linux/MacOS
export OPENAI_API_KEY=your_api_key_here
```
## 실행 방법
Poetry 환경에서 실행:
```
poetry run trnote
```
또는 직접 실행:
```
poetry run python main.py
```
## 사용 방법
1. 음원 소스 선택 드롭다운에서 오디오 소스 선택
2. '시작' 버튼 클릭하여 음성 인식 시작
3. 인식된 대화는 왼쪽 대화창에 표시됨
4. 관련 열차 및 고장 정보는 오른쪽 패널에 표시됨
5. 과거 대화는 왼쪽 상단 목록에서 선택하여 볼 수 있음
## 구조
- `main.py`: 애플리케이션 메인 진입점
- `modules/`: 각 기능별 모듈
- `audio_source.py`: 오디오 소스 관리
- `speech_recognition.py`: 음성 인식 처리
- `conversation_analyzer.py`: 대화 분석 및 화자 구분
- `database_manager.py`: 데이터베이스 관리
- `gui_components.py`: GUI 컴포넌트
## 라이선스
MIT

12
config.ini Normal file
View File

@ -0,0 +1,12 @@
[api]
openai_api_key = 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
[audio]
sample_rate = 16000
silence_threshold = 0.02
silence_duration = 60
buffer_duration = 3
[app]
theme = light

493
main.py Normal file
View File

@ -0,0 +1,493 @@
import flet as ft
import configparser
import os
import numpy as np
import wave
import tempfile
from modules.audio_source import AudioSource
from modules.speech_recognition import SpeechRecognizer
from modules.conversation_analyzer import ConversationAnalyzer
from modules.database_manager import DatabaseManager
from modules.gui_components import (
AudioSourceSelector,
StatusIndicator,
ConversationView,
PreviousConversationsList,
TrainInfoPanel
)
def main(page: ft.Page):
# 설정 파일 로드
config = configparser.ConfigParser()
# 기본 설정 생성
if not os.path.exists("config.ini"):
config["api"] = {
"openai_api_key": "your_openai_api_key_here"
}
config["audio"] = {
"sample_rate": "16000",
"silence_threshold": "0.02",
"silence_duration": "60",
"buffer_duration": "3"
}
config["app"] = {
"theme": "light"
}
with open("config.ini", "w") as configfile:
config.write(configfile)
else:
config.read("config.ini")
# 테마 설정
theme = config.get("app", "theme", fallback="light")
page.theme_mode = ft.ThemeMode.LIGHT if theme.lower() == "light" else ft.ThemeMode.DARK
page.title = "철도 음성인식 시스템"
page.window_width = 1200
page.window_height = 800
page.padding = 10
# 인스턴스 생성
audio_source = AudioSource()
speech_recognizer = SpeechRecognizer()
conversation_analyzer = ConversationAnalyzer()
db_manager = DatabaseManager()
# 테스트 모드 상태
is_test_mode = False
# GUI 컴포넌트 생성
audio_source_selector = AudioSourceSelector(audio_source)
status_indicator = StatusIndicator()
conversation_view = ConversationView()
previous_conversations_list = PreviousConversationsList(conversation_view)
train_info_panel = TrainInfoPanel(db_manager)
# 실시간 처리 상태 표시기
realtime_indicator = ft.Text("", italic=True, color=ft.colors.GREY_600)
# 파일 선택 콜백 함수 정의
def on_file_picked(e):
if e.files and len(e.files) > 0:
file_path = e.files[0].path
test_status.value = f"선택된 파일: {os.path.basename(file_path)}"
page.update()
# 선택된 파일 처리
process_audio_file(file_path)
# 테스트 모드 컨트롤
test_mode_switch = ft.Switch(label="테스트 모드", value=False, on_change=lambda e: toggle_test_mode(e.control.value))
file_picker = ft.FilePicker(on_result=on_file_picked)
page.overlay.append(file_picker)
test_file_button = ft.ElevatedButton(
"MP3 파일 선택",
icon=ft.icons.UPLOAD_FILE,
on_click=lambda _: file_picker.pick_files(
allow_multiple=False,
allowed_extensions=["mp3", "wav"]
),
disabled=True
)
# 테스트 모드 상태 표시
test_status = ft.Text("", italic=True, color=ft.colors.ORANGE)
# 레이아웃 구성
left_panel = ft.Column([
ft.Container(
content=previous_conversations_list,
height=320,
border=ft.border.all(1, ft.colors.GREY_400),
border_radius=5,
padding=5
),
ft.Container(
content=conversation_view,
height=480,
border=ft.border.all(1, ft.colors.GREY_400),
border_radius=5,
padding=5,
expand=True
),
], expand=True)
right_panel = ft.Container(
content=train_info_panel,
border=ft.border.all(1, ft.colors.GREY_400),
border_radius=5,
padding=10,
expand=True
)
# 상단 컨트롤에 설정 버튼 추가
top_controls = ft.Row([
audio_source_selector,
status_indicator,
ft.ElevatedButton("시작", on_click=lambda e: start_monitoring()),
ft.ElevatedButton("중지", on_click=lambda e: stop_monitoring()),
ft.IconButton(
icon=ft.icons.SETTINGS,
tooltip="설정",
on_click=lambda e: show_settings_dialog()
),
])
# 테스트 모드 컨트롤
test_controls = ft.Row([
test_mode_switch,
test_file_button,
test_status
], visible=True)
# 메인 레이아웃에 실시간 인디케이터 추가
main_layout = ft.Column([
top_controls,
test_controls,
realtime_indicator,
ft.Row([
left_panel,
right_panel,
], expand=True),
], expand=True)
page.add(main_layout)
# 테스트 모드 토글 함수
def toggle_test_mode(value):
nonlocal is_test_mode
is_test_mode = value
test_file_button.disabled = not value
if value:
test_status.value = "테스트 모드: 활성화됨"
audio_source_selector.disabled = True
else:
test_status.value = "테스트 모드: 비활성화됨"
audio_source_selector.disabled = False
page.update()
# 오디오 파일 처리
def process_audio_file(file_path):
try:
status_indicator.set_status("파일 처리 중")
status_indicator.set_detecting(True)
page.update()
# MP3 또는 WAV 파일 처리
if file_path.lower().endswith('.mp3'):
# MP3를 WAV로 변환 (librosa 사용)
import librosa
# MP3 파일 로드
audio_data, sample_rate = librosa.load(file_path, sr=16000, mono=True)
# WAV 파일로 변환
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
temp_wav_path = temp_file.name
# 16비트 정수로 변환
audio_data_int = (audio_data * 32767).astype(np.int16)
# WAV 파일 저장 - 쓰기 모드('wb')로 열기
with wave.open(temp_wav_path, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2) # 16-bit
wf.setframerate(16000)
wf.writeframes(audio_data_int.tobytes())
# OpenAI의 API를 사용해 음성 인식
text = speech_recognizer.recognize_file(temp_wav_path)
# 임시 파일 삭제
os.unlink(temp_wav_path)
else:
# WAV 파일 직접 처리
text = speech_recognizer.recognize_file(file_path)
# 인식 결과 처리
if text:
realtime_indicator.value = f"파일 인식 결과: {text[:100]}..."
speakers = conversation_analyzer.identify_speakers(text)
context = conversation_analyzer.analyze_conversation(text, speakers)
conversation_view.update_conversation(text, speakers)
previous_conversations_list.add_conversation(context)
# 데이터베이스에서 관련 정보 검색
train_info = db_manager.get_train_info(context)
fault_info = db_manager.get_fault_info(context)
train_info_panel.update_info(train_info, fault_info)
else:
realtime_indicator.value = "파일 인식 실패: 텍스트를 추출할 수 없습니다."
status_indicator.set_detecting(False)
status_indicator.set_status("대기 중")
page.update()
except Exception as e:
print(f"오디오 파일 처리 오류: {e}")
realtime_indicator.value = f"오류: {str(e)}"
status_indicator.set_detecting(False)
status_indicator.set_status("오류")
page.update()
# 음원 감시 관련 함수
def start_monitoring():
if is_test_mode:
# 테스트 모드에서는 파일 선택을 유도
file_picker.pick_files(
allow_multiple=False,
allowed_extensions=["mp3", "wav"]
)
return
try:
selected_source = audio_source_selector.get_selected_source()
if not selected_source:
page.snack_bar = ft.SnackBar(ft.Text("음원 소스를 선택해주세요"))
page.snack_bar.open = True
page.update()
return
status_indicator.set_status("녹음 중")
# 일반 및 실시간 콜백 함수 전달
audio_source.start_recording(selected_source, on_audio_data, on_realtime_audio_data)
page.update()
except Exception as e:
print(f"녹음 시작 오류: {e}")
page.snack_bar = ft.SnackBar(ft.Text(f"녹음 시작 오류: {e}"))
page.snack_bar.open = True
page.update()
def stop_monitoring():
try:
audio_source.stop_recording()
# 실시간 인식 중지
if hasattr(speech_recognizer, 'stop_realtime_recognition'):
speech_recognizer.stop_realtime_recognition()
status_indicator.set_status("대기 중")
realtime_indicator.value = ""
page.update()
except Exception as e:
print(f"녹음 중지 오류: {e}")
def on_audio_data(audio_data):
"""완성된 오디오 데이터 처리 콜백"""
try:
if len(audio_data) == 0:
return
status_indicator.set_detecting(True)
page.update()
text = speech_recognizer.recognize(audio_data)
if text:
speakers = conversation_analyzer.identify_speakers(text)
context = conversation_analyzer.analyze_conversation(text, speakers)
conversation_view.update_conversation(text, speakers)
previous_conversations_list.add_conversation(context)
# 데이터베이스에서 관련 정보 검색
train_info = db_manager.get_train_info(context)
fault_info = db_manager.get_fault_info(context)
train_info_panel.update_info(train_info, fault_info)
status_indicator.set_detecting(False)
page.update()
except Exception as e:
print(f"오디오 데이터 처리 오류: {e}")
status_indicator.set_detecting(False)
page.update()
def on_realtime_audio_data(audio_data):
"""실시간 오디오 데이터 처리 콜백"""
try:
if len(audio_data) == 0:
return
status_indicator.set_detecting(True)
# 실시간 인식 결과 처리 콜백
def on_realtime_result(text):
if text:
realtime_indicator.value = f"실시간 인식 중: {text[:100]}..."
page.update()
# 실시간 처리 지원 확인
if hasattr(speech_recognizer, 'add_audio_data') and hasattr(speech_recognizer, 'start_realtime_recognition'):
# 큐에 오디오 데이터 추가
speech_recognizer.add_audio_data(audio_data)
# 처리가 시작되지 않았으면 시작
if hasattr(speech_recognizer, 'is_processing') and not speech_recognizer.is_processing:
speech_recognizer.start_realtime_recognition(on_realtime_result)
else:
# 실시간 변환을 지원하지 않는 경우 일반 변환 사용
text = speech_recognizer.recognize(audio_data)
if text:
realtime_indicator.value = f"감지: {text[:100]}..."
page.update()
except Exception as e:
print(f"실시간 오디오 처리 오류: {e}")
def show_settings_dialog():
"""설정 대화상자 표시"""
try:
# API 키 입력 필드
api_key_field = ft.TextField(
label="OpenAI API 키",
value=speech_recognizer.api_key,
password=True,
width=400
)
# 오디오 설정 슬라이더
silence_threshold_slider = ft.Slider(
min=0.01,
max=0.1,
divisions=9,
value=float(config.get("audio", "silence_threshold", fallback="0.02")),
label="{value}",
on_change=lambda e: update_slider_label(e)
)
silence_threshold_text = ft.Text(f"소리 감지 임계값: {silence_threshold_slider.value}")
silence_duration_slider = ft.Slider(
min=10,
max=120,
divisions=11,
value=float(config.get("audio", "silence_duration", fallback="60")),
label="{value}",
on_change=lambda e: update_duration_label(e)
)
silence_duration_text = ft.Text(f"대화 구분 시간(초): {silence_duration_slider.value}")
buffer_duration_slider = ft.Slider(
min=1,
max=10,
divisions=9,
value=float(config.get("audio", "buffer_duration", fallback="3")),
label="{value}",
on_change=lambda e: update_buffer_label(e)
)
buffer_duration_text = ft.Text(f"버퍼 길이(초): {buffer_duration_slider.value}")
# 슬라이더 레이블 업데이트 함수
def update_slider_label(e):
silence_threshold_text.value = f"소리 감지 임계값: {e.control.value:.2f}"
page.update()
def update_duration_label(e):
silence_duration_text.value = f"대화 구분 시간(초): {e.control.value:.0f}"
page.update()
def update_buffer_label(e):
buffer_duration_text.value = f"버퍼 길이(초): {e.control.value:.0f}"
page.update()
# 테마 선택 라디오 버튼
theme_radio = ft.RadioGroup(
content=ft.Column([
ft.Radio(value="light", label="밝은 테마"),
ft.Radio(value="dark", label="어두운 테마"),
]),
value=theme
)
# 설정 저장 함수
def save_settings(e):
try:
# API 키 저장
if api_key_field.value:
speech_recognizer.set_api_key(api_key_field.value)
# 오디오 설정 저장
if hasattr(audio_source, 'update_settings'):
audio_source.update_settings(
silence_threshold=silence_threshold_slider.value,
silence_duration=silence_duration_slider.value,
buffer_duration=buffer_duration_slider.value
)
else:
# 직접 속성 설정
audio_source.silence_threshold = silence_threshold_slider.value
audio_source.silence_duration = silence_duration_slider.value
audio_source.buffer_duration = buffer_duration_slider.value
# 설정 저장
config.set("audio", "silence_threshold", str(silence_threshold_slider.value))
config.set("audio", "silence_duration", str(silence_duration_slider.value))
config.set("audio", "buffer_duration", str(buffer_duration_slider.value))
# 테마 설정 저장
config.set("app", "theme", theme_radio.value)
with open("config.ini", "w") as config_file:
config.write(config_file)
# 테마 적용
page.theme_mode = ft.ThemeMode.LIGHT if theme_radio.value.lower() == "light" else ft.ThemeMode.DARK
# 대화상자 닫기
dialog.open = False
page.update()
# 설정 적용 알림
page.snack_bar = ft.SnackBar(ft.Text("설정이 저장되었습니다"))
page.snack_bar.open = True
page.update()
except Exception as e:
print(f"설정 저장 오류: {e}")
page.snack_bar = ft.SnackBar(ft.Text(f"설정 저장 오류: {e}"))
page.snack_bar.open = True
page.update()
# 설정 대화상자
dialog = ft.AlertDialog(
title=ft.Text("설정"),
content=ft.Column([
ft.Text("OpenAI API 설정", weight=ft.FontWeight.BOLD),
api_key_field,
ft.Divider(),
ft.Text("오디오 설정", weight=ft.FontWeight.BOLD),
silence_threshold_text,
silence_threshold_slider,
silence_duration_text,
silence_duration_slider,
buffer_duration_text,
buffer_duration_slider,
ft.Divider(),
ft.Text("앱 설정", weight=ft.FontWeight.BOLD),
ft.Text("테마"),
theme_radio,
], scroll=ft.ScrollMode.AUTO, height=500),
actions=[
ft.TextButton("취소", on_click=lambda e: setattr(dialog, "open", False)),
ft.TextButton("저장", on_click=save_settings),
],
actions_alignment=ft.MainAxisAlignment.END,
)
# 대화상자 표시
page.dialog = dialog
dialog.open = True
page.update()
except Exception as e:
print(f"설정 대화상자 표시 오류: {e}")
if __name__ == "__main__":
ft.app(target=main)

5
modules/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""
TRNote - 철도 음성인식 시스템
"""
__version__ = "0.1.0"

271
modules/audio_source.py Normal file
View File

@ -0,0 +1,271 @@
import sounddevice as sd
import numpy as np
import threading
import time
import configparser
import os
from typing import Callable, List, Dict, Any
class AudioSource:
def __init__(self, config_path="config.ini"):
# 설정 파일 로드
self.config = configparser.ConfigParser()
if os.path.exists(config_path):
self.config.read(config_path)
self.is_recording = False
self.stream = None
self.callback = None
self.realtime_callback = None
self.thread = None
# 설정 파일에서 오디오 설정 읽기
self.silence_threshold = self.config.getfloat("audio", "silence_threshold", fallback=0.02)
self.silence_duration = self.config.getfloat("audio", "silence_duration", fallback=60)
self.buffer_duration = self.config.getfloat("audio", "buffer_duration", fallback=3)
self.sample_rate = self.config.getint("audio", "sample_rate", fallback=16000)
self.last_sound_time = 0
self.buffer = np.array([])
self.realtime_buffer = np.array([])
self.realtime_buffer_size = int(self.sample_rate * 2) # 2초 버퍼
self.last_realtime_process = 0
self.realtime_interval = 2.0 # 2초마다 실시간 처리
def get_available_sources(self) -> List[Dict[str, Any]]:
"""사용 가능한 모든 오디오 소스 목록을 반환합니다."""
try:
devices = sd.query_devices()
sources = []
for i, device in enumerate(devices):
if isinstance(device, dict) and device.get('max_input_channels', 0) > 0:
sources.append({
'id': i,
'name': device['name'],
'channels': device['max_input_channels'],
'type': self._determine_device_type(device['name'])
})
# 윈도우 사운드 출력 캡처 옵션 추가
sources.append({
'id': 'wasapi_loopback',
'name': '시스템 소리 출력 (WASAPI Loopback)',
'channels': 2,
'type': 'system_output'
})
return sources
except Exception as e:
print(f"오디오 소스 목록 가져오기 오류: {e}")
# 기본 소스라도 반환
return [{
'id': 0,
'name': '기본 마이크',
'channels': 1,
'type': 'microphone'
}]
def _determine_device_type(self, name: str) -> str:
"""장치 이름을 기반으로 장치 유형을 결정합니다."""
name_lower = name.lower()
if 'mic' in name_lower or 'microphone' in name_lower:
return 'microphone'
elif 'line' in name_lower and 'in' in name_lower:
return 'line_in'
elif 'speaker' in name_lower or 'headphone' in name_lower:
return 'speaker'
else:
return 'unknown'
def start_recording(self, source_id, callback: Callable, realtime_callback: Callable = None):
"""오디오 소스 녹음을 시작합니다."""
if self.is_recording:
self.stop_recording()
self.callback = callback
self.realtime_callback = realtime_callback
self.is_recording = True
self.buffer = np.array([])
self.realtime_buffer = np.array([])
self.last_sound_time = time.time()
self.last_realtime_process = time.time()
try:
# WASAPI 루프백일 경우 특별 처리
if source_id == 'wasapi_loopback':
try:
device_info = sd.query_devices()
device_idx = None
# 출력 장치 찾기
for i, device in enumerate(device_info):
if isinstance(device, dict) and device.get('hostapi', None) == 0 and device.get('max_output_channels', 0) > 0:
device_idx = i
break
if device_idx is None:
device_idx = sd.default.device[1] # 기본 출력 장치
self.stream = sd.InputStream(
device=f'{device_idx}:loopback',
samplerate=self.sample_rate,
channels=2,
callback=self._audio_callback
)
except Exception as e:
print(f"WASAPI 루프백 설정 오류: {e}")
# 기본 장치로 대체
self.stream = sd.InputStream(
samplerate=self.sample_rate,
channels=1,
callback=self._audio_callback
)
else:
# 일반 입력 장치
try:
self.stream = sd.InputStream(
device=source_id,
samplerate=self.sample_rate,
channels=1,
callback=self._audio_callback
)
except Exception as e:
print(f"입력 장치 설정 오류: {e}")
# 기본 장치로 대체
self.stream = sd.InputStream(
samplerate=self.sample_rate,
channels=1,
callback=self._audio_callback
)
self.stream.start()
# 침묵 감지 스레드 시작
self.thread = threading.Thread(target=self._silence_detection_thread)
self.thread.daemon = True
self.thread.start()
except Exception as e:
print(f"녹음 시작 오류: {e}")
self.is_recording = False
def stop_recording(self):
"""녹음을 중지합니다."""
self.is_recording = False
if self.stream:
try:
self.stream.stop()
self.stream.close()
except Exception as e:
print(f"녹음 중지 오류: {e}")
finally:
self.stream = None
self.thread = None
def _audio_callback(self, indata, frames, time, status):
"""오디오 데이터가 들어올 때 호출되는 콜백 함수입니다."""
if status:
print(f"상태: {status}")
try:
# 모노로 변환 (필요한 경우)
if indata.shape[1] > 1:
audio_data = np.mean(indata, axis=1)
else:
audio_data = indata[:, 0]
# 음량 레벨 계산
volume_norm = np.linalg.norm(audio_data) / len(audio_data)
# 소리가 감지된 경우
if volume_norm > self.silence_threshold:
self.last_sound_time = time.time()
# 메인 버퍼에 데이터 추가
if len(self.buffer) == 0:
self.buffer = audio_data
else:
self.buffer = np.append(self.buffer, audio_data)
# 실시간 버퍼에 데이터 추가
if len(self.realtime_buffer) == 0:
self.realtime_buffer = audio_data
else:
self.realtime_buffer = np.append(self.realtime_buffer, audio_data)
# 실시간 버퍼가 최대 크기를 초과하면 오래된 데이터 제거
if len(self.realtime_buffer) > self.realtime_buffer_size:
self.realtime_buffer = self.realtime_buffer[-self.realtime_buffer_size:]
# 실시간 처리 (일정 시간 간격으로)
current_time = time.time()
if (current_time - self.last_realtime_process) > self.realtime_interval and self.realtime_callback:
self.last_realtime_process = current_time
# 데이터 복사 후 처리
realtime_data = self.realtime_buffer.copy()
if self.realtime_callback:
# 별도 쓰레드에서 실행하지 않고 직접 콜백 호출
self.realtime_callback(realtime_data)
# 메인 버퍼가 최대 길이를 초과하면 콜백 호출 및 버퍼 초기화
max_buffer_size = self.sample_rate * self.buffer_duration
if len(self.buffer) >= max_buffer_size:
if self.callback:
buffer_copy = self.buffer.copy()
self.callback(buffer_copy)
self.buffer = np.array([])
except Exception as e:
print(f"오디오 처리 오류: {e}")
def _silence_detection_thread(self):
"""침묵 감지 스레드입니다. 침묵이 일정 시간 이상 지속되면 현재 버퍼의 데이터를 처리합니다."""
while self.is_recording:
try:
current_time = time.time()
# 현재 버퍼에 데이터가 있고 일정 시간 동안 소리가 없는 경우
if len(self.buffer) > 0 and (current_time - self.last_sound_time) > 1.0:
if self.callback:
buffer_copy = self.buffer.copy()
self.callback(buffer_copy)
self.buffer = np.array([])
# 실시간 버퍼도 처리하고 초기화
if self.realtime_callback and len(self.realtime_buffer) > 0:
realtime_data = self.realtime_buffer.copy()
self.realtime_callback(realtime_data)
self.realtime_buffer = np.array([])
# 장시간 침묵이 지속되면 새로운 대화로 간주
if (current_time - self.last_sound_time) > self.silence_duration:
# 새로운 대화 시작을 알림
pass
time.sleep(0.5) # 스레드 부하 감소
except Exception as e:
print(f"침묵 감지 오류: {e}")
time.sleep(1) # 오류 시 더 긴 대기
def update_settings(self, silence_threshold=None, silence_duration=None, buffer_duration=None):
"""오디오 설정을 업데이트합니다."""
if silence_threshold is not None:
self.silence_threshold = silence_threshold
self.config.set("audio", "silence_threshold", str(silence_threshold))
if silence_duration is not None:
self.silence_duration = silence_duration
self.config.set("audio", "silence_duration", str(silence_duration))
if buffer_duration is not None:
self.buffer_duration = buffer_duration
self.config.set("audio", "buffer_duration", str(buffer_duration))
# 설정 파일 저장
with open("config.ini", "w") as config_file:
self.config.write(config_file)

View File

@ -0,0 +1,233 @@
import re
import time
import os
from typing import Dict, List, Tuple, Any, Optional
from datetime import datetime
class ConversationAnalyzer:
def __init__(self):
# 대화 구분을 위한 패턴
self.controller_patterns = [
r"전철\s*(?:신평|보안)",
r"관제\s*(?:\d+)?",
]
self.driver_patterns = [
r"(\d{4})\s*열차\s*(?:(\d+)\s*편성)?",
r"다대\s*회차선\s*(\d{4})\s*열차\s*(?:(\d+)\s*편성)?",
]
# 현재 및 최근 대화 저장
self.current_conversation = {
"id": "",
"start_time": None,
"last_update": None,
"train_number": None,
"train_formation": None,
"speakers": [],
"messages": [],
"faults": []
}
self.conversations = []
self.max_conversation_history = 50
# 로그 디렉토리 생성
self.log_dir = "conversation_logs"
try:
if not os.path.exists(self.log_dir):
os.makedirs(self.log_dir)
except Exception as e:
print(f"로그 디렉토리 생성 오류: {e}")
def identify_speakers(self, text: str) -> Dict[str, str]:
"""텍스트에서 화자를 식별합니다."""
result = {"role": "unknown", "name": "알 수 없음"}
# 관제사 패턴 확인
for pattern in self.controller_patterns:
match = re.search(pattern, text)
if match:
result["role"] = "controller"
result["name"] = "관제사"
break
# 기관사 패턴 확인
for pattern in self.driver_patterns:
match = re.search(pattern, text)
if match:
result["role"] = "driver"
result["name"] = "기관사"
# 열차 번호와 편성 추출
train_number = match.group(1) if match.groups() and len(match.groups()) > 0 else None
train_formation = match.group(2) if match.groups() and len(match.groups()) > 1 else None
if train_number:
result["train_number"] = train_number
if train_formation:
result["train_formation"] = train_formation
break
return result
def identify_faults(self, text: str) -> List[str]:
"""텍스트에서 고장 관련 키워드를 식별합니다."""
fault_patterns = [
r"HVAC",
r"배기팬",
r"고장",
r"장애",
r"오류",
r"문제",
r"에러",
r"점검",
r"이상",
]
found_faults = []
for pattern in fault_patterns:
if re.search(pattern, text, re.IGNORECASE):
found_faults.append(pattern)
return found_faults
def analyze_conversation(self, text: str, speaker_info: Dict[str, str]) -> Dict[str, Any]:
"""대화 내용을 분석하고 현재 대화 컨텍스트를 업데이트합니다."""
try:
current_time = time.time()
# 새 대화 시작 여부 확인
if (self.current_conversation["last_update"] is None or
current_time - self.current_conversation["last_update"] > 60): # 1분 이상 침묵
# 이전 대화가 있으면 저장
if self.current_conversation["start_time"] is not None:
# 대화 로그 저장
self._save_conversation_log(self.current_conversation)
self.conversations.insert(0, self.current_conversation.copy())
# 최대 저장 개수 제한
if len(self.conversations) > self.max_conversation_history:
self.conversations = self.conversations[:self.max_conversation_history]
# 새 대화 초기화
conversation_id = datetime.now().strftime("%Y%m%d%H%M%S")
self.current_conversation = {
"id": conversation_id,
"start_time": current_time,
"last_update": current_time,
"train_number": None,
"train_formation": None,
"speakers": [],
"messages": [],
"faults": []
}
else:
# 기존 대화 업데이트
self.current_conversation["last_update"] = current_time
# 화자 정보 업데이트
speaker_role = speaker_info.get("role", "unknown")
speaker_name = speaker_info.get("name", "알 수 없음")
speaker_exists = False
for s in self.current_conversation["speakers"]:
if s["role"] == speaker_role:
speaker_exists = True
break
if not speaker_exists:
self.current_conversation["speakers"].append({
"role": speaker_role,
"name": speaker_name
})
# 열차 번호와 편성 업데이트
if "train_number" in speaker_info and self.current_conversation["train_number"] is None:
self.current_conversation["train_number"] = speaker_info["train_number"]
if "train_formation" in speaker_info and self.current_conversation["train_formation"] is None:
self.current_conversation["train_formation"] = speaker_info["train_formation"]
# 고장 정보 식별 및 업데이트
faults = self.identify_faults(text)
for fault in faults:
if fault not in self.current_conversation["faults"]:
self.current_conversation["faults"].append(fault)
# 메시지 추가
timestamp = datetime.fromtimestamp(current_time).strftime("%H:%M:%S")
self.current_conversation["messages"].append({
"time": timestamp,
"speaker": speaker_name,
"text": text
})
return self.current_conversation
except Exception as e:
print(f"대화 분석 오류: {e}")
# 오류 발생 시 기본 대화 객체 반환
return {
"id": datetime.now().strftime("%Y%m%d%H%M%S"),
"start_time": time.time(),
"last_update": time.time(),
"train_number": None,
"train_formation": None,
"speakers": [{
"role": "unknown",
"name": "알 수 없음"
}],
"messages": [{
"time": datetime.now().strftime("%H:%M:%S"),
"speaker": "알 수 없음",
"text": text
}],
"faults": []
}
def _save_conversation_log(self, conversation: Dict[str, Any]):
"""대화 내용을 로그 파일로 저장합니다."""
try:
if not conversation or not conversation.get("messages"):
return
log_filename = f"{self.log_dir}/conversation_{conversation['id']}.txt"
with open(log_filename, "w", encoding="utf-8") as f:
# 헤더 정보
f.write(f"대화 ID: {conversation['id']}\n")
f.write(f"시작 시간: {datetime.fromtimestamp(conversation['start_time']).strftime('%Y-%m-%d %H:%M:%S')}\n")
if conversation.get("train_number"):
f.write(f"열차 번호: {conversation['train_number']}\n")
if conversation.get("train_formation"):
f.write(f"편성 번호: {conversation['train_formation']}\n")
if conversation.get("faults"):
f.write(f"감지된 고장: {', '.join(conversation['faults'])}\n")
f.write("\n--- 대화 내용 ---\n\n")
# 대화 내용
for msg in conversation["messages"]:
f.write(f"[{msg['time']}] {msg['speaker']}: {msg['text']}\n")
except Exception as e:
print(f"대화 로그 저장 오류: {e}")
def get_conversation_history(self) -> List[Dict[str, Any]]:
"""대화 기록을 반환합니다."""
return self.conversations
def get_conversation_by_id(self, conversation_id: str) -> Optional[Dict[str, Any]]:
"""ID로 대화를 찾아 반환합니다."""
if self.current_conversation["id"] == conversation_id:
return self.current_conversation
for conv in self.conversations:
if conv["id"] == conversation_id:
return conv
return None

273
modules/database_manager.py Normal file
View File

@ -0,0 +1,273 @@
import sqlite3
import os
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
class DatabaseManager:
def __init__(self, db_path: str = "railway_data.db"):
self.db_path = db_path
self.initialize_db()
# 임시 데이터 생성 (실제 구현에서는 DB에서 로드)
self.sample_data = {
"train_schedules": {
"2212": {"formation": "24", "type": "일반", "line": "1호선"},
"2242": {"formation": "24", "type": "일반", "line": "1호선"},
"2312": {"formation": "31", "type": "급행", "line": "1호선"},
},
"train_formations": {
"24": {
"manufacturer": "현대로템",
"introduction_date": "2019-05",
"recent_faults": [
{"date": "2023-11-15", "car": "3", "component": "HVAC", "detail": "배기팬 고장", "status": "완료"},
{"date": "2023-12-03", "car": "5", "component": "출입문", "detail": "개방 지연", "status": "완료"},
],
"maintenance": [
{"type": "일상", "date": "2023-12-20", "details": "각종 센서 점검 및 배터리 교체"},
{"type": "월상", "date": "2023-11-25", "details": "공조장치 종합 점검"},
{"type": "중정비", "date": "2023-10-05", "details": "차량 종합 정밀검사"}
],
"reports": [
{"type": "동향", "date": "2023-12-01", "file": "reports/24_trend_202312.pdf"},
{"type": "조치결과", "date": "2023-11-20", "file": "reports/24_fix_20231120.pdf"}
]
},
"31": {
"manufacturer": "대우중공업",
"introduction_date": "2012-08",
"recent_faults": [
{"date": "2023-10-25", "car": "2", "component": "제동장치", "detail": "공기압 저하", "status": "완료"},
],
"maintenance": [
{"type": "일상", "date": "2023-12-15", "details": "전기장치 점검"},
{"type": "월상", "date": "2023-11-10", "details": "차체 및 대차 검사"},
{"type": "중정비", "date": "2023-06-20", "details": "차량 전체 검사 및 부품 교체"}
],
"reports": [
{"type": "동향", "date": "2023-12-01", "file": "reports/31_trend_202312.pdf"}
]
}
},
"fault_history": [
{"date": "2023-12-05", "formation": "15", "car": "3", "component": "HVAC", "detail": "냉방 불량", "status": "완료"},
{"date": "2023-12-01", "formation": "22", "car": "1", "component": "HVAC", "detail": "과열", "status": "완료"},
{"date": "2023-11-28", "formation": "17", "car": "4", "component": "HVAC", "detail": "배기팬 고장", "status": "완료"},
{"date": "2023-11-25", "formation": "24", "car": "3", "component": "HVAC", "detail": "배기팬 고장", "status": "완료"},
{"date": "2023-11-20", "formation": "08", "car": "2", "component": "HVAC", "detail": "제어기 오류", "status": "완료"},
{"date": "2023-11-15", "formation": "19", "car": "5", "component": "HVAC", "detail": "필터 교체 필요", "status": "완료"},
{"date": "2023-11-10", "formation": "28", "car": "6", "component": "HVAC", "detail": "모터 소음", "status": "완료"},
]
}
def initialize_db(self):
"""데이터베이스 초기화 및 필요한 테이블 생성"""
if not os.path.exists(self.db_path):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 열차다이아표 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS train_schedules (
train_number TEXT PRIMARY KEY,
formation_number TEXT,
train_type TEXT,
line TEXT
)
''')
# 편성데이터 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS train_formations (
formation_number TEXT PRIMARY KEY,
manufacturer TEXT,
introduction_date TEXT,
details TEXT
)
''')
# 고장이력 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS fault_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT,
formation_number TEXT,
car_number TEXT,
component TEXT,
detail TEXT,
status TEXT
)
''')
# 대화 기록 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversation_history (
id TEXT PRIMARY KEY,
timestamp TEXT,
train_number TEXT,
formation_number TEXT,
content TEXT
)
''')
conn.commit()
conn.close()
# 샘플 데이터 추가
self._add_sample_data()
def _add_sample_data(self):
"""샘플 데이터를 DB에 추가합니다."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 열차다이아표 데이터
for train_number, data in self.sample_data["train_schedules"].items():
cursor.execute(
"INSERT OR REPLACE INTO train_schedules VALUES (?, ?, ?, ?)",
(train_number, data["formation"], data["type"], data["line"])
)
# 편성데이터
for formation_number, data in self.sample_data["train_formations"].items():
cursor.execute(
"INSERT OR REPLACE INTO train_formations VALUES (?, ?, ?, ?)",
(formation_number, data["manufacturer"], data["introduction_date"], json.dumps(data))
)
# 고장이력 데이터
for fault in self.sample_data["fault_history"]:
cursor.execute(
"INSERT OR IGNORE INTO fault_history (date, formation_number, car_number, component, detail, status) VALUES (?, ?, ?, ?, ?, ?)",
(fault["date"], fault["formation"], fault["car"], fault["component"], fault["detail"], fault["status"])
)
conn.commit()
conn.close()
def get_train_formation(self, train_number: str) -> Optional[str]:
"""열차번호로 편성번호를 조회합니다."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT formation_number FROM train_schedules WHERE train_number = ?", (train_number,))
result = cursor.fetchone()
conn.close()
if result:
return result[0]
return None
def get_formation_details(self, formation_number: str) -> Optional[Dict[str, Any]]:
"""편성번호로 편성 상세정보를 조회합니다."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT details FROM train_formations WHERE formation_number = ?", (formation_number,))
result = cursor.fetchone()
conn.close()
if result:
return json.loads(result[0])
return None
def get_fault_history(self, component: str = None, limit: int = 50) -> List[Dict[str, Any]]:
"""특정 컴포넌트의 고장이력을 조회합니다."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = "SELECT * FROM fault_history"
params = []
if component:
query += " WHERE component = ?"
params.append(component)
query += " ORDER BY date DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
fault_history = []
for row in results:
fault_history.append({
"id": row[0],
"date": row[1],
"formation": row[2],
"car": row[3],
"component": row[4],
"detail": row[5],
"status": row[6]
})
return fault_history
def save_conversation(self, conversation: Dict[str, Any]):
"""대화 기록을 저장합니다."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO conversation_history VALUES (?, ?, ?, ?, ?)",
(
conversation["id"],
datetime.fromtimestamp(conversation["start_time"]).isoformat() if conversation["start_time"] else None,
conversation["train_number"],
conversation["train_formation"],
json.dumps(conversation)
)
)
conn.commit()
conn.close()
def get_train_info(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
"""대화 컨텍스트를 기반으로 열차 정보를 검색합니다."""
train_info = {"found": False}
train_number = conversation.get("train_number")
if not train_number:
return train_info
# 편성번호 조회
formation_number = conversation.get("train_formation")
if not formation_number:
formation_number = self.get_train_formation(train_number)
if formation_number:
# 편성 상세정보 조회
formation_details = self.get_formation_details(formation_number)
if formation_details:
train_info.update({
"found": True,
"train_number": train_number,
"formation_number": formation_number,
"details": formation_details
})
return train_info
def get_fault_info(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
"""대화 컨텍스트를 기반으로 고장 정보를 검색합니다."""
fault_info = {"found": False, "faults": []}
if not conversation.get("faults"):
return fault_info
for fault_type in conversation["faults"]:
component = "HVAC" if fault_type.upper() == "HVAC" else fault_type
fault_history = self.get_fault_history(component)
if fault_history:
fault_info["found"] = True
fault_info["component"] = component
fault_info["faults"] = fault_history
break
return fault_info

404
modules/gui_components.py Normal file
View File

@ -0,0 +1,404 @@
import flet as ft
from typing import Callable, Dict, List, Any, Optional
from datetime import datetime
class AudioSourceSelector(ft.UserControl):
def __init__(self, audio_source):
super().__init__()
self.audio_source = audio_source
self.sources = []
self.dropdown = None
def did_mount(self):
self.sources = self.audio_source.get_available_sources()
self.dropdown.options = [
ft.dropdown.Option(key=str(src["id"]), text=src["name"])
for src in self.sources
]
self.update()
def build(self):
self.dropdown = ft.Dropdown(
label="음원 소스 선택",
width=300,
options=[]
)
return ft.Row([
self.dropdown,
ft.IconButton(
icon=ft.icons.REFRESH,
tooltip="소스 새로고침",
on_click=self.refresh_sources
)
])
def refresh_sources(self, e):
self.sources = self.audio_source.get_available_sources()
self.dropdown.options = [
ft.dropdown.Option(key=str(src["id"]), text=src["name"])
for src in self.sources
]
self.update()
def get_selected_source(self):
if not self.dropdown.value:
return None
if self.dropdown.value == "wasapi_loopback":
return "wasapi_loopback"
return int(self.dropdown.value)
class StatusIndicator(ft.UserControl):
def __init__(self):
super().__init__()
self.status = "대기 중"
self.is_detecting = False
self.status_text = None
self.indicator = None
def build(self):
self.status_text = ft.Text(self.status, size=14)
self.indicator = ft.Container(
width=20,
height=20,
border_radius=10,
bgcolor=ft.colors.GREY_400
)
return ft.Row([
self.status_text,
self.indicator,
], spacing=5)
def set_status(self, status: str):
self.status = status
self.status_text.value = status
if status == "녹음 중":
self.indicator.bgcolor = ft.colors.GREEN
elif status == "오류":
self.indicator.bgcolor = ft.colors.RED
else:
self.indicator.bgcolor = ft.colors.GREY_400
self.update()
def set_detecting(self, is_detecting: bool):
self.is_detecting = is_detecting
if is_detecting:
self.indicator.bgcolor = ft.colors.YELLOW
elif self.status == "녹음 중":
self.indicator.bgcolor = ft.colors.GREEN
self.update()
class ConversationView(ft.UserControl):
def __init__(self):
super().__init__()
self.conversation = []
self.text_area = None
def build(self):
self.text_area = ft.TextField(
multiline=True,
read_only=True,
min_lines=15,
max_lines=20,
expand=True,
shift_enter=True,
border_color=ft.colors.GREY_300,
)
return ft.Column([
ft.Text("현재 대화", size=16, weight=ft.FontWeight.BOLD),
self.text_area,
])
def update_conversation(self, text: str, speaker_info: Dict[str, str]):
speaker_name = speaker_info.get("name", "알 수 없음")
if self.conversation:
self.conversation.append(f"{speaker_name}: {text}")
else:
self.conversation = [f"{speaker_name}: {text}"]
self.text_area.value = "\n\n".join(self.conversation)
self.update()
def set_conversation(self, messages: List[Dict[str, str]]):
self.conversation = [
f"{msg['speaker']}: {msg['text']}"
for msg in messages
]
self.text_area.value = "\n\n".join(self.conversation)
self.update()
class PreviousConversationsList(ft.UserControl):
def __init__(self, conversation_view: ConversationView):
super().__init__()
self.conversation_view = conversation_view
self.conversations = []
self.list_view = None
def build(self):
self.list_view = ft.ListView(
expand=1,
spacing=2,
padding=10,
auto_scroll=False,
)
return ft.Column([
ft.Text("이전 대화", size=16, weight=ft.FontWeight.BOLD),
self.list_view,
])
def add_conversation(self, conversation: Dict[str, Any]):
if not conversation or "id" not in conversation:
return
# 이미 있는지 확인
for i, conv in enumerate(self.conversations):
if conv["id"] == conversation["id"]:
self.conversations[i] = conversation
self._refresh_list()
return
# 새 대화 추가
self.conversations.insert(0, conversation)
# 최대 개수 제한
if len(self.conversations) > 50:
self.conversations = self.conversations[:50]
self._refresh_list()
def _refresh_list(self):
self.list_view.controls = []
for conv in self.conversations:
# 대화 요약 생성
train_info = ""
if conv.get("train_number"):
train_info = f"[{conv['train_number']}열차"
if conv.get("train_formation"):
train_info += f" {conv['train_formation']}편성]"
else:
train_info += "]"
time_info = ""
if conv.get("start_time"):
time_info = datetime.fromtimestamp(conv["start_time"]).strftime("%H:%M:%S")
# 대화 내용 요약 (첫 번째 메시지 또는 최대 30자)
summary = ""
if conv.get("messages") and len(conv["messages"]) > 0:
first_msg = conv["messages"][0]["text"]
summary = first_msg[:30] + ("..." if len(first_msg) > 30 else "")
self.list_view.controls.append(
ft.ListTile(
title=ft.Text(f"{time_info} {train_info}"),
subtitle=ft.Text(summary),
on_click=lambda e, conv_id=conv["id"]: self._on_conversation_selected(conv_id)
)
)
self.update()
def _on_conversation_selected(self, conversation_id: str):
# 선택된 대화 찾기
selected_conv = None
for conv in self.conversations:
if conv["id"] == conversation_id:
selected_conv = conv
break
if selected_conv and selected_conv.get("messages"):
self.conversation_view.set_conversation(selected_conv["messages"])
class TrainInfoPanel(ft.UserControl):
def __init__(self, db_manager):
super().__init__()
self.db_manager = db_manager
self.train_info = None
self.fault_info = None
self.container = None
def build(self):
self.container = ft.Column(
[ft.Text("열차 정보가 표시됩니다.", italic=True, color=ft.colors.GREY_600)],
scroll=ft.ScrollMode.AUTO,
spacing=10,
)
return self.container
def update_info(self, train_info: Dict[str, Any], fault_info: Dict[str, Any]):
self.train_info = train_info
self.fault_info = fault_info
self.container.controls = []
# 열차 정보 표시
if train_info and train_info.get("found"):
details = train_info.get("details", {})
train_header = ft.Row([
ft.Text(
f"{train_info['train_number']}열차 {train_info['formation_number']}편성",
size=18,
weight=ft.FontWeight.BOLD
)
])
# 기본 정보
basic_info = ft.Column([
ft.Row([
ft.Text("제작사:", weight=ft.FontWeight.BOLD),
ft.Text(details.get("manufacturer", "-"))
]),
ft.Row([
ft.Text("도입일:", weight=ft.FontWeight.BOLD),
ft.Text(details.get("introduction_date", "-"))
]),
])
self.container.controls.extend([
train_header,
ft.Divider(),
ft.Text("기본 정보", weight=ft.FontWeight.BOLD),
basic_info,
ft.Divider(),
])
# 고장 이력
if details.get("recent_faults"):
fault_list = ft.Column([
ft.DataTable(
columns=[
ft.DataColumn(ft.Text("날짜")),
ft.DataColumn(ft.Text("호차")),
ft.DataColumn(ft.Text("부품")),
ft.DataColumn(ft.Text("내용")),
],
rows=[
ft.DataRow(
cells=[
ft.DataCell(ft.Text(fault["date"])),
ft.DataCell(ft.Text(fault["car"])),
ft.DataCell(ft.Text(fault["component"])),
ft.DataCell(ft.Text(fault["detail"])),
]
)
for fault in details["recent_faults"]
]
)
])
self.container.controls.extend([
ft.Text("최근 고장 이력", weight=ft.FontWeight.BOLD),
fault_list,
ft.Divider(),
])
# 정비 이력
if details.get("maintenance"):
maintenance_list = ft.Column([
ft.DataTable(
columns=[
ft.DataColumn(ft.Text("유형")),
ft.DataColumn(ft.Text("날짜")),
ft.DataColumn(ft.Text("내용")),
],
rows=[
ft.DataRow(
cells=[
ft.DataCell(ft.Text(maint["type"])),
ft.DataCell(ft.Text(maint["date"])),
ft.DataCell(ft.Text(maint["details"])),
]
)
for maint in details["maintenance"]
]
)
])
self.container.controls.extend([
ft.Text("정비 이력", weight=ft.FontWeight.BOLD),
maintenance_list,
ft.Divider(),
])
# 보고서
if details.get("reports"):
report_list = ft.Column([
ft.Row([
ft.Text(f"{report['type']} 보고서: "),
ft.Text(report["date"]),
ft.FilledButton(
"보기",
icon=ft.icons.DESCRIPTION,
on_click=lambda e, file_path=report["file"]: self._open_report(file_path)
)
])
for report in details["reports"]
])
self.container.controls.extend([
ft.Text("보고서", weight=ft.FontWeight.BOLD),
report_list,
])
# HVAC 또는 다른 고장 정보 표시
if fault_info and fault_info.get("found"):
component = fault_info.get("component", "")
faults = fault_info.get("faults", [])
if faults:
component_header = ft.Text(
f"{component} 관련 최근 고장 이력",
size=16,
weight=ft.FontWeight.BOLD
)
fault_list = ft.DataTable(
columns=[
ft.DataColumn(ft.Text("날짜")),
ft.DataColumn(ft.Text("편성")),
ft.DataColumn(ft.Text("호차")),
ft.DataColumn(ft.Text("내용")),
],
rows=[
ft.DataRow(
cells=[
ft.DataCell(ft.Text(fault["date"])),
ft.DataCell(ft.Text(fault["formation"])),
ft.DataCell(ft.Text(fault["car"])),
ft.DataCell(ft.Text(fault["detail"])),
]
)
for fault in faults[:10] # 최대 10개만 표시
]
)
self.container.controls.extend([
ft.Divider(height=30),
component_header,
fault_list,
])
self.update()
def _open_report(self, file_path: str):
"""보고서 파일을 엽니다."""
# 실제 구현에서는 파일 열기 로직 추가
print(f"보고서 파일 열기: {file_path}")

View File

@ -0,0 +1,234 @@
import numpy as np
import os
import tempfile
import openai
import configparser
import wave
import struct
import threading
import queue
import time
from typing import Optional, Callable
class SpeechRecognizer:
def __init__(self, config_path="config.ini"):
self.config = configparser.ConfigParser()
if os.path.exists(config_path):
self.config.read(config_path)
self.api_key = self.config.get("api", "openai_api_key", fallback="")
if not self.api_key or self.api_key == "your_openai_api_key_here":
print("경고: OpenAI API 키가 설정되지 않았습니다. config.ini 파일을 확인하세요.")
openai.api_key = self.api_key
# 실시간 처리를 위한 설정
self.is_processing = False
self.audio_queue = queue.Queue()
self.process_thread = None
self.callback = None
def recognize(self, audio_data: np.ndarray) -> Optional[str]:
"""오디오 데이터를 텍스트로 변환합니다."""
if len(audio_data) == 0:
return None
try:
# 임시 파일로 오디오 저장
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
# WAV 파일 형식으로 저장
with wave.open(temp_file.name, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2) # 16-bit
wf.setframerate(16000)
# float32 데이터를 int16으로 변환
audio_data_int = (audio_data * 32767).astype(np.int16)
wf.writeframes(audio_data_int.tobytes())
# OpenAI Whisper API를 사용하여 음성 인식
with open(temp_file.name, "rb") as audio_file:
try:
if not self.api_key:
print("API 키가 설정되지 않았습니다.")
return None
# 최신 OpenAI API 사용 방식
try:
client = openai.OpenAI(api_key=self.api_key)
result = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.text
except (AttributeError, ImportError, NameError):
# 이전 버전 OpenAI 라이브러리 지원 (최대한 호환성 유지)
try:
# audio 모듈 사용 (최신 버전)
result = openai.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.text
except (AttributeError, ImportError, NameError):
# Audio 클래스 사용 (이전 버전)
try:
result = openai.Audio.transcribe(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.get("text", "")
except:
print("OpenAI API 호출 방식을 찾을 수 없습니다.")
return None
# 임시 파일 삭제
os.unlink(temp_file.name)
return text_result
except Exception as e:
print(f"API 호출 오류: {e}")
# 임시 파일 삭제 시도
try:
os.unlink(temp_file.name)
except:
pass
return None
except Exception as e:
print(f"음성 인식 오류: {e}")
return None
def recognize_file(self, file_path: str) -> Optional[str]:
"""오디오 파일을 텍스트로 변환합니다."""
if not os.path.exists(file_path):
print(f"파일이 존재하지 않습니다: {file_path}")
return None
try:
# OpenAI Whisper API를 사용하여 음성 인식
with open(file_path, "rb") as audio_file:
try:
if not self.api_key:
print("API 키가 설정되지 않았습니다.")
return None
# 최신 OpenAI API 사용 방식
try:
client = openai.OpenAI(api_key=self.api_key)
result = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.text
except (AttributeError, ImportError, NameError):
# 이전 버전 OpenAI 라이브러리 지원 (최대한 호환성 유지)
try:
# audio 모듈 사용 (최신 버전)
result = openai.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.text
except (AttributeError, ImportError, NameError):
# Audio 클래스 사용 (이전 버전)
try:
result = openai.Audio.transcribe(
model="whisper-1",
file=audio_file,
language="ko",
response_format="text"
)
text_result = result if isinstance(result, str) else result.get("text", "")
except:
print("OpenAI API 호출 방식을 찾을 수 없습니다.")
return None
return text_result
except Exception as e:
print(f"파일 API 호출 오류: {e}")
return None
except Exception as e:
print(f"파일 음성 인식 오류: {e}")
return None
def start_realtime_recognition(self, result_callback: Callable[[str], None]):
"""실시간 음성 인식을 시작합니다."""
if self.is_processing:
return
self.is_processing = True
self.callback = result_callback
# 오디오 처리 스레드 시작
self.process_thread = threading.Thread(target=self._process_audio_queue)
self.process_thread.daemon = True
self.process_thread.start()
def stop_realtime_recognition(self):
"""실시간 음성 인식을 중지합니다."""
self.is_processing = False
# 큐 비우기
while not self.audio_queue.empty():
try:
self.audio_queue.get_nowait()
except queue.Empty:
break
self.process_thread = None
def add_audio_data(self, audio_data: np.ndarray):
"""오디오 큐에 데이터를 추가합니다."""
if self.is_processing:
self.audio_queue.put(audio_data)
def _process_audio_queue(self):
"""백그라운드에서 오디오 큐의 데이터를 처리합니다."""
while self.is_processing:
try:
# 큐에서 오디오 데이터 가져오기 (1초 타임아웃)
audio_data = self.audio_queue.get(timeout=1.0)
# 오디오 데이터 변환
result = self.recognize(audio_data)
# 결과가 있으면 콜백 호출
if result and self.callback:
self.callback(result)
except queue.Empty:
# 타임아웃 - 계속 진행
time.sleep(0.1)
continue
except Exception as e:
print(f"오디오 처리 오류: {e}")
time.sleep(0.5) # 오류 시 잠시 대기
def set_api_key(self, api_key: str):
"""OpenAI API 키를 설정합니다."""
self.api_key = api_key
openai.api_key = api_key
# config.ini 파일 업데이트
self.config.set("api", "openai_api_key", api_key)
try:
with open("config.ini", "w") as config_file:
self.config.write(config_file)
except Exception as e:
print(f"설정 파일 저장 오류: {e}")

1305
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

22
pyproject.toml Normal file
View File

@ -0,0 +1,22 @@
[tool.poetry]
name = "trnote"
version = "0.1.0"
description = "철도 음성인식 시스템"
authors = ["Your Name <your.email@example.com>"]
readme = "README.md"
packages = [{include = "."}]
[tool.poetry.dependencies]
python = "^3.8"
flet = "^0.9.0"
openai = "^1.0.0"
numpy = "^1.22.0"
sounddevice = "^0.4.6"
pyaudio = "^0.2.13"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
trnote = "main:main"

BIN
railway_data.db Normal file

Binary file not shown.

0
updater/__init__.py Normal file
View File