import os
import traceback
from typing import Optional

import numpy as np
import polars as pl


class FastLogParser:
    """Vectorized parser for fixed-size MMI binary log records.

    The log file is treated as a sequence of ``CHUNK_SIZE``-byte records.
    Every field is extracted with NumPy column operations over the whole
    file at once, and the result is returned as a Polars DataFrame.
    """

    # Size of one log record in bytes.
    CHUNK_SIZE = 84

    # Flags whose bit positions were decoded by earlier revisions of parse()
    # but never exported as DataFrame columns.  They are kept here purely as
    # layout documentation: name -> (byte offset within the record, bit mask).
    _UNEXPORTED_FLAGS = {
        "wheelcheck": (19, 32),
        "fail_tacho2": (20, 2),
        "fail_tacho1": (20, 1),
        "ador": (23, 16),
        "adol": (23, 8),
        "adc": (23, 4),
        "start_enable": (23, 2),
        "trainberth": (23, 1),
        "ov_stop1": (25, 8),
        "ov_stop2": (25, 4),
        "sh_stop1": (25, 2),
        "sh_stop2": (25, 1),
        "vdia_masconeb": (63, 4),
        "vdia_fmc": (64, 64),
        "vdia_yard": (64, 32),
        "vdia_mcs": (64, 16),
        "vdia_auto": (64, 8),
        "vdia_fa": (64, 4),
        "vdib_rvs": (66, 128),
        "vdib_neu": (66, 64),
        "vdib_fwd": (66, 32),
        "vdib_mascondr": (66, 16),
        "vdib_masconbr": (66, 8),
        "vdib_masconeb": (66, 4),
        "vdic_tc2": (69, 128),
        "vdic_tc1": (69, 64),
        "vdic_edlfb": (69, 32),
        "vdic_edrfb": (69, 16),
        "vdic_psdclose": (70, 2),
        "vdic_psdopen": (70, 1),
        "vdoa_fsb": (75, 4),
    }

    @staticmethod
    def parse(input_path: str) -> Optional[pl.DataFrame]:
        """Parse an MMI binary log into a DataFrame using NumPy vector ops.

        Args:
            input_path: Path of the binary log file.

        Returns:
            A DataFrame with one row per complete ``CHUNK_SIZE``-byte record,
            or ``None`` when the path does not exist, the file holds no
            complete record, or any error occurs while reading/decoding (the
            error and its traceback are printed in that case).
        """
        if not os.path.exists(input_path):
            return None

        try:
            records = FastLogParser._load_records(input_path)
            if records is None:
                return None
            frame = pl.DataFrame(FastLogParser._decode_columns(records))
            return FastLogParser._add_derived_columns(frame)
        except Exception as e:
            # Deliberate best-effort boundary: report the problem and signal
            # failure with None instead of propagating.
            print(f"[FastParser] Error: {e}")
            traceback.print_exc()
            return None

    @staticmethod
    def _load_records(input_path: str) -> Optional[np.ndarray]:
        """Read the file as an ``(n_records, CHUNK_SIZE)`` uint8 array.

        Returns ``None`` when the file contains no complete record.
        """
        # Load the whole file into memory in one call.
        with open(input_path, "rb") as f:
            raw = np.fromfile(f, dtype=np.uint8)

        # Derive the record count from the bytes actually read, so a file
        # whose size changes between a stat call and the read cannot break
        # the reshape below.
        n_records = raw.size // FastLogParser.CHUNK_SIZE
        if n_records == 0:
            return None

        # Drop a truncated trailing record, then view as (rows, CHUNK_SIZE).
        usable = n_records * FastLogParser.CHUNK_SIZE
        return raw[:usable].reshape(n_records, FastLogParser.CHUNK_SIZE)

    @staticmethod
    def _decode_columns(m: np.ndarray) -> dict:
        """Decode every exported field of the record matrix ``m``.

        Args:
            m: uint8 array of shape ``(n_records, CHUNK_SIZE)``.

        Returns:
            Mapping of column name to a 1-D array with one value per record.
            Insertion order is the column order of the resulting DataFrame.
        """

        def bit(offset, mask):
            # True where any bit selected by `mask` is set in byte `offset`.
            return (m[:, offset] & mask) > 0

        def word(hi, lo):
            # Big-endian 16-bit value from two bytes.  Both halves are widened
            # to uint16 BEFORE shifting; shifting a uint8 would overflow.
            return (m[:, hi].astype(np.uint16) << 8) | m[:, lo].astype(np.uint16)

        def xor_word(hi, hi_key, lo, lo_key):
            # Big-endian 16-bit value whose bytes are each XOR-ed with a key.
            high = (m[:, hi] ^ hi_key).astype(np.uint16) << 8
            return high | (m[:, lo] ^ lo_key).astype(np.uint16)

        def bcd(values):
            # The hex digits are read as decimal digits: 0x25 -> 25.
            return (values // 16) * 10 + (values % 16)

        source = m[:, 3]
        # Source 16 (0x10) or 80 (0x50) -> system 1, anything else -> system 2.
        system_id = np.where((source == 16) | (source == 80), 1, 2).astype(np.uint8)

        # Fields that feed the distance-to-go (DTG) computation.
        trainspeed = word(10, 11) / 10.0
        tasc = bit(21, 32)
        trac_dr = bit(23, 128)

        # Bytes 31-32: DTG.  The raw value is additionally reinterpreted as a
        # signed 16-bit number (values >= 0x8000 become negative).
        num_dtg = word(31, 32)
        dtg_signed = num_dtg.astype(np.int16)
        # Sign is flipped when not in TASC mode, speed is zero and the
        # traction "dr" flag is off.
        cond_reverse = (~tasc) & (trainspeed == 0.0) & (~trac_dr)
        # NOTE(review): only the reversed branch uses the signed value; the
        # other two branches use the unsigned one.  The original comment
        # describes an unconditional sign conversion -- confirm which is
        # intended.  Behavior is kept as it was.
        dtg = np.where(
            cond_reverse,
            dtg_signed * -1.0 / 100.0,
            np.where(tasc, num_dtg / 100.0, num_dtg / 10.0),
        )

        return {
            # --- [1. Basic info] ---
            "seq": m[:, 1],
            "source": source,
            "system_id": system_id,
            # Any source other than 32 is flagged.
            "is_break": source != 32,
            "year": bcd(m[:, 4]).astype(np.int16) + 2000,
            "month": bcd(m[:, 5]),
            "day": bcd(m[:, 6]),
            "hour": bcd(m[:, 7]),
            "minute": bcd(m[:, 8]),
            "second": bcd(m[:, 9]),
            # --- [2. Speed & analog] ---
            "trainspeed": trainspeed,
            "limitspeed": m[:, 12],
            "pwm_value": m[:, 22],
            # NOTE(review): both columns below read byte 33 -- confirm that
            # one of the offsets is not a copy/paste slip.
            "ato_limitSpeed": m[:, 33],
            "tasc_value": m[:, 33],
            # --- [3. Digital flags] ---
            # Byte 13: DO ATC
            "do_zvr": bit(13, 32),
            "do_edl": bit(13, 16),
            "do_edr": bit(13, 8),
            "do_fsb": bit(13, 4),
            "do_ebm": bit(13, 2),
            "do_ebp": bit(13, 1),
            # Byte 14: status
            "system_active": bit(14, 128),
            "over_spd_warning": bit(14, 64),
            "tcr": bit(14, 32),
            "hcr": bit(14, 16),
            "door_open": bit(14, 8),
            "door_close": bit(14, 4),
            "psd_open": bit(14, 2),
            "psd_close": bit(14, 1),
            # Byte 15: mode
            "fa": bit(15, 128),
            "auto": bit(15, 64),
            "mcs": bit(15, 32),
            "yard": bit(15, 16),
            "fmc": bit(15, 8),
            "reverser_rvs": bit(15, 4),
            "reverser_fwd": bit(15, 2),
            "reverser_neu": bit(15, 1),
            # Byte 16: mascon
            "ato_start_btn": bit(16, 128),
            "ato_eb_req": bit(16, 64),
            "tacho_dir_a": bit(16, 32),
            "tacho_dir_b": bit(16, 16),
            "mascon_dr": bit(16, 2),
            "mascon_br": bit(16, 4),
            "mascon_eb": bit(16, 8),
            # Byte 19: ATC status, top two bits (0, 64, 128 or 192)
            "atc_status_code": m[:, 19] & 192,
            # Byte 20: fail
            "fail_atcr": bit(20, 128),
            "fail_atoc": bit(20, 64),
            "fail_tcms": bit(20, 32),
            # Byte 21: marker ("nomal" is the established column name)
            "recovery": bit(21, 128),
            "nomal": bit(21, 64),
            "tasc": tasc,
            "marker_val": m[:, 21] & 31,
            # Byte 23: RLY ATO
            "trac_dr": trac_dr,
            "trac_br": bit(23, 64),
            "trac_cs": bit(23, 32),
            # Byte 24: TCMS
            "tc2": bit(24, 128),
            "tc1": bit(24, 64),
            "tascdb": bit(24, 32),
            # Byte 25: ETC
            "pre_brake": bit(25, 128),
            "limit_drive": bit(25, 64),
            # Bytes 26-27: train number
            "trainno_int": word(26, 27),
            # Bytes 28-30: stations
            "pstn": m[:, 28],
            "nstn": m[:, 29],
            "dstn": m[:, 30],
            "dtg": dtg,
            # Byte 34: TWC
            "twct_enable": bit(34, 16),
            "door_close_warning": bit(34, 8),
            "wrongdoor": bit(34, 4),
            # Byte 35: ATC code (high nibble = index)
            "atc_idx": (m[:, 35] & 240) >> 4,
            "osc_f0_ok": bit(35, 8),
            # Bytes 36-39, 55, 58: frequencies, each byte XOR-ed with a key
            "atc_code_carrier_f": xor_word(36, 65, 37, 82) * 10.0,
            "atc_code_f": xor_word(38, 99, 39, 116) / 10.0,
            "osc_f": xor_word(55, 99, 58, 100) / 10.0,
            # --- VDI / VDO ---
            # VDI A (byte 63)
            "vdia_rvs": bit(63, 128),
            "vdia_neu": bit(63, 64),
            "vdia_fwd": bit(63, 32),
            "vdia_mascondr": bit(63, 16),
            "vdia_masconbr": bit(63, 8),
            "vdia_doorclose": bit(63, 2),
            "vdia_dooropen": bit(63, 1),
            # VDI B (byte 66)
            "vdib_doorclose": bit(66, 2),
            "vdib_dooropen": bit(66, 1),
            # VDO A (byte 75)
            "vdoa_edl": bit(75, 32),
            "vdoa_edr": bit(75, 16),
            "vdoa_zvr": bit(75, 8),
        }

    @staticmethod
    def _add_derived_columns(df: pl.DataFrame) -> pl.DataFrame:
        """Append the formatted ``time`` and hex ``trainno`` string columns."""

        def two_digits(name):
            # Zero-pad a numeric column to two characters.
            return pl.col(name).cast(pl.String).str.zfill(2)

        return df.with_columns(
            # Timestamp string assembled from the decoded date/time parts.
            pl.format(
                "{}-{}-{} {}:{}:{}",
                pl.col("year"),
                two_digits("month"),
                two_digits("day"),
                two_digits("hour"),
                two_digits("minute"),
                two_digits("second"),
            ).alias("time"),
            # Train number as a 4-digit uppercase hex string.
            pl.col("trainno_int")
            .map_elements(lambda x: f"{x:04X}", return_dtype=pl.String)
            .alias("trainno"),
        )

    @staticmethod
    def parse_to_parquet(
        input_path: str, output_path: Optional[str] = None
    ) -> Optional[str]:
        """Parse an MMI binary log and save it as a Parquet file.

        Args:
            input_path: Path of the binary log file.
            output_path: Destination Parquet path.  When omitted, the input
                path with its extension replaced by ``.parquet`` is used.

        Returns:
            The path that was written, or ``None`` if parsing or writing
            failed (write errors are printed together with their traceback).
        """
        df = FastLogParser.parse(input_path)
        if df is None:
            return None

        try:
            if output_path is None:
                # Swap only the final extension.  A plain substring replace
                # of ".dat" leaves the path unchanged for names such as
                # "LOG.DAT" or "log.bin", which would overwrite the source
                # log with the Parquet output.
                root, _ = os.path.splitext(input_path)
                output_path = root + ".parquet"
                if output_path == input_path:
                    # The input itself ends in ".parquet"; never clobber it.
                    output_path = input_path + ".parquet"

            df.write_parquet(output_path, compression="zstd")
            return output_path
        except Exception as e:
            print(f"[FastParser] Error: {e}")
            traceback.print_exc()
            return None