AutoPercenty3/browser_control_sync.py

1822 lines
86 KiB
Python

from playwright.sync_api import sync_playwright, TimeoutError
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition
import re
import time
from bs4 import BeautifulSoup
import os, sys, random
import requests
from PIL import Image
from io import BytesIO
from whale_new import WhaleTranslator
from clipboardImageManager import ClipboardImageManager
from option import OptionHandler
from price import PriceHandler
from title import TitleHandler
from thumb import ThumbnailHandler
import tempfile
class BrowserController(QThread):
# 브라우저 시작 시그널 정의
browser_started = Signal()
browser_error = Signal(str)
# 번역 작업 시작, 완료, 오류 시그널 정의
translation_started = Signal()
translation_completed = Signal()
translation_error = Signal(str)
def __init__(self, app, logger, locator_manager, vertexAI, cmb_diag, login_infos, toggle_states):
super().__init__()
self.logger = logger
self.log_files = ["AutoPercenty3.log", "AutoPercenty3.log.1", "AutoPercenty3.log.2", "AutoPercenty3.log.3", "AutoPercenty3.log.4", "AutoPercenty3.log.5"]
self.locator_manager = locator_manager
self.vertexAI = vertexAI
self.cmb_diag = cmb_diag
self.toggle_states = toggle_states
self.login_infos = login_infos
self.chrome_hwnd = None
# 임시 디렉토리 생성
self.temp_dir = tempfile.mkdtemp() # 임시 디렉토리 생성
self.whale_translator = None
self.playwright = None
self.browser = None
self.page = None
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
self.pause_mutex = QMutex()
self.pause_condition = QWaitCondition()
self.is_paused = False # 일시 정지 상태 플래그
self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
self.optionHandler = OptionHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.logger, self.vertexAI, debug_flag=self.toggle_states['debug_mode'])
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.vertexAI, self.cmb_diag, debug_flag=self.toggle_states['debug_mode'])
self.titleHandler = TitleHandler(self.locator_manager, self, self.logger)
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.whale_translator, self.clipboardImageManager, self.toggle_states)
# BrowserController에 해당하는 모든 locator를 정의
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
self.product_edit_button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
# # 스레드 종료 시 close_whale_window_if_exists 호출
# self.finished.connect(self.cleanup)
def get_page(self):
return self.page
def get_whale(self):
return self.whale_translator
def monitor_browser_logs(self):
"""브라우저 로그 및 오류를 모니터링"""
try:
self.page.on("console", lambda msg: self.logger.info(f"JS 로그: {msg.type}: {msg.text}"))
self.page.on("requestfailed", lambda req: self.logger.error(f"요청 실패: {req.url}"))
except Exception as e:
self.logger.error(f"브라우저 로그 모니터링 중 오류 발생: {e}", exc_info=True)
def get_base_dir(self):
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, '_internal') # _internal 디렉토리 포함
if os.path.exists(internal_dir): # _internal 디렉토리가 존재하면 base_dir로 설정
return internal_dir
else: # 일반 Python 실행 환경
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
def start_browser(self):
"""비동기 Playwright 초기화 및 로그인 수행"""
try:
self.logger.debug('크롬 브라우저를 실행합니다...')
# WhaleTranslator 필요 여부 확인 및 초기화
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
thumb_status = self.toggle_states.get('thumb', False)
self.logger.debug(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}")
if optionIMGTrans_status or detail_IMGTrans_status or thumb_status:
self.logger.debug('이미지번역을 위해 웨일을 로드합니다...')
self.whale_translator = WhaleTranslator(self.logger)
self.whale_translator.start_whale_browser()
# Playwright 시작 및 브라우저 실행
self.logger.debug('playwright().start 로드합니다...')
self.playwright = sync_playwright().start()
self.logger.debug('playwright().start 완료')
base_path = self.get_base_dir()
self.logger.debug(f"base_path: {base_path}")
browser_path = os.path.join(base_path, 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
extension_path = os.path.join(base_path, 'browsers', 'extensions', '1.1.100_0')
user_data_dir = os.path.join(base_path, 'browsers', 'user_data')
cache_dir = os.path.join(base_path, 'browsers', 'chromium-1140', 'cache') # 캐시 폴더 경로
if not os.path.exists(browser_path):
self.logger.error(f"브라우저 실행 파일이 없습니다: {browser_path}")
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
self.logger.debug(f"브라우저 경로: {browser_path}")
self.logger.debug(f"확장 프로그램 경로: {extension_path}")
self.logger.debug(f"사용자 폴더 경로: {user_data_dir}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.")
# cache 디렉토리가 존재하지 않으면 생성
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
self.logger.debug(f"{cache_dir} 디렉토리가 생성되었습니다.")
# User agent 설정
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.debug(f"user_agent: {user_agent}")
viewport = {"width": 1280, "height": 720}
# 브라우저 시작 및 설정
self.browser = self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=True,
viewport=viewport,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-popup-blocking',
'--disable-blink-features=AutomationControlled',
f'--disable-extensions-except={extension_path}',
f'--load-extension={extension_path}',
'--start-maximized',
'--window-size=1280,720',
f"--disk-cache-dir={cache_dir}", # Playwright 캐시 폴더 지정
],
executable_path=browser_path,
user_agent=user_agent
)
# 브라우저 종료 이벤트 로깅
self.browser.on("disconnected", lambda: self.logger.error("브라우저가 종료되었습니다."))
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page = self.browser.new_page()
self.logger.info('새 페이지 로딩 중...')
# stealth_sync(self.page)
# self.logger.debug('스텔스 설정')
# self.monitor_browser_logs()
# self.logger.info('자바스크립트의 로그를 모니터링하기 위한 로그 설정')
self.page.goto('https://percenty.co.kr/signin')
# self.page.goto('https://percenty.co.kr')
self.logger.info('percenty.co.kr/signin 로딩 완료')
# 첫 번째 기본 탭 닫기
if self.browser.pages:
self.browser.pages[0].close()
time.sleep(0.3)
# 페이지 제목을 가져와서 창 제목으로 활용
page_title = self.page.title()
self.logger.debug(f'페이지 제목: {page_title}')
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
# self.chrome_hwnd = self.find_window_by_title(page_title)
# if not self.chrome_hwnd:
# self.logger.warning('크롬 창을 찾을 수 없습니다.')
# else:
# self.logger.debug(f'크롬 창 핸들: {self.chrome_hwnd}')
self.login()
self.close_ad_if_exists()
id_ed_mode = self.toggle_states.get('ed_mode', False)
if id_ed_mode:
self.go_to_registered_product_page()
self.logger.info('등록 상품 관리 페이지로 이동 중...')
else:
self.logger.info('신규 상품 등록 페이지로 이동 중...')
self.go_to_new_product_page()
# 각 핸들러에 초기화된 page 객체 전달.
self.optionHandler.update_page(self.page)
self.titleHandler.update_page(self.page)
self.priceHandler.update_page(self.page)
self.thumbnailHandler.update_page(self.page)
time.sleep(1)
self.start_Percenty_task()
except Exception as e:
self.logger.error(f"브라우저 시작 오류: {str(e)}", exc_info=True)
self.browser_error.emit(str(e))
def start_Percenty_task(self):
self.logger.debug('퍼센티 상품수정 작업을 시작합니다...')
self.running = True # 번역 작업이 시작됨
try:
# 1. 총 상품 수 수집
self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# total_products = self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
self.logger.debug(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}")
self.logger.debug(f"[ self.toggle_states 상태 ]\n{self.toggle_states}")
if total_products == 0:
self.logger.debug('수집할 상품이 없습니다. 작업을 종료합니다.')
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
while completed_count < total_products:
self.logger.debug(f'현재 페이지: {page_number}')
if not page_number == 1:
self.scroll_page_to_top()
self.logger.debug(f'1페이지가 아니므로 상품의 동적로딩을 위해 휠 스크롤 업')
is_ed_mode = self.toggle_states.get('ed_mode', False)
if not is_ed_mode:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.debug('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.')
product_buttons = self.get_product_edit_buttons_by_templete()
else:
self.logger.debug('상품정보 수집')
product_infos, product_name_elements = self.collect_product_info(items_per_page, ed_mode=is_ed_mode)
self.logger.debug(f"product_infos : {product_infos}")
self.logger.debug('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.')
product_buttons = product_name_elements
self.logger.debug(f"product_buttons 갯수 : [{len(product_buttons)}]개")
if not product_buttons:
self.logger.debug('수정할 상품이 없습니다. 작업을 종료합니다.')
break
# 5. 각 상품에 대해 상품수정작업 수행
for index, button in enumerate(product_buttons, start=1):
# 상품명 수집 오류 처리
self.logger.debug(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...')
is_disabled = self.is_button_disabled(button)
if is_disabled:
self.logger.debug(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.')
continue
self.logger.debug(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...')
# 상품 수정 다이얼로그 열기
self.open_product_edit_dialog(button)
# 상품명과 카테고리 수집
product_name = self.titleHandler.get_original_product_name() # 원본상품명 가져오기
product_category = self.titleHandler.get_category(market='ss') # 카테고리 가져오기
# 옵션 수정
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.logger.debug(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}")
self.edit_option(product_name)
# 가격 수정
is_price = self.toggle_states.get('price')
if is_price:
self.logger.debug(f"가격수정 : {is_price} ")
self.edit_price(product_category)
# 썸네일 수정
thumb = self.toggle_states.get('thumb')
if thumb:
self.logger.debug(f"썸네일수정 : {thumb} ")
self.edit_thumb()
# 태그 수정
if self.toggle_states.get('tag'):
pass
# 상품명 수정
if self.toggle_states.get('title'):
pass
# 상세페이지 수정
detail_Option = self.toggle_states.get('detail_Option')
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
recovery_mode = self.toggle_states.get('recovery_mode')
if detail_Option or detail_IMGTrans or recovery_mode:
self.logger.debug(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}")
if recovery_mode:
deleted_imgs = self.deleted_img_urls_from_logs()
self.detail_trans_for_recovery(product_name, deleted_imgs)
else:
self.detail_trans()
# 수정 후 저장
self.logger.debug('상품 세부사항 저장 중...')
self.save_and_ecs_product_edit()
completed_count += 1
self.logger.info(f'{completed_count}/[{total_products}]개 상품 수정 완료.')
if completed_count >= total_products:
self.logger.info(f'[{total_products}]개 상품 수정이 완료되었습니다.')
return
# 6. 다음 페이지로 이동 (있으면)
if not self.go_to_next_page():
self.logger.info('더 이상 페이지가 없습니다. 작업을 종료합니다.')
break
page_number += 1
if self.running:
self.logger.info('모든 상품 번역 및 저장 완료.')
except Exception as e:
self.logger.error(f"번역 작업 중 오류 발생: {e}", exc_info=True)
def login(self):
"""로그인 처리"""
try:
is_admin = self.login_infos['is_admin']
self.logger.info(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정')
if is_admin:
# 관리자 로그인 처리
self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
self.page.click(self.login_button_locator)
else:
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
admin_toggle = self.page.locator(self.admin_toggle_locator)
admin_toggle.wait_for(state="attached", timeout=20000) # 타임아웃 확장
if admin_toggle.get_attribute("aria-checked") == "true":
admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
self.page.click(self.staff_login_button_locator)
self.logger.info(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정')
except Exception as e:
self.logger.error(f"로그인 오류: {str(e)}", exc_info=True)
self.browser_error.emit(str(e))
def close_browser(self):
"""브라우저 종료"""
if self.browser:
self.browser.close()
self.playwright.stop()
self.cleanup()
self.logger.info('브라우저 종료됨.')
def get_total_product_count(self):
total_count = 0
items_per_page = 0
try:
# total_count_elements = self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
total_count_element = self.page.query_selector("div#root span:has-text('개 상품')")
items_per_page_element = self.page.query_selector("div#root [title$='개씩 보기']")
self.logger.debug(f"total_count_element : {total_count_element}")
if total_count_element:
total_count_text = total_count_element.inner_text()
if "" in total_count_text and "개 상품" in total_count_text:
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
self.logger.info(f"총 상품수 확인: {total_count}")
# 페이지당 상품 수 추출
if items_per_page_element:
items_per_page_text = items_per_page_element.get_attribute("title")
if items_per_page_text and "개씩 보기" in items_per_page_text:
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
self.logger.info(f"페이지당 상품수 확인: {items_per_page} 개씩 보기")
# 결과 반환
if total_count:
return {"total_count": total_count, "items_per_page": items_per_page}
return {"total_count": 0, "items_per_page": 0}
except Exception as e:
self.logger.error(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True)
return {"total_count": 0, "items_per_page": 0}
def fetch_image_urls(self, html_content):
"""
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 모든 <img> 태그를 찾기
image_urls = []
img_tags = soup.find_all('img')
for img in img_tags:
# img 태그에서 src 속성 추출
if 'src' in img.attrs:
image_url = img['src']
image_urls.append(image_url)
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)}")
self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}")
return image_urls
def close_ad_if_exists(self):
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
try:
# 광고 다이얼로그가 나타날 때까지 기다림
self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
self.logger.info("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.")
# 닫기 버튼 클릭
close_button = self.page.query_selector(self.close_ad_button_locator)
if close_button:
close_button.click()
self.logger.info("다이얼로그를 성공적으로 닫았습니다.")
else:
self.logger.warning("닫기 버튼을 찾지 못했습니다.")
except TimeoutError:
# 다이얼로그가 없을 때: info 수준의 로그로 기록
self.logger.info("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.")
except Exception as e:
# 다른 예외 상황 발생 시 error로 기록
self.logger.error(f"다이얼로그 닫기 중 오류 발생: {e}", exc_info=True)
def go_to_new_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.page.click(new_product_page_locator)
self.logger.info("신규 상품 등록 페이지로 이동 완료.")
except Exception as e:
self.logger.error(f"신규 상품 등록 페이지 이동 중 오류: {e}", exc_info=True)
def go_to_registered_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
self.page.click(registered_product_page_locator)
self.logger.info("등록 상품 관리 페이지로 이동 완료.")
except Exception as e:
self.logger.error(f"등록 상품 관리 페이지 이동 중 오류: {e}", exc_info=True)
def is_button_disabled(self, button):
"""버튼이 disabled 상태인지 확인"""
try:
# 버튼의 disabled 속성 확인
is_disabled = button.get_attribute('disabled')
return is_disabled is not None # disabled 속성이 있으면 True 반환
except Exception as e:
self.logger.error(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", exc_info=True)
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
def get_product_edit_buttons_by_templete(self):
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
try:
# 버튼 선택자 설정
# edit_button_selector_template = f'//button[span[text()="세부사항 수정 및 업로드"]]'
self.product_edit_button_template
# 선택자를 사용해 버튼 객체를 찾음
buttons = self.page.locator(self.product_edit_button_template)
# 버튼이 존재하는지 확인
button_count = buttons.count()
if button_count == 0:
self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.")
return []
self.logger.info(f"현재 페이지의 수정할 상품 개수: {button_count}")
# 모든 버튼을 리스트로 반환
return [buttons.nth(i) for i in range(button_count)]
except Exception as e:
self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True)
return []
def click_modify_button_by_text(self, index):
"""인덱스에 해당하는 '세부사항 수정 및 업로드' 버튼 클릭"""
try:
# config.ini에서 선택자 가져오기
button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template')
button_selector = f'({button_template})[{index}]'
button = self.page.query_selector(button_selector)
# 버튼이 화면에 보이도록 스크롤 후 클릭
if button:
button.scroll_into_view_if_needed()
self.page.evaluate('arguments[0].click();', button)
self.logger.info(f'{index}번째 상품의 수정 버튼 클릭 완료')
else:
self.logger.warning(f'{index}번째 상품의 수정 버튼을 찾지 못했습니다.')
except Exception as e:
self.logger.error(f'{index}번째 상품의 수정 버튼 클릭 중 오류: {str(e)}')
def open_product_edit_dialog(self, button):
"""상품 수정 다이얼로그 열기"""
try:
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
button.scroll_into_view_if_needed()
self.logger.debug("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.")
button.click()
self.logger.info("세부사항 수정 다이얼로그 열기 완료.")
self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
except Exception as e:
self.logger.error(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", exc_info=True)
def click_detail_tab(self):
"""상세페이지 탭 클릭"""
try:
self.page.click(self.detail_tab_locator)
self.logger.info("상세페이지 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"상세페이지 탭 클릭 중 오류: {e}", exc_info=True)
def click_option_tab(self):
"""옵션 탭 클릭"""
try:
self.page.click(self.option_tab_locator)
self.logger.info("옵션 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"옵션 탭 클릭 중 오류: {e}", exc_info=True)
def click_price_tab(self):
"""가격 탭 클릭"""
try:
self.page.click(self.price_tab_locator)
self.logger.info("가격 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"가격 탭 클릭 중 오류: {e}", exc_info=True)
def click_thumb_tab(self):
"""썸네일 탭 클릭"""
try:
self.page.click(self.thumb_tab_locator)
self.logger.info("썸네일 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"썸네일 탭 클릭 중 오류: {e}", exc_info=True)
def click_title_tab(self):
"""상품명 탭 클릭"""
try:
self.page.click(self.title_tab_locator)
self.logger.info("상품명 탭 클릭 완료.")
except Exception as e:
self.logger.error(f"상품명 탭 클릭 중 오류: {e}", exc_info=True)
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
html_content = '<p>&nbsp;</p>'
for url in urls:
html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
return html_content
def deleted_img_urls_from_logs(self):
"""로그 파일에서 상품명과 이미지 URL 목록을 추출하여 딕셔너리로 반환하는 메서드"""
image_data = {}
log_dir = os.path.join(os.path.dirname(__file__), "recovery_log")
# 로그 파일에서 필요한 정보만 추출
for log_file in self.log_files:
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
current_product = None
for line in lines:
# 상품명 추출
product_match = re.search(r"원본 상품명 '(.+?)'", line)
if product_match:
current_product = product_match.group(1)
image_data[current_product] = []
# 이미지 URL 목록 추출
url_match = re.search(r"fetch_image_urls 에서 추출한 이미지URL 목록 : \[(.+?)\]", line)
if url_match and current_product:
# 각 URL에서 불필요한 작은따옴표 제거
urls = [url.strip("'\"") for url in url_match.group(1).split(", ")]
image_data[current_product].extend(urls)
current_product = None # Reset after each product's URL extraction
self.logger.debug(f"복구된 이미지 URL 데이터: {len(image_data)}")
return image_data
def recovery_image_urls(self, product_name, deleted_img_urls):
"""상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드"""
self.logger.debug("상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드")
if product_name in deleted_img_urls:
self.logger.info(f'recovery_image_urls: 삭제된 상품명 찾음 : {product_name}')
# 소스 편집 모드로 전환
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
self.page.click(source_button_locator)
self.logger.debug("recovery_image_urls : 소스 버튼 클릭 완료.")
# 기존 extract_image_urls와 유사하게 HTML 소스를 가져옴
textarea = self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
data_value = textarea.get_attribute("data-value")
# HTML 소스에서 이미지 URL 추출
image_urls = self.fetch_image_urls(data_value)
self.logger.info(f'recovery_image_urls: 현재페이지의 HTML에서 추출된 이미지 URL 수: {len(image_urls)}')
# 이미지 태그가 없으면 로그에서 추출한 데이터를 HTML로 복원하여 입력
if len(image_urls) == 0:
restored_html = self.generate_restored_html(deleted_img_urls[product_name])
self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", `{restored_html}`)')
self.logger.debug("recovery_image_urls : 현재 페이지의 HTML의 이미지가 0이므로, 로그 데이터를 이용하여 HTML 복원 완료.")
else:
self.logger.debug("이미 이미지가 있으므로 복원 작업을 패스합니다.")
# 소스 편집 모드 종료
self.page.click(source_button_locator)
self.logger.debug('소스 버튼 재 클릭 완료.')
else:
self.logger.debug(f"로그에 해당 상품명 '{product_name}'에 대한 데이터가 존재하지 않습니다.")
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
html_content = '<p>&nbsp;</p>\n'
for url in urls:
width, height = self.get_image_size(url)
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
if width and height:
html_content += (
f'<figure class="image">'
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
f'</figure>\n'
)
else:
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
return html_content
def get_image_size(self, url):
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
try:
# URL에서 불필요한 따옴표 제거
cleaned_url = url.strip("'\"")
response = requests.get(cleaned_url, timeout=5)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
return image.width, image.height
except Exception as e:
self.logger.warning(f"이미지 크기 확인 실패 - {cleaned_url}: {e}")
return None, None
def extract_image_urls(self, optionHandler, is_option_data=False):
"""상세페이지에서 이미지 URL 추출"""
try:
# 소스 편집 모드로 전환
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 편집 모드로 전환
self.page.click(source_button_locator)
self.logger.debug("소스 버튼 클릭 완료.")
# 'data-value' 속성 값을 추출 (textarea 요소)
textarea = self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
data_value = textarea.get_attribute("data-value")
# HTML 소스에서 이미지 URL 추출
image_urls = self.fetch_image_urls(data_value)
self.logger.info(f'추출된 이미지 URL 수: {len(image_urls)}')
# HTML 소스에서 이미지 URL 삭제
self.logger.debug('img 태그를 삭제 중...')
data_value_element = self.page.query_selector(ck_source_editing_area_locator)
new_value = ""
if data_value_element:
self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
updated_value = data_value_element.get_attribute('data-value')
self.logger.debug(f'Updated data-value: {updated_value}')
else:
self.logger.debug('Element with data-value not found.')
self.logger.debug('img 태그 삭제 완료.')
# img 태그의 class 삭제 후 다시 소스 버튼 클릭
self.page.click(source_button_locator)
self.logger.debug('소스 버튼 재 클릭 완료.')
# 텍스트 입력 필드 선택
input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
input_field.press('Enter')
# 선두부 텍스트 입력
for key in sorted(self.text_templates.keys()):
leading_text = self.text_templates[key]
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
input_field.type(leading_text)
input_field.press('Enter')
self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
option_data = optionHandler.get_selected_translated_options()
self.logger.debug(f'option_data : {option_data}')
# if is_option_data:
if option_data or option_data != {}:
self.logger.debug('옵션 데이터 입력 시작')
# option_data = {} # option_data 초기화
is_single = optionHandler.option_info['is_single_option']
# option_data = optionHandler.get_selected_translated_options()
# is_single = True # 옵션입력 일단 제외
# self.logger.debug('옵션입력 일단 제외')
self.logger.debug('가져온 옵션 데이터')
self.logger.debug(f'{option_data}')
# # 옵션 입력 필드 선택
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# input_field.press('Enter')
# # 선두부 텍스트 입력
# for key in sorted(self.text_templates.keys()):
# leading_text = self.text_templates[key]
# if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# input_field.type(leading_text)
# input_field.press('Enter')
# self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
if not is_single:
self.logger.info('단일옵션이 아니므로 옵션목록을 입력')
# 각 옵션을 한 줄씩 입력
input_field.type("# 옵션 목록")
input_field.press('Enter')
# 첫 번째 옵션의 번역된 옵션명만 입력
first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
first_value = option_data[first_key] # key는 옵션이름 value는 가격
input_field.type(f"- 1. {first_key}")
input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# 나머지 옵션도 번역된 옵션명만 입력
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# 목록 끝을 알리기 위해 엔터 두 번 입력
input_field.press('Enter')
input_field.press('Enter')
# 후두부 텍스트 입력
input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
input_field.press('Enter')
input_field.type('---')
input_field.press('Enter')
self.logger.info('옵션 데이터 입력 완료')
return image_urls
except Exception as e:
self.logger.error(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", exc_info=True)
return image_urls if image_urls else []
def save_and_ecs_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
self.page.click(self.save_button_locator)
self.page.keyboard.press("Escape")
self.logger.info("상품 수정 내용 저장 및 ECS 완료.")
except Exception as e:
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
def save_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
self.page.click(self.save_button_locator)
self.logger.info("상품 수정 내용 저장 완료.")
except Exception as e:
self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True)
def go_to_next_page(self):
"""다음 페이지로 이동"""
try:
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
current_page = self.page.query_selector(self.current_page_locator)
if not current_page:
self.logger.warning("현재 페이지 정보를 찾을 수 없습니다.")
return False
# 현재 활성화된 페이지 번호를 가져옴
current_page_number = int(current_page.get_attribute("title"))
self.logger.info(f"현재페이지 : [{current_page_number}]")
next_page_number = current_page_number + 1
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
next_page_button = self.page.query_selector(next_page_button_locator)
if next_page_button:
next_page_button.click() # 페이지 버튼 클릭
# self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
time.sleep(3)
self.logger.info(f"페이지 {next_page_number}로 이동 완료.")
return True
else:
self.logger.warning("다음 페이지가 없습니다.")
return False
except Exception as e:
self.logger.error(f"다음 페이지로 이동 중 오류 발생: {e}", exc_info=True)
return False
# def switch_to_chrome(self):
# """크롬으로 포커스 전환"""
# try:
# if not self.chrome_hwnd:
# self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
# if self.chrome_hwnd:
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
# win32gui.SetForegroundWindow(self.chrome_hwnd)
# self.logger.debug('크롬 창으로 포커스 이동.')
# else:
# self.logger.warning('크롬 창을 찾을 수 없습니다.')
# except Exception as e:
# self.logger.error(f"크롬 포커스 전환 중 오류: {e}", exc_info=True)
def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
self.logger.debug(f"스크롤 시작")
# 현재 페이지 높이 가져오기
last_height = self.page.evaluate("document.body.scrollHeight")
self.logger.debug(f"현재 페이지 높이 가져오기 - {last_height}")
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px")
self.page.evaluate("window.scrollBy(0, 1000);")
elif direction == "up":
# 위로 스크롤
self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px")
self.page.evaluate("window.scrollBy(0, -1000);")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
self.logger.debug(f"pause_time 슬립 : {pause_time}")
time.sleep(pause_time)
# 새로운 페이지 높이 가져오기
new_height = self.page.evaluate("document.body.scrollHeight")
self.logger.debug(f"새로운 페이지 높이 가져오기 - {new_height}")
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
if direction == "down" and new_height == last_height:
self.logger.debug(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}")
break
elif direction == "up" and new_height == 0:
self.logger.debug(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}")
break
self.logger.debug(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}")
last_height = new_height
scroll_count += 1
self.logger.debug(f"스크롤 카운트 + 1")
if scroll_count == max_scrolls:
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤 (Page Down 키 사용)
self.page.keyboard.press("PageDown")
elif direction == "up":
# 위로 스크롤 (Page Up 키 사용)
self.page.keyboard.press("PageUp")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
time.sleep(pause_time)
scroll_count += 1
if scroll_count == max_scrolls:
self.logger.debug("최대 스크롤 횟수에 도달했습니다.")
def collect_product_info(self, items_per_page, ed_mode):
"""
상품 정보를 수집하는 메서드
"""
try:
product_infos = []
product_name_elements = [] # product_name_element를 저장할 리스트
# ed_mode에 따라 product_elements 설정
if ed_mode:
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
product_elements = [
{
"name": self.product_name_for_ed_template.format(index=i),
"price": self.product_price_for_ed_template.format(index=i),
"image": self.product_image_for_ed_template.format(index=i)
}
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
]
else:
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
product_elements = self.page.query_selector_all(self.product_parent_locator)
for i, element in enumerate(product_elements[:items_per_page], start=1):
try:
if ed_mode:
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
product_name_element = self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
product_price_element = self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
product_image_element = self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
else:
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
product_name_element = self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
product_price_element = self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
product_image_element = self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
# 요소가 존재하면 정보 추출
self.logger.debug(f"product_name_element : {product_name_element}")
self.logger.debug(f"product_price_element : {product_price_element}")
self.logger.debug(f"product_image_element : {product_image_element}")
if product_name_element and product_price_element and product_image_element:
# await의 결과를 각 변수에 저장
product_name_text = (product_name_element.inner_text()).strip()
product_price_text = (product_price_element.inner_text()).strip()
product_image_url = product_image_element.get_attribute('src')
# product_info 딕셔너리에 결과 저장
product_info = {
"name": product_name_text,
"price": product_price_text,
"image_url": product_image_url
}
self.logger.debug(f"상품 {i}: {product_info}")
product_infos.append(product_info)
product_name_elements.append(product_name_element) # 각 product_name_element 추가
except Exception as e:
self.logger.error(f"상품 {i} 정보 수집 중 오류 발생: {e}", exc_info=True)
continue
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
except Exception as e:
self.logger.error(f"상품 정보 수집 중 오류 발생: {e}", exc_info=True)
return []
def scroll_page_to_bottom(self, pause_time=0.2):
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
self.logger.info('페이지 스크롤 시작...')
previous_height = self.page.evaluate("() => document.body.scrollHeight")
while True:
self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
time.sleep(pause_time) # 페이지 로딩 대기
current_height = self.page.evaluate("() => document.body.scrollHeight")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.info('페이지 스크롤 완료.')
def scroll_page_to_top(self, pause_time=0.2):
"""페이지의 맨 위까지 스크롤"""
self.logger.info('페이지 위로 스크롤 시작...')
previous_height = self.page.evaluate("() => window.pageYOffset")
while previous_height > 0:
self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
time.sleep(pause_time) # 페이지 로딩 대기
current_height = self.page.evaluate("() => window.pageYOffset")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.info('페이지 위로 스크롤 완료.')
def start_Percenty_task(self):
self.logger.debug('퍼센티 상품수정 작업을 시작합니다...')
self.running = True # 번역 작업이 시작됨
try:
# 1. 총 상품 수 수집
self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# total_products = self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
self.logger.debug(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}")
self.logger.debug(f"[ self.toggle_states 상태 ]\n{self.toggle_states}")
if total_products == 0:
self.logger.debug('수집할 상품이 없습니다. 작업을 종료합니다.')
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
while completed_count < total_products:
self.logger.debug(f'현재 페이지: {page_number}')
if not page_number == 1:
self.scroll_page_to_top()
self.logger.debug(f'1페이지가 아니므로 상품의 동적로딩을 위해 휠 스크롤 업')
is_ed_mode = self.toggle_states.get('ed_mode', False)
if not is_ed_mode:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.debug('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.')
product_buttons = self.get_product_edit_buttons_by_templete()
else:
self.logger.debug('상품정보 수집')
product_infos, product_name_elements = self.collect_product_info(items_per_page, ed_mode=is_ed_mode)
self.logger.debug(f"product_infos : {product_infos}")
self.logger.debug('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.')
product_buttons = product_name_elements
self.logger.debug(f"product_buttons 갯수 : [{len(product_buttons)}]개")
if not product_buttons:
self.logger.debug('수정할 상품이 없습니다. 작업을 종료합니다.')
break
# 5. 각 상품에 대해 상품수정작업 수행
for index, button in enumerate(product_buttons, start=1):
# 상품명 수집 오류 처리
self.logger.debug(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...')
is_disabled = self.is_button_disabled(button)
if is_disabled:
self.logger.debug(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.')
continue
self.logger.debug(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...')
# 상품 수정 다이얼로그 열기
self.open_product_edit_dialog(button)
# 상품명과 카테고리 수집
product_name = self.titleHandler.get_original_product_name() # 원본상품명 가져오기
product_category = self.titleHandler.get_category(market='ss') # 카테고리 가져오기
# 옵션 수정
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.logger.debug(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}")
self.edit_option(product_name)
# 가격 수정
is_price = self.toggle_states.get('price')
if is_price:
self.logger.debug(f"가격수정 : {is_price} ")
self.edit_price(product_category)
# 썸네일 수정
thumb = self.toggle_states.get('thumb')
if thumb:
self.logger.debug(f"썸네일수정 : {thumb} ")
self.edit_thumb()
# 태그 수정
if self.toggle_states.get('tag'):
pass
# 상품명 수정
if self.toggle_states.get('title'):
pass
# 상세페이지 수정
detail_Option = self.toggle_states.get('detail_Option')
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
recovery_mode = self.toggle_states.get('recovery_mode')
if detail_Option or detail_IMGTrans or recovery_mode:
self.logger.debug(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}")
if recovery_mode:
deleted_imgs = self.deleted_img_urls_from_logs()
self.detail_trans_for_recovery(product_name, deleted_imgs)
else:
self.detail_trans()
# 수정 후 저장
self.logger.debug('상품 세부사항 저장 중...')
self.save_and_ecs_product_edit()
completed_count += 1
self.logger.info(f'{completed_count}/[{total_products}]개 상품 수정 완료.')
if completed_count >= total_products:
self.logger.info(f'[{total_products}]개 상품 수정이 완료되었습니다.')
return
# 6. 다음 페이지로 이동 (있으면)
if not self.go_to_next_page():
self.logger.info('더 이상 페이지가 없습니다. 작업을 종료합니다.')
break
page_number += 1
if self.running:
self.logger.info('모든 상품 번역 및 저장 완료.')
except Exception as e:
self.logger.error(f"번역 작업 중 오류 발생: {e}", exc_info=True)
def detail_trans(self):
try:
# 상세페이지 탭 클릭
self.click_detail_tab()
# # 텍스트 입력 필드 선택
input_field2 = self.page.locator('div#productMainContentContainerId div.ck.ck-editor__main > div')
input_field2.click()
self.logger.debug(f"입력필드2 선택")
# 소개글과 옵션데이터 입력
self.page.keyboard.press("ArrowLeft")
self.page.keyboard.press("Enter")
self.page.keyboard.press("ArrowUp")
self.input_detail_text(self.optionHandler)
# CKEditor HTML 데이터 가져오기
html_data = self.get_ckeditor_data()
self.logger.debug(f"가져온 원본 CKEditor html_data : {html_data}")
# 이미지 URL 추출
soup = BeautifulSoup(html_data, 'html.parser')
images = soup.find_all('img', src=True)
original_images = [] # 원본 이미지 URL 저장
self.logger.debug(f"가져온 이미지 갯수: {len(images)}")
for index, img in enumerate(images):
original_url = img.get('src')
self.logger.debug(f"[{index}]번째 original_url : {original_url}")
if not original_url:
continue
original_images.append(original_url) # 원본 URL 저장
# 이미지 번역을 위한 임시 파일 경로 생성
img_path = os.path.join(self.temp_dir, f"translated_detail_image_{index}.png")
self.logger.debug(f"[{index}]번째 임시 저장PATH 생성 : {img_path}")
self.logger.debug(f"웨일 브라우저를 활용한 이미지 번역 프로세스")
is_translated = self.whale_translator.translate_image(original_url)
# self.switch_to_chrome() # 크롬으로 포커스 이동
translated_img_path = self.clipboardImageManager.process_clipboard_to_save_path(original_url=original_url, is_success_translated=is_translated, toggle_states=self.toggle_states, path=img_path) # 클립보드 내용을 처리
self.logger.info(f"번역완료된 임시 translated_img의 로컬 path: {translated_img_path}")
# 번역된 이미지 업로드
self.upload_image(translated_img_path)
self.clipboardImageManager.clear_clipboard()
self.logger.info(f"{index}번째 이미지가 번역완료되어 클립보드를 초기화 했습니다.")
if not is_translated:
self.logger.info(f"[{index}]번째 이미지 번역 실패로 원본이미지 삭제목록에서 제외합니다.")
original_images.remove(original_url) # 번역 실패 시 제거
continue
self.logger.info("모든 이미지가 처리되었습니다.")
self.page.keyboard.press("Enter")
self.page.keyboard.press("Enter")
self.page.keyboard.press("Enter")
# 원본 이미지 제거
time.sleep(1)
html_data = self.get_ckeditor_data() # 최신 HTML 가져오기
modified_html = self.remove_original_images(html_data)
# 수정된 HTML을 CKEditor에 다시 삽입
self.set_ckeditor_data(modified_html)
# self.logger.info(f"html_data : {html_data}")
# self.logger.info(f"original_images : {original_images}")
# self.logger.info(f"modified_html : {modified_html}")
self.logger.info("원본 이미지가 제거된 HTML 데이터가 CKEditor에 삽입되었습니다.")
except Exception as e:
self.logger.error(f"CKEditor 이미지 처리 중 오류 발생: {e}", exc_info=True)
# 수정 후 저장
self.logger.info('상품 세부사항 저장 중...')
self.save_product_edit()
def remove_original_images(self, html_data):
"""
CKEditor HTML 데이터에서 연속된 엔터 3번 친 부분 이후의 데이터를 삭제합니다.
:param html_data: CKEditor의 HTML 데이터
:return: 수정된 HTML 데이터
"""
try:
self.logger.debug("HTML 데이터에서 엔터 세 번 친 부분 이후 제거 작업 시작")
# BeautifulSoup으로 HTML 데이터 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 모든 <p>&nbsp;</p> 태그를 찾음
enter_tags = soup.find_all('p', string="\xa0")
self.logger.debug(f"찾은 <p>&nbsp;</p> 태그 수: {len(enter_tags)}")
# 연속된 <p>&nbsp;</p> 태그 3개를 찾음
count = 0
start_removal = None
for i, tag in enumerate(enter_tags):
if i > 0 and tag.previous_sibling == enter_tags[i - 1]:
count += 1
else:
count = 1
if count == 3: # 연속된 세 번째 태그 발견
start_removal = tag
self.logger.debug(f"엔터 세 번 연속된 마지막 태그 발견: {start_removal}")
break
# 연속된 엔터 태그 이후 데이터 제거
if start_removal:
# 기준 태그 이후의 모든 형제를 삭제
for sibling in list(start_removal.find_all_next()):
sibling.decompose()
# 수정된 HTML 반환
modified_html = str(soup)
# 옵션목록 중 빈옵션 삭제
modified_html = self.remove_empty_list_items(modified_html)
self.logger.debug(f"최종 수정된 HTML 데이터: {modified_html}")
return modified_html
except Exception as e:
self.logger.error(f"HTML 데이터 수정 중 오류 발생: {e}", exc_info=True)
return html_data # 오류 발생 시 원본 데이터 반환
def remove_empty_list_items(self, html_data):
"""
HTML 데이터에서 &nbsp;만 포함된 <li> 요소를 제거합니다.
:param html_data: CKEditor의 HTML 데이터
:return: 수정된 HTML 데이터
"""
try:
# BeautifulSoup으로 HTML 데이터 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 모든 <li> 태그 찾기
list_items = soup.find_all('li')
# &nbsp;만 포함된 <li> 태그 제거
for item in list_items:
if item.get_text(strip=True) == "\xa0": # &nbsp;는 \xa0로 표시됨
item.decompose() # 태그 제거
# 수정된 HTML 반환
modified_html = str(soup)
return modified_html
except Exception as e:
print(f"오류 발생: {e}")
return html_data # 오류 발생 시 원본 데이터 반환
# def remove_original_images_with_count(self, html_data, original_images):
# """
# CKEditor HTML 데이터에서 original_images의 개수만큼 뒤에서부터 <img> 태그를 제거하되,
# 업로드된 이미지와 중복된 URL은 제외합니다.
# :param html_data: CKEditor의 HTML 데이터
# :param original_images: 원본 이미지 URL 리스트
# :return: 수정된 HTML 데이터
# """
# try:
# self.logger.debug("원본 이미지 제거 작업 시작")
# # HTML 데이터를 BeautifulSoup으로 파싱
# soup = BeautifulSoup(html_data, 'html.parser')
# # 모든 <img> 태그 탐색
# images = soup.find_all('img', src=True)
# self.logger.debug(f"HTML에서 찾은 이미지 태그 수: {len(images)}")
# # 뒤에서부터 original_images의 개수만큼 제거
# num_to_remove = len(original_images)
# self.logger.debug(f"제거할 이미지 갯수: {num_to_remove}")
# if num_to_remove > len(images):
# self.logger.warning("제거할 이미지 수가 전체 이미지 수를 초과합니다. 모든 이미지를 제거합니다.")
# num_to_remove = len(images)
# removed_count = 0 # 삭제된 이미지 수 카운트
# for img in reversed(images): # 뒤에서부터 순회
# src = img.get('src')
# if src in original_images: # 원본 이미지 URL에 해당하는 경우만 삭제
# self.logger.debug(f"원본 이미지 제거: {src}")
# img.decompose()
# removed_count += 1
# if removed_count >= num_to_remove: # 지정된 개수만큼 삭제되면 중단
# break
# # 수정된 HTML을 반환
# modified_html = str(soup)
# self.logger.debug(f"수정된 HTML 데이터: {modified_html}")
# return modified_html
# except Exception as e:
# self.logger.error(f"원본 이미지 제거 중 오류 발생: {e}", exc_info=True)
# return html_data # 오류 발생 시 원본 데이터 반환
def get_ckeditor_data(self):
try:
get_editor_data_script = """
(function() {
const editorElement = document.querySelector('.ck-editor__editable');
if (!editorElement) {
console.error('CKEditor 요소를 찾을 수 없습니다.');
return null;
}
const editorInstance = editorElement.ckeditorInstance;
if (!editorInstance) {
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
return null;
}
const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
console.log('현재 CKEditor 데이터:', editorData);
return editorData;
})();
"""
ck_editor_data = self.page.evaluate(get_editor_data_script)
self.logger.info(f"CKEditor 데이터: {ck_editor_data}")
return ck_editor_data
except Exception as e:
self.logger.error(f"CKEditor 데이터를 가져오는 중 오류 발생: {e}", exc_info=True)
return None
def set_ckeditor_data(self, html_data):
"""
CKEditor에 HTML 데이터를 설정합니다.
:param html_data: 삽입할 HTML 데이터
"""
try:
self.logger.debug("CKEditor에 HTML 데이터를 설정하는 작업 시작")
# JavaScript 스크립트를 통해 CKEditor에 데이터 설정
set_data_script = f"""
(function() {{
const editorElement = document.querySelector('.ck-editor__editable');
if (!editorElement) {{
console.error('CKEditor 요소를 찾을 수 없습니다.');
return;
}}
const editorInstance = editorElement.ckeditorInstance;
if (!editorInstance) {{
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
return;
}}
editorInstance.setData(`{html_data}`);
console.log('CKEditor에 새로운 데이터가 성공적으로 설정되었습니다.');
}})();
"""
# 스크립트 실행
self.page.evaluate(set_data_script)
self.logger.info("CKEditor에 HTML 데이터를 성공적으로 설정했습니다.")
except Exception as e:
self.logger.error(f"CKEditor 데이터 설정 중 오류 발생: {e}", exc_info=True)
def input_detail_text(self, optionHandler):
try:
# 텍스트 입력 필드 선택
input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# input_field.press('Enter')
# 선두부 텍스트 입력
for key in sorted(self.text_templates.keys()):
leading_text = self.text_templates[key]
if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
input_field.type(leading_text)
input_field.press('Enter')
self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}")
option_data = optionHandler.get_selected_translated_options()
self.logger.debug(f'option_data : {option_data}')
# if is_option_data:
if option_data or option_data != {}:
self.logger.debug('옵션 데이터 입력 시작')
is_single = optionHandler.option_info['is_single_option']
self.logger.debug('가져온 옵션 데이터')
self.logger.debug(f'{option_data}')
# # 옵션 입력 필드 선택
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# input_field.press('Enter')
if not is_single:
self.logger.info('단일옵션이 아니므로 옵션목록을 입력')
# 각 옵션을 한 줄씩 입력
input_field.type("# 옵션 목록")
input_field.press('Enter')
# 첫 번째 옵션의 번역된 옵션명만 입력
first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
first_value = option_data[first_key] # key는 옵션이름 value는 가격
input_field.type(f"- 1. {first_key}")
input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# 나머지 옵션도 번역된 옵션명만 입력
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# 목록 끝을 알리기 위해 엔터 두 번 입력
input_field.press('Enter')
input_field.press('Enter')
# 후두부 텍스트 입력
input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
input_field.press('Enter')
input_field.type('---')
input_field.press('Enter')
self.logger.info('옵션 데이터 입력 완료')
except Exception as e:
self.logger.error(f"옵션데이터 입력 처리 중 오류: {e}", exc_info=True)
def upload_image(self, img_path):
"""
이미지를 CKEditor에 업로드하고 가장 첫 번째 이미지 URL을 추출 후 삭제합니다.
:param img_path: 업로드할 이미지 파일 경로
:return: 업로드된 이미지의 URL
"""
try:
# 1. 파일 업로드
file_input_selector = 'div#productMainContentContainerId button[type="button"].ck.ck-button.ck-off.ck-file-dialog-button.ck-splitbutton__action > input'
file_input = self.page.locator(file_input_selector)
file_input.set_input_files(img_path)
self.logger.info(f"이미지가 성공적으로 업로드되었습니다: {img_path}")
# 2. 엔터 키 입력
self.page.keyboard.press("Enter")
except Exception as e:
self.logger.error(f"이미지 업로드 중 오류 발생: {e}", exc_info=True)
# def detail_trans_ori(self):
# # 상세페이지 탭 클릭
# self.click_detail_tab()
# # 텍스트 입력 필드 선택
# input_field = self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# input_field.press('Enter')
# get_editor_data_script = """
# (function() {
# const editorElement = document.querySelector('.ck-editor__editable');
# if (!editorElement) {
# console.error('CKEditor 요소를 찾을 수 없습니다.');
# return null;
# }
# const editorInstance = editorElement.ckeditorInstance;
# if (!editorInstance) {
# console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
# return null;
# }
# const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
# console.log('현재 CKEditor 데이터:', editorData);
# return editorData;
# })();
# """
# original_editor_data = self.page.evaluate(get_editor_data_script)
# self.logger.info(f"원본 CKEditor 데이터: {original_editor_data}")
# # 이미지 URL 추출
# image_urls = self.extract_image_urls(self.optionHandler, is_option_data=True) # 코루틴 실행
# total_images = len(image_urls)
# self.logger.info(f"현재 상품의 총 이미지 수 : {total_images}개")
# # 이미지 번역 작업 진행
# for index, url in enumerate(image_urls):
# current_image_count = index +1
# # 임시 파일 경로 생성
# img_path = os.path.join(self.temp_dir, f"translated_detail_image_{index}.png")
# self.logger.debug(f"웨일 브라우저를 활용한 이미지 번역 프로세스")
# is_success_translated = self.whale_translator.translate_image(url)
# # is_paste_success = self.paste_image_in_chrome(self.clipboardImageManager, url, is_success_translated, self.toggle_states)
# is_paste_success = self.paste_image_in_chrome_to_save_path(url, img_path, is_success_translated, self.toggle_states)
# if is_paste_success:
# self.logger.debug(f"{url} gui 이미지 붙여넣기 성공")
# else:
# self.logger.debug(f"{url} gui 이미지 붙여넣기 실패")
# current_image_count += 1
# # 수정 후 저장
# self.logger.info('상품 세부사항 저장 중...')
# self.save_product_edit()
def detail_trans_for_recovery(self, product_name, deleted_imgs):
# 상세페이지 탭 클릭
self.click_detail_tab()
self.logger.debug('recovery_image_urls 메서드 호출')
self.recovery_image_urls(product_name, deleted_imgs)
# 수정 후 저장
self.logger.info('상품 세부사항 저장 중...')
self.save_product_edit()
def edit_option(self, product_name):
# 상세페이지 탭 클릭
self.click_option_tab()
# 옵션 최대선택갯수
max_option_count = self.toggle_states.get('max_option_count', 0)
self.current_options_info = self.optionHandler.process_options(product_name, max_option_count, self.toggle_states)
# 수정 후 저장
self.save_product_edit()
def edit_price(self, product_category):
# 상세페이지 탭 클릭
self.click_price_tab()
# 가격 수정 프로세스
self.priceHandler.process_price(category=product_category, is_group = self.titleHandler.is_group_ESM)
# 수정 후 저장
self.save_product_edit()
def edit_thumb(self):
# 상세페이지 탭 클릭
self.click_thumb_tab()
# 가격 수정 프로세스
self.thumbnailHandler.process_thumbnails()
# 수정 후 저장
self.save_product_edit()
# def run(self):
# """QThread의 run 메서드: 실행할 Playwright 관련 작업 추가"""
# self.logger.debug("QThread가 시작되었습니다.")
# # 브라우저 시작 및 초기화
# try:
# self.start_browser()
# self.browser_started.emit() # 브라우저 성공적으로 시작됨
# except Exception as e:
# self.logger.error(f"브라우저 실행 중 오류 발생: {e}")
# self.browser_error.emit(str(e)) # GUI에 오류 전달
def run(self):
"""QThread의 run 메서드: 실행할 Playwright 관련 작업 추가"""
self.logger.debug("QThread가 시작되었습니다.")
try:
# Playwright 시작
self.start_browser()
self.browser_started.emit() # 브라우저 성공 신호
# 무한 루프로 대기하며 GUI와 상호작용
while True:
if self.isInterruptionRequested(): # QThread가 종료 요청을 받았는지 확인
self.logger.debug("QThread 종료 요청 수신")
break
time.sleep(0.1) # CPU 점유율 낮추기 위해 대기
except Exception as e:
self.logger.error(f"브라우저 실행 중 오류 발생: {e}")
self.browser_error.emit(str(e)) # GUI에 오류 전달
finally:
# 종료 시 Playwright 리소스 정리
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
self.logger.debug("브라우저 및 Playwright 리소스 정리 완료")
def stop(self):
"""Playwright 리소스 정리 및 스레드 종료"""
self.logger.debug("stop - Playwright 종료")
try:
if self.browser:
self.browser.close() # 브라우저 닫기
self.logger.info("브라우저가 종료되었습니다.")
if self.playwright:
self.playwright.stop() # Playwright 인스턴스 종료
self.logger.info("Playwright가 종료되었습니다.")
if self.whale_translator:
self.logger.debug("WhaleTranslator 종료 중...")
self.whale_translator.stop_whale_browser() # WhaleTranslator 종료
self.whale_translator = None
self.logger.debug("BrowserController 종료 완료.")
except Exception as e:
self.logger.error(f"Playwright 종료 중 오류 발생: {e}")
def terminate(self):
"""스레드 종료 및 리소스 정리"""
self.logger.info("스레드 종료 요청")
try:
self.stop() # Playwright 리소스 정리
self.logger.info("Playwright 리소스가 정리되었습니다.")
except Exception as e:
self.logger.error(f"terminate 중 오류 발생: {e}")
finally:
super().terminate() # QThread의 기본 종료 메서드 호출
self.logger.info("스레드가 종료되었습니다.")
def cleanup(self):
pass
def check_pause(self):
"""일시 정지 상태라면 재개될 때까지 대기"""
self.pause_mutex.lock()
if self.is_paused:
self.pause_condition.wait(self.pause_mutex)
self.pause_mutex.unlock()
def pause(self):
"""스레드 일시 정지"""
self.pause_mutex.lock()
self.is_paused = True
self.pause_mutex.unlock()
self.logger.debug("스레드가 일시 정지되었습니다.")
def resume(self):
"""스레드 재개"""
self.pause_mutex.lock()
self.is_paused = False
self.pause_condition.wakeAll()
self.pause_mutex.unlock()
self.logger.debug("스레드가 재개되었습니다.")