106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
import struct
|
|
import os
|
|
import polars as pl
|
|
from typing import Dict, List, Any
|
|
|
|
class MMIParser:
|
|
"""
|
|
MMI 바이너리 로그를 파싱하여 Polars DataFrame으로 반환하는 클래스
|
|
"""
|
|
CHUNK_SIZE = 84
|
|
|
|
@staticmethod
|
|
def parse_chunk(data: bytes) -> Dict[str, Any]:
|
|
"""84바이트 청크 하나를 딕셔너리로 변환"""
|
|
if len(data) < 84:
|
|
return None
|
|
|
|
# 기본 파싱 (비트 연산 로직 유지)
|
|
# 속도를 위해 dataclass 대신 dict 사용
|
|
row = {}
|
|
|
|
# [Header]
|
|
row["seq"] = data[1]
|
|
row["source"] = data[3]
|
|
# 이중계 판단 로직: 소스가 32(0x20)가 아니면 Break(고장/대기)로 간주
|
|
# (사용자 정의 로직에 따라 16, 32 외의 값 처리도 필요할 수 있음)
|
|
row["is_break"] = (data[3] != 32)
|
|
|
|
# [Time]
|
|
try:
|
|
row["time"] = (f"20{data[4]:02x}.{data[5]:02x}.{data[6]:02x} "
|
|
f"{data[7]:02x}:{data[8]:02x}:{data[9]:02x}")
|
|
except:
|
|
row["time"] = None
|
|
|
|
# [Speed & Analog]
|
|
row["trainspeed"] = ((data[10] << 8) | data[11]) / 10.0
|
|
row["limitspeed"] = data[12]
|
|
row["pwm_value"] = data[22]
|
|
|
|
# [DTG Calculation]
|
|
tasc = (data[21] & 32) > 0
|
|
trac_dr = (data[23] & 128) > 0
|
|
num_dtg = (data[31] << 8) | data[32]
|
|
|
|
if tasc:
|
|
row["dtg"] = num_dtg / 100.0
|
|
else:
|
|
if row["trainspeed"] == 0.0 and not trac_dr:
|
|
if (num_dtg & 32768) != 0:
|
|
num_dtg -= 65536
|
|
row["dtg"] = (num_dtg / 100.0) * -1.0
|
|
else:
|
|
row["dtg"] = num_dtg / 10.0
|
|
|
|
# [Frequency & XOR Decoding]
|
|
row["atc_code_carrier_f"] = ((data[36] ^ 65) << 8 | (data[37] ^ 82)) * 10.0
|
|
row["atc_code_f"] = ((data[38] ^ 99) << 8 | (data[39] ^ 116)) / 10.0
|
|
|
|
# [Bitmasks - 주요 신호만 예시로 포함, 필요 시 전체 추가]
|
|
# bitmasking은 polars의 map_elements보다 여기서 처리해서 넘기는게 빠름
|
|
row["do_zvr"] = (data[13] & 32) > 0
|
|
row["do_ebm"] = (data[13] & 2) > 0
|
|
row["system_active"] = (data[14] & 128) > 0
|
|
row["door_open"] = (data[14] & 8) > 0
|
|
row["door_close"] = (data[14] & 4) > 0
|
|
|
|
# [Station Codes for Semantic Mapping]
|
|
row["pstn"] = data[28] # 이전역
|
|
row["nstn"] = data[29] # 현재/다음역
|
|
row["dstn"] = data[30] # 종착역
|
|
|
|
# [ATC Code Parsing]
|
|
atc_idx = (data[35] & 240) >> 4
|
|
# 매핑은 여기서 문자열로 변환하지 않고 숫자(Index)만 넘깁니다.
|
|
# (문자열 변환은 Processor에서 일괄 처리하는 게 속도상 유리)
|
|
row["atc_code_idx"] = atc_idx
|
|
|
|
return row
|
|
|
|
def load_to_dataframe(self, filepath: str) -> pl.DataFrame:
|
|
"""파일 전체를 읽어 Polars DataFrame으로 반환"""
|
|
if not os.path.exists(filepath):
|
|
raise FileNotFoundError(f"File not found: {filepath}")
|
|
|
|
raw_data = []
|
|
file_size = os.path.getsize(filepath)
|
|
|
|
with open(filepath, 'rb') as f:
|
|
# 파일 전체를 메모리에 로드 (50만개 * 84byte = 약 42MB, 메모리에 충분함)
|
|
# IO 속도 향상을 위해 한 번에 읽음
|
|
buffer = f.read()
|
|
|
|
# 청크 단위 파싱
|
|
total_chunks = len(buffer) // self.CHUNK_SIZE
|
|
for i in range(total_chunks):
|
|
start = i * self.CHUNK_SIZE
|
|
end = start + self.CHUNK_SIZE
|
|
chunk = buffer[start:end]
|
|
|
|
parsed_row = self.parse_chunk(chunk)
|
|
if parsed_row:
|
|
raw_data.append(parsed_row)
|
|
|
|
# Polars DataFrame 생성 (고속)
|
|
return pl.DataFrame(raw_data) |