baidu_web/imgSearcher.py

362 lines
16 KiB
Python

import time
import json
import re
import requests
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright
from PIL import Image, ImageOps
from skimage.metrics import structural_similarity as ssim
import numpy as np
import sys, os, random
class BaiduImageSearcher:
def __init__(self, base_path, sources=None, image_downloader=None, db_manager=None, logger=None):
self.base_path = base_path
self.filtered_sources = set(sources) if sources else {'淘宝', 'tmall', '1688'}
self.image_downloader = image_downloader
self.db_manager = db_manager
self.logger = logger
self.browser = None
self.page = None
self.is_first_search = True # 첫 번째 검색 여부를 추적
self.logger.info("ImageSearcher initialized.")
def start_browser(self):
# Playwright로 브라우저를 시작하고 최초 접속 URL로 이동
self.logger.info("start_browser.")
self.playwright = sync_playwright().start()
browser_path = os.path.join(self.base_path, 'src', 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
browser_webkit_path = os.path.join(self.base_path, 'src', 'browsers', 'webkit-2083', 'Playwright.exe')
user_data_dir = os.path.join(self.base_path, 'src', 'browsers', 'user_data')
self.logger.debug(f"브라우저 경로: {browser_path}")
self.logger.debug(f"사용자 폴더 경로: {user_data_dir}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.")
# User agent 설정
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.debug(f"user_agent: {user_agent}")
# 브라우저 시작 및 설정
self.browser = self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=True,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--disable-popup-blocking',
'--start-maximized',
'--window-size=1920,1080'
],
executable_path=browser_path,
user_agent=user_agent
)
# self.browser = self.playwright.chromium.launch(headless=True)
self.page = self.browser.new_page()
# 첫 번째 기본 탭 닫기
if self.browser.pages:
self.browser.pages[0].close()
# User-Agent와 추가 헤더 설정
self.page.set_extra_http_headers({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36",
"Referer": "https://www.baidu.com/",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Cache-Control": "max-age=0"
})
self.initial_url = 'https://graph.baidu.com/pcpage/index?tpl_from=pc'
# upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
self.logger.info("Start Complete B")
# self.page.route("**/*", lambda route, request: route.abort() if request.resource_type in ["image", "stylesheet", "font"] else route.continue_())
# # self.page.goto(self.initial_url) # 최초 접속 URL
# self.goto_initialPage()
# self.page.wait_for_selector(upload_button_xpath)
def goto_initialPage(self):
self.page.goto(self.initial_url) # 최초 접속 URL
def close_browser(self):
# 브라우저 종료
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
def check_capcha(self):
try:
# 현재 URL 확인 및 로그 출력
current_url = self.page.evaluate("() => window.location.href")
self.logger.info(f"Current URL: {current_url}")
# 에러 페이지 조건 확인
if current_url.startswith('https://graph.baidu.com/errpage'):
self.logger.warning("Error page detected. Navigating back.")
# self.page.go_back() # 뒤로 가기
# self.page.goto(self.initial_url) # 최초 접속 URL
return True
else:
return False
except Exception as e:
self.logger.error(f"Finding Error page occured ERROR: {e}", exc_info=True)
return False
def upload_image(self, image_path):
try:
if self.check_capcha():
return False
# 첫 번째 검색과 이후 검색의 선택자를 다르게 설정
# if self.is_first_search:
# if self.is_first_search:
self.logger.info("is_first_search")
upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
upload_input_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/div/div[2]/div[2]/div/form/input'
self.is_first_search = False # 이후 검색에서는 일반 선택자를 사용
# else:
# self.logger.info("another search")
# upload_button_xpath = '//*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/span[1]/span[1]'
# upload_input_xpath = '//*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/div/div[2]/div[2]/div/form/input'
# 이미지 업로드 버튼 클릭 및 파일 업로드
self.page.wait_for_selector(upload_button_xpath)
self.page.click(upload_button_xpath)
if self.check_capcha():
return False
self.page.wait_for_selector(upload_input_xpath)
self.page.set_input_files(upload_input_xpath, image_path)
return True
except Exception as e:
self.logger.error(f"Error in upload_image: {e}", exc_info=True)
def expand_results(self):
# 확장 버튼 클릭 및 확장된 상품 카드 요소 로드 대기
if self.check_capcha():
return False
self.logger.info("expand_results")
expand_button_xpath = '//*[@id="app"]/div/div[2]/div/div[1]/ul/li[2]'
self.page.wait_for_selector(expand_button_xpath)
self.page.click(expand_button_xpath)
if self.check_capcha():
return False
# 확장된 상품 카드 이미지 요소가 로드될 때까지 대기
self.logger.info("확장된 상품 카드 이미지 요소가 로드될 때까지 대기")
product_card_selector = 'div.graph-product-list-img img'
self.page.wait_for_selector(product_card_selector)
if self.check_capcha():
return False
return True
def extract_product_data(self, product_id):
# 검색 결과 페이지에서 JSON 데이터 추출
if self.check_capcha():
return False
content = None
self.logger.info("검색 결과 페이지에서 JSON 데이터 추출")
content = self.page.content()
soup = BeautifulSoup(content, 'html.parser')
script_tag = soup.select_one("html > head > script:nth-of-type(2)")
if script_tag:
self.logger.info("script_tag를 찾음")
raw_data = script_tag.string.strip()
match = re.search(r"window\.cardData\s*=\s*(\[\{.*\}\]);", raw_data, re.DOTALL)
if match:
json_data_str = match.group(1)
# self.logger.debug(f"[json_data_str]\n{json_data_str}")
try:
data = json.loads(json_data_str)
product_info = []
# 필터링된 출처에 따라 데이터 추출
for idx, card in enumerate(data):
if card.get("cardName") == "product":
products = card["tplData"]["list"]
# 출처 필터링
filtered_products = [
product for product in products if product.get("source", "") in self.filtered_sources
]
# 필터링 후 남은 데이터가 없을 경우 처리 건너뜀
if not filtered_products:
self.logger.warning(f"No products left after filtering by source for card index {idx}. Skipping this card.")
continue
# 가격 기준으로 정렬
try:
products = sorted(filtered_products, key=lambda x: float(x.get("text", "").replace("", "").replace(",", "")))
print(f"낮은 가격순 정렬")
except ValueError as e:
self.logger.error(f"Error sorting products by price: {e}")
# 이미지 다운로드 및 saved_img_path 설정
for product in products:
imgurl = product.get("imgurl", "")
product["saved_img_path"] = self.image_downloader.download_image_for_searchResult(
imgurl, product_id=product_id, index=products.index(product)
)
# 1번상품부터 우선 is_selected = 1로 설정
products[0]["is_selected"] = 1
print(f"first products is_selected : 1")
# # 최유사 최저가 상품 판별
# original_img_path = self.db_manager.get_saved_img_path(product_id)
# most_similar_product = self.find_most_similar_product(original_img_path, products)
# # 최유사 상품의 is_selected 설정
# if most_similar_product:
# most_similar_product["is_selected"] = 1
# print(f"most_similar_product : {most_similar_product}")
print(f"{idx}/{len(products)} Processed products")
# 상품 정보를 product_info에 추가
for product in products:
title = product.get("desc", "")
source = product.get("source", "")
price = product.get("text", "")
buyurl = product.get("buyurl", "")
imgurl = product.get("imgurl", "")
saved_img_path = product.get("saved_img_path", "")
is_selected = product.get("is_selected", 0)
# if most_similar_product:
# is_selected = most_similar_product.get("is_selected", 0)
# print(f"is_selected : {is_selected}")
# else:
# is_selected = 0
# 출처 필터링
if source in self.filtered_sources:
original_url = self.get_original_url(buyurl)
product_info.append({
"title": title,
"source": source,
"price": price,
"imgurl": imgurl,
"saved_img_path": saved_img_path,
"encrypted_url": buyurl,
"original_url": original_url,
"is_selected": is_selected
})
self.logger.info("product_info 추출 완료")
self.logger.info(f"{product_info}")
return product_info
except json.JSONDecodeError as e:
self.logger.debug(f"JSON 디코딩 오류: {e}", exc_info=True)
except Exception as e:
self.logger.error(f"extract_product_data 오류: {e}", exc_info=True)
return []
def get_original_url(self, encrypted_url):
# 암호화된 구매 링크를 원래 URL로 변환
try:
response = requests.get(encrypted_url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
})
self.logger.info("get_original_url 변환 완료")
return response.url if response.status_code == 200 else "변환 실패"
except requests.RequestException:
self.logger.info("get_original_url 변환 실패")
return "변환 실패"
def find_most_similar_product(self, original_img_path, products):
"""
원본 이미지와 유사성이 0.7 이상인 제품 중 최저가 상품을 선택합니다.
:param original_img_path: 원본 이미지 경로
:param products: 상품 데이터 리스트
:return: 선택된 상품 (dict) 또는 None
"""
min_price = float('inf')
most_similar_product = None
try:
# 원본 이미지 불러오기
original_img = Image.open(original_img_path).convert("L") # 흑백 변환
for product in products:
try:
# 저장된 이미지 경로 가져오기
saved_img_path = product.get("saved_img_path")
if not saved_img_path:
self.logger.debug("No saved image path for product.")
continue
# 저장된 이미지 불러오기
saved_img = Image.open(saved_img_path).convert("L") # 흑백 변환
# 이미지 크기 조정
if original_img.size != saved_img.size:
saved_img = ImageOps.fit(
saved_img, original_img.size, method=Image.Resampling.LANCZOS
)
# 이미지 유사도 계산
original_arr = np.array(original_img)
saved_arr = np.array(saved_img)
similarity, _ = ssim(original_arr, saved_arr, full=True)
if similarity >= 0.2: # 유사도가 0.7 이상일 경우
price_str = product.get("text", "").replace("", "").replace(",", "")
price = float(price_str)
if price < min_price:
min_price = price
most_similar_product = product
except Exception as e:
self.logger.debug(f"Error processing product image: {e}", exc_info=True)
continue
if most_similar_product:
# 모든 상품의 is_selected 값을 초기화
for product in products:
product["is_selected"] = 0
# 최종적으로 선택된 상품에 is_selected = 1 설정
if most_similar_product:
most_similar_product["is_selected"] = 1
except Exception as e:
self.logger.debug(f"Error loading original image: {e}", exc_info=True)
return most_similar_product