# baidu_web/databaseManager.py
#
# 280 lines
# 12 KiB
# Python

import sqlite3
# import pandas as pd
import re, os
import configparser
from translatepy import Translator
from translatepy.translators.google import GoogleTranslate
class DatabaseManager:
    """SQLite-backed store for products and their search results.

    The manager owns a single ``sqlite3`` connection, creates the ``products``
    and ``search_results`` tables on construction, and exposes helpers to
    insert, update and query rows. Every public method logs failures through
    the injected logger instead of raising, so callers receive a neutral
    value (``None``, ``0`` or ``[]``) when something goes wrong.
    """

    # Only this many leading search results are persisted per product.
    MAX_SEARCH_RESULTS = 5

    # Matches bracketed codes such as "[123]" embedded in a category path.
    _CATEGORY_CODE_RE = re.compile(r'\[.*?\]')

    # Shared by both search-result insert methods so the column order
    # is defined in exactly one place.
    _INSERT_SEARCH_RESULT_SQL = '''
        INSERT INTO search_results (product_id, title, source, price, imgurl, saved_img_path, encrypted_url, original_url, is_selected)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''

    def __init__(self, base_path, db_path, logger):
        """Open the database and prepare translator, filters and tables.

        :param base_path: directory in which ``filter_cat.ini`` is looked up
        :param db_path: path handed to ``sqlite3.connect``
        :param logger: logger-like object used for all diagnostics
        """
        self.logger = logger
        self.base_path = base_path
        self.db_path = db_path
        # check_same_thread=False lets the connection be used from several threads.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.logger.info("Database connection established.")
        # Google is the only translation backend configured.
        self.translator = Translator([GoogleTranslate()])
        self.exclude_categories = self._load_exclude_categories()
        self._create_tables()

    def translate_text(self, text):
        """Translate ``text`` to Korean, falling back to the input on failure.

        :param text: text to translate
        :return: the translated text, or ``text`` unchanged when it is empty
                 or the translation raised
        """
        if not text:
            # Nothing to translate; avoid a pointless request and error log.
            return text
        try:
            result = self.translator.translate(text, destination_language="ko")
            return result.result
        except Exception as e:
            self.logger.error(f"Translation failed for text '{text}': {e}")
            return text

    def _load_exclude_categories(self):
        """Read the excluded category tokens from ``filter_cat.ini``.

        The file is expected under ``base_path`` with a ``[FILTER]`` section
        whose ``exclude_categories`` option is a comma-separated list.

        :return: list of non-empty, stripped tokens; ``[]`` when the file is
                 missing or cannot be read
        """
        config = configparser.ConfigParser()
        ini_path = os.path.join(self.base_path, "filter_cat.ini")
        if not os.path.exists(ini_path):
            self.logger.warning(f"filter_cat.ini not found at {ini_path}. No categories will be excluded.")
            return []
        try:
            config.read(ini_path, encoding='utf-8')
            categories = config.get("FILTER", "exclude_categories", fallback="")
            exclude_list = [cat.strip() for cat in categories.split(",") if cat.strip()]
            self.logger.info(f"Exclude categories loaded: {exclude_list}")
            return exclude_list
        except Exception as e:
            self.logger.error(f"Error reading filter_cat.ini: {e}", exc_info=True)
            return []

    def _create_tables(self):
        """Create the ``products`` and ``search_results`` tables if absent.

        Errors are logged, not raised.
        """
        try:
            with self.conn:
                # products table
                self.conn.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT,
                        image_url TEXT,
                        tag TEXT,
                        price INTEGER,
                        category TEXT,
                        percenty_category TEXT,
                        selected_search_result_id INTEGER,
                        saved_img_path TEXT,  -- 이미지 저장 경로 필드 추가
                        FOREIGN KEY(selected_search_result_id) REFERENCES search_results(id)
                    )
                ''')
                # search_results table, each row with its own id
                self.conn.execute('''
                    CREATE TABLE IF NOT EXISTS search_results (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        product_id INTEGER,
                        title TEXT,
                        source TEXT,
                        price TEXT,
                        imgurl TEXT,
                        encrypted_url TEXT,
                        original_url TEXT,
                        saved_img_path TEXT,  -- 이미지 저장 경로 필드 추가
                        is_selected BOOLEAN DEFAULT 0,
                        FOREIGN KEY (product_id) REFERENCES products (id)
                    )
                ''')
            self.logger.info("Tables created successfully.")
        except Exception as e:
            self.logger.error(f"Error creating tables: {e}", exc_info=True)

    def insert_products(self, products, price_filter=0):
        """Insert products that pass the price and category filters.

        The whole batch runs in one transaction: if any product fails to
        insert, every insert of this call is rolled back.

        :param products: iterable of product dicts with the keys ``name``,
                         ``image_url``, ``tag`` and ``category``; ``price``
                         (default 0), ``percenty_category`` (default '') and
                         ``saved_img_path`` (default None) are optional
        :param price_filter: products priced below this value are skipped;
                             0 (the default) disables the price filter
        :return: list of row ids of the inserted products, or ``[]`` when the
                 batch failed and was rolled back
        """
        product_ids = []
        try:
            with self.conn:
                for product in products:
                    # Price filter
                    price = product.get('price', 0)
                    if price_filter > 0 and price < price_filter:
                        self.logger.debug(f"Product '{product['name']}' skipped due to price filter: {price} < {price_filter}")
                        continue
                    # Category filter
                    percent_category = product.get('percenty_category', '')
                    if not self.filter_by_category(percent_category):
                        self.logger.debug(f"Product '{product['name']}' skipped due to category filter: {percent_category}")
                        continue
                    # Reuse the values resolved above so that the defaults
                    # applied while filtering also apply to the stored row.
                    cursor = self.conn.execute('''
                        INSERT INTO products (name, image_url, tag, price, category, percenty_category, saved_img_path)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        product['name'],
                        product['image_url'],
                        product['tag'],
                        price,
                        product['category'],
                        percent_category,
                        product.get('saved_img_path'),
                    ))
                    product_ids.append(cursor.lastrowid)
                    self.logger.debug(f"Inserted product with ID: {cursor.lastrowid}")
            self.logger.info(f"{len(product_ids)} products inserted into database after filtering.")
        except Exception as e:
            self.logger.error(f"Error inserting products: {e}", exc_info=True)
            # The transaction was rolled back, so the ids collected so far
            # point at rows that no longer exist; do not hand them out.
            return []
        return product_ids

    def update_product_image_path(self, product_id, image_path):
        """Update only the ``saved_img_path`` column of one product.

        :param product_id: id of the product row to update
        :param image_path: new value for ``saved_img_path``
        """
        try:
            with self.conn:
                self.conn.execute('''
                    UPDATE products
                    SET saved_img_path = ?
                    WHERE id = ?
                ''', (image_path, product_id))
            self.logger.info(f"Updated saved_img_path for Product ID [{product_id}]")
        except Exception as e:
            self.logger.error(f"Error updating saved_img_path for Product ID [{product_id}]: {e}", exc_info=True)

    @staticmethod
    def _search_result_row(product_id, result, title):
        """Build the parameter tuple for ``_INSERT_SEARCH_RESULT_SQL``."""
        return (
            product_id,
            title,
            result['source'],
            result['price'],
            result['imgurl'],
            result['saved_img_path'],
            result['encrypted_url'],
            result['original_url'],
            result['is_selected'],
        )

    def insert_search_results_ori(self, product_id, search_results):
        """Store the leading search results with their original titles.

        :param product_id: id of the product the results belong to
        :param search_results: sequence of result dicts; only the first
                               ``MAX_SEARCH_RESULTS`` entries are stored
        """
        try:
            top_results = search_results[:self.MAX_SEARCH_RESULTS]
            rows = [
                self._search_result_row(product_id, result, result['title'])
                for result in top_results
            ]
            with self.conn:
                self.conn.executemany(self._INSERT_SEARCH_RESULT_SQL, rows)
            self.logger.info(f"Product ID [{product_id}]: {len(top_results)} search results inserted into database.")
        except Exception as e:
            self.logger.error(f"Error inserting search results for Product ID [{product_id}]: {e}", exc_info=True)

    def insert_search_results(self, product_id, search_results):
        """Store the leading search results with translated titles.

        :param product_id: id of the product the results belong to
        :param search_results: sequence of result dicts; only the first
                               ``MAX_SEARCH_RESULTS`` entries are stored
        """
        try:
            top_results = search_results[:self.MAX_SEARCH_RESULTS]
            # Translate before the transaction starts so the database is not
            # kept locked while waiting on the translation service.
            rows = [
                self._search_result_row(
                    product_id, result, self.translate_text(result['title'])
                )
                for result in top_results
            ]
            with self.conn:
                self.conn.executemany(self._INSERT_SEARCH_RESULT_SQL, rows)
            self.logger.info(f"Product ID [{product_id}]: {len(top_results)} search results inserted into database.")
        except Exception as e:
            self.logger.error(f"Error inserting search results for Product ID [{product_id}]: {e}", exc_info=True)

    def select_search_result_for_product(self, product_id, search_result_id):
        """Mark one search result as the selected one for a product.

        Both the product's ``selected_search_result_id`` and the
        ``is_selected`` flags of its search results are updated in a single
        transaction.

        :param product_id: id of the product
        :param search_result_id: id of the search result to select
        """
        try:
            with self.conn:
                # Record the choice on the product row.
                self.conn.execute('''
                    UPDATE products
                    SET selected_search_result_id = ?
                    WHERE id = ?
                ''', (search_result_id, product_id))
                # Flag the chosen result and clear the flag on its siblings.
                self.conn.execute('''
                    UPDATE search_results
                    SET is_selected = CASE WHEN id = ? THEN 1 ELSE 0 END
                    WHERE product_id = ?
                ''', (search_result_id, product_id))
            self.logger.info(f"Product ID [{product_id}] selected search result ID [{search_result_id}].")
        except Exception as e:
            self.logger.error(f"Error selecting search result for product ID [{product_id}]: {e}", exc_info=True)

    def get_selected_search_result(self, product_id):
        """Return the selected search result id stored on a product.

        :param product_id: id of the product
        :return: the stored ``selected_search_result_id``, or ``None`` when
                 the product is missing, nothing is selected, or the query
                 failed
        """
        try:
            cursor = self.conn.execute('''
                SELECT selected_search_result_id
                FROM products
                WHERE id = ?
            ''', (product_id,))
            row = cursor.fetchone()
            return row[0] if row and row[0] else None
        except Exception as e:
            self.logger.error(f"Error fetching selected search result for product ID [{product_id}]: {e}", exc_info=True)
            return None

    def count_products(self):
        """Return the number of rows in ``products`` (0 on error)."""
        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM products")
            count = cursor.fetchone()[0]
            self.logger.debug(f"Total products count: {count}")
            return count
        except Exception as e:
            self.logger.error(f"Error counting products: {e}", exc_info=True)
            return 0

    def get_saved_img_path(self, product_id):
        """Return the ``saved_img_path`` stored for a product.

        :param product_id: id of the product
        :return: the stored path, or ``None`` when the product does not exist
                 or the query failed
        """
        try:
            cursor = self.conn.execute(
                "SELECT saved_img_path FROM products WHERE id = ?", (product_id,)
            )
            row = cursor.fetchone()
            return row[0] if row else None
        except Exception as e:
            self.logger.error(f"Error fetching saved_img_path for product ID [{product_id}]: {e}", exc_info=True)
            return None

    def filter_by_category(self, percent_category):
        """Decide whether a product's category passes the exclude list.

        Bracketed codes (``[...]``) are removed from the category path, the
        rest is split on ``-``, and the product is rejected as soon as any
        exclude token occurs as a substring of any segment.

        :param percent_category: category path of the product
        :return: ``True`` to keep the product, ``False`` to exclude it
        """
        if not percent_category or not self.exclude_categories:
            return True  # no filter criteria, keep the product
        path_without_codes = self._CATEGORY_CODE_RE.sub('', percent_category)
        segments = [segment.strip() for segment in path_without_codes.split('-')]
        # Blank tokens are dropped: "" is a substring of every string and
        # would otherwise exclude every product.
        tokens = [token.strip() for token in self.exclude_categories]
        tokens = [token for token in tokens if token]
        return not any(token in segment for segment in segments for token in tokens)

    def close(self):
        """Close the database connection if one is held."""
        if self.conn:
            self.conn.close()
            self.logger.info("Database connection closed.")