TaoSourcerer/modules/tao_parser.py

1009 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from PyQt5.QtWidgets import QMessageBox
from bs4 import BeautifulSoup
import os, re
from time import sleep
import time
import pyperclip
import win32clipboard
import traceback
from random import randint
import random
from selenium_stealth import stealth
from fake_useragent import UserAgent
from PIL import Image
# import imagehash
import requests
from io import BytesIO
#from googletrans import Translator
from translatepy import Translator
from urllib.request import urlretrieve
from modules.cookie_manager import *
from modules.crop_n_rotate import *
# from modules.compare_with_cv2 import compare_images
import logging
# 로거 인스턴스 가져오기
logger = logging.getLogger('default_logger')
MAX_IMAGE_MODIFICATIONS = 3 # 최대 이미지 수정 횟수
def convert_price(price_str):
logger.debug(f"convert_price : {price_str}")
try:
return int(float(price_str))
except ValueError:
return 0
def convert_sales_volume(sales_str):
logger.debug(f"convert_sales_volume : {sales_str}")
match = re.search(r'(\d+)(万)?\+?', sales_str)
if match:
num = int(match.group(1))
if match.group(2): # '万'이 포함되어 있다면
num *= 10000 # 万은 10,000을 의미
return num
else:
return 0
def extract_item_id(url):
logger.debug(f"extract_item_id : {url}")
match = re.search(r'taobao.com/i(\d{10,12})', url)
return match.group(1) if match else None
def fetch_products(souped, item_count, similarity_threshold):
fetched_products = []
if not souped: # HTML 파싱 결과가 비어 있는 경우
logger.debug("HTML 파싱 결과가 비어 있습니다.")
return []
for i, souped_element in enumerate(souped, start=1):
logger.debug(f"souped product 진행률 : [{i}]/[{len(souped)}]")
if i > item_count: # 설정한 아이템 갯수에 도달하면 반복 중단
break
try:
product_url = souped_element['href']
logger.debug(f"타오바오 상품 product_url : {product_url}")
Tao_itemID = extract_item_id(product_url)
logger.debug(f"타오바오 상품 extract_item_id : {Tao_itemID}")
image_url = 'https:' + souped_element.select_one("img")['src']
logger.debug(f"타오바오 상품 image_url : {image_url}")
product_name = souped_element.select_one("span.mobile--summary--2mK9e7G").text
logger.debug(f"타오바오 상품 product_name : {product_name}")
#trans_product_name = trans(product_name)
trans_product_name = translate_texts_translatepy(product_name)
trans_product_name = str(trans_product_name)
logger.debug(f"타오바오 상품 trans_product_name : {trans_product_name}")
price_str = souped_element.select_one("span.mobile--price--3eMQ3ec").text
logger.debug(f"타오바오 상품 price_str : {price_str}")
price = convert_price(price_str)
sales_volume_str = souped_element.select_one("span.mobile--buy--2I4hwR4").text
logger.debug(f"타오바오 상품 sales_volume_str : {sales_volume_str}")
sales_volume = convert_sales_volume(sales_volume_str)
logger.debug(f"타오바오 상품 sales_volume : {sales_volume}")
# =============================
# 이미지 유사도 검사부분 개선필요. 현재는 삭제
# 이미지 유사도 검사
# logger.debug("이미지 유사도 검사 실행")
# logger.debug(f"원본이미지 : {imgurl}")
# logger.debug(f"타켓이미지 : {image_url}")
# difference = compare_images_phash(imgurl, image_url)
# logger.debug(f"이미지 유사도 difference = {difference}/{similarity_threshold}")
# ==============================
difference = 30
# wait=randint(1,4)
# time.sleep(wait) # 또는 더 긴 시간
# logger.debug(f"요청간 TimeSleep : {wait}")
if difference <= similarity_threshold:
logger.debug(f"상품 [{Tao_itemID}]의 상품정보 추가")
product_info = {
"Product URL": product_url,
"Tao_itemID": Tao_itemID,
"Image URL": image_url,
"Product Name": trans_product_name,
"Price": price,
"Sales Volume": sales_volume,
}
else:
logger.warning(f"상품 [{Tao_itemID}]의 상품이미지가 일치하지 않아 제외처리")
fetched_products.append(product_info)
except Exception as e:
logger.error(f"상품정보 추출 오류 발생 {i}: {e}", exc_info=True)
continue
return fetched_products
def translate_texts_translatepy(text, src_lang='zh-cn', dest_lang='ko'):
"""translatepy 라이브러리를 사용하여 텍스트 리스트를 번역합니다."""
try:
translator = Translator()
logger.debug(f"번역대상 텍스트 ======== {text}")
result = translator.translate(text, source_language=src_lang, destination_language=dest_lang)
logger.debug(f"번역된 텍스트 : {result}")
translation = str(result)
return translation
except Exception as e:
logger.debug(f"번역 중 오류발생 : {e}", exc_info=True)
def trans(text):
try:
# Initialize the Translator
translator = Translator()
# Translate the text from Chinese to Korean
translated = translator.translate(text, src='zh-cn', dest='ko')
# Return the translated text
return translated.text
except Exception as e:
logger.debug(f"번역 중 오류발생 : {e}", exc_info=True)
def check_first_product_tao(driver):
try:
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
logger.debug("첫번째 상품을 찾았습니다.")
return True
except TimeoutException:
logger.debug("오류 : 첫번째 상품을 찾을 수 없습니다.")
return False
def handle_baxiaFrame(driver, defaultFrameName, message_controller):
last_message_time = time.time()
baxia_iframe_id = 'baxia-dialog-content'
scratch_captcha_class = 'scratch-captcha-question'
nocaptcha_class = 'nc_1_nocaptcha'
login_class = 'login-box loading'
captcha_resolved = False
try:
WebDriverWait(driver, 5).until(EC.visibility_of_element_located((By.ID, baxia_iframe_id)))
# driver.switch_to.frame(baxia_iframe_id)
logger.debug("baxia_Frame 화면 발생. 타입에 따른 해결 시도")
switch_to_frame(driver, defaultFrameName, baxia_iframe_id)
# WebDriverWait(driver, 10).until(
# EC.frame_to_be_available_and_switch_to_it(baxia_iframe_id))
# driver.switch_to.frame(baxia_iframe_id)
logger.debug(f"baxia_Frame 에서 {baxia_iframe_id}로 전환")
# 스크래치 캡차 처리
if driver.find_elements(By.CLASS_NAME, scratch_captcha_class):
logger.debug("baxia_Frame 에서 [Scratch Captcha] 타입 발생 감지")
time.sleep(1)
try:
captcha_resolved = handle_scratch_captcha(driver, defaultFrameName, message_controller)
except Exception as e:
logger.error(f"handle_scratch_captcha 메서드 호출 중 에러 발생 : {e}", exc_info=True)
logger.debug(f"(baxia_Frame) [Scratch Captcha] 처리결과 : [{captcha_resolved}]")
# 슬라이더 캡차 처리
elif driver.find_elements(By.CLASS_NAME, nocaptcha_class):
logger.debug("baxia_Frame 에서 [Sliding Captcha] 타입 발생 감지")
time.sleep(1)
try:
captcha_resolved = handle_sliding_captcha(driver, defaultFrameName, message_controller, last_message_time)
except Exception as e:
logger.error(f"handle_sliding_captcha 메서드 호출 중 에러 발생 : {e}", exc_info=True)
logger.debug(f"(baxia_Frame) [Sliding Captcha] 처리결과 : [{captcha_resolved}]")
# 로그인 상태 확인 및 처리
elif driver.find_elements(By.CLASS_NAME, login_class):
logger.debug("baxia_Frame 에서 [로그인 요구] 타입 발생 감지")
time.sleep(1)
try:
if load_cookies(driver):
captcha_resolved = True
else:
logger.debug("사용자의 로그인 시도 대기 5초마다 체크")
check_login_status_by_iframe() # 사용자가 로그인하길 기다림
except Exception as e:
logger.error(f"baxia_Frame 에서 [로그인 요구] 처리 중 에러 발생 : {e}", exc_info=True)
except TimeoutException as e:
logger.error(f"CAPTCHA 화면을 찾는 데 실패했습니다: {e}")
captcha_resolved = False
finally:
logger.debug("finally switch_to_frame 호출")
switch_to_frame(driver, defaultFrameName, 'default')
# driver.switch_to.default_content()
logger.debug("기본 프레임으로 전환했습니다.")
return captcha_resolved
def switch_to_frame(driver, defaultFrameName, frameName):
"""
지정된 프레임으로 전환하는 함수입니다.
이 함수는 현재 포커스된 프레임이 전환하려는 프레임과 다를 경우에만 프레임 전환을 시도합니다.
:param driver: WebDriver 인스턴스
:param frameName: 전환할 프레임의 이름 또는 'default' (기본 프레임으로 전환할 경우)
"""
# logger.debug("프레임 포커스를 전환하겠습니다.")
current_frame = driver.execute_script("return self.name") # 현재 프레임의 이름을 얻음
logger.debug(f"[switch_to_frame]현재 프레임 포커스 : [{current_frame}]")
logger.debug("[switch_to_frame]프레임 전환 전 기본 프레임으로 전환 시도")
if current_frame != defaultFrameName: # 현재 프레임이 기본 프레임이 아니면 전환
driver.switch_to.default_content()
logger.debug("[switch_to_frame]현재 포커스가 기본 프레임이 아니기 때문에 기본프레임으로 전환했습니다.")
else:
logger.debug("[switch_to_frame]현재 포커스는 기본 프레임상태이므로 전환하지 않습니다.")
if frameName == 'default':
if current_frame != defaultFrameName: # 현재 프레임이 기본 프레임이 아니면 전환
driver.switch_to.default_content()
logger.debug("[switch_to_frame]기본 프레임으로 전환했습니다.")
else:
# frameName이 현재 프레임과 다르면 전환을 시도
if current_frame != frameName:
try:
# 프레임 존재 확인
WebDriverWait(driver, 10).until(
EC.frame_to_be_available_and_switch_to_it(frameName)
)
logger.debug(f"[switch_to_frame] '{frameName}' 프레임으로 전환했습니다.")
except TimeoutException:
logger.error(f"[switch_to_frame] '{frameName}' 프레임을 찾을 수 없습니다.")
except Exception as e:
logger.error(f"[switch_to_frame] '{frameName}' 프레임으로의 전환 중 오류 발생: {e}")
else:
logger.debug(f"[switch_to_frame] '{frameName}' 프레임에 이미 포커스되어 있습니다.")
# def handle_captcha_for_tao(driver, message_controller):
# captcha_iframe_id = 'baxia-dialog-content'
# last_message_time = time.time()
# scratch_captcha_class = 'scratch-captcha-question'
# nocaptcha_class = 'nc_1_nocaptcha'
# login_class = 'login-box loading'
# try:
# WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.ID, captcha_iframe_id)))
# logger.debug("CAPTCHA 화면 발생. 진행을 위해 해결하세요")
# # iframe으로 포커스 전환
# driver.switch_to.frame(captcha_iframe_id)
# # 캡차 유형 식별
# if driver.find_elements(By.CLASS_NAME, scratch_captcha_class):
# # message_controller.send_message("scratch_captcha 발견: 처리를 기다리고 있습니다.")
# logger.debug("스크래치 캡차 발생")
# handle_scratch_captcha(driver) # Scratch captcha 처리를 위한 별도의 함수
# elif driver.find_elements(By.CLASS_NAME, nocaptcha_class):
# logger.debug("슬라이드 캡차 발생")
# handle_sliding_captcha(driver, message_controller, last_message_time) # Nocaptcha 처리를 위한 별도의 함수
# elif driver.find_elements(By.CLASS_NAME, login_class):
# logger.debug("로그인 팝업 발생")
# if load_cookies(driver):
# logger.debug("저장된 쿠키로 로그인 완료")
# else:
# logger.debug("사용자의 로그인 시도 대기")
# # message_controller.send_massage("로그인 요청")
# time.sleep(600)
# except TimeoutException as e:
# logger.error(f"CAPTCHA 화면을 찾는 데 실패했습니다: {e}", exc_info=True)
# return False
# finally:
# # 기본 컨텐츠로 포커스 전환
# driver.switch_to.default_content()
# logger.debug("캡차 해결 완료")
def handle_scratch_captcha(driver, defaultFrameName, message_controller, max_wait_time=7200):
"""
스크래치 캡차를 처리하는 함수
:param driver: WebDriver 인스턴스
:param message_controller: 메시지 컨트롤러 인스턴스
"""
# logger.debug("스크래치 캡차 발생")
# 사용자에게 캡차 해결 요청 메시지 전송
# message_controller.send_message("스크래치 캡차가 발생했습니다. 해결을 도와주세요.")
logger.debug("Please Soleve Scratch Captcha | 해결을 도와주세요.")
# 향후 캡차 해결 서비스 도입을 위한 준비
# TODO: 2Captcha 같은 캡차 해결 서비스를 도입하여 자동으로 처리할 예정
# captcha_solver = TwoCaptchaSolver(api_key='YOUR_API_KEY')
# result = captcha_solver.solve_captcha(driver)
# if result:
# logger.debug("스크래치 캡차 자동 해결 완료")
# else:
# logger.error("스크래치 캡차 자동 해결 실패")
# 캡차가 해결될 때까지 대기
captcha_frame_selector = (By.ID, 'baxia-dialog-content')
scratch_captcha_class = 'scratch-captcha-question'
start_time = time.time()
resolved = False
# logger.debug(f"시간 경과 : {time.time() - start_time} (Second) | 최대경과 [{max_wait_time}]")
# 기본 프레임으로 전환
driver.switch_to.default_content()
logger.debug("기본 프레임으로 전환했습니다.") # baxia-dialog-content 프레임이 사라질 때까지 대기
while time.time() - start_time < max_wait_time:
logger.debug(f"시간 경과 : {time.time() - start_time} (Second) | 최대경과 [{max_wait_time}]")
try:
WebDriverWait(driver, 5).until(
EC.invisibility_of_element_located(captcha_frame_selector)
)
logger.debug("스크래치 캡차 해결됨, 추가 확인을 위해 5초 대기 중...")
time.sleep(5) # 해결 후 잠시 대기
# 대기 후 다시 프레임이 없는지 확인
# switch_to_frame(driver, defaultFrameName, 'default')
# if not WebDriverWait(driver, 2).until(
# EC.presence_of_element_located(captcha_frame_selector)):
try:
WebDriverWait(driver, 5).until(
EC.invisibility_of_element_located(captcha_frame_selector)
)
# if not driver.find_elements(*captcha_frame_selector):
logger.debug("@@@@@스크래치 캡차가 최종적으로 해결되었습니다.@@@@@")
return True
except TimeoutException:
logger.debug("스크래치 캡차 재발생, 다시 해결을 기다립니다.")
except TimeoutException:
logger.debug("스크래치 캡차가 여전히 존재합니다, 계속 대기합니다.")
continue # 계속해서 캡차가 존재하는지 확인
except Exception as e:
logger.error(f"스크래치 캡차 대기 중 오류 발생: {e}")
break
if time.time() - start_time >= max_wait_time:
logger.error("스크래치 캡차 해결 대기 시간 초과. 프로그램을 종료하거나 추가 조치가 필요합니다.")
# 메세지 보낸 후 2캅차 호출
return False
return True
def handle_sliding_captcha(driver, defaultFrameName ,message_controller, last_message_time):
# driver.switch_to.frame('baxia-dialog-content')
act = ActionChains(driver)
logger.debug("Sliding Captcha 해결 시도")
offset = 258
attempt_count = 0
max_attempts = 100 # 최대 시도 횟수 설정
while attempt_count < max_attempts:
try:
try:
if driver.find_elements(By.CLASS_NAME, 'errloading'):
refresh_button = WebDriverWait(driver, 3).until(
EC.visibility_of_element_located((By.CLASS_NAME, 'errloading')))
act.click(refresh_button).perform()
except:
pass
# 슬라이더 캡차가 로드되기를 기다림
slider = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, 'nc_1_n1z')) # 슬라이더의 ID를 정확히 알아야 함
)
logger.debug("슬라이더 로드")
# 슬라이더를 클릭하여 활성화
act.click_and_hold(slider).perform()
time.sleep(0.2)
logger.debug("슬라이더 클릭")
# 슬라이더를 움직이는 로직
rand_offset = randint(180,220)
# # 점진적으로 슬라이더를 이동
# for x in range(0, rand_offset, 1): # 10픽셀 단위로 슬라이더 이동
# act.move_by_offset(1, 0).perform()
# time.sleep(0.001) # 자연스러운 움직임을 위해 소량의 딜레이 추가
# logger.debug(f"슬라이더 {x+10}픽셀 이동")
act.move_by_offset(rand_offset, 0).perform() # 랜덤한 절반 이동
logger.debug("슬라이더 이동")
wait_time = random.uniform(0.1, 0.7)
time.sleep(wait_time) # 결과 확인을 위한 대기 시간
# # 점진적으로 슬라이더를 추가적으로 이동
# for x in range(0, offset-rand_offset, 1): # 10픽셀 단위로 슬라이더 이동
# act.move_by_offset(1, 0).perform()
# time.sleep(0.01) # 자연스러운 움직임을 위해 소량의 딜레이 추가
# logger.debug(f"슬라이더 {x+10}픽셀 이동")
act.move_by_offset(offset-rand_offset, 0).perform() # 랜덤한 절반 이동
wait_time = randint(1,2)
wait_time = random.uniform(0.3, 1.4)
time.sleep(wait_time) # 결과 확인을 위한 대기 시간
act.release().perform()# 마우스 버튼을 놓음
logger.debug("슬라이더 버튼 놓음")
# 성공적으로 슬라이드 했는지 확인
logger.debug("캡차 결과 확인 중...")
time.sleep(0.2) # 결과 확인을 위한 대기 시간
# 캡차 해결 여부 확인
#<div id="`nc_1_refresh1`" class="errloading"><i id="`nc_1_refresh2`" class="nc_iconfont icon_warn"></i>Oops... something's wrong. Please refresh and try again.(error:x5fZg)</div>
try:
WebDriverWait(driver, 2).until_not(
EC.visibility_of_element_located((By.CLASS_NAME, 'errloading'))
)
WebDriverWait(driver, 3).until_not(
EC.visibility_of_element_located((By.ID, 'baxia-dialog-content'))
)
# WebDriverWait(driver, 2).until_not(
# EC.visibility_of_element_located((By.ID, 'nc_1_n1z'))
# )
logger.debug("Sliding Captcha Solved")
logger.debug("추가적인 Scratch Captcha 감지 대기 중 : 대기시간 4초")
time.sleep(4)
if check_for_scratch_captcha(driver):
logger.debug("Sliding Captcha 해결 후 scratch 캡차 발생 감지")
return handle_scratch_captcha(driver, defaultFrameName, message_controller)
logger.debug("스크래치 캡차 없음 확인")
logger.debug(f"!!! Sliding Captcha [{attempt_count+1}]/[{max_attempts}]회 만에 해결!!!")
return True
except TimeoutException:
logger.debug(f"Sliding Captcha 해결 실패, [{attempt_count+1}]/[{max_attempts}]회 재시도 중...")
refresh_button = WebDriverWait(driver, 3).until(
EC.visibility_of_element_located((By.CLASS_NAME, 'errloading')))
act.click(refresh_button).perform()
attempt_count += 1
except Exception as e:
logger.error(f"Sliding Captcha 처리 중 오류 발생: {e}", exc_info=True)
return False
# finally:
# driver.switch_to.default_content()
logger.debug("최대 시도 횟수 초과, Sliding Captcha 해결 실패")
return False
def check_element_visibility(driver, element_id, timeout=5):
try:
WebDriverWait(driver, timeout).until(EC.visibility_of_element_located((By.ID, element_id)))
return True
except TimeoutException:
return False
# #[수정된 내 코드]
# try:
# refresh_button = WebDriverWait(driver, 3).until(
# EC.visibility_of_element_located((By.CLASS_NAME, 'errloading')))
# print("슬라이딩 CAPTCHA 해결 실패, 재시도 중...")
# ActionChains(driver).click(refresh_button).perform()
# except NoSuchElementException:
# try:
# WebDriverWait(driver, 3).until(
# EC.visibility_of_element_located((By.ID, 'baxia-dialog-content')))
# logger.debug("슬라이딩 캡차 재시도 필요")
# except NoSuchElementException:
# logger.debug("슬라이딩 캡차 해결됨")
# if check_for_scratch_captcha(driver):
# logger.debug("스크래치 캡차 발생 감지")
# return handle_scratch_captcha(driver, message_controller)
# return True
# #기존에 에러가 발생했던 코드
# if not driver.find_elements_by_class_name('errloading'):
# if not driver.find_elements_by_id('baxia-dialog-content'):
# logger.debug("슬라이딩 캡차 해결됨")
# if check_for_scratch_captcha(driver):
# logger.debug("스크래치 캡차 발생 감지")
# return handle_scratch_captcha(driver, message_controller)
# return True
# else:
# logger.debug("슬라이딩 캡차 재시도 필요")
# else:
# refresh_button = driver.find_element_by_class_name('errloading')
# logger.debug("슬라이딩 CAPTCHA 해결 실패, 재시도 중...")
# ActionChains(driver).click(refresh_button).perform()
# except Exception as e:
# logger.error(f"슬라이딩 캡차 처리 중 오류 발생: {e}", exc_info=True)
# return False
# finally:
# driver.switch_to.default_content()
def check_for_scratch_captcha(driver):
# 스크래치 캡차 감지 로직
logger.debug("스크래치 캡차 감지 여부 판단 중...")
scratch_captcha_class = 'scratch-captcha-question'
return bool(driver.find_elements(By.CLASS_NAME, scratch_captcha_class))
def setup_driver():
"""
웹 드라이버 설정 및 selenium-stealth 적용
"""
try:
logger.debug("웹드라이버 설정")
options = webdriver.ChromeOptions()
# ua = UserAgent()
# ua.user_agent_list = ua.pc_browsers
# # 랜덤 PC Agent 선택
# random_PC_UA = ua.random
pc_user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1 Safari/605.1.15"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0"
"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36 Edg/88.0.705.63"
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0"
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534+ (KHTML, like Gecko) BingPreview/1.0b"
]
# 랜덤 PC 사용자 에이전트 선택
random_PC_UA = random.choice(pc_user_agents)
logger.debug(f"UserAgent : {random_PC_UA}")
options.add_argument(f"--user-agent={random_PC_UA}") # 랜덤 user_agent 사용
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument('--ignore-certificate-errors')
options.add_argument('--ssl-protocol=any')
options.add_argument('--disable-cache')
options.add_argument("--allow-insecure-localhost") # 로컬 SSL 인증서 오류 무시
driver = webdriver.Chrome(options=options)
# selenium-stealth 설정 적용
logger.debug("selenium-stealth 설정 적용")
stealth(driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
current_user_agent = driver.execute_script("return navigator.userAgent;")
logger.debug(f"현재 사용 중인 User-Agent: {current_user_agent}")
return driver
except Exception as e:
logger.debug(f"셀레니움 드라이버 설정 중 에러 발생 : {e}", exc_info=True)
logger.error(traceback.format_exc()) # traceback 모듈을 사용하여 예외 정보를 문자열로 변환하여 로그에 출력
def login_and_manage_session(driver):
"""
로그인 및 세션 관리 함수
"""
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
# 쿠키 로드 및 적용
if load_cookies(driver):
driver.refresh() # 쿠키 로드 후 페이지 새로고침
logger.debug("쿠키를 통해 로그인 상태 복원")
# 로그인 상태 확인 루프
while True:
if check_login_status(driver):
logger.debug("로그인 성공! 서치 웹페이지로 이동")
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
break # 로그인이 확인되면 루프 탈출
else:
logger.debug("로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
time.sleep(5) # 재시도 전에 대기
else:
logger.debug("새로운 로그인 프로세스가 필요함")
driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
while True:
if check_login_status(driver):
logger.debug("새로운 로그인 성공! -> 서치 웹페이지로 이동")
driver.get("https://world.taobao.com/wow/tmg-fc/tmw/search_image?")
break # 로그인이 확인되면 루프 탈출
else:
logger.debug("새로운 로그인이 확인되지 않았습니다. 재로그인을 시도해주세요.")
# driver.get("https://world.taobao.com/wow/z/oversea/SEO-SEM/ovs-pc-login?redirectURL=https%3A%2F%2Fworld.taobao.com%2Fwow%2Ftmg-fc%2Ftmw%2Fsearch_image%3F")
time.sleep(5) # 재시도 전에 대기
# 로그인 완료 후 쿠키 저장
save_cookies(driver)
def send_image_to_clipboard(image_path):
logger.debug(f"이미지 path : {image_path}")
image = Image.open(image_path)
output = BytesIO()
image.convert("RGB").save(output, "BMP")
data = output.getvalue()[14:]
logger.debug("이미지 BMP로 변환")
output.close()
logger.debug("이미지를 클립보드에 복사 시도")
win32clipboard.OpenClipboard()
win32clipboard.EmptyClipboard()
win32clipboard.SetClipboardData(win32clipboard.CF_DIB, data)
win32clipboard.CloseClipboard()
logger.debug("이미지를 클립보드에 복사 완료")
def search_img_with_action(driver, imgurl, isCrop, search_attempts):
try:
if not WebDriverWait(driver, 3).until(EC.invisibility_of_element_located((By.ID, 'baxia-dialog-content'))):
logger.error("CAPTCHA 화면이 활성화되어 있어 검색을 시작할 수 없습니다.")
return False
except Exception as e:
logger.error(f"search_img_with_action에서 캡차 화면 인식 중 에러 : {e}", exc_info=True)
return False
# 이미지를 다운로드하고 클립보드에 복사
logger.debug(f"search_img_with_action IMG_URL: {imgurl}")
try:
response = requests.get(imgurl)
image = Image.open(BytesIO(response.content))
# 이미지가 RGBA 모드인 경우 RGB로 변환
if image.mode == "RGBA":
image = image.convert("RGB")
elif image.mode == "P":
image = image.convert("RGB")
logger.debug(f"isCrop = {isCrop}")
if isCrop >= 1 and isCrop < 7:
image = crop_by_boxline(image, 0.015)
search_attempts += 1
logger.debug(f"1.5% 크롭 : 이미지 수정 [{search_attempts}]회 수행")
elif isCrop >= 7 and isCrop <= 9:
image = rotate_by_angle(image, 2, 'transparent')
search_attempts += 1
logger.debug(f"2% 회전 : 이미지 수정 [{search_attempts}]회 수행")
elif isCrop == 10:
image = mirror_by_image(image)
search_attempts += 1
logger.debug(f"거울반사 : 이미지 수정 [{search_attempts}]회 수행")
elif isCrop == 0:
logger.debug("이미지수정 없음")
search_attempts += 1
temp_path = "./temp_image.jpg"
image.save(temp_path)
send_image_to_clipboard(temp_path)
except Exception as e:
logger.debug(f"이미지 URL에서 다운로드 후 클립보드 복사 과정에서 예외 발생: {e}", exc_info=True)
return False
# 이미지 검색 영역을 클릭하기 위한 액션 체인 생성
try:
search_area_selector = ".rax-textinput"
search_area = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, search_area_selector))
)
# ActionChains(driver).click(search_area).perform()
ActionChains(driver).click_and_hold(search_area).perform()
# 랜덤한 시간을 생성
random_sleep_time = random.uniform(0.2, 1.7)
time.sleep(random_sleep_time)
ActionChains(driver).release().perform()
logger.debug("이미지검색영역 클릭")
# 클립보드의 이미지를 붙여넣기
ActionChains(driver).key_down(Keys.CONTROL).send_keys('v').key_up(Keys.CONTROL).perform()
logger.debug("클립보드의 이미지를 붙여넣기")
# 랜덤한 시간을 생성
random_sleep_time = random.uniform(0.6, 2.4)
time.sleep(random_sleep_time)
# 검색 버튼 클릭
search_button_selector = ".component-preview-button"
search_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, search_button_selector))
)
# ActionChains(driver).click(search_button).perform()
ActionChains(driver).click_and_hold(search_button).perform()
random_sleep_time = random.uniform(1.2, 2.4)
time.sleep(random_sleep_time)
ActionChains(driver).release().perform()
logger.debug("검색버튼 클릭")
# 랜덤한 시간을 생성
random_sleep_time = random.uniform(0.5, 1.6)
time.sleep(random_sleep_time)
return True
except Exception as e:
logger.debug(f"search_img_with_action 과정 중 에러 발생 : {e}", exc_info=True)
return False
def check_first_product(driver):
try:
first_product_CSS = ".rax-view-v2:nth-child(1) > .rax-view-v2 > .mobile--class-1--2Vz4bM4"
WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, first_product_CSS)))
logger.debug("첫번째 상품을 찾았습니다.")
return True
except TimeoutException:
logger.debug("첫번째 상품을 찾을 수 없습니다.")
return False
def handle_sorry_message(driver):
sorry_message_xpath = "//span[contains(.,'Sorry没有找到相关的宝贝')]"
try:
WebDriverWait(driver, 3).until(EC.visibility_of_element_located((By.XPATH, sorry_message_xpath)))
logger.debug("'Sorry' 발생. 페이지를 새로고침 합니다.")
driver.refresh()
time.sleep(2)
return True
except TimeoutException:
return False
def fetch_and_save_taobao_products(driver, message_controller, imgurl, cursor, db, item_count=5, sort_order=1):
"""
타오바오의 상품을 검색하고 결과를 파싱하는 함수
"""
# 상품 정보를 저장할 리스트 초기화
products = []
search_attempts = 0
max_search_attempts = 15 # 상품 검색을 최대 몇 번까지 재시도할지 설정
refresh_attempts = 0
max_refresh_attempts = 5 # 새로고침 최대 횟수
similarity_threshold = 40 # 유사도 판단 기준 (해밍 거리)
product_found = False
isCrop = 0
try:
defaultFrameName = driver.execute_script("return self.name")
logger.debug(f"기본 프레임 이름 : {defaultFrameName}")
except Exception as e:
logger.error(f"기본 프레임 이름을 얻는 중 에러 발생 : {e}", exc_info=True)
while search_attempts < max_search_attempts:
# 이미지 검색 및 수정 (검색 시도마다 다른 수정 방식 적용)
logger.debug(f"검색 시도 {search_attempts + 1}/{max_search_attempts}")
is_Success_search_action = search_img_with_action(driver, imgurl, isCrop, search_attempts) # 액션체인으로 상품 검색 시작
if not is_Success_search_action:
if not handle_baxiaFrame(driver, defaultFrameName, message_controller):
logger.error("CAPTCHA 처리 실패, 다음 시도로 넘어갑니다.")
search_attempts += 1
continue
if is_Success_search_action:
logger.debug("Success : 상품검색 시도 성공")
logger.debug(f"첫번째 상품 존재 체크 : {product_found}")
product_found = check_first_product(driver) # 첫 번째 상품 확인
else:
logger.debug("Failed : 상품검색 시도 실패")
if product_found:
logger.debug("첫번째 상품 발견")
try:
# 페이지 로딩 대기
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "body"))
)
wait=randint(1,3)
time.sleep(wait) # 또는 더 긴 시간
page_source = driver.page_source # 페이지의 HTML을 가져옴
soup = BeautifulSoup(page_source, 'html.parser') # BeautifulSoup 객체 생성
logger.debug("html파싱")
souped = soup.select('a.mobile--class-1--2Vz4bM4')
except Exception as e:
logger.debug(f"타오바오 상품정보 html 파싱 중 에러발생 : {e}", exc_info=True)
try:
# products.extend(fetch_products(souped, item_count, products, similarity_threshold))
products = fetch_products(souped, item_count, similarity_threshold)
logger.debug("상품정보 반환")
logger.debug(f"상품정보 {products}")
return products
# return [(product['Product URL'], product['Tao_itemID'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]
except Exception as e:
logger.debug("상품정보 반환 중 에러발생")
logger.debug(f"기본값(빈값)을 반환합니다. : {e}", exc_info=True)
products = [{"Product URL": "", "Tao_itemID": "", "Image URL": "", "Product Name": "", "Price": "", "Sales Volume": ""} for _ in range(item_count)]
return products
if handle_baxiaFrame(driver, defaultFrameName, message_controller):
logger.debug("handle_baxiaFrame 해결. 첫번째 상품확인")
product_found = check_first_product(driver) # 첫 번째 상품 확인
continue
if handle_sorry_message(driver):
refresh_attempts += 1
logger.debug(f"Sorry 화면 : 새로고침 [{refresh_attempts}]/[{max_refresh_attempts}]")
if refresh_attempts > max_refresh_attempts:
logger.debug("최대 새로고침횟수 도달 : 이미지 수정 시도")
refresh_attempts = 0
isCrop = randint(1, 10) # 이미지 수정 방법을 무작위로 선택
product_found = check_first_product(driver) # 첫 번째 상품 확인
continue
if not products: # 상품을 찾지 못한 경우 빈 상품 정보 채워 반환
logger.debug(f"상품 검색 횟수 초과 : [{max_search_attempts}]회 실패 => 빈 상품 정보 반환")
products = [{"Product URL": "", "Tao_itemID": "", "Image URL": "", "Product Name": "", "Price": "", "Sales Volume": ""} for _ in range(item_count)]
return products
# while True: # 무한 루프를 시작하여 조건에 따라 재시도를 관리
# logger.debug(f"상품검색 최대횟수 : {max_search_attempts}회")
# found_first_product = False
# isCrop = 0
# cropCount = 0
# # while search_attempts < max_search_attempts and not found_first_product:
# while search_attempts < max_search_attempts:
# #logger.debug(f"상품 검색 시작 : {found_first_product} 상품")
# #search_img(imgurl) # 상품 검색 시작
# image_modification_count = 0 # 이미지 수정 횟수 초기화
# found_first_product = False
# logger.debug(f"액션체인으로 상품 검색 시작 : {found_first_product} 상품")
# search_img_with_action(driver, imgurl, isCrop, cropCount) # 액션체인으로 상품 검색 시작
# max_refresh_attempts = 5
# refresh_attempts = 0
# # while not found_first_product:
# while not found_first_product and image_modification_count < MAX_IMAGE_MODIFICATIONS:
# if check_first_product(driver):
# # 첫 번째 상품이 로드되면 HTML 파싱 수행
# logger.debug("첫 번째 상품을 성공적으로 찾았습니다. HTML 파싱을 시작합니다.")
# found_first_product = True
# break # 첫 번째 상품을 찾았으니 내부 while 루프 탈출
# else:
# # if handle_captcha(driver, message_controller):
# if handle_captcha_for_tao(driver, message_controller):
# logger.debug("캡차 화면입니다. 캡차를 해결합니다.")
# # handle_captcha(driver)
# elif handle_sorry_message(driver) and refresh_attempts < max_refresh_attempts:
# logger.debug("Sorry 화면입니다. 페이지를 새로고침합니다.")
# # driver.refresh()
# # time.sleep(3)
# refresh_attempts += 1
# # elif check_login_status(driver):
# # logger.debug("로그인이 필요한 화면입니다. 로그인을 확인합니다.")
# # # check_login_status(driver)
# else:
# logger.debug("알 수 없는 상태입니다. 상품 검색을 다시 시도합니다.")
# break # 알 수 없는 상태이므로 내부 while 루프를 탈출하여 상품 검색을 재시도
# if refresh_attempts >= max_refresh_attempts:
# logger.debug("최대 새로고침 시도 횟수를 초과했습니다. 상품 검색을 다시 시도합니다.")
# break # 최대 새로고침 횟수를 초과하면 내부 while 루프를 탈출하여 상품 검색을 재시도
# if found_first_product:
# break # 첫 번째 상품을 찾았으므로 전체 while 루프 탈출
# else:
# search_attempts += 1
# isCrop = randint(1,10)
# logger.debug(f"상품 검색 재시도 {search_attempts}/{max_search_attempts}")
# if found_first_product:
# # 성공적으로 첫 번째 상품을 찾은 후의 처리 로직을 여기에 작성
# break # 성공적으로 처리가 완료되면 무한 루프 탈출
# else:
# logger.debug("상품 검색 최대 재시도 횟수를 초과했습니다. 프로세스를 처음부터 다시 시작합니다.")
# # 필요한 경우, 여기에서 추가적인 초기화 작업을 수행할 수 있습니다.
# try:
# # 페이지 로딩 대기
# WebDriverWait(driver, 10).until(
# EC.presence_of_element_located((By.CSS_SELECTOR, "body"))
# )
# wait=randint(2,4)
# time.sleep(wait) # 또는 더 긴 시간
# # 페이지의 HTML을 가져옴
# page_source = driver.page_source
# # BeautifulSoup 객체 생성
# soup = BeautifulSoup(page_source, 'html.parser')
# logger.debug("html파싱")
# souped = soup.select('a.mobile--class-1--2Vz4bM4')
# except Exception as e:
# logger.debug(f"타오바오 상품정보 html 파싱 중 에러발생 : {e}")
# fetch_products(souped, item_count, products, similarity_threshold)
# # 상품 정보 반환
# return [(product['Product URL'], product['Tao_itemID'], product['Image URL'], product['Product Name'], product['Price'], product['Sales Volume']) for product in products]
# # # 이미지 URL로부터 pHash 값을 계산하는 함수
# # def calculate_phash(image_url):
# # try:
# # # 캡차요청 회피를 위한 헤더 재설정
# # headers = {
# # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
# # "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
# # "Accept-Language": "en-US,en;q=0.9",
# # "Accept-Encoding": "gzip, deflate, br",
# # "DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부)
# # "Connection": "keep-alive",
# # "Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청
# # "Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청
# # }
# # response = requests.get(image_url, headers=headers)
# # # response = requests.get(image_url)
# # # 이미지 데이터 검증을 위한 임시 파일 저장
# # if response.status_code == 200:
# # with open('temp_image', 'wb') as f:
# # f.write(response.content)
# # if response.status_code == 200 and 'image' in response.headers['Content-Type']:
# # img = Image.open(BytesIO(response.content))
# # # phash = imagehash.phash(img)
# # # return phash
# # return 1
# # else:
# # logger.debug("이미지 로드 실패 또는 잘못된 콘텐츠 타입")
# # logger.debug(response.status_code)
# # logger.debug(response.headers)
# # logger.debug(response.text[:500]) # 본문의 처음 500자 출력
# # return None # 이미지 처리에 실패하면 None 반환
# # except Exception as e:
# # logger.debug(f"이미지 처리 중 오류 발생: {e}")
# # return None # 예외 발생 시 None 반환
# # # 두 이미지 URL의 pHash 값의 차이를 계산하는 함수
# # def compare_images_phash(imgurl, product_imgurl):
# # hash1 = calculate_phash(imgurl)
# # hash2 = calculate_phash(product_imgurl)
# # if hash1 is not None and hash2 is not None:
# # difference = hash1 - hash2
# # return difference
# # else:
# # return None # 해시 계산에 실패한 경우 None 반환
# # def convert_price(price_str):
# # logger.debug(f"convert_price : {price_str}")
# # try:
# # return int(float(price_str))
# # except ValueError:
# # return 0
# # def convert_sales_volume(sales_str):
# # logger.debug(f"convert_sales_volume : {sales_str}")
# # match = re.search(r'(\d+)(万)?\+?', sales_str)
# # if match:
# # num = int(match.group(1))
# # if match.group(2): # '万'이 포함되어 있다면
# # num *= 10000 # 万은 10,000을 의미
# # return num
# # else:
# # return 0
# # def extract_item_id(url):
# # logger.debug(f"extract_item_id : {url}")
# # match = re.search(r'taobao.com/i(\d{10,12})', url)
# # return match.group(1) if match else None