앱 메인 파일(app.py) 삭제 및 데이터베이스 관련 파일 수정: 데이터베이스 테이블 이름 변경, UI 업데이트 강제 호출 추가, 고장 코드 검색 및 관련 기능 개선

This commit is contained in:
Envy_PC 2025-04-13 21:54:13 +09:00
parent 391cb0dc7e
commit 189ea70f3a
7 changed files with 876 additions and 494 deletions

74
app.py
View File

@ -1,74 +0,0 @@
import flet as ft
from database.db_manager import DatabaseManager
from ui.fault_finder import FaultFinderUI
def main(page: ft.Page):
"""
메인 애플리케이션 함수입니다.
Args:
page (ft.Page): Flet 페이지 객체
"""
# 페이지 설정
page.title = "고장 코드 검색 애플리케이션"
page.theme_mode = ft.ThemeMode.LIGHT
page.window_width = 1000
page.window_height = 800
page.window_min_width = 800
page.window_min_height = 600
page.padding = 10
page.bgcolor = ft.colors.WHITE
page.scroll = ft.ScrollMode.AUTO
# 데이터베이스 관리자 초기화
db_manager = DatabaseManager(db_path="fault_codes.db")
# UI 컴포넌트 초기화
fault_finder_ui = FaultFinderUI(page, db_manager)
# 앱 타이틀
app_title = ft.Container(
content=ft.Row([
ft.Icon(ft.icons.SEARCH, color=ft.colors.BLUE, size=32),
ft.Text("철도차량 고장코드 검색 시스템", size=24, weight=ft.FontWeight.BOLD),
], alignment=ft.MainAxisAlignment.CENTER),
margin=ft.margin.only(bottom=10),
padding=10,
bgcolor=ft.colors.BLUE_50,
border_radius=10,
)
# 메인 컨텐츠 영역
main_content = ft.Container(
content=fault_finder_ui.container,
expand=True,
bgcolor=ft.colors.WHITE,
border_radius=10,
border=ft.border.all(1, ft.colors.BLUE_200),
padding=10,
)
# 푸터
footer = ft.Container(
content=ft.Row([
ft.Text("© 2023 철도차량 정비 시스템", size=12, color=ft.colors.GREY_700),
ft.Text("버전 1.0", size=12, color=ft.colors.GREY_700),
], alignment=ft.MainAxisAlignment.SPACE_BETWEEN),
margin=ft.margin.only(top=10),
padding=10,
bgcolor=ft.colors.BLUE_50,
border_radius=10,
)
# 레이아웃 구성
page.add(
app_title,
main_content,
footer,
)
# 초기 데이터 로드
fault_finder_ui.load_all_codes()
if __name__ == "__main__":
ft.app(target=main)

View File

@ -18,7 +18,7 @@ def clean_text(text):
prefixes = [
"반응 : ", "반응\n1)", "반응\n0)",
"검지조건 : ", "검지조건\n1)", "검지조건\n0)",
"소거 : ", "소거조건\n1)", "소거조건\n0)",
"소거조건 : ", "소거조건\n1)", "소거조건\n0)",
"조치방법 : ", "조치방법\n1)", "조치방법\n0)",
"관련도면 : ", "관련도면\n1)", "관련도면\n0)"
]
@ -57,11 +57,11 @@ def clean_database():
cursor = conn.cursor()
# 테이블의 모든 레코드 가져오기
cursor.execute("SELECT * FROM woojin200")
cursor.execute("SELECT * FROM fault_code_list")
records = cursor.fetchall()
# 컬럼 이름 가져오기
cursor.execute("PRAGMA table_info(woojin200)")
cursor.execute("PRAGMA table_info(fault_code_list)")
columns = [column[1] for column in cursor.fetchall()]
# 업데이트할 레코드 준비
@ -71,22 +71,22 @@ def clean_database():
record_dict = dict(zip(columns, record))
# 각 텍스트 필드 정제
record_dict['reaction'] = clean_text(record_dict['reaction'])
record_dict['detect_condition'] = clean_text(record_dict['detect_condition'])
# record_dict['reaction'] = clean_text(record_dict['reaction'])
# record_dict['detect_condition'] = clean_text(record_dict['detect_condition'])
record_dict['clear_condition'] = clean_text(record_dict['clear_condition'])
# action에서 drawing 추출
record_dict['action'], record_dict['drawing'] = extract_drawing(record_dict['action'])
record_dict['action'] = clean_text(record_dict['action'])
# record_dict['action'], record_dict['drawing'] = extract_drawing(record_dict['action'])
# record_dict['action'] = clean_text(record_dict['action'])
# 업데이트할 레코드 준비
updated_record = tuple(record_dict[col] for col in columns)
updated_records.append(updated_record)
# 테이블 업데이트
cursor.execute("DELETE FROM woojin200")
cursor.execute("DELETE FROM fault_code_list")
cursor.executemany(f"""
INSERT INTO woojin200 ({','.join(columns)})
INSERT INTO fault_code_list ({','.join(columns)})
VALUES ({','.join(['?']*len(columns))})
""", updated_records)

82
clean_database_admin.py Normal file
View File

@ -0,0 +1,82 @@
import sqlite3
def clean_fault_code_list():
"""
fault_code_list 테이블에서 외래키(manufacturer_id) 설정되지 않은 레코드를 삭제합니다.
스크립트는 관리자용으로, 데이터베이스 정리 전용입니다.
"""
conn = sqlite3.connect('fault_codes.db')
cursor = conn.cursor()
try:
# 현재 상태 확인
cursor.execute("SELECT COUNT(*) FROM fault_code_list")
total_records = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM fault_code_list WHERE manufacturer_id IS NULL")
null_records = cursor.fetchone()[0]
print(f"현재 총 레코드 수: {total_records}")
print(f"manufacturer_id가 NULL인 레코드 수: {null_records}")
# 외래키 제약 활성화
cursor.execute("PRAGMA foreign_keys = ON")
# 외래키가 설정되지 않은 레코드 삭제
cursor.execute("DELETE FROM fault_code_list WHERE manufacturer_id IS NULL")
deleted = cursor.rowcount
print(f"삭제된 레코드 수: {deleted}")
# 제작사 테이블 중복 제거
cursor.execute("""
DELETE FROM manufacturers
WHERE id NOT IN (
SELECT MIN(id)
FROM manufacturers
GROUP BY name
)
""")
deleted_manufacturers = cursor.rowcount
print(f"제거된 중복 제작사 수: {deleted_manufacturers}")
# 외래키 설정 업데이트
cursor.execute("""
UPDATE fault_code_list
SET manufacturer_id = (
SELECT id FROM manufacturers
WHERE name = '우진'
LIMIT 1
)
WHERE manufacturer_id IS NULL
""")
updated = cursor.rowcount
print(f"업데이트된 레코드 수: {updated}")
# 최종 상태 확인
cursor.execute("SELECT COUNT(*) FROM fault_code_list")
final_records = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM manufacturers")
manufacturer_count = cursor.fetchone()[0]
print(f"최종 레코드 수: {final_records}")
print(f"제작사 수: {manufacturer_count}")
conn.commit()
print("데이터베이스 정리가 완료되었습니다.")
except sqlite3.Error as e:
conn.rollback()
print(f"오류 발생: {e}")
finally:
conn.close()
if __name__ == "__main__":
user_input = input("이 스크립트는 데이터베이스에서 외래키가 설정되지 않은 레코드를 삭제합니다. 계속하시겠습니까? (y/n): ")
if user_input.lower() == 'y':
clean_fault_code_list()
else:
print("스크립트 실행이 취소되었습니다.")

View File

@ -2,7 +2,7 @@ import sqlite3
from typing import List, Dict, Optional
class DatabaseManager:
"""데이터베이스 관리 클래스"""
"""데이터베이스 관리 클래스 - 조회 전용"""
def __init__(self, db_path: str = "fault_codes.db"):
"""
@ -11,93 +11,181 @@ class DatabaseManager:
db_path (str): 데이터베이스 파일 경로
"""
self.db_path = db_path
self._create_tables()
self.conn = None
def _create_tables(self):
"""필요한 테이블 생성"""
conn = sqlite3.connect(self.db_path)
def _get_connection(self):
"""데이터베이스 연결을 반환합니다."""
return sqlite3.connect(self.db_path)
def get_manufacturers(self) -> List[str]:
"""제작사 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
# 고장 코드 테이블 생성
cursor.execute('''
CREATE TABLE IF NOT EXISTS fault_codes (
code TEXT PRIMARY KEY,
description TEXT,
solution TEXT
)
''')
conn.commit()
cursor.execute("SELECT name FROM manufacturers ORDER BY name")
manufacturers = [row[0] for row in cursor.fetchall()]
conn.close()
return manufacturers
def search_fault_codes(self, search_term: str) -> List[Dict[str, str]]:
"""
고장 코드 검색
Args:
search_term (str): 검색어
Returns:
List[Dict[str, str]]: 검색 결과 목록
"""
conn = sqlite3.connect(self.db_path)
def get_fault_types(self) -> List[str]:
"""고장 타입 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT fault_type FROM fault_code_list WHERE fault_type IS NOT NULL ORDER BY fault_type")
types = [row[0] for row in cursor.fetchall()]
conn.close()
return types
def get_fault_codes(self, manufacturer: Optional[str] = None,
fault_type: Optional[str] = None) -> List[Dict]:
"""고장 코드 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
if search_term:
# 코드나 설명에서 검색어 찾기
cursor.execute('''
SELECT fault_code, fault_name, fault_level, fault_type,
fault_detail, reaction, detect_condition, clear_condition, action
FROM fault_codes
WHERE fault_name LIKE ? OR fault_detail LIKE ?
ORDER BY fault_code
''', (f'%{search_term}%', f'%{search_term}%'))
else:
# 전체 목록 가져오기
cursor.execute('''
SELECT fault_code, fault_name, fault_level, fault_type,
fault_detail, reaction, detect_condition, clear_condition, action
FROM fault_codes
ORDER BY fault_code
''')
query = """
SELECT f.fault_code, f.fault_name, f.fault_type, m.name as manufacturer
FROM fault_code_list f
LEFT JOIN manufacturers m ON f.manufacturer_id = m.id
WHERE f.manufacturer_id IS NOT NULL
"""
params = []
if manufacturer:
query += " AND m.name = ?"
params.append(manufacturer)
if fault_type:
query += " AND f.fault_type = ?"
params.append(fault_type)
query += " ORDER BY f.fault_code"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return [
{
'code': str(code),
'description': f"[{level}] {name} - {detail}",
'solution': f"조치방법:\n{action}\n\n검지조건:\n{detect}\n\n소거조건:\n{clear}"
'code': row[0],
'name': row[1],
'type': row[2],
'manufacturer': row[3]
}
for code, name, level, type_, detail, reaction, detect, clear, action in results
for row in results
]
def add_fault(self, code: str, description: str, solution: Optional[str] = None,
category: Optional[str] = None) -> bool:
"""
새로운 고장 코드를 추가합니다.
def search_fault_codes(self, search_term: str) -> List[Dict]:
"""고장 코드를 검색합니다."""
conn = self._get_connection()
cursor = conn.cursor()
Args:
code (str): 고장 코드
description (str): 설명
solution (Optional[str]): 해결 방법
category (Optional[str]): 카테고리
Returns:
bool: 추가 성공 여부
query = """
SELECT f.fault_code, f.fault_name, f.fault_type, m.name as manufacturer
FROM fault_code_list f
LEFT JOIN manufacturers m ON f.manufacturer_id = m.id
WHERE f.manufacturer_id IS NOT NULL
AND (f.fault_code LIKE ? OR f.fault_name LIKE ?)
ORDER BY f.fault_code
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO fault_codes (code, description, solution, category)
VALUES (?, ?, ?, ?)
""", (code, description, solution, category))
conn.commit()
return True
except sqlite3.Error:
return False
cursor.execute(query, (f'%{search_term}%', f'%{search_term}%'))
results = cursor.fetchall()
conn.close()
return [
{
'code': row[0],
'name': row[1],
'type': row[2],
'manufacturer': row[3]
}
for row in results
]
def get_abbreviations(self) -> List[Dict]:
"""약어 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT abbreviation, description FROM abbreviations ORDER BY abbreviation")
results = cursor.fetchall()
conn.close()
return [
{
'abbreviation': row[0],
'description': row[1]
}
for row in results
]
def get_drawings(self) -> List[Dict]:
"""도면 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT drawing_code, drawing_name, related_device FROM drawings ORDER BY drawing_code")
results = cursor.fetchall()
conn.close()
return [
{
'code': row[0],
'name': row[1],
'device': row[2]
}
for row in results
]
def get_signals(self) -> List[Dict]:
"""신호 목록을 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT signal_code, signal_name, signal_type FROM signals ORDER BY signal_code")
results = cursor.fetchall()
conn.close()
return [
{
'code': row[0],
'name': row[1],
'type': row[2]
}
for row in results
]
def get_fault_detail(self, fault_code: str) -> Optional[Dict]:
"""특정 고장 코드의 상세 정보를 가져옵니다."""
conn = self._get_connection()
cursor = conn.cursor()
query = """
SELECT f.fault_code, f.fault_name, f.fault_level, f.fault_type,
f.fault_detail, f.reaction, f.detect_condition, f.clear_condition,
f.action, f.drawing, m.name as manufacturer
FROM fault_code_list f
LEFT JOIN manufacturers m ON f.manufacturer_id = m.id
WHERE f.fault_code = ? AND f.manufacturer_id IS NOT NULL
"""
cursor.execute(query, (fault_code,))
result = cursor.fetchone()
conn.close()
if not result:
return None
return {
'code': result[0],
'name': result[1],
'level': result[2],
'type': result[3],
'detail': result[4],
'reaction': result[5],
'detect_condition': result[6],
'clear_condition': result[7],
'action': result[8],
'drawing': result[9],
'manufacturer': result[10]
}
def __del__(self):
"""데이터베이스 연결을 종료합니다."""
if hasattr(self, 'conn'):
if hasattr(self, 'conn') and self.conn:
self.conn.close()

Binary file not shown.

View File

@ -28,6 +28,7 @@ def main(page: ft.Page):
# 초기 로딩 시 전체 코드 목록 표시
fault_finder.load_all_codes()
page.update() # UI 업데이트 강제 호출
if __name__ == "__main__":
ft.app(target=main)

File diff suppressed because it is too large Load Diff