BookMakerAdder/t2.py

542 lines
22 KiB
Python

import flet as ft
import sqlite3
import subprocess
import json
import threading
import logging
import traceback
import os
import psutil
import glob
import pandas as pd
from datetime import datetime, time
# 로깅 기본 설정
logging.basicConfig(level=logging.INFO)
# =====================================================================
# DB 관련 기능을 담당하는 모듈 (DBHandler)
# =====================================================================
class DBHandler:
def __init__(self, db_path="markets.db"):
self.db_path = db_path
self.ensure_db()
def ensure_db(self):
if not os.path.exists(self.db_path):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE markets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
country TEXT,
mall_grade TEXT,
mall_name TEXT,
mall_url TEXT,
extract_count INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
def load_excel(self, file_path, remove_existing=False, log_callback=print):
try:
ext = os.path.splitext(file_path)[-1].lower()
if ext == ".xls":
df = pd.read_excel(file_path, sheet_name=0, engine="xlrd")
elif ext == ".xlsx":
df = pd.read_excel(file_path, sheet_name=0, engine="openpyxl")
else:
log_callback("지원되지 않는 파일 형식입니다. (.xls 또는 .xlsx)")
return
required_columns = ['country', 'mall_grade', 'mall_name', 'mall_url']
for col in required_columns:
if col not in df.columns:
df[col] = ""
df = df[required_columns]
df.drop_duplicates(subset=required_columns, inplace=True)
conn = sqlite3.connect(self.db_path)
if remove_existing:
conn.execute("DROP TABLE IF EXISTS markets")
conn.execute("""
CREATE TABLE markets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
country TEXT,
mall_grade TEXT,
mall_name TEXT,
mall_url TEXT,
extract_count INTEGER DEFAULT 0
)
""")
try:
conn.execute("ALTER TABLE markets ADD COLUMN extract_count INTEGER DEFAULT 0")
except sqlite3.OperationalError:
pass
for _, row in df.iterrows():
conn.execute("""
INSERT INTO markets (country, mall_grade, mall_name, mall_url)
VALUES (?, ?, ?, ?)
""", (row['country'], row['mall_grade'], row['mall_name'], row['mall_url']))
conn.commit()
conn.close()
log_callback("DB 저장 완료")
except Exception as e:
log_callback(f"엑셀 로드 에러: {e}\n{traceback.format_exc()}")
def view_data(self):
conn = sqlite3.connect(self.db_path)
df = pd.read_sql_query("SELECT * FROM markets", conn)
conn.close()
return df
def reset_extract_count(self, log_callback=print):
conn = sqlite3.connect(self.db_path)
conn.execute("UPDATE markets SET extract_count = 0")
conn.commit()
conn.close()
log_callback("모든 추출 횟수가 초기화되었습니다.")
def extract_bookmarks(self, country, grade, count, extract_based, max_extract, log_callback=print):
try:
conn = sqlite3.connect(self.db_path)
query = "SELECT id, mall_name AS name, mall_url AS url FROM markets WHERE 1=1"
if country != "랜덤":
query += f" AND country = '{country}'"
if grade != "랜덤":
query += f" AND mall_grade = '{grade}'"
if extract_based:
query += " AND extract_count < ? ORDER BY extract_count ASC, RANDOM()"
query += f" LIMIT {count}"
df = pd.read_sql_query(query, conn, params=(max_extract,))
else:
query += " ORDER BY RANDOM()"
query += f" LIMIT {count}"
df = pd.read_sql_query(query, conn)
if df.empty:
log_callback("추출 가능한 데이터가 없습니다.")
conn.close()
return None
# 추출 횟수 업데이트
for record in df.to_dict("records"):
conn.execute("UPDATE markets SET extract_count = extract_count + 1 WHERE id = ?", (record["id"],))
conn.commit()
conn.close()
log_callback(f"{len(df)}개의 북마크를 추출했습니다.")
return df.to_dict("records")
except Exception as e:
log_callback(f"DB 쿼리 실행 오류: {e}")
return None
# =====================================================================
# 북마크 추가 작업을 백그라운드로 처리하는 모듈 (BookmarkWorker)
# =====================================================================
class BookmarkWorker(threading.Thread):
def __init__(self, bookmarks, folder_name, bookmarks_path, browser_path, selected_browser, remove_existing,
progress_callback, log_callback, completed_callback):
super().__init__()
self.bookmarks = bookmarks
self.folder_name = folder_name
self.bookmarks_path = bookmarks_path
self.browser_path = browser_path
self.selected_browser = selected_browser
self.remove_existing = remove_existing
self.progress_callback = progress_callback
self.log_callback = log_callback
self.completed_callback = completed_callback
def run(self):
try:
if not os.path.exists(self.bookmarks_path):
self.log_callback(f"즐겨찾기 JSON 파일을 찾을 수 없습니다: {self.bookmarks_path}")
return
try:
with open(self.bookmarks_path, "r", encoding="utf-8") as file:
file_content = file.read().strip()
if not file_content:
bookmarks_data = {"roots": {"bookmark_bar": {"children": []}}}
self.log_callback("JSON 파일이 비어 있어 기본값으로 초기화합니다.")
else:
bookmarks_data = json.loads(file_content)
except json.JSONDecodeError as e:
self.log_callback(f"JSON 파싱 오류: {e}")
bookmarks_data = {"roots": {"bookmark_bar": {"children": []}}}
if self.remove_existing:
bookmarks_data["roots"]["bookmark_bar"] = self.remove_existing_bookmarks(
bookmarks_data["roots"]["bookmark_bar"]
)
total = len(self.bookmarks)
bookmark_bar = bookmarks_data["roots"]["bookmark_bar"]
current_time = datetime.now().strftime("%m-%d-%H-%M-%S")
parent_folder_name = f"거상북마크-{current_time}"
parent_folder = {"type": "folder", "name": parent_folder_name, "children": []}
chunk_size = 100
for idx, start in enumerate(range(0, total, chunk_size), start=1):
sub_folder = {
"type": "folder",
"name": f"거상북마크-{self.folder_name}-{idx}",
"children": []
}
for bm in self.bookmarks[start:start+chunk_size]:
sub_folder["children"].append({
"type": "url",
"name": bm["name"],
"url": bm["url"]
})
parent_folder["children"].append(sub_folder)
progress = int((start + min(chunk_size, total - start)) / total * 100)
self.progress_callback(progress)
bookmark_bar["children"].append(parent_folder)
with open(self.bookmarks_path, "w", encoding="utf-8") as f:
json.dump(bookmarks_data, f, indent=4, ensure_ascii=False)
self.log_callback("즐겨찾기 추가 완료!")
self.run_browser_with_profile(self.browser_path, self.bookmarks_path, self.selected_browser)
self.run_and_focus_copyman()
self.completed_callback()
except Exception as e:
self.log_callback(f"작업 오류: {e}\n{traceback.format_exc()}")
self.progress_callback(0)
def run_browser_with_profile(self, browser_path, bookmarks_path, browser_name):
if "웨일" in browser_name:
for proc in psutil.process_iter(attrs=["pid", "name"]):
if "whale" in proc.info["name"].lower():
try:
proc.terminate()
proc.wait(timeout=5)
self.log_callback("웨일 프로세스 종료됨.")
except Exception as e:
self.log_callback(f"웨일 종료 오류: {e}")
if os.path.exists(browser_path):
profile_dir = os.path.basename(os.path.dirname(bookmarks_path))
if "크롬" in browser_name.lower():
page_url = "chrome://bookmarks/"
elif "웨일" in browser_name.lower():
page_url = "whale://bookmarks/"
else:
page_url = "about:blank"
if profile_dir:
subprocess.Popen([browser_path, f"--profile-directory={profile_dir}", page_url])
self.log_callback(f"{browser_name} 프로필 {profile_dir}로 북마크 열기")
else:
subprocess.Popen([browser_path, page_url])
self.log_callback(f"{browser_name} 기본 프로필로 북마크 열기")
else:
self.log_callback(f"{browser_name} 경로가 올바르지 않습니다: {browser_path}")
def run_and_focus_copyman(self):
program_name = "@카피맨.exe"
shortcut_name = "@카피맨"
window_title_start = "카피맨"
try:
pid = self.is_program_running(program_name)
if pid:
self.log_callback(f"{program_name} 실행 중 (PID: {pid})")
if not self.focus_window_by_title(window_title_start):
self.log_callback("카피맨 창을 찾지 못함.")
else:
self.log_callback("카피맨 실행 안됨. 실행 시도...")
shortcut = self.find_shortcut_in_start_menu(shortcut_name)
if shortcut:
self.run_program(shortcut)
else:
self.log_callback("카피맨 바로가기 없음.")
except Exception as e:
self.log_callback(f"카피맨 실행 오류: {e}\n{traceback.format_exc()}")
def is_program_running(self, process_name):
for proc in psutil.process_iter(attrs=["pid", "name"]):
if process_name.lower() in proc.info["name"].lower():
return proc.info["pid"]
return None
def focus_window_by_title(self, title_start):
try:
import pygetwindow as gw
for window in gw.getAllWindows():
if window.title and window.title.startswith(title_start):
try:
window.activate()
self.log_callback(f"창 활성화: {window.title}")
return True
except Exception as e:
self.log_callback(f"창 활성화 실패: {e}")
return False
return False
except Exception as e:
self.log_callback(f"포커스 전환 오류: {e}")
return False
def find_shortcut_in_start_menu(self, shortcut_name):
user_menu = os.path.expandvars(r"%APPDATA%\Microsoft\Windows\Start Menu\Programs")
all_users_menu = os.path.expandvars(r"%ProgramData%\Microsoft\Windows\Start Menu\Programs")
for path in [user_menu, all_users_menu]:
shortcut = glob.glob(os.path.join(path, f"**\\{shortcut_name}.lnk"), recursive=True)
if shortcut:
return shortcut[0]
return None
def run_program(self, shortcut_path):
try:
subprocess.Popen([shortcut_path], shell=True)
self.log_callback(f"프로그램 실행: {shortcut_path}")
except Exception as e:
self.log_callback(f"실행 오류: {e}")
def remove_existing_bookmarks(self, node):
if not isinstance(node, dict):
return node
if node.get("type") == "folder" and node.get("name", "").startswith("거상북마크"):
self.log_callback(f"제거된 폴더: {node.get('name')}")
return None
if "children" in node:
node["children"] = [self.remove_existing_bookmarks(child)
for child in node["children"]
if self.remove_existing_bookmarks(child) is not None]
return node
# =====================================================================
# Flet UI와 백엔드 모듈을 통합한 메인 앱
# =====================================================================
def main(page: ft.Page):
page.title = "모듈화된 백엔드 & Flet 앱"
page.horizontal_alignment = "center"
page.vertical_alignment = "start"
page.padding = 20
# 기본 비밀번호 및 DB 핸들러 생성
stored_password = "365"
user_password = None
db_handler = DBHandler()
bookmarks_global = [] # 추출된 북마크 저장용
# 로그와 진행률 UI
log_text = ft.Text(value="", size=12)
progress_bar = ft.ProgressBar(width=300, value=0)
def log(msg):
nonlocal log_text
log_text.value += msg + "\n"
page.update()
def update_progress(val):
progress_bar.value = val / 100
page.update()
def task_completed():
log("작업 완료!")
dlg = ft.AlertDialog(
title=ft.Text("완료"),
content=ft.Text("즐겨찾기 추가 작업이 완료되었습니다."),
actions=[ft.TextButton("닫기", on_click=lambda e: close_dialog())],
modal=True
)
page.dialog = dlg
dlg.open = True
page.update()
def close_dialog():
page.dialog.open = False
page.update()
# 비밀번호 입력 다이얼로그
def show_password_dialog():
def on_password_submit(e):
nonlocal user_password
user_password = pwd_field.value
if user_password != stored_password:
result = ft.AlertDialog(
title=ft.Text("비밀번호 오류"),
content=ft.Text("비밀번호가 일치하지 않습니다. 프로그램을 종료합니다."),
actions=[ft.TextButton("닫기", on_click=lambda e: page.window_close())],
modal=True
)
page.dialog = result
result.open = True
page.update()
else:
pwd_dlg.open = False
page.update()
pwd_field = ft.TextField(label="비밀번호", password=True)
pwd_dlg = ft.AlertDialog(
title=ft.Text("비밀번호 입력"),
content=ft.Column([pwd_field]),
actions=[ft.TextButton("확인", on_click=on_password_submit)],
modal=True
)
page.dialog = pwd_dlg
pwd_dlg.open = True
page.update()
show_password_dialog()
# =================================================================
# UI 컨트롤 구성
# =================================================================
country_dropdown = ft.Dropdown(
label="국가",
options=[
ft.dropdown.Option("미국"),
ft.dropdown.Option("유럽"),
ft.dropdown.Option("중국"),
ft.dropdown.Option("일본"),
ft.dropdown.Option("한국"),
ft.dropdown.Option("기타"),
ft.dropdown.Option("랜덤")
],
value="중국"
)
grade_dropdown = ft.Dropdown(
label="등급",
options=[
ft.dropdown.Option("일반"),
ft.dropdown.Option("파워"),
ft.dropdown.Option("빅파워"),
ft.dropdown.Option("랜덤")
],
value="랜덤"
)
count_field = ft.TextField(label="갯수", value="1000", keyboard_type=ft.KeyboardType.NUMBER)
remove_existing_checkbox = ft.Checkbox(label="기존 북마크 제거", value=False)
extract_based_checkbox = ft.Checkbox(label="추출 횟수 기반 추출", value=False)
max_extract_field = ft.TextField(label="최대 추출 횟수", value="1", keyboard_type=ft.KeyboardType.NUMBER)
browser_dropdown = ft.Dropdown(
label="브라우저 선택",
options=[
ft.dropdown.Option("크롬"),
ft.dropdown.Option("웨일")
],
value="크롬"
)
# 파일 선택: 엑셀 파일 입력
file_picker_excel = ft.FilePicker(on_result=lambda e: on_excel_selected(e))
page.overlay.append(file_picker_excel)
def on_excel_selected(e: ft.FilePickerResultEvent):
if e.files is not None and len(e.files) > 0:
file_path = e.files[0].path
db_handler.load_excel(file_path, remove_existing_checkbox.value, log)
else:
log("엑셀 파일 선택 취소됨.")
db_input_button = ft.ElevatedButton("DB 입력", on_click=lambda e: file_picker_excel.pick_files(
allow_multiple=False, file_type=ft.FilePickerFileType.CUSTOM, allowed_extensions=["xlsx", "xls"]
))
view_data_button = ft.ElevatedButton("데이터 보기", on_click=lambda e: view_data_action())
def view_data_action():
df = db_handler.view_data()
if df.empty:
log("DB에 데이터가 없습니다.")
return
dlg = ft.AlertDialog(
title=ft.Text("DB 데이터"),
content=ft.Text(str(df)),
actions=[ft.TextButton("닫기", on_click=lambda e: close_dialog())],
modal=True
)
page.dialog = dlg
dlg.open = True
page.update()
reset_extract_button = ft.ElevatedButton("추출 횟수 초기화", on_click=lambda e: db_handler.reset_extract_count(log))
# 파일 선택: 브라우저 실행 파일 경로 설정 (Flet FilePicker 활용)
file_picker_browser = ft.FilePicker(on_result=lambda e: on_browser_selected(e))
page.overlay.append(file_picker_browser)
browser_path_field = ft.TextField(label="브라우저 경로", value="")
# 전역 변수 업데이트: 기본 경로 설정 (사용자 환경에 맞게 수정)
chrome_path = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe"
chrome_bookmarks_path = os.path.join(os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome\User Data\Default"), "Bookmarks")
whale_path = "C:\\Program Files\\Naver\\Naver Whale\\Application\\whale.exe"
whale_bookmarks_path = os.path.join(os.path.expandvars(r"%LOCALAPPDATA%\Naver\Naver Whale\User Data\Default"), "Bookmarks")
def on_browser_selected(e: ft.FilePickerResultEvent):
nonlocal chrome_path, whale_path
if e.files and len(e.files) > 0:
file_path = e.files[0].path
browser_path_field.value = file_path
if browser_dropdown.value == "크롬":
chrome_path = file_path
else:
whale_path = file_path
log(f"브라우저 경로 설정: {file_path}")
page.update()
else:
log("브라우저 파일 선택 취소됨.")
chrome_path_button = ft.ElevatedButton("크롬 경로 설정", on_click=lambda e: file_picker_browser.pick_files(
allow_multiple=False, file_type=ft.FilePickerFileType.CUSTOM, allowed_extensions=["exe"]
))
whale_path_button = ft.ElevatedButton("웨일 경로 설정", on_click=lambda e: file_picker_browser.pick_files(
allow_multiple=False, file_type=ft.FilePickerFileType.CUSTOM, allowed_extensions=["exe"]
))
run_button = ft.ElevatedButton("실행")
def run_task(e):
country = country_dropdown.value
grade = grade_dropdown.value
try:
cnt = int(count_field.value)
except:
cnt = 1000
remove_existing = remove_existing_checkbox.value
extract_based = extract_based_checkbox.value
try:
max_extract = int(max_extract_field.value)
except:
max_extract = 1
bookmarks_extracted = db_handler.extract_bookmarks(country, grade, cnt, extract_based, max_extract, log)
if bookmarks_extracted is None:
return
nonlocal bookmarks_global
bookmarks_global = bookmarks_extracted
folder_name = f"거상북마크-{grade}"
if browser_dropdown.value == "크롬":
selected_bookmarks_path = chrome_bookmarks_path
selected_browser_path = chrome_path
selected_browser = "크롬"
log("크롬 브라우저 선택됨.")
else:
selected_bookmarks_path = whale_bookmarks_path
selected_browser_path = whale_path
selected_browser = "웨일"
log("웨일 브라우저 선택됨.")
if not os.path.exists(selected_browser_path):
log("브라우저 실행 파일 경로가 유효하지 않습니다.")
return
if not os.path.exists(selected_bookmarks_path):
log("브라우저 북마크 경로가 유효하지 않습니다.")
return
worker = BookmarkWorker(bookmarks_global, folder_name, selected_bookmarks_path,
selected_browser_path, selected_browser, remove_existing,
update_progress, log, task_completed)
worker.start()
run_button.on_click = run_task
controls_row = ft.Row(controls=[country_dropdown, grade_dropdown, count_field, remove_existing_checkbox])
extract_row = ft.Row(controls=[extract_based_checkbox, max_extract_field, reset_extract_button])
browser_row = ft.Row(controls=[browser_dropdown, chrome_path_button, whale_path_button, browser_path_field])
action_row = ft.Row(controls=[db_input_button, view_data_button, run_button])
page.add(controls_row, extract_row, browser_row, action_row, progress_bar, ft.Text("로그:"), log_text)
page.update()
ft.app(target=main)