AutoPercenty3/browser_control.py

898 lines
45 KiB
Python

# from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright, TimeoutError
import re
import pyautogui
import time
import win32gui, win32con
from bs4 import BeautifulSoup
import asyncio
import os, sys, random
import requests
from PIL import Image
from io import BytesIO
class BrowserController:
def __init__(self, app, logger, locator_manager):
self.app = app
self.logger = logger
self.log_files = ["appTranslator.log", "appTranslator.log.1", "appTranslator.log.2", "appTranslator.log.3", "appTranslator.log.4", "appTranslator.log.5"]
self.locator_manager = locator_manager
# self.chrome_window_name = "퍼센티 - 셀러들을 위한 AI 구매대행 솔루션 - Chrome"
# self.whale_window_name = "새 시크릿 탭 - Whale"
self.chrome_hwnd = None
self.whale_hwnd = None
self.playwright = None
self.browser = None
self.page = None
# BrowserController에 해당하는 모든 locator를 정의
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
self.product_name_template = self.locator_manager.get_locator('BrowserControl', 'product_name_template')
self.product_price_template = self.locator_manager.get_locator('BrowserControl', 'product_price_template')
self.product_image_template = self.locator_manager.get_locator('BrowserControl', 'product_image_template')
self.product_edit_button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
def get_page(self):
return self.page
async def start_browser(self):
"""크롬 브라우저 실행 및 페이지 로딩"""
self.logger.debug('크롬 브라우저 실행 중...')
# Playwright를 수동으로 실행하여 브라우저 유지
self.playwright = await async_playwright().start()
# cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정
if getattr(sys, 'frozen', False):
browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0')
user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data')
else:
browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0')
user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data')
self.logger.debug(f"브라우저 경로: {browser_path}")
self.logger.debug(f"확장 프로그램 경로: {extension_path}")
self.logger.debug(f"사용자 폴더 경로: {user_data_dir}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.")
# User agent 설정
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.debug(f"user_agent: {user_agent}")
# 브라우저 시작 및 설정
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=False,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--disable-popup-blocking',
f'--disable-extensions-except={extension_path}',
f'--load-extension={extension_path}',
'--start-maximized',
'--window-size=1920,1080'
],
executable_path=browser_path,
user_agent=user_agent
)
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page = await self.browser.new_page()
self.logger.info('새 페이지 로딩 중...')
await self.page.goto('https://percenty.co.kr/signin')
self.logger.info('percenty.co.kr/signin 로딩 완료')
# 첫 번째 기본 탭 닫기
if self.browser.pages:
await self.browser.pages[0].close()
# 페이지 제목을 가져와서 창 제목으로 활용
page_title = await self.page.title()
self.logger.debug(f'페이지 제목: {page_title}')
# 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
self.chrome_hwnd = self.find_window_by_title(page_title)
if not self.chrome_hwnd:
self.logger.warning('크롬 창을 찾을 수 없습니다.')
else:
self.logger.debug(f'크롬 창 핸들: {self.chrome_hwnd}')
# await self.page.wait_for_load_state('networkidle', timeout=10000)
async def login(self, admin_id, user_id, admin_password, user_password, is_admin=False):
"""로그인 처리"""
self.logger.info(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정')
if is_admin:
# 관리자 로그인 처리
await self.page.fill(self.login_email_locator, admin_id)
await self.page.fill(self.login_password_locator, admin_password)
await self.page.click(self.login_button_locator)
else:
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
admin_toggle = self.page.locator(self.admin_toggle_locator)
if await admin_toggle.get_attribute("aria-checked") == "true":
await admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
await self.page.fill(self.login_email_locator, admin_id)
await self.page.fill(self.staff_id_locator, user_id)
await self.page.fill(self.login_password_locator, user_password)
await self.page.click(self.staff_login_button_locator)
self.logger.info(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정')
# await self.page.wait_for_load_state('networkidle', timeout=10000)
await self.close_ad_if_exists()
async def close_browser(self):
"""브라우저 종료"""
if self.browser:
await self.browser.close()
await self.playwright.stop()
self.logger.info('브라우저 종료됨.')
def find_window_by_title(self, window_name):
"""창 제목을 통해 핸들을 찾는 메서드"""
def enum_windows_callback(hwnd, result):
if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
result.append(hwnd)
result = []
win32gui.EnumWindows(enum_windows_callback, result)
return result[0] if result else None
def switch_to_chrome(self):
"""크롬으로 포커스 전환"""
if self.chrome_hwnd:
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(self.chrome_hwnd)
self.logger.debug('크롬 창으로 포커스 이동.')
else:
self.logger.error('크롬 창을 찾을 수 없습니다.')
async def get_total_product_count_ori(self):
try:
# JavaScript로 해당 요소의 텍스트를 가져옴
element_text = await self.page.evaluate('''() => {
let element = document.querySelector('#root > div > div > div > div > main > div > div.sc-ezreuY.kYrYVh > div.sc-dChVcU.cRrUlt > div.sc-izQBue.dxiUJm > div > div:nth-child(1) > label > span:nth-child(2)');
return element ? element.innerText : null;
}''')
if element_text:
self.logger.debug(f"가져온 텍스트: {element_text}") # 텍스트 확인용 로그
# "총 xx개 상품"에서 숫자만 추출
count = int(''.join(filter(str.isdigit, element_text)))
return count
else:
self.logger.debug("요소를 찾을 수 없습니다.")
return 0
except Exception as e:
self.logger.debug(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
return 0
async def get_total_product_count(self):
total_count = 0
items_per_page = 0
try:
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
total_count_element = await self.page.query_selector("div#root span:has-text('개 상품')")
items_per_page_element = await self.page.query_selector("div#root [title$='개씩 보기']")
self.logger.debug(f"total_count_element : {total_count_element}")
if total_count_element:
total_count_text = await total_count_element.inner_text()
if "" in total_count_text and "개 상품" in total_count_text:
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
self.logger.info(f"총 상품수 확인: {total_count}")
# 페이지당 상품 수 추출
if items_per_page_element:
items_per_page_text = await items_per_page_element.get_attribute("title")
if items_per_page_text and "개씩 보기" in items_per_page_text:
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
self.logger.info(f"페이지당 상품수 확인: {items_per_page} 개씩 보기")
# 결과 반환
if total_count:
return {"total_count": total_count, "items_per_page": items_per_page}
return {"total_count": 0, "items_per_page": 0}
except Exception as e:
self.logger.error(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
return {"total_count": 0, "items_per_page": 0}
# def fetch_image_urls_ori(self, html_content):
# """
# HTML 콘텐츠에서 모든 <img> 태그의 URL을 순서대로 추출하고 중복 제거.
# """
# soup = BeautifulSoup(html_content, 'html.parser')
# # 순서를 유지하면서 중복을 제거하기 위해 리스트 사용
# image_urls = []
# seen_urls = set()
# # <figure class="image"> 내부의 모든 <img> 태그 찾기
# figures = soup.find_all('figure', class_='image')
# for figure in figures:
# img_tag = figure.find('img')
# if img_tag and 'src' in img_tag.attrs:
# url = img_tag['src']
# if url not in seen_urls:
# image_urls.append(url)
# seen_urls.add(url) # 중복 방지
# # class="image_resized"를 가진 모든 <img> 태그 찾기
# images_resized = soup.find_all('img', class_='image_resized')
# for img in images_resized:
# if img and 'src' in img.attrs:
# url = img['src']
# if url not in seen_urls:
# image_urls.append(url)
# seen_urls.add(url) # 중복 방지
# return image_urls
def fetch_image_urls(self, html_content):
"""
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 모든 <img> 태그를 찾기
image_urls = []
img_tags = soup.find_all('img')
for img in img_tags:
# img 태그에서 src 속성 추출
if 'src' in img.attrs:
image_url = img['src']
image_urls.append(image_url)
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)}")
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}")
return image_urls
async def close_ad_if_exists(self):
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
try:
# 광고 다이얼로그가 나타날 때까지 기다림
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=5000, state='visible')
self.logger.info("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.")
# 닫기 버튼 클릭
close_button = await self.page.query_selector(self.close_ad_button_locator)
if close_button:
await close_button.click()
self.logger.info("다이얼로그를 성공적으로 닫았습니다.")
else:
self.logger.warning("닫기 버튼을 찾지 못했습니다.")
except TimeoutError:
# 다이얼로그가 없을 때: info 수준의 로그로 기록
self.logger.info("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.")
except Exception as e:
# 다른 예외 상황 발생 시 error로 기록
self.logger.error(f"다이얼로그 닫기 중 오류 발생: {e}", exc_info=True)
async def go_to_new_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
await self.page.click(new_product_page_locator)
self.logger.info("신규 상품 등록 페이지로 이동 완료.")
except Exception as e:
self.logger.error(f"신규 상품 등록 페이지 이동 중 오류: {e}", exc_info=True)
# async def get_product_edit_buttons(self):
# """현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
# try:
# # 버튼 선택자를 가져옴
# edit_button_selector = self.product_edit_button
# if not edit_button_selector:
# self.logger.warning("상품 수정 버튼의 선택자를 찾을 수 없습니다.")
# return []
# # 선택자를 사용해 버튼 객체를 찾음
# buttons = self.page.locator(edit_button_selector)
# # 버튼이 존재하는지 확인
# if await buttons.count() == 0:
# self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
# return []
# count = await buttons.count()
# self.logger.info(f"수정할 상품 개수: {count}")
# # 모든 버튼을 리스트로 반환
# return [buttons.nth(i) for i in range(count)]
# except Exception as e:
# self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
# return []
async def is_button_disabled(self, button):
"""버튼이 disabled 상태인지 확인"""
try:
# 버튼의 disabled 속성 확인
is_disabled = await button.get_attribute('disabled')
return is_disabled is not None # disabled 속성이 있으면 True 반환
except Exception as e:
self.logger.error(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", exc_info=True)
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
async def get_product_edit_buttons_by_templete(self):
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
try:
# 버튼 선택자 설정
# edit_button_selector_template = f'//button[span[text()="세부사항 수정 및 업로드"]]'
self.product_edit_button_template
# 선택자를 사용해 버튼 객체를 찾음
buttons = self.page.locator(self.product_edit_button_template)
# 버튼이 존재하는지 확인
button_count = await buttons.count()
if button_count == 0:
self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
return []
self.logger.info(f"현재 페이지의 수정할 상품 개수: {button_count}")
# 모든 버튼을 리스트로 반환
return [buttons.nth(i) for i in range(button_count)]
except Exception as e:
self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
return []
async def click_modify_button_by_text(self, index):
"""인덱스에 해당하는 '세부사항 수정 및 업로드' 버튼 클릭"""
try:
# config.ini에서 선택자 가져오기
button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
button_selector = f'({button_template})[{index}]'
button = await self.page.query_selector(button_selector)
# 버튼이 화면에 보이도록 스크롤 후 클릭
if button:
await button.scroll_into_view_if_needed()
await self.page.evaluate('arguments[0].click();', button)
self.logger.info(f'{index}번째 상품의 수정 버튼 클릭 완료')
else:
self.logger.warning(f'{index}번째 상품의 수정 버튼을 찾지 못했습니다.')
except Exception as e:
self.logger.error(f'{index}번째 상품의 수정 버튼 클릭 중 오류: {str(e)}')
async def open_product_edit_dialog(self, button):
"""상품 수정 다이얼로그 열기"""
try:
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
await button.scroll_into_view_if_needed()
self.logger.debug("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.")
await button.click()
self.logger.info("세부사항 수정 다이얼로그 열기 완료.")
await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
except Exception as e:
self.logger.error(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", exc_info=True)
async def click_detail_tab(self):
"""상세페이지 탭 클릭"""
try:
await self.page.click(self.detail_tab_locator)
self.logger.info("상세페이지 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"상세페이지 탭 클릭 중 오류: {e}", exc_info=True)
async def click_option_tab(self):
"""옵션 탭 클릭"""
try:
await self.page.click(self.option_tab_locator)
self.logger.info("옵션 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"옵션 탭 클릭 중 오류: {e}", exc_info=True)
async def click_price_tab(self):
"""가격 탭 클릭"""
try:
await self.page.click(self.price_tab_locator)
self.logger.info("가격 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"가격 탭 클릭 중 오류: {e}", exc_info=True)
async def click_title_tab(self):
"""상품명 탭 클릭"""
try:
await self.page.click(self.title_tab_locator)
self.logger.info("상품명 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"상품명 탭 클릭 중 오류: {e}", exc_info=True)
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
html_content = '<p>&nbsp;</p>'
for url in urls:
html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
return html_content
def deleted_img_urls_from_logs(self):
"""로그 파일에서 상품명과 이미지 URL 목록을 추출하여 딕셔너리로 반환하는 메서드"""
image_data = {}
log_dir = os.path.join(os.path.dirname(__file__), "recovery_log")
# 로그 파일에서 필요한 정보만 추출
for log_file in self.log_files:
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
current_product = None
for line in lines:
# 상품명 추출
product_match = re.search(r"원본 상품명 '(.+?)'", line)
if product_match:
current_product = product_match.group(1)
image_data[current_product] = []
# 이미지 URL 목록 추출
url_match = re.search(r"fetch_image_urls 에서 추출한 이미지URL 목록 : \[(.+?)\]", line)
if url_match and current_product:
# 각 URL에서 불필요한 작은따옴표 제거
urls = [url.strip("'\"") for url in url_match.group(1).split(", ")]
image_data[current_product].extend(urls)
current_product = None # Reset after each product's URL extraction
self.logger.debug(f"복구된 이미지 URL 데이터: {image_data}")
return image_data
async def recovery_image_urls(self, product_name, deleted_img_urls):
"""상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드"""
self.logger.debug("상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드")
if product_name in deleted_img_urls:
# 소스 편집 모드로 전환
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
await self.page.click(source_button_locator)
self.logger.debug("recovery_image_urls : 소스 버튼 클릭 완료.")
# 기존 extract_image_urls와 유사하게 HTML 소스를 가져옴
textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
data_value = await textarea.get_attribute("data-value")
# HTML 소스에서 이미지 URL 추출
image_urls = self.fetch_image_urls(data_value)
self.logger.info(f'recovery_image_urls추출된 이미지 URL 수: {len(image_urls)}')
# 이미지 태그가 없으면 로그에서 추출한 데이터를 HTML로 복원하여 입력
if len(image_urls) == 0:
restored_html = self.generate_restored_html(deleted_img_urls[product_name])
await self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", `{restored_html}`)')
self.logger.debug("recovery_image_urls로그 데이터를 이용하여 HTML 복원 완료.")
else:
self.logger.debug("이미 이미지가 있으므로 복원 작업을 패스합니다.")
# 소스 편집 모드 종료
await self.page.click(source_button_locator)
self.logger.debug('소스 버튼 재 클릭 완료.')
else:
self.logger.debug(f"로그에 해당 상품명 '{product_name}'에 대한 데이터가 존재하지 않습니다.")
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
html_content = '<p>&nbsp;</p>\n'
for url in urls:
width, height = self.get_image_size(url)
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
if width and height:
html_content += (
f'<figure class="image">'
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
f'</figure>\n'
)
else:
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
return html_content
def get_image_size(self, url):
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
try:
# URL에서 불필요한 따옴표 제거
cleaned_url = url.strip("'\"")
response = requests.get(cleaned_url, timeout=5)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
return image.width, image.height
except Exception as e:
self.logger.warning(f"이미지 크기 확인 실패 - {cleaned_url}: {e}")
return None, None
async def extract_image_urls(self, optionHandler, is_option_data=False):
"""상세페이지에서 이미지 URL 추출"""
try:
# 소스 편집 모드로 전환
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 편집 모드로 전환
await self.page.click(source_button_locator)
self.logger.debug("소스 버튼 클릭 완료.")
# 'data-value' 속성 값을 추출 (textarea 요소)
textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
data_value = await textarea.get_attribute("data-value")
# HTML 소스에서 이미지 URL 추출
image_urls = self.fetch_image_urls(data_value)
self.logger.info(f'추출된 이미지 URL 수: {len(image_urls)}')
# HTML 소스에서 이미지 URL 삭제
self.logger.debug('img 태그를 삭제 중...')
data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
new_value = ""
if data_value_element:
await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
updated_value = await data_value_element.get_attribute('data-value')
self.logger.debug(f'Updated data-value: {updated_value}')
else:
self.logger.debug('Element with data-value not found.')
self.logger.debug('img 태그 삭제 완료.')
# img 태그의 class 삭제 후 다시 소스 버튼 클릭
await self.page.click(source_button_locator)
self.logger.debug('소스 버튼 재 클릭 완료.')
if is_option_data:
self.logger.debug('옵션 데이터 입력 시작')
option_data = {} # option_data 초기화
option_data = optionHandler.get_selected_translated_options()
is_single = optionHandler.option_info['is_single_option']
is_single = True # 옵션입력 일단 제외
self.logger.debug('옵션입력 일단 제외')
self.logger.debug('가져온 옵션 데이터')
self.logger.debug(f'{option_data}')
# 옵션 입력 필드 선택
input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
await input_field.press('Enter')
# 선두부 텍스트 입력
for key in sorted(self.text_templates.keys()):
leading_text = self.text_templates[key]
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
await input_field.type(leading_text)
await input_field.press('Enter')
self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
if not is_single:
self.logger.info('단일옵션이 아니므로 옵션목록을 입력')
# 각 옵션을 한 줄씩 입력
await input_field.type("# 옵션 목록")
await input_field.press('Enter')
# 첫 번째 옵션의 번역된 옵션명만 입력
first_key = list(option_data.keys())[0]
first_value = option_data[first_key]
await input_field.type(f"- 1. {first_value}")
await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# 나머지 옵션도 번역된 옵션명만 입력
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
await input_field.type(f"{i}. {value}") # 옵션 번호와 번역된 옵션명만 입력
await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# 목록 끝을 알리기 위해 엔터 두 번 입력
await input_field.press('Enter')
await input_field.press('Enter')
# 후두부 텍스트 입력
await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
await input_field.press('Enter')
await input_field.type('---')
await input_field.press('Enter')
self.logger.info('옵션 데이터 입력 완료')
return image_urls
except Exception as e:
self.logger.error(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", exc_info=True)
return image_urls if image_urls else []
def paste_image_in_chrome(self, clipboardImageManager, url, is_success_translated, toggle_states, is_watermark=False, watermark_text= "", opacity_percent=25):
"""크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
self.logger.debug("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력")
try:
self.switch_to_chrome() # 크롬으로 포커스 이동
clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated, toggle_states=toggle_states, opacity_percent=opacity_percent) # 클립보드 내용을 처리
# clipboard_content = pyperclip.paste()
if clipboardImageManager.is_clipboard_image():
pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
self.logger.info("이미지 붙여넣기 완료.")
pyautogui.press('right') # 오른쪽 입력
self.logger.debug("이미지 붙여넣기 완료.")
clipboardImageManager.clear_clipboard()
self.logger.info("이미지 붙여넣기 완료로 클립보드 비우기.")
return True
else:
self.logger.warning("클립보드가 비어있습니다.")
return False
except Exception as e:
self.logger.error(f"이미지 붙여넣기 중 오류: {e}", exc_info=True)
return False
async def save_and_ecs_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
await self.page.keyboard.press("Escape")
self.logger.info("상품 수정 내용 저장 및 ECS 완료.")
except Exception as e:
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
async def save_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
self.logger.info("상품 수정 내용 저장 완료.")
except Exception as e:
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
async def go_to_next_page(self):
"""다음 페이지로 이동"""
try:
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
current_page = await self.page.query_selector(self.current_page_locator)
if not current_page:
self.logger.warning("현재 페이지 정보를 찾을 수 없습니다.")
return False
# 현재 활성화된 페이지 번호를 가져옴
current_page_number = int(await current_page.get_attribute("title"))
next_page_number = current_page_number + 1
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
next_page_button = await self.page.query_selector(next_page_button_locator)
if next_page_button:
await next_page_button.click() # 페이지 버튼 클릭
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
self.logger.info(f"페이지 {next_page_number}로 이동 완료.")
return True
else:
self.logger.warning("다음 페이지가 없습니다.")
return False
except Exception as e:
self.logger.error(f"다음 페이지로 이동 중 오류 발생: {e}", exc_info=True)
return False
def switch_to_chrome(self):
"""크롬으로 포커스 전환"""
try:
if not self.chrome_hwnd:
self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
if self.chrome_hwnd:
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(self.chrome_hwnd)
self.logger.debug('크롬 창으로 포커스 이동.')
else:
self.logger.warning('크롬 창을 찾을 수 없습니다.')
except Exception as e:
self.logger.error(f"크롬 포커스 전환 중 오류: {e}", exc_info=True)
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
self.logger.debug(f"스크롤 시작")
# 현재 페이지 높이 가져오기
last_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.debug(f"현재 페이지 높이 가져오기 - {last_height}")
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px")
await self.page.evaluate("window.scrollBy(0, 1000);")
elif direction == "up":
# 위로 스크롤
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px")
await self.page.evaluate("window.scrollBy(0, -1000);")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
self.logger.debug(f"pause_time 슬립 : {pause_time}")
await asyncio.sleep(pause_time)
# 새로운 페이지 높이 가져오기
new_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.debug(f"새로운 페이지 높이 가져오기 - {new_height}")
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
if direction == "down" and new_height == last_height:
self.logger.debug(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}")
break
elif direction == "up" and new_height == 0:
self.logger.debug(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}")
break
self.logger.debug(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}")
last_height = new_height
scroll_count += 1
self.logger.debug(f"스크롤 카운트 + 1")
if scroll_count == max_scrolls:
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤 (Page Down 키 사용)
await self.page.keyboard.press("PageDown")
elif direction == "up":
# 위로 스크롤 (Page Up 키 사용)
await self.page.keyboard.press("PageUp")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
await asyncio.sleep(pause_time)
scroll_count += 1
if scroll_count == max_scrolls:
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
async def collect_product_info(self):
"""
상품 정보를 수집하는 메서드
"""
try:
# 페이지를 아래로 스크롤하여 모든 상품 로드
await self.scroll_with_wheel('down')
await self.scroll_with_wheel('up')
product_infos = []
for i in range(1, 51): # 1부터 최대 50까지 상품 처리
try:
# 각 상품의 CSS 선택자를 동적으로 생성하여 접근
product_name_locator = self.product_name_template.format(i=i)
product_price_locator = self.product_price_template.format(i=i)
product_image_locator = self.product_image_template.format(i=i)
product_name_element = await self.page.query_selector(product_name_locator)
product_price_element = await self.page.query_selector(product_price_locator)
product_image_element = await self.page.query_selector(product_image_locator)
if product_name_element and product_price_element and product_image_element:
product_info = {
"name": await product_name_element.text_content().strip(),
"price": await product_price_element.text_content().strip(),
"image_url": await product_image_element.get_attribute('src')
}
self.logger.debug(f"상품 {i}: {product_info}")
product_infos.append(product_info)
except Exception as e:
self.logger.error(f"상품 {i} 정보 수집 중 오류 발생: {e}", exc_info=True)
continue
return product_infos
except Exception as e:
self.logger.error(f"상품 정보 수집 중 오류 발생: {e}", exc_info=True)
return []
async def scroll_page_to_bottom(self, pause_time=0.2):
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
self.logger.info('페이지 스크롤 시작...')
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
while True:
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => document.body.scrollHeight")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.info('페이지 스크롤 완료.')
async def scroll_page_to_top(self, pause_time=0.2):
"""페이지의 맨 위까지 스크롤"""
self.logger.info('페이지 위로 스크롤 시작...')
previous_height = await self.page.evaluate("() => window.pageYOffset")
while previous_height > 0:
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => window.pageYOffset")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.info('페이지 위로 스크롤 완료.')