AutoTao/src/playwright_thread.py

151 lines
7.6 KiB
Python

import os
import sys
import random
import asyncio
import logging
import re
from PySide6.QtCore import QThread, Signal
from playwright.async_api import async_playwright, Page
class PlaywrightThread(QThread):
data_collected = Signal(bool, str)
def __init__(self, logger, db_manager):
super().__init__()
self.logger = logger
self.db_manager = db_manager
async def collect_data(self):
try:
async with async_playwright() as p:
# 브라우저 경로 설정
browser_path = None
if getattr(sys, 'frozen', False):
browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win', 'chrome.exe')
else:
browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win', 'chrome.exe')
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
# 사용자 에이전트 설정
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# 브라우저 시작 (headless 모드)
browser = await p.chromium.launch(
headless=True, # headless 모드로 설정
executable_path=browser_path,
args=[
'--disable-popup-blocking',
'--start-maximized',
'--window-size=1920,1080'
]
)
# 시크릿 브라우저 컨텍스트 생성
context = await browser.new_context(
user_agent=user_agent,
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
permissions=["geolocation", "notifications"]
)
# 페이지 열기
page = await context.new_page()
await page.goto("https://world.taobao.com")
self.logger.log(f"타오바오 사이트에 접속했습니다.", level=logging.INFO)
# 페이지 로딩 확인 및 pagedown
await page.wait_for_selector(".tb-pick-content-item") # 상품 카드 로딩 대기
self.logger.log(f"페이지 로딩 완료 - Pagedown을 두 번 누릅니다.", level=logging.INFO)
await page.keyboard.press("PageDown")
await page.wait_for_timeout(1000) # 1초 대기
await page.keyboard.press("PageDown")
await page.wait_for_timeout(2000) # 추가 2초 대기 (상품 로딩)
# 상품 수집
items_data = await self.scrape_items(page)
if items_data:
self.db_manager.insert_items(items_data)
self.data_collected.emit(True, "데이터 수집 완료")
else:
self.data_collected.emit(False, "데이터 수집 실패")
await context.close()
await browser.close()
except Exception as e:
self.logger.log(f"브라우저 작업 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.data_collected.emit(False, f"오류 발생: {e}", exec=True)
async def scrape_items(self, page: Page):
try:
items = await page.query_selector_all("div#ice-container div.tb-pick-feeds-container > div")
self.logger.log(f"{len(items)}개의 상품 카드가 발견되었습니다.", level=logging.DEBUG)
items_data = []
for idx, item in enumerate(items):
self.logger.log(f"{idx + 1}번째 상품 카드 처리 중 - XPath 확인", level=logging.DEBUG)
# a 태그를 명시적으로 선택하여 href 가져오기
link_element = await item.query_selector("a.item-link")
if link_element:
item_href = await link_element.get_attribute("href")
self.logger.log(f"{idx + 1}번째 상품 href: {item_href}", level=logging.DEBUG)
if item_href:
# 숫자만 추출하고 9~12자리 필터링
item_id_match = re.search(r'(\d{9,12})', item_href)
if item_id_match:
item_id = item_id_match.group(1)
pc_url = f"https://item.taobao.com/item.htm?id={item_id}"
self.logger.log(f"{idx + 1}번째 상품 ID: {item_id}, 상품 URL: {pc_url}", level=logging.DEBUG)
else:
self.logger.log(f"{idx + 1}번째 상품 ID를 올바르게 추출할 수 없습니다.", level=logging.WARNING)
continue
else:
self.logger.log(f"{idx + 1}번째 상품 ID를 가져올 수 없습니다.", level=logging.WARNING)
continue
else:
self.logger.log(f"{idx + 1}번째 상품의 a 태그를 찾을 수 없습니다.", level=logging.WARNING)
continue
name_element = await item.query_selector(".info-wrapper-title-text")
name = await name_element.inner_text() if name_element else "N/A"
self.logger.log(f"{idx + 1}번째 상품명: {name}", level=logging.DEBUG)
price_element = await item.query_selector(".price-value")
price = await price_element.inner_text() if price_element else "0"
self.logger.log(f"{idx + 1}번째 상품 가격: {price}", level=logging.DEBUG)
image_element = await item.query_selector(".img-wrapper")
image_style = await image_element.get_attribute("style") if image_element else ""
# image_url = image_style.split("url(")[-1].strip('")') if image_style else "N/A"
image_url = (image_style.split("url(")[-1].strip('");').strip("'").replace("//", "https://", 1) if image_style else "N/A")
self.logger.log(f"{idx + 1}번째 상품 이미지 URL: {image_url}", level=logging.DEBUG)
sales_element = await item.query_selector(".month-sale")
sales = await sales_element.inner_text() if sales_element else "0"
self.logger.log(f"{idx + 1}번째 상품 판매량: {sales}", level=logging.DEBUG)
items_data.append((item_id, pc_url, name, float(price), image_url, sales))
self.logger.log(f"수집된 상품 수 : {len(items_data)}", level=logging.DEBUG)
return items_data
except Exception as e:
self.logger.log(f"데이터 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return None
async def wait_for_user(self):
await asyncio.sleep(2)
def run(self):
asyncio.run(self.collect_data())