AI_MMI_Analyser/sample/test_expo.py

166 lines
6.2 KiB
Python

import csv
import os
import pandas as pd
import sys
# 프로젝트 루트 경로를 path에 추가 (모듈 import를 위해)
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
# 앞서 만든 파서 모듈에서 클래스 가져오기
from log_parser import DDOSItoMMI, DOSItoMMIClass
except ImportError:
print("[Error] 'app/data/log_parser.py' 파일을 찾을 수 없거나 임포트할 수 없습니다.")
print("프로젝트 구조를 확인하거나 log_parser.py 코드를 이 파일 상단에 포함시켜주세요.")
sys.exit(1)
# -------------------------------------------------------------------------
# 1. 숨겨진 바이트 추출 함수 (C# 코드에서 Skip된 영역)
# -------------------------------------------------------------------------
def extract_hidden_bytes(data: bytes):
"""
84바이트 중 C# 파서에서 변수로 저장하지 않고 건너뛴 바이트들 추출
"""
if len(data) < 84: return {}
hidden = {
"byte_00_header": data[0], # 보통 STX (0x02) 등
"byte_02_unknown": data[2], # SEQ와 SOURCE 사이
"byte_17_ack1": data[17], # ACK 관련 (C#에서 계산에만 쓰고 저장은 안함)
"byte_18_ack2": data[18], # ACK 관련
# "byte_56_ver": data[56], # SW 버전 (파싱됨)
# "byte_59_62": data[59:63], # 속도/VDI 중복 영역 (확인용)
"byte_79_spare": data[79],
"byte_80_spare": data[80],
"byte_81_spare": data[81],
"byte_82_spare": data[82],
"byte_83_spare": data[83] # 패킷 끝부분 (ETX/Checksum?)
}
return hidden
# -------------------------------------------------------------------------
# 2. 메인 실행 함수
# -------------------------------------------------------------------------
def run_export(file_path, output_csv):
"""
.dat 파일을 읽어 정상 데이터 + 숨겨진 데이터를 결합하여 CSV로 저장
"""
if not os.path.exists(file_path):
print(f"[Error] 파일을 찾을 수 없습니다: {file_path}")
return
print(f"▶ 데이터 로딩 시작: {file_path}")
parsed_data_list = []
hidden_data_list = []
file_size = os.path.getsize(file_path)
with open(file_path, 'rb') as f:
CHUNK_SIZE = 84
count = 0
while f.tell() < file_size:
chunk = f.read(CHUNK_SIZE)
if len(chunk) < CHUNK_SIZE:
break
# (A) 정상 파싱 (LogParser의 로직 사용)
# DDOSItoMMI.set_data는 바이트 청크를 받아 DOSItoMMIClass 객체를 반환합니다.
try:
mmi_data = DDOSItoMMI.set_data(chunk)
parsed_data_list.append(mmi_data)
except Exception as e:
print(f"[Warning] 파싱 에러 (Index {count}): {e}")
continue
# (B) 숨겨진 데이터 추출
hidden_data = extract_hidden_bytes(chunk)
hidden_data_list.append(hidden_data)
count += 1
if count % 10000 == 0:
print(f" ... {count}개 처리 중")
print(f"▶ 로딩 완료: 총 {len(parsed_data_list)}")
# 3. 데이터프레임 변환 및 병합
print("▶ 데이터프레임 변환 중...")
# dataclass 리스트 -> DataFrame
df_normal = pd.DataFrame([vars(d) for d in parsed_data_list])
# Hidden dict 리스트 -> DataFrame
df_hidden = pd.DataFrame(hidden_data_list)
# 두 데이터프레임을 가로로 병합 (인덱스 기준)
final_df = pd.concat([df_normal, df_hidden], axis=1)
# 4. 통신 불량 분석 (SEQ 점프 확인)
analyze_communication_health(final_df)
# 5. CSV 저장
print(f"▶ CSV 파일 저장 중... ({output_csv})")
final_df.to_csv(output_csv, index=False, encoding='utf-8-sig')
print("▶ 완료!")
def analyze_communication_health(df):
"""
SEQ(순서번호)를 Source별로 그룹화하여 정확한 통신 끊김 분석
"""
if 'seq' not in df.columns or 'source' not in df.columns:
print("SEQ 또는 Source 데이터가 없습니다.")
return
print("\n[정밀 통신 상태 분석 결과 (Source별)]")
# 소스별로 그룹화
grouped = df.groupby('source')
total_breaks = 0
for source_id, group_df in grouped:
# 해당 소스 데이터만 복사 및 정렬
sdf = group_df.sort_index().copy()
# SEQ 차이 계산
sdf['prev_seq'] = sdf['seq'].shift(1)
# 정상 조건: (차이 == 1) OR (255 -> 0)
# 그 외는 끊김 (첫 행은 제외)
mask_normal = (sdf['seq'] - sdf['prev_seq'] == 1)
mask_wrap = (sdf['prev_seq'] == 255) & (sdf['seq'] == 0)
mask_first = sdf['prev_seq'].isna()
# 끊김 발생 지점
breaks = sdf[~(mask_normal | mask_wrap | mask_first)]
break_count = len(breaks)
total_breaks += break_count
print(f"▶ Source {source_id}: 총 {len(sdf)}개 패킷 중 {break_count}회 끊김")
if break_count > 0:
print(f" [Source {source_id} 주요 끊김 시점 Top 3]")
for idx, row in breaks.head(3).iterrows():
print(f" - 시간: {row['time']} | SEQ: {int(row['prev_seq'])} -> {int(row['seq'])}")
if total_breaks == 0:
print("\n✅ 모든 소스의 통신 상태가 매우 양호합니다! (완벽한 연속성)")
else:
print(f"\n⚠️ 전체 소스 합산 총 {total_breaks}회의 통신 끊김이 확인되었습니다.")
# -------------------------------------------------------------------------
# 실행
# -------------------------------------------------------------------------
if __name__ == "__main__":
# 분석할 .dat 파일 경로 (여기에 실제 파일명을 넣으세요)
target_file = "20250723_180942_13_HCR_TC2_edit.dat"
# 결과 파일명
output_csv = "analysis_result_full.csv"
if os.path.exists(target_file):
run_export(target_file, output_csv)
else:
print(f"파일이 없습니다: {target_file}")
# 테스트를 위해 더미 파일 생성 로직 등을 넣을 수도 있음