166 lines
6.2 KiB
Python
166 lines
6.2 KiB
Python
import csv
|
|
import os
|
|
import pandas as pd
|
|
import sys
|
|
|
|
# 프로젝트 루트 경로를 path에 추가 (모듈 import를 위해)
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
try:
|
|
# 앞서 만든 파서 모듈에서 클래스 가져오기
|
|
from log_parser import DDOSItoMMI, DOSItoMMIClass
|
|
except ImportError:
|
|
print("[Error] 'app/data/log_parser.py' 파일을 찾을 수 없거나 임포트할 수 없습니다.")
|
|
print("프로젝트 구조를 확인하거나 log_parser.py 코드를 이 파일 상단에 포함시켜주세요.")
|
|
sys.exit(1)
|
|
|
|
# -------------------------------------------------------------------------
|
|
# 1. 숨겨진 바이트 추출 함수 (C# 코드에서 Skip된 영역)
|
|
# -------------------------------------------------------------------------
|
|
def extract_hidden_bytes(data: bytes):
|
|
"""
|
|
84바이트 중 C# 파서에서 변수로 저장하지 않고 건너뛴 바이트들 추출
|
|
"""
|
|
if len(data) < 84: return {}
|
|
|
|
hidden = {
|
|
"byte_00_header": data[0], # 보통 STX (0x02) 등
|
|
"byte_02_unknown": data[2], # SEQ와 SOURCE 사이
|
|
"byte_17_ack1": data[17], # ACK 관련 (C#에서 계산에만 쓰고 저장은 안함)
|
|
"byte_18_ack2": data[18], # ACK 관련
|
|
# "byte_56_ver": data[56], # SW 버전 (파싱됨)
|
|
# "byte_59_62": data[59:63], # 속도/VDI 중복 영역 (확인용)
|
|
"byte_79_spare": data[79],
|
|
"byte_80_spare": data[80],
|
|
"byte_81_spare": data[81],
|
|
"byte_82_spare": data[82],
|
|
"byte_83_spare": data[83] # 패킷 끝부분 (ETX/Checksum?)
|
|
}
|
|
return hidden
|
|
|
|
# -------------------------------------------------------------------------
|
|
# 2. 메인 실행 함수
|
|
# -------------------------------------------------------------------------
|
|
def run_export(file_path, output_csv):
|
|
"""
|
|
.dat 파일을 읽어 정상 데이터 + 숨겨진 데이터를 결합하여 CSV로 저장
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
print(f"[Error] 파일을 찾을 수 없습니다: {file_path}")
|
|
return
|
|
|
|
print(f"▶ 데이터 로딩 시작: {file_path}")
|
|
|
|
parsed_data_list = []
|
|
hidden_data_list = []
|
|
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
with open(file_path, 'rb') as f:
|
|
CHUNK_SIZE = 84
|
|
count = 0
|
|
|
|
while f.tell() < file_size:
|
|
chunk = f.read(CHUNK_SIZE)
|
|
if len(chunk) < CHUNK_SIZE:
|
|
break
|
|
|
|
# (A) 정상 파싱 (LogParser의 로직 사용)
|
|
# DDOSItoMMI.set_data는 바이트 청크를 받아 DOSItoMMIClass 객체를 반환합니다.
|
|
try:
|
|
mmi_data = DDOSItoMMI.set_data(chunk)
|
|
parsed_data_list.append(mmi_data)
|
|
except Exception as e:
|
|
print(f"[Warning] 파싱 에러 (Index {count}): {e}")
|
|
continue
|
|
|
|
# (B) 숨겨진 데이터 추출
|
|
hidden_data = extract_hidden_bytes(chunk)
|
|
hidden_data_list.append(hidden_data)
|
|
|
|
count += 1
|
|
if count % 10000 == 0:
|
|
print(f" ... {count}개 처리 중")
|
|
|
|
print(f"▶ 로딩 완료: 총 {len(parsed_data_list)} 건")
|
|
|
|
# 3. 데이터프레임 변환 및 병합
|
|
print("▶ 데이터프레임 변환 중...")
|
|
|
|
# dataclass 리스트 -> DataFrame
|
|
df_normal = pd.DataFrame([vars(d) for d in parsed_data_list])
|
|
|
|
# Hidden dict 리스트 -> DataFrame
|
|
df_hidden = pd.DataFrame(hidden_data_list)
|
|
|
|
# 두 데이터프레임을 가로로 병합 (인덱스 기준)
|
|
final_df = pd.concat([df_normal, df_hidden], axis=1)
|
|
|
|
# 4. 통신 불량 분석 (SEQ 점프 확인)
|
|
analyze_communication_health(final_df)
|
|
|
|
# 5. CSV 저장
|
|
print(f"▶ CSV 파일 저장 중... ({output_csv})")
|
|
final_df.to_csv(output_csv, index=False, encoding='utf-8-sig')
|
|
print("▶ 완료!")
|
|
|
|
def analyze_communication_health(df):
|
|
"""
|
|
SEQ(순서번호)를 Source별로 그룹화하여 정확한 통신 끊김 분석
|
|
"""
|
|
if 'seq' not in df.columns or 'source' not in df.columns:
|
|
print("SEQ 또는 Source 데이터가 없습니다.")
|
|
return
|
|
|
|
print("\n[정밀 통신 상태 분석 결과 (Source별)]")
|
|
|
|
# 소스별로 그룹화
|
|
grouped = df.groupby('source')
|
|
|
|
total_breaks = 0
|
|
|
|
for source_id, group_df in grouped:
|
|
# 해당 소스 데이터만 복사 및 정렬
|
|
sdf = group_df.sort_index().copy()
|
|
|
|
# SEQ 차이 계산
|
|
sdf['prev_seq'] = sdf['seq'].shift(1)
|
|
|
|
# 정상 조건: (차이 == 1) OR (255 -> 0)
|
|
# 그 외는 끊김 (첫 행은 제외)
|
|
mask_normal = (sdf['seq'] - sdf['prev_seq'] == 1)
|
|
mask_wrap = (sdf['prev_seq'] == 255) & (sdf['seq'] == 0)
|
|
mask_first = sdf['prev_seq'].isna()
|
|
|
|
# 끊김 발생 지점
|
|
breaks = sdf[~(mask_normal | mask_wrap | mask_first)]
|
|
break_count = len(breaks)
|
|
total_breaks += break_count
|
|
|
|
print(f"▶ Source {source_id}: 총 {len(sdf)}개 패킷 중 {break_count}회 끊김")
|
|
|
|
if break_count > 0:
|
|
print(f" [Source {source_id} 주요 끊김 시점 Top 3]")
|
|
for idx, row in breaks.head(3).iterrows():
|
|
print(f" - 시간: {row['time']} | SEQ: {int(row['prev_seq'])} -> {int(row['seq'])}")
|
|
|
|
if total_breaks == 0:
|
|
print("\n✅ 모든 소스의 통신 상태가 매우 양호합니다! (완벽한 연속성)")
|
|
else:
|
|
print(f"\n⚠️ 전체 소스 합산 총 {total_breaks}회의 통신 끊김이 확인되었습니다.")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# 실행
|
|
# -------------------------------------------------------------------------
|
|
if __name__ == "__main__":
|
|
# 분석할 .dat 파일 경로 (여기에 실제 파일명을 넣으세요)
|
|
target_file = "20250723_180942_13_HCR_TC2_edit.dat"
|
|
|
|
# 결과 파일명
|
|
output_csv = "analysis_result_full.csv"
|
|
|
|
if os.path.exists(target_file):
|
|
run_export(target_file, output_csv)
|
|
else:
|
|
print(f"파일이 없습니다: {target_file}")
|
|
# 테스트를 위해 더미 파일 생성 로직 등을 넣을 수도 있음 |