AI_MMI_Analyser/app/ui/views/dashboard_view.py

1094 lines
48 KiB
Python

"""
Legacy Viewer (Dashboard View)
- 상단 20%: 속도계 그룹
- 중간 30%: 신호 카드 그룹
- 하단 50%: 그래프 그룹 (미니 그래프)
"""
from PySide6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QGridLayout,
QGroupBox, QFrame, QLabel, QSplitter, QScrollArea,
QSizePolicy, QPushButton)
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor
from app.ui.components.signal_components import (
OnOffSignalLabel, DataSignalLabel, SignalCard
)
from app.core.sync_controller import sync_manager
from app.ui.views.graph_view import GraphView
class DashboardView(QWidget):
def __init__(self, panel_id):
    """Create the dashboard for one panel and subscribe it to time-sync updates.

    Args:
        panel_id: Identifier of the owning panel; stored on the instance and
            handed to the embedded GraphView when the graph pane is built.
    """
    super().__init__()
    self.panel_id = panel_id

    # Lookup table of signal widgets; the card builders fill it during setup_ui().
    self.signal_widgets = {}

    # Noise-filter bookkeeping.
    self._source_filtered_last = {}   # last value accepted per signal key
    self._signal_valid_sources = {}   # trusted sources per signal key
    self._signal_filter_state = {}    # per-signal debounce state
    self.last_valid_pwm = 0

    self.setup_ui()
    # Refresh the view whenever the shared sync manager reports a new time position.
    sync_manager.time_changed.connect(self.update_view)
def setup_ui(self):
    """Assemble the top-level layout: one vertical splitter holding two panes.

    Pane [1] (top) is the signal card group, pane [2] (bottom) is the graph
    group. The splitter starts with sizes 300:200 and its children cannot be
    collapsed completely.
    """
    root_layout = QVBoxLayout(self)
    root_layout.setSpacing(0)
    root_layout.setContentsMargins(0, 0, 0, 0)

    splitter = QSplitter(Qt.Vertical)
    splitter.setChildrenCollapsible(False)  # panes may shrink but never vanish
    splitter.setHandleWidth(5)
    splitter.setStyleSheet("""
QSplitter::handle { background-color: #3E3E42; }
QSplitter::handle:hover { background-color: #007ACC; }
QSplitter::handle:pressed { background-color: #005A9E; }
""")
    self.main_splitter = splitter

    # [1] signal cards first, [2] graph second: build each pane and attach it in order.
    for build_pane in (self._create_signal_group, self._create_graph_group):
        splitter.addWidget(build_pane())

    # Initial 60:40 split.
    splitter.setSizes([300, 200])
    root_layout.addWidget(splitter)
# =========================================================================
# [1] Signal card group
# =========================================================================
def _create_signal_group(self):
    """Build the upper pane: a scrollable grid of signal cards plus the VD toggle row.

    Grid layout (4 columns):
        Row 0:   INFO | ATC | ATO | TWC
        Row 1:   FAIL | RLY (ATO) | DI (ATC) | DO (ATC)
        Row 2:   ETC1 (spans 2 columns) | ETC2 (spans 2 columns)
        Row 3-4: the six VD cards (VDI A-D, VDO A-B), hidden until toggled on

    Returns:
        The wrapper QWidget containing the scroll area and the toggle-button row.
    """
    dark_background = "background-color: #1E1E1E;"

    # Outer wrapper: scroll area on top, button row underneath.
    wrapper = QWidget()
    wrapper_layout = QVBoxLayout(wrapper)
    wrapper_layout.setContentsMargins(0, 0, 0, 0)
    wrapper_layout.setSpacing(0)

    scroll = QScrollArea()
    scroll.setWidgetResizable(True)
    scroll.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    scroll.setMinimumHeight(100)
    scroll.setStyleSheet("""
QScrollArea { background-color: #1E1E1E; border: none; }
QScrollBar:vertical { background: #2D2D30; width: 8px; }
QScrollBar::handle:vertical { background: #555; border-radius: 4px; min-height: 20px; }
QScrollBar:horizontal { background: #2D2D30; height: 8px; }
QScrollBar::handle:horizontal { background: #555; border-radius: 4px; min-width: 20px; }
""")

    container = QWidget()
    container.setStyleSheet(dark_background)
    grid = QGridLayout(container)
    grid.setContentsMargins(3, 3, 3, 3)
    grid.setSpacing(3)

    # Row 0: INFO, ATC, ATO, TWC
    self.card_info = self._create_info_card()
    self.card_atc = self._create_atc_card()
    self.card_ato = self._create_ato_card()
    self.card_twc = self._create_twc_card()
    for column, card in enumerate((self.card_info, self.card_atc, self.card_ato, self.card_twc)):
        grid.addWidget(card, 0, column)

    # Row 1: FAIL, RLY (ATO), DI (ATC), DO (ATC)
    self.card_fail = self._create_fail_card()
    self.card_rly = self._create_rly_card()
    self.card_di = self._create_di_card()
    self.card_do = self._create_do_card()
    for column, card in enumerate((self.card_fail, self.card_rly, self.card_di, self.card_do)):
        grid.addWidget(card, 1, column)

    # Row 2: ETC1 (data labels) and ETC2 (on/off labels), two columns each.
    self.card_etc1 = self._create_etc1_card()
    self.card_etc2 = self._create_etc2_card()
    grid.addWidget(self.card_etc1, 2, 0, 1, 2)
    grid.addWidget(self.card_etc2, 2, 2, 1, 2)

    # Always-visible cards, kept for reference.
    self.main_cards = [
        self.card_info, self.card_atc, self.card_ato, self.card_twc, self.card_fail,
        self.card_rly, self.card_di, self.card_do,
        self.card_etc1, self.card_etc2,
    ]

    # Six toggleable VD cards, laid out four per row starting at row 3.
    vd_builders = (
        self._create_vdi_a_card,
        self._create_vdi_b_card,
        self._create_vdi_c_card,
        self._create_vdi_d_card,
        self._create_vdo_a_card,
        self._create_vdo_b_card,
    )
    self.vd_cards = [build_card() for build_card in vd_builders]
    for position, vd_card in enumerate(self.vd_cards):
        row_offset, column = divmod(position, 4)
        vd_card.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
        grid.addWidget(vd_card, 3 + row_offset, column)
        vd_card.setVisible(False)  # hidden by default

    scroll.setWidget(container)
    wrapper_layout.addWidget(scroll, 1)

    # VD toggle button, pushed to the right edge by a leading stretch.
    button_row = QWidget()
    button_row.setStyleSheet(dark_background)
    button_layout = QHBoxLayout(button_row)
    button_layout.setContentsMargins(5, 2, 5, 2)
    button_layout.addStretch()

    self.vd_visible = False  # VD cards start hidden
    toggle = QPushButton("VD ▲")
    toggle.setFixedSize(50, 22)
    toggle.setStyleSheet("""
QPushButton {
background-color: #4C4C4C;
color: #888888;
border: 1px solid #666;
border-radius: 3px;
font-size: 10px;
font-weight: bold;
}
QPushButton:hover {
background-color: #5C5C5C;
}
QPushButton:pressed {
background-color: #3C3C3C;
}
""")
    toggle.clicked.connect(self._toggle_vd_cards)
    self.btn_vd_toggle = toggle

    button_layout.addWidget(toggle)
    wrapper_layout.addWidget(button_row)
    return wrapper
def _toggle_vd_cards(self):
"""VD 카드들(VDI A/B/C/D, VDO A/B) 표시/숨김 토글"""
self.vd_visible = not self.vd_visible
for card in self.vd_cards:
card.setVisible(self.vd_visible)
if self.vd_visible:
self.btn_vd_toggle.setText("VD ▼")
self.btn_vd_toggle.setStyleSheet("""
QPushButton {
background-color: #4C4C4C;
color: #00FF00;
border: 1px solid #666;
border-radius: 3px;
font-size: 10px;
font-weight: bold;
}
QPushButton:hover { background-color: #5C5C5C; }
""")
else:
self.btn_vd_toggle.setText("VD ▲")
self.btn_vd_toggle.setStyleSheet("""
QPushButton {
background-color: #4C4C4C;
color: #888888;
border: 1px solid #666;
border-radius: 3px;
font-size: 10px;
font-weight: bold;
}
QPushButton:hover { background-color: #5C5C5C; }
""")
# -------------------------------------------------------------------------
# INFO card (status summary)
# -------------------------------------------------------------------------
def _create_info_card(self):
    """Build the INFO card: time, speedometer, and four summary values.

    The widgets are placed directly on the card's ``signal_grid`` (instead of
    going through ``add_signal``) so the layout can be customised:

        Row 0: TIME, centred across all four columns
        Row 1: speedometer, centred across all four columns
        Row 2: LIMIT | DTG (two columns each)
        Row 3: PWM | ATC CODE (two columns each)

    Every widget is registered in ``card.signals`` and, with an ``info_``
    prefix, in ``self.signal_widgets``.

    Returns:
        The populated SignalCard.
    """
    # Imported here rather than at module level, as in the original code.
    from app.ui.components.speedometer import SpeedometerWidget

    card = SignalCard("INFO")
    layout = card.signal_grid

    # Row 0: emphasised time label.
    time_label = DataSignalLabel("time", "TIME", "")
    time_label.set_value_style("font-size: 16px; font-weight: bold; color: #00D084;")
    layout.addWidget(time_label, 0, 0, 1, 4, Qt.AlignCenter)
    card.signals["time"] = time_label
    self.signal_widgets["info_time"] = time_label

    # Row 1: speedometer gauge.
    self.speedometer = SpeedometerWidget(title="SPEED", unit="km/h", max_value=120)
    layout.addWidget(self.speedometer, 1, 0, 1, 4, Qt.AlignCenter)
    card.signals["speed"] = self.speedometer
    self.signal_widgets["info_speed"] = self.speedometer

    # Rows 2-3: (signal key, caption, unit, grid row, grid column).
    summary_fields = (
        ("limit", "LIMIT", "km/h", 2, 0),
        ("dtg", "DTG", "m", 2, 2),
        ("pwm", "PWM", "%", 3, 0),
        ("atc_code", "ATC CODE", "", 3, 2),
    )
    for key, caption, unit, row, column in summary_fields:
        label = DataSignalLabel(key, caption, unit)
        layout.addWidget(label, row, column, 1, 2)
        card.signals[key] = label
        self.signal_widgets[f"info_{key}"] = label

    # Let the empty row below the content absorb any spare vertical space.
    layout.setRowStretch(4, 1)
    return card
# -------------------------------------------------------------------------
# ATC 카드
# -------------------------------------------------------------------------
def _create_atc_card(self):
    """Build the ATC card: a cab-selection label followed by five data labels.

    Each label is added to the card and registered in ``self.signal_widgets``
    under ``atc_<key>``, where ``<key>`` is the caption lower-cased with
    spaces replaced by underscores.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("ATC")

    # Cab selection state.
    cab_label = DataSignalLabel("cab", "운전실", description="운전실 선택 상태 (TC1/TC2/OFF)")
    card.add_signal(cab_label, "cab")
    self.signal_widgets["atc_cab"] = cab_label

    # Remaining data labels: (caption, tooltip description).
    data_rows = (
        ("ATC ACT", "ATC 활성화 상태"),
        ("CARR", "ATC 캐리어 주파수"),
        ("CODE", "ATC 속도 코드"),
        ("FREQ", "코드 주파수"),
        ("LIMIT", "ATC 제한속도"),
    )
    for caption, description in data_rows:
        key = caption.replace(" ", "_").lower()
        label = DataSignalLabel(key, caption, description=description)
        card.add_signal(label, key)
        self.signal_widgets[f"atc_{key}"] = label
    return card
# -------------------------------------------------------------------------
# TWC 카드
# -------------------------------------------------------------------------
def _create_twc_card(self):
    """Build the TWC card: four on/off labels followed by five data labels.

    On/off labels are keyed by the caption lower-cased with spaces replaced
    by underscores; data labels are keyed by the caption unchanged. Both are
    registered in ``self.signal_widgets`` with a ``twc_`` prefix.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("TWC")

    # On/off indicators: (caption, description).
    flag_rows = (
        ("DCW", "Door Close Warning"),
        ("TWX Tx", "TWC 송신 활성화"),
        ("BERTH", "Train Berth - 정위치 정차"),
        ("W.DOOR", "Wrong Door - 잘못된 도어"),
    )
    for caption, description in flag_rows:
        key = caption.replace(" ", "_").lower()
        indicator = OnOffSignalLabel(key, caption, description)
        card.add_signal(indicator, key)
        self.signal_widgets[f"twc_{key}"] = indicator

    # Data values: (caption, description); the caption doubles as the key.
    value_rows = (
        ("열번", "열차 번호"),
        ("현재역", "현재 역 코드"),
        ("다음역", "다음 역 코드"),
        ("종착역", "종착역 코드"),
        ("DOOR", "다음 도어 방향"),
    )
    for caption, description in value_rows:
        value_label = DataSignalLabel(caption, caption, description=description)
        card.add_signal(value_label, caption)
        self.signal_widgets[f"twc_{caption}"] = value_label
    return card
# -------------------------------------------------------------------------
# ATO 카드
# -------------------------------------------------------------------------
def _create_ato_card(self):
    """Build the ATO card: three on/off labels followed by four data labels.

    The on/off labels receive the short name as their first constructor
    argument and a separate display caption; their registry key is the short
    name lower-cased with spaces replaced by underscores. Data labels are
    keyed by the lower-cased caption. Everything is registered in
    ``self.signal_widgets`` with an ``ato_`` prefix.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("ATO")

    # On/off indicators: (short name, display caption, description).
    flag_rows = (
        ("TASC", "TASC", "Train Automatic Stop Control - 자동정차제어"),
        ("TASC DB", "TASC DB", "TASC Dynamic Brake"),
        ("ATO EB", "ATO EB REQ", "ATO Emergency Brake Request - ATO 비상제동 요청"),
    )
    for short_name, caption, description in flag_rows:
        key = short_name.lower().replace(" ", "_")
        indicator = OnOffSignalLabel(short_name, caption, description)
        card.add_signal(indicator, key)
        self.signal_widgets[f"ato_{key}"] = indicator

    # Data values: (caption, description).
    value_rows = (
        ("MARKER", "ATO 마커 (PG1/2/3/X)"),
        ("OSC", "OSC 주파수"),
        ("PWM", "PWM 값"),
        ("DTG", "남은거리(m)"),
    )
    for caption, description in value_rows:
        key = caption.lower()
        value_label = DataSignalLabel(key, caption, description=description)
        card.add_signal(value_label, key)
        self.signal_widgets[f"ato_{key}"] = value_label
    return card
# -------------------------------------------------------------------------
# RLY (ATO) 카드
# -------------------------------------------------------------------------
def _create_rly_card(self):
    """Build the "RLY (ATO)" card with one on/off label per relay name.

    Each label is registered in ``self.signal_widgets`` as ``rly_<name>``
    with the name lower-cased.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("RLY (ATO)")
    relay_names = ("DR", "BR", "ADC", "ADOL", "ADOR", "OSC", "KUR", "SEL")
    for relay in relay_names:
        key = relay.lower()
        indicator = OnOffSignalLabel(relay, relay, f"RLY {relay} 릴레이 출력")
        card.add_signal(indicator, key)
        self.signal_widgets[f"rly_{key}"] = indicator
    return card
# -------------------------------------------------------------------------
# FAIL 카드
# -------------------------------------------------------------------------
def _create_fail_card(self):
    """Build the FAIL card with five on/off labels that turn red when ON.

    The identifier passed to each label is the caption with "." replaced by
    "_" and "#" removed; the registry key is that identifier lower-cased,
    stored in ``self.signal_widgets`` with a ``fail_`` prefix.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("FAIL")
    failure_rows = (
        ("ATO.R", "ATO Receiver Fail"),
        ("ATO.C", "ATO Controller Fail"),
        ("TCMS", "TCMS Communication Fail"),
        ("TACHO#1", "Tachometer #1 Fail"),
        ("TACHO#2", "Tachometer #2 Fail"),
    )
    for caption, description in failure_rows:
        identifier = caption.replace(".", "_").replace("#", "")
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, description)
        # Override only the ON appearance: red background, white bold text.
        indicator.set_style_override(
            on_style="background-color: #E53935; color: white; border-radius: 3px; padding: 2px 6px; font-weight: bold;",
        )
        card.add_signal(indicator, key)
        self.signal_widgets[f"fail_{key}"] = indicator
    return card
# -------------------------------------------------------------------------
# DI (ATC) 카드
# -------------------------------------------------------------------------
def _create_di_card(self):
    """Build the "DI (ATC)" card: three on/off inputs followed by five data labels.

    All labels are registered in ``self.signal_widgets`` as ``di_<name>``
    with the name lower-cased.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("DI (ATC)")

    # On/off inputs.
    for input_name in ("HCR", "TCR", "START"):
        key = input_name.lower()
        indicator = OnOffSignalLabel(input_name, input_name, f"DI {input_name} 입력 신호")
        card.add_signal(indicator, key)
        self.signal_widgets[f"di_{key}"] = indicator

    # Inputs shown as data values (driving mode, master controller, reverser, PSD, door).
    value_rows = (
        ("운전모드", "운전 모드"),
        ("마스콘", "마스콘 위치"),
        ("역전기", "역전기 위치"),
        ("PSD", "PSD 상태"),
        ("DOOR", "도어 상태"),
    )
    for caption, description in value_rows:
        key = caption.lower()
        value_label = DataSignalLabel(key, caption, description=description)
        card.add_signal(value_label, key)
        self.signal_widgets[f"di_{key}"] = value_label
    return card
# -------------------------------------------------------------------------
# DO (ATC) 카드
# -------------------------------------------------------------------------
def _create_do_card(self):
    """Build the "DO (ATC)" card with one on/off label per output.

    "+" and "-" in a caption become "p" and "m" in its identifier (so "EB+"
    is "EBp"); the registry key is that identifier lower-cased, stored in
    ``self.signal_widgets`` with a ``do_`` prefix.

    Returns:
        The populated SignalCard.
    """
    sign_to_letter = str.maketrans({"+": "p", "-": "m"})
    card = SignalCard("DO (ATC)")
    for caption in ("EB+", "EB-", "FSB", "ZVR", "EDR", "EDL"):
        identifier = caption.translate(sign_to_letter)
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, f"DO {caption} 출력 신호")
        card.add_signal(indicator, key)
        self.signal_widgets[f"do_{key}"] = indicator
    return card
# -------------------------------------------------------------------------
# ETC1 카드 (DATA 컴포넌트만) - 가로 배치
# -------------------------------------------------------------------------
def _create_etc1_card(self):
    """Build the ETC1 card, which holds data labels only, laid out horizontally.

    Keys are the captions lower-cased with spaces replaced by underscores,
    registered in ``self.signal_widgets`` with an ``etc_`` prefix.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("ETC1")
    card.set_horizontal_data_mode(True)  # place the data labels side by side

    captions = ("INIT PDT", "MNUL PDT", "TCMS DR", "DIA1", "DIA2", "VER", "DOOR MD")
    for caption in captions:
        key = caption.replace(" ", "_").lower()
        value_label = DataSignalLabel(key, caption, description=f"ETC {caption}")
        card.add_signal(value_label, key)
        self.signal_widgets[f"etc_{key}"] = value_label
    return card
# -------------------------------------------------------------------------
# ETC2 카드 (ON/OFF 컴포넌트만)
# -------------------------------------------------------------------------
def _create_etc2_card(self):
    """Build the ETC2 card, which holds on/off labels only.

    The identifier given to each label is the caption with spaces replaced
    by underscores; the registry key is that identifier lower-cased, stored
    in ``self.signal_widgets`` with an ``etc_`` prefix.

    Returns:
        The populated SignalCard.
    """
    card = SignalCard("ETC2")
    captions = (
        "SYS ACT", "INCH", "CS", "평상", "회복", "O.SPD", "사전제동", "가속제한",
        "-70↑", "+70↑", "TDIR A", "TDIR B",
    )
    for caption in captions:
        identifier = caption.replace(" ", "_")
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, f"ETC {caption} 신호")
        card.add_signal(indicator, key)
        self.signal_widgets[f"etc_{key}"] = indicator
    return card
# -------------------------------------------------------------------------
# VDI A/B/C/D 카드
# -------------------------------------------------------------------------
def _create_vdi_a_card(self):
return self._create_vdi_ab_card("VDI A", "vdi_a")
def _create_vdi_b_card(self):
return self._create_vdi_ab_card("VDI B", "vdi_b")
def _create_vdi_ab_card(self, title, prefix):
    """Build a VDI A/B style card containing fifteen on/off labels.

    Args:
        title: Card title; also used as the leading part of each label's description.
        prefix: Prefix for the keys stored in ``self.signal_widgets``.

    Returns:
        The populated SignalCard. Each label's identifier is its caption with
        "." replaced by "_"; the registry key is that identifier lower-cased.
    """
    captions = (
        "HCR", "TCR", "FA", "AUTO", "MCS", "YARD", "FMC",
        "D.OPEN", "D.CLOSE", "MC EB", "MC BR", "MC DR", "FWD", "NEU", "RVS",
    )
    card = SignalCard(title)
    for caption in captions:
        identifier = caption.replace(".", "_")
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, f"{title} {caption}")
        card.add_signal(indicator, key)
        self.signal_widgets[f"{prefix}_{key}"] = indicator
    return card
def _create_vdi_c_card(self):
return self._create_vdi_cd_card("VDI C", "vdi_c")
def _create_vdi_d_card(self):
return self._create_vdi_cd_card("VDI D", "vdi_d")
def _create_vdi_cd_card(self, title, prefix):
    """Build a VDI C/D style card containing twelve on/off labels.

    Args:
        title: Card title; also used as the leading part of each label's description.
        prefix: Prefix for the keys stored in ``self.signal_widgets``.

    Returns:
        The populated SignalCard. Each label's identifier is its caption with
        "+" -> "p", "-" -> "m" and " " -> "_"; the registry key is that
        identifier lower-cased.
    """
    to_identifier = str.maketrans({"+": "p", "-": "m", " ": "_"})
    captions = (
        "PSD OP", "PSD CL", "START", "UNIT1", "EB+FB", "EB-FB",
        "FSB FB", "ZVR FB", "EDR FB", "EDL FB", "TC1", "TC2",
    )
    card = SignalCard(title)
    for caption in captions:
        identifier = caption.translate(to_identifier)
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, f"{title} {caption}")
        card.add_signal(indicator, key)
        self.signal_widgets[f"{prefix}_{key}"] = indicator
    return card
# -------------------------------------------------------------------------
# VDO A/B 카드
# -------------------------------------------------------------------------
def _create_vdo_a_card(self):
return self._create_vdo_card("VDO A", "vdo_a")
def _create_vdo_b_card(self):
return self._create_vdo_card("VDO B", "vdo_b")
def _create_vdo_card(self, title, prefix):
    """Build a VDO card containing six on/off output labels.

    Args:
        title: Card title; also used as the leading part of each label's description.
        prefix: Prefix for the keys stored in ``self.signal_widgets``.

    Returns:
        The populated SignalCard. Each label's identifier is its caption with
        "+" -> "p" and "-" -> "m"; the registry key is that identifier
        lower-cased.
    """
    sign_to_letter = str.maketrans({"+": "p", "-": "m"})
    card = SignalCard(title)
    for caption in ("EB+", "EB-", "FSB", "ZVR", "EDL", "EDR"):
        identifier = caption.translate(sign_to_letter)
        key = identifier.lower()
        indicator = OnOffSignalLabel(identifier, caption, f"{title} {caption}")
        card.add_signal(indicator, key)
        self.signal_widgets[f"{prefix}_{key}"] = indicator
    return card
# =========================================================================
# [2] Graph group (embeds the full GraphView)
# =========================================================================
def _create_graph_group(self):
    """Create the lower pane by embedding a full GraphView for this panel.

    The view is stored as ``self.embedded_graph``, expands in both
    directions, and is at least 150 px tall.

    Returns:
        The embedded GraphView instance.
    """
    graph = self.embedded_graph = GraphView(self.panel_id)
    graph.setMinimumHeight(150)
    graph.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    return graph
def set_data(self, data_list):
    """Accept a freshly loaded data set and prepare source-based filtering.

    The data is forwarded to the embedded graph (when one exists). The
    trusted sources for the speed, PWM, DTG and OSC-frequency signals are
    then re-estimated, and the per-signal "last accepted value" cache is
    cleared.

    Args:
        data_list: Sequence of records; each is read via attribute access.
    """
    if hasattr(self, 'embedded_graph'):
        self.embedded_graph.set_data(data_list)

    # Signals checked for sources that report a bogus constant 0.
    # Here the signal key and the record attribute share the same name.
    monitored = ("trainspeed", "pwm_value", "dtg", "osc_f")
    signal_attr_map = {name: name for name in monitored}

    self._signal_valid_sources = self._infer_valid_sources(data_list, signal_attr_map)
    self._source_filtered_last = {}
# =========================================================================
# 노이즈 필터링 함수들 (1초 단위 깜박임 필터링)
# =========================================================================
def apply_pwm_filter(self, raw_pwm):
    """Filter a raw PWM reading.

    - A reading of exactly 0 is replaced by the last accepted reading.
    - A reading within 15..90 (inclusive) is accepted, remembered, and returned.
    - Any other reading yields 0 and leaves the remembered value untouched.

    Args:
        raw_pwm: The unfiltered PWM value.

    Returns:
        The filtered PWM value.
    """
    if raw_pwm == 0:
        return self.last_valid_pwm
    if not (15 <= raw_pwm <= 90):
        return 0
    self.last_valid_pwm = raw_pwm
    return raw_pwm
def filter_majority_signal(self, signal_name: str, current_value: bool,
                           window_size: int = 5) -> bool:
    """Majority-vote filter for a digital signal that flickers every frame.

    The new sample is appended to a per-signal history kept in
    ``self._signal_filter_state`` under ``maj_<signal_name>``. When the
    history exceeds ``window_size`` its oldest sample is dropped.

    Args:
        signal_name: Name used to look up this signal's history.
        current_value: The latest sample.
        window_size: Maximum number of samples kept.

    Returns:
        True when strictly more than half (integer division) of the kept
        samples are ON, otherwise False.
    """
    entry = self._signal_filter_state.setdefault(f"maj_{signal_name}", {'history': []})
    samples = entry['history']
    samples.append(current_value)

    # Drop the oldest sample once the window is exceeded.
    if len(samples) > window_size:
        del samples[0]

    votes_on = sum(samples)
    majority_threshold = len(samples) // 2
    return votes_on > majority_threshold
def filter_digital_signal(self, signal_name: str, current_value: bool,
                          stable_frames: int = 3) -> bool:
    """Debounce a digital signal so short flickers do not reach the display.

    State is kept in ``self._signal_filter_state`` under ``signal_name``
    itself (no prefix).

    Args:
        signal_name: Name used to track this signal's state.
        current_value: The latest sample.
        stable_frames: Count the pending value must reach before it
            replaces the stable value.

    Returns:
        The stable value after processing this sample.

    Logic:
        - The first sample is adopted immediately.
        - A sample equal to the stable value clears any pending change.
        - A sample differing from both the stable and pending values
          becomes the new pending value with a count of 1.
        - A repeat of the pending value raises the count; when it reaches
          ``stable_frames`` the pending value becomes the stable value.
    """
    tracker = self._signal_filter_state.get(signal_name)
    if tracker is None:
        # First sighting: show the value right away.
        self._signal_filter_state[signal_name] = {
            'stable_value': current_value,
            'pending_value': current_value,
            'same_count': 0,
        }
        return current_value

    if current_value == tracker['stable_value']:
        # Back at (or still at) the stable value: forget the pending change.
        tracker['pending_value'] = current_value
        tracker['same_count'] = 0
    elif current_value == tracker['pending_value']:
        # The pending change persists: count it, and promote it once stable enough.
        tracker['same_count'] += 1
        if tracker['same_count'] >= stable_frames:
            tracker['stable_value'] = current_value
            tracker['same_count'] = 0
    else:
        # A new candidate value starts its own count.
        tracker['pending_value'] = current_value
        tracker['same_count'] = 1
    return tracker['stable_value']
def filter_numeric_signal(self, signal_name: str, current_value: float,
                          min_valid: float = 0, stable_frames: int = 3) -> float:
    """Debounce a numeric signal so short-lived value changes are suppressed.

    State is kept in ``self._signal_filter_state`` under ``num_<signal_name>``.

    Args:
        signal_name: Name used to track this signal's state.
        current_value: The latest sample.
        min_valid: Accepted for interface compatibility; this implementation
            does not consult it.
        stable_frames: Count the pending value must reach before it
            replaces the stable value.

    Returns:
        The stable value after processing this sample.

    Logic:
        - The first sample is adopted immediately.
        - A later change is accepted only after the same new value has been
          counted ``stable_frames`` times in a row.
    """
    key = f"num_{signal_name}"
    tracker = self._signal_filter_state.get(key)
    if tracker is None:
        # First sighting: show the value right away.
        self._signal_filter_state[key] = {
            'stable_value': current_value,
            'pending_value': current_value,
            'same_count': 0,
        }
        return current_value

    if current_value == tracker['stable_value']:
        # Unchanged: forget any pending change.
        tracker['pending_value'] = current_value
        tracker['same_count'] = 0
    elif current_value == tracker['pending_value']:
        # Pending change persists: count it and promote it when stable enough.
        tracker['same_count'] += 1
        if tracker['same_count'] >= stable_frames:
            tracker['stable_value'] = current_value
            tracker['same_count'] = 0
    else:
        # A different candidate value restarts the count.
        tracker['pending_value'] = current_value
        tracker['same_count'] = 1
    return tracker['stable_value']
def filter_string_signal(self, signal_name: str, current_value: str,
                         stable_frames: int = 3) -> str:
    """Debounce a string signal so short-lived value changes are suppressed.

    State is kept in ``self._signal_filter_state`` under ``str_<signal_name>``.

    Args:
        signal_name: Name used to track this signal's state.
        current_value: The latest sample.
        stable_frames: Count the pending value must reach before it
            replaces the stable value.

    Returns:
        The stable value after processing this sample.

    Logic:
        - While a changed value is pending, the previous value keeps being reported.
        - The change is accepted once the new value has been counted
          ``stable_frames`` times in a row.
    """
    tracker = self._signal_filter_state.setdefault(
        f"str_{signal_name}",
        {
            'stable_value': current_value,
            'pending_value': current_value,
            'same_count': 0,
        },
    )

    if current_value == tracker['stable_value']:
        # Unchanged (also the path taken on the very first sample).
        tracker['pending_value'] = current_value
        tracker['same_count'] = 0
    elif current_value == tracker['pending_value']:
        # Pending change persists: count it and promote it when stable enough.
        tracker['same_count'] += 1
        if tracker['same_count'] >= stable_frames:
            tracker['stable_value'] = current_value
            tracker['same_count'] = 0
    else:
        # A different candidate value restarts the count.
        tracker['pending_value'] = current_value
        tracker['same_count'] = 1
    return tracker['stable_value']
def filter_mode_signal(self, signal_name: str, current_mode: str,
                       stable_frames: int = 3) -> str:
    """Hold the last mode across brief gaps where no mode is reported.

    State is kept in ``self._signal_filter_state`` under ``mode_<signal_name>``.

    Args:
        signal_name: Name used to track this signal's state.
        current_mode: The latest mode, or None when no mode is active.
        stable_frames: Number of consecutive None samples required before
            None is reported.

    Returns:
        The mode to display (possibly None).

    Logic:
        - Any non-None mode is shown immediately and resets the None counter.
        - A None sample increments the counter; the previous mode is kept
          until the counter reaches ``stable_frames``.
    """
    tracker = self._signal_filter_state.setdefault(
        f"mode_{signal_name}",
        {
            'stable_mode': None,
            'none_count': 0,
        },
    )

    if current_mode is None:
        tracker['none_count'] += 1
        if tracker['none_count'] >= stable_frames:
            tracker['stable_mode'] = None
        return tracker['stable_mode']

    tracker['none_count'] = 0
    tracker['stable_mode'] = current_mode
    return current_mode
def _infer_valid_sources(self, data_list, signal_attr_map, zero_ratio_threshold=0.98, min_samples=50):
"""
소스별로 '거의 항상 0'인 신호를 찾아 유효 소스를 추정한다.
- 특정 소스가 해당 신호에서 0 비율이 매우 높고, 다른 소스는 그렇지 않으면 해당 소스를 제외.
"""
stats = {k: {} for k in signal_attr_map.keys()}
for d in data_list or []:
src = getattr(d, "source", None)
if src is None:
continue
for key, attr in signal_attr_map.items():
val = getattr(d, attr, None)
if val is None:
continue
s = stats[key].setdefault(src, {"n": 0, "zero": 0})
s["n"] += 1
if val == 0:
s["zero"] += 1
valid_sources_map = {}
for key, by_src in stats.items():
if not by_src or len(by_src) <= 1:
valid_sources_map[key] = None
continue
ratios = {}
for src, s in by_src.items():
if s["n"] < min_samples:
continue
ratios[src] = (s["zero"] / s["n"]) if s["n"] else 0
if not ratios:
valid_sources_map[key] = None
continue
low_zero = [src for src, r in ratios.items() if r < zero_ratio_threshold]
high_zero = [src for src, r in ratios.items() if r >= zero_ratio_threshold]
if low_zero and high_zero:
valid_sources_map[key] = set(low_zero)
else:
valid_sources_map[key] = None
return valid_sources_map
def _get_source_filtered_value(self, data, key, attr, default=0):
"""
source 기반으로 유효 값만 사용. 유효하지 않은 소스면 직전 유효값 유지.
"""
val = getattr(data, attr, default)
src = getattr(data, "source", None)
valid_sources = self._signal_valid_sources.get(key)
is_valid = (valid_sources is None) or (src in valid_sources)
if is_valid:
self._source_filtered_last[key] = val
return val
if key in self._source_filtered_last:
return self._source_filtered_last[key]
return val if val is not None else default
# =========================================================================
# 데이터 업데이트
# =========================================================================
def update_view(self, index, data, source_id):
    """Refresh the INFO card and all signal cards from one data record.

    Args:
        index: Not used by this method.
        data: Record for the current time position; nothing happens when it is falsy.
        source_id: Not used by this method.

    Speed, PWM and DTG are read through the source-based filter; PWM then
    goes through ``apply_pwm_filter`` and limit speed / DTG through the
    numeric debounce filter. Only the last 8 characters of the record's
    ``time`` value are displayed.
    """
    if not data:
        return

    # --- Gather and filter the INFO values (the filters are stateful, so order is kept) ---
    speed = self._get_source_filtered_value(data, "trainspeed", "trainspeed", default=0)
    filtered_pwm = self.apply_pwm_filter(
        self._get_source_filtered_value(data, "pwm_value", "pwm_value", default=0)
    )
    limit_spd = self.filter_numeric_signal("limitspeed", getattr(data, 'limitspeed', 0))
    dtg = self.filter_numeric_signal(
        "dtg",
        self._get_source_filtered_value(data, "dtg", "dtg", default=0),
        min_valid=1,
    )
    atc_code = getattr(data, 'atc_code', '-')
    time_str = getattr(data, 'time', '-')[-8:]

    # --- Push the values into whichever INFO widgets are registered ---
    widgets = self.signal_widgets

    time_widget = widgets.get("info_time")
    if time_widget:
        time_widget.set_value(time_str)

    gauge = widgets.get("info_speed")
    if gauge:
        gauge.set_value(f"{speed:.1f}")
        # The gauge may optionally display the limit and the ATC code.
        if hasattr(gauge, "set_limit"):
            gauge.set_limit(limit_spd)
        if hasattr(gauge, "set_atc_code"):
            gauge.set_atc_code(atc_code)

    limit_widget = widgets.get("info_limit")
    if limit_widget:
        limit_widget.set_value(int(limit_spd))

    dtg_widget = widgets.get("info_dtg")
    if dtg_widget:
        dtg_widget.set_value(f"{dtg:.0f}")

    pwm_widget = widgets.get("info_pwm")
    if pwm_widget:
        pwm_widget.set_value(filtered_pwm)

    code_widget = widgets.get("info_atc_code")
    if code_widget:
        code_widget.set_value(str(atc_code))

    # --- Remaining signal cards ---
    self._update_signals(data)
def _update_signals(self, data):
"""신호 카드 내 모든 신호 업데이트 (노이즈 필터링 적용)"""
# ===================================================================
# ATC 카드
# ===================================================================
tc1 = self.filter_digital_signal("tc1", getattr(data, 'tc1', False))
tc2 = self.filter_digital_signal("tc2", getattr(data, 'tc2', False))
cab_text = "TC1" if tc1 else ("TC2" if tc2 else "OFF")
if w := self.signal_widgets.get("atc_cab"):
w.set_value(cab_text)
# ===================================================================
# DO 카드 (모든 출력 신호에 필터 적용)
# ===================================================================
do_signals = [
("ebp", "do_ebp"), ("ebm", "do_ebm"), ("fsb", "do_fsb"),
("zvr", "do_zvr"), ("edr", "do_edr"), ("edl", "do_edl")
]
for name, attr in do_signals:
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"do_{name}"):
w.set_status(val)
# ===================================================================
# DI 카드 (모든 입력 신호에 필터 적용)
# ===================================================================
# HCR, TCR, START
if w := self.signal_widgets.get("di_hcr"):
w.set_status(self.filter_digital_signal("di_hcr", getattr(data, 'hcr', False)))
if w := self.signal_widgets.get("di_tcr"):
w.set_status(self.filter_digital_signal("di_tcr", getattr(data, 'tcr', False)))
if w := self.signal_widgets.get("di_start"):
w.set_status(self.filter_digital_signal("di_start", getattr(data, 'ato_start_btn', False)))
# 운전모드 (DATA)
raw_mode = None
for m in ["fa", "auto", "mcs", "yard", "fmc"]:
if self.filter_digital_signal(f"di_{m}", getattr(data, m, False)):
raw_mode = m.upper()
break
mode_text = raw_mode or "없음"
if w := self.signal_widgets.get("di_운전모드"):
w.set_value(mode_text)
# PSD
psd_open = self.filter_digital_signal("psd_open", getattr(data, 'psd_open', False))
psd_close = self.filter_digital_signal("psd_close", getattr(data, 'psd_close', False))
psd_text = "열림" if psd_open else ("닫힘" if psd_close else "없음")
if w := self.signal_widgets.get("di_psd"):
w.set_value(psd_text)
# DOOR
door_open = self.filter_digital_signal("door_open", getattr(data, 'door_open', False))
door_close = self.filter_digital_signal("door_close", getattr(data, 'door_close', False))
door_text = "열림" if door_open else ("닫힘" if door_close else "없음")
if w := self.signal_widgets.get("di_door"):
w.set_value(door_text)
# 마스콘
mc_eb = self.filter_digital_signal("mascon_eb", getattr(data, 'mascon_eb', False))
mc_br = self.filter_digital_signal("mascon_br", getattr(data, 'mascon_br', False))
mc_dr = self.filter_digital_signal("mascon_dr", getattr(data, 'mascon_dr', False))
raw_mc_mode = "EB" if mc_eb else ("BR" if mc_br else ("DR" if mc_dr else None))
mc_text = raw_mc_mode or "없음"
if w := self.signal_widgets.get("di_마스콘"):
w.set_value(mc_text)
# 역전기
fwd = self.filter_digital_signal("reversingrod_fwd", getattr(data, 'reversingrod_fwd', False))
neu = self.filter_digital_signal("reversingrod_neu", getattr(data, 'reversingrod_neu', False))
rvs = self.filter_digital_signal("reversingrod_rvs", getattr(data, 'reversingrod_rvs', False))
rev_text = "전진" if fwd else ("중립" if neu else ("후진" if rvs else "없음"))
if w := self.signal_widgets.get("di_역전기"):
w.set_value(rev_text)
# ===================================================================
# ATO 카드
# ===================================================================
if w := self.signal_widgets.get("ato_tasc"):
w.set_status(self.filter_digital_signal("tasc", getattr(data, 'tasc', False)))
if w := self.signal_widgets.get("ato_tasc_db"):
w.set_status(self.filter_digital_signal("tascdb", getattr(data, 'tascdb', False)))
if w := self.signal_widgets.get("ato_ato_eb"):
w.set_status(self.filter_digital_signal("ato_eb_req", getattr(data, 'ato_eb_req', False)))
# ATO DATA
if w := self.signal_widgets.get("ato_marker"):
marker_val = self.filter_string_signal("ato_marker", str(getattr(data, 'marker', '-')))
w.set_value(marker_val)
if w := self.signal_widgets.get("ato_osc"):
raw_osc = self._get_source_filtered_value(data, "osc_f", "osc_f", default=0)
osc = self.filter_numeric_signal("osc_f", raw_osc, min_valid=0)
w.set_value(f"{osc:.1f}" if osc else "-")
if w := self.signal_widgets.get("ato_pwm"):
pwm_val = self.apply_pwm_filter(self._get_source_filtered_value(data, "pwm_value", "pwm_value", default=0))
w.set_value(pwm_val)
if w := self.signal_widgets.get("ato_dtg"):
dtg_val = self.filter_numeric_signal("dtg_ato", self._get_source_filtered_value(data, "dtg", "dtg", default=0), min_valid=1)
w.set_value(f"{dtg_val:.0f}")
# ===================================================================
# RLY (ATO) 카드
# ===================================================================
rly_signals = [
("dr", "trac_dr"), ("br", "trac_br"), ("adc", "adc"),
("adol", "adol"), ("ador", "ador"), ("osc", "osc"),
("kur", "kur"), ("sel", "start_enable")
]
for name, attr in rly_signals:
val = self.filter_digital_signal(f"rly_{attr}", getattr(data, attr, False))
if w := self.signal_widgets.get(f"rly_{name}"):
w.set_status(val)
# ===================================================================
# FAIL 카드
# ===================================================================
fail_signals = [
("ato_r", "fail_atcr"), ("ato_c", "fail_atoc"), ("tcms", "fail_tcms"),
("tacho1", "fail_tacho1"), ("tacho2", "fail_tacho2")
]
for name, attr in fail_signals:
val = self.filter_digital_signal(f"fail_{attr}", getattr(data, attr, False))
if w := self.signal_widgets.get(f"fail_{name}"):
w.set_status(val)
# ===================================================================
# TWC 카드
# ===================================================================
if w := self.signal_widgets.get("twc_dcw"):
w.set_status(self.filter_digital_signal("door_close_warning", getattr(data, 'door_close_warning', False)))
if w := self.signal_widgets.get("twc_twx_tx"):
w.set_status(self.filter_digital_signal("twct_enable", getattr(data, 'twct_enable', False)))
if w := self.signal_widgets.get("twc_berth"):
w.set_status(self.filter_digital_signal("trainberth", getattr(data, 'trainberth', False)))
if w := self.signal_widgets.get("twc_w.door"):
w.set_status(self.filter_digital_signal("wrongdoor", getattr(data, 'wrongdoor', False)))
# TWC DATA
if w := self.signal_widgets.get("twc_열번"):
w.set_value(getattr(data, 'trainno', '-'))
if w := self.signal_widgets.get("twc_현재역"):
w.set_value(getattr(data, 'pstn', '-'))
if w := self.signal_widgets.get("twc_다음역"):
w.set_value(getattr(data, 'nstn', '-'))
if w := self.signal_widgets.get("twc_종착역"):
w.set_value(getattr(data, 'dstn', '-'))
if w := self.signal_widgets.get("twc_DOOR"):
w.set_value(getattr(data, 'nextdoor', '-'))
# ===================================================================
# ETC 카드
# ===================================================================
# SYSTEM ACTIVE는 매 프레임 깜박이므로 과반수 필터 사용
if w := self.signal_widgets.get("etc_sys_act"):
val = self.filter_majority_signal("system_active", getattr(data, 'system_active', False))
w.set_status(val)
# 나머지 ETC 신호는 일반 필터 사용
etc_signals = [
("inch", "inching"), ("cs", "trac_cs"),
("평상", "nomal"), ("회복", "recovery"), ("o.spd", "over_spd_warning"),
("사전제동", "pre_brake"), ("가속제한", "limit_drive"),
("-70↑", "sh_stop1"), ("+70↑", "ov_stop1"),
("tdir_a", "tacho_dir_a"), ("tdir_b", "tacho_dir_b")
]
for name, attr in etc_signals:
val = self.filter_digital_signal(f"etc_{attr}", getattr(data, attr, False))
if w := self.signal_widgets.get(f"etc_{name}"):
w.set_status(val)
# ETC DATA (문자열 필터 적용)
if w := self.signal_widgets.get("etc_init_pdt"):
val = self.filter_string_signal("etc_ipdt", str(getattr(data, 'ipdt', '-')))
w.set_value(val)
if w := self.signal_widgets.get("etc_mnul_pdt"):
val = self.filter_string_signal("etc_mpdt", str(getattr(data, 'mpdt', '-')))
w.set_value(val)
if w := self.signal_widgets.get("etc_tcms_dr"):
val = self.filter_string_signal("etc_tcmsdoor", str(getattr(data, 'tcmsdoor', '-')))
w.set_value(val)
if w := self.signal_widgets.get("etc_door_md"):
val = self.filter_string_signal("etc_doormod", str(getattr(data, 'doormod', '-')))
w.set_value(val)
# ===================================================================
# VDI A/B/C/D, VDO A/B - 모든 신호에 필터 적용
# ===================================================================
# VDI A
vdi_a_map = {
"hcr": "vdia_hcr", "tcr": "vdia_tcr", "fa": "vdia_fa",
"auto": "vdia_auto", "mcs": "vdia_mcs", "yard": "vdia_yard",
"fmc": "vdia_fmc", "d_open": "vdia_dooropen", "d_close": "vdia_doorclose",
"mc_eb": "vdia_masconeb", "mc_br": "vdia_masconbr", "mc_dr": "vdia_mascondr",
"fwd": "vdia_fwd", "neu": "vdia_neu", "rvs": "vdia_rvs"
}
for widget_name, attr in vdi_a_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdi_a_{widget_name}"):
w.set_status(val)
# VDI B
vdi_b_map = {
"hcr": "vdib_hcr", "tcr": "vdib_tcr", "fa": "vdib_fa",
"auto": "vdib_auto", "mcs": "vdib_mcs", "yard": "vdib_yard",
"fmc": "vdib_fmc", "d_open": "vdib_dooropen", "d_close": "vdib_doorclose",
"mc_eb": "vdib_masconeb", "mc_br": "vdib_masconbr", "mc_dr": "vdib_mascondr",
"fwd": "vdib_fwd", "neu": "vdib_neu", "rvs": "vdib_rvs"
}
for widget_name, attr in vdi_b_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdi_b_{widget_name}"):
w.set_status(val)
# VDI C
vdi_c_map = {
"psd_op": "vdic_psdopen", "psd_cl": "vdic_psdclose", "start": "vdic_startbtn",
"unit1": "vdic_unit1", "ebpfb": "vdic_ebpfb", "ebmfb": "vdic_ebmfb",
"fsb_fb": "vdic_fsbfb", "zvr_fb": "vdic_zvrfb", "edr_fb": "vdic_edrfb",
"edl_fb": "vdic_edlfb", "tc1": "vdic_tc1", "tc2": "vdic_tc2"
}
for widget_name, attr in vdi_c_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdi_c_{widget_name}"):
w.set_status(val)
# VDI D
vdi_d_map = {
"psd_op": "vdid_psdopen", "psd_cl": "vdid_psdclose", "start": "vdid_startbtn",
"unit1": "vdid_unit1", "ebpfb": "vdid_ebpfb", "ebmfb": "vdid_ebmfb",
"fsb_fb": "vdid_fsbfb", "zvr_fb": "vdid_zvrfb", "edr_fb": "vdid_edrfb",
"edl_fb": "vdid_edlfb", "tc1": "vdid_tc1", "tc2": "vdid_tc2"
}
for widget_name, attr in vdi_d_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdi_d_{widget_name}"):
w.set_status(val)
# VDO A
vdo_a_map = {
"ebp": "vdoa_ebp", "ebm": "vdoa_ebm", "fsb": "vdoa_fsb",
"zvr": "vdoa_zvr", "edl": "vdoa_edl", "edr": "vdoa_edr"
}
for widget_name, attr in vdo_a_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdo_a_{widget_name}"):
w.set_status(val)
# VDO B
vdo_b_map = {
"ebp": "vdob_ebp", "ebm": "vdob_ebm", "fsb": "vdob_fsb",
"zvr": "vdob_zvr", "edl": "vdob_edl", "edr": "vdob_edr"
}
for widget_name, attr in vdo_b_map.items():
val = self.filter_digital_signal(attr, getattr(data, attr, False))
if w := self.signal_widgets.get(f"vdo_b_{widget_name}"):
w.set_status(val)