AutoPercenty3/browser_control.py

2498 lines
127 KiB
Python

from playwright.async_api import async_playwright, TimeoutError
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition
import re
# import pyautogui
import time
import win32gui, win32con
from bs4 import BeautifulSoup
import asyncio
import os, sys, random
import requests
from PIL import Image
from io import BytesIO
import psutil
# from whale_new import WhaleTranslator
from src.wh_search import WhaleSearchParser
from src.wh_img_trans import WhaleImageTranslator
from src.whale_new import WhaleTranslator
from clipboardImageManager import ClipboardImageManager
from option import OptionHandler
from price import PriceHandler
from titleGenerator import TitleGenerator
from titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager
from tags import TagsHandler
from titleManager.gpt_client import GPTClient
from thumb import ThumbnailHandler
import tempfile
import logging
class BrowserController(QThread):
# 브라우저 시작 시그널 정의
browser_started = Signal()
browser_error = Signal(str)
# 번역 작업 시작, 완료, 오류 시그널 정의
translation_started = Signal()
translation_completed = Signal(int)
translation_error = Signal(str)
start_stage_signal = Signal(int)
complete_stage_signal = Signal(int)
total_progressbar_signal = Signal(int, int) # (현재 값, 총 값)
update_detail_progress_signal = Signal(int, int) # (현재 값, 총 값)
set_progress_visible_signal = Signal(bool) # ProgressBar 표시/숨김
percentyJob_button_Enable = Signal(bool)
selected_group_name_signal = Signal(str) # 선택된 그룹 이름 시그널
def __init__(self, app, logger, locator_manager, price_setting_diag, detail_text_widget, login_infos, toggle_states, user_id, supabase_manager):
super().__init__()
self.app = app # AutoPercentyGUI 객체 저장
self.logger = logger
self.log_files = ["appTranslator.log", "appTranslator.log.1", "appTranslator.log.2", "appTranslator.log.3", "appTranslator.log.4", "appTranslator.log.5"]
self.locator_manager = locator_manager
self.price_setting_diag = price_setting_diag
self.detail_text_widget = detail_text_widget
self.toggle_states = toggle_states
self.login_infos = login_infos
self.user_id = user_id
self.supabase_manager = supabase_manager
self.parsing_page = None
self.chrome_hwnd = None
# 임시 디렉토리 생성
self.temp_dir = tempfile.mkdtemp() # 임시 디렉토리 생성
self.whale_translator = WhaleTranslator(logger)
self.playwright = None
self.browser = None
self.page = None
self.loop = None # 이벤트 루프를 저장할 변수
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
self.pause_mutex = QMutex()
self.pause_condition = QWaitCondition()
self.is_paused = False # 일시 정지 상태 플래그
self.gpt_client = GPTClient(logger=self.logger)
self.forbidden_word_manager = SupabaseForbiddenWordManager(self.logger, self.supabase_manager, self.user_id)
self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
self.optionHandler = OptionHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.logger, self.gpt_client, self.update_detail_progress_signal, self.set_progress_visible_signal, debug_flag=self.toggle_states['debug_mode'])
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.price_setting_diag, debug_flag=self.toggle_states['debug_mode'])
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.whale_translator, self.clipboardImageManager, self.toggle_states, self.update_detail_progress_signal, self.set_progress_visible_signal)
self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states, self.gpt_client, self.forbidden_word_manager, self.user_id, self.supabase_manager)
self.tagsHandler = TagsHandler(self.locator_manager, self, self.logger, self.toggle_states)
# self.trans_browser = WhaleImageTranslator(logger)
# BrowserController에 해당하는 모든 locator를 정의
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator')
self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator')
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
self.cke_text_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'cke_text_editing_area_locator')
self.cke_img_file_input_locator = self.locator_manager.get_locator('BrowserControl', 'cke_img_file_input_locator')
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator')
self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template')
self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator')
self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons')
self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons')
self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator')
self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator')
# self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
# # 스레드 종료 시 close_whale_window_if_exists 호출
# self.finished.connect(self.cleanup)
def get_page(self):
return self.page
def get_parsing_page(self):
return self.parsing_page
def get_whale(self):
return self.whale_translator
# async def monitor_browser_logs(self, page):
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
# page.on("console", lambda msg: self.logger.info(f"JS 콘솔 로그: {msg.type}: {msg.text}"))
# async def monitor_browser_logs(self, page):
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
# level_map = {
# "log": logging.INFO,
# "info": logging.INFO,
# "warning": logging.WARNING,
# "error": logging.ERROR,
# "debug": logging.DEBUG,
# }
# page.on(
# "console",
# lambda msg: self.logger.log(
# f"JS 콘솔 로그: {msg.type}: {msg.text}",
# level=level_map.get(msg.type, logging.INFO)
# )
# )
def get_base_dir(self):
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, 'lib') # lib 디렉토리 포함
if os.path.exists(internal_dir): # lib 디렉토리가 존재하면 base_dir로 설정
return internal_dir
else: # 일반 Python 실행 환경
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
async def start_browser_async(self):
"""비동기 Playwright 초기화 및 로그인 수행"""
try:
self.logger.log('크롬 브라우저를 실행합니다...', level=logging.DEBUG)
# WhaleTranslator 필요 여부 확인 및 초기화
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
thumb_status = self.toggle_states.get('thumb', False)
debug_mode = self.toggle_states.get('debug_mode', False)
self.logger.log(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}", level=logging.DEBUG)
# if optionIMGTrans_status or detail_IMGTrans_status or thumb_status:
# self.logger.log('이미지번역을 위해 trans_browser를 로드합니다...', level=logging.DEBUG)
# self.whale_translator = WhaleTranslator(self.logger)
# self.whale_translator.start_whale_browser()
# Playwright 시작 및 브라우저 실행
self.playwright = await async_playwright().start()
base_path = self.get_base_dir()
self.logger.log(f"base_path: {base_path}", level=logging.DEBUG)
browser_path = os.path.join(base_path, 'src', 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
extension_path = os.path.join(base_path, 'src', 'browsers', 'extensions', '1.1.100_0')
user_data_dir = os.path.join(base_path, 'src', 'browsers', 'user_data')
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
self.video_dir = os.path.join(base_path, "recorded_videos")
if not os.path.exists(self.video_dir):
os.makedirs(self.video_dir)
self.logger.log(f"동영상 저장 폴더 생성됨: {self.video_dir}", level=logging.DEBUG)
if not os.path.exists(browser_path):
self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG)
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
# User agent 설정
import random
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# 브라우저 시작 및 설정
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=not debug_mode,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--disable-popup-blocking',
f'--disable-extensions-except={extension_path}',
f'--load-extension={extension_path}',
'--start-maximized',
'--window-size=1920,1080'
],
executable_path=browser_path,
user_agent=user_agent,
# record_video_dir=self.video_dir, # 동영상 저장 폴더 지정
# record_video_size={"width": 1280, "height": 720}
)
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page = await self.browser.new_page()
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
# await self.monitor_browser_logs(self.page)
# self.logger.log('자바스크립트의 로그를 모니터링하기 위해', level=logging.INFO)
await self.page.goto('https://percenty.co.kr/signin')
self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
# 첫 번째 기본 탭 닫기
if self.browser.pages:
await self.browser.pages[0].close()
# 페이지 제목을 가져와서 창 제목으로 활용
page_title = await self.page.title()
self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
# self.chrome_hwnd = self.find_window_by_title(page_title)
# if not self.chrome_hwnd:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
# else:
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
await self.login()
await self.close_ad_if_exists()
id_ed_mode = self.toggle_states.get('ed_mode', False)
if id_ed_mode:
await self.go_to_registered_product_page()
self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
else:
self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
await self.go_to_new_product_page()
group_index = self.toggle_states['group_index']
self.logger.log('선택한 그룹 인덱스로 이동', level=logging.INFO)
if group_index:
await self.select_group_index(group_index=group_index)
# 파싱용 브라우저 인스턴스를 별도로 실행 (headless 모드, 실제 브라우저처럼 보이도록 옵션 추가, 시크릿 모드 포함)
self.parsing_browser = await self.playwright.chromium.launch(
executable_path=browser_path, # 추가: self.browser와 같은 경로
headless=False,
args=[
'--disable-blink-features=AutomationControlled', # 자동화 탐지 우회
'--no-sandbox',
'--disable-infobars',
'--disable-dev-shm-usage',
'--disable-gpu',
'--start-minimized',
'--incognito', # 시크릿 모드 활성화
'--window-position=-32000,-32000',
]
)
# 파싱용 컨텍스트는 기본적으로 시크릿 모드 환경이며, 추가 설정으로 실제 브라우저와 유사한 점수를 줄 수 있음.
import random
self.parsing_context = await self.parsing_browser.new_context(
locale="ko-KR",
user_agent=random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# 다른 UA들...
]),
viewport={"width": 1280, "height": 720},
extra_http_headers={
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"
}
)
# 파싱용 페이지 생성
self.parsing_page = await self.parsing_context.new_page()
# 각 핸들러에 초기화된 page 객체 전달.
self.titleGenerator.update_page(self.page)
self.titleGenerator.update_parsing_page(self.parsing_page)
self.optionHandler.update_page(self.page)
self.tagsHandler.update_page(self.page)
self.thumbnailHandler.update_page(self.page)
self.priceHandler.update_page(self.page)
self.optionHandler.update_whale()
self.thumbnailHandler.update_whale()
# 신호 전송송
self.percentyJob_button_Enable.emit(True)
self.browser_started.emit() # 브라우저 시작 신호
except Exception as e:
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
self.browser_error.emit(str(e))
# async def start_browser(self):
# """크롬 브라우저 실행 및 페이지 로딩"""
# self.logger.log('크롬 브라우저 실행 중...', level=logging.DEBUG)
# try:
# # Playwright를 수동으로 실행하여 브라우저 유지
# self.playwright = await async_playwright().start()
# # cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정
# if getattr(sys, 'frozen', False):
# browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
# extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0')
# user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data')
# else:
# browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
# extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0')
# user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data')
# self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
# self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
# self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
# # 사용자 데이터 디렉토리가 존재하지 않으면 생성
# if not os.path.exists(user_data_dir):
# os.makedirs(user_data_dir)
# self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
# # User agent 설정
# user_agent = random.choice([
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
# ])
# self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# # 브라우저 시작 및 설정
# self.browser = await self.playwright.chromium.launch_persistent_context(
# user_data_dir,
# headless=False,
# permissions=["geolocation", "notifications"],
# geolocation={"latitude": 37.5665, "longitude": 126.9780},
# locale="ko-KR",
# args=[
# '--disable-popup-blocking',
# f'--disable-extensions-except={extension_path}',
# f'--load-extension={extension_path}',
# '--start-maximized',
# '--window-size=1920,1080'
# ],
# executable_path=browser_path,
# user_agent=user_agent
# )
# # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
# self.page = await self.browser.new_page()
# self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
# await self.page.goto('https://percenty.co.kr/signin')
# self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
# # 첫 번째 기본 탭 닫기
# if self.browser.pages:
# await self.browser.pages[0].close()
# # 페이지 제목을 가져와서 창 제목으로 활용
# page_title = await self.page.title()
# self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
# self.chrome_hwnd = self.find_window_by_title(page_title)
# if not self.chrome_hwnd:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
# else:
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
# await self.login()
# await self.close_ad_if_exists()
# if self.toggle_states['ed_mode']:
# await self.go_to_registered_product_page()
# self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
# else:
# self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
# await self.go_to_new_product_page()
# self.browser_started.emit() # 브라우저 시작 신호
# except Exception as e:
# self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR)
# self.browser_error.emit(str(e))
async def login(self):
"""로그인 처리"""
is_admin = self.login_infos['is_admin']
self.logger.log(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정', level=logging.INFO)
if is_admin:
# 관리자 로그인 처리
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
await self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
await self.page.click(self.login_button_locator)
else:
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
admin_toggle = self.page.locator(self.admin_toggle_locator)
if await admin_toggle.get_attribute("aria-checked") == "true":
await admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
await self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
await self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
await self.page.click(self.staff_login_button_locator)
self.logger.log(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정', level=logging.INFO)
# await self.page.wait_for_load_state('networkidle', timeout=10000)
async def close_browser(self):
"""브라우저 종료"""
if self.browser:
await self.browser.close()
await self.playwright.stop()
self.cleanup()
self.logger.log('브라우저 종료됨.', level=logging.INFO)
# def find_window_by_title(self, window_name):
# """창 제목을 통해 핸들을 찾는 메서드"""
# def enum_windows_callback(hwnd, result):
# if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
# result.append(hwnd)
# result = []
# win32gui.EnumWindows(enum_windows_callback, result)
# return result[0] if result else None
# def switch_to_chrome(self):
# """크롬으로 포커스 전환"""
# if self.chrome_hwnd:
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
# win32gui.SetForegroundWindow(self.chrome_hwnd)
# self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
# else:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.ERROR)
async def get_total_product_count(self):
total_count = 0
items_per_page = 0
try:
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
total_count_element = await self.page.query_selector(self.total_product_count_locator)
items_per_page_element = await self.page.query_selector(self.items_per_page_locator)
self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG)
if total_count_element:
total_count_text = await total_count_element.inner_text()
if "" in total_count_text and "개 상품" in total_count_text:
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
self.logger.log(f"총 상품수 확인: {total_count}", level=logging.INFO)
# 페이지당 상품 수 추출
if items_per_page_element:
items_per_page_text = await items_per_page_element.get_attribute("title")
if items_per_page_text and "개씩 보기" in items_per_page_text:
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO)
# 결과 반환
if total_count:
return {"total_count": total_count, "items_per_page": items_per_page}
return {"total_count": 0, "items_per_page": 0}
except Exception as e:
self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return {"total_count": 0, "items_per_page": 0}
def fetch_image_urls(self, html_content):
"""
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 모든 <img> 태그를 찾기
image_urls = []
img_tags = soup.find_all('img')
for img in img_tags:
# img 태그에서 src 속성 추출
if 'src' in img.attrs:
image_url = img['src']
image_urls.append(image_url)
self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)}", level=logging.DEBUG)
self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}", level=logging.DEBUG)
return image_urls
async def close_ad_if_exists(self):
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
try:
# 광고 다이얼로그가 나타날 때까지 기다림
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
self.logger.log("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.", level=logging.INFO)
# 닫기 버튼 클릭
close_button = await self.page.query_selector(self.close_ad_button_locator)
if close_button:
await close_button.click()
self.logger.log("다이얼로그를 성공적으로 닫았습니다.", level=logging.INFO)
else:
self.logger.log("닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
except TimeoutError:
# 다이얼로그가 없을 때: info 수준의 로그로 기록
self.logger.log("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.", level=logging.INFO)
except Exception as e:
# 다른 예외 상황 발생 시 error로 기록
self.logger.log(f"다이얼로그 닫기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def go_to_new_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
await self.page.click(new_product_page_locator)
self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def go_to_registered_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
await self.page.click(registered_product_page_locator)
self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def is_button_disabled(self, button):
"""버튼이 disabled 상태인지 확인"""
try:
# 버튼의 disabled 속성 확인
is_disabled = await button.get_attribute('disabled')
return is_disabled is not None # disabled 속성이 있으면 True 반환
except Exception as e:
self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
async def select_group_index(self, group_index: int):
"""그룹 드롭다운 열고 옵션 선택"""
try:
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
await self.page.evaluate("""
const targetElement = Array.from(document.querySelectorAll('span')).find(el => el.textContent.trim() === '수집 상품 목록');
if (targetElement) {
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
""")
self.logger.log(f"그룹박스 요소로 스크롤", level=logging.INFO)
await asyncio.sleep(1)
# await self.page.wait_for_load_state("networkidle")
group_option_locator = self.group_index_template.format(index=group_index)
# 드롭다운 열기
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
# 드롭다운 열림 상태 확인
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
# 옵션 선택
# await self.page.wait_for_selector(group_option_locator, timeout=3000)
await self.page.click(group_option_locator, timeout=3000)
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
# 선택된 그룹 이름을 시그널로 전달
self.selected_group_name_signal.emit(selected_group_name)
except TimeoutError:
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# async def get_selected_group_name(self):
# """선택된 그룹 이름 가져오기"""
# try:
# selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
# await self.page.wait_for_selector(selected_group_name_locator, timeout=3000, state='visible')
# selected_group_name = await self.page.inner_text(selected_group_name_locator)
# self.logger.log(f"선택된 그룹 이름: {selected_group_name}", level=logging.INFO)
# return selected_group_name
# except TimeoutError:
# self.logger.log("선택된 그룹 이름을 가져오는 중 타임아웃이 발생했습니다.", level=logging.WARNING)
# return None
# except Exception as e:
# self.logger.log(f"그룹 이름 가져오기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# return None
async def get_product_edit_buttons_by_templete(self):
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
# 버튼 선택자 설정
# buttons = self.page.locator(self.product_edit_button_template)
# memos = self.page.locator(self.product_memo_button_template)
"""
현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환.
각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다.
"""
try:
# 수정 버튼 선택자 설정
# edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]')
# memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only')
edit_buttons = self.page.locator(self.product_edit_buttons)
memo_buttons = self.page.locator(self.product_memo_buttons)
# 수정 버튼 개수 확인
edit_button_count = await edit_buttons.count()
if edit_button_count == 0:
self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING)
return []
# 메모 버튼 개수 확인
memo_button_count = await memo_buttons.count()
if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외
self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING)
return []
# 첫 번째 버튼 제외하고 나머지 버튼을 처리
valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)]
# 상품별 해외배송비 수정 버튼과 메모 버튼 매칭
product_buttons = []
index = 0 # valid_memo_buttons에서의 현재 인덱스
for i in range(edit_button_count):
if index + 1 >= len(valid_memo_buttons):
self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING)
break
# 상품별 버튼 매칭
product_buttons.append({
"edit_button": edit_buttons.nth(i),
"shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼
"memo_button": valid_memo_buttons[index + 1] # 메모 버튼
})
index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모
# 최종 결과 로그
self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO)
return product_buttons
except Exception as e:
self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True)
return []
async def open_product_edit_dialog(self, button):
"""상품 수정 다이얼로그 열기"""
try:
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
await button.scroll_into_view_if_needed()
self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG)
await button.click()
self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO)
# await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림
except Exception as e:
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_detail_tab(self):
"""상세페이지 탭 클릭"""
try:
await self.page.click(self.detail_tab_locator)
self.logger.log("상세페이지 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상세페이지 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_option_tab(self):
"""옵션 탭 클릭"""
try:
await self.page.click(self.option_tab_locator)
self.logger.log("옵션 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"옵션 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_price_tab(self):
"""가격 탭 클릭"""
try:
await self.page.click(self.price_tab_locator)
self.logger.log("가격 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"가격 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_thumb_tab(self):
"""썸네일 탭 클릭"""
try:
await self.page.click(self.thumb_tab_locator)
self.logger.log("썸네일 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"썸네일 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_tags_tab(self):
"""키워드 태그 탭 클릭"""
try:
await self.page.click(self.tag_tab_locator)
self.logger.log("키워드 태그 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"키워드 태그 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_title_tab(self):
"""상품명 탭 클릭"""
try:
await self.page.click(self.title_tab_locator)
self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
html_content = '<p>&nbsp;</p>'
for url in urls:
html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
return html_content
def deleted_img_urls_from_logs(self):
"""로그 파일에서 상품명과 이미지 URL 목록을 추출하여 딕셔너리로 반환하는 메서드"""
image_data = {}
log_dir = os.path.join(os.path.dirname(__file__), "recovery_log")
# 로그 파일에서 필요한 정보만 추출
for log_file in self.log_files:
log_path = os.path.join(log_dir, log_file)
if os.path.exists(log_path):
with open(log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
current_product = None
for line in lines:
# 상품명 추출
product_match = re.search(r"원본 상품명 '(.+?)'", line)
if product_match:
current_product = product_match.group(1)
image_data[current_product] = []
# 이미지 URL 목록 추출
url_match = re.search(r"fetch_image_urls 에서 추출한 이미지URL 목록 : \[(.+?)\]", line)
if url_match and current_product:
# 각 URL에서 불필요한 작은따옴표 제거
urls = [url.strip("'\"") for url in url_match.group(1).split(", ")]
image_data[current_product].extend(urls)
current_product = None # Reset after each product's URL extraction
self.logger.log(f"복구된 이미지 URL 데이터: {len(image_data)}", level=logging.DEBUG)
return image_data
async def recovery_image_urls(self, product_name, deleted_img_urls):
"""상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드"""
self.logger.log("상품명과 삭제된 이미지 URL 데이터를 이용해 복구 작업을 수행하는 메서드", level=logging.DEBUG)
if product_name in deleted_img_urls:
self.logger.log(f'recovery_image_urls: 삭제된 상품명 찾음 : {product_name}', level=logging.INFO)
# 소스 편집 모드로 전환
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
await self.page.click(source_button_locator)
self.logger.log("recovery_image_urls : 소스 버튼 클릭 완료.", level=logging.DEBUG)
# 기존 extract_image_urls와 유사하게 HTML 소스를 가져옴
textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
data_value = await textarea.get_attribute("data-value")
# HTML 소스에서 이미지 URL 추출
image_urls = self.fetch_image_urls(data_value)
self.logger.log(f'recovery_image_urls: 현재페이지의 HTML에서 추출된 이미지 URL 수: {len(image_urls)}', level=logging.INFO)
# 이미지 태그가 없으면 로그에서 추출한 데이터를 HTML로 복원하여 입력
if len(image_urls) == 0:
restored_html = self.generate_restored_html(deleted_img_urls[product_name])
await self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", `{restored_html}`)')
self.logger.log("recovery_image_urls : 현재 페이지의 HTML의 이미지가 0이므로, 로그 데이터를 이용하여 HTML 복원 완료.", level=logging.DEBUG)
else:
self.logger.log("이미 이미지가 있으므로 복원 작업을 패스합니다.", level=logging.DEBUG)
# 소스 편집 모드 종료
await self.page.click(source_button_locator)
self.logger.log('소스 버튼 재 클릭 완료.', level=logging.DEBUG)
else:
self.logger.log(f"로그에 해당 상품명 '{product_name}'에 대한 데이터가 존재하지 않습니다.", level=logging.DEBUG)
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
html_content = '<p>&nbsp;</p>\n'
for url in urls:
width, height = self.get_image_size(url)
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
if width and height:
html_content += (
f'<figure class="image">'
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
f'</figure>\n'
)
else:
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
return html_content
def get_image_size(self, url):
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
try:
# URL에서 불필요한 따옴표 제거
cleaned_url = url.strip("'\"")
response = requests.get(cleaned_url, timeout=5)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
return image.width, image.height
except Exception as e:
self.logger.log(f"이미지 크기 확인 실패 - {cleaned_url}: {e}", level=logging.WARNING)
return None, None
# async def extract_image_urls(self, optionHandler, is_option_data=False):
# """상세페이지에서 이미지 URL 추출"""
# try:
# # 소스 편집 모드로 전환
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# # 소스 편집 모드로 전환
# await self.page.click(source_button_locator)
# self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# # 'data-value' 속성 값을 추출 (textarea 요소)
# textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
# data_value = await textarea.get_attribute("data-value")
# # HTML 소스에서 이미지 URL 추출
# image_urls = self.fetch_image_urls(data_value)
# self.logger.log(f'추출된 이미지 URL 수: {len(image_urls)}', level=logging.INFO)
# # HTML 소스에서 이미지 URL 삭제
# self.logger.log('img 태그를 삭제 중...', level=logging.DEBUG)
# data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
# new_value = ""
# if data_value_element:
# await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
# updated_value = await data_value_element.get_attribute('data-value')
# self.logger.log(f'Updated data-value: {updated_value}', level=logging.DEBUG)
# else:
# self.logger.log('Element with data-value not found.', level=logging.DEBUG)
# self.logger.log('img 태그 삭제 완료.', level=logging.DEBUG)
# # img 태그의 class 삭제 후 다시 소스 버튼 클릭
# await self.page.click(source_button_locator)
# self.logger.log('소스 버튼 재 클릭 완료.', level=logging.DEBUG)
# # 텍스트 입력 필드 선택
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# await input_field.press('Enter')
# # # 선두부 텍스트 입력
# # for key in sorted(self.text_templates.keys()):
# # leading_text = self.text_templates[key]
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# # await input_field.type(leading_text)
# # await input_field.press('Enter')
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
# # 선두부 텍스트 입력 (self.detail_text_widget의 get_lines 메서드 사용)
# for leading_text in self.detail_text_widget.get_lines():
# if leading_text: # 내용이 있는 경우에만
# await input_field.type(leading_text)
# await input_field.press('Enter')
# self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.INFO)
# option_data = optionHandler.get_selected_translated_options()
# self.logger.log(f'option_data : {option_data}', level=logging.DEBUG)
# # if is_option_data:
# if option_data or option_data != {}:
# self.logger.log('옵션 데이터 입력 시작', level=logging.DEBUG)
# # option_data = {} # option_data 초기화
# is_single = optionHandler.option_info['is_single_option']
# # option_data = optionHandler.get_selected_translated_options()
# # is_single = True # 옵션입력 일단 제외
# # self.logger.log('옵션입력 일단 제외', level=logging.DEBUG)
# self.logger.log('가져온 옵션 데이터', level=logging.DEBUG)
# self.logger.log(f'{option_data}', level=logging.DEBUG)
# # # 옵션 입력 필드 선택
# # input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# # await input_field.press('Enter')
# # # 선두부 텍스트 입력
# # for key in sorted(self.text_templates.keys()):
# # leading_text = self.text_templates[key]
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# # await input_field.type(leading_text)
# # await input_field.press('Enter')
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
# if not is_single:
# self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
# # 각 옵션을 한 줄씩 입력
# await input_field.type("# 옵션 목록")
# await input_field.press('Enter')
# # 첫 번째 옵션의 번역된 옵션명만 입력
# first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
# first_value = option_data[first_key] # key는 옵션이름 value는 가격
# await input_field.type(f"- 1. {first_key}")
# await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# # 나머지 옵션도 번역된 옵션명만 입력
# for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
# await input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
# await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# # 목록 끝을 알리기 위해 엔터 두 번 입력
# await input_field.press('Enter')
# await input_field.press('Enter')
# # 후두부 텍스트 입력
# await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
# await input_field.press('Enter')
# await input_field.type('---')
# await input_field.press('Enter')
# self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
# return image_urls
# except Exception as e:
# self.logger.log(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
# return image_urls if image_urls else []
# def paste_image_in_chrome(self, url, is_success_translated, toggle_states, is_watermark=False, watermark_text= ""):
# """크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
# self.logger.log("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력", level=logging.DEBUG)
# try:
# # self.switch_to_chrome() # 크롬으로 포커스 이동
# self.clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated, toggle_states=toggle_states) # 클립보드 내용을 처리
# # clipboard_content = pyperclip.paste()
# if self.clipboardImageManager.is_clipboard_image():
# pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
# self.logger.log("이미지 붙여넣기 완료.", level=logging.INFO)
# pyautogui.press('right') # 오른쪽 입력
# self.logger.log("이미지 붙여넣기 완료.", level=logging.DEBUG)
# self.clipboardImageManager.clear_clipboard()
# self.logger.log("이미지 붙여넣기 완료로 클립보드 비우기.", level=logging.INFO)
# return True
# else:
# self.logger.log("클립보드가 비어있습니다.", level=logging.WARNING)
# return False
# except Exception as e:
# self.logger.log(f"이미지 붙여넣기 중 오류: {e}", level=logging.ERROR, exc_info=True)
# return False
async def save_and_ecs_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
await self.page.keyboard.press("Escape")
self.logger.log("상품 수정 내용 저장 및 ECS 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def save_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def go_to_next_page(self):
"""다음 페이지로 이동"""
try:
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
current_page = await self.page.query_selector(self.current_page_locator)
if not current_page:
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
return False
# 현재 활성화된 페이지 번호를 가져옴
current_page_number = int(await current_page.get_attribute("title"))
self.logger.log(f"현재페이지 : [{current_page_number}]", level=logging.INFO)
next_page_number = current_page_number + 1
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
next_page_button = await self.page.query_selector(next_page_button_locator)
if next_page_button:
await next_page_button.click() # 페이지 버튼 클릭
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
time.sleep(3)
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
return True
else:
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
def switch_to_chrome(self):
"""크롬으로 포커스 전환"""
try:
if not self.chrome_hwnd:
self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
if self.chrome_hwnd:
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(self.chrome_hwnd)
self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
else:
self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
except Exception as e:
self.logger.log(f"크롬 포커스 전환 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
self.logger.log(f"스크롤 시작", level=logging.DEBUG)
# 현재 페이지 높이 가져오기
last_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG)
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, 1000);")
elif direction == "up":
# 위로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, -1000);")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG)
await asyncio.sleep(pause_time)
# 새로운 페이지 높이 가져오기
new_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG)
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
if direction == "down" and new_height == last_height:
self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
elif direction == "up" and new_height == 0:
self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG)
last_height = new_height
scroll_count += 1
self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG)
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤 (Page Down 키 사용)
await self.page.keyboard.press("PageDown")
elif direction == "up":
# 위로 스크롤 (Page Up 키 사용)
await self.page.keyboard.press("PageUp")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
await asyncio.sleep(pause_time)
scroll_count += 1
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def collect_product_info(self, items_per_page, ed_mode):
"""
상품 정보를 수집하는 메서드
"""
try:
product_infos = []
product_name_elements = [] # product_name_element를 저장할 리스트
# ed_mode에 따라 product_elements 설정
if ed_mode:
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
product_elements = [
{
"name": self.product_name_for_ed_template.format(index=i),
"price": self.product_price_for_ed_template.format(index=i),
"image": self.product_image_for_ed_template.format(index=i)
}
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
]
else:
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
product_elements = await self.page.query_selector_all(self.product_parent_locator)
for i, element in enumerate(product_elements[:items_per_page], start=1):
try:
if ed_mode:
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
else:
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
# 요소가 존재하면 정보 추출
self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG)
self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG)
self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG)
if product_name_element and product_price_element and product_image_element:
# await의 결과를 각 변수에 저장
product_name_text = (await product_name_element.inner_text()).strip()
product_price_text = (await product_price_element.inner_text()).strip()
product_image_url = await product_image_element.get_attribute('src')
# product_info 딕셔너리에 결과 저장
product_info = {
"name": product_name_text,
"price": product_price_text,
"image_url": product_image_url
}
self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG)
product_infos.append(product_info)
product_name_elements.append(product_name_element) # 각 product_name_element 추가
except Exception as e:
self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
continue
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
except Exception as e:
self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return []
async def scroll_page_to_bottom(self, pause_time=0.2):
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
self.logger.log('페이지 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
while True:
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => document.body.scrollHeight")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 스크롤 완료.', level=logging.INFO)
async def scroll_page_to_top(self, pause_time=0.2):
"""페이지의 맨 위까지 스크롤"""
self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => window.pageYOffset")
while previous_height > 0:
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => window.pageYOffset")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO)
async def start_Percenty_task(self):
self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG)
self.running = True # 번역 작업이 시작됨
self.translation_started.emit()
try:
# 금지어 목록 업데이트
self.forbidden_word_manager.refresh_forbidden_words()
self.whale_translator.start_trans_browser()
# 1. 총 상품 수 수집
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = await self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
self.logger.log(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}", level=logging.DEBUG)
self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG)
if total_products == 0:
self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
while self.running and completed_count < total_products:
self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG)
# if not page_number == 1:
# await self.scroll_page_to_top()
# self.logger.log(f'1페이지가 아니므로 동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
await self.scroll_page_to_top()
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
if not self.toggle_states['ed_mode']:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
product_buttons = await self.get_product_edit_buttons_by_templete()
else:
self.logger.log('상품정보 수집', level=logging.DEBUG)
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
if not product_buttons:
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
break
if self.toggle_states['recovery_mode']:
deleted_imgs = self.deleted_img_urls_from_logs()
# 5. 각 상품에 대해 상품수정작업 수행
for index, button_set in enumerate(product_buttons, start=1):
edit_button = button_set.get("edit_button")
memo_button = button_set.get("memo_button")
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
# 상품명 수집 오류 처리
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
is_disabled = await self.is_button_disabled(edit_button)
if is_disabled:
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
continue
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
# 상품 수정 다이얼로그 열기
await self.open_product_edit_dialog(edit_button)
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag, use_lens=self.toggle_states['use_lens'], use_api=self.toggle_states['use_API'])
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
if title_infos.get("is_banned_category", False):
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
is_set = await self.titleGenerator.set_product_name(banned_title)
if is_set:
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
else:
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
await self.save_and_ecs_product_edit()
completed_count += 1
self.total_progressbar_signal.emit(completed_count, total_products)
continue # 다음 상품으로 넘어감
# 정상상품이면 상품명 수정과 카테고리 수집
is_title = self.toggle_states['title']
if is_title:
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
self.start_stage_signal.emit(0)
title_infos["generated_name"] = await self.titleGenerator.process_title()
self.complete_stage_signal.emit(0)
# 옵션 수정
self.start_stage_signal.emit(1)
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
await self.edit_option(title_infos.get("original_name", None))
self.complete_stage_signal.emit(1)
# 가격 수정
self.start_stage_signal.emit(2)
is_price = self.toggle_states.get('price')
if is_price:
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
await self.edit_price(title_infos)
self.complete_stage_signal.emit(2)
# 썸네일 수정
self.start_stage_signal.emit(3)
thumb = self.toggle_states.get('thumb')
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
if thumb:
await self.edit_thumb()
self.complete_stage_signal.emit(3)
# 태그 수정
tag = self.toggle_states.get('tag')
if tag:
self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG)
self.start_stage_signal.emit(4)
await self.edit_tags(title_infos)
self.complete_stage_signal.emit(4)
# 상세페이지 수정 수정
if self.toggle_states['detail_Option'] or self.toggle_states['detail_IMGTrans']:
self.logger.log(f"상세페이지 수정 : {self.toggle_states['detail_Option']} + {self.toggle_states['detail_IMGTrans']}", level=logging.DEBUG)
# 상세페이지 수정
self.start_stage_signal.emit(5)
if not self.toggle_states['recovery_mode']:
await self.detail_trans()
else:
await self.detail_trans_for_recovery(title_infos["original_name"], deleted_imgs)
self.complete_stage_signal.emit(5)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
await self.save_and_ecs_product_edit()
# 메모 입력
if memo_button:
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
await self.insert_product_infos_to_memo(memo_button, title_infos)
completed_count += 1
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
self.total_progressbar_signal.emit(completed_count, total_products)
if completed_count >= total_products:
self.logger.log(f'[{total_products}]개 상품 수정이 완료되었습니다.', level=logging.INFO)
self.translation_completed.emit(total_products)
return
# 6. 다음 페이지로 이동 (있으면)
if not await self.go_to_next_page():
self.logger.log('더 이상 페이지가 없습니다. 작업을 종료합니다.', level=logging.INFO)
break
page_number += 1
if self.running:
self.logger.log('모든 상품 번역 및 저장 완료.', level=logging.INFO)
self.whale_translator.close_trans_browser()
self.logger.log('웨일 종료', level=logging.INFO)
except Exception as e:
self.logger.log(f"번역 작업 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.translation_error.emit(str(e))
async def insert_product_infos_to_memo(self, memo_button, title_infos):
try:
# 1. 메모 버튼 유효성 확인
is_disabled = await self.is_button_disabled(memo_button)
if is_disabled:
self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 2. title_infos 유효성 확인
top_5_titles = title_infos.get("top_5_titles", [])
top_5_prices = title_infos.get("top_5_prices", [])
if not top_5_titles or not top_5_prices:
self.logger.log("title_infos의 top_5_titles 또는 top_5_prices가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 3. 텍스트 생성
text = self.generate_titles_with_prices(title_infos)
if not text:
self.logger.log("생성된 텍스트가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 4. 메모 버튼 클릭
await memo_button.click()
self.logger.log("메모 버튼을 클릭했습니다.", level=logging.DEBUG)
# 5. 텍스트 입력란 대기
# memo_input = await self.page.query_selector(self.memo_input_locator)
memo_input = self.page.locator(self.memo_input_locator)
await memo_input.wait_for(state="visible")
self.logger.log("메모 입력란이 나타났습니다.", level=logging.DEBUG)
# 6. 텍스트 입력.
await memo_input.click(text)
await asyncio.sleep(0.5)
await memo_input.fill(text)
self.logger.log(f"메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
# 7. 저장 버튼 대기 및 클릭
# save_button = self.page.locator('div.ant-modal-footer > button[type="button"].ant-btn.css-1li46mu.ant-btn-primary')
# save_button = await self.page.query_selector(self.memo_save_btn_locator)
save_button = self.page.locator(self.memo_save_btn_locator)
await save_button.wait_for(state="visible")
self.logger.log("메모 입력란의 저장 버튼이 나타났습니다.", level=logging.DEBUG)
await save_button.click()
self.logger.log("메모 입력란의 저장 버튼을 클릭했습니다.", level=logging.DEBUG)
await asyncio.sleep(0.5)
except Exception as e:
self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
timestamp = int(time.time())
screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
await self.page.screenshot(path=screenshot_path)
self.logger.log(f"메모 입력 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
# ESC 키 2번 전송하여 팝업 제거 시도
self.logger.log("ESC 키를 2번 전송하여 팝업 제거 후 메모 입력 재시도합니다.", level=logging.WARNING)
await self.page.keyboard.press("Escape")
await self.page.keyboard.press("Escape")
await asyncio.sleep(0.5) # 잠시 대기
# 메모 입력 재시도
try:
await memo_button.click()
self.logger.log("재시도: 메모 버튼을 클릭했습니다.", level=logging.DEBUG)
memo_input = await self.page.query_selector(self.memo_input_locator)
await memo_input.wait_for(state="visible")
await memo_input.fill(text)
self.logger.log(f"재시도: 메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
save_button = await self.page.query_selector(self.memo_save_btn_locator)
await save_button.wait_for(state="visible")
await save_button.click()
self.logger.log("재시도: 저장 버튼을 클릭했습니다.", level=logging.DEBUG)
await asyncio.sleep(0.5)
except Exception as e2:
self.logger.log(f"재시도 후에도 메모 입력 중 오류 발생: {e2}", level=logging.ERROR, exc_info=True)
# ESC 키 2번 전송하여 팝업 제거 후 다음 상품으로 넘어감
self.logger.log("재시도 실패: ESC 키를 2번 전송하여 창을 닫고 다음 상품으로 넘어갑니다.", level=logging.WARNING)
await self.page.keyboard.press("Escape")
await self.page.keyboard.press("Escape")
# 다음 상품으로 넘어가기 위해 예외를 재발생하지 않고 그냥 리턴합니다.
return
# async def insert_product_infos_to_memo_ori(self, memo_button, title_infos):
# """메모 버튼을 클릭하고 텍스트를 입력한 뒤 저장"""
# try:
# # 1. 메모 버튼 유효성 확인
# is_disabled = await self.is_button_disabled(memo_button)
# if is_disabled:
# self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 2. title_infos 유효성 확인
# top_5_titles = title_infos.get("top_5_titles", [])
# top_5_prices = title_infos.get("top_5_prices", [])
# if not top_5_titles or not top_5_prices:
# self.logger.log("title_infos의 top_5_titles 또는 top_5_prices가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 3. 텍스트 생성
# text = self.generate_titles_with_prices(title_infos)
# if not text:
# self.logger.log("생성된 텍스트가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 4. 메모 버튼 클릭
# await memo_button.click()
# self.logger.log("메모 버튼을 클릭했습니다.", level=logging.DEBUG)
# # 5. 텍스트 입력란 대기
# memo_input = self.page.locator('[placeholder="상품에 대한 메모를 작성해주세요"]')
# await memo_input.wait_for(state="visible")
# self.logger.log("메모 입력란이 나타났습니다.", level=logging.DEBUG)
# # 6. 텍스트 입력
# await memo_input.fill(text)
# self.logger.log(f"메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
# # # 7. 체크박스 활성화 확인 및 체크 상태 설정
# # # # role="dialog" 내의 체크박스 입력 요소 찾기
# # dialog_locator = self.page.locator('[role="dialog"]')
# # checkbox_input = dialog_locator.locator('input.ant-checkbox-input')
# # checkbox_label = dialog_locator.locator('label.ant-checkbox-wrapper')
# # # 체크박스가 비활성화된 경우, 활성화 후 클릭
# # is_disabled = await checkbox_input.is_disabled()
# # if is_disabled:
# # self.logger.log("체크박스가 비활성화 상태입니다. 활성화를 시도합니다.", level=logging.DEBUG)
# # await self.page.evaluate('(checkbox) => checkbox.removeAttribute("disabled")', checkbox_input)
# # # # 체크 상태 확인
# # is_checked = await checkbox_label.get_attribute("class")
# # if "ant-checkbox-wrapper-checked" not in is_checked:
# # self.logger.log("체크박스가 체크되어 있지 않습니다. 체크를 수행합니다.", level=logging.DEBUG)
# # await checkbox_input.click()
# # else:
# # self.logger.log("체크박스가 이미 체크 상태입니다.", level=logging.DEBUG)
# # await self.page.evaluate('(checkbox) => checkbox.checked = true', checkbox_input)
# # 8. 저장 버튼 대기
# save_button = self.page.locator('div.ant-modal-footer > button[type="button"].ant-btn.css-1li46mu.ant-btn-primary')
# await save_button.wait_for(state="visible")
# self.logger.log("저장 버튼이 나타났습니다.", level=logging.DEBUG)
# # 9. 저장 버튼 클릭
# await save_button.click()
# self.logger.log("저장 버튼을 클릭했습니다.", level=logging.DEBUG)
# await asyncio.sleep(0.5)
# except Exception as e:
# self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# # 에러 발생 시 스크린샷 저장
# timestamp = int(time.time())
# screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
# await self.page.screenshot(path=screenshot_path)
# self.logger.log(f"에러 발생 시 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
# # 동영상 기록이 활성화되어 있다면, 페이지에 연결된 동영상 경로 확인 (페이지가 닫히면 video 파일이 최종 저장됩니다)
# if self.page.video:
# video_path = await self.page.video.path()
# self.logger.log(f"동영상 파일 경로: {video_path}", level=logging.INFO)
# # 필요하다면, 여기서 ffmpeg를 호출해 에러 전후 30초를 추출하는 추가 작업을 할 수 있습니다.
# # 예) os.system(f"ffmpeg -i {video_path} -ss {start_time} -t 60 {clipped_video_path}")
# raise e # 예외 재발생
def generate_titles_with_prices(self, title_infos):
"""top_5_titles와 top_5_prices를 매칭하여 텍스트 생성"""
try:
titles = title_infos.get("top_5_titles", [])
prices = title_infos.get("top_5_prices", [])
if not titles or not prices:
self.logger.log("top_5_titles 또는 top_5_prices가 비어 있습니다.", level=logging.WARNING)
return ""
# 매칭된 텍스트 생성
formatted_lines = []
for title, price in zip(titles, prices):
formatted_price = f"{int(price):,}" # 3자리수마다 콤마, "원" 추가
formatted_lines.append(f"{title} - {formatted_price}")
# 최종 텍스트 생성
result_text = "\n".join(formatted_lines)
self.logger.log(f"Generated titles with prices:\n{result_text}", level=logging.DEBUG)
return result_text
except Exception as e:
self.logger.log(f"텍스트 생성 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return ""
def is_valid_html(self, html_data: str) -> bool:
"""
HTML 데이터가 정상적인지 확인하는 메서드
조건: <img> 태그가 포함되어 있고 다수의 이미지 URL이 존재해야 함
"""
try:
# <img> 태그 내의 src 속성 값을 추출
img_urls = re.findall(r'<img[^>]+src=["\'](.*?)["\']', html_data)
# 최소 이미지 URL 개수를 기준으로 판단 (예: 5개 이상)
if len(img_urls) >= 5:
self.logger.log(f"HTML에 포함된 이미지 URL: {img_urls}", level=logging.DEBUG)
return True
else:
self.logger.log(f"HTML에 포함된 이미지 URL이 부족합니다. ({len(img_urls)}개)", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"HTML 유효성 검사 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def detail_trans(self):
try:
self.set_progress_visible_signal.emit(True)
# 상세페이지 탭 클릭
await self.click_detail_tab()
# CKEditor HTML 데이터 가져오기
html_data = await self.get_ckeditor_data()
# self.logger.log(f"가져온 원본 CKEditor html_data : {html_data}", level=logging.DEBUG)
if not html_data:
self.logger.log("HTML 데이터가 비어 있습니다. 해당상품 수정을 건너뜁니다.", level=logging.WARNING)
return
# HTML 데이터가 정상적인지 확인
if self.is_valid_html(html_data):
self.logger.log("정상적인 HTML 데이터입니다.", level=logging.INFO)
else:
self.logger.log("HTML 데이터가 비정상적이거나 이미지 URL이 포함되어 있지 않습니다.", level=logging.WARNING)
return
# CKEditor 내용을 비우는 메서드 호출
await self.clear_ckeditor_data()
# 이미지 URL 추출
soup = BeautifulSoup(html_data, 'html.parser')
images = soup.find_all('img', src=True)
total_imgs = len(images)
original_images = [] # 원본 이미지 URL 저장
self.logger.log(f"가져온 이미지 갯수: {len(images)}", level=logging.DEBUG)
# # 텍스트 입력 필드 선택
# input_field2 = self.page.locator('div#productMainContentContainerId div.ck.ck-editor__main > div')
input_field2 = self.page.locator(self.cke_text_editing_area_locator)
await input_field2.click()
self.logger.log(f"입력필드2 선택", level=logging.DEBUG)
# 소개글과 옵션데이터 입력
# await self.page.keyboard.press("ArrowLeft")
# await self.page.keyboard.press("Enter")
# await self.page.keyboard.press("ArrowUp")
await self.input_detail_text(self.optionHandler)
# 이미지 번역
for index, img in enumerate(images):
original_url = img.get('src')
self.logger.log(f"[{index+1}]번째 original_url : {original_url}", level=logging.DEBUG)
if not original_url:
continue
original_images.append(original_url) # 원본 URL 저장
# 이미지 번역을 위한 임시 파일 경로 생성
img_path = os.path.join(self.temp_dir, f"translated_detail_image_{index+1}.png")
self.logger.log(f"[{index+1}]번째 임시 저장PATH 생성 : {img_path}", level=logging.DEBUG)
self.logger.log(f"브라우저 이미지 번역 프로세스", level=logging.DEBUG)
is_translated = self.whale_translator.translate_image(original_url)
# self.switch_to_chrome() # 크롬으로 포커스 이동
translated_img_path = self.clipboardImageManager.process_clipboard_to_save_path(original_url=original_url, is_success_translated=is_translated, toggle_states=self.toggle_states, path=img_path) # 클립보드 내용을 처리
self.logger.log(f"번역완료된 임시 translated_img의 로컬 path: {translated_img_path}", level=logging.DEBUG)
if translated_img_path:
# 번역된 이미지 업로드
await self.upload_image(translated_img_path)
self.clipboardImageManager.clear_clipboard()
self.logger.log(f"{index+1}번째 이미지 번역완료.", level=logging.INFO)
else:
self.logger.log(f"original_url 확인 필요: {original_url}", level=logging.WARNING)
self.update_detail_progress_signal.emit(index+1, total_imgs)
# if not is_translated:
# self.logger.log(f"[{index+1}]번째 이미지 번역 실패로 원본이미지 삭제목록에서 제외합니다.", level=logging.INFO)
# original_images.remove(original_url) # 번역 실패 시 제거
# continue
# # 변경된 내용 중 옵션 빈 줄 제외
uploaded_html_data = await self.get_ckeditor_data()
self.logger.log(f"{index+1} 번째 uploaded_html_data : [{uploaded_html_data}]", level=logging.DEBUG)
# removed_empty_line_html = self.remove_empty_list_items(modified_html_data)
# await self.set_ckeditor_data(removed_empty_line_html)
self.set_progress_visible_signal.emit(False)
self.logger.log("모든 이미지가 처리되었습니다.", level=logging.INFO)
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
# # 원본 이미지 제거
# time.sleep(1)
# html_data = await self.get_ckeditor_data() # 최신 HTML 가져오기
# modified_html = await self.remove_original_images(html_data)
# # 수정된 HTML을 CKEditor에 다시 삽입
# await self.set_ckeditor_data(modified_html)
# self.logger.log(f"original_images_List : {original_images}", level=logging.INFO)
# self.logger.log(f"html_data : {html_data}", level=logging.INFO)
# self.logger.log(f"modified_html : {modified_html}", level=logging.INFO)
self.logger.log("원본 이미지가 제거된 HTML 데이터가 CKEditor에 삽입되었습니다.", level=logging.INFO)
# else:
# self.logger.log("CKEditor의 HTML이 비어있습니다. 상세페이지 처리를 건너뜁니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"CKEditor 이미지 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.INFO)
await self.save_product_edit()
async def remove_original_images(self, html_data):
"""
CKEditor HTML 데이터에서 연속된 엔터 3번 친 부분 이후의 데이터를 삭제합니다.
:param html_data: CKEditor의 HTML 데이터
:return: 수정된 HTML 데이터
"""
try:
self.logger.log("HTML 데이터에서 엔터 세 번 친 부분 이후 제거 작업 시작", level=logging.DEBUG)
# BeautifulSoup으로 HTML 데이터 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 모든 <p>&nbsp;</p> 태그를 찾음
enter_tags = soup.find_all('p', string="\xa0")
self.logger.log(f"찾은 <p>&nbsp;</p> 태그 수: {len(enter_tags)}", level=logging.DEBUG)
# 연속된 <p>&nbsp;</p> 태그 3개를 찾음
count = 0
start_removal = None
for i, tag in enumerate(enter_tags):
if i > 0 and tag.previous_sibling == enter_tags[i - 1]:
count += 1
else:
count = 1
if count == 3: # 연속된 세 번째 태그 발견
start_removal = tag
self.logger.log(f"엔터 세 번 연속된 마지막 태그 발견: {start_removal}", level=logging.DEBUG)
break
# 연속된 엔터 태그 이후 데이터 제거
if start_removal:
# 기준 태그 이후의 모든 형제를 삭제
for sibling in list(start_removal.find_all_next()):
sibling.decompose()
# 수정된 HTML 반환
modified_html = str(soup)
# 옵션목록 중 빈옵션 삭제
removed_empty_line_html = self.remove_empty_list_items(modified_html)
self.logger.log(f"최종 수정된 HTML 데이터: {removed_empty_line_html}", level=logging.DEBUG)
return modified_html
except Exception as e:
self.logger.log(f"HTML 데이터 수정 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return html_data # 오류 발생 시 원본 데이터 반환
def remove_empty_list_items(self, html_data):
"""
HTML 데이터에서 &nbsp;만 포함된 <li> 요소를 제거합니다.
:param html_data: CKEditor의 HTML 데이터
:return: 수정된 HTML 데이터
"""
try:
# BeautifulSoup으로 HTML 데이터 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 모든 <li> 태그 찾기
list_items = soup.find_all('li')
# &nbsp;만 포함된 <li> 태그 제거
for item in list_items:
if item.get_text(strip=True) == "\xa0": # &nbsp;는 \xa0로 표시됨
item.decompose() # 태그 제거
# 수정된 HTML 반환
modified_html = str(soup)
return modified_html
except Exception as e:
self.logger.log(f"remove_empty_list_items 수정 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return html_data # 오류 발생 시 원본 데이터 반환
async def remove_original_images_with_count(self, html_data, original_images):
"""
CKEditor HTML 데이터에서 original_images의 개수만큼 뒤에서부터 <img> 태그를 제거하되,
업로드된 이미지와 중복된 URL은 제외합니다.
:param html_data: CKEditor의 HTML 데이터
:param original_images: 원본 이미지 URL 리스트
:return: 수정된 HTML 데이터
"""
try:
self.logger.log("원본 이미지 제거 작업 시작", level=logging.DEBUG)
# HTML 데이터를 BeautifulSoup으로 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 모든 <img> 태그 탐색
images = soup.find_all('img', src=True)
self.logger.log(f"HTML에서 찾은 이미지 태그 수: {len(images)}", level=logging.DEBUG)
# 뒤에서부터 original_images의 개수만큼 제거
num_to_remove = len(original_images)
self.logger.log(f"제거할 이미지 갯수: {num_to_remove}", level=logging.DEBUG)
if num_to_remove > len(images):
self.logger.log("제거할 이미지 수가 전체 이미지 수를 초과합니다. 모든 이미지를 제거합니다.", level=logging.WARNING)
num_to_remove = len(images)
removed_count = 0 # 삭제된 이미지 수 카운트
for img in reversed(images): # 뒤에서부터 순회
src = img.get('src')
if src in original_images: # 원본 이미지 URL에 해당하는 경우만 삭제
self.logger.log(f"원본 이미지 제거: {src}", level=logging.DEBUG)
img.decompose()
removed_count += 1
if removed_count >= num_to_remove: # 지정된 개수만큼 삭제되면 중단
break
# 수정된 HTML을 반환
modified_html = str(soup)
# self.logger.log(f"수정된 HTML 데이터: {modified_html}", level=logging.DEBUG)
return modified_html
except Exception as e:
self.logger.log(f"원본 이미지 제거 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return html_data # 오류 발생 시 원본 데이터 반환
async def get_ckeditor_data(self):
try:
# self.logger.log("CKEditor 데이터를 가져옵니다.", level=logging.INFO)
# get_editor_data_script = """
# (function() {
# const editorElement = document.querySelector('.ck-editor__editable');
# if (!editorElement) {
# console.error('CKEditor 요소를 찾을 수 없습니다.');
# return null;
# }
# const editorInstance = editorElement.ckeditorInstance;
# if (!editorInstance) {
# console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
# return null;
# }
# const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
# console.log('현재 CKEditor 데이터:', editorData);
# return editorData;
# })();
# """
self.logger.log("CKEditor 데이터를 가져옵니다.", level=logging.INFO)
get_editor_data_script = """
(function() {
const checkEditorLoaded = () => {
const editorElement = document.querySelector('.ck-editor__editable');
if (!editorElement) {
console.error('CKEditor 요소를 찾을 수 없습니다.');
return null;
}
const editorInstance = editorElement.ckeditorInstance;
if (!editorInstance) {
console.error('CKEditor 인스턴스가 아직 로드되지 않았습니다.');
return null;
}
const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
console.log('현재 CKEditor 데이터:', editorData);
return editorData;
};
let retries = 10; // 최대 재시도 횟수
let interval = 500; // 재시도 간격 (밀리초)
return new Promise((resolve, reject) => {
const checkInterval = setInterval(() => {
if (retries <= 0) {
clearInterval(checkInterval);
reject('CKEditor 데이터를 가져오는데 실패했습니다.');
}
const data = checkEditorLoaded();
if (data) {
clearInterval(checkInterval);
resolve(data);
}
retries--;
}, interval);
});
})();
"""
await self.page.wait_for_selector('.ck-editor__editable', timeout=10000)
ck_editor_data = await self.page.evaluate(get_editor_data_script)
# self.logger.log(f"CKEditor 데이터: {ck_editor_data}", level=logging.INFO)
if ck_editor_data:
self.logger.log("CKEditor 데이터를 성공적으로 가져왔습니다.", level=logging.INFO)
else:
self.logger.log("CKEditor 데이터를 가져오는데 실패했습니다.", level=logging.WARNING)
return ck_editor_data
except Exception as e:
self.logger.log(f"CKEditor 데이터를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return None
async def clear_ckeditor_data(self):
try:
clear_editor_data_script = """
(function() {
const editorElement = document.querySelector('.ck-editor__editable');
if (!editorElement) {
console.error('CKEditor 요소를 찾을 수 없습니다.');
return false;
}
const editorInstance = editorElement.ckeditorInstance;
if (!editorInstance) {
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
return false;
}
editorInstance.setData(''); // CKEditor 데이터 비우기
console.log('CKEditor 데이터를 비웠습니다.');
return true;
})();
"""
result = await self.page.evaluate(clear_editor_data_script)
if result:
self.logger.log("CKEditor 데이터를 성공적으로 비웠습니다.", level=logging.INFO)
else:
self.logger.log("CKEditor 데이터를 비우는 데 실패했습니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"CKEditor 데이터를 비우는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def set_ckeditor_data(self, html_data):
"""
CKEditor에 HTML 데이터를 설정합니다.
:param html_data: 삽입할 HTML 데이터
"""
try:
self.logger.log("CKEditor에 HTML 데이터를 설정하는 작업 시작", level=logging.DEBUG)
# JavaScript 스크립트를 통해 CKEditor에 데이터 설정
set_data_script = f"""
(function() {{
const editorElement = document.querySelector('.ck-editor__editable');
if (!editorElement) {{
console.error('CKEditor 요소를 찾을 수 없습니다.');
return;
}}
const editorInstance = editorElement.ckeditorInstance;
if (!editorInstance) {{
console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
return;
}}
editorInstance.setData(`{html_data}`);
console.log('CKEditor에 새로운 데이터가 성공적으로 설정되었습니다.');
}})();
"""
# 스크립트 실행
await self.page.evaluate(set_data_script)
self.logger.log("CKEditor에 HTML 데이터를 성공적으로 설정했습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"CKEditor 데이터 설정 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def input_detail_text(self, optionHandler):
try:
# 텍스트 입력 필드 선택
input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# await input_field.press('Enter')
# # 선두부 텍스트 입력
# for key in sorted(self.text_templates.keys()):
# leading_text = self.text_templates[key]
# if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# await input_field.type(leading_text)
# await input_field.press('Enter')
# self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
# 선두부 텍스트 입력 (self.detail_text_widget의 get_lines 메서드 사용)
for leading_text in self.detail_text_widget.get_lines():
if leading_text: # 내용이 있는 경우에만
await input_field.type(leading_text)
await input_field.press('Enter')
self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.INFO)
option_data = optionHandler.get_selected_translated_options()
self.logger.log(f'option_data : {option_data}', level=logging.DEBUG)
# if is_option_data:
if option_data or option_data != {}:
self.logger.log('옵션 데이터 입력 시작', level=logging.DEBUG)
is_single = optionHandler.option_info['is_single_option']
self.logger.log('가져온 옵션 데이터', level=logging.DEBUG)
self.logger.log(f'{option_data}', level=logging.DEBUG)
# # 옵션 입력 필드 선택
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# await input_field.press('Enter')
if not is_single:
self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
# 각 옵션을 한 줄씩 입력
await input_field.type("# 옵션 목록")
await input_field.press('Enter')
# 첫 번째 옵션의 번역된 옵션명만 입력
first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
first_value = option_data[first_key] # key는 옵션이름 value는 가격
await input_field.type(f"- 1. {first_key}")
await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# 나머지 옵션도 번역된 옵션명만 입력
for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
await input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# 목록 끝을 알리기 위해 엔터 1 번 입력
await input_field.press('Enter')
# 후두부 텍스트 입력
await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
await input_field.press('Enter')
await input_field.type('---')
await input_field.press('Enter')
self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
except Exception as e:
self.logger.log(f"옵션데이터 입력 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def upload_image(self, img_path):
"""
이미지를 CKEditor에 업로드하고 가장 첫 번째 이미지 URL을 추출 후 삭제합니다.
:param img_path: 업로드할 이미지 파일 경로
:return: 업로드된 이미지의 URL
"""
try:
# 업로드 이전 이미지 리스트 저장
existing_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
# 1. 파일 업로드
# file_input_selector = 'div#productMainContentContainerId button[type="button"].ck.ck-button.ck-off.ck-file-dialog-button.ck-splitbutton__action > input'
file_input = self.page.locator(self.cke_img_file_input_locator)
await file_input.set_input_files(img_path)
self.logger.log(f"이미지가 성공적으로 업로드되었습니다: {img_path}", level=logging.INFO)
# # 2. 엔터 키 입력
# # await self.page.keyboard.press("Enter")
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("ArrowRight")
# 업로드된 이미지 URL 확인
current_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
new_images = list(set(current_images) - set(existing_images))
if new_images:
self.logger.log(f"업로드된 이미지 확인", level=logging.INFO)
return new_images[0]
self.logger.log("업로드된 이미지를 찾을 수 없습니다.", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"이미지 업로드 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def wait_for_image_upload(self):
"""
이미지 업로드 후 DOM 상태가 업데이트되었는지 확인.
"""
try:
new_image = await self.page.wait_for_function("""
() => {
const images = Array.from(document.querySelectorAll("img[src*='uploaded_image']"));
return images.length > 0 ? images[images.length - 1].src : null;
}
""", timeout=10000) # 10초 대기
return new_image
except Exception as e:
self.logger.log(f"wait_for_function으로 이미지 업로드 감지 중 오류: {e}", level=logging.ERROR)
return None
# async def detail_trans_ori(self):
# # 상세페이지 탭 클릭
# await self.click_detail_tab()
# # 텍스트 입력 필드 선택
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# await input_field.press('Enter')
# get_editor_data_script = """
# (function() {
# const editorElement = document.querySelector('.ck-editor__editable');
# if (!editorElement) {
# console.error('CKEditor 요소를 찾을 수 없습니다.');
# return null;
# }
# const editorInstance = editorElement.ckeditorInstance;
# if (!editorInstance) {
# console.error('CKEditor 인스턴스를 가져올 수 없습니다.');
# return null;
# }
# const editorData = editorInstance.getData(); // 현재 편집 중인 HTML 데이터 가져오기
# console.log('현재 CKEditor 데이터:', editorData);
# return editorData;
# })();
# """
# original_editor_data = await self.page.evaluate(get_editor_data_script)
# self.logger.log(f"원본 CKEditor 데이터: {original_editor_data}", level=logging.INFO)
# # 이미지 URL 추출
# image_urls = await self.extract_image_urls(self.optionHandler, is_option_data=True) # 코루틴 실행
# total_images = len(image_urls)
# self.logger.log(f"현재 상품의 총 이미지 수 : {total_images}개", level=logging.INFO)
# # 이미지 번역 작업 진행
# for index, url in enumerate(image_urls):
# current_image_count = index +1
# # 임시 파일 경로 생성
# img_path = os.path.join(self.temp_dir, f"translated_detail_image_{current_image_count}.png")
# self.logger.log(f"웨일 브라우저를 활용한 이미지 번역 프로세스", level=logging.DEBUG)
# is_success_translated = self.whale_translator.translate_image(url)
# # is_paste_success = self.paste_image_in_chrome(self.clipboardImageManager, url, is_success_translated, self.toggle_states)
# is_paste_success = await self.paste_image_in_chrome_to_save_path(url, img_path, is_success_translated, self.toggle_states)
# if is_paste_success:
# self.logger.log(f"{url} gui 이미지 붙여넣기 성공", level=logging.DEBUG)
# else:
# self.logger.log(f"{url} gui 이미지 붙여넣기 실패", level=logging.DEBUG)
# current_image_count += 1
# # 수정 후 저장
# self.logger.log('상품 세부사항 저장 중...', level=logging.INFO)
# await self.save_product_edit()
async def detail_trans_for_recovery(self, product_name, deleted_imgs):
# 상세페이지 탭 클릭
await self.click_detail_tab()
self.logger.log('recovery_image_urls 메서드 호출', level=logging.DEBUG)
await self.recovery_image_urls(product_name, deleted_imgs)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.INFO)
await self.save_product_edit()
async def edit_option(self, product_name):
# 상세페이지 탭 클릭
await self.click_option_tab()
# 옵션 최대선택갯수
max_option_count = self.toggle_states.get('max_option_count', 0)
self.current_options_info = await self.optionHandler.process_options(product_name, self.forbidden_word_manager, max_option_count, self.toggle_states)
# 수정 후 저장
await self.save_product_edit()
async def edit_price(self, title_infos):
# 상세페이지 탭 클릭
await self.click_price_tab()
# 가격 수정 프로세스
await self.priceHandler.process_price(title_infos=title_infos)
# 수정 후 저장
await self.save_product_edit()
async def edit_thumb(self):
# 상세페이지 탭 클릭
await self.click_thumb_tab()
# 가격 수정 프로세스
await self.thumbnailHandler.process_thumbnails()
# 수정 후 저장
await self.save_product_edit()
async def edit_tags(self, title_infos):
# 상세페이지 탭 클릭
await self.click_tags_tab()
# 가격 수정 프로세스
await self.tagsHandler.process_tags(self.forbidden_word_manager, title_infos=title_infos)
# 수정 후 저장
await self.save_product_edit()
# def start_browser_task(self):
# """브라우저 시작을 위한 스레드 시작"""
# self.start() # QThread의 start() 호출로 run()을 실행
# def run(self):
# """QThread의 run 메서드에서 이벤트 루프를 생성하고 비동기 메서드 실행"""
# self.loop = asyncio.new_event_loop() # 새로운 이벤트 루프 생성
# asyncio.set_event_loop(self.loop) # 이 스레드에서 사용할 이벤트 루프로 설정
# self.loop.run_forever() # 스레드 종료 시까지 이벤트 루프 실행 유지
def run(self):
"""QThread의 run 메서드에서 이벤트 루프 생성 및 실행"""
self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG)
# 이벤트 루프가 없거나 종료된 경우에만 초기화
if not self.loop or self.loop.is_closed():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
# 이벤트 루프를 유지하여 외부에서 작업 추가 가능
self.loop.run_forever()
def start_browser_task(self):
"""이벤트 루프에서 브라우저 초기화 작업 추가"""
self.logger.log("start_browser_task - 브라우저 초기화 작업 시작", level=logging.DEBUG)
# 실행 중인 이벤트 루프에 비동기 작업 추가
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop)
self.browser_task_started = True # 작업 실행 플래그 설정
self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
else:
self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
def start_PercentyJob_task(self):
"""번역 작업을 이벤트 루프에서 실행"""
# 이벤트 루프가 없거나 닫혀 있으면 새로 생성
# self.initialize_event_loop()
if self.loop and not self.loop.is_closed():
# 이미 실행 중인 이벤트 루프에 번역 작업 추가
asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop)
else:
self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR)
def initialize_event_loop(self):
"""이벤트 루프가 없거나 닫힌 경우 새로 생성"""
if not self.loop or self.loop.is_closed():
self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
else:
self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG)
def stop(self):
"""Playwright를 안전하게 종료"""
try:
# 비동기 함수 호출을 동기식으로 처리
asyncio.run(self._stop_playwright())
except RuntimeError as e:
self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR)
async def _stop_playwright(self):
"""스레드 종료 시 Playwright 리소스 정리 및 이벤트 루프 종료"""
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
if self.loop and not self.loop.is_closed():
self.loop.call_soon_threadsafe(self.loop.stop) # 이벤트 루프 종료
if self.whale_translator:
self.logger.log("Whale 브라우저 창 닫기 시도 중...", level=logging.INFO)
self.whale_translator.close_trans_browser()
def force_terminate_browser(self):
"""Playwright 브라우저 프로세스를 강제로 종료"""
try:
if self.browser:
browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid
browser_process = psutil.Process(browser_pid)
for child in browser_process.children(recursive=True):
child.kill()
browser_process.kill()
self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING)
if self.whale_translator:
self.whale_translator.close_trans_browser()
except Exception as e:
self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR)
def terminate(self):
self.logger.log("크롬 스레드 종료", level=logging.INFO)
self.cleanup() # 종료 시 추가 정리 작업 호출
if self.whale_translator:
self.whale_translator.close_trans_browser()
super().terminate()
def cleanup(self):
pass
def check_pause(self):
"""일시 정지 상태라면 재개될 때까지 대기"""
self.pause_mutex.lock()
if self.is_paused:
self.pause_condition.wait(self.pause_mutex)
self.pause_mutex.unlock()
def pause(self):
"""스레드 일시 정지"""
self.pause_mutex.lock()
self.is_paused = True
self.pause_mutex.unlock()
self.logger.log("스레드가 일시 정지되었습니다.", level=logging.DEBUG)
def resume(self):
"""스레드 재개"""
self.pause_mutex.lock()
self.is_paused = False
self.pause_condition.wakeAll()
self.pause_mutex.unlock()
self.logger.log("스레드가 재개되었습니다.", level=logging.DEBUG)