tao/modules/tao_parser_ori.py

259 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from PyQt5.QtWidgets import QMessageBox
from bs4 import BeautifulSoup
import os
from time import sleep
import time
from random import randint
from selenium_stealth import stealth
from fake_useragent import UserAgent
from urllib.request import urlretrieve
from modules.cookie_manager import load_cookies, save_cookies, check_login_status
def fetch_and_save_taobao_products(imgurl, item_count=10, sort_order=1):
# 셀레니움 드라이버 설정
options = webdriver.ChromeOptions()
ua = UserAgent()
options.add_argument(f"--user-agent={ua.random}") # 랜덤 user_agent 사용
# # 사용자 에이전트 문자열 목록
# user_agents = [
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12.3; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
# "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36"
# ]
# options.add_argument("--user-agent=" + user_agents[randint(0, len(user_agents) - 1)])
options.add_argument("--disable-blink-features=AutomationControlled")
# options.add_argument("--headless") # 헤드리스 모드
driver = webdriver.Chrome(options=options)
# selenium-stealth 설정 적용
stealth(driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
# driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
print("셀레니움 시작")
# # 사용자 에이전트 변경
# headers = {
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
# }
# 요청 간 지연 시간 추가
sleep(1) # 1초 대기
# 쿠키 로드 및 적용
load_cookies(driver)
print("지난 쿠키 로드 완료")
#sleep(1) # 1초 대기
driver.refresh() # 쿠키 로드 후 페이지 새로고침
print("페이지 리로드")
# # 로그인 상태 확인
# if not check_login_status(driver):
# # 로그인되지 않았을 경우, 사용자가 로그인할 때까지 기다리거나 로그인 과정 수행
# print("재로그인 하세요!")
# print("로그인 할때까지 무한히 기다림")
# # 로그인할 시간을 주기 위해 명시적으로 기다리는 대신 사용자에게 알림
# WebDriverWait(driver, timeout=None).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4")))
logged_in_xpath = "/html/body/div[1]/div/ul[1]/li[2]/div[1]/div[2]/a"
# # 로그인 상태 확인 루프
# while True:
# if check_login_status(driver):
# print("로그인 성공!")
# break # 로그인이 확인되면 루프 탈출
# else:
# print("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# try:
# # 여기서는 로그인 페이지로 이동하는 코드나 로그인을 유도하는 메시지를 표시할 수 있습니다.
# # 예: driver.get("로그인 페이지 URL")
# # 사용자에게 로그인하라는 메세지를 표시하고 싶다면 아래와 같이 처리합니다.
# # driver.execute_script('alert("로그인이 필요합니다. 로그인 페이지로 이동하여 로그인 해주세요.");')
# # 주기적인 확인을 위해 대기
# print("로그인 하세요")
# # QMessageBox.information("알림", "로그인 하세요")
# # sleep(5) # 5초 대기
# if WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.XPATH, logged_in_xpath))):
# user_id_text = driver.find_element(By.XPATH, logged_in_xpath).text
# print(f"로그인된 상태입니다. 로그인된 ID: {user_id_text}")
# except NoSuchElementException:
# print("로그인 요소를 찾을 수 없습니다. 페이지를 확인해주세요.")
# break # 요소를 찾을 수 없는 경우, 루프 탈출
# 로그인 상태 확인 루프
while True:
if check_login_status(driver):
print("로그인 성공!")
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
break # 로그인이 확인되면 루프 탈출
else:
print("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
# 로그인 필요 알림 및 대기 로직이 `check_login_status` 함수 내에 포함되어 있으므로 여기서 추가적인 조치는 필요 없음
# 로그인 페이지로 리다이렉션하거나 사용자에게 로그인을 유도하는 메시지를 표시할 수 있음
# 예: driver.get("로그인 페이지 URL")
# 예: driver.execute_script('alert("로그인이 필요합니다. 로그인 페이지로 이동하여 로그인 해주세요.");')
# 주기적인 확인을 위해 짧은 대기 시간을 두고 루프를 계속 실행
time.sleep(5) # 재시도 전에 대기
# 로그인 완료 후 쿠키 저장
save_cookies(driver)
print("쿠키저장완료")
# imgurl에서 이미지를 로컬에 저장
local_image_path = "./img/temp_image.jpg"
if not os.path.exists("./img"):
os.makedirs("./img")
urlretrieve(imgurl, local_image_path) # 주어진 imgurl 사용
# JavaScript를 사용하여 이미지 검색 버튼 클릭
search_button_selector = ".component-search-icon-active"
driver.execute_script(f"document.querySelector('{search_button_selector}').click();")
print("이미지검색버튼 클릭")
# 파일 업로드 처리
file_input = WebDriverWait(driver, 60).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']"))
)
file_input.send_keys(os.path.abspath(local_image_path))
max_refresh_attempts = 5
attempts = 0
while attempts < max_refresh_attempts:
try:
# "Sorry" 메시지 확인
sorry_message_xpath = "//span[contains(.,'Sorry没有找到相关的宝贝')]"
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, sorry_message_xpath)))
print("Sorry 메시지가 감지되었습니다. 페이지를 새로고침합니다.")
attempts += 1
driver.refresh()
time.sleep(3) # 페이지 새로고침 후 잠시 대기
except TimeoutException:
# "Sorry" 메시지가 없는 경우
if check_login_status(driver):
# 로그인 상태 확인 후 캡차 화면 판단 로직 추가
try:
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
print("첫 번째 상품이 로드되었습니다.")
break # 첫 번째 상품이 로드되면 반복 종료
except TimeoutException:
# 첫 번째 상품이 로드되지 않는 경우, 캡차 화면으로 판단
print("캡차 화면이 감지되었습니다. 사용자가 해결할 때까지 기다립니다.")
QMessageBox.information(None, "캡차 확인", "캡차가 감지되었습니다. 해결 후 확인 버튼을 눌러주세요.")
attempts += 1
# 사용자가 캡차를 해결할 때까지 기다림
while True:
try:
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
print("첫 번째 상품이 로드되었습니다. 캡차 해결됨.")
break
except TimeoutException:
print("캡차가 여전히 해결되지 않았습니다. 다시 확인합니다.")
time.sleep(5) # 사용자가 캡차를 해결할 시간을 줌
break
else:
print("로그인이 필요합니다.")
# 로그인이 필요한 경우의 처리 로직 추가
break
if attempts == max_refresh_attempts:
raise Exception("최대 시도 횟수를 초과했습니다. 첫 번째 상품을 찾을 수 없습니다.")
# # 첫 번째 상품의 URL이 나타날 때까지 기다림
# first_product_url_xpath = "/html/body/div[12]/div/div[2]/div[2]/div[1]/div/a"
# driver.implicitly_wait(10) # 최대 10초간 대기
# print("첫번째 상품이 나올때 까지 10초간 대기")
# # 상품이 모두 로드될 때까지 추가로 기다림
# time.sleep(2) # 2~3초 정도 추가 대기
# 페이지의 HTML을 가져옴
page_source = driver.page_source
print("html파싱")
# 상품 정보를 저장할 리스트 초기화
products = []
# BeautifulSoup 객체 생성
soup = BeautifulSoup(page_source, 'html.parser')
# 상품 정보 추출
for i, product in enumerate(soup.select('a.mobile--class-1--2Vz4bM4'), start=1):
if i > item_count: # 설정한 아이템 갯수에 도달하면 반복 중단
break
try:
product_url = 'https:' + product['href']
image_url = 'https:' + product.select_one("img")['src']
product_name = product.select_one("span.mobile--summary--2mK9e7G").text
price = product.select_one("span.mobile--price--3eMQ3ec").text
sales_volume = product.select_one("span.mobile--buy--2I4hwR4").text
product_info = {
"Product Name": product_name,
"Image URL": image_url,
"Price": price,
"Sales Volume": sales_volume,
"Product URL": product_url,
}
# 상품 정보 DB에 저장 (DB 저장 로직은 생략되어 있음)
# 예: db.save_product_info(product_info)
products.append(product_info)
except Exception as e:
print(f"Error extracting product {i}: {e}")
# 정렬 로직 (가격순, 판매량순 정렬)
if sort_order == 2: # 가격순 정렬
products.sort(key=lambda x: float(x['Price'].strip('¥')))
elif sort_order == 3: # 판매량순 정렬
products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('')), reverse=True)
# 셀레니움 드라이버 종료
driver.quit()
# 상품 정보 반환
return [(product['Product URL'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]
# # 사용 예시
# # db 객체는 예시로 None으로 설정되어 있으며, 실제 DB 객체를 전달해야 함.
# main_keyword = "Example Keyword"
# keyword_id = 123
# product_infos = fetch_and_save_taobao_products(main_keyword, keyword_id, item_count=10, sort_order=1, local_image_path="./img/test_image.jpg", db=None)
# for info in product_infos:
# print(info)