384 lines
17 KiB
Python
384 lines
17 KiB
Python
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
||
from PyQt5.QtWidgets import QMessageBox
|
||
from bs4 import BeautifulSoup
|
||
import os, re
|
||
from time import sleep
|
||
import time
|
||
from random import randint
|
||
from selenium_stealth import stealth
|
||
from fake_useragent import UserAgent
|
||
from PIL import Image
|
||
import imagehash
|
||
import requests
|
||
from io import BytesIO
|
||
|
||
from googletrans import Translator
|
||
|
||
from urllib.request import urlretrieve
|
||
from modules.cookie_manager import load_cookies, save_cookies, check_login_status
|
||
from modules.compare_with_cv2 import compare_images
|
||
|
||
import logging
|
||
from logging.handlers import RotatingFileHandler
|
||
# 로그 설정
|
||
logger = logging.getLogger()
|
||
logger.setLevel(logging.INFO)
|
||
|
||
# RotatingFileHandler를 사용하여 로그 파일 설정
|
||
log_handler = RotatingFileHandler(filename='tao_parser.log', maxBytes=10 * 1024 * 1024, backupCount=3)
|
||
log_handler.setLevel(logging.INFO)
|
||
|
||
# 로그 포매터 설정
|
||
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||
log_handler.setFormatter(formatter)
|
||
|
||
# 로거에 핸들러 추가
|
||
logger.addHandler(log_handler)
|
||
|
||
def trans(text):
|
||
# Initialize the Translator
|
||
translator = Translator()
|
||
|
||
# Translate the text from Chinese to Korean
|
||
translated = translator.translate(text, src='zh-cn', dest='ko')
|
||
|
||
# Return the translated text
|
||
return translated.text
|
||
|
||
def setup_driver():
|
||
"""
|
||
웹 드라이버 설정 및 selenium-stealth 적용
|
||
"""
|
||
logging.info("웹드라이버 설정")
|
||
options = webdriver.ChromeOptions()
|
||
ua = UserAgent()
|
||
options.add_argument(f"--user-agent={ua.random}") # 랜덤 user_agent 사용
|
||
options.add_argument("--disable-blink-features=AutomationControlled")
|
||
options.add_argument('--ignore-certificate-errors')
|
||
options.add_argument('--ssl-protocol=any')
|
||
options.add_argument('--disable-cache')
|
||
driver = webdriver.Chrome(options=options)
|
||
|
||
# selenium-stealth 설정 적용
|
||
logging.info("selenium-stealth 설정 적용")
|
||
stealth(driver,
|
||
languages=["en-US", "en"],
|
||
vendor="Google Inc.",
|
||
platform="Win32",
|
||
webgl_vendor="Intel Inc.",
|
||
renderer="Intel Iris OpenGL Engine",
|
||
fix_hairline=True,
|
||
)
|
||
|
||
current_user_agent = driver.execute_script("return navigator.userAgent;")
|
||
logging.info(f"현재 사용 중인 User-Agent: {current_user_agent}")
|
||
|
||
return driver
|
||
|
||
|
||
def login_and_manage_session(driver):
|
||
"""
|
||
로그인 및 세션 관리 함수
|
||
"""
|
||
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
|
||
# 쿠키 로드 및 적용
|
||
if load_cookies(driver):
|
||
driver.refresh() # 쿠키 로드 후 페이지 새로고침
|
||
logging.info("쿠키를 통해 로그인 상태 복원")
|
||
# 로그인 상태 확인 루프
|
||
while True:
|
||
if check_login_status(driver):
|
||
logging.info("로그인 성공!")
|
||
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
|
||
break # 로그인이 확인되면 루프 탈출
|
||
else:
|
||
logging.info("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
|
||
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
time.sleep(5) # 재시도 전에 대기
|
||
else:
|
||
logging.info("새로운 로그인 프로세스가 필요함")
|
||
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
while True:
|
||
if check_login_status(driver):
|
||
logging.info("로그인 성공!")
|
||
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
|
||
break # 로그인이 확인되면 루프 탈출
|
||
else:
|
||
logging.info("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
|
||
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
|
||
time.sleep(5) # 재시도 전에 대기
|
||
# 로그인 완료 후 쿠키 저장
|
||
save_cookies(driver)
|
||
|
||
|
||
def fetch_and_save_taobao_products(driver, imgurl, item_count=5, sort_order=1):
|
||
"""
|
||
타오바오의 상품을 검색하고 결과를 파싱하는 함수
|
||
"""
|
||
|
||
# 이미지 URL로부터 pHash 값을 계산하는 함수
|
||
def calculate_phash(image_url):
|
||
try:
|
||
# 캡차요청 회피를 위한 헤더 재설정
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
|
||
"Accept-Language": "en-US,en;q=0.9",
|
||
"Accept-Encoding": "gzip, deflate, br",
|
||
"DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부)
|
||
"Connection": "keep-alive",
|
||
"Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청
|
||
"Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청
|
||
}
|
||
response = requests.get(image_url, headers=headers)
|
||
# response = requests.get(image_url)
|
||
# 이미지 데이터 검증을 위한 임시 파일 저장
|
||
if response.status_code == 200:
|
||
with open('temp_image', 'wb') as f:
|
||
f.write(response.content)
|
||
|
||
if response.status_code == 200 and 'image' in response.headers['Content-Type']:
|
||
img = Image.open(BytesIO(response.content))
|
||
phash = imagehash.phash(img)
|
||
return phash
|
||
else:
|
||
logging.info("이미지 로드 실패 또는 잘못된 콘텐츠 타입")
|
||
logging.info(response.status_code)
|
||
logging.info(response.headers)
|
||
logging.info(response.text[:500]) # 본문의 처음 500자 출력
|
||
|
||
return None # 이미지 처리에 실패하면 None 반환
|
||
except Exception as e:
|
||
logging.info(f"이미지 처리 중 오류 발생: {e}")
|
||
return None # 예외 발생 시 None 반환
|
||
|
||
|
||
# 두 이미지 URL의 pHash 값의 차이를 계산하는 함수
|
||
def compare_images_phash(imgurl, product_imgurl):
|
||
hash1 = calculate_phash(imgurl)
|
||
hash2 = calculate_phash(product_imgurl)
|
||
if hash1 is not None and hash2 is not None:
|
||
difference = hash1 - hash2
|
||
return difference
|
||
else:
|
||
return None # 해시 계산에 실패한 경우 None 반환
|
||
|
||
def convert_price(price_str):
|
||
try:
|
||
return int(float(price_str))
|
||
except ValueError:
|
||
return 0
|
||
|
||
def convert_sales_volume(sales_str):
|
||
match = re.search(r'(\d+)(万)?\+?', sales_str)
|
||
if match:
|
||
num = int(match.group(1))
|
||
if match.group(2): # '万'이 포함되어 있다면
|
||
num *= 10000 # 万은 10,000을 의미
|
||
return num
|
||
else:
|
||
return 0
|
||
|
||
def extract_item_id(url):
|
||
match = re.search(r'taobao.com/i(\d{10,12})', url)
|
||
return match.group(1) if match else None
|
||
|
||
def search_img(imgurl):
|
||
# imgurl에서 이미지를 로컬에 저장
|
||
local_image_path = "./img/temp_image.jpg"
|
||
if not os.path.exists("./img"):
|
||
os.makedirs("./img")
|
||
urlretrieve(imgurl, local_image_path) # 주어진 imgurl 사용
|
||
|
||
# JavaScript를 사용하여 이미지 검색 버튼 클릭
|
||
search_button_selector = ".component-search-icon-active"
|
||
driver.execute_script(f"document.querySelector('{search_button_selector}').click();")
|
||
logging.info("이미지검색버튼 클릭")
|
||
|
||
# 파일 업로드 처리
|
||
file_input = WebDriverWait(driver, 60).until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']"))
|
||
)
|
||
file_input.send_keys(os.path.abspath(local_image_path))
|
||
|
||
def check_first_product(driver):
|
||
try:
|
||
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
|
||
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
|
||
logging.info("첫번째 상품을 찾았습니다.")
|
||
return True
|
||
except TimeoutException:
|
||
logging.info("오류 : 첫번째 상품을 찾을 수 없습니다.")
|
||
return False
|
||
|
||
def handle_sorry_message(driver):
|
||
sorry_message_xpath = "//span[contains(.,'Sorry,没有找到相关的宝贝!!')]"
|
||
try:
|
||
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, sorry_message_xpath)))
|
||
logging.info("'Sorry' 발생. 페이지를 새로고침 합니다.")
|
||
driver.refresh()
|
||
time.sleep(3)
|
||
return True
|
||
except TimeoutException:
|
||
return False
|
||
|
||
def handle_captcha(driver):
|
||
capcha_iframe = 'baxia-dialog-content'
|
||
captcha_image_xpath = "//img[@class='captcha-img']"
|
||
try:
|
||
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.XPATH, capcha_iframe)))
|
||
logging.info("CAPTCHA화면 발생. 진행을 위해 해결하세요")
|
||
while True:
|
||
if check_first_product(driver):
|
||
logging.info("CAPTCHA가 해결되었습니다. 첫번째 상품이 로드되었습니다.")
|
||
return True
|
||
else:
|
||
logging.info("CAPTCHA가 해결될때 까지 기다림.")
|
||
time.sleep(5)
|
||
except TimeoutException:
|
||
return False
|
||
|
||
|
||
|
||
while True: # 무한 루프를 시작하여 조건에 따라 재시도를 관리
|
||
search_attempts = 0
|
||
max_search_attempts = 5 # 상품 검색을 최대 몇 번까지 재시도할지 설정
|
||
found_first_product = False
|
||
|
||
while search_attempts < max_search_attempts and not found_first_product:
|
||
search_img(imgurl) # 상품 검색 시작
|
||
max_refresh_attempts = 5
|
||
refresh_attempts = 0
|
||
|
||
while not found_first_product:
|
||
if check_first_product(driver):
|
||
# 첫 번째 상품이 로드되면 HTML 파싱 수행
|
||
logging.info("첫 번째 상품을 성공적으로 찾았습니다. HTML 파싱을 시작합니다.")
|
||
found_first_product = True
|
||
break # 첫 번째 상품을 찾았으니 내부 while 루프 탈출
|
||
else:
|
||
if handle_sorry_message(driver) and refresh_attempts < max_refresh_attempts:
|
||
logging.info("Sorry 화면입니다. 페이지를 새로고침합니다.")
|
||
# driver.refresh()
|
||
# time.sleep(3)
|
||
refresh_attempts += 1
|
||
elif handle_captcha(driver):
|
||
logging.info("캡차 화면입니다. 캡차를 해결합니다.")
|
||
# handle_captcha(driver)
|
||
elif check_login_status(driver):
|
||
logging.info("로그인이 필요한 화면입니다. 로그인을 확인합니다.")
|
||
# check_login_status(driver)
|
||
else:
|
||
logging.info("알 수 없는 상태입니다. 상품 검색을 다시 시도합니다.")
|
||
break # 알 수 없는 상태이므로 내부 while 루프를 탈출하여 상품 검색을 재시도
|
||
|
||
if refresh_attempts >= max_refresh_attempts:
|
||
logging.info("최대 새로고침 시도 횟수를 초과했습니다. 상품 검색을 다시 시도합니다.")
|
||
break # 최대 새로고침 횟수를 초과하면 내부 while 루프를 탈출하여 상품 검색을 재시도
|
||
|
||
if found_first_product:
|
||
break # 첫 번째 상품을 찾았으므로 전체 while 루프 탈출
|
||
else:
|
||
search_attempts += 1
|
||
logging.info(f"상품 검색 재시도 {search_attempts}/{max_search_attempts}")
|
||
|
||
if found_first_product:
|
||
# 성공적으로 첫 번째 상품을 찾은 후의 처리 로직을 여기에 작성
|
||
break # 성공적으로 처리가 완료되면 무한 루프 탈출
|
||
else:
|
||
logging.info("상품 검색 최대 재시도 횟수를 초과했습니다. 프로세스를 처음부터 다시 시작합니다.")
|
||
# 필요한 경우, 여기에서 추가적인 초기화 작업을 수행할 수 있습니다.
|
||
|
||
|
||
# 페이지 로딩 대기
|
||
WebDriverWait(driver, 10).until(
|
||
EC.presence_of_element_located((By.CSS_SELECTOR, "body"))
|
||
)
|
||
wait=randint(2,4)
|
||
time.sleep(wait) # 또는 더 긴 시간
|
||
|
||
# 페이지의 HTML을 가져옴
|
||
page_source = driver.page_source
|
||
# BeautifulSoup 객체 생성
|
||
soup = BeautifulSoup(page_source, 'html.parser')
|
||
logging.info("html파싱")
|
||
|
||
# 상품 정보를 저장할 리스트 초기화
|
||
products = [] # 상품 정보를 저장할 리스트
|
||
similarity_threshold = 40 # 유사도 판단 기준 (해밍 거리)
|
||
|
||
# 상품 정보 추출
|
||
for i, product in enumerate(soup.select('a.mobile--class-1--2Vz4bM4'), start=1):
|
||
if i > item_count: # 설정한 아이템 갯수에 도달하면 반복 중단
|
||
break
|
||
try:
|
||
product_url = product['href']
|
||
Tao_itemID = extract_item_id(product_url)
|
||
image_url = 'https:' + product.select_one("img")['src']
|
||
product_name = product.select_one("span.mobile--summary--2mK9e7G").text
|
||
trans_product_name = trans(product_name)
|
||
price_str = product.select_one("span.mobile--price--3eMQ3ec").text
|
||
price = convert_price(price_str)
|
||
sales_volume_str = product.select_one("span.mobile--buy--2I4hwR4").text
|
||
sales_volume = convert_sales_volume(sales_volume_str)
|
||
|
||
# =============================
|
||
# 이미지 유사도 검사부분 개선필요. 현재는 삭제
|
||
# 이미지 유사도 검사
|
||
# logging.info("이미지 유사도 검사 실행")
|
||
# logging.info(f"원본이미지 : {imgurl}")
|
||
# logging.info(f"타켓이미지 : {image_url}")
|
||
# difference = compare_images_phash(imgurl, image_url)
|
||
# logging.info(f"이미지 유사도 difference = {difference}/{similarity_threshold}")
|
||
# ==============================
|
||
|
||
difference = 30
|
||
# wait=randint(1,4)
|
||
# time.sleep(wait) # 또는 더 긴 시간
|
||
# logging.info(f"요청간 TimeSleep : {wait}")
|
||
|
||
|
||
if difference <= similarity_threshold:
|
||
logging.info(f"상품 [{Tao_itemID}]의 상품정보 추가")
|
||
product_info = {
|
||
"Product Name": trans_product_name,
|
||
"Image URL": image_url,
|
||
"Price": price,
|
||
"Sales Volume": sales_volume,
|
||
"Product URL": product_url,
|
||
"Tao_itemID": Tao_itemID,
|
||
}
|
||
else:
|
||
logging.info(f"상품 [{Tao_itemID}]의 상품이미지가 일치하지 않아 제외처리")
|
||
|
||
products.append(product_info)
|
||
except Exception as e:
|
||
logging.info(f"상품정보 추출 오류 발생 {i}: {e}")
|
||
|
||
|
||
# # 정렬 로직 (가격순, 판매량순 정렬)
|
||
# if sort_order == 2: # 가격순 정렬
|
||
# products.sort(key=lambda x: float(x['Price'].strip('¥')) if isinstance(x['Price'], str) else float(x['Price']))
|
||
# elif sort_order == 3: # 판매량순 정렬
|
||
# products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('件')), reverse=True)
|
||
|
||
|
||
# # 정렬 로직 (가격순, 판매량순 정렬)
|
||
# if sort_order == 2: # 가격순 정렬
|
||
# products.sort(key=lambda x: float(x['Price'].strip('¥')))
|
||
# elif sort_order == 3: # 판매량순 정렬
|
||
# products.sort(key=lambda x: int(x['Sales Volume'].strip('已售').strip('件')), reverse=True)
|
||
|
||
# 셀레니움 드라이버 종료
|
||
# driver.quit()
|
||
|
||
# 상품 정보 반환
|
||
return [(product['Product URL'], product['Tao_itemID'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]
|
||
|
||
|