1822 lines
86 KiB
Python
1822 lines
86 KiB
Python
from playwright.sync_api import sync_playwright, TimeoutError
|
|
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition
|
|
import re
|
|
import time
|
|
from bs4 import BeautifulSoup
|
|
import os, sys, random
|
|
import requests
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
from whale_new import WhaleTranslator
|
|
from clipboardImageManager import ClipboardImageManager
|
|
from option import OptionHandler
|
|
from price import PriceHandler
|
|
from title import TitleHandler
|
|
from thumb import ThumbnailHandler
|
|
import tempfile
|
|
|
|
class BrowserController(QThread):
|
|
# 브라우저 시작 시그널 정의
|
|
browser_started = Signal()
|
|
browser_error = Signal(str)
|
|
# 번역 작업 시작, 완료, 오류 시그널 정의
|
|
translation_started = Signal()
|
|
translation_completed = Signal()
|
|
translation_error = Signal(str)
|
|
|
|
def __init__(self, app, logger, locator_manager, vertexAI, cmb_diag, login_infos, toggle_states):
|
|
super().__init__()
|
|
self.logger = logger
|
|
self.log_files = ["AutoPercenty3.log", "AutoPercenty3.log.1", "AutoPercenty3.log.2", "AutoPercenty3.log.3", "AutoPercenty3.log.4", "AutoPercenty3.log.5"]
|
|
self.locator_manager = locator_manager
|
|
self.vertexAI = vertexAI
|
|
self.cmb_diag = cmb_diag
|
|
self.toggle_states = toggle_states
|
|
self.login_infos = login_infos
|
|
self.chrome_hwnd = None
|
|
# 임시 디렉토리 생성
|
|
self.temp_dir = tempfile.mkdtemp() # 임시 디렉토리 생성
|
|
|
|
self.whale_translator = None
|
|
|
|
self.playwright = None
|
|
self.browser = None
|
|
self.page = None
|
|
|
|
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
|
|
self.pause_mutex = QMutex()
|
|
self.pause_condition = QWaitCondition()
|
|
self.is_paused = False # 일시 정지 상태 플래그
|
|
|
|
self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
|
|
self.optionHandler = OptionHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.logger, self.vertexAI, debug_flag=self.toggle_states['debug_mode'])
|
|
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.vertexAI, self.cmb_diag, debug_flag=self.toggle_states['debug_mode'])
|
|
self.titleHandler = TitleHandler(self.locator_manager, self, self.logger)
|
|
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.whale_translator, self.clipboardImageManager, self.toggle_states)
|
|
|
|
# BrowserController에 해당하는 모든 locator를 정의
|
|
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
|
|
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
|
|
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
|
|
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
|
|
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
|
|
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
|
|
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
|
|
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
|
|
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
|
|
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
|
|
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
|
|
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
|
|
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
|
|
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
|
|
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
|
|
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
|
|
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
|
|
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
|
|
self.product_edit_button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
|
|
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
|
|
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
|
|
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
|
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
|
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
|
|
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
|
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
|
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
|
|
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
|
|
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
|
|
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
|
|
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
|
|
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
|
|
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
|
|
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
|
|
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
|
|
|
|
self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
|
|
|
|
|
|
# # 스레드 종료 시 close_whale_window_if_exists 호출
|
|
# self.finished.connect(self.cleanup)
|
|
|
|
def get_page(self):
|
|
return self.page
|
|
|
|
def get_whale(self):
|
|
return self.whale_translator
|
|
|
|
def monitor_browser_logs(self):
|
|
"""브라우저 로그 및 오류를 모니터링"""
|
|
try:
|
|
self.page.on("console", lambda msg: self.logger.info(f"JS 로그: {msg.type}: {msg.text}"))
|
|
self.page.on("requestfailed", lambda req: self.logger.error(f"요청 실패: {req.url}"))
|
|
except Exception as e:
|
|
self.logger.error(f"브라우저 로그 모니터링 중 오류 발생: {e}", exc_info=True)
|
|
|
|
def get_base_dir(self):
|
|
"""
|
|
실행 환경에 따라 base_dir을 설정하는 메서드.
|
|
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
|
|
"""
|
|
if getattr(sys, 'frozen', False): # 패키징된 경우
|
|
base_dir = os.path.dirname(sys.executable)
|
|
internal_dir = os.path.join(base_dir, '_internal') # _internal 디렉토리 포함
|
|
if os.path.exists(internal_dir): # _internal 디렉토리가 존재하면 base_dir로 설정
|
|
return internal_dir
|
|
|
|
else: # 일반 Python 실행 환경
|
|
base_dir = os.path.dirname(os.path.abspath(__file__))
|
|
return base_dir
|
|
|
|
def start_browser(self):
|
|
"""비동기 Playwright 초기화 및 로그인 수행"""
|
|
try:
|
|
self.logger.debug('크롬 브라우저를 실행합니다...')
|
|
|
|
# WhaleTranslator 필요 여부 확인 및 초기화
|
|
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
|
|
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
|
|
thumb_status = self.toggle_states.get('thumb', False)
|
|
|
|
self.logger.debug(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}")
|
|
|
|
if optionIMGTrans_status or detail_IMGTrans_status or thumb_status:
|
|
self.logger.debug('이미지번역을 위해 웨일을 로드합니다...')
|
|
self.whale_translator = WhaleTranslator(self.logger)
|
|
self.whale_translator.start_whale_browser()
|
|
|
|
# Playwright 시작 및 브라우저 실행
|
|
self.logger.debug('playwright().start 로드합니다...')
|
|
|
|
self.playwright = sync_playwright().start()
|
|
self.logger.debug('playwright().start 완료')
|
|
|
|
base_path = self.get_base_dir()
|
|
self.logger.debug(f"base_path: {base_path}")
|
|
|
|
browser_path = os.path.join(base_path, 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
|
|
extension_path = os.path.join(base_path, 'browsers', 'extensions', '1.1.100_0')
|
|
user_data_dir = os.path.join(base_path, 'browsers', 'user_data')
|
|
cache_dir = os.path.join(base_path, 'browsers', 'chromium-1140', 'cache') # 캐시 폴더 경로
|
|
|
|
if not os.path.exists(browser_path):
|
|
self.logger.error(f"브라우저 실행 파일이 없습니다: {browser_path}")
|
|
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
|
|
|
|
self.logger.debug(f"브라우저 경로: {browser_path}")
|
|
self.logger.debug(f"확장 프로그램 경로: {extension_path}")
|
|
self.logger.debug(f"사용자 폴더 경로: {user_data_dir}")
|
|
|
|
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
|
|
if not os.path.exists(user_data_dir):
|
|
os.makedirs(user_data_dir)
|
|
self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.")
|
|
|
|
# cache 디렉토리가 존재하지 않으면 생성
|
|
if not os.path.exists(cache_dir):
|
|
os.makedirs(cache_dir)
|
|
self.logger.debug(f"{cache_dir} 디렉토리가 생성되었습니다.")
|
|
|
|
|
|
# User agent 설정
|
|
user_agent = random.choice([
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
|
|
])
|
|
self.logger.debug(f"user_agent: {user_agent}")
|
|
|
|
viewport = {"width": 1280, "height": 720}
|
|
|
|
# 브라우저 시작 및 설정
|
|
self.browser = self.playwright.chromium.launch_persistent_context(
|
|
user_data_dir,
|
|
headless=True,
|
|
viewport=viewport,
|
|
permissions=["geolocation", "notifications"],
|
|
geolocation={"latitude": 37.5665, "longitude": 126.9780},
|
|
locale="ko-KR",
|
|
args=[
|
|
'--no-sandbox',
|
|
'--disable-setuid-sandbox',
|
|
'--disable-popup-blocking',
|
|
'--disable-blink-features=AutomationControlled',
|
|
f'--disable-extensions-except={extension_path}',
|
|
f'--load-extension={extension_path}',
|
|
'--start-maximized',
|
|
'--window-size=1280,720',
|
|
f"--disk-cache-dir={cache_dir}", # Playwright 캐시 폴더 지정
|
|
],
|
|
executable_path=browser_path,
|
|
user_agent=user_agent
|
|
)
|
|
|
|
# 브라우저 종료 이벤트 로깅
|
|
self.browser.on("disconnected", lambda: self.logger.error("브라우저가 종료되었습니다."))
|
|
|
|
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
|
|
self.page = self.browser.new_page()
|
|
self.logger.info('새 페이지 로딩 중...')
|
|
|
|
# stealth_sync(self.page)
|
|
# self.logger.debug('스텔스 설정')
|
|
|
|
# self.monitor_browser_logs()
|
|
# self.logger.info('자바스크립트의 로그를 모니터링하기 위한 로그 설정')
|
|
|
|
self.page.goto('https://percenty.co.kr/signin')
|
|
# self.page.goto('https://percenty.co.kr')
|
|
self.logger.info('percenty.co.kr/signin 로딩 완료')
|
|
|
|
|
|
# 첫 번째 기본 탭 닫기
|
|
if self.browser.pages:
|
|
self.browser.pages[0].close()
|
|
time.sleep(0.3)
|
|
|
|
# 페이지 제목을 가져와서 창 제목으로 활용
|
|
page_title = self.page.title()
|
|
self.logger.debug(f'페이지 제목: {page_title}')
|
|
|
|
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
|
|
# self.chrome_hwnd = self.find_window_by_title(page_title)
|
|
# if not self.chrome_hwnd:
|
|
# self.logger.warning('크롬 창을 찾을 수 없습니다.')
|
|
# else:
|
|
# self.logger.debug(f'크롬 창 핸들: {self.chrome_hwnd}')
|
|
|
|
self.login()
|
|
self.close_ad_if_exists()
|
|
|
|
id_ed_mode = self.toggle_states.get('ed_mode', False)
|
|
if id_ed_mode:
|
|
self.go_to_registered_product_page()
|
|
self.logger.info('등록 상품 관리 페이지로 이동 중...')
|
|
else:
|
|
self.logger.info('신규 상품 등록 페이지로 이동 중...')
|
|
self.go_to_new_product_page()
|
|
|
|
|
|
# 각 핸들러에 초기화된 page 객체 전달.
|
|
self.optionHandler.update_page(self.page)
|
|
self.titleHandler.update_page(self.page)
|
|
self.priceHandler.update_page(self.page)
|
|
self.thumbnailHandler.update_page(self.page)
|
|
|
|
time.sleep(1)
|
|
self.start_Percenty_task()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"브라우저 시작 오류: {str(e)}", exc_info=True)
|
|
self.browser_error.emit(str(e))
|
|
|
|
|
|
def start_Percenty_task(self):
|
|
self.logger.debug('퍼센티 상품수정 작업을 시작합니다...')
|
|
self.running = True # 번역 작업이 시작됨
|
|
|
|
try:
|
|
# 1. 총 상품 수 수집
|
|
self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
|
|
|
|
# total_products = self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
|
|
|
|
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
|
|
result = self.get_total_product_count()
|
|
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
|
|
total_products = result.get("total_count", 0)
|
|
items_per_page = result.get("items_per_page", 0)
|
|
self.logger.debug(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}")
|
|
|
|
self.logger.debug(f"[ self.toggle_states 상태 ]\n{self.toggle_states}")
|
|
|
|
if total_products == 0:
|
|
self.logger.debug('수집할 상품이 없습니다. 작업을 종료합니다.')
|
|
return
|
|
|
|
completed_count = 0
|
|
page_number = 1
|
|
|
|
# 3. 총 상품 수만큼 반복 작업 수행
|
|
while completed_count < total_products:
|
|
self.logger.debug(f'현재 페이지: {page_number}')
|
|
|
|
if not page_number == 1:
|
|
self.scroll_page_to_top()
|
|
self.logger.debug(f'1페이지가 아니므로 상품의 동적로딩을 위해 휠 스크롤 업')
|
|
|
|
is_ed_mode = self.toggle_states.get('ed_mode', False)
|
|
if not is_ed_mode:
|
|
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
|
|
self.logger.debug('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.')
|
|
product_buttons = self.get_product_edit_buttons_by_templete()
|
|
else:
|
|
self.logger.debug('상품정보 수집')
|
|
product_infos, product_name_elements = self.collect_product_info(items_per_page, ed_mode=is_ed_mode)
|
|
self.logger.debug(f"product_infos : {product_infos}")
|
|
self.logger.debug('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.')
|
|
product_buttons = product_name_elements
|
|
|
|
self.logger.debug(f"product_buttons 갯수 : [{len(product_buttons)}]개")
|
|
|
|
if not product_buttons:
|
|
self.logger.debug('수정할 상품이 없습니다. 작업을 종료합니다.')
|
|
break
|
|
|
|
# 5. 각 상품에 대해 상품수정작업 수행
|
|
for index, button in enumerate(product_buttons, start=1):
|
|
|
|
# 상품명 수집 오류 처리
|
|
self.logger.debug(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...')
|
|
|
|
is_disabled = self.is_button_disabled(button)
|
|
if is_disabled:
|
|
self.logger.debug(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.')
|
|
continue
|
|
|
|
self.logger.debug(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...')
|
|
|
|
# 상품 수정 다이얼로그 열기
|
|
self.open_product_edit_dialog(button)
|
|
|
|
# 상품명과 카테고리 수집
|
|
product_name = self.titleHandler.get_original_product_name() # 원본상품명 가져오기
|
|
product_category = self.titleHandler.get_category(market='ss') # 카테고리 가져오기
|
|
|
|
# 옵션 수정
|
|
is_optionTrnas = self.toggle_states.get('optionTrnas')
|
|
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
|
|
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
|
|
|
|
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
|
|
self.logger.debug(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}")
|
|
self.edit_option(product_name)
|
|
|
|
# 가격 수정
|
|
is_price = self.toggle_states.get('price')
|
|
if is_price:
|
|
self.logger.debug(f"가격수정 : {is_price} ")
|
|
self.edit_price(product_category)
|
|
|
|
# 썸네일 수정
|
|
thumb = self.toggle_states.get('thumb')
|
|
if thumb:
|
|
self.logger.debug(f"썸네일수정 : {thumb} ")
|
|
self.edit_thumb()
|
|
|
|
# 태그 수정
|
|
if self.toggle_states.get('tag'):
|
|
pass
|
|
|
|
# 상품명 수정
|
|
if self.toggle_states.get('title'):
|
|
pass
|
|
|
|
# 상세페이지 수정
|
|
detail_Option = self.toggle_states.get('detail_Option')
|
|
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
|
|
recovery_mode = self.toggle_states.get('recovery_mode')
|
|
if detail_Option or detail_IMGTrans or recovery_mode:
|
|
self.logger.debug(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}")
|
|
|
|
if recovery_mode:
|
|
deleted_imgs = self.deleted_img_urls_from_logs()
|
|
self.detail_trans_for_recovery(product_name, deleted_imgs)
|
|
else:
|
|
self.detail_trans()
|
|
|
|
# 수정 후 저장
|
|
self.logger.debug('상품 세부사항 저장 중...')
|
|
self.save_and_ecs_product_edit()
|
|
|
|
completed_count += 1
|
|
self.logger.info(f'{completed_count}/[{total_products}]개 상품 수정 완료.')
|
|
|
|
if completed_count >= total_products:
|
|
self.logger.info(f'[{total_products}]개 상품 수정이 완료되었습니다.')
|
|
return
|
|
|
|
# 6. 다음 페이지로 이동 (있으면)
|
|
if not self.go_to_next_page():
|
|
self.logger.info('더 이상 페이지가 없습니다. 작업을 종료합니다.')
|
|
break
|
|
page_number += 1
|
|
|
|
if self.running:
|
|
self.logger.info('모든 상품 번역 및 저장 완료.')
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"번역 작업 중 오류 발생: {e}", exc_info=True)
|
|
|
|
|
|
def login(self):
|
|
"""로그인 처리"""
|
|
try:
|
|
is_admin = self.login_infos['is_admin']
|
|
self.logger.info(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정')
|
|
|
|
if is_admin:
|
|
# 관리자 로그인 처리
|
|
self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
|
self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
|
|
self.page.click(self.login_button_locator)
|
|
else:
|
|
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
|
|
admin_toggle = self.page.locator(self.admin_toggle_locator)
|
|
admin_toggle.wait_for(state="attached", timeout=20000) # 타임아웃 확장
|
|
|
|
if admin_toggle.get_attribute("aria-checked") == "true":
|
|
admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
|
|
|
|
self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
|
self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
|
|
self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
|
|
self.page.click(self.staff_login_button_locator)
|
|
|
|
self.logger.info(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정')
|
|
except Exception as e:
|
|
self.logger.error(f"로그인 오류: {str(e)}", exc_info=True)
|
|
self.browser_error.emit(str(e))
|
|
|
|
def close_browser(self):
|
|
"""브라우저 종료"""
|
|
if self.browser:
|
|
self.browser.close()
|
|
self.playwright.stop()
|
|
self.cleanup()
|
|
self.logger.info('브라우저 종료됨.')
|
|
|
|
def get_total_product_count(self):
|
|
total_count = 0
|
|
items_per_page = 0
|
|
|
|
try:
|
|
# total_count_elements = self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
|
|
total_count_element = self.page.query_selector("div#root span:has-text('개 상품')")
|
|
items_per_page_element = self.page.query_selector("div#root [title$='개씩 보기']")
|
|
|
|
self.logger.debug(f"total_count_element : {total_count_element}")
|
|
|
|
if total_count_element:
|
|
total_count_text = total_count_element.inner_text()
|
|
if "총" in total_count_text and "개 상품" in total_count_text:
|
|
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
|
|
self.logger.info(f"총 상품수 확인: {total_count} 개")
|
|
|
|
# 페이지당 상품 수 추출
|
|
if items_per_page_element:
|
|
items_per_page_text = items_per_page_element.get_attribute("title")
|
|
if items_per_page_text and "개씩 보기" in items_per_page_text:
|
|
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
|
|
self.logger.info(f"페이지당 상품수 확인: {items_per_page} 개씩 보기")
|
|
|
|
# 결과 반환
|
|
if total_count:
|
|
return {"total_count": total_count, "items_per_page": items_per_page}
|
|
|
|
return {"total_count": 0, "items_per_page": 0}
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
|
|
return {"total_count": 0, "items_per_page": 0}
|
|
|
|
def fetch_image_urls(self, html_content):
|
|
"""
|
|
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
|
|
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
|
|
"""
|
|
soup = BeautifulSoup(html_content, 'html.parser')
|
|
|
|
# 모든 <img> 태그를 찾기
|
|
image_urls = []
|
|
img_tags = soup.find_all('img')
|
|
|
|
for img in img_tags:
|
|
# img 태그에서 src 속성 추출
|
|
if 'src' in img.attrs:
|
|
image_url = img['src']
|
|
image_urls.append(image_url)
|
|
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)} 개")
|
|
|
|
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}")
|
|
|
|
return image_urls
|
|
|
|
def close_ad_if_exists(self):
|
|
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
|
|
try:
|
|
# 광고 다이얼로그가 나타날 때까지 기다림
|
|
self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
|
|
self.logger.info("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.")
|
|
|
|
# 닫기 버튼 클릭
|
|
close_button = self.page.query_selector(self.close_ad_button_locator)
|
|
if close_button:
|
|
close_button.click()
|
|
self.logger.info("다이얼로그를 성공적으로 닫았습니다.")
|
|
else:
|
|
self.logger.warning("닫기 버튼을 찾지 못했습니다.")
|
|
|
|
except TimeoutError:
|
|
# 다이얼로그가 없을 때: info 수준의 로그로 기록
|
|
self.logger.info("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.")
|
|
except Exception as e:
|
|
# 다른 예외 상황 발생 시 error로 기록
|
|
self.logger.error(f"다이얼로그 닫기 중 오류 발생: {e}", exc_info=True)
|
|
|
|
def go_to_new_product_page(self):
|
|
"""신규 상품 등록 페이지로 이동"""
|
|
try:
|
|
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
|
self.page.click(new_product_page_locator)
|
|
self.logger.info("신규 상품 등록 페이지로 이동 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"신규 상품 등록 페이지 이동 중 오류: {e}", exc_info=True)
|
|
|
|
def go_to_registered_product_page(self):
|
|
"""신규 상품 등록 페이지로 이동"""
|
|
try:
|
|
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
|
self.page.click(registered_product_page_locator)
|
|
self.logger.info("등록 상품 관리 페이지로 이동 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"등록 상품 관리 페이지 이동 중 오류: {e}", exc_info=True)
|
|
|
|
def is_button_disabled(self, button):
|
|
"""버튼이 disabled 상태인지 확인"""
|
|
try:
|
|
# 버튼의 disabled 속성 확인
|
|
is_disabled = button.get_attribute('disabled')
|
|
return is_disabled is not None # disabled 속성이 있으면 True 반환
|
|
except Exception as e:
|
|
self.logger.error(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", exc_info=True)
|
|
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
|
|
|
|
def get_product_edit_buttons_by_templete(self):
|
|
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
|
|
try:
|
|
# 버튼 선택자 설정
|
|
# edit_button_selector_template = f'//button[span[text()="세부사항 수정 및 업로드"]]'
|
|
self.product_edit_button_template
|
|
# 선택자를 사용해 버튼 객체를 찾음
|
|
buttons = self.page.locator(self.product_edit_button_template)
|
|
|
|
# 버튼이 존재하는지 확인
|
|
button_count = buttons.count()
|
|
if button_count == 0:
|
|
self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
|
|
return []
|
|
|
|
self.logger.info(f"현재 페이지의 수정할 상품 개수: {button_count}")
|
|
|
|
# 모든 버튼을 리스트로 반환
|
|
return [buttons.nth(i) for i in range(button_count)]
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
|
|
return []
|
|
|
|
|
|
def click_modify_button_by_text(self, index):
|
|
"""인덱스에 해당하는 '세부사항 수정 및 업로드' 버튼 클릭"""
|
|
try:
|
|
# config.ini에서 선택자 가져오기
|
|
button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
|
|
button_selector = f'({button_template})[{index}]'
|
|
|
|
button = self.page.query_selector(button_selector)
|
|
|
|
# 버튼이 화면에 보이도록 스크롤 후 클릭
|
|
if button:
|
|
button.scroll_into_view_if_needed()
|
|
self.page.evaluate('arguments[0].click();', button)
|
|
self.logger.info(f'{index}번째 상품의 수정 버튼 클릭 완료')
|
|
else:
|
|
self.logger.warning(f'{index}번째 상품의 수정 버튼을 찾지 못했습니다.')
|
|
except Exception as e:
|
|
self.logger.error(f'{index}번째 상품의 수정 버튼 클릭 중 오류: {str(e)}')
|
|
|
|
def open_product_edit_dialog(self, button):
|
|
"""상품 수정 다이얼로그 열기"""
|
|
try:
|
|
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
|
|
button.scroll_into_view_if_needed()
|
|
self.logger.debug("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.")
|
|
|
|
button.click()
|
|
self.logger.info("세부사항 수정 다이얼로그 열기 완료.")
|
|
self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
|
|
except Exception as e:
|
|
self.logger.error(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", exc_info=True)
|
|
|
|
def click_detail_tab(self):
|
|
"""상세페이지 탭 클릭"""
|
|
try:
|
|
self.page.click(self.detail_tab_locator)
|
|
self.logger.info("상세페이지 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"상세페이지 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def click_option_tab(self):
|
|
"""옵션 탭 클릭"""
|
|
try:
|
|
self.page.click(self.option_tab_locator)
|
|
self.logger.info("옵션 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"옵션 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def click_price_tab(self):
|
|
"""가격 탭 클릭"""
|
|
try:
|
|
self.page.click(self.price_tab_locator)
|
|
self.logger.info("가격 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"가격 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def click_thumb_tab(self):
|
|
"""썸네일 탭 클릭"""
|
|
try:
|
|
self.page.click(self.thumb_tab_locator)
|
|
self.logger.info("썸네일 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"썸네일 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def click_title_tab(self):
|
|
"""상품명 탭 클릭"""
|
|
try:
|
|
self.page.click(self.title_tab_locator)
|
|
self.logger.info("상품명 탭 클릭 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"상품명 탭 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def generate_restored_html(self, urls):
|
|
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
|
|
html_content = '<p> </p>'
|
|
for url in urls:
|
|
html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
|
|
return html_content
|
|
|
|
def deleted_img_urls_from_logs(self):
|
|
"""로그 파일에서 상품명과 이미지 URL 목록을 추출하여 딕셔너리로 반환하는 메서드"""
|
|
image_data = {}
|
|
log_dir = os.path.join(os.path.dirname(__file__), "recovery_log")
|
|
|
|
# 로그 파일에서 필요한 정보만 추출
|
|
for log_file in self.log_files:
|
|
log_path = os.path.join(log_dir, log_file)
|
|
if os.path.exists(log_path):
|
|
with open(log_path, 'r', encoding='utf-8') as f:
|
|
lines = f.readlines()
|
|
|
|
current_product = None
|
|
for line in lines:
|
|
# 상품명 추출
|
|
product_match = re.search(r"원본 상품명 '(.+?)'", line)
|
|
if product_match:
|
|
current_product = product_match.group(1)
|
|
image_data[current_product] = []
|
|
|
|
# 이미지 URL 목록 추출
|
|
url_match = re.search(r"fetch_image_urls 에서 추출한 이미지URL 목록 : \[(.+?)\]", line)
|
|
if url_match and current_product:
|
|
# 각 URL에서 불필요한 작은따옴표 제거
|
|
urls = [url.strip("'\"") for url in url_match.group(1).split(", ")]
|
|
image_data[current_product].extend(urls)
|
|
current_product = None # Reset after each product's URL extraction
|
|
|
|
self.logger.debug(f"복구된 이미지 URL 데이터: {len(image_data)}")
|
|
return image_data
|
|
|
|
def recovery_image_urls(self, product_name, deleted_img_urls):
|
|
"""상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드"""
|
|
self.logger.debug("상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드")
|
|
|
|
if product_name in deleted_img_urls:
|
|
self.logger.info(f'recovery_image_urls: 삭제된 상품명 찾음 : {product_name}')
|
|
# 소스 편집 모드로 전환
|
|
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
|
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
|
self.page.click(source_button_locator)
|
|
self.logger.debug("recovery_image_urls : 소스 버튼 클릭 완료.")
|
|
|
|
# 기존 extract_image_urls와 유사하게 HTML 소스를 가져옴
|
|
textarea = self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
|
|
data_value = textarea.get_attribute("data-value")
|
|
|
|
# HTML 소스에서 이미지 URL 추출
|
|
image_urls = self.fetch_image_urls(data_value)
|
|
self.logger.info(f'recovery_image_urls: 현재페이지의 HTML에서 추출된 이미지 URL 수: {len(image_urls)}')
|
|
|
|
# 이미지 태그가 없으면 로그에서 추출한 데이터를 HTML로 복원하여 입력
|
|
if len(image_urls) == 0:
|
|
restored_html = self.generate_restored_html(deleted_img_urls[product_name])
|
|
self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", `{restored_html}`)')
|
|
self.logger.debug("recovery_image_urls : 현재 페이지의 HTML의 이미지가 0이므로, 로그 데이터를 이용하여 HTML 복원 완료.")
|
|
else:
|
|
self.logger.debug("이미 이미지가 있으므로 복원 작업을 패스합니다.")
|
|
|
|
# 소스 편집 모드 종료
|
|
self.page.click(source_button_locator)
|
|
self.logger.debug('소스 버튼 재 클릭 완료.')
|
|
else:
|
|
self.logger.debug(f"로그에 해당 상품명 '{product_name}'에 대한 데이터가 존재하지 않습니다.")
|
|
|
|
def generate_restored_html(self, urls):
|
|
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
|
|
html_content = '<p> </p>\n'
|
|
for url in urls:
|
|
width, height = self.get_image_size(url)
|
|
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
|
|
if width and height:
|
|
html_content += (
|
|
f'<figure class="image">'
|
|
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
|
|
f'</figure>\n'
|
|
)
|
|
else:
|
|
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
|
|
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
|
|
return html_content
|
|
|
|
def get_image_size(self, url):
|
|
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
|
|
try:
|
|
# URL에서 불필요한 따옴표 제거
|
|
cleaned_url = url.strip("'\"")
|
|
|
|
response = requests.get(cleaned_url, timeout=5)
|
|
response.raise_for_status()
|
|
image = Image.open(BytesIO(response.content))
|
|
return image.width, image.height
|
|
except Exception as e:
|
|
self.logger.warning(f"이미지 크기 확인 실패 - {cleaned_url}: {e}")
|
|
return None, None
|
|
|
|
def extract_image_urls(self, optionHandler, is_option_data=False):
|
|
"""상세페이지에서 이미지 URL 추출"""
|
|
try:
|
|
# 소스 편집 모드로 전환
|
|
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
|
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
|
|
|
# 소스 편집 모드로 전환
|
|
self.page.click(source_button_locator)
|
|
self.logger.debug("소스 버튼 클릭 완료.")
|
|
|
|
|
|
# 'data-value' 속성 값을 추출 (textarea 요소)
|
|
textarea = self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
|
|
data_value = textarea.get_attribute("data-value")
|
|
|
|
|
|
# HTML 소스에서 이미지 URL 추출
|
|
image_urls = self.fetch_image_urls(data_value)
|
|
self.logger.info(f'추출된 이미지 URL 수: {len(image_urls)}')
|
|
|
|
# HTML 소스에서 이미지 URL 삭제
|
|
self.logger.debug('img 태그를 삭제 중...')
|
|
data_value_element = self.page.query_selector(ck_source_editing_area_locator)
|
|
new_value = ""
|
|
if data_value_element:
|
|
self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
|
|
updated_value = data_value_element.get_attribute('data-value')
|
|
self.logger.debug(f'Updated data-value: {updated_value}')
|
|
else:
|
|
self.logger.debug('Element with data-value not found.')
|
|
self.logger.debug('img 태그 삭제 완료.')
|
|
|
|
# img 태그의 class 삭제 후 다시 소스 버튼 클릭
|
|
self.page.click(source_button_locator)
|
|
self.logger.debug('소스 버튼 재 클릭 완료.')
|
|
|
|
|
|
# 텍스트 입력 필드 선택
|
|
input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
|
input_field.press('Enter')
|
|
|
|
# 선두부 텍스트 입력
|
|
for key in sorted(self.text_templates.keys()):
|
|
leading_text = self.text_templates[key]
|
|
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
|
input_field.type(leading_text)
|
|
input_field.press('Enter')
|
|
self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
|
|
|
|
option_data = optionHandler.get_selected_translated_options()
|
|
self.logger.debug(f'option_data : {option_data}')
|
|
|
|
# if is_option_data:
|
|
if option_data or option_data != {}:
|
|
self.logger.debug('옵션 데이터 입력 시작')
|
|
# option_data = {} # option_data 초기화
|
|
is_single = optionHandler.option_info['is_single_option']
|
|
# option_data = optionHandler.get_selected_translated_options()
|
|
|
|
# is_single = True # 옵션입력 일단 제외
|
|
# self.logger.debug('옵션입력 일단 제외')
|
|
|
|
self.logger.debug('가져온 옵션 데이터')
|
|
self.logger.debug(f'{option_data}')
|
|
|
|
# # 옵션 입력 필드 선택
|
|
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
|
# input_field.press('Enter')
|
|
|
|
# # 선두부 텍스트 입력
|
|
# for key in sorted(self.text_templates.keys()):
|
|
# leading_text = self.text_templates[key]
|
|
# if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
|
# input_field.type(leading_text)
|
|
# input_field.press('Enter')
|
|
# self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
|
|
|
|
if not is_single:
|
|
self.logger.info('단일옵션이 아니므로 옵션목록을 입력')
|
|
|
|
# 각 옵션을 한 줄씩 입력
|
|
input_field.type("# 옵션 목록")
|
|
input_field.press('Enter')
|
|
|
|
# 첫 번째 옵션의 번역된 옵션명만 입력
|
|
first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
|
|
first_value = option_data[first_key] # key는 옵션이름 value는 가격
|
|
input_field.type(f"- 1. {first_key}")
|
|
input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
|
|
|
|
# 나머지 옵션도 번역된 옵션명만 입력
|
|
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
|
|
input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
|
|
input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
|
|
|
|
# 목록 끝을 알리기 위해 엔터 두 번 입력
|
|
input_field.press('Enter')
|
|
input_field.press('Enter')
|
|
|
|
# 후두부 텍스트 입력
|
|
input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
|
|
input_field.press('Enter')
|
|
input_field.type('---')
|
|
input_field.press('Enter')
|
|
|
|
self.logger.info('옵션 데이터 입력 완료')
|
|
|
|
return image_urls
|
|
except Exception as e:
|
|
self.logger.error(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", exc_info=True)
|
|
return image_urls if image_urls else []
|
|
|
|
def save_and_ecs_product_edit(self):
|
|
"""상품 수정 후 저장 버튼 클릭"""
|
|
try:
|
|
self.page.click(self.save_button_locator)
|
|
self.page.keyboard.press("Escape")
|
|
self.logger.info("상품 수정 내용 저장 및 ECS 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def save_product_edit(self):
|
|
"""상품 수정 후 저장 버튼 클릭"""
|
|
try:
|
|
self.page.click(self.save_button_locator)
|
|
self.logger.info("상품 수정 내용 저장 완료.")
|
|
except Exception as e:
|
|
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
|
|
|
|
def go_to_next_page(self):
|
|
"""다음 페이지로 이동"""
|
|
try:
|
|
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
|
|
current_page = self.page.query_selector(self.current_page_locator)
|
|
|
|
if not current_page:
|
|
self.logger.warning("현재 페이지 정보를 찾을 수 없습니다.")
|
|
return False
|
|
|
|
# 현재 활성화된 페이지 번호를 가져옴
|
|
current_page_number = int(current_page.get_attribute("title"))
|
|
self.logger.info(f"현재페이지 : [{current_page_number}]")
|
|
|
|
next_page_number = current_page_number + 1
|
|
|
|
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
|
|
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
|
|
next_page_button = self.page.query_selector(next_page_button_locator)
|
|
|
|
if next_page_button:
|
|
next_page_button.click() # 페이지 버튼 클릭
|
|
# self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
|
|
time.sleep(3)
|
|
self.logger.info(f"페이지 {next_page_number}로 이동 완료.")
|
|
return True
|
|
else:
|
|
self.logger.warning("다음 페이지가 없습니다.")
|
|
return False
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"다음 페이지로 이동 중 오류 발생: {e}", exc_info=True)
|
|
return False
|
|
|
|
# def switch_to_chrome(self):
|
|
# """크롬으로 포커스 전환"""
|
|
# try:
|
|
# if not self.chrome_hwnd:
|
|
# self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
|
|
# if self.chrome_hwnd:
|
|
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
|
|
# win32gui.SetForegroundWindow(self.chrome_hwnd)
|
|
# self.logger.debug('크롬 창으로 포커스 이동.')
|
|
# else:
|
|
# self.logger.warning('크롬 창을 찾을 수 없습니다.')
|
|
# except Exception as e:
|
|
# self.logger.error(f"크롬 포커스 전환 중 오류: {e}", exc_info=True)
|
|
|
|
def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
|
|
"""
|
|
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
|
|
|
Parameters:
|
|
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
|
- pause_time: 스크롤 사이의 대기 시간 (초).
|
|
- max_scrolls: 최대 스크롤 횟수.
|
|
"""
|
|
scroll_count = 0
|
|
|
|
self.logger.debug(f"스크롤 시작")
|
|
|
|
# 현재 페이지 높이 가져오기
|
|
last_height = self.page.evaluate("document.body.scrollHeight")
|
|
self.logger.debug(f"현재 페이지 높이 가져오기 - {last_height}")
|
|
|
|
while scroll_count < max_scrolls:
|
|
if direction == "down":
|
|
# 아래로 스크롤
|
|
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px")
|
|
self.page.evaluate("window.scrollBy(0, 1000);")
|
|
elif direction == "up":
|
|
# 위로 스크롤
|
|
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px")
|
|
self.page.evaluate("window.scrollBy(0, -1000);")
|
|
else:
|
|
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
|
|
|
self.logger.debug(f"pause_time 슬립 : {pause_time}")
|
|
time.sleep(pause_time)
|
|
|
|
# 새로운 페이지 높이 가져오기
|
|
new_height = self.page.evaluate("document.body.scrollHeight")
|
|
self.logger.debug(f"새로운 페이지 높이 가져오기 - {new_height}")
|
|
|
|
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
|
|
if direction == "down" and new_height == last_height:
|
|
self.logger.debug(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}")
|
|
break
|
|
elif direction == "up" and new_height == 0:
|
|
self.logger.debug(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}")
|
|
break
|
|
|
|
self.logger.debug(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}")
|
|
last_height = new_height
|
|
scroll_count += 1
|
|
self.logger.debug(f"스크롤 카운트 + 1")
|
|
|
|
if scroll_count == max_scrolls:
|
|
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
|
|
|
|
def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
|
|
"""
|
|
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
|
|
|
Parameters:
|
|
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
|
- pause_time: 스크롤 사이의 대기 시간 (초).
|
|
- max_scrolls: 최대 스크롤 횟수.
|
|
"""
|
|
scroll_count = 0
|
|
|
|
while scroll_count < max_scrolls:
|
|
if direction == "down":
|
|
# 아래로 스크롤 (Page Down 키 사용)
|
|
self.page.keyboard.press("PageDown")
|
|
elif direction == "up":
|
|
# 위로 스크롤 (Page Up 키 사용)
|
|
self.page.keyboard.press("PageUp")
|
|
else:
|
|
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
|
|
|
time.sleep(pause_time)
|
|
|
|
scroll_count += 1
|
|
|
|
if scroll_count == max_scrolls:
|
|
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
|
|
|
|
|
|
def collect_product_info(self, items_per_page, ed_mode):
|
|
"""
|
|
상품 정보를 수집하는 메서드
|
|
"""
|
|
try:
|
|
product_infos = []
|
|
product_name_elements = [] # product_name_element를 저장할 리스트
|
|
|
|
# ed_mode에 따라 product_elements 설정
|
|
if ed_mode:
|
|
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
|
|
product_elements = [
|
|
{
|
|
"name": self.product_name_for_ed_template.format(index=i),
|
|
"price": self.product_price_for_ed_template.format(index=i),
|
|
"image": self.product_image_for_ed_template.format(index=i)
|
|
}
|
|
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
|
|
]
|
|
else:
|
|
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
|
|
product_elements = self.page.query_selector_all(self.product_parent_locator)
|
|
|
|
for i, element in enumerate(product_elements[:items_per_page], start=1):
|
|
try:
|
|
if ed_mode:
|
|
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
|
|
product_name_element = self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
|
|
product_price_element = self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
|
|
product_image_element = self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
|
|
else:
|
|
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
|
|
product_name_element = self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
|
|
product_price_element = self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
|
|
product_image_element = self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
|
|
|
|
# 요소가 존재하면 정보 추출
|
|
self.logger.debug(f"product_name_element : {product_name_element}")
|
|
self.logger.debug(f"product_price_element : {product_price_element}")
|
|
self.logger.debug(f"product_image_element : {product_image_element}")
|
|
|
|
if product_name_element and product_price_element and product_image_element:
|
|
# await의 결과를 각 변수에 저장
|
|
product_name_text = (product_name_element.inner_text()).strip()
|
|
product_price_text = (product_price_element.inner_text()).strip()
|
|
product_image_url = product_image_element.get_attribute('src')
|
|
|
|
# product_info 딕셔너리에 결과 저장
|
|
product_info = {
|
|
"name": product_name_text,
|
|
"price": product_price_text,
|
|
"image_url": product_image_url
|
|
}
|
|
self.logger.debug(f"상품 {i}: {product_info}")
|
|
product_infos.append(product_info)
|
|
product_name_elements.append(product_name_element) # 각 product_name_element 추가
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"상품 {i} 정보 수집 중 오류 발생: {e}", exc_info=True)
|
|
continue
|
|
|
|
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
|
|
except Exception as e:
|
|
self.logger.error(f"상품 정보 수집 중 오류 발생: {e}", exc_info=True)
|
|
return []
|
|
|
|
|
|
|
|
def scroll_page_to_bottom(self, pause_time=0.2):
|
|
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
|
|
self.logger.info('페이지 스크롤 시작...')
|
|
previous_height = self.page.evaluate("() => document.body.scrollHeight")
|
|
|
|
while True:
|
|
self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
|
|
time.sleep(pause_time) # 페이지 로딩 대기
|
|
current_height = self.page.evaluate("() => document.body.scrollHeight")
|
|
if current_height == previous_height:
|
|
break # 더 이상 스크롤할 내용이 없으면 종료
|
|
previous_height = current_height
|
|
self.logger.info('페이지 스크롤 완료.')
|
|
|
|
def scroll_page_to_top(self, pause_time=0.2):
|
|
"""페이지의 맨 위까지 스크롤"""
|
|
self.logger.info('페이지 위로 스크롤 시작...')
|
|
previous_height = self.page.evaluate("() => window.pageYOffset")
|
|
|
|
while previous_height > 0:
|
|
self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
|
|
time.sleep(pause_time) # 페이지 로딩 대기
|
|
current_height = self.page.evaluate("() => window.pageYOffset")
|
|
if current_height == previous_height:
|
|
break # 더 이상 스크롤할 내용이 없으면 종료
|
|
previous_height = current_height
|
|
|
|
self.logger.info('페이지 위로 스크롤 완료.')
|
|
|
|
|
|
|
|
|
|
def start_Percenty_task(self):
|
|
self.logger.debug('퍼센티 상품수정 작업을 시작합니다...')
|
|
self.running = True # 번역 작업이 시작됨
|
|
|
|
try:
|
|
# 1. 총 상품 수 수집
|
|
self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
|
|
|
|
# total_products = self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
|
|
|
|
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
|
|
result = self.get_total_product_count()
|
|
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
|
|
total_products = result.get("total_count", 0)
|
|
items_per_page = result.get("items_per_page", 0)
|
|
self.logger.debug(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}")
|
|
|
|
self.logger.debug(f"[ self.toggle_states 상태 ]\n{self.toggle_states}")
|
|
|
|
if total_products == 0:
|
|
self.logger.debug('수집할 상품이 없습니다. 작업을 종료합니다.')
|
|
return
|
|
|
|
completed_count = 0
|
|
page_number = 1
|
|
|
|
# 3. 총 상품 수만큼 반복 작업 수행
|
|
while completed_count < total_products:
|
|
self.logger.debug(f'현재 페이지: {page_number}')
|
|
|
|
if not page_number == 1:
|
|
self.scroll_page_to_top()
|
|
self.logger.debug(f'1페이지가 아니므로 상품의 동적로딩을 위해 휠 스크롤 업')
|
|
|
|
is_ed_mode = self.toggle_states.get('ed_mode', False)
|
|
if not is_ed_mode:
|
|
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
|
|
self.logger.debug('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.')
|
|
product_buttons = self.get_product_edit_buttons_by_templete()
|
|
else:
|
|
self.logger.debug('상품정보 수집')
|
|
product_infos, product_name_elements = self.collect_product_info(items_per_page, ed_mode=is_ed_mode)
|
|
self.logger.debug(f"product_infos : {product_infos}")
|
|
self.logger.debug('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.')
|
|
product_buttons = product_name_elements
|
|
|
|
self.logger.debug(f"product_buttons 갯수 : [{len(product_buttons)}]개")
|
|
|
|
if not product_buttons:
|
|
self.logger.debug('수정할 상품이 없습니다. 작업을 종료합니다.')
|
|
break
|
|
|
|
# 5. 각 상품에 대해 상품수정작업 수행
|
|
for index, button in enumerate(product_buttons, start=1):
|
|
|
|
# 상품명 수집 오류 처리
|
|
self.logger.debug(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...')
|
|
|
|
is_disabled = self.is_button_disabled(button)
|
|
if is_disabled:
|
|
self.logger.debug(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.')
|
|
continue
|
|
|
|
self.logger.debug(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...')
|
|
|
|
# 상품 수정 다이얼로그 열기
|
|
self.open_product_edit_dialog(button)
|
|
|
|
# 상품명과 카테고리 수집
|
|
product_name = self.titleHandler.get_original_product_name() # 원본상품명 가져오기
|
|
product_category = self.titleHandler.get_category(market='ss') # 카테고리 가져오기
|
|
|
|
# 옵션 수정
|
|
is_optionTrnas = self.toggle_states.get('optionTrnas')
|
|
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
|
|
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
|
|
|
|
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
|
|
self.logger.debug(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}")
|
|
self.edit_option(product_name)
|
|
|
|
# 가격 수정
|
|
is_price = self.toggle_states.get('price')
|
|
if is_price:
|
|
self.logger.debug(f"가격수정 : {is_price} ")
|
|
self.edit_price(product_category)
|
|
|
|
# 썸네일 수정
|
|
thumb = self.toggle_states.get('thumb')
|
|
if thumb:
|
|
self.logger.debug(f"썸네일수정 : {thumb} ")
|
|
self.edit_thumb()
|
|
|
|
# 태그 수정
|
|
if self.toggle_states.get('tag'):
|
|
pass
|
|
|
|
# 상품명 수정
|
|
if self.toggle_states.get('title'):
|
|
pass
|
|
|
|
# 상세페이지 수정
|
|
detail_Option = self.toggle_states.get('detail_Option')
|
|
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
|
|
recovery_mode = self.toggle_states.get('recovery_mode')
|
|
if detail_Option or detail_IMGTrans or recovery_mode:
|
|
self.logger.debug(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}")
|
|
|
|
if recovery_mode:
|
|
deleted_imgs = self.deleted_img_urls_from_logs()
|
|
self.detail_trans_for_recovery(product_name, deleted_imgs)
|
|
else:
|
|
self.detail_trans()
|
|
|
|
# 수정 후 저장
|
|
self.logger.debug('상품 세부사항 저장 중...')
|
|
self.save_and_ecs_product_edit()
|
|
|
|
completed_count += 1
|
|
self.logger.info(f'{completed_count}/[{total_products}]개 상품 수정 완료.')
|
|
|
|
if completed_count >= total_products:
|
|
self.logger.info(f'[{total_products}]개 상품 수정이 완료되었습니다.')
|
|
return
|
|
|
|
# 6. 다음 페이지로 이동 (있으면)
|
|
if not self.go_to_next_page():
|
|
self.logger.info('더 이상 페이지가 없습니다. 작업을 종료합니다.')
|
|
break
|
|
page_number += 1
|
|
|
|
if self.running:
|
|
self.logger.info('모든 상품 번역 및 저장 완료.')
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"번역 작업 중 오류 발생: {e}", exc_info=True)
|
|
|
|
|
|
|
|
|
|
|
|
def detail_trans(self):
|
|
try:
|
|
# 상세페이지 탭 클릭
|
|
self.click_detail_tab()
|
|
|
|
# # 텍스트 입력 필드 선택
|
|
input_field2 = self.page.locator('div#productMainContentContainerId div.ck.ck-editor__main > div')
|
|
input_field2.click()
|
|
self.logger.debug(f"입력필드2 선택")
|
|
|
|
# 소개글과 옵션데이터 입력
|
|
self.page.keyboard.press("ArrowLeft")
|
|
self.page.keyboard.press("Enter")
|
|
self.page.keyboard.press("ArrowUp")
|
|
self.input_detail_text(self.optionHandler)
|
|
|
|
# CKEditor HTML 데이터 가져오기
|
|
html_data = self.get_ckeditor_data()
|
|
self.logger.debug(f"가져온 원본 CKEditor html_data : {html_data}")
|
|
|
|
# 이미지 URL 추출
|
|
soup = BeautifulSoup(html_data, 'html.parser')
|
|
images = soup.find_all('img', src=True)
|
|
|
|
original_images = [] # 원본 이미지 URL 저장
|
|
self.logger.debug(f"가져온 이미지 갯수: {len(images)} 개")
|
|
|
|
for index, img in enumerate(images):
|
|
original_url = img.get('src')
|
|
self.logger.debug(f"[{index}]번째 original_url : {original_url}")
|
|
|
|
if not original_url:
|
|
continue
|
|
|
|
original_images.append(original_url) # 원본 URL 저장
|
|
|
|
# 이미지 번역을 위한 임시 파일 경로 생성
|
|
img_path = os.path.join(self.temp_dir, f"translated_detail_image_{index}.png")
|
|
self.logger.debug(f"[{index}]번째 임시 저장PATH 생성 : {img_path}")
|
|
|
|
self.logger.debug(f"웨일 브라우저를 활용한 이미지 번역 프로세스")
|
|
is_translated = self.whale_translator.translate_image(original_url)
|
|
|
|
# self.switch_to_chrome() # 크롬으로 포커스 이동
|
|
|
|
translated_img_path = self.clipboardImageManager.process_clipboard_to_save_path(original_url=original_url, is_success_translated=is_translated, toggle_states=self.toggle_states, path=img_path) # 클립보드 내용을 처리
|
|
self.logger.info(f"번역완료된 임시 translated_img의 로컬 path: {translated_img_path}")
|
|
|
|
# 번역된 이미지 업로드
|
|
self.upload_image(translated_img_path)
|
|
|
|
self.clipboardImageManager.clear_clipboard()
|
|
self.logger.info(f"{index}번째 이미지가 번역완료되어 클립보드를 초기화 했습니다.")
|
|
|
|
if not is_translated:
|
|
self.logger.info(f"[{index}]번째 이미지 번역 실패로 원본이미지 삭제목록에서 제외합니다.")
|
|
original_images.remove(original_url) # 번역 실패 시 제거
|
|
continue
|
|
|
|
self.logger.info("모든 이미지가 처리되었습니다.")
|
|
|
|
self.page.keyboard.press("Enter")
|
|
self.page.keyboard.press("Enter")
|
|
self.page.keyboard.press("Enter")
|
|
|
|
# 원본 이미지 제거
|
|
time.sleep(1)
|
|
html_data = self.get_ckeditor_data() # 최신 HTML 가져오기
|
|
modified_html = self.remove_original_images(html_data)
|
|
|
|
# 수정된 HTML을 CKEditor에 다시 삽입
|
|
self.set_ckeditor_data(modified_html)
|
|
# self.logger.info(f"html_data : {html_data}")
|
|
# self.logger.info(f"original_images : {original_images}")
|
|
# self.logger.info(f"modified_html : {modified_html}")
|
|
self.logger.info("원본 이미지가 제거된 HTML 데이터가 CKEditor에 삽입되었습니다.")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"CKEditor 이미지 처리 중 오류 발생: {e}", exc_info=True)
|
|
|
|
# 수정 후 저장
|
|
self.logger.info('상품 세부사항 저장 중...')
|
|
self.save_product_edit()
|
|
|
|
def remove_original_images(self, html_data):
|
|
"""
|
|
CKEditor HTML 데이터에서 연속된 엔터 3번 친 부분 이후의 데이터를 삭제합니다.
|
|
:param html_data: CKEditor의 HTML 데이터
|
|
:return: 수정된 HTML 데이터
|
|
"""
|
|
try:
|
|
self.logger.debug("HTML 데이터에서 엔터 세 번 친 부분 이후 제거 작업 시작")
|
|
|
|
# BeautifulSoup으로 HTML 데이터 파싱
|
|
soup = BeautifulSoup(html_data, 'html.parser')
|
|
|
|
# 모든 <p> </p> 태그를 찾음
|
|
enter_tags = soup.find_all('p', string="\xa0")
|
|
self.logger.debug(f"찾은 <p> </p> 태그 수: {len(enter_tags)}")
|
|
|
|
# 연속된 <p> </p> 태그 3개를 찾음
|
|
count = 0
|
|
start_removal = None
|
|
for i, tag in enumerate(enter_tags):
|
|
if i > 0 and tag.previous_sibling == enter_tags[i - 1]:
|
|
count += 1
|
|
else:
|
|
count = 1
|
|
|
|
if count == 3: # 연속된 세 번째 태그 발견
|
|
start_removal = tag
|
|
self.logger.debug(f"엔터 세 번 연속된 마지막 태그 발견: {start_removal}")
|
|
break
|
|
|
|
# 연속된 엔터 태그 이후 데이터 제거
|
|
if start_removal:
|
|
# 기준 태그 이후의 모든 형제를 삭제
|
|
for sibling in list(start_removal.find_all_next()):
|
|
sibling.decompose()
|
|
|
|
# 수정된 HTML 반환
|
|
modified_html = str(soup)
|
|
|
|
# 옵션목록 중 빈옵션 삭제
|
|
modified_html = self.remove_empty_list_items(modified_html)
|
|
|
|
self.logger.debug(f"최종 수정된 HTML 데이터: {modified_html}")
|
|
return modified_html
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"HTML 데이터 수정 중 오류 발생: {e}", exc_info=True)
|
|
return html_data # 오류 발생 시 원본 데이터 반환
|
|
|
|
def remove_empty_list_items(self, html_data):
|
|
"""
|
|
HTML 데이터에서 만 포함된 <li> 요소를 제거합니다.
|
|
:param html_data: CKEditor의 HTML 데이터
|
|
:return: 수정된 HTML 데이터
|
|
"""
|
|
try:
|
|
# BeautifulSoup으로 HTML 데이터 파싱
|
|
soup = BeautifulSoup(html_data, 'html.parser')
|
|
|
|
# 모든 <li> 태그 찾기
|
|
list_items = soup.find_all('li')
|
|
|
|
# 만 포함된 <li> 태그 제거
|
|
for item in list_items:
|
|
if item.get_text(strip=True) == "\xa0": # 는 \xa0로 표시됨
|
|
item.decompose() # 태그 제거
|
|
|
|
# 수정된 HTML 반환
|
|
modified_html = str(soup)
|
|
return modified_html
|
|
|
|
except Exception as e:
|
|
print(f"오류 발생: {e}")
|
|
return html_data # 오류 발생 시 원본 데이터 반환
|
|
|
|
# def remove_original_images_with_count(self, html_data, original_images):
|
|
# """
|
|
# CKEditor HTML 데이터에서 original_images의 개수만큼 뒤에서부터 <img> 태그를 제거하되,
|
|
# 업로드된 이미지와 중복된 URL은 제외합니다.
|
|
# :param html_data: CKEditor의 HTML 데이터
|
|
# :param original_images: 원본 이미지 URL 리스트
|
|
# :return: 수정된 HTML 데이터
|
|
# """
|
|
# try:
|
|
# self.logger.debug("원본 이미지 제거 작업 시작")
|
|
|
|
# # HTML 데이터를 BeautifulSoup으로 파싱
|
|
# soup = BeautifulSoup(html_data, 'html.parser')
|
|
|
|
# # 모든 <img> 태그 탐색
|
|
# images = soup.find_all('img', src=True)
|
|
# self.logger.debug(f"HTML에서 찾은 이미지 태그 수: {len(images)}")
|
|
|
|
# # 뒤에서부터 original_images의 개수만큼 제거
|
|
# num_to_remove = len(original_images)
|
|
# self.logger.debug(f"제거할 이미지 갯수: {num_to_remove}")
|
|
|
|
# if num_to_remove > len(images):
|
|
# self.logger.warning("제거할 이미지 수가 전체 이미지 수를 초과합니다. 모든 이미지를 제거합니다.")
|
|
# num_to_remove = len(images)
|
|
|
|
# removed_count = 0 # 삭제된 이미지 수 카운트
|
|
# for img in reversed(images): # 뒤에서부터 순회
|
|
# src = img.get('src')
|
|
# if src in original_images: # 원본 이미지 URL에 해당하는 경우만 삭제
|
|
# self.logger.debug(f"원본 이미지 제거: {src}")
|
|
# img.decompose()
|
|
# removed_count += 1
|
|
|
|
# if removed_count >= num_to_remove: # 지정된 개수만큼 삭제되면 중단
|
|
# break
|
|
|
|
# # 수정된 HTML을 반환
|
|
# modified_html = str(soup)
|
|
# self.logger.debug(f"수정된 HTML 데이터: {modified_html}")
|
|
# return modified_html
|
|
|
|
# except Exception as e:
|
|
# self.logger.error(f"원본 이미지 제거 중 오류 발생: {e}", exc_info=True)
|
|
# return html_data # 오류 발생 시 원본 데이터 반환
|
|
|
|
def get_ckeditor_data(self):
|
|
try:
|
|
get_editor_data_script = """
|
|
(function() {
|
|
const editorElement = document.querySelector('.ck-editor__editable');
|
|
if (!editorElement) {
|
|
console.error('CKEditor 요소를 찾을 수 없습니다.');
|
|
return null;
|
|
}
|
|
|
|
const editorInstance = editorElement.ckeditorInstance;
|
|
if (!editorInstance) {
|
|
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
|
|
return null;
|
|
}
|
|
|
|
const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
|
|
console.log('현재 CKEditor 데이터:', editorData);
|
|
return editorData;
|
|
})();
|
|
"""
|
|
|
|
ck_editor_data = self.page.evaluate(get_editor_data_script)
|
|
self.logger.info(f"CKEditor 데이터: {ck_editor_data}")
|
|
|
|
return ck_editor_data
|
|
except Exception as e:
|
|
self.logger.error(f"CKEditor 데이터를 가져오는 중 오류 발생: {e}", exc_info=True)
|
|
return None
|
|
|
|
def set_ckeditor_data(self, html_data):
|
|
"""
|
|
CKEditor에 HTML 데이터를 설정합니다.
|
|
:param html_data: 삽입할 HTML 데이터
|
|
"""
|
|
try:
|
|
self.logger.debug("CKEditor에 HTML 데이터를 설정하는 작업 시작")
|
|
|
|
# JavaScript 스크립트를 통해 CKEditor에 데이터 설정
|
|
set_data_script = f"""
|
|
(function() {{
|
|
const editorElement = document.querySelector('.ck-editor__editable');
|
|
if (!editorElement) {{
|
|
console.error('CKEditor 요소를 찾을 수 없습니다.');
|
|
return;
|
|
}}
|
|
|
|
const editorInstance = editorElement.ckeditorInstance;
|
|
if (!editorInstance) {{
|
|
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
|
|
return;
|
|
}}
|
|
|
|
editorInstance.setData(`{html_data}`);
|
|
console.log('CKEditor에 새로운 데이터가 성공적으로 설정되었습니다.');
|
|
}})();
|
|
"""
|
|
|
|
# 스크립트 실행
|
|
self.page.evaluate(set_data_script)
|
|
self.logger.info("CKEditor에 HTML 데이터를 성공적으로 설정했습니다.")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"CKEditor 데이터 설정 중 오류 발생: {e}", exc_info=True)
|
|
|
|
def input_detail_text(self, optionHandler):
|
|
try:
|
|
# 텍스트 입력 필드 선택
|
|
input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
|
# input_field.press('Enter')
|
|
|
|
# 선두부 텍스트 입력
|
|
for key in sorted(self.text_templates.keys()):
|
|
leading_text = self.text_templates[key]
|
|
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
|
input_field.type(leading_text)
|
|
input_field.press('Enter')
|
|
self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
|
|
|
|
option_data = optionHandler.get_selected_translated_options()
|
|
self.logger.debug(f'option_data : {option_data}')
|
|
|
|
# if is_option_data:
|
|
if option_data or option_data != {}:
|
|
self.logger.debug('옵션 데이터 입력 시작')
|
|
|
|
is_single = optionHandler.option_info['is_single_option']
|
|
|
|
self.logger.debug('가져온 옵션 데이터')
|
|
self.logger.debug(f'{option_data}')
|
|
|
|
# # 옵션 입력 필드 선택
|
|
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
|
# input_field.press('Enter')
|
|
|
|
if not is_single:
|
|
self.logger.info('단일옵션이 아니므로 옵션목록을 입력')
|
|
|
|
# 각 옵션을 한 줄씩 입력
|
|
input_field.type("# 옵션 목록")
|
|
input_field.press('Enter')
|
|
|
|
# 첫 번째 옵션의 번역된 옵션명만 입력
|
|
first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
|
|
first_value = option_data[first_key] # key는 옵션이름 value는 가격
|
|
input_field.type(f"- 1. {first_key}")
|
|
input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
|
|
|
|
# 나머지 옵션도 번역된 옵션명만 입력
|
|
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
|
|
input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
|
|
input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
|
|
|
|
# 목록 끝을 알리기 위해 엔터 두 번 입력
|
|
input_field.press('Enter')
|
|
input_field.press('Enter')
|
|
|
|
# 후두부 텍스트 입력
|
|
input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
|
|
input_field.press('Enter')
|
|
input_field.type('---')
|
|
input_field.press('Enter')
|
|
|
|
self.logger.info('옵션 데이터 입력 완료')
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"옵션데이터 입력 처리 중 오류: {e}", exc_info=True)
|
|
|
|
|
|
def upload_image(self, img_path):
|
|
"""
|
|
이미지를 CKEditor에 업로드하고 가장 첫 번째 이미지 URL을 추출 후 삭제합니다.
|
|
:param img_path: 업로드할 이미지 파일 경로
|
|
:return: 업로드된 이미지의 URL
|
|
"""
|
|
try:
|
|
# 1. 파일 업로드
|
|
file_input_selector = 'div#productMainContentContainerId button[type="button"].ck.ck-button.ck-off.ck-file-dialog-button.ck-splitbutton__action > input'
|
|
file_input = self.page.locator(file_input_selector)
|
|
file_input.set_input_files(img_path)
|
|
self.logger.info(f"이미지가 성공적으로 업로드되었습니다: {img_path}")
|
|
|
|
# 2. 엔터 키 입력
|
|
self.page.keyboard.press("Enter")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"이미지 업로드 중 오류 발생: {e}", exc_info=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def detail_trans_ori(self):
|
|
# # 상세페이지 탭 클릭
|
|
# self.click_detail_tab()
|
|
|
|
# # 텍스트 입력 필드 선택
|
|
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
|
# input_field.press('Enter')
|
|
|
|
# get_editor_data_script = """
|
|
# (function() {
|
|
# const editorElement = document.querySelector('.ck-editor__editable');
|
|
# if (!editorElement) {
|
|
# console.error('CKEditor 요소를 찾을 수 없습니다.');
|
|
# return null;
|
|
# }
|
|
|
|
# const editorInstance = editorElement.ckeditorInstance;
|
|
# if (!editorInstance) {
|
|
# console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
|
|
# return null;
|
|
# }
|
|
|
|
# const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
|
|
# console.log('현재 CKEditor 데이터:', editorData);
|
|
# return editorData;
|
|
# })();
|
|
# """
|
|
|
|
# original_editor_data = self.page.evaluate(get_editor_data_script)
|
|
# self.logger.info(f"원본 CKEditor 데이터: {original_editor_data}")
|
|
|
|
# # 이미지 URL 추출
|
|
# image_urls = self.extract_image_urls(self.optionHandler, is_option_data=True) # 코루틴 실행
|
|
# total_images = len(image_urls)
|
|
# self.logger.info(f"현재 상품의 총 이미지 수 : {total_images}개")
|
|
|
|
# # 이미지 번역 작업 진행
|
|
# for index, url in enumerate(image_urls):
|
|
# current_image_count = index +1
|
|
|
|
# # 임시 파일 경로 생성
|
|
# img_path = os.path.join(self.temp_dir, f"translated_detail_image_{index}.png")
|
|
|
|
# self.logger.debug(f"웨일 브라우저를 활용한 이미지 번역 프로세스")
|
|
# is_success_translated = self.whale_translator.translate_image(url)
|
|
|
|
# # is_paste_success = self.paste_image_in_chrome(self.clipboardImageManager, url, is_success_translated, self.toggle_states)
|
|
# is_paste_success = self.paste_image_in_chrome_to_save_path(url, img_path, is_success_translated, self.toggle_states)
|
|
|
|
# if is_paste_success:
|
|
# self.logger.debug(f"{url} gui 이미지 붙여넣기 성공")
|
|
# else:
|
|
# self.logger.debug(f"{url} gui 이미지 붙여넣기 실패")
|
|
|
|
# current_image_count += 1
|
|
|
|
# # 수정 후 저장
|
|
# self.logger.info('상품 세부사항 저장 중...')
|
|
# self.save_product_edit()
|
|
|
|
def detail_trans_for_recovery(self, product_name, deleted_imgs):
|
|
# 상세페이지 탭 클릭
|
|
self.click_detail_tab()
|
|
|
|
self.logger.debug('recovery_image_urls 메서드 호출')
|
|
self.recovery_image_urls(product_name, deleted_imgs)
|
|
|
|
# 수정 후 저장
|
|
self.logger.info('상품 세부사항 저장 중...')
|
|
self.save_product_edit()
|
|
|
|
def edit_option(self, product_name):
|
|
# 상세페이지 탭 클릭
|
|
self.click_option_tab()
|
|
|
|
# 옵션 최대선택갯수
|
|
max_option_count = self.toggle_states.get('max_option_count', 0)
|
|
self.current_options_info = self.optionHandler.process_options(product_name, max_option_count, self.toggle_states)
|
|
|
|
# 수정 후 저장
|
|
self.save_product_edit()
|
|
|
|
def edit_price(self, product_category):
|
|
# 상세페이지 탭 클릭
|
|
self.click_price_tab()
|
|
|
|
# 가격 수정 프로세스
|
|
self.priceHandler.process_price(category=product_category, is_group = self.titleHandler.is_group_ESM)
|
|
|
|
# 수정 후 저장
|
|
self.save_product_edit()
|
|
|
|
|
|
def edit_thumb(self):
|
|
# 상세페이지 탭 클릭
|
|
self.click_thumb_tab()
|
|
|
|
# 가격 수정 프로세스
|
|
self.thumbnailHandler.process_thumbnails()
|
|
|
|
# 수정 후 저장
|
|
self.save_product_edit()
|
|
|
|
|
|
# def run(self):
|
|
# """QThread의 run 메서드: 실행할 Playwright 관련 작업 추가"""
|
|
# self.logger.debug("QThread가 시작되었습니다.")
|
|
# # 브라우저 시작 및 초기화
|
|
# try:
|
|
# self.start_browser()
|
|
# self.browser_started.emit() # 브라우저 성공적으로 시작됨
|
|
|
|
# except Exception as e:
|
|
# self.logger.error(f"브라우저 실행 중 오류 발생: {e}")
|
|
# self.browser_error.emit(str(e)) # GUI에 오류 전달
|
|
|
|
|
|
|
|
def run(self):
|
|
"""QThread의 run 메서드: 실행할 Playwright 관련 작업 추가"""
|
|
self.logger.debug("QThread가 시작되었습니다.")
|
|
|
|
try:
|
|
# Playwright 시작
|
|
self.start_browser()
|
|
self.browser_started.emit() # 브라우저 성공 신호
|
|
|
|
# 무한 루프로 대기하며 GUI와 상호작용
|
|
while True:
|
|
if self.isInterruptionRequested(): # QThread가 종료 요청을 받았는지 확인
|
|
self.logger.debug("QThread 종료 요청 수신")
|
|
break
|
|
time.sleep(0.1) # CPU 점유율 낮추기 위해 대기
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"브라우저 실행 중 오류 발생: {e}")
|
|
self.browser_error.emit(str(e)) # GUI에 오류 전달
|
|
|
|
finally:
|
|
# 종료 시 Playwright 리소스 정리
|
|
if self.browser:
|
|
self.browser.close()
|
|
if self.playwright:
|
|
self.playwright.stop()
|
|
self.logger.debug("브라우저 및 Playwright 리소스 정리 완료")
|
|
|
|
|
|
def stop(self):
|
|
"""Playwright 리소스 정리 및 스레드 종료"""
|
|
self.logger.debug("stop - Playwright 종료")
|
|
try:
|
|
if self.browser:
|
|
self.browser.close() # 브라우저 닫기
|
|
self.logger.info("브라우저가 종료되었습니다.")
|
|
if self.playwright:
|
|
self.playwright.stop() # Playwright 인스턴스 종료
|
|
self.logger.info("Playwright가 종료되었습니다.")
|
|
|
|
if self.whale_translator:
|
|
self.logger.debug("WhaleTranslator 종료 중...")
|
|
self.whale_translator.stop_whale_browser() # WhaleTranslator 종료
|
|
self.whale_translator = None
|
|
self.logger.debug("BrowserController 종료 완료.")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Playwright 종료 중 오류 발생: {e}")
|
|
|
|
def terminate(self):
|
|
"""스레드 종료 및 리소스 정리"""
|
|
self.logger.info("스레드 종료 요청")
|
|
try:
|
|
self.stop() # Playwright 리소스 정리
|
|
self.logger.info("Playwright 리소스가 정리되었습니다.")
|
|
except Exception as e:
|
|
self.logger.error(f"terminate 중 오류 발생: {e}")
|
|
finally:
|
|
super().terminate() # QThread의 기본 종료 메서드 호출
|
|
self.logger.info("스레드가 종료되었습니다.")
|
|
|
|
def cleanup(self):
|
|
pass
|
|
|
|
def check_pause(self):
|
|
"""일시 정지 상태라면 재개될 때까지 대기"""
|
|
self.pause_mutex.lock()
|
|
if self.is_paused:
|
|
self.pause_condition.wait(self.pause_mutex)
|
|
self.pause_mutex.unlock()
|
|
|
|
def pause(self):
|
|
"""스레드 일시 정지"""
|
|
self.pause_mutex.lock()
|
|
self.is_paused = True
|
|
self.pause_mutex.unlock()
|
|
self.logger.debug("스레드가 일시 정지되었습니다.")
|
|
|
|
def resume(self):
|
|
"""스레드 재개"""
|
|
self.pause_mutex.lock()
|
|
self.is_paused = False
|
|
self.pause_condition.wakeAll()
|
|
self.pause_mutex.unlock()
|
|
self.logger.debug("스레드가 재개되었습니다.") |