280 lines
12 KiB
Python
280 lines
12 KiB
Python
import sqlite3
|
|
# import pandas as pd
|
|
import re, os
|
|
import configparser
|
|
|
|
from translatepy import Translator
|
|
from translatepy.translators.google import GoogleTranslate
|
|
|
|
class DatabaseManager:
    """SQLite-backed store for products and the search results found for them.

    The manager owns a single ``sqlite3`` connection, creates the ``products``
    and ``search_results`` tables on start-up, and applies price / category
    filters before products are stored.  Every public method logs failures
    through the injected ``logger`` instead of raising.
    """

    # Only this many leading search results are persisted per product.
    MAX_SEARCH_RESULTS = 5

    # Matches bracketed codes such as "[123]" embedded in a category string.
    _CATEGORY_CODE_RE = re.compile(r'\[.*?\]')

    # Shared by both search-result insert methods so they cannot drift apart.
    _INSERT_SEARCH_RESULT_SQL = '''
        INSERT INTO search_results (product_id, title, source, price, imgurl, saved_img_path, encrypted_url, original_url, is_selected)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''

    def __init__(self, base_path, db_path, logger):
        """Open the database, load the category filter and create the tables.

        :param base_path: directory that is searched for ``filter_cat.ini``
        :param db_path: path handed to ``sqlite3.connect``
        :param logger: logger-like object (``debug``/``info``/``warning``/``error``)
        """
        self.logger = logger
        self.base_path = base_path
        self.db_path = db_path
        # check_same_thread=False lets the connection be shared between threads.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.logger.info("Database connection established.")

        # Use the Google backend as the default translator.
        self.translator = Translator([GoogleTranslate()])

        self.exclude_categories = self._load_exclude_categories()
        self._create_tables()

    def translate_text(self, text):
        """Translate ``text`` to Korean, falling back to the original text.

        :param text: text to translate
        :return: the translated text, or ``text`` unchanged when it is empty
                 or when the translation fails
        """
        if not text:
            # Nothing to translate; skip the remote call (and the error log
            # a failed call on an empty value would produce).
            return text
        try:
            result = self.translator.translate(text, destination_language="ko")
            return result.result
        except Exception as e:
            self.logger.error(f"Translation failed for text '{text}': {e}")
            return text

    def _load_exclude_categories(self):
        """Read the excluded categories from ``filter_cat.ini``.

        The file is looked up in ``base_path``; the comma-separated
        ``exclude_categories`` option of the ``FILTER`` section is used.

        :return: list of stripped, non-empty category strings; an empty list
                 when the file is missing or cannot be read
        """
        config = configparser.ConfigParser()
        ini_path = os.path.join(self.base_path, "filter_cat.ini")

        if not os.path.exists(ini_path):
            self.logger.warning(f"filter_cat.ini not found at {ini_path}. No categories will be excluded.")
            return []

        try:
            config.read(ini_path, encoding='utf-8')
            categories = config.get("FILTER", "exclude_categories", fallback="")
            exclude_list = [cat.strip() for cat in categories.split(",") if cat.strip()]
            self.logger.info(f"Exclude categories loaded: {exclude_list}")
            return exclude_list
        except Exception as e:
            self.logger.error(f"Error reading filter_cat.ini: {e}", exc_info=True)
            return []

    def _create_tables(self):
        """Create the ``products`` and ``search_results`` tables if missing.

        Failures are logged, not raised.
        """
        try:
            with self.conn:
                # products table
                self.conn.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT,
                        image_url TEXT,
                        tag TEXT,
                        price INTEGER,
                        category TEXT,
                        percenty_category TEXT,
                        selected_search_result_id INTEGER,
                        saved_img_path TEXT, -- 이미지 저장 경로 필드 추가
                        FOREIGN KEY(selected_search_result_id) REFERENCES search_results(id)
                    )
                ''')

                # search_results table, each row with its own unique id
                self.conn.execute('''
                    CREATE TABLE IF NOT EXISTS search_results (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        product_id INTEGER,
                        title TEXT,
                        source TEXT,
                        price TEXT,
                        imgurl TEXT,
                        encrypted_url TEXT,
                        original_url TEXT,
                        saved_img_path TEXT, -- 이미지 저장 경로 필드 추가
                        is_selected BOOLEAN DEFAULT 0,
                        FOREIGN KEY (product_id) REFERENCES products (id)
                    )
                ''')
            self.logger.info("Tables created successfully.")
        except Exception as e:
            self.logger.error(f"Error creating tables: {e}", exc_info=True)

    def insert_products(self, products, price_filter=0):
        """Store products that pass the price and category filters.

        All inserts run in one transaction: if any product fails, nothing is
        stored and an empty list is returned, so callers never receive ids of
        rows that were rolled back.

        :param products: iterable of product dicts (per the original note,
                         rows read from an Excel sheet)
        :param price_filter: minimum price; products cheaper than this are
                             skipped (default 0 disables the price filter)
        :return: ids of the inserted rows, or ``[]`` when the batch failed
        """
        product_ids = []
        try:
            with self.conn:
                for product in products:
                    # Price filter. A missing/None price counts as 0 for the
                    # comparison so one blank cell cannot abort the batch.
                    price = product.get('price') or 0
                    if price_filter > 0 and price < price_filter:
                        self.logger.debug(f"Product '{product['name']}' skipped due to price filter: {price} < {price_filter}")
                        continue

                    # Category filter.
                    percent_category = product.get('percenty_category', '')
                    if not self.filter_by_category(percent_category):
                        self.logger.debug(f"Product '{product['name']}' skipped due to category filter: {percent_category}")
                        continue

                    cursor = self.conn.execute('''
                        INSERT INTO products (name, image_url, tag, price, category, percenty_category, saved_img_path)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        product['name'],
                        product['image_url'],
                        product['tag'],
                        product['price'],
                        product['category'],
                        product['percenty_category'],
                        product.get('saved_img_path'),
                    ))
                    product_ids.append(cursor.lastrowid)
                    self.logger.debug(f"Inserted product with ID: {cursor.lastrowid}")

            self.logger.info(f"{len(product_ids)} products inserted into database after filtering.")
        except Exception as e:
            self.logger.error(f"Error inserting products: {e}", exc_info=True)
            # The connection context manager rolled the transaction back, so
            # the ids collected so far refer to rows that no longer exist.
            return []
        return product_ids

    def update_product_image_path(self, product_id, image_path):
        """Update only the ``saved_img_path`` column of one product.

        :param product_id: id of the product row to update
        :param image_path: new value for ``saved_img_path``
        """
        try:
            with self.conn:
                self.conn.execute('''
                    UPDATE products
                    SET saved_img_path = ?
                    WHERE id = ?
                ''', (image_path, product_id))
            self.logger.info(f"Updated saved_img_path for Product ID [{product_id}]")
        except Exception as e:
            self.logger.error(f"Error updating saved_img_path for Product ID [{product_id}]: {e}", exc_info=True)

    @staticmethod
    def _search_result_row(product_id, result, title):
        """Build the parameter tuple for one ``search_results`` insert."""
        return (
            product_id,
            title,
            result['source'],
            result['price'],
            result['imgurl'],
            result['saved_img_path'],
            result['encrypted_url'],
            result['original_url'],
            result['is_selected'],
        )

    def _store_search_result_rows(self, product_id, rows):
        """Insert prepared rows in one transaction and log the count.

        Exceptions propagate so the public callers can log them.
        """
        with self.conn:
            self.conn.executemany(self._INSERT_SEARCH_RESULT_SQL, rows)
        self.logger.info(f"Product ID [{product_id}]: {len(rows)} search results inserted into database.")

    def insert_search_results_ori(self, product_id, search_results):
        """Store the leading search results with their titles left as-is.

        Only the first ``MAX_SEARCH_RESULTS`` entries are stored.

        :param product_id: id of the product the results belong to
        :param search_results: sequence of result dicts
        """
        try:
            top_results = search_results[:self.MAX_SEARCH_RESULTS]
            rows = [
                self._search_result_row(product_id, result, result['title'])
                for result in top_results
            ]
            self._store_search_result_rows(product_id, rows)
        except Exception as e:
            self.logger.error(f"Error inserting search results for Product ID [{product_id}]: {e}", exc_info=True)

    def insert_search_results(self, product_id, search_results):
        """Store the leading search results with translated titles.

        Only the first ``MAX_SEARCH_RESULTS`` entries are stored.  Titles are
        translated before the transaction is opened so the database is not
        held in a write transaction while translation requests are made.

        :param product_id: id of the product the results belong to
        :param search_results: sequence of result dicts
        """
        try:
            top_results = search_results[:self.MAX_SEARCH_RESULTS]
            rows = [
                self._search_result_row(
                    product_id, result, self.translate_text(result['title'])
                )
                for result in top_results
            ]
            self._store_search_result_rows(product_id, rows)
        except Exception as e:
            self.logger.error(f"Error inserting search results for Product ID [{product_id}]: {e}", exc_info=True)

    def select_search_result_for_product(self, product_id, search_result_id):
        """Mark one search result as the selected one for a product.

        Updates ``products.selected_search_result_id`` and sets
        ``is_selected`` to 1 on the matching search result and 0 on the
        product's other results, all in one transaction.

        :param product_id: id of the product
        :param search_result_id: id of the search result to select
        """
        try:
            with self.conn:
                # Point the product at the chosen result.
                self.conn.execute('''
                    UPDATE products
                    SET selected_search_result_id = ?
                    WHERE id = ?
                ''', (search_result_id, product_id))

                # Flag the chosen result and clear the flag on its siblings.
                self.conn.execute('''
                    UPDATE search_results
                    SET is_selected = CASE WHEN id = ? THEN 1 ELSE 0 END
                    WHERE product_id = ?
                ''', (search_result_id, product_id))

            self.logger.info(f"Product ID [{product_id}] selected search result ID [{search_result_id}].")
        except Exception as e:
            self.logger.error(f"Error selecting search result for product ID [{product_id}]: {e}", exc_info=True)

    def get_selected_search_result(self, product_id):
        """Return the selected search result id stored on a product.

        :param product_id: id of the product
        :return: the stored ``selected_search_result_id``, or ``None`` when
                 the product is unknown, nothing is selected, or the query
                 fails
        """
        try:
            cursor = self.conn.execute('''
                SELECT selected_search_result_id
                FROM products
                WHERE id = ?
            ''', (product_id,))
            row = cursor.fetchone()
            if row and row[0]:
                return row[0]
            return None
        except Exception as e:
            self.logger.error(f"Error fetching selected search result for product ID [{product_id}]: {e}", exc_info=True)
            return None

    def count_products(self):
        """Return the number of rows in ``products`` (0 when the query fails)."""
        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM products")
            count = cursor.fetchone()[0]
            self.logger.debug(f"Total products count: {count}")
            return count
        except Exception as e:
            self.logger.error(f"Error counting products: {e}", exc_info=True)
            return 0

    def get_saved_img_path(self, product_id):
        """Return ``saved_img_path`` of the given product.

        :param product_id: id of the product
        :return: the stored path (may itself be ``None``), or ``None`` when
                 the product is unknown or the query fails
        """
        try:
            cursor = self.conn.execute(
                "SELECT saved_img_path FROM products WHERE id = ?", (product_id,)
            )
            row = cursor.fetchone()
            if row:
                return row[0]
            return None
        except Exception as e:
            self.logger.error(f"Error fetching saved_img_path for product ID [{product_id}]: {e}", exc_info=True)
            return None

    def filter_by_category(self, percent_category):
        """Decide whether a product with this category should be kept.

        Bracketed codes (``[...]``) are removed, the remainder is split on
        ``-``, and the product is rejected when any part contains one of the
        excluded texts.

        :param percent_category: category string of the product
        :return: ``True`` to keep the product, ``False`` to exclude it
        """
        if not percent_category or not self.exclude_categories:
            return True  # no filter condition -> keep

        # Blank entries are ignored: "" is a substring of every string and
        # would otherwise exclude every product.
        needles = [text.strip() for text in self.exclude_categories if text.strip()]
        if not needles:
            return True

        category_without_code = self._CATEGORY_CODE_RE.sub('', percent_category)
        parts = [part.strip() for part in category_without_code.split('-')]
        return not any(needle in part for part in parts for needle in needles)

    def close(self):
        """Close the database connection; calling it again is a no-op."""
        if self.conn:
            self.conn.close()
            # Drop the handle so the guard above is effective on later calls.
            self.conn = None
            self.logger.info("Database connection closed.")
|