259 lines
12 KiB
Python
259 lines
12 KiB
Python
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
||
from PyQt5.QtWidgets import QMessageBox
|
||
from bs4 import BeautifulSoup
|
||
import os
|
||
from time import sleep
|
||
import time
|
||
from random import randint
|
||
from selenium_stealth import stealth
|
||
from fake_useragent import UserAgent
|
||
|
||
|
||
from urllib.request import urlretrieve
|
||
from modules.cookie_manager import load_cookies, save_cookies, check_login_status
|
||
|
||
|
||
def fetch_and_save_taobao_products(imgurl, item_count=10, sort_order=1):
|
||
# 셀레니움 드라이버 설정
|
||
options = webdriver.ChromeOptions()
|
||
|
||
ua = UserAgent()
|
||
options.add_argument(f"--user-agent={ua.random}") # 랜덤 user_agent 사용
|
||
|
||
# # 사용자 에이전트 문자열 목록
|
||
# user_agents = [
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
|
||
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12.3; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
|
||
# "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
|
||
# ]
|
||
# options.add_argument("--user-agent=" + user_agents[randint(0, len(user_agents) - 1)])
|
||
|
||
|
||
options.add_argument("--disable-blink-features=AutomationControlled")
|
||
|
||
# options.add_argument("--headless") # 헤드리스 모드
|
||
driver = webdriver.Chrome(options=options)
|
||
|
||
# selenium-stealth 설정 적용
|
||
stealth(driver,
|
||
languages=["en-US", "en"],
|
||
vendor="Google Inc.",
|
||
platform="Win32",
|
||
webgl_vendor="Intel Inc.",
|
||
renderer="Intel Iris OpenGL Engine",
|
||
fix_hairline=True,
|
||
)
|
||
# driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
|
||
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
|
||
|
||
print("셀레니움 시작")
|
||
# # 사용자 에이전트 변경
|
||
# headers = {
|
||
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
|
||
# }
|
||
|
||
# 요청 간 지연 시간 추가
|
||
sleep(1) # 1초 대기
|
||
|
||
# 쿠키 로드 및 적용
|
||
load_cookies(driver)
|
||
print("지난 쿠키 로드 완료")
|
||
|
||
#sleep(1) # 1초 대기
|
||
driver.refresh() # 쿠키 로드 후 페이지 새로고침
|
||
print("페이지 리로드")
|
||
|
||
# # 로그인 상태 확인
|
||
# if not check_login_status(driver):
|
||
# # 로그인되지 않았을 경우, 사용자가 로그인할 때까지 기다리거나 로그인 과정 수행
|
||
# print("재로그인 하세요!")
|
||
# print("로그인 할때까지 무한히 기다림")
|
||
# # 로그인할 시간을 주기 위해 명시적으로 기다리는 대신 사용자에게 알림
|
||
# WebDriverWait(driver, timeout=None).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4")))
|
||
|
||
logged_in_xpath = "/html/body/div[1]/div/ul[1]/li[2]/div[1]/div[2]/a"
|
||
|
||
# # 로그인 상태 확인 루프
|
||
# while True:
|
||
# if check_login_status(driver):
|
||
# print("로그인 성공!")
|
||
# break # 로그인이 확인되면 루프 탈출
|
||
# else:
|
||
# print("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
|
||
# try:
|
||
# # 여기서는 로그인 페이지로 이동하는 코드나 로그인을 유도하는 메시지를 표시할 수 있습니다.
|
||
# # 예: driver.get("로그인 페이지 URL")
|
||
# # 사용자에게 로그인하라는 메세지를 표시하고 싶다면 아래와 같이 처리합니다.
|
||
# # driver.execute_script('alert("로그인이 필요합니다. 로그인 페이지로 이동하여 로그인 해주세요.");')
|
||
|
||
# # 주기적인 확인을 위해 대기
|
||
# print("로그인 하세요")
|
||
# # QMessageBox.information("알림", "로그인 하세요")
|
||
# # sleep(5) # 5초 대기
|
||
# if WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.XPATH, logged_in_xpath))):
|
||
# user_id_text = driver.find_element(By.XPATH, logged_in_xpath).text
|
||
# print(f"로그인된 상태입니다. 로그인된 ID: {user_id_text}")
|
||
# except NoSuchElementException:
|
||
# print("로그인 요소를 찾을 수 없습니다. 페이지를 확인해주세요.")
|
||
# break # 요소를 찾을 수 없는 경우, 루프 탈출
|
||
|
||
|
||
# 로그인 상태 확인 루프
|
||
|
||
while True:
|
||
if check_login_status(driver):
|
||
print("로그인 성공!")
|
||
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
|
||
|
||
break # 로그인이 확인되면 루프 탈출
|
||
else:
|
||
print("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
|
||
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
|
||
# 로그인 필요 알림 및 대기 로직이 `check_login_status` 함수 내에 포함되어 있으므로 여기서 추가적인 조치는 필요 없음
|
||
# 로그인 페이지로 리다이렉션하거나 사용자에게 로그인을 유도하는 메시지를 표시할 수 있음
|
||
# 예: driver.get("로그인 페이지 URL")
|
||
# 예: driver.execute_script('alert("로그인이 필요합니다. 로그인 페이지로 이동하여 로그인 해주세요.");')
|
||
|
||
# 주기적인 확인을 위해 짧은 대기 시간을 두고 루프를 계속 실행
|
||
time.sleep(5) # 재시도 전에 대기
|
||
|
||
|
||
# 로그인 완료 후 쿠키 저장
|
||
save_cookies(driver)
|
||
print("쿠키저장완료")
|
||
|
||
# imgurl에서 이미지를 로컬에 저장
|
||
local_image_path = "./img/temp_image.jpg"
|
||
if not os.path.exists("./img"):
|
||
os.makedirs("./img")
|
||
urlretrieve(imgurl, local_image_path) # 주어진 imgurl 사용
|
||
|
||
# JavaScript를 사용하여 이미지 검색 버튼 클릭
|
||
search_button_selector = ".component-search-icon-active"
|
||
driver.execute_script(f"document.querySelector('{search_button_selector}').click();")
|
||
print("이미지검색버튼 클릭")
|
||
|
||
# 파일 업로드 처리
|
||
file_input = WebDriverWait(driver, 60).until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']"))
|
||
)
|
||
file_input.send_keys(os.path.abspath(local_image_path))
|
||
|
||
|
||
max_refresh_attempts = 5
|
||
attempts = 0
|
||
|
||
while attempts < max_refresh_attempts:
|
||
try:
|
||
# "Sorry" 메시지 확인
|
||
sorry_message_xpath = "//span[contains(.,'Sorry,没有找到相关的宝贝!!')]"
|
||
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, sorry_message_xpath)))
|
||
print("Sorry 메시지가 감지되었습니다. 페이지를 새로고침합니다.")
|
||
attempts += 1
|
||
driver.refresh()
|
||
time.sleep(3) # 페이지 새로고침 후 잠시 대기
|
||
except TimeoutException:
|
||
# "Sorry" 메시지가 없는 경우
|
||
if check_login_status(driver):
|
||
# 로그인 상태 확인 후 캡차 화면 판단 로직 추가
|
||
try:
|
||
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
|
||
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
|
||
print("첫 번째 상품이 로드되었습니다.")
|
||
break # 첫 번째 상품이 로드되면 반복 종료
|
||
except TimeoutException:
|
||
# 첫 번째 상품이 로드되지 않는 경우, 캡차 화면으로 판단
|
||
print("캡차 화면이 감지되었습니다. 사용자가 해결할 때까지 기다립니다.")
|
||
QMessageBox.information(None, "캡차 확인", "캡차가 감지되었습니다. 해결 후 확인 버튼을 눌러주세요.")
|
||
attempts += 1
|
||
# 사용자가 캡차를 해결할 때까지 기다림
|
||
while True:
|
||
try:
|
||
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
|
||
print("첫 번째 상품이 로드되었습니다. 캡차 해결됨.")
|
||
break
|
||
except TimeoutException:
|
||
print("캡차가 여전히 해결되지 않았습니다. 다시 확인합니다.")
|
||
time.sleep(5) # 사용자가 캡차를 해결할 시간을 줌
|
||
break
|
||
else:
|
||
print("로그인이 필요합니다.")
|
||
# 로그인이 필요한 경우의 처리 로직 추가
|
||
break
|
||
|
||
if attempts == max_refresh_attempts:
|
||
raise Exception("최대 시도 횟수를 초과했습니다. 첫 번째 상품을 찾을 수 없습니다.")
|
||
|
||
|
||
# # 첫 번째 상품의 URL이 나타날 때까지 기다림
|
||
# first_product_url_xpath = "/html/body/div[12]/div/div[2]/div[2]/div[1]/div/a"
|
||
# driver.implicitly_wait(10) # 최대 10초간 대기
|
||
# print("첫번째 상품이 나올때 까지 10초간 대기")
|
||
|
||
# # 상품이 모두 로드될 때까지 추가로 기다림
|
||
# time.sleep(2) # 2~3초 정도 추가 대기
|
||
|
||
# 페이지의 HTML을 가져옴
|
||
page_source = driver.page_source
|
||
print("html파싱")
|
||
# 상품 정보를 저장할 리스트 초기화
|
||
products = []
|
||
|
||
# BeautifulSoup 객체 생성
|
||
soup = BeautifulSoup(page_source, 'html.parser')
|
||
|
||
# 상품 정보 추출
|
||
for i, product in enumerate(soup.select('a.mobile--class-1--2Vz4bM4'), start=1):
|
||
if i > item_count: # 설정한 아이템 갯수에 도달하면 반복 중단
|
||
break
|
||
try:
|
||
product_url = 'https:' + product['href']
|
||
image_url = 'https:' + product.select_one("img")['src']
|
||
product_name = product.select_one("span.mobile--summary--2mK9e7G").text
|
||
price = product.select_one("span.mobile--price--3eMQ3ec").text
|
||
sales_volume = product.select_one("span.mobile--buy--2I4hwR4").text
|
||
|
||
product_info = {
|
||
"Product Name": product_name,
|
||
"Image URL": image_url,
|
||
"Price": price,
|
||
"Sales Volume": sales_volume,
|
||
"Product URL": product_url,
|
||
}
|
||
|
||
# 상품 정보 DB에 저장 (DB 저장 로직은 생략되어 있음)
|
||
# 예: db.save_product_info(product_info)
|
||
|
||
products.append(product_info)
|
||
except Exception as e:
|
||
print(f"Error extracting product {i}: {e}")
|
||
|
||
# 정렬 로직 (가격순, 판매량순 정렬)
|
||
if sort_order == 2: # 가격순 정렬
|
||
products.sort(key=lambda x: float(x['Price'].strip('¥')))
|
||
elif sort_order == 3: # 판매량순 정렬
|
||
products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('件')), reverse=True)
|
||
|
||
# 셀레니움 드라이버 종료
|
||
driver.quit()
|
||
|
||
# 상품 정보 반환
|
||
return [(product['Product URL'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]
|
||
|
||
|
||
|
||
# # 사용 예시
|
||
# # db 객체는 예시로 None으로 설정되어 있으며, 실제 DB 객체를 전달해야 함.
|
||
# main_keyword = "Example Keyword"
|
||
# keyword_id = 123
|
||
# product_infos = fetch_and_save_taobao_products(main_keyword, keyword_id, item_count=10, sort_order=1, local_image_path="./img/test_image.jpg", db=None)
|
||
# for info in product_infos:
|
||
# print(info)
|
||
|
||
|