AI_MMI_Analyser/app/data/w_mmiProcessor.py

56 lines
1.9 KiB
Python

import polars as pl
from mmi_parser import MMIParser
class MMIProcessor:
def __init__(self):
# 역 코드 매핑 테이블 (예시)
self.station_map = {
134: "범어사",
135: "남산",
136: "두실",
# ... 실제 매핑 데이터 추가
}
# ATC 코드 매핑
self.atc_code_map = {
0: "02", 1: "01", 2: "25", 3: "40", 4: "55"
}
def process_and_save(self, input_path: str, output_parquet_path: str):
# 1. 파싱 (Raw Data 로드)
parser = MMIParser()
df = parser.load_to_dataframe(input_path)
print(f"Loaded {len(df)} rows.")
# 2. 이중계 필터링 (Redundancy Logic)
# 로직: 같은 seq 중에서 is_break가 False(정상)인 것을 우선 선택
# sort: seq 오름차순, is_break 오름차순 (False=0, True=1 이므로 정상 데이터가 위로 옴)
df_clean = (
df.sort(["seq", "is_break"])
.unique(subset=["seq"], keep="first") # 그룹별 첫 번째(정상)만 유지
)
print(f"Filtered redundancy. {len(df)} -> {len(df_clean)} rows.")
# 3. 시멘틱 데이터 보강 (Semantic Enrichment)
# map_dict를 사용하여 고속 매핑
df_enriched = df_clean.with_columns([
pl.col("nstn").replace(self.station_map).alias("station_name"),
pl.col("atc_code_idx").replace(self.atc_code_map).alias("atc_code_str")
])
# 4. Parquet 저장 (압축 사용)
df_enriched.write_parquet(output_parquet_path, compression="zstd")
print(f"Saved to {output_parquet_path}")
return df_enriched
# --- 실행 예시 ---
if __name__ == "__main__":
processor = MMIProcessor()
# 로우 데이터 경로와 저장할 경로 지정
df = processor.process_and_save("raw_log.bin", "processed_data.parquet")
# 결과 확인
print(df.head())