201 lines
8.8 KiB
Python
201 lines
8.8 KiB
Python
import time
|
|
import json
|
|
import re
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
class BaiduImageSearcher:
|
|
def __init__(self, sources=None, logger=None):
|
|
self.filtered_sources = set(sources) if sources else {'淘宝', 'tmall', '1688'}
|
|
self.logger = logger
|
|
self.browser = None
|
|
self.page = None
|
|
self.is_first_search = True # 첫 번째 검색 여부를 추적
|
|
self.logger.info("ImageSearcher initialized.")
|
|
|
|
def start_browser(self):
|
|
# Playwright로 브라우저를 시작하고 최초 접속 URL로 이동
|
|
self.logger.info("start_browser.")
|
|
|
|
self.playwright = sync_playwright().start()
|
|
self.browser = self.playwright.chromium.launch(headless=False)
|
|
self.page = self.browser.new_page()
|
|
|
|
# User-Agent와 추가 헤더 설정
|
|
self.page.set_extra_http_headers({
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36",
|
|
"Referer": "https://www.baidu.com/",
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
|
|
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
|
"Accept-Encoding": "gzip, deflate, br",
|
|
"DNT": "1", # Do Not Track 요청 헤더
|
|
"Connection": "keep-alive",
|
|
"Upgrade-Insecure-Requests": "1",
|
|
"Cache-Control": "max-age=0"
|
|
})
|
|
|
|
self.initial_url = 'https://graph.baidu.com/pcpage/index?tpl_from=pc'
|
|
# upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
|
|
|
|
self.logger.info("goto URL")
|
|
# self.page.route("**/*", lambda route, request: route.abort() if request.resource_type in ["image", "stylesheet", "font"] else route.continue_())
|
|
|
|
# # self.page.goto(self.initial_url) # 최초 접속 URL
|
|
# self.goto_initialPage()
|
|
# self.page.wait_for_selector(upload_button_xpath)
|
|
|
|
def goto_initialPage(self):
|
|
self.page.goto(self.initial_url) # 최초 접속 URL
|
|
|
|
def close_browser(self):
|
|
# 브라우저 종료
|
|
if self.browser:
|
|
self.browser.close()
|
|
if self.playwright:
|
|
self.playwright.stop()
|
|
|
|
def check_capcha(self):
|
|
try:
|
|
# 현재 URL 확인 및 로그 출력
|
|
current_url = self.page.evaluate("() => window.location.href")
|
|
self.logger.info(f"Current URL: {current_url}")
|
|
|
|
# 에러 페이지 조건 확인
|
|
if current_url.startswith('https://graph.baidu.com/errpage'):
|
|
self.logger.warning("Error page detected. Navigating back.")
|
|
# self.page.go_back() # 뒤로 가기
|
|
# self.page.goto(self.initial_url) # 최초 접속 URL
|
|
return True
|
|
else:
|
|
return False
|
|
except Exception as e:
|
|
self.logger.error(f"Finding Error page occured ERROR: {e}", exc_info=True)
|
|
return False
|
|
|
|
|
|
def upload_image(self, image_path):
|
|
try:
|
|
if self.check_capcha():
|
|
return False
|
|
|
|
# 첫 번째 검색과 이후 검색의 선택자를 다르게 설정
|
|
# if self.is_first_search:
|
|
# if self.is_first_search:
|
|
# self.logger.info("is_first_search")
|
|
# upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
|
|
# upload_input_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/div/div[2]/div[2]/div/form/input'
|
|
# # self.is_first_search = False # 이후 검색에서는 일반 선택자를 사용
|
|
# else:
|
|
# self.logger.info("another search")
|
|
# upload_button_xpath = '//*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/span[1]/span[1]'
|
|
# upload_input_xpath = '//*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/div/div[2]/div[2]/div/form/input'
|
|
|
|
upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
|
|
upload_input_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/div/div[2]/div[2]/div/form/input'
|
|
|
|
# 이미지 업로드 버튼 클릭 및 파일 업로드
|
|
self.page.wait_for_selector(upload_button_xpath)
|
|
self.page.click(upload_button_xpath)
|
|
|
|
if self.check_capcha():
|
|
return False
|
|
|
|
self.page.wait_for_selector(upload_input_xpath)
|
|
self.page.set_input_files(upload_input_xpath, image_path)
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Error in upload_image: {e}", exc_info=True)
|
|
|
|
def expand_results(self):
|
|
# 확장 버튼 클릭 및 확장된 상품 카드 요소 로드 대기
|
|
|
|
if self.check_capcha():
|
|
return False
|
|
|
|
self.logger.info("expand_results")
|
|
expand_button_xpath = '//*[@id="app"]/div/div[2]/div/div[1]/ul/li[2]'
|
|
self.page.wait_for_selector(expand_button_xpath)
|
|
self.page.click(expand_button_xpath)
|
|
|
|
if self.check_capcha():
|
|
return False
|
|
|
|
# 확장된 상품 카드 이미지 요소가 로드될 때까지 대기
|
|
self.logger.info("확장된 상품 카드 이미지 요소가 로드될 때까지 대기")
|
|
product_card_selector = 'div.graph-product-list-img img'
|
|
self.page.wait_for_selector(product_card_selector)
|
|
if self.check_capcha():
|
|
return False
|
|
return True
|
|
|
|
def extract_product_data(self):
|
|
# 검색 결과 페이지에서 JSON 데이터 추출
|
|
if self.check_capcha():
|
|
return False
|
|
content = None
|
|
self.logger.info("검색 결과 페이지에서 JSON 데이터 추출")
|
|
content = self.page.content()
|
|
|
|
self.page.go_back() # 뒤로 가기
|
|
|
|
soup = BeautifulSoup(content, 'html.parser')
|
|
script_tag = soup.select_one("html > head > script:nth-of-type(2)")
|
|
|
|
if script_tag:
|
|
self.logger.info("script_tag를 찾음")
|
|
raw_data = script_tag.string.strip()
|
|
match = re.search(r"window\.cardData\s*=\s*(\[\{.*\}\]);", raw_data, re.DOTALL)
|
|
if match:
|
|
json_data_str = match.group(1)
|
|
print(f"[json_data_str]\n{json_data_str}")
|
|
try:
|
|
data = json.loads(json_data_str)
|
|
product_info = []
|
|
|
|
# 필터링된 출처에 따라 데이터 추출
|
|
for card in data:
|
|
if card.get("cardName") == "product":
|
|
products = card["tplData"]["list"]
|
|
for product in products:
|
|
title = product.get("desc", "")
|
|
source = product.get("source", "")
|
|
price = product.get("text", "")
|
|
buyurl = product.get("buyurl", "")
|
|
imgurl = product.get("imgurl", "")
|
|
|
|
# 출처 필터링
|
|
if source in self.filtered_sources:
|
|
original_url = self.get_original_url(buyurl)
|
|
product_info.append({
|
|
"title": title,
|
|
"source": source,
|
|
"price": price,
|
|
"imgurl": imgurl,
|
|
"encrypted_url": buyurl,
|
|
"original_url": original_url
|
|
})
|
|
self.logger.info("product_info 추출 완료")
|
|
self.logger.info(f"추출된 정보 \n{product_info}")
|
|
|
|
return product_info
|
|
except json.JSONDecodeError as e:
|
|
print("JSON 디코딩 오류:", e)
|
|
else:
|
|
print("JSON 데이터를 찾을 수 없습니다.")
|
|
else:
|
|
print("JSON 데이터가 포함된 스크립트를 찾지 못했습니다.")
|
|
return []
|
|
|
|
def get_original_url(self, encrypted_url):
|
|
# 암호화된 구매 링크를 원래 URL로 변환
|
|
try:
|
|
response = requests.get(encrypted_url, headers={
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
|
|
})
|
|
self.logger.info("get_original_url 변환 완료")
|
|
return response.url if response.status_code == 200 else "변환 실패"
|
|
except requests.RequestException:
|
|
self.logger.info("get_original_url 변환 실패")
|
|
return "변환 실패"
|