199 lines
5.4 KiB
Python
199 lines
5.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
SQL 파일 로더 모듈
|
|
PostgreSQL 형식의 INSERT 문을 파싱하여 SQLite에 삽입합니다.
|
|
"""
|
|
|
|
import re
|
|
import ast
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
|
|
from core.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
def parse_postgresql_insert(sql_content: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
PostgreSQL 형식의 INSERT 문을 파싱합니다.
|
|
|
|
형식: INSERT INTO "table" ("col1", "col2") VALUES ('val1', 'val2'), ('val3', 'val4')
|
|
|
|
Args:
|
|
sql_content: SQL 파일 내용
|
|
|
|
Returns:
|
|
파싱된 레코드 리스트
|
|
"""
|
|
records = []
|
|
|
|
# INSERT 문 패턴 매칭
|
|
pattern = r'INSERT INTO\s+"[^"]+"\s*\(([^)]+)\)\s*VALUES\s*(.+?)(?=\s*;|\s*$)'
|
|
|
|
matches = re.finditer(pattern, sql_content, re.IGNORECASE | re.DOTALL)
|
|
|
|
for match in matches:
|
|
columns_str = match.group(1)
|
|
values_str = match.group(2)
|
|
|
|
# 컬럼명 파싱
|
|
columns = [col.strip().strip('"') for col in columns_str.split(',')]
|
|
|
|
# VALUES 부분 파싱 (여러 레코드가 있을 수 있음)
|
|
# VALUES ('val1', 'val2'), ('val3', 'val4') 형식
|
|
value_records = _parse_values(values_str)
|
|
|
|
for value_record in value_records:
|
|
if len(value_record) == len(columns):
|
|
record = dict(zip(columns, value_record))
|
|
records.append(record)
|
|
else:
|
|
logger.warning(f"컬럼 수 불일치: {len(columns)} 컬럼, {len(value_record)} 값")
|
|
|
|
return records
|
|
|
|
|
|
def _parse_values(values_str: str) -> List[List[Any]]:
|
|
"""
|
|
VALUES 부분을 파싱합니다.
|
|
|
|
Args:
|
|
values_str: VALUES ('val1', 'val2'), ('val3', 'val4') 형식의 문자열
|
|
|
|
Returns:
|
|
값 리스트의 리스트
|
|
"""
|
|
records = []
|
|
|
|
# 괄호로 묶인 각 레코드 찾기
|
|
# 복잡한 경우를 처리하기 위해 스택 사용
|
|
current_record = []
|
|
current_value = ""
|
|
in_quotes = False
|
|
quote_char = None
|
|
paren_depth = 0
|
|
i = 0
|
|
|
|
while i < len(values_str):
|
|
char = values_str[i]
|
|
|
|
# 이스케이프 처리
|
|
if char == '\\' and i + 1 < len(values_str):
|
|
current_value += char + values_str[i + 1]
|
|
i += 2
|
|
continue
|
|
|
|
# 따옴표 처리
|
|
if char in ("'", '"'):
|
|
if not in_quotes:
|
|
in_quotes = True
|
|
quote_char = char
|
|
current_value += char
|
|
elif char == quote_char:
|
|
# 닫는 따옴표
|
|
in_quotes = False
|
|
quote_char = None
|
|
current_value += char
|
|
else:
|
|
current_value += char
|
|
# 괄호 처리
|
|
elif char == '(' and not in_quotes:
|
|
paren_depth += 1
|
|
if paren_depth == 1:
|
|
# 새로운 레코드 시작
|
|
current_record = []
|
|
current_value = ""
|
|
else:
|
|
current_value += char
|
|
elif char == ')' and not in_quotes:
|
|
paren_depth -= 1
|
|
if paren_depth == 0:
|
|
# 레코드 완료
|
|
if current_value.strip():
|
|
current_record.append(_clean_value(current_value.strip()))
|
|
records.append(current_record)
|
|
current_record = []
|
|
current_value = ""
|
|
else:
|
|
current_value += char
|
|
elif char == ',' and not in_quotes and paren_depth == 1:
|
|
# 레코드 내 값 구분자
|
|
if current_value.strip():
|
|
current_record.append(_clean_value(current_value.strip()))
|
|
current_value = ""
|
|
else:
|
|
current_value += char
|
|
|
|
i += 1
|
|
|
|
return records
|
|
|
|
|
|
def _clean_value(value: str) -> Any:
|
|
"""
|
|
값을 정리합니다.
|
|
|
|
Args:
|
|
value: 원시 값 문자열
|
|
|
|
Returns:
|
|
정리된 값 (None, 문자열, 숫자 등)
|
|
"""
|
|
value = value.strip()
|
|
|
|
# NULL 처리
|
|
if value.upper() == 'NULL' or value == '':
|
|
return None
|
|
|
|
# 문자열 처리 (따옴표 제거)
|
|
if value.startswith("'") and value.endswith("'"):
|
|
# 이스케이프 처리
|
|
result = value[1:-1].replace("''", "'").replace("\\'", "'")
|
|
return result
|
|
elif value.startswith('"') and value.endswith('"'):
|
|
result = value[1:-1].replace('""', '"').replace('\\"', '"')
|
|
return result
|
|
|
|
# 숫자 처리
|
|
try:
|
|
if '.' in value:
|
|
return float(value)
|
|
else:
|
|
return int(value)
|
|
except ValueError:
|
|
pass
|
|
|
|
# 불리언 처리
|
|
if value.lower() == 'true':
|
|
return True
|
|
elif value.lower() == 'false':
|
|
return False
|
|
|
|
return value
|
|
|
|
|
|
def load_sql_file(sql_file: Path) -> List[Dict[str, Any]]:
|
|
"""
|
|
SQL 파일을 로드하고 파싱합니다.
|
|
|
|
Args:
|
|
sql_file: SQL 파일 경로
|
|
|
|
Returns:
|
|
파싱된 레코드 리스트
|
|
"""
|
|
try:
|
|
with open(sql_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
records = parse_postgresql_insert(content)
|
|
logger.info(f"SQL 파일 파싱 완료: {sql_file.name} ({len(records)}개 레코드)")
|
|
return records
|
|
|
|
except Exception as e:
|
|
logger.error(f"SQL 파일 로드 실패 ({sql_file}): {e}")
|
|
return []
|
|
|