174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
|
|
def _dedupe_column_names(header_cells):
    """Build unique column labels from the raw header cells.

    Missing cells (and the literal text "nan") become "unnamed"; a label that
    repeats gets a numeric suffix ("x", "x_1", "x_2", ...).
    """
    labels = []
    seen = {}
    for cell in header_cells:
        text = "nan" if pd.isna(cell) else str(cell).strip()
        label = "unnamed" if text == "nan" else text
        if label in seen:
            seen[label] += 1
            labels.append(f"{label}_{seen[label]}")
        else:
            seen[label] = 0
            labels.append(label)
    return labels


def _find_column(columns, *keywords):
    """Return the first column label containing any of *keywords*, else None."""
    return next((c for c in columns if any(k in c for k in keywords)), None)


def _text_or_blank(value):
    """Render a cell as text, mapping missing values (and "nan") to ""."""
    if pd.isna(value):
        return ""
    text = str(value)
    return "" if text == "nan" else text


def _classify_run_type(value):
    """Map a depot in/out cell to "depot_out", "depot_in" or "regular"."""
    if pd.isna(value):
        return "regular"
    text = str(value)
    if "출" in text:
        return "depot_out"
    if "입" in text:
        return "depot_in"
    return "regular"


def _format_time(value):
    """Normalize a time cell to text; return None when it holds no usable time.

    Strings are returned stripped (blank or "nan" strings count as missing).
    Any other value is parsed with ``pd.to_datetime`` and rendered as HH:MM:SS.
    """
    if isinstance(value, str):
        text = value.strip()
        return text if text and text != "nan" else None
    if pd.isna(value):
        return None
    try:
        return pd.to_datetime(str(value)).strftime("%H:%M:%S")
    except (ValueError, TypeError, OverflowError):
        # Unparseable cell: treat it as "no time" instead of aborting the file.
        return None


def parse_excel_file(input_path, diagram_type, line=1):
    """Parse one wide timetable sheet into a long (one row per stop) frame.

    Args:
        input_path: Path of the Excel file whose first sheet is read without a
            header, or an already-loaded header-less ``DataFrame`` with the
            same layout.
        diagram_type: Label stored in the ``diagram_type`` column.
        line: Value stored in the ``line`` column.

    Returns:
        A DataFrame with the columns line, diagram_type, duty_id,
        train_number, direction, run_type, depot, depot_detail, track_no,
        station, time and seq, or ``None`` when no header row containing a
        "기지" cell is found within the first 15 rows.

    Raises:
        KeyError: If the header row lacks one of the required columns.
    """
    preloaded = isinstance(input_path, pd.DataFrame)
    source = "<DataFrame>" if preloaded else input_path
    print(f"🔄 파싱 시작: {source} ({diagram_type})")

    # 1. Load the sheet (no header: the real header row is located below).
    raw = input_path if preloaded else pd.read_excel(input_path, sheet_name=0, header=None)

    # 2. Locate the header row. Cells are stripped before comparing so the
    #    search agrees with the stripped labels produced in step 3.
    header_row_idx = next(
        (
            i
            for i in range(min(15, len(raw)))
            if any(str(cell).strip() == "기지" for cell in raw.iloc[i].tolist())
        ),
        None,
    )
    if header_row_idx is None:
        print(f"⚠️ {source}: 헤더(기지)를 찾을 수 없습니다. 건너뜁니다.")
        return None

    # 3. Structure the frame: rows below the header, with unique labels.
    new_cols = _dedupe_column_names(raw.iloc[header_row_idx].tolist())
    df = raw.iloc[header_row_idx + 1:].reset_index(drop=True)
    df.columns = new_cols

    # 4. Identify the metadata columns; the first column is the track number.
    track_col = new_cols[0]
    direction_col = _find_column(new_cols, "상하")
    run_col = _find_column(new_cols, "입출고")
    duty_col = _find_column(new_cols, "DIA", "다이아")
    train_col = _find_column(new_cols, "열번")
    depot_col = _find_column(new_cols, "기지")

    required = {
        "상하": direction_col,
        "입출고": run_col,
        "DIA/다이아": duty_col,
        "열번": train_col,
        "기지": depot_col,
    }
    missing = [keyword for keyword, found in required.items() if found is None]
    if missing:
        raise KeyError(f"required column(s) not found in header: {', '.join(missing)}")

    # Station columns: everything after the depot column that is not metadata.
    meta_keywords = ["상하", "입출고", "DIA", "다이아", "열번", "기지", "track_no", "unnamed"]
    first_station_pos = new_cols.index(depot_col) + 1
    station_cols = [
        col
        for col in new_cols[first_station_pos:]
        if not any(keyword in col for keyword in meta_keywords)
    ]

    # 5. Cleansing: keep only rows with a numeric train number. The copy keeps
    #    the column assignments below from targeting a derived frame.
    df["train_number"] = pd.to_numeric(df[train_col], errors="coerce")
    df = df.dropna(subset=["train_number"]).copy()

    direction_map = {"상": "up", "하": "down"}
    df["direction"] = df[direction_col].map(lambda cell: direction_map.get(cell, "unknown"))
    df["run_type"] = df[run_col].map(_classify_run_type)
    df["depot_detail"] = df[run_col].map(_text_or_blank)
    df["duty_id"] = pd.to_numeric(df[duty_col], errors="coerce").fillna(0).astype(int)
    df["depot"] = df[depot_col].map(_text_or_blank)
    df["track_no"] = pd.to_numeric(df[track_col], errors="coerce").fillna(0).astype(int)

    df["line"] = line
    df["diagram_type"] = diagram_type

    # 6. Wide -> long: one row per (train, station).
    long_df = df.melt(
        id_vars=[
            "line", "diagram_type", "duty_id", "train_number",
            "direction", "run_type", "depot", "depot_detail", "track_no",
        ],
        value_vars=station_cols,
        var_name="station",
        value_name="time",
    )

    # 7. Post-processing: normalize times and drop stops without one.
    long_df["time"] = long_df["time"].apply(_format_time)
    long_df = long_df.dropna(subset=["time"]).copy()

    # Sequence number = 1-based position of the station column in the sheet.
    station_order = {name: pos + 1 for pos, name in enumerate(station_cols)}
    long_df["seq"] = long_df["station"].map(station_order)

    return long_df
|
|
|
|
def convert_all_timetables(output_path="data/unified_timetable.parquet",
                           input_dir="d:/py_train/voc_noti/test/dia"):
    """Parse the weekday/weekend/holiday workbooks and write one Parquet file.

    Args:
        output_path: Destination of the unified Parquet file; its parent
            directory is created when missing.
        input_dir: Directory holding ``weekday.xlsx``, ``weekend.xlsx`` and
            ``holiday.xlsx``. Defaults to the location the script originally
            hard-coded.

    Returns:
        None. Missing workbooks are reported and skipped; when nothing could
        be parsed, a message is printed and no file is written.
    """
    config = [
        {"file": "weekday.xlsx", "type": "weekday"},
        {"file": "weekend.xlsx", "type": "weekend"},
        {"file": "holiday.xlsx", "type": "holiday"},
    ]

    source_dir = Path(input_dir)
    all_dfs = []

    for item in config:
        file_path = source_dir / item["file"]
        if not file_path.exists():
            print(f"❌ 파일 없음: {file_path}")
            continue
        df = parse_excel_file(file_path, item["type"])
        if df is not None:
            all_dfs.append(df)

    if not all_dfs:
        print("❌ 파싱된 데이터가 없습니다.")
        return

    final_df = pd.concat(all_dfs, ignore_index=True)

    # Sort so each train's stops appear in station order within its duty.
    final_df = final_df.sort_values(
        by=["diagram_type", "duty_id", "train_number", "seq"]
    ).reset_index(drop=True)

    # Final column order.
    final_df = final_df[[
        "line", "diagram_type", "duty_id", "train_number",
        "direction", "run_type", "depot", "depot_detail", "track_no",
        "station", "seq", "time",
    ]]

    # Pin the dtypes so the Parquet schema is the same on every run.
    final_df = final_df.astype({
        "line": "int8",
        "diagram_type": "string",
        "duty_id": "int16",
        "train_number": "int32",
        "direction": "string",
        "run_type": "string",
        "depot": "string",
        "depot_detail": "string",
        "track_no": "int16",
        "station": "string",
        "seq": "int16",
        "time": "string",
    })

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    final_df.to_parquet(output_path, index=False, engine="pyarrow", compression="zstd")

    print(f"\n✅ 통합 데이터 생성 완료: {output_path}")
    print(f" - 총 데이터 건수: {len(final_df)} 건")
    print(f" - 다이어그램 타입별 건수:\n{final_df['diagram_type'].value_counts()}")
|
|
|
|
# Script entry point: build the unified timetable next to the source workbooks.
if __name__ == "__main__":
    convert_all_timetables(
        output_path="d:/py_train/voc_noti/test/dia/data/unified_timetable.parquet",
    )
|