BookMakerAdder/test/t3.py

371 lines
16 KiB
Python

import eel
import sqlite3
import subprocess
import json
import threading
import logging
import traceback
import os
import psutil
import glob
import pandas as pd
from datetime import datetime, time
# Basic logging configuration: root logger emits INFO and above.
logging.basicConfig(level=logging.INFO)
# =====================================================================
# DB 관련 기능을 담당하는 모듈 (DBHandler)
# =====================================================================
class DBHandler:
    """SQLite access layer for the ``markets`` table.

    The table stores one row per shopping mall (country, grade, name, URL)
    plus an ``extract_count`` column that records how many times the row has
    been handed out by :meth:`extract_bookmarks`.

    Every method opens its own connection and closes it in a ``finally``
    block, so a failing query never leaves a connection open.
    """

    # Schema used everywhere the table may need to be created. IF NOT EXISTS
    # makes it safe to run against a database that already has the table.
    _CREATE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS markets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            country TEXT,
            mall_grade TEXT,
            mall_name TEXT,
            mall_url TEXT,
            extract_count INTEGER DEFAULT 0
        )
    """

    # Columns taken from the Excel sheet, in insert order.
    _REQUIRED_COLUMNS = ['country', 'mall_grade', 'mall_name', 'mall_url']

    def __init__(self, db_path="markets.db"):
        """Remember *db_path* and create the database file if it is missing."""
        self.db_path = db_path
        self.ensure_db()

    def ensure_db(self):
        """Create the database file and the ``markets`` table if the file does not exist."""
        if os.path.exists(self.db_path):
            return
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(self._CREATE_TABLE_SQL)
            conn.commit()
        finally:
            conn.close()

    def load_excel(self, file_path, remove_existing=False, log_callback=print):
        """Import the first sheet of an Excel file into the ``markets`` table.

        Args:
            file_path: Path to a ``.xls`` or ``.xlsx`` file.
            remove_existing: When true, drop and recreate the table before
                inserting; otherwise rows are appended.
            log_callback: Callable receiving the status/error message.

        Returns:
            The same status or error message that was passed to
            *log_callback*. Errors are reported through the message, never
            raised.
        """
        try:
            ext = os.path.splitext(file_path)[-1].lower()
            engine = {".xls": "xlrd", ".xlsx": "openpyxl"}.get(ext)
            if engine is None:
                msg = "지원되지 않는 파일 형식입니다. (.xls 또는 .xlsx)"
                log_callback(msg)
                return msg

            df = pd.read_excel(file_path, sheet_name=0, engine=engine)
            # Missing columns are filled with empty strings rather than rejected.
            for col in self._REQUIRED_COLUMNS:
                if col not in df.columns:
                    df[col] = ""
            # Non-inplace drop_duplicates: avoids mutating a slice of the frame.
            df = df[self._REQUIRED_COLUMNS].drop_duplicates(subset=self._REQUIRED_COLUMNS)
            rows = list(df.itertuples(index=False, name=None))

            conn = sqlite3.connect(self.db_path)
            try:
                if remove_existing:
                    conn.execute("DROP TABLE IF EXISTS markets")
                # Also covers a database file that exists but has no table yet.
                conn.execute(self._CREATE_TABLE_SQL)
                try:
                    # Upgrade path for tables created without extract_count.
                    conn.execute("ALTER TABLE markets ADD COLUMN extract_count INTEGER DEFAULT 0")
                except sqlite3.OperationalError:
                    # Column already exists - nothing to migrate.
                    pass
                conn.executemany(
                    """
                    INSERT INTO markets (country, mall_grade, mall_name, mall_url)
                    VALUES (?, ?, ?, ?)
                    """,
                    rows,
                )
                conn.commit()
            finally:
                conn.close()

            log_callback("DB 저장 완료")
            return "DB 저장 완료"
        except Exception as e:
            msg = f"엑셀 로드 에러: {e}\n{traceback.format_exc()}"
            log_callback(msg)
            return msg

    def view_data(self):
        """Return the whole ``markets`` table rendered as an HTML table string."""
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query("SELECT * FROM markets", conn)
        finally:
            conn.close()
        return df.to_html()

    def reset_extract_count(self, log_callback=print):
        """Set ``extract_count`` to 0 on every row and return the status message."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("UPDATE markets SET extract_count = 0")
            conn.commit()
        finally:
            conn.close()
        log_callback("모든 추출 횟수가 초기화되었습니다.")
        return "모든 추출 횟수가 초기화되었습니다."

    def extract_bookmarks(self, country, grade, count, extract_based, max_extract, log_callback=print):
        """Pick up to *count* random rows and increment their ``extract_count``.

        Args:
            country: Country filter; the literal "랜덤" disables the filter.
            grade: Grade filter; the literal "랜덤" disables the filter.
            count: Maximum number of rows to return.
            extract_based: When true, only rows with ``extract_count`` below
                *max_extract* are eligible and the least-extracted rows are
                preferred.
            max_extract: Upper bound used when *extract_based* is true.
            log_callback: Callable receiving status/error messages.

        Returns:
            A list of ``{"id", "name", "url"}`` dicts, or ``None`` when no row
            matches or the query fails.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)

            # All user-supplied values are bound as parameters, never
            # interpolated, so quotes in a filter value cannot break or
            # alter the statement.
            conditions = []
            params = []
            if country != "랜덤":
                conditions.append("country = ?")
                params.append(country)
            if grade != "랜덤":
                conditions.append("mall_grade = ?")
                params.append(grade)
            if extract_based:
                conditions.append("extract_count < ?")
                params.append(max_extract)
                order_by = "extract_count ASC, RANDOM()"
            else:
                order_by = "RANDOM()"

            query = "SELECT id, mall_name AS name, mall_url AS url FROM markets"
            if conditions:
                query += " WHERE " + " AND ".join(conditions)
            query += f" ORDER BY {order_by} LIMIT ?"
            params.append(int(count))

            df = pd.read_sql_query(query, conn, params=params)
            if df.empty:
                log_callback("추출 가능한 데이터가 없습니다.")
                return None

            records = df.to_dict("records")
            # int(): make sure the id is a plain Python int before binding.
            conn.executemany(
                "UPDATE markets SET extract_count = extract_count + 1 WHERE id = ?",
                [(int(record["id"]),) for record in records],
            )
            conn.commit()
            log_callback(f"{len(records)}개의 북마크를 추출했습니다.")
            return records
        except Exception as e:
            msg = f"DB 쿼리 실행 오류: {e}"
            log_callback(msg)
            return None
        finally:
            if conn is not None:
                conn.close()
# =====================================================================
# 북마크 추가 작업을 백그라운드로 처리하는 모듈 (BookmarkWorker)
# =====================================================================
class BookmarkWorker(threading.Thread):
    """Thread that writes bookmarks into a browser's ``Bookmarks`` JSON file.

    ``run`` adds one timestamped parent folder to the bookmark bar, holding
    sub-folders of at most ``_CHUNK_SIZE`` bookmarks each, then opens the
    browser's bookmark page and tries to start or focus the "카피맨" program.

    Results are reported only through the three callbacks given to the
    constructor; ``run`` never raises.
    """

    # Maximum number of bookmarks placed in one sub-folder.
    _CHUNK_SIZE = 100

    def __init__(self, bookmarks, folder_name, bookmarks_path, browser_path, selected_browser, remove_existing,
                 progress_callback, log_callback, completed_callback):
        """Store the job description.

        Args:
            bookmarks: List of dicts with ``"name"`` and ``"url"`` keys.
            folder_name: Label embedded in each sub-folder name.
            bookmarks_path: Path of the browser's ``Bookmarks`` JSON file.
            browser_path: Path of the browser executable.
            selected_browser: Display name of the browser ("크롬" / "웨일").
            remove_existing: When true, folders whose name starts with
                "거상북마크" are removed from the bookmark bar first.
            progress_callback: Called with an int percentage.
            log_callback: Called with each log message.
            completed_callback: Called with no arguments after a successful run.
        """
        super().__init__()
        self.bookmarks = bookmarks
        self.folder_name = folder_name
        self.bookmarks_path = bookmarks_path
        self.browser_path = browser_path
        self.selected_browser = selected_browser
        self.remove_existing = remove_existing
        self.progress_callback = progress_callback
        self.log_callback = log_callback
        self.completed_callback = completed_callback

    def run(self):
        """Insert the bookmarks, launch the browser and signal completion.

        Any exception is logged with its traceback and progress is reset to
        0; ``completed_callback`` is only called on success.
        """
        try:
            if not os.path.exists(self.bookmarks_path):
                self.log_callback(f"즐겨찾기 JSON 파일을 찾을 수 없습니다: {self.bookmarks_path}")
                return

            bookmarks_data = self._read_bookmarks_data()
            if self.remove_existing:
                bookmarks_data["roots"]["bookmark_bar"] = self.remove_existing_bookmarks(
                    bookmarks_data["roots"]["bookmark_bar"]
                )

            total = len(self.bookmarks)
            bookmark_bar = bookmarks_data["roots"]["bookmark_bar"]
            current_time = datetime.now().strftime("%m-%d-%H-%M-%S")
            parent_folder = {"type": "folder", "name": f"거상북마크-{current_time}", "children": []}

            chunk_size = self._CHUNK_SIZE
            for idx, start in enumerate(range(0, total, chunk_size), start=1):
                chunk = self.bookmarks[start:start + chunk_size]
                parent_folder["children"].append({
                    "type": "folder",
                    "name": f"거상북마크-{self.folder_name}-{idx}",
                    "children": [
                        {"type": "url", "name": bm["name"], "url": bm["url"]}
                        for bm in chunk
                    ],
                })
                # Percentage of bookmarks placed so far, after this chunk.
                progress = int((start + min(chunk_size, total - start)) / total * 100)
                self.progress_callback(progress)

            # setdefault: tolerate a bookmark bar that has no "children" key.
            bookmark_bar.setdefault("children", []).append(parent_folder)
            with open(self.bookmarks_path, "w", encoding="utf-8") as f:
                json.dump(bookmarks_data, f, indent=4, ensure_ascii=False)
            self.log_callback("즐겨찾기 추가 완료!")

            self.run_browser_with_profile(self.browser_path, self.bookmarks_path, self.selected_browser)
            self.run_and_focus_copyman()
            self.completed_callback()
        except Exception as e:
            self.log_callback(f"작업 오류: {e}\n{traceback.format_exc()}")
            self.progress_callback(0)

    def _read_bookmarks_data(self):
        """Load the bookmarks JSON, falling back to an empty bookmark bar.

        An empty file or invalid JSON yields a minimal structure containing
        only an empty bookmark bar (the situation is logged).
        """
        try:
            with open(self.bookmarks_path, "r", encoding="utf-8") as file:
                file_content = file.read().strip()
            if not file_content:
                self.log_callback("JSON 파일이 비어 있어 기본값으로 초기화합니다.")
                return {"roots": {"bookmark_bar": {"children": []}}}
            return json.loads(file_content)
        except json.JSONDecodeError as e:
            self.log_callback(f"JSON 파싱 오류: {e}")
            return {"roots": {"bookmark_bar": {"children": []}}}

    def run_browser_with_profile(self, browser_path, bookmarks_path, browser_name):
        """Open the browser's bookmark page using the profile that owns *bookmarks_path*.

        For Whale, running processes whose name contains "whale" are
        terminated first. The profile directory name is taken from the folder
        that contains *bookmarks_path*.
        """
        if "웨일" in browser_name:
            for proc in psutil.process_iter(attrs=["pid", "name"]):
                # The name may be None when the process info is not accessible.
                proc_name = proc.info.get("name") or ""
                if "whale" in proc_name.lower():
                    try:
                        proc.terminate()
                        proc.wait(timeout=5)
                        self.log_callback("웨일 프로세스 종료됨.")
                    except Exception as e:
                        self.log_callback(f"웨일 종료 오류: {e}")

        if not os.path.exists(browser_path):
            self.log_callback(f"{browser_name} 경로가 올바르지 않습니다: {browser_path}")
            return

        profile_dir = os.path.basename(os.path.dirname(bookmarks_path))
        if "크롬" in browser_name.lower():
            page_url = "chrome://bookmarks/"
        elif "웨일" in browser_name.lower():
            page_url = "whale://bookmarks/"
        else:
            page_url = "about:blank"

        if profile_dir:
            subprocess.Popen([browser_path, f"--profile-directory={profile_dir}", page_url])
            self.log_callback(f"{browser_name} 프로필 {profile_dir}로 북마크 열기")
        else:
            subprocess.Popen([browser_path, page_url])
            self.log_callback(f"{browser_name} 기본 프로필로 북마크 열기")

    def run_and_focus_copyman(self):
        """Focus the "카피맨" window if it is running, otherwise start it from its Start Menu shortcut."""
        program_name = "@카피맨.exe"
        shortcut_name = "@카피맨"
        window_title_start = "카피맨"
        try:
            pid = self.is_program_running(program_name)
            if pid:
                self.log_callback(f"{program_name} 실행 중 (PID: {pid})")
                if not self.focus_window_by_title(window_title_start):
                    self.log_callback("카피맨 창을 찾지 못함.")
            else:
                self.log_callback("카피맨 실행 안됨. 실행 시도...")
                shortcut = self.find_shortcut_in_start_menu(shortcut_name)
                if shortcut:
                    self.run_program(shortcut)
                else:
                    self.log_callback("카피맨 바로가기 없음.")
        except Exception as e:
            self.log_callback(f"카피맨 실행 오류: {e}\n{traceback.format_exc()}")

    def is_program_running(self, process_name):
        """Return the PID of the first process whose name contains *process_name* (case-insensitive), else None."""
        wanted = process_name.lower()
        for proc in psutil.process_iter(attrs=["pid", "name"]):
            # The name may be None when the process info is not accessible.
            proc_name = proc.info.get("name") or ""
            if wanted in proc_name.lower():
                return proc.info["pid"]
        return None

    def focus_window_by_title(self, title_start):
        """Activate the first window whose title starts with *title_start*.

        Returns True on success; False when no window matches, activation
        fails, or ``pygetwindow`` cannot be used.
        """
        try:
            import pygetwindow as gw
            for window in gw.getAllWindows():
                if window.title and window.title.startswith(title_start):
                    try:
                        window.activate()
                        self.log_callback(f"창 활성화: {window.title}")
                        return True
                    except Exception as e:
                        self.log_callback(f"창 활성화 실패: {e}")
                        return False
            return False
        except Exception as e:
            self.log_callback(f"포커스 전환 오류: {e}")
            return False

    def find_shortcut_in_start_menu(self, shortcut_name):
        """Search the per-user and all-users Start Menu folders for ``<shortcut_name>.lnk``; return its path or None."""
        user_menu = os.path.expandvars(r"%APPDATA%\Microsoft\Windows\Start Menu\Programs")
        all_users_menu = os.path.expandvars(r"%ProgramData%\Microsoft\Windows\Start Menu\Programs")
        for path in [user_menu, all_users_menu]:
            shortcut = glob.glob(os.path.join(path, f"**\\{shortcut_name}.lnk"), recursive=True)
            if shortcut:
                return shortcut[0]
        return None

    def run_program(self, shortcut_path):
        """Launch *shortcut_path* through the shell and log the outcome."""
        try:
            subprocess.Popen([shortcut_path], shell=True)
            self.log_callback(f"프로그램 실행: {shortcut_path}")
        except Exception as e:
            self.log_callback(f"실행 오류: {e}")

    def remove_existing_bookmarks(self, node):
        """Strip folders whose name starts with "거상북마크" from a bookmark tree.

        The tree is modified in place. Returns *node* itself, or ``None``
        when *node* is such a folder. Non-dict values are returned unchanged.
        """
        if not isinstance(node, dict):
            return node
        if node.get("type") == "folder" and node.get("name", "").startswith("거상북마크"):
            self.log_callback(f"제거된 폴더: {node.get('name')}")
            return None
        if "children" in node:
            # Recurse exactly once per child; calling the method both in the
            # filter and for the value would double the work at every level.
            kept = []
            for child in node["children"]:
                cleaned = self.remove_existing_bookmarks(child)
                if cleaned is not None:
                    kept.append(cleaned)
            node["children"] = kept
        return node
# Create the module-wide DBHandler instance (uses the default "markets.db" path).
db_handler = DBHandler()
# =====================================================================
# Eel을 통한 UI와 백엔드 연동
# =====================================================================
@eel.expose
def load_excel(file_path, remove_existing):
    """Import an Excel file through the shared DB handler and show the outcome in the UI log.

    Progress messages from the handler go to stdout; the returned status
    message is forwarded to the front end via ``eel.log``.
    """
    eel.log(db_handler.load_excel(file_path, remove_existing, log_callback=print))
@eel.expose
def view_data():
    """Render the ``markets`` table as HTML and hand it to the front end via ``eel.show_data``."""
    eel.show_data(db_handler.view_data())
@eel.expose
def reset_extract_count():
    """Reset every row's extract count and show the status message in the UI log."""
    eel.log(db_handler.reset_extract_count(log_callback=print))
@eel.expose
def run_task(country, grade, count, remove_existing, extract_based, max_extract, browser_choice, chrome_path, whale_path):
    """Extract bookmarks from the DB and add them to the chosen browser in a background thread.

    Args:
        country: Country filter ("랜덤" for no filter).
        grade: Grade filter ("랜덤" for no filter); also used in the folder name.
        count: Number of bookmarks to extract (int or numeric string).
        remove_existing: Remove previously added "거상북마크" folders first.
        extract_based: Restrict extraction to rows below *max_extract*.
        max_extract: Extraction-count limit (int or numeric string).
        browser_choice: "크롬" selects Chrome; anything else selects Whale.
        chrome_path: Path of the Chrome executable.
        whale_path: Path of the Whale executable.

    All failures are reported through ``eel.log``; nothing is raised for bad
    input. Inputs and paths are validated BEFORE extraction, because
    extraction increments the rows' extract counts - a misconfigured path
    must not consume them.
    """
    try:
        count = int(count)
        max_extract = int(max_extract)
    except (TypeError, ValueError):
        eel.log("추출 개수 또는 최대 추출 횟수가 숫자가 아닙니다.")
        return

    if browser_choice == "크롬":
        # Default Chrome bookmarks path (adjust to the user's environment if needed).
        selected_bookmarks_path = os.path.join(os.path.expandvars(r"%LOCALAPPDATA%\Google\Chrome\User Data\Default"), "Bookmarks")
        selected_browser_path = chrome_path
        selected_browser = "크롬"
        eel.log("크롬 브라우저 선택됨.")
    else:
        selected_bookmarks_path = os.path.join(os.path.expandvars(r"%LOCALAPPDATA%\Naver\Naver Whale\User Data\Default"), "Bookmarks")
        selected_browser_path = whale_path
        selected_browser = "웨일"
        eel.log("웨일 브라우저 선택됨.")

    # An empty/None path is treated as invalid instead of raising in os.path.exists.
    if not selected_browser_path or not os.path.exists(selected_browser_path):
        eel.log("브라우저 실행 파일 경로가 유효하지 않습니다.")
        return
    if not os.path.exists(selected_bookmarks_path):
        eel.log("브라우저 북마크 경로가 유효하지 않습니다.")
        return

    bookmarks = db_handler.extract_bookmarks(country, grade, count, extract_based, max_extract, log_callback=print)
    if bookmarks is None:
        eel.log("북마크 추출 실패")
        return

    folder_name = f"거상북마크-{grade}"

    def progress_callback(val):
        eel.update_progress(val)

    def log_callback(msg):
        eel.log(msg)

    def completed_callback():
        eel.log("작업 완료!")
        eel.task_completed()

    worker = BookmarkWorker(bookmarks, folder_name, selected_bookmarks_path,
                            selected_browser_path, selected_browser, remove_existing,
                            progress_callback, log_callback, completed_callback)
    worker.start()
# Initialize Eel (an index.html file must exist inside the "web" folder).
eel.init("web")
# Launch the Eel app (opens index.html).
eel.start("index.html", size=(900,700))