tao/modules/tao_parser-ChoisEnvy.py

384 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from PyQt5.QtWidgets import QMessageBox
from bs4 import BeautifulSoup
import os, re
from time import sleep
import time
from random import randint
from selenium_stealth import stealth
from fake_useragent import UserAgent
from PIL import Image
import imagehash
import requests
from io import BytesIO
from googletrans import Translator
from urllib.request import urlretrieve
from modules.cookie_manager import load_cookies, save_cookies, check_login_status
from modules.compare_with_cv2 import compare_images
import logging
from logging.handlers import RotatingFileHandler
# 로그 설정
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# RotatingFileHandler를 사용하여 로그 파일 설정
log_handler = RotatingFileHandler(filename='tao_parser.log', maxBytes=10 * 1024 * 1024, backupCount=3)
log_handler.setLevel(logging.INFO)
# 로그 포매터 설정
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler.setFormatter(formatter)
# 로거에 핸들러 추가
logger.addHandler(log_handler)
def trans(text):
# Initialize the Translator
translator = Translator()
# Translate the text from Chinese to Korean
translated = translator.translate(text, src='zh-cn', dest='ko')
# Return the translated text
return translated.text
def setup_driver():
"""
웹 드라이버 설정 및 selenium-stealth 적용
"""
logging.info("웹드라이버 설정")
options = webdriver.ChromeOptions()
ua = UserAgent()
options.add_argument(f"--user-agent={ua.random}") # 랜덤 user_agent 사용
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ssl-protocol=any')
options.add_argument('--disable-cache')
driver = webdriver.Chrome(options=options)
# selenium-stealth 설정 적용
logging.info("selenium-stealth 설정 적용")
stealth(driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
current_user_agent = driver.execute_script("return navigator.userAgent;")
logging.info(f"현재 사용 중인 User-Agent: {current_user_agent}")
return driver
def login_and_manage_session(driver):
"""
로그인 및 세션 관리 함수
"""
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
# 쿠키 로드 및 적용
if load_cookies(driver):
driver.refresh() # 쿠키 로드 후 페이지 새로고침
logging.info("쿠키를 통해 로그인 상태 복원")
# 로그인 상태 확인 루프
while True:
if check_login_status(driver):
logging.info("로그인 성공!")
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
break # 로그인이 확인되면 루프 탈출
else:
logging.info("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
time.sleep(5) # 재시도 전에 대기
else:
logging.info("새로운 로그인 프로세스가 필요함")
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
while True:
if check_login_status(driver):
logging.info("로그인 성공!")
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
break # 로그인이 확인되면 루프 탈출
else:
logging.info("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
time.sleep(5) # 재시도 전에 대기
# 로그인 완료 후 쿠키 저장
save_cookies(driver)
def fetch_and_save_taobao_products(driver, imgurl, item_count=5, sort_order=1):
"""
타오바오의 상품을 검색하고 결과를 파싱하는 함수
"""
# 이미지 URL로부터 pHash 값을 계산하는 함수
def calculate_phash(image_url):
try:
# 캡차요청 회피를 위한 헤더 재설정
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부)
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청
"Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청
}
response = requests.get(image_url, headers=headers)
# response = requests.get(image_url)
# 이미지 데이터 검증을 위한 임시 파일 저장
if response.status_code == 200:
with open('temp_image', 'wb') as f:
f.write(response.content)
if response.status_code == 200 and 'image' in response.headers['Content-Type']:
img = Image.open(BytesIO(response.content))
phash = imagehash.phash(img)
return phash
else:
logging.info("이미지 로드 실패 또는 잘못된 콘텐츠 타입")
logging.info(response.status_code)
logging.info(response.headers)
logging.info(response.text[:500]) # 본문의 처음 500자 출력
return None # 이미지 처리에 실패하면 None 반환
except Exception as e:
logging.info(f"이미지 처리 중 오류 발생: {e}")
return None # 예외 발생 시 None 반환
# 두 이미지 URL의 pHash 값의 차이를 계산하는 함수
def compare_images_phash(imgurl, product_imgurl):
hash1 = calculate_phash(imgurl)
hash2 = calculate_phash(product_imgurl)
if hash1 is not None and hash2 is not None:
difference = hash1 - hash2
return difference
else:
return None # 해시 계산에 실패한 경우 None 반환
def convert_price(price_str):
try:
return int(float(price_str))
except ValueError:
return 0
def convert_sales_volume(sales_str):
match = re.search(r'(\d+)(万)?\+?', sales_str)
if match:
num = int(match.group(1))
if match.group(2): # '万'이 포함되어 있다면
num *= 10000 # 万은 10,000을 의미
return num
else:
return 0
def extract_item_id(url):
match = re.search(r'taobao.com/i(\d{10,12})', url)
return match.group(1) if match else None
def search_img(imgurl):
# imgurl에서 이미지를 로컬에 저장
local_image_path = "./img/temp_image.jpg"
if not os.path.exists("./img"):
os.makedirs("./img")
urlretrieve(imgurl, local_image_path) # 주어진 imgurl 사용
# JavaScript를 사용하여 이미지 검색 버튼 클릭
search_button_selector = ".component-search-icon-active"
driver.execute_script(f"document.querySelector('{search_button_selector}').click();")
logging.info("이미지검색버튼 클릭")
# 파일 업로드 처리
file_input = WebDriverWait(driver, 60).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']"))
)
file_input.send_keys(os.path.abspath(local_image_path))
def check_first_product(driver):
try:
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
logging.info("첫번째 상품을 찾았습니다.")
return True
except TimeoutException:
logging.info("오류 : 첫번째 상품을 찾을 수 없습니다.")
return False
def handle_sorry_message(driver):
sorry_message_xpath = "//span[contains(.,'Sorry没有找到相关的宝贝')]"
try:
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, sorry_message_xpath)))
logging.info("'Sorry' 발생. 페이지를 새로고침 합니다.")
driver.refresh()
time.sleep(3)
return True
except TimeoutException:
return False
def handle_captcha(driver):
capcha_iframe = 'baxia-dialog-content'
captcha_image_xpath = "//img[@class='captcha-img']"
try:
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, capcha_iframe)))
logging.info("CAPTCHA화면 발생. 진행을 위해 해결하세요")
while True:
if check_first_product(driver):
logging.info("CAPTCHA가 해결되었습니다. 첫번째 상품이 로드되었습니다.")
return True
else:
logging.info("CAPTCHA가 해결될때 까지 기다림.")
time.sleep(5)
except TimeoutException:
return False
while True: # 무한 루프를 시작하여 조건에 따라 재시도를 관리
search_attempts = 0
max_search_attempts = 5 # 상품 검색을 최대 몇 번까지 재시도할지 설정
found_first_product = False
while search_attempts < max_search_attempts and not found_first_product:
search_img(imgurl) # 상품 검색 시작
max_refresh_attempts = 5
refresh_attempts = 0
while not found_first_product:
if check_first_product(driver):
# 첫 번째 상품이 로드되면 HTML 파싱 수행
logging.info("첫 번째 상품을 성공적으로 찾았습니다. HTML 파싱을 시작합니다.")
found_first_product = True
break # 첫 번째 상품을 찾았으니 내부 while 루프 탈출
else:
if handle_sorry_message(driver) and refresh_attempts < max_refresh_attempts:
logging.info("Sorry 화면입니다. 페이지를 새로고침합니다.")
# driver.refresh()
# time.sleep(3)
refresh_attempts += 1
elif handle_captcha(driver):
logging.info("캡차 화면입니다. 캡차를 해결합니다.")
# handle_captcha(driver)
elif check_login_status(driver):
logging.info("로그인이 필요한 화면입니다. 로그인을 확인합니다.")
# check_login_status(driver)
else:
logging.info("알 수 없는 상태입니다. 상품 검색을 다시 시도합니다.")
break # 알 수 없는 상태이므로 내부 while 루프를 탈출하여 상품 검색을 재시도
if refresh_attempts >= max_refresh_attempts:
logging.info("최대 새로고침 시도 횟수를 초과했습니다. 상품 검색을 다시 시도합니다.")
break # 최대 새로고침 횟수를 초과하면 내부 while 루프를 탈출하여 상품 검색을 재시도
if found_first_product:
break # 첫 번째 상품을 찾았으므로 전체 while 루프 탈출
else:
search_attempts += 1
logging.info(f"상품 검색 재시도 {search_attempts}/{max_search_attempts}")
if found_first_product:
# 성공적으로 첫 번째 상품을 찾은 후의 처리 로직을 여기에 작성
break # 성공적으로 처리가 완료되면 무한 루프 탈출
else:
logging.info("상품 검색 최대 재시도 횟수를 초과했습니다. 프로세스를 처음부터 다시 시작합니다.")
# 필요한 경우, 여기에서 추가적인 초기화 작업을 수행할 수 있습니다.
# 페이지 로딩 대기
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "body"))
)
wait=randint(2,4)
time.sleep(wait) # 또는 더 긴 시간
# 페이지의 HTML을 가져옴
page_source = driver.page_source
# BeautifulSoup 객체 생성
soup = BeautifulSoup(page_source, 'html.parser')
logging.info("html파싱")
# 상품 정보를 저장할 리스트 초기화
products = [] # 상품 정보를 저장할 리스트
similarity_threshold = 40 # 유사도 판단 기준 (해밍 거리)
# 상품 정보 추출
for i, product in enumerate(soup.select('a.mobile--class-1--2Vz4bM4'), start=1):
if i > item_count: # 설정한 아이템 갯수에 도달하면 반복 중단
break
try:
product_url = product['href']
Tao_itemID = extract_item_id(product_url)
image_url = 'https:' + product.select_one("img")['src']
product_name = product.select_one("span.mobile--summary--2mK9e7G").text
trans_product_name = trans(product_name)
price_str = product.select_one("span.mobile--price--3eMQ3ec").text
price = convert_price(price_str)
sales_volume_str = product.select_one("span.mobile--buy--2I4hwR4").text
sales_volume = convert_sales_volume(sales_volume_str)
# =============================
# 이미지 유사도 검사부분 개선필요. 현재는 삭제
# 이미지 유사도 검사
# logging.info("이미지 유사도 검사 실행")
# logging.info(f"원본이미지 : {imgurl}")
# logging.info(f"타켓이미지 : {image_url}")
# difference = compare_images_phash(imgurl, image_url)
# logging.info(f"이미지 유사도 difference = {difference}/{similarity_threshold}")
# ==============================
difference = 30
# wait=randint(1,4)
# time.sleep(wait) # 또는 더 긴 시간
# logging.info(f"요청간 TimeSleep : {wait}")
if difference <= similarity_threshold:
logging.info(f"상품 [{Tao_itemID}]의 상품정보 추가")
product_info = {
"Product Name": trans_product_name,
"Image URL": image_url,
"Price": price,
"Sales Volume": sales_volume,
"Product URL": product_url,
"Tao_itemID": Tao_itemID,
}
else:
logging.info(f"상품 [{Tao_itemID}]의 상품이미지가 일치하지 않아 제외처리")
products.append(product_info)
except Exception as e:
logging.info(f"상품정보 추출 오류 발생 {i}: {e}")
# # 정렬 로직 (가격순, 판매량순 정렬)
# if sort_order == 2: # 가격순 정렬
# products.sort(key=lambda x: float(x['Price'].strip('¥')) if isinstance(x['Price'], str) else float(x['Price']))
# elif sort_order == 3: # 판매량순 정렬
# products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('件')), reverse=True)
# # 정렬 로직 (가격순, 판매량순 정렬)
# if sort_order == 2: # 가격순 정렬
# products.sort(key=lambda x: float(x['Price'].strip('¥')))
# elif sort_order == 3: # 판매량순 정렬
# products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('件')), reverse=True)
# 셀레니움 드라이버 종료
# driver.quit()
# 상품 정보 반환
return [(product['Product URL'], product['Tao_itemID'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]