AutoPercenty3/test/shoppingScapper/ns_shopping.py

195 lines
8.9 KiB
Python

from pywinauto import Application, findwindows, timings
from pywinauto.timings import wait_until
import time, logging, os, re
logger = logging.getLogger(__name__)
class ShoppingLensScraper:
def __init__(self, whale_exe_path=None, user_data_dir=None, cache_dir=None, logger=None):
# self.whale_exe_path = whale_exe_path
# self.user_data_dir = user_data_dir
# self.cache_dir = cache_dir
self.logger = logger or logging.getLogger(__name__)
def save_control_identifiers(self, window, output_file="debug_controls.txt", logger=None):
"""
특정 윈도우의 컨트롤 식별자를 파일로 저장합니다.
:param window: 탐지 대상 윈도우
:param output_file: 출력 파일 경로
:param logger: 로깅 객체 (선택적)
"""
try:
with open(output_file, "w", encoding="utf-8") as f:
original_stdout = os.sys.stdout # 기존 stdout 백업
os.sys.stdout = f # stdout을 파일로 변경
window.print_control_identifiers() # 컨트롤 식별자 출력
os.sys.stdout = original_stdout # stdout 복원
if logger:
logger.info(f"컨트롤 식별자가 {output_file}에 저장되었습니다.")
else:
print(f"컨트롤 식별자가 {output_file}에 저장되었습니다.")
except Exception as e:
if logger:
logger.error(f"컨트롤 식별자를 저장하는 중 오류 발생: {e}")
else:
print(f"컨트롤 식별자를 저장하는 중 오류 발생: {e}")
def search(self, whale_window, image_url, max_retries=3):
"""네이버 웨일을 이용하여 쇼핑 렌즈 검색 수행"""
for attempt in range(max_retries):
try:
print(f"검색 시도 {attempt + 1}/{max_retries}")
# URL 이동
print("주소창으로 이동")
address_bar = whale_window.child_window(title="주소창 및 검색창", control_type="Edit")
address_bar.click_input()
for chunk in [image_url[i:i + 10] for i in range(0, len(image_url), 10)]:
address_bar.type_keys(chunk, with_spaces=True)
address_bar.type_keys("{ENTER}")
time.sleep(2) # 페이지 로딩 대기
# 이미지 우클릭 및 쇼핑 렌즈 실행
image = whale_window.child_window(control_type="Image")
image.right_click_input()
print("쇼핑 렌즈 검색")
menu_item = whale_window.child_window(title="쇼핑렌즈로 검색하기", control_type="MenuItem")
menu_item.click_input()
print("쇼핑 렌즈 검색 결과 기다리기")
timings.wait_until(10, 0.5, lambda: whale_window.child_window(control_type="List", found_index=0).exists())
time.sleep(0.5)
# control_type="Document" 요소 검색
document_elements = whale_window.descendants(control_type="Document")
if not document_elements:
raise RuntimeError("Document 요소를 찾을 수 없습니다.")
type_index = None
for document in document_elements:
title_text = document.window_text() # Document의 title 텍스트 가져오기
if "쇼핑렌즈 검색결과" in title_text:
type_index = 1 # "쇼핑렌즈 검색결과" 포함 시 type_index=1
break
elif "본문 바로가기" in title_text:
type_index = 0 # "본문 바로가기" 포함 시 type_index=0
break
else:
type_index = None
if type_index is None:
raise RuntimeError("적절한 type_index를 결정할 수 없습니다.")
print(f"결정된 type_index: {type_index}")
list_box = whale_window.child_window(control_type="List", found_index=type_index)
listbox_texts = list_box.texts()
products_infos = self.process_listbox_data(listbox_texts, type_index)
print(f"스크래핑 완료. 추출된 데이터 : {len(products_infos)}\n {products_infos}")
self.click_back_and_img_buttons(whale_window)
return products_infos
except Exception as e:
print(f"검색 시도 {attempt + 1}/{max_retries} 실패: {e}")
if attempt == max_retries - 1:
print("최대 재시도 횟수에 도달했습니다.")
try:
# 검색실패시 해당시점의 컨트롤식별자 저장
self.save_control_identifiers(whale_window, "debug_controls.txt", self.logger)
print("컨트롤 식별자가 저장되었습니다.")
except Exception as ee:
print(f"컨트롤 식별자 저장 중 오류 발생: {ee}")
return []
time.sleep(2) # 재시도 전 대기
def click_back_and_img_buttons(self, window):
"""
뒤로가기 버튼을 클릭하는 함수
:param window: pywinauto WindowSpecification 객체
"""
try:
# 뒤로가기 버튼 클릭
back_button = window.child_window(title="뒤로", control_type="Button")
img_button = window.child_window(title="누락된 이미지 설명을 확인하려면 컨텍스트 메뉴를 여세요.", control_type="Image")
if back_button.exists():
print("이미지버튼을 클릭합니다.")
img_button.click_input()
print("뒤로가기 버튼을 클릭합니다.")
back_button.click_input()
else:
print("버튼을 찾을 수 없습니다.")
except Exception as e:
print(f"버튼 클릭 중 오류 발생: {e}")
def process_listbox_data(self, listbox_texts, type_index, max_product_count=5):
"""
list_box.texts() 데이터를 가공하여 상품 정보를 추출합니다.
:param listbox_texts: ListBox 텍스트 리스트
:param type_index: 0 또는 1로 구분된 데이터 타입
:return: 가공된 상품 정보 리스트 (최대 5개)
"""
processed_data = []
if type_index == 0:
for idx, text_list in enumerate(listbox_texts):
if idx >= max_product_count: # 최대 max_product_count개의 상품만 처리
break
try:
# 리스트 형태인지 확인 후 처리
if isinstance(text_list, list):
text = " ".join(text_list) # 리스트를 문자열로 변환
else:
text = text_list
# 상품명 추출 (반복된 단어 찾아서 처리)
match = re.search(r"(.*?)\s\1", text)
product_name = match.group(1) if match else text.split()[0]
# 가격 정보 추출
prices = [int(price.replace(",", "")) for price in re.findall(r"(\d{1,3}(?:,\d{3})*)원", text)]
product_price = prices[0] if prices else 0
shipping_price = prices[1] if len(prices) > 1 else 0
processed_data.append({
"상품명": product_name,
"상품가격": product_price,
"배송비": shipping_price
})
except Exception as e:
print(f"데이터 처리 중 오류 발생 (type_index=0): {text_list} -> {e}")
elif type_index == 1:
for idx, text_list in enumerate(listbox_texts):
if idx >= max_product_count: # 최대 max_product_count 개의 상품만 처리
break
try:
if isinstance(text_list, list):
product_name = re.sub(r"[^\w\s]", "", text_list[0]).strip()
product_price = int(text_list[2].replace(",", ""))
else:
raise ValueError("type_index=1 데이터가 리스트 형식이 아님")
shipping_price = 0
processed_data.append({
"상품명": product_name,
"상품가격": product_price,
"배송비": shipping_price
})
except Exception as e:
print(f"데이터 처리 중 오류 발생 (type_index=1): {text_list} -> {e}")
else:
raise ValueError(f"알 수 없는 type_index 값: {type_index}")
return processed_data