159 lines
7.5 KiB
Python
159 lines
7.5 KiB
Python
import os, time
|
|
from xlsReader import ExcelReader
|
|
from databaseManager import DatabaseManager
|
|
from imageDownloader import ImageDownloader
|
|
from imgSearcher import BaiduImageSearcher
|
|
from resultDiag import ProductViewer
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
class MainProcessor:
|
|
def __init__(self, excel_folder, db_path, img_folder, logger):
|
|
self.logger = logger
|
|
self.excel_reader = ExcelReader(excel_folder, logger)
|
|
self.db_manager = DatabaseManager(db_path, logger)
|
|
self.logger.debug("1")
|
|
self.resultViewer = ProductViewer(db_path)
|
|
self.logger.debug("2")
|
|
self.image_downloader = ImageDownloader(img_folder, logger)
|
|
self.image_searcher = BaiduImageSearcher(sources=['淘宝', 'tmall', '1688'], image_downloader=self.image_downloader, logger=logger)
|
|
self.logger.info("MainProcessor initialized.")
|
|
|
|
def clean_up_files(self):
|
|
img_folder = os.path.join(os.getcwd(), 'img')
|
|
xls_folder = os.path.join(os.getcwd(), 'xls')
|
|
|
|
for folder in [img_folder, xls_folder]:
|
|
for filename in os.listdir(folder):
|
|
file_path = os.path.join(folder, filename)
|
|
try:
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
self.logger.info(f"Deleted file: {file_path}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error deleting file {file_path}: {e}")
|
|
|
|
def process_all_products(self):
|
|
# 모든 엑셀 파일을 읽어와 DB에 저장
|
|
products = self.excel_reader.read_excel_files()
|
|
product_ids = self.db_manager.insert_products(products)
|
|
|
|
self.image_searcher.start_browser()
|
|
|
|
# 각 상품에 대해 이미지 검색 수행
|
|
for product, product_id in zip(products, product_ids):
|
|
try:
|
|
if product_id is None:
|
|
self.logger.error("Failed to insert product into database.")
|
|
continue
|
|
|
|
# 이미지 다운로드 및 검색 실행
|
|
image_path = self.image_downloader.download_image(product['image_url'], product_id)
|
|
product['saved_img_path'] = image_path # 이미지 경로 설정
|
|
# self.db_manager.insert_products([product])
|
|
self.db_manager.update_product_image_path(product_id, image_path)
|
|
|
|
# 재시도 횟수 설정
|
|
max_retries = 3
|
|
attempt = 0
|
|
is_success_upload_image = False
|
|
is_success_expand_results = False
|
|
search_results = None
|
|
|
|
while attempt < max_retries:
|
|
# upload_image 메서드 실행 및 성공 여부 확인
|
|
self.image_searcher.goto_initialPage()
|
|
self.logger.debug(f"검색페이지로 가기")
|
|
|
|
is_success_upload_image = self.image_searcher.upload_image(image_path)
|
|
if not is_success_upload_image:
|
|
attempt += 1
|
|
self.logger.warning(f"Upload image failed for Product ID [{product_id}]. Retry {attempt}/{max_retries}")
|
|
time.sleep(1)
|
|
continue # 재시도 시 루프를 다시 시작
|
|
|
|
# expand_results 메서드 실행 및 성공 여부 확인
|
|
is_success_expand_results = self.image_searcher.expand_results()
|
|
if not is_success_expand_results:
|
|
attempt += 1
|
|
self.logger.warning(f"Expand results failed for Product ID [{product_id}]. Retry {attempt}/{max_retries}")
|
|
time.sleep(1)
|
|
continue # 재시도 시 루프를 다시 시작
|
|
|
|
# 검색 결과 추출
|
|
search_results = self.image_searcher.extract_product_data(product_id)
|
|
# if search_results == []: # 빈 리스트일 경우만 실패로 간주
|
|
# attempt += 1
|
|
# self.logger.warning(f"Extract product data failed for Product ID [{product_id}]. Retry {attempt}/{max_retries}")
|
|
# time.sleep(1)
|
|
# continue # 재시도 시 루프를 다시 시작
|
|
|
|
# 모든 작업이 성공하면 루프 종료
|
|
if is_success_upload_image and is_success_expand_results:
|
|
break
|
|
else:
|
|
# 재시도 횟수 초과 시 경고 로그 출력 및 다음 제품으로 이동
|
|
self.logger.error(f"Failed to process Product ID [{product_id}] after {max_retries} attempts.")
|
|
continue
|
|
|
|
# 성공 시 검색 결과를 DB에 저장
|
|
self.logger.debug(f"Insert DB: {product_id}")
|
|
|
|
self.db_manager.insert_search_results(product_id, search_results)
|
|
# input("로그를 확인한 후 아무 키나 눌러서 계속하세요...")
|
|
|
|
# os.remove(image_path)
|
|
self.logger.debug(f"Processed product ID: {product_id}")
|
|
time.sleep(1)
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to process product ID: {product_id} - {e}", exc_info=True)
|
|
|
|
def show_results(self):
|
|
# 검색 결과 출력
|
|
try:
|
|
self.resultViewer.show()
|
|
self.logger.debug(f"show_results Completed")
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to show_results - {e}", exc_info=True)
|
|
|
|
|
|
def export_to_xls(self):
|
|
# 현재 날짜와 시간을 사용하여 파일 이름 생성
|
|
date_str = datetime.now().strftime('%Y%m%d')
|
|
file_index = 1
|
|
|
|
try:
|
|
with self.db_manager.conn as conn:
|
|
cursor = conn.execute('''
|
|
SELECT s.original_url, p.name, p.tag, p.percenty_category
|
|
FROM products p
|
|
JOIN search_results s ON p.id = s.product_id
|
|
''')
|
|
|
|
data = cursor.fetchall()
|
|
|
|
# 50개씩 데이터를 나누어 출력
|
|
for i in range(0, len(data), 50):
|
|
# 50개 데이터를 추출하여 DataFrame으로 변환
|
|
chunk = data[i:i + 50]
|
|
df = pd.DataFrame(chunk, columns=["original_url", "name", "tag", "percenty_category"])
|
|
|
|
# 엑셀 파일 생성
|
|
excel_filename = f"출력데이터_{date_str}_{file_index}.xlsx"
|
|
with pd.ExcelWriter(excel_filename) as writer:
|
|
df.to_excel(writer, index=False, startrow=3, startcol=1, sheet_name="multi_ss")
|
|
|
|
# 셀 배치
|
|
worksheet = writer.sheets['multi_ss']
|
|
for row_idx, row_data in enumerate(chunk, start=4):
|
|
worksheet[f"B{row_idx}"] = row_data[0] # original_url
|
|
worksheet[f"C{row_idx}"] = row_data[1] # name
|
|
worksheet[f"F{row_idx}"] = row_data[2] # tag
|
|
worksheet[f"G{row_idx}"] = row_data[3] # percent_category
|
|
|
|
self.logger.info(f"{excel_filename} 파일에 데이터 50개 저장 완료.")
|
|
file_index += 1
|
|
except Exception as e:
|
|
self.logger.error(f"Error exporting to Excel: {e}", exc_info=True)
|