import csv import os import pandas as pd import sys # 프로젝트 루트 경로를 path에 추가 (모듈 import를 위해) sys.path.append(os.path.dirname(os.path.abspath(__file__))) try: # 앞서 만든 파서 모듈에서 클래스 가져오기 from log_parser import DDOSItoMMI, DOSItoMMIClass except ImportError: print("[Error] 'app/data/log_parser.py' 파일을 찾을 수 없거나 임포트할 수 없습니다.") print("프로젝트 구조를 확인하거나 log_parser.py 코드를 이 파일 상단에 포함시켜주세요.") sys.exit(1) # ------------------------------------------------------------------------- # 1. 숨겨진 바이트 추출 함수 (C# 코드에서 Skip된 영역) # ------------------------------------------------------------------------- def extract_hidden_bytes(data: bytes): """ 84바이트 중 C# 파서에서 변수로 저장하지 않고 건너뛴 바이트들 추출 """ if len(data) < 84: return {} hidden = { "byte_00_header": data[0], # 보통 STX (0x02) 등 "byte_02_unknown": data[2], # SEQ와 SOURCE 사이 "byte_17_ack1": data[17], # ACK 관련 (C#에서 계산에만 쓰고 저장은 안함) "byte_18_ack2": data[18], # ACK 관련 # "byte_56_ver": data[56], # SW 버전 (파싱됨) # "byte_59_62": data[59:63], # 속도/VDI 중복 영역 (확인용) "byte_79_spare": data[79], "byte_80_spare": data[80], "byte_81_spare": data[81], "byte_82_spare": data[82], "byte_83_spare": data[83] # 패킷 끝부분 (ETX/Checksum?) } return hidden # ------------------------------------------------------------------------- # 2. 메인 실행 함수 # ------------------------------------------------------------------------- def run_export(file_path, output_csv): """ .dat 파일을 읽어 정상 데이터 + 숨겨진 데이터를 결합하여 CSV로 저장 """ if not os.path.exists(file_path): print(f"[Error] 파일을 찾을 수 없습니다: {file_path}") return print(f"▶ 데이터 로딩 시작: {file_path}") parsed_data_list = [] hidden_data_list = [] file_size = os.path.getsize(file_path) with open(file_path, 'rb') as f: CHUNK_SIZE = 84 count = 0 while f.tell() < file_size: chunk = f.read(CHUNK_SIZE) if len(chunk) < CHUNK_SIZE: break # (A) 정상 파싱 (LogParser의 로직 사용) # DDOSItoMMI.set_data는 바이트 청크를 받아 DOSItoMMIClass 객체를 반환합니다. try: mmi_data = DDOSItoMMI.set_data(chunk) parsed_data_list.append(mmi_data) except Exception as e: print(f"[Warning] 파싱 에러 (Index {count}): {e}") continue # (B) 숨겨진 데이터 추출 hidden_data = extract_hidden_bytes(chunk) hidden_data_list.append(hidden_data) count += 1 if count % 10000 == 0: print(f" ... {count}개 처리 중") print(f"▶ 로딩 완료: 총 {len(parsed_data_list)} 건") # 3. 데이터프레임 변환 및 병합 print("▶ 데이터프레임 변환 중...") # dataclass 리스트 -> DataFrame df_normal = pd.DataFrame([vars(d) for d in parsed_data_list]) # Hidden dict 리스트 -> DataFrame df_hidden = pd.DataFrame(hidden_data_list) # 두 데이터프레임을 가로로 병합 (인덱스 기준) final_df = pd.concat([df_normal, df_hidden], axis=1) # 4. 통신 불량 분석 (SEQ 점프 확인) analyze_communication_health(final_df) # 5. CSV 저장 print(f"▶ CSV 파일 저장 중... ({output_csv})") final_df.to_csv(output_csv, index=False, encoding='utf-8-sig') print("▶ 완료!") def analyze_communication_health(df): """ SEQ(순서번호)를 Source별로 그룹화하여 정확한 통신 끊김 분석 """ if 'seq' not in df.columns or 'source' not in df.columns: print("SEQ 또는 Source 데이터가 없습니다.") return print("\n[정밀 통신 상태 분석 결과 (Source별)]") # 소스별로 그룹화 grouped = df.groupby('source') total_breaks = 0 for source_id, group_df in grouped: # 해당 소스 데이터만 복사 및 정렬 sdf = group_df.sort_index().copy() # SEQ 차이 계산 sdf['prev_seq'] = sdf['seq'].shift(1) # 정상 조건: (차이 == 1) OR (255 -> 0) # 그 외는 끊김 (첫 행은 제외) mask_normal = (sdf['seq'] - sdf['prev_seq'] == 1) mask_wrap = (sdf['prev_seq'] == 255) & (sdf['seq'] == 0) mask_first = sdf['prev_seq'].isna() # 끊김 발생 지점 breaks = sdf[~(mask_normal | mask_wrap | mask_first)] break_count = len(breaks) total_breaks += break_count print(f"▶ Source {source_id}: 총 {len(sdf)}개 패킷 중 {break_count}회 끊김") if break_count > 0: print(f" [Source {source_id} 주요 끊김 시점 Top 3]") for idx, row in breaks.head(3).iterrows(): print(f" - 시간: {row['time']} | SEQ: {int(row['prev_seq'])} -> {int(row['seq'])}") if total_breaks == 0: print("\n✅ 모든 소스의 통신 상태가 매우 양호합니다! (완벽한 연속성)") else: print(f"\n⚠️ 전체 소스 합산 총 {total_breaks}회의 통신 끊김이 확인되었습니다.") # ------------------------------------------------------------------------- # 실행 # ------------------------------------------------------------------------- if __name__ == "__main__": # 분석할 .dat 파일 경로 (여기에 실제 파일명을 넣으세요) target_file = "20250723_180942_13_HCR_TC2_edit.dat" # 결과 파일명 output_csv = "analysis_result_full.csv" if os.path.exists(target_file): run_export(target_file, output_csv) else: print(f"파일이 없습니다: {target_file}") # 테스트를 위해 더미 파일 생성 로직 등을 넣을 수도 있음