705 lines
35 KiB
Python
705 lines
35 KiB
Python
# from playwright.sync_api import sync_playwright
|
|
from playwright.async_api import async_playwright
|
|
import re
|
|
import pyautogui
|
|
import time
|
|
import win32gui, win32con
|
|
from bs4 import BeautifulSoup
|
|
import asyncio
|
|
|
|
class BrowserController:
|
|
def __init__(self, app, logger, locator_manager):
|
|
self.app = app
|
|
self.logger = logger
|
|
self.locator_manager = locator_manager
|
|
# self.chrome_window_name = "퍼센티 - 셀러들을 위한 AI 구매대행 솔루션 - Chrome"
|
|
# self.whale_window_name = "새 시크릿 탭 - Whale"
|
|
self.chrome_hwnd = None
|
|
self.whale_hwnd = None
|
|
|
|
self.playwright = None
|
|
self.browser = None
|
|
self.page = None
|
|
|
|
# BrowserController에 해당하는 모든 locator를 정의
|
|
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
|
|
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
|
|
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
|
|
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
|
|
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
|
|
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
|
|
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
|
|
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
|
|
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
|
|
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
|
|
self.product_name_template = self.locator_manager.get_locator('BrowserControl', 'product_name_template')
|
|
self.product_price_template = self.locator_manager.get_locator('BrowserControl', 'product_price_template')
|
|
self.product_image_template = self.locator_manager.get_locator('BrowserControl', 'product_image_template')
|
|
self.product_edit_button = self.locator_manager.get_locator('BrowserControl', 'product_edit_button')
|
|
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
|
|
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
|
|
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
|
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
|
|
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
|
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
|
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
|
|
self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
|
|
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
|
|
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
|
|
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
|
|
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
|
|
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
|
|
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
|
|
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
|
|
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
|
|
|
|
def get_page(self):
|
|
return self.page
|
|
|
|
async def start_browser(self):
|
|
"""크롬 브라우저 실행 및 페이지 로딩"""
|
|
self.logger.debug('크롬 브라우저 실행 중...')
|
|
|
|
# Playwright를 수동으로 실행하여 브라우저 유지
|
|
self.playwright = await async_playwright().start()
|
|
self.browser = await self.playwright.chromium.launch(headless=False) # 브라우저 비헤드리스 모드 실행
|
|
|
|
# 창 크기 설정 (1920x1080)
|
|
context = await self.browser.new_context(
|
|
viewport={"width": 1920, "height": 1080}
|
|
)
|
|
|
|
# 페이지 열기
|
|
self.page = await context.new_page()
|
|
# self.page.goto('https://percenty.co.kr/') # 원하는 페이지로 이동
|
|
await self.page.goto('https://percenty.co.kr/signin') # 원하는 페이지로 이동
|
|
self.logger.debug('newPage 로딩 ...')
|
|
# 사용자는 이 시점에 로그인 및 상세페이지 편집 모드로 들어감
|
|
|
|
# 페이지 제목을 가져와서 창 제목으로 활용
|
|
page_title = await self.page.title()
|
|
self.logger.debug(f'페이지 제목: {page_title}')
|
|
|
|
# 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
|
|
self.chrome_hwnd = self.find_window_by_title(page_title)
|
|
if not self.chrome_hwnd:
|
|
self.logger.debug('크롬 창을 찾을 수 없습니다.')
|
|
else:
|
|
self.logger.debug(f'크롬 창 핸들: {self.chrome_hwnd}')
|
|
|
|
# await self.page.wait_for_load_state('networkidle', timeout=10000)
|
|
|
|
async def login(self, admin_id, user_id, admin_password, user_password, is_admin=False):
|
|
"""로그인 처리"""
|
|
self.logger.debug(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정')
|
|
|
|
if is_admin:
|
|
# 관리자 로그인 처리
|
|
await self.page.fill(self.login_email_locator, admin_id)
|
|
await self.page.fill(self.login_password_locator, admin_password)
|
|
await self.page.click(self.login_button_locator)
|
|
else:
|
|
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
|
|
admin_toggle = self.page.locator(self.admin_toggle_locator)
|
|
if await admin_toggle.get_attribute("aria-checked") == "true":
|
|
await admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
|
|
|
|
await self.page.fill(self.login_email_locator, admin_id)
|
|
await self.page.fill(self.staff_id_locator, user_id)
|
|
await self.page.fill(self.login_password_locator, user_password)
|
|
await self.page.click(self.staff_login_button_locator)
|
|
|
|
self.logger.debug(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정')
|
|
|
|
# await self.page.wait_for_load_state('networkidle', timeout=10000)
|
|
|
|
await self.close_ad_if_exists()
|
|
|
|
async def close_browser(self):
|
|
"""브라우저 종료"""
|
|
if self.browser:
|
|
await self.browser.close()
|
|
await self.playwright.stop()
|
|
self.logger.debug('브라우저 종료됨.')
|
|
|
|
def find_window_by_title(self, window_name):
|
|
"""창 제목을 통해 핸들을 찾는 메서드"""
|
|
def enum_windows_callback(hwnd, result):
|
|
if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
|
|
result.append(hwnd)
|
|
result = []
|
|
win32gui.EnumWindows(enum_windows_callback, result)
|
|
return result[0] if result else None
|
|
|
|
def switch_to_chrome(self):
|
|
"""크롬으로 포커스 전환"""
|
|
if self.chrome_hwnd:
|
|
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
|
|
win32gui.SetForegroundWindow(self.chrome_hwnd)
|
|
self.logger.debug('크롬 창으로 포커스 이동.')
|
|
else:
|
|
self.logger.debug('크롬 창을 찾을 수 없습니다.')
|
|
|
|
|
|
async def get_total_product_count_ori(self):
|
|
try:
|
|
# JavaScript로 해당 요소의 텍스트를 가져옴
|
|
element_text = await self.page.evaluate('''() => {
|
|
let element = document.querySelector('#root > div > div > div > div > main > div > div.sc-ezreuY.kYrYVh > div.sc-dChVcU.cRrUlt > div.sc-izQBue.dxiUJm > div > div:nth-child(1) > label > span:nth-child(2)');
|
|
return element ? element.innerText : null;
|
|
}''')
|
|
|
|
if element_text:
|
|
self.logger.debug(f"가져온 텍스트: {element_text}") # 텍스트 확인용 로그
|
|
# "총 xx개 상품"에서 숫자만 추출
|
|
count = int(''.join(filter(str.isdigit, element_text)))
|
|
return count
|
|
else:
|
|
self.logger.debug("요소를 찾을 수 없습니다.")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.debug(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
|
|
return 0
|
|
|
|
async def get_total_product_count(self):
|
|
try:
|
|
# Python 변수를 JavaScript로 전달하여 요소의 텍스트를 가져옴
|
|
element_text = await self.page.evaluate(f'''(selector) => {{
|
|
let element = document.querySelector(selector);
|
|
return element ? element.innerText : null;
|
|
}}''', self.total_product_count_locator)
|
|
|
|
if element_text:
|
|
self.logger.debug(f"가져온 텍스트: {element_text}") # 텍스트 확인용 로그
|
|
# "총 xx개 상품"에서 숫자만 추출
|
|
count = int(''.join(filter(str.isdigit, element_text)))
|
|
return count
|
|
else:
|
|
self.logger.debug("요소를 찾을 수 없습니다.")
|
|
return 0
|
|
except Exception as e:
|
|
self.logger.debug(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
|
|
return 0
|
|
|
|
|
|
# async def get_product_name(self, index, selector='xpath'):
|
|
# """
|
|
# 상품명을 수집하는 메서드
|
|
# index : 상품명을 수집하는 인덱스
|
|
# selector : 수집방법 (css 또는 xpath)
|
|
# """
|
|
# try:
|
|
# # config.ini에서 설정된 선택자에 인덱스를 적용하여 가져옴
|
|
# # product_name_selector = self.product_name_template.format(index=index)
|
|
# # self.logger.debug(f"사용된 선택자: {product_name_selector}") # 선택자 출력
|
|
|
|
# product_name_xpath_selector = self.product_name_template_xpath.format(index=index)
|
|
# self.logger.debug(f"사용된 선택자: {product_name_xpath_selector}") # 선택자 출력
|
|
|
|
|
|
# # XPath 기반으로 요소 검색
|
|
# product_name_element = await self.page.locator(f"xpath={product_name_xpath_selector}").element_handle()
|
|
|
|
# # product_name_element = await self.page.query_selector(product_name_selector)
|
|
|
|
# # product_name_element가 None인지 확인
|
|
# if product_name_element is None:
|
|
# self.logger.error(f"상품명 요소를 찾을 수 없습니다: index {index}")
|
|
# return "수집 오류 발생"
|
|
|
|
# # 요소가 존재할 경우, inner_text 수집
|
|
# product_name = await product_name_element.inner_text()
|
|
# return product_name.strip()
|
|
# except Exception as e:
|
|
# self.logger.error(f"상품명 수집 중 오류: {e}")
|
|
# return "수집 오류 발생"
|
|
|
|
|
|
|
|
def fetch_image_urls(self, html_content):
|
|
"""
|
|
HTML 콘텐츠에서 모든 <img> 태그의 URL을 순서대로 추출하고 중복 제거.
|
|
"""
|
|
soup = BeautifulSoup(html_content, 'html.parser')
|
|
|
|
# 중복된 이미지를 제거하기 위해 set 사용
|
|
image_urls_set = set()
|
|
|
|
# class="image_resized"를 가진 모든 <img> 태그 찾기
|
|
images_resized = soup.find_all('img', class_='image_resized')
|
|
for img in images_resized:
|
|
if img and 'src' in img.attrs:
|
|
image_urls_set.add(img['src']) # 중복을 방지하기 위해 set에 추가
|
|
|
|
# <figure class="image"> 내부의 모든 <img> 태그 찾기
|
|
figures = soup.find_all('figure', class_='image')
|
|
for figure in figures:
|
|
img_tag = figure.find('img')
|
|
if img_tag and 'src' in img_tag.attrs:
|
|
image_urls_set.add(img_tag['src']) # 중복을 방지하기 위해 set에 추가
|
|
|
|
# set을 list로 변환하여 반환 (순서 유지가 필요하면 set 대신 리스트로 처리해야 함)
|
|
image_urls = list(image_urls_set)
|
|
return image_urls
|
|
|
|
|
|
async def close_ad_if_exists(self):
|
|
|
|
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
|
|
try:
|
|
# 광고 다이얼로그가 나타날 때까지 기다림
|
|
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000)
|
|
self.logger.debug("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.")
|
|
|
|
# 닫기 버튼 클릭
|
|
close_button = await self.page.query_selector(self.close_ad_button_locator)
|
|
if close_button:
|
|
await close_button.click()
|
|
self.logger.debug("다이얼로그를 성공적으로 닫았습니다.")
|
|
else:
|
|
self.logger.debug("닫기 버튼을 찾지 못했습니다.")
|
|
|
|
except Exception as e:
|
|
# 다이얼로그가 없거나 다른 문제가 발생한 경우
|
|
self.logger.debug(f"다이얼로그가 발견되지 않았거나 오류 발생: {e}", exc_info=True)
|
|
|
|
async def go_to_new_product_page(self):
|
|
"""신규 상품 등록 페이지로 이동"""
|
|
try:
|
|
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
|
await self.page.click(new_product_page_locator)
|
|
self.logger.debug("신규 상품 등록 페이지로 이동 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"신규 상품 등록 페이지 이동 중 오류: {e}", exc_info=True)
|
|
|
|
async def get_product_edit_buttons(self):
|
|
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
|
|
try:
|
|
# 버튼 선택자를 가져옴
|
|
edit_button_selector = self.product_edit_button
|
|
|
|
if not edit_button_selector:
|
|
self.logger.debug("상품 수정 버튼의 선택자를 찾을 수 없습니다.")
|
|
return []
|
|
|
|
# 선택자를 사용해 버튼 객체를 찾음
|
|
buttons = self.page.locator(edit_button_selector)
|
|
|
|
# 버튼이 존재하는지 확인
|
|
if await buttons.count() == 0:
|
|
self.logger.debug("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
|
|
return []
|
|
|
|
count = await buttons.count()
|
|
self.logger.debug(f"수정할 상품 개수: {count}")
|
|
|
|
# 모든 버튼을 리스트로 반환
|
|
return [buttons.nth(i) for i in range(count)]
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
|
|
return []
|
|
|
|
async def get_product_edit_buttons_by_templete(self):
|
|
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
|
|
try:
|
|
# 버튼 선택자 설정
|
|
edit_button_selector_template = f'//button[span[text()="세부사항 수정 및 업로드"]]'
|
|
|
|
# 선택자를 사용해 버튼 객체를 찾음
|
|
buttons = self.page.locator(edit_button_selector_template)
|
|
|
|
# 버튼이 존재하는지 확인
|
|
button_count = await buttons.count()
|
|
if button_count == 0:
|
|
self.logger.debug("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
|
|
return []
|
|
|
|
self.logger.debug(f"현재 페이지의 수정할 상품 개수: {button_count}")
|
|
|
|
# 모든 버튼을 리스트로 반환
|
|
return [buttons.nth(i) for i in range(button_count)]
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
|
|
return []
|
|
|
|
|
|
async def click_modify_button_by_text(self, index):
|
|
"""인덱스에 해당하는 '세부사항 수정 및 업로드' 버튼 클릭"""
|
|
try:
|
|
# config.ini에서 선택자 가져오기
|
|
button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
|
|
button_selector = f'({button_template})[{index}]'
|
|
|
|
button = await self.page.query_selector(button_selector)
|
|
|
|
# 버튼이 화면에 보이도록 스크롤 후 클릭
|
|
if button:
|
|
await button.scroll_into_view_if_needed()
|
|
await self.page.evaluate('arguments[0].click();', button)
|
|
self.logger.debug(f'{index}번째 상품의 수정 버튼 클릭 완료')
|
|
else:
|
|
self.logger.debug(f'{index}번째 상품의 수정 버튼을 찾지 못했습니다.')
|
|
except Exception as e:
|
|
self.logger.debug(f'{index}번째 상품의 수정 버튼 클릭 중 오류: {str(e)}')
|
|
|
|
|
|
async def open_product_edit_dialog(self, button):
|
|
"""상품 수정 다이얼로그 열기"""
|
|
try:
|
|
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
|
|
await button.scroll_into_view_if_needed()
|
|
self.logger.debug("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.")
|
|
|
|
await button.click()
|
|
self.logger.debug("세부사항 수정 다이얼로그 열기 완료.")
|
|
await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
|
|
except Exception as e:
|
|
self.logger.debug(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", exc_info=True)
|
|
|
|
async def click_detail_tab(self):
|
|
"""상세페이지 탭 클릭"""
|
|
try:
|
|
await self.page.click(self.detail_tab_locator)
|
|
self.logger.debug("상세페이지 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"상세페이지 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def click_option_tab(self):
|
|
"""옵션 탭 클릭"""
|
|
try:
|
|
await self.page.click(self.option_tab_locator)
|
|
self.logger.debug("옵션 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"옵션 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def click_price_tab(self):
|
|
"""가격 탭 클릭"""
|
|
try:
|
|
await self.page.click(self.price_tab_locator)
|
|
self.logger.debug("가격 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"가격 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def click_title_tab(self):
|
|
"""상품명 탭 클릭"""
|
|
try:
|
|
await self.page.click(self.title_tab_locator)
|
|
self.logger.debug("상품명 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"상품명 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def extract_image_urls(self, optionHandler, is_option_data=False):
|
|
"""상세페이지에서 이미지 URL 추출"""
|
|
try:
|
|
# 소스 편집 모드로 전환
|
|
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
|
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
|
|
|
# 소스 편집 모드로 전환
|
|
await self.page.click(source_button_locator)
|
|
self.logger.debug("소스 버튼 클릭 완료.")
|
|
|
|
|
|
# 'data-value' 속성 값을 추출 (textarea 요소)
|
|
textarea = await self.page.wait_for_selector(ck_source_editing_area_locator)
|
|
data_value = await textarea.get_attribute("data-value")
|
|
|
|
|
|
# HTML 소스에서 이미지 URL 추출
|
|
image_urls = self.fetch_image_urls(data_value)
|
|
self.logger.debug(f'추출된 이미지 URL 수: {len(image_urls)}')
|
|
|
|
# HTML 소스에서 이미지 URL 삭제
|
|
self.logger.debug('img 태그를 삭제 중...')
|
|
data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
|
|
new_value = ""
|
|
if data_value_element:
|
|
await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
|
|
updated_value = await data_value_element.get_attribute('data-value')
|
|
self.logger.debug(f'Updated data-value: {updated_value}')
|
|
else:
|
|
self.logger.debug('Element with data-value not found.')
|
|
self.logger.debug('img 태그 삭제 완료.')
|
|
|
|
# img 태그의 class 삭제 후 다시 소스 버튼 클릭
|
|
await self.page.click(source_button_locator)
|
|
self.logger.debug('소스 버튼 재 클릭 완료.')
|
|
|
|
|
|
if is_option_data:
|
|
self.logger.debug('옵션 데이터 입력 시작')
|
|
option_data = optionHandler.get_selected_translated_options()
|
|
|
|
# 옵션 입력 필드 선택
|
|
input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
|
|
input_field = await self.page.wait_for_selector(input_field_locator)
|
|
|
|
# 선두부 텍스트 입력
|
|
for key in sorted(self.text_templates.keys()):
|
|
leading_text = self.text_templates[key]
|
|
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
|
await input_field.type(leading_text)
|
|
await input_field.press('Enter')
|
|
await input_field.press('Enter')
|
|
self.logger.debug(f"{key} 텍스트 입력 완료: {leading_text}")
|
|
|
|
# 각 옵션을 한 줄씩 입력
|
|
for i, option in enumerate(option_data, start=1):
|
|
if isinstance(option, tuple):
|
|
option_text = option[0] # 튜플의 첫 번째 요소를 사용
|
|
else:
|
|
option_text = option
|
|
|
|
# 옵션을 A. B. 등으로 표시하며 입력
|
|
# option_prefix = f"{chr(64 + i)}. " # A, B, C...
|
|
option_prefix = f"- {chr(64 + i)}. " # 마크다운 목록 A, B, C...
|
|
await input_field.type(option_prefix + option_text)
|
|
await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
|
|
|
|
# 후두부 텍스트 입력
|
|
await input_field.type('---')
|
|
await input_field.press('Enter')
|
|
await input_field.type('나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
|
|
await input_field.press('Enter')
|
|
await input_field.type('---')
|
|
await input_field.press('Enter')
|
|
await input_field.press('Enter')
|
|
await input_field.press('Enter')
|
|
|
|
self.logger.debug('옵션 데이터 입력 완료 후 엔터 입력')
|
|
|
|
return image_urls
|
|
except Exception as e:
|
|
self.logger.debug(f"이미지 URL 추출 중 오류: {e}", exc_info=True)
|
|
return []
|
|
|
|
async def paste_image_in_chrome(self, clipboardImageManager, url):
|
|
"""크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
|
|
try:
|
|
self.switch_to_chrome() # 크롬으로 포커스 이동
|
|
await clipboardImageManager.process_clipboard(url) # 클립보드 내용을 처리
|
|
# clipboard_content = pyperclip.paste()
|
|
if clipboardImageManager.is_clipboard_image():
|
|
pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
|
|
pyautogui.press('right') # 오른쪽 입력
|
|
self.logger.debug("이미지 붙여넣기 완료.")
|
|
self.logger.debug("이미지 붙여넣기 완료로 클립보드 비우기.")
|
|
await clipboardImageManager.clear_clipboard()
|
|
else:
|
|
self.logger.debug("클립보드가 비어있습니다.")
|
|
except Exception as e:
|
|
self.logger.debug(f"이미지 붙여넣기 중 오류: {e}", exc_info=True)
|
|
|
|
async def save_and_ecs_product_edit(self):
|
|
"""상품 수정 후 저장 버튼 클릭"""
|
|
try:
|
|
await self.page.click(self.save_button_locator)
|
|
await self.page.keyboard.press("Escape")
|
|
self.logger.debug("상품 수정 내용 저장 및 ECS 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def save_product_edit(self):
|
|
"""상품 수정 후 저장 버튼 클릭"""
|
|
try:
|
|
await self.page.click(self.save_button_locator)
|
|
self.logger.debug("상품 수정 내용 저장 완료.")
|
|
except Exception as e:
|
|
self.logger.debug(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
async def go_to_next_page(self):
|
|
"""다음 페이지로 이동"""
|
|
try:
|
|
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
|
|
current_page = await self.page.query_selector(self.current_page_locator)
|
|
|
|
if not current_page:
|
|
self.logger.debug("현재 페이지 정보를 찾을 수 없습니다.")
|
|
return False
|
|
|
|
# 현재 활성화된 페이지 번호를 가져옴
|
|
current_page_number = int(await current_page.get_attribute("title"))
|
|
next_page_number = current_page_number + 1
|
|
|
|
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
|
|
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
|
|
next_page_button = await self.page.query_selector(next_page_button_locator)
|
|
|
|
if next_page_button:
|
|
await next_page_button.click() # 페이지 버튼 클릭
|
|
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
|
|
self.logger.debug(f"페이지 {next_page_number}로 이동 완료.")
|
|
return True
|
|
else:
|
|
self.logger.debug("다음 페이지가 없습니다.")
|
|
return False
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"다음 페이지로 이동 중 오류 발생: {e}", exc_info=True)
|
|
return False
|
|
|
|
def switch_to_chrome(self):
|
|
"""크롬으로 포커스 전환"""
|
|
try:
|
|
if not self.chrome_hwnd:
|
|
self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
|
|
if self.chrome_hwnd:
|
|
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
|
|
win32gui.SetForegroundWindow(self.chrome_hwnd)
|
|
self.logger.debug('크롬 창으로 포커스 이동.')
|
|
else:
|
|
self.logger.debug('크롬 창을 찾을 수 없습니다.')
|
|
except Exception as e:
|
|
self.logger.debug(f"크롬 포커스 전환 중 오류: {e}", exc_info=True)
|
|
|
|
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
|
|
"""
|
|
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
|
|
|
Parameters:
|
|
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
|
- pause_time: 스크롤 사이의 대기 시간 (초).
|
|
- max_scrolls: 최대 스크롤 횟수.
|
|
"""
|
|
scroll_count = 0
|
|
|
|
self.logger.debug(f"스크롤 시작")
|
|
|
|
# 현재 페이지 높이 가져오기
|
|
last_height = await self.page.evaluate("document.body.scrollHeight")
|
|
self.logger.debug(f"현재 페이지 높이 가져오기 - {last_height}")
|
|
|
|
while scroll_count < max_scrolls:
|
|
if direction == "down":
|
|
# 아래로 스크롤
|
|
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px")
|
|
await self.page.evaluate("window.scrollBy(0, 1000);")
|
|
elif direction == "up":
|
|
# 위로 스크롤
|
|
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px")
|
|
await self.page.evaluate("window.scrollBy(0, -1000);")
|
|
else:
|
|
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
|
|
|
self.logger.debug(f"pause_time 슬립 : {pause_time}")
|
|
await asyncio.sleep(pause_time)
|
|
|
|
# 새로운 페이지 높이 가져오기
|
|
new_height = await self.page.evaluate("document.body.scrollHeight")
|
|
self.logger.debug(f"새로운 페이지 높이 가져오기 - {new_height}")
|
|
|
|
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
|
|
if direction == "down" and new_height == last_height:
|
|
self.logger.debug(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}")
|
|
break
|
|
elif direction == "up" and new_height == 0:
|
|
self.logger.debug(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}")
|
|
break
|
|
|
|
self.logger.debug(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}")
|
|
last_height = new_height
|
|
scroll_count += 1
|
|
self.logger.debug(f"스크롤 카운트 + 1")
|
|
|
|
if scroll_count == max_scrolls:
|
|
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
|
|
|
|
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
|
|
"""
|
|
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
|
|
|
Parameters:
|
|
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
|
- pause_time: 스크롤 사이의 대기 시간 (초).
|
|
- max_scrolls: 최대 스크롤 횟수.
|
|
"""
|
|
scroll_count = 0
|
|
|
|
while scroll_count < max_scrolls:
|
|
if direction == "down":
|
|
# 아래로 스크롤 (Page Down 키 사용)
|
|
await self.page.keyboard.press("PageDown")
|
|
elif direction == "up":
|
|
# 위로 스크롤 (Page Up 키 사용)
|
|
await self.page.keyboard.press("PageUp")
|
|
else:
|
|
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
|
|
|
await asyncio.sleep(pause_time)
|
|
|
|
scroll_count += 1
|
|
|
|
if scroll_count == max_scrolls:
|
|
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
|
|
|
|
|
|
# async def collect_product_info(self):
|
|
# """
|
|
# 상품 정보를 수집하는 메서드
|
|
# """
|
|
# try:
|
|
# # 페이지를 아래로 스크롤하여 모든 상품 로드
|
|
# await self.scroll_with_wheel('down')
|
|
# await self.scroll_with_wheel('up')
|
|
|
|
# product_infos = []
|
|
# for i in range(1, 51): # 1부터 최대 50까지 상품 처리
|
|
# try:
|
|
# # 각 상품의 CSS 선택자를 동적으로 생성하여 접근
|
|
# product_name_locator = self.product_name_template.format(i=i)
|
|
# product_price_locator = self.product_price_template.format(i=i)
|
|
# product_image_locator = self.product_image_template.format(i=i)
|
|
|
|
# product_name_element = await self.page.query_selector(product_name_locator)
|
|
# product_price_element = await self.page.query_selector(product_price_locator)
|
|
# product_image_element = await self.page.query_selector(product_image_locator)
|
|
|
|
# if product_name_element and product_price_element and product_image_element:
|
|
# product_info = {
|
|
# "name": await product_name_element.text_content().strip(),
|
|
# "price": await product_price_element.text_content().strip(),
|
|
# "image_url": await product_image_element.get_attribute('src')
|
|
# }
|
|
# self.logger.debug(f"상품 {i}: {product_info}")
|
|
# product_infos.append(product_info)
|
|
# except Exception as e:
|
|
# self.logger.error(f"상품 {i} 정보 수집 중 오류 발생: {e}", exc_info=True)
|
|
# continue
|
|
|
|
# return product_infos
|
|
# except Exception as e:
|
|
# self.logger.error(f"상품 정보 수집 중 오류 발생: {e}", exc_info=True)
|
|
# return []
|
|
|
|
|
|
async def scroll_page_to_bottom(self, pause_time=0.2):
|
|
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
|
|
self.logger.debug('페이지 스크롤 시작...')
|
|
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
|
|
|
|
while True:
|
|
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
|
|
await asyncio.sleep(pause_time) # 페이지 로딩 대기
|
|
current_height = await self.page.evaluate("() => document.body.scrollHeight")
|
|
if current_height == previous_height:
|
|
break # 더 이상 스크롤할 내용이 없으면 종료
|
|
previous_height = current_height
|
|
self.logger.debug('페이지 스크롤 완료.')
|
|
|
|
async def scroll_page_to_top(self, pause_time=0.2):
|
|
"""페이지의 맨 위까지 스크롤"""
|
|
self.logger.debug('페이지 위로 스크롤 시작...')
|
|
previous_height = await self.page.evaluate("() => window.pageYOffset")
|
|
|
|
while previous_height > 0:
|
|
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
|
|
await asyncio.sleep(pause_time) # 페이지 로딩 대기
|
|
current_height = await self.page.evaluate("() => window.pageYOffset")
|
|
if current_height == previous_height:
|
|
break # 더 이상 스크롤할 내용이 없으면 종료
|
|
previous_height = current_height
|
|
|
|
self.logger.debug('페이지 위로 스크롤 완료.')
|