import struct import os from dataclasses import dataclass, field from typing import List, Optional # ------------------------------------------------------------------------- # 1. 데이터 구조 정의 (C# DOSItoMMIClass 완벽 대응) # ------------------------------------------------------------------------- @dataclass class DOSItoMMIClass: # [기본 정보] seq: int = 0 source: int = 0 # 16/80: 1계, 그외: 2계 time: str = "" trainno: str = "" # 열차번호 # [속도 및 아날로그 값] trainspeed: float = 0.0 # 현재 속도 limitspeed: int = 0 # 제한 속도 pwm_value: int = 0 # PWM dtg: float = 0.0 # 잔여 거리 ato_limitSpeed: int = 0 # ATO 제한 속도 tasc_value: int = 0 # 그래프용(ato_limitSpeed와 동일) # [ATC/ATO 주파수 및 코드 (XOR 복호화 데이터)] osc_f: float = 0.0 # OSC 주파수 atc_code_carrier_f: float = 0.0 # ATC Carrier Freq atc_code_f: float = 0.0 # ATC Code Freq atc_carrier_f: str = "-" # F1~F4 문자열 # [ATC 카드 상태 - Byte 13, 14, 19] do_zvr: bool = False; do_edl: bool = False; do_edr: bool = False do_fsb: bool = False; do_ebm: bool = False; do_ebp: bool = False system_active: bool = False; over_spd_warning: bool = False tcr: bool = False; hcr: bool = False door_open: bool = False; door_close: bool = False psd_open: bool = False; psd_close: bool = False atc_status: str = "-" # INITIAL, MANUAL, ACTIVE, STANDBY wheelcheck: bool = False mpdt: str = "-" # START, NG, OK ipdt: str = "-" # NG, OK # [운전 모드 및 마스콘 - Byte 15, 16] fa: bool = False; auto: bool = False; mcs: bool = False yard: bool = False; fmc: bool = False reverser_rvs: bool = False; reverser_fwd: bool = False; reverser_neu: bool = False ato_start_btn: bool = False; ato_eb_req: bool = False tacho_dir_a: bool = False; tacho_dir_b: bool = False mascon_neu: bool = False; mascon_dr: bool = False mascon_br: bool = False; mascon_eb: bool = False # [FAIL 카드 정보 - Byte 20] fail_atcr: bool = False; fail_atoc: bool = False; fail_tcms: bool = False fail_tacho2: bool = False; fail_tacho1: bool = False # [마커 및 TWC - Byte 21, 28-30, 34] recovery: bool = False; nomal: bool = False; tasc: bool = False marker: str = "-" # ATS, PGX, PG3-2, PG2, PG1 pstn: int = 0; nstn: int = 0; dstn: int = 0 # 전역, 현역, 종착역 twct_enable: bool = False # TWC 수신 여부 door_close_warning: bool = False # DCW wrongdoor: bool = False nextdoor: str = "-" # [ATO RLY 및 기타 - Byte 23, 24, 25] trac_dr: bool = False; trac_br: bool = False; trac_cs: bool = False ador: bool = False; adol: bool = False; adc: bool = False start_enable: bool = False; trainberth: bool = False tc2: bool = False; tc1: bool = False; tascdb: bool = False tcmsdoor: str = "-" # LDO, RDO doormod: str = "-" # A/A, A/M, M/M pre_brake: bool = False; limit_drive: bool = False ov_stop1: bool = False; ov_stop2: bool = False sh_stop1: bool = False; sh_stop2: bool = False # [ATC 코드 상세 - Byte 35] atc_code: str = "-" # 01, 02, 25 ... atc_code_str: str = "-" # 호환용(동일 값) atc_code_val: int = 0 # 그래프용 속도값 osc_f0_ok: bool = False # [에러 메시지 모음] diagnostic_err_msg: str = "" atc_err_msg: str = "" interface_err_msg: str = "" ato_err_msg: str = "" # [VDI/VDO 카드 (DIO) - Byte 63~78] # VDI A vdia_stat: str = ""; vdia_input: str = "" vdia_rvs: bool = False; vdia_neu: bool = False; vdia_fwd: bool = False vdia_mascondr: bool = False; vdia_masconbr: bool = False; vdia_masconeb: bool = False vdia_doorclose: bool = False; vdia_dooropen: bool = False vdia_fmc: bool = False; vdia_yard: bool = False; vdia_mcs: bool = False vdia_auto: bool = False; vdia_fa: bool = False; vdia_tcr: bool = False; vdia_hcr: bool = False # VDI B vdib_stat: str = ""; vdib_input: str = "" vdib_rvs: bool = False; vdib_neu: bool = False; vdib_fwd: bool = False vdib_mascondr: bool = False; vdib_masconbr: bool = False; vdib_masconeb: bool = False vdib_doorclose: bool = False; vdib_dooropen: bool = False vdib_fmc: bool = False; vdib_yard: bool = False; vdib_mcs: bool = False vdib_auto: bool = False; vdib_fa: bool = False; vdib_tcr: bool = False; vdib_hcr: bool = False # VDI C vdic_stat: str = ""; vdic_input: str = "" vdic_tc2: bool = False; vdic_tc1: bool = False vdic_edlfb: bool = False; vdic_edrfb: bool = False; vdic_zvrfb: bool = False vdic_fsbfb: bool = False; vdic_ebmfb: bool = False; vdic_ebpfb: bool = False vdic_unit1: bool = False; vdic_startbtn: bool = False vdic_psdclose: bool = False; vdic_psdopen: bool = False # VDI D vdid_stat: str = ""; vdid_input: str = "" vdid_tc2: bool = False; vdid_tc1: bool = False vdid_edlfb: bool = False; vdid_edrfb: bool = False; vdid_zvrfb: bool = False vdid_fsbfb: bool = False; vdid_ebmfb: bool = False; vdid_ebpfb: bool = False vdid_unit1: bool = False; vdid_startbtn: bool = False vdid_psdclose: bool = False; vdid_psdopen: bool = False # VDO A/B vdoa_stat: str = ""; vdob_stat: str = "" vdoa_edl: bool = False; vdoa_edr: bool = False; vdoa_zvr: bool = False vdoa_fsb: bool = False; vdoa_ebm: bool = False; vdoa_ebp: bool = False vdob_edl: bool = False; vdob_edr: bool = False; vdob_zvr: bool = False vdob_fsb: bool = False; vdob_ebm: bool = False; vdob_ebp: bool = False # ------------------------------------------------------------------------- # 2. 핵심 파싱 로직 (C# DDOSItoMMI.cs 정밀 이식) # ------------------------------------------------------------------------- class DDOSItoMMI: @staticmethod def set_data(data: bytes) -> DOSItoMMIClass: mmi = DOSItoMMIClass() # 데이터 길이 보호 if len(data) < 84: return mmi # [Header] mmi.seq = data[1] mmi.source = data[3] # [Time] "20YY.MM.DD HH:mm:ss" try: mmi.time = (f"20{data[4]:02x}.{data[5]:02x}.{data[6]:02x} " f"{data[7]:02x}:{data[8]:02x}:{data[9]:02x}") except: mmi.time = "Invalid Time" # [Speed] Byte 10-11 speed_raw = (data[10] << 8) | data[11] mmi.trainspeed = speed_raw / 10.0 mmi.limitspeed = data[12] # [DO ATC / Byte 13] mmi.do_zvr = (data[13] & 32) > 0 mmi.do_edl = (data[13] & 16) > 0 mmi.do_edr = (data[13] & 8) > 0 mmi.do_fsb = (data[13] & 4) > 0 mmi.do_ebm = (data[13] & 2) > 0 mmi.do_ebp = (data[13] & 1) > 0 # [Status / Byte 14] mmi.system_active = (data[14] & 128) > 0 mmi.over_spd_warning = (data[14] & 64) > 0 mmi.tcr = (data[14] & 32) > 0 mmi.hcr = (data[14] & 16) > 0 mmi.door_open = (data[14] & 8) > 0 mmi.door_close = (data[14] & 4) > 0 mmi.psd_open = (data[14] & 2) > 0 mmi.psd_close = (data[14] & 1) > 0 # [Mode / Byte 15] mmi.fa = (data[15] & 128) > 0 mmi.auto = (data[15] & 64) > 0 mmi.mcs = (data[15] & 32) > 0 mmi.yard = (data[15] & 16) > 0 mmi.fmc = (data[15] & 8) > 0 mmi.reverser_rvs = (data[15] & 4) > 0 mmi.reverser_fwd = (data[15] & 2) > 0 mmi.reverser_neu = (data[15] & 1) > 0 # [Mascon & ATO / Byte 16] mmi.ato_start_btn = (data[16] & 128) > 0 mmi.ato_eb_req = (data[16] & 64) > 0 mmi.tacho_dir_a = (data[16] & 32) > 0 mmi.tacho_dir_b = (data[16] & 16) > 0 mmi.mascon_neu = (data[16] & 0) > 0 # Logic Copy mmi.mascon_dr = (data[16] & 2) > 0 mmi.mascon_br = (data[16] & 4) > 0 mmi.mascon_eb = (data[16] & 8) > 0 # [ATC Status / Byte 19] val_19 = data[19] & 192 if val_19 == 0: mmi.atc_status = "INITIAL PDT" elif val_19 == 192: mmi.atc_status = "MANUAL PDT" elif val_19 == 64: mmi.atc_status = "ATC ACTIVE" elif val_19 == 128: mmi.atc_status = "ATC STANDBY" mmi.wheelcheck = (data[19] & 32) > 0 val_mpdt = data[19] & 28 if val_mpdt == 16: mmi.mpdt = "START" elif val_mpdt == 8: mmi.mpdt = "NG" elif val_mpdt == 4: mmi.mpdt = "OK" val_ipdt = data[19] & 3 if val_ipdt == 2: mmi.ipdt = "NG" elif val_ipdt == 1: mmi.ipdt = "OK" # [Fail / Byte 20] mmi.fail_atcr = (data[20] & 128) > 0 mmi.fail_atoc = (data[20] & 64) > 0 mmi.fail_tcms = (data[20] & 32) > 0 mmi.fail_tacho2 = (data[20] & 2) > 0 mmi.fail_tacho1 = (data[20] & 1) > 0 # [Marker / Byte 21] mmi.recovery = (data[21] & 128) > 0 mmi.nomal = (data[21] & 64) > 0 mmi.tasc = (data[21] & 32) > 0 val_21 = data[21] & 31 if val_21 == 16: mmi.marker = "ATS" elif val_21 == 8: mmi.marker = "PGX" elif val_21 == 4: mmi.marker = "PG3-2" elif val_21 == 2: mmi.marker = "PG2" elif val_21 == 1: mmi.marker = "PG1" # [PWM / Byte 22] mmi.pwm_value = data[22] # [RLY ATO / Byte 23] mmi.trac_dr = (data[23] & 128) > 0 mmi.trac_br = (data[23] & 64) > 0 mmi.trac_cs = (data[23] & 32) > 0 mmi.ador = (data[23] & 16) > 0 mmi.adol = (data[23] & 8) > 0 mmi.adc = (data[23] & 4) > 0 mmi.start_enable = (data[23] & 2) > 0 mmi.trainberth = (data[23] & 1) > 0 # [TCMS / Byte 24] mmi.tc2 = (data[24] & 128) > 0 mmi.tc1 = (data[24] & 64) > 0 mmi.tascdb = (data[24] & 32) > 0 val_tcms_door = data[24] & 24 if val_tcms_door == 16: mmi.tcmsdoor = "LDO" elif val_tcms_door == 8: mmi.tcmsdoor = "RDO" elif val_tcms_door == 24: mmi.tcmsdoor = "L,RDO" val_door_mode = data[24] & 7 if val_door_mode == 4: mmi.doormod = "A / A" elif val_door_mode == 2: mmi.doormod = "A / M" elif val_door_mode == 1: mmi.doormod = "M / M" # [ETC / Byte 25] mmi.pre_brake = (data[25] & 128) > 0 mmi.limit_drive = (data[25] & 64) > 0 mmi.ov_stop1 = (data[25] & 8) > 0 mmi.ov_stop2 = (data[25] & 4) > 0 mmi.sh_stop1 = (data[25] & 2) > 0 mmi.sh_stop2 = (data[25] & 1) > 0 # [Train No / Byte 26-27] mmi.trainno = f"{(data[26] << 8 | data[27]):04X}" # [Stations / Byte 28-30] mmi.pstn = data[28] mmi.nstn = data[29] mmi.dstn = data[30] # [DTG / Byte 31-32] num_dtg = (data[31] << 8) | data[32] if mmi.tasc: mmi.dtg = num_dtg / 100.0 else: if mmi.trainspeed == 0.0 and not mmi.trac_dr: if (num_dtg & 32768) != 0: num_dtg -= 65536 mmi.dtg = (num_dtg / 100.0) * -1.0 else: mmi.dtg = num_dtg / 10.0 # [ATO Limit / Byte 33] mmi.ato_limitSpeed = data[33] mmi.tasc_value = data[33] # [TWC / Byte 34] # kur(128), osc(64), inching(32), twct_enable(16), dcw(8), wrongdoor(4) mmi.twct_enable = (data[34] & 16) > 0 mmi.door_close_warning = (data[34] & 8) > 0 mmi.wrongdoor = (data[34] & 4) > 0 val_nextdoor = data[34] & 3 if val_nextdoor == 2: mmi.nextdoor = "RIGHT" elif val_nextdoor == 1: mmi.nextdoor = "LEFT" # [ATC Code / Byte 35] # 상위 4비트: Code Index atc_idx = (data[35] & 240) >> 4 atc_str_map = {0:"02", 1:"01", 2:"25", 3:"40", 4:"55", 5:"65", 6:"75", 7:"DE", 8:"DW"} mmi.atc_code = atc_str_map.get(atc_idx, "-") mmi.atc_code_str = mmi.atc_code # 그래프용 ATC 코드 속도 매핑 유지 atc_speed_map = {0: 0, 1: 0, 2: 25, 3: 40, 4: 55, 5: 65, 6: 75, 7: -1, 8: -1} mmi.atc_code_val = atc_speed_map.get(atc_idx, 0) mmi.osc_f0_ok = (data[35] & 8) > 0 # 하위 3비트: Carrier Freq carr_idx = data[35] & 7 if carr_idx == 1: mmi.atc_carrier_f = "F1" elif carr_idx == 2: mmi.atc_carrier_f = "F2" elif carr_idx == 3: mmi.atc_carrier_f = "F3" elif carr_idx == 4: mmi.atc_carrier_f = "F4" # [Frequency Calculations (XOR Obfuscation) / Byte 36-39, 55, 58] mmi.atc_code_carrier_f = ((data[36] ^ 65) << 8 | (data[37] ^ 82)) * 10.0 mmi.atc_code_f = ((data[38] ^ 99) << 8 | (data[39] ^ 116)) / 10.0 mmi.osc_f = ((data[55] ^ 99) << 8 | (data[58] ^ 100)) / 10.0 # [VDI A - Byte 63, 64, 65] mmi.vdia_rvs = (data[63] & 128) > 0 mmi.vdia_neu = (data[63] & 64) > 0 mmi.vdia_fwd = (data[63] & 32) > 0 mmi.vdia_mascondr = (data[63] & 16) > 0 mmi.vdia_masconbr = (data[63] & 8) > 0 mmi.vdia_masconeb = (data[63] & 4) > 0 mmi.vdia_doorclose = (data[63] & 2) > 0 mmi.vdia_dooropen = (data[63] & 1) > 0 mmi.vdia_fmc = (data[64] & 64) > 0 mmi.vdia_yard = (data[64] & 32) > 0 mmi.vdia_mcs = (data[64] & 16) > 0 mmi.vdia_auto = (data[64] & 8) > 0 mmi.vdia_fa = (data[64] & 4) > 0 mmi.vdia_tcr = (data[64] & 2) > 0 mmi.vdia_hcr = (data[64] & 1) > 0 mmi.vdia_stat = f"{data[65]:02X}" # [VDI B - Byte 66, 67, 68] mmi.vdib_rvs = (data[66] & 128) > 0 mmi.vdib_neu = (data[66] & 64) > 0 mmi.vdib_fwd = (data[66] & 32) > 0 mmi.vdib_mascondr = (data[66] & 16) > 0 mmi.vdib_masconbr = (data[66] & 8) > 0 mmi.vdib_masconeb = (data[66] & 4) > 0 mmi.vdib_doorclose = (data[66] & 2) > 0 mmi.vdib_dooropen = (data[66] & 1) > 0 mmi.vdib_fmc = (data[67] & 64) > 0 mmi.vdib_yard = (data[67] & 32) > 0 mmi.vdib_mcs = (data[67] & 16) > 0 mmi.vdib_auto = (data[67] & 8) > 0 mmi.vdib_fa = (data[67] & 4) > 0 mmi.vdib_tcr = (data[67] & 2) > 0 mmi.vdib_hcr = (data[67] & 1) > 0 mmi.vdib_stat = f"{data[68]:02X}" # [VDI C - Byte 69, 70, 71] mmi.vdic_tc2 = (data[69] & 128) > 0 mmi.vdic_tc1 = (data[69] & 64) > 0 mmi.vdic_edlfb = (data[69] & 32) > 0 mmi.vdic_edrfb = (data[69] & 16) > 0 mmi.vdic_zvrfb = (data[69] & 8) > 0 mmi.vdic_fsbfb = (data[69] & 4) > 0 mmi.vdic_ebmfb = (data[69] & 2) > 0 mmi.vdic_ebpfb = (data[69] & 1) > 0 mmi.vdic_unit1 = (data[70] & 128) > 0 mmi.vdic_startbtn = (data[70] & 4) > 0 mmi.vdic_psdclose = (data[70] & 2) > 0 mmi.vdic_psdopen = (data[70] & 1) > 0 mmi.vdic_stat = f"{data[71]:02X}" # [VDI D - Byte 72, 73, 74] mmi.vdid_tc2 = (data[72] & 128) > 0 mmi.vdid_tc1 = (data[72] & 64) > 0 mmi.vdid_edlfb = (data[72] & 32) > 0 mmi.vdid_edrfb = (data[72] & 16) > 0 mmi.vdid_zvrfb = (data[72] & 8) > 0 mmi.vdid_fsbfb = (data[72] & 4) > 0 mmi.vdid_ebmfb = (data[72] & 2) > 0 mmi.vdid_ebpfb = (data[72] & 1) > 0 mmi.vdid_unit1 = (data[73] & 128) > 0 mmi.vdid_startbtn = (data[73] & 4) > 0 mmi.vdid_psdclose = (data[73] & 2) > 0 mmi.vdid_psdopen = (data[73] & 1) > 0 mmi.vdid_stat = f"{data[74]:02X}" # [VDO A - Byte 75, 76] mmi.vdoa_edl = (data[75] & 32) > 0 mmi.vdoa_edr = (data[75] & 16) > 0 mmi.vdoa_zvr = (data[75] & 8) > 0 mmi.vdoa_fsb = (data[75] & 4) > 0 mmi.vdoa_ebm = (data[75] & 2) > 0 mmi.vdoa_ebp = (data[75] & 1) > 0 mmi.vdoa_stat = f"{data[76]:02X}" # [VDO B - Byte 77, 78] mmi.vdob_edl = (data[77] & 32) > 0 mmi.vdob_edr = (data[77] & 16) > 0 mmi.vdob_zvr = (data[77] & 8) > 0 mmi.vdob_fsb = (data[77] & 4) > 0 mmi.vdob_ebm = (data[77] & 2) > 0 mmi.vdob_ebp = (data[77] & 1) > 0 mmi.vdob_stat = f"{data[78]:02X}" return mmi # ------------------------------------------------------------------------- # 3. 파일 로더 # ------------------------------------------------------------------------- class LogLoader: def load(self, filepath: str) -> List[DOSItoMMIClass]: result_list = [] if not os.path.exists(filepath): return result_list file_size = os.path.getsize(filepath) with open(filepath, 'rb') as f: CHUNK_SIZE = 84 while f.tell() < file_size: chunk = f.read(CHUNK_SIZE) if len(chunk) < CHUNK_SIZE: break # 오류 방지용 try-except 추가 권장 try: mmi_data = DDOSItoMMI.set_data(chunk) result_list.append(mmi_data) except Exception as e: print(f"Error parsing chunk at {f.tell()}: {e}") return result_list