AI_MMI_Analyser/app/ui/analysis_panel.py

323 lines
13 KiB
Python

from PySide6.QtWidgets import QWidget, QVBoxLayout, QTabWidget, QLabel, QPushButton, QHBoxLayout, QFileDialog
from PySide6.QtCore import Signal
from app.ui.views.graph_view import GraphView
# 추후 구현할 뷰들: from app.ui.views.dashboard_view import DashboardView
from app.data.log_parser import LogLoader
from app.core.sync_controller import sync_manager
from app.ui.views.dashboard_view import DashboardView
from app.ui.views.ai_view import AIDiagnosisView
import polars as pl
from dataclasses import fields
from app.data.fast_parser import FastLogParser
from app.data.log_parser import DOSItoMMIClass
class AnalysisPanel(QWidget):
# 데이터 로드 완료 시그널
data_loaded = Signal()
def __init__(self, panel_id, parent=None):
super().__init__(parent)
self.panel_id = panel_id # "left" or "right"
self.loader = None # 지연 초기화 - 실제 사용 시점에 생성
self.raw_data = []
self.data_date = None # 데이터 날짜 (YYYY.MM.DD 형식)
self.file_path = None
self.current_index = None
self.current_row = None
self.setup_ui()
# 동기화 신호 받기
sync_manager.time_changed.connect(self.on_sync_received)
def setup_ui(self):
layout = QVBoxLayout(self)
# [상단] 개별 파일 로드 버튼 (A/B 비교를 위해)
top_bar = QHBoxLayout()
self.lbl_file = QLabel("No File Loaded")
self.btn_load = QPushButton(f"Load File ({self.panel_id.upper()})")
self.btn_load.clicked.connect(self.open_file)
top_bar.addWidget(self.lbl_file)
top_bar.addWidget(self.btn_load)
layout.addLayout(top_bar)
# [중앙] 탭 위젯
self.tabs = QTabWidget()
# 탭 1: 그래프 뷰
self.graph_view = GraphView(self.panel_id)
self.tabs.addTab(self.graph_view, "📊 Graph")
# 탭 2: 기존 뷰어 (껍데기)
self.dashboard_view = DashboardView(self.panel_id)
self.tabs.addTab(self.dashboard_view, "📟 Legacy Viewer")
# 탭 3: 신호 목록 (껍데기)
self.tabs.addTab(QWidget(), "📜 Signal List")
# 탭 4: AI 분석 (GraphView 컨텍스트 포함)
self.ai_view = AIDiagnosisView(self.panel_id, graph_view=self.graph_view)
self.tabs.addTab(self.ai_view, "🤖 AI Diagnosis")
# 그래프 선택 구간 → AI 요청 연결
self.graph_view.range_ai_requested.connect(self._on_range_ai_requested)
layout.addWidget(self.tabs)
def open_file(self):
file_name, _ = QFileDialog.getOpenFileName(self, "Open Log", "", "Log Files (*.dat)")
if file_name:
self.load_data(file_name)
def load_data(self, file_path):
self.file_path = file_path
self.lbl_file.setText(file_path.split("/")[-1])
# 1. FastLogParser 시도 (Polars)
parquet_path = file_path.replace(".dat", ".parquet")
try:
# 파싱 및 Parquet 저장 (이미 있으면 스킵 가능하지만, 로직상 호출)
# FastLogParser 내부에서 파일 존재 체크 등을 수행함
output_path = FastLogParser.parse_to_parquet(file_path)
if output_path:
# Polars로 읽기
df = pl.read_parquet(output_path)
# DataFrame -> List[DOSItoMMIClass] 변환
# 1) 필드 목록 추출
valid_fields = {f.name for f in fields(DOSItoMMIClass)}
# 2) 딕셔너리 변환 (Polars -> Python List of Dicts)
# rows = df.to_dicts() # 이 방식은 빠름
# 3) 객체 생성 (불필요한 컬럼 제거)
# 벡터화된 방식은 아니지만, Python 루프보다는 빠름 (파싱 비용 제거됨)
# 50만건 기준 약 1~2초 소요 예상
self.raw_data = [
DOSItoMMIClass(**{k: v for k, v in row.items() if k in valid_fields})
for row in df.to_dicts()
]
print(f"[AnalysisPanel] Loaded {len(self.raw_data)} records via FastLogParser")
else:
raise Exception("FastLogParser returned None")
except Exception as e:
print(f"[AnalysisPanel] FastLogParser failed, falling back to legacy loader: {e}")
# LogLoader 지연 초기화
if self.loader is None:
self.loader = LogLoader()
self.raw_data = self.loader.load(file_path)
# 레거시 로더 사용 시 역 정보 매핑 (기존 로직 유지)
from app.core.reference_data import ref_data
if ref_data._is_loaded and self.raw_data:
for item in self.raw_data:
if item.pstn > 0:
st_name = ref_data.get_station_name(item.pstn)
if st_name:
item.station_name = st_name
# 데이터 날짜 추출 (첫 번째 레코드의 time 필드에서)
if self.raw_data and len(self.raw_data) > 0:
time_str = getattr(self.raw_data[0], 'time', '')
# time 형식: "20YY.MM.DD HH:mm:ss" -> 날짜 부분만 추출
if time_str and len(time_str) >= 10:
self.data_date = time_str[:10] # "20YY.MM.DD"
else:
self.data_date = None
else:
self.data_date = None
# 하위 뷰들에게 데이터 전달
self.graph_view.set_data(self.raw_data)
self.dashboard_view.set_data(self.raw_data) # Legacy Viewer에도 전달
self.ai_view.set_file_context(file_path)
self.ai_view.set_data(self.raw_data)
# 기본 커서 포인트(첫 레코드) 설정
if self.raw_data:
self.current_index = 0
self.current_row = self.raw_data[0]
# 첫 번째 패널이 로드되면 전체 레코드 수 설정 (임시)
if self.panel_id == "left":
sync_manager._total_records = len(self.raw_data)
# 데이터 로드 완료 시그널 발생
self.data_loaded.emit()
def on_sync_received(self, index, data, source_id):
"""
SyncController로부터 "시간 바꿔!" 명령이 오면 실행
단, 내가 보낸 신호가 아닐 때만 반응해야 함 (Loop 방지)
"""
# 내 패널에서 움직인 커서도 기록해둬야 AI/플로팅 채팅에서 현재 시점을 알 수 있음
if source_id == self.panel_id:
self.current_index = index
self.current_row = data
return
if source_id != self.panel_id:
# 타임라인/커서 이동
self.graph_view.set_index(index)
# 추후 다른 뷰들도 이동 (예: self.dashboard.set_values(data))
def _on_range_ai_requested(self, start_ms: int, end_ms: int, start_idx: int, end_idx: int):
"""그래프에서 선택한 구간을 AI 분석으로 전달"""
if hasattr(self, "ai_view") and self.ai_view:
self.ai_view.run_range_analysis(start_idx, end_idx, start_ms, end_ms)
def set_slave_mode(self, is_slave):
# Sync 모드일 때 로드 버튼 숨기기
self.btn_load.setVisible(not is_slave)
self.lbl_file.setText("Linked to Left Panel" if is_slave else "No File")
def receive_shared_data(self, data_list):
# 다른 패널에서 데이터 받아오기
self.raw_data = data_list
self.graph_view.set_data(data_list)
self.dashboard_view.set_data(data_list) # Legacy Viewer에도 전달
self.ai_view.set_data(data_list)
if self.raw_data:
self.current_index = 0
self.current_row = self.raw_data[0]
def get_ai_context(self):
"""플로팅 AI/외부 호출용: 현재 패널의 AI 컨텍스트 묶음"""
ctx = {
"panel_id": self.panel_id,
"file_path": self.file_path,
"data_date": self.data_date,
"current_index": self.current_index,
}
try:
if hasattr(self, "graph_view") and self.graph_view and hasattr(self.graph_view, "get_ai_context"):
ctx["graph"] = self.graph_view.get_ai_context(self.current_index)
except Exception:
ctx["graph"] = {}
try:
if self.current_row is not None:
d = self.current_row
ctx["record"] = {
"time": getattr(d, "time", ""),
"trainno": getattr(d, "trainno", ""),
"trainspeed": getattr(d, "trainspeed", None),
"dtg": getattr(d, "dtg", None),
"atc_status": getattr(d, "atc_status", ""),
"marker": getattr(d, "marker", ""),
"pstn": getattr(d, "pstn", None),
}
except Exception:
pass
return ctx
def compute_pg_at_station(self, marker_name: str = "PG3-2"):
"""
'각 역마다 PG3-2 위치에서의 속도/TASC' 같은 질의를 위해
역 구간별(역 이벤트 ~ 다음 역 이벤트)로 PG 마커를 매칭해 지표를 추출한다.
"""
if not self.raw_data or not hasattr(self, "graph_view") or not self.graph_view:
return []
gv = self.graph_view
if not hasattr(gv, "chart_view") or not getattr(gv, "start_timestamp", 0):
return []
# 역 이벤트(시간, 역명)
try:
stations = list(gv.chart_view.event_markers.get("STATION", []))
except Exception:
stations = []
stations = [(int(ts), str(name)) for ts, name in stations if ts is not None]
stations.sort(key=lambda x: x[0])
if not stations:
return []
# PG Rx 마커(시간, 타입)
try:
pgs = list(gv.chart_view.event_markers.get("PG", []))
except Exception:
pgs = []
pgs = [(int(ts), str(name)) for ts, name in pgs if ts is not None]
pgs = [p for p in pgs if p[1] == marker_name]
pgs.sort(key=lambda x: x[0])
results = []
for i, (st_ts, st_name) in enumerate(stations):
seg_start = st_ts
seg_end = stations[i + 1][0] if i + 1 < len(stations) else 10**18
# 구간 내 첫 PG3-2 (없으면 None)
pg_ts = None
for t, n in pgs:
if seg_start <= t < seg_end:
pg_ts = t
break
row = {
"station": st_name,
"station_ts": st_ts,
"pg_marker": marker_name,
"pg_ts": pg_ts,
}
if pg_ts is not None:
idx = int(round((pg_ts - gv.start_timestamp) / 1000.0))
idx = max(0, min(len(self.raw_data) - 1, idx))
d = self.raw_data[idx]
# raw 값
row.update(
{
"index": idx,
"time": getattr(d, "time", ""),
"raw_trainspeed": getattr(d, "trainspeed", None),
"raw_tasc_value": getattr(d, "tasc_value", None),
"raw_dtg": getattr(d, "dtg", None),
"raw_atc_status": getattr(d, "atc_status", ""),
}
)
# filtered(그래프 표시) 값도 함께
try:
sp = gv.signals_map.get("speed", {}).get("data", [])
ta = gv.signals_map.get("tasc", {}).get("data", [])
dg = gv.signals_map.get("dtg", {}).get("data", [])
if 0 <= idx < len(sp):
row["filtered_speed"] = sp[idx]
if 0 <= idx < len(ta):
row["filtered_tasc"] = ta[idx]
if 0 <= idx < len(dg):
row["filtered_dtg"] = dg[idx]
except Exception:
pass
# all-zero 프레임 여부
try:
if hasattr(gv, "_all_zero_mask") and gv._all_zero_mask and 0 <= idx < len(gv._all_zero_mask):
row["all_zero_frame"] = bool(gv._all_zero_mask[idx])
except Exception:
pass
try:
# 우선 filtered로 차이 계산, 없으면 raw로 폴백
s = row.get("filtered_speed", row.get("raw_trainspeed"))
t = row.get("filtered_tasc", row.get("raw_tasc_value"))
if s is not None and t is not None:
row["speed_minus_tasc"] = float(s) - float(t)
except Exception:
pass
results.append(row)
return results
def get_data_date(self):
"""현재 로드된 데이터의 날짜 반환"""
return self.data_date