AutoPercenty3/browser_control.py

2527 lines
132 KiB
Python

from playwright.async_api import async_playwright, TimeoutError
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition
import re
# import pyautogui
import time
import win32gui, win32con
from bs4 import BeautifulSoup
import asyncio
import os, sys, random
import requests
from PIL import Image
from io import BytesIO
import json
import gc
import psutil
# from whale_new import WhaleTranslator
# from src.wh_search import WhaleSearchParser
# from src.wh_img_trans import WhaleImageTranslator
from src.whale_new import WhaleTranslator
from clipboardImageManager import ClipboardImageManager
from option import OptionHandler
from price import PriceHandler
from titleGenerator import TitleGenerator
from titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager
from src.ppocr.ocr import ChineseImageOCRProcessor
from tags import TagsHandler
from titleManager.gpt_client import GPTClient
from thumb import ThumbnailHandler
import tempfile
import logging
class BrowserController(QThread):
# 브라우저 시작 시그널 정의
browser_started = Signal()
browser_error = Signal(str)
# 번역 작업 시작, 완료, 오류 시그널 정의
translation_started = Signal()
translation_completed = Signal(int)
translation_error = Signal(str)
start_stage_signal = Signal(int)
complete_stage_signal = Signal(int)
total_progressbar_signal = Signal(int, int) # (현재 값, 총 값)
update_detail_progress_signal = Signal(int, int) # (현재 값, 총 값)
set_progress_visible_signal = Signal(bool) # ProgressBar 표시/숨김
percentyJob_button_Enable = Signal(bool)
selected_group_name_signal = Signal(str) # 선택된 그룹 이름 시그널
def __init__(self, app, logger, locator_manager, price_setting_diag, detail_text_widget, login_infos, toggle_states, user_id, supabase_manager):
super().__init__()
self.app = app # AutoPercentyGUI 객체 저장
self.logger = logger
self.log_files = ["appTranslator.log", "appTranslator.log.1", "appTranslator.log.2", "appTranslator.log.3", "appTranslator.log.4", "appTranslator.log.5"]
self.locator_manager = locator_manager
self.price_setting_diag = price_setting_diag
self.detail_text_widget = detail_text_widget
self.toggle_states = toggle_states
self.login_infos = login_infos
self.user_id = user_id
self.supabase_manager = supabase_manager
self.parsing_page = None
self.current_options_info = None
self.chrome_hwnd = None
# 임시 디렉토리 생성
self.temp_dir = tempfile.mkdtemp() # 임시 디렉토리 생성
self.whale_translator = WhaleTranslator(logger, debug_flag=self.toggle_states['debug_mode'])
self.playwright = None
self.browser = None
self.page = None
self.loop = None # 이벤트 루프를 저장할 변수
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
self.pause_mutex = QMutex()
self.pause_condition = QWaitCondition()
self.is_paused = False # 일시 정지 상태 플래그
self.gpt_client = GPTClient(logger=self.logger)
self.forbidden_word_manager = SupabaseForbiddenWordManager(self.logger, self.supabase_manager, self.user_id)
self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
self.optionHandler = OptionHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.logger, self.gpt_client, self.update_detail_progress_signal, self.set_progress_visible_signal, debug_flag=self.toggle_states['debug_mode'])
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.price_setting_diag, debug_flag=self.toggle_states['debug_mode'])
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.whale_translator, self.clipboardImageManager, self.toggle_states, self.update_detail_progress_signal, self.set_progress_visible_signal)
self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states, self.gpt_client, self.forbidden_word_manager, self.user_id, self.supabase_manager)
self.tagsHandler = TagsHandler(self.locator_manager, self, self.logger, self.toggle_states)
# self.trans_browser = WhaleImageTranslator(logger)
# BrowserController에 해당하는 모든 locator를 정의
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator')
self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator')
self.product_dialog_close_btn = self.locator_manager.get_locator('BrowserControl', 'product_dialog_close_btn')
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
self.cke_text_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'cke_text_editing_area_locator')
self.cke_img_file_input_locator = self.locator_manager.get_locator('BrowserControl', 'cke_img_file_input_locator')
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator')
self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template')
self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator')
self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons')
self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons')
self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator')
self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator')
# self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
# # 스레드 종료 시 close_whale_window_if_exists 호출
# self.finished.connect(self.cleanup)
def get_page(self):
return self.page
def get_parsing_page(self):
return self.parsing_page
def get_whale(self):
return self.whale_translator
# async def monitor_browser_logs(self, page):
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
# page.on("console", lambda msg: self.logger.info(f"JS 콘솔 로그: {msg.type}: {msg.text}"))
# async def monitor_browser_logs(self, page):
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
# level_map = {
# "log": logging.INFO,
# "info": logging.INFO,
# "warning": logging.WARNING,
# "error": logging.ERROR,
# "debug": logging.DEBUG,
# }
# page.on(
# "console",
# lambda msg: self.logger.log(
# f"JS 콘솔 로그: {msg.type}: {msg.text}",
# level=level_map.get(msg.type, logging.INFO)
# )
# )
def get_base_dir(self):
"""
실행 환경에 따라 base_dir을 설정하는 메서드.
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
"""
if getattr(sys, 'frozen', False): # 패키징된 경우
base_dir = os.path.dirname(sys.executable)
internal_dir = os.path.join(base_dir, 'lib', 'src') # lib 디렉토리 포함
if os.path.exists(internal_dir): # lib 디렉토리가 존재하면 base_dir로 설정
return internal_dir
else: # 일반 Python 실행 환경
base_dir = os.path.dirname(os.path.abspath(__file__))
debug_dir = os.path.join(base_dir, 'src') # lib 디렉토리 포함
return debug_dir
async def start_browser_async(self):
"""비동기 Playwright 초기화 및 로그인 수행"""
try:
self.logger.log('크롬 브라우저를 실행합니다...', level=logging.DEBUG)
# WhaleTranslator 필요 여부 확인 및 초기화
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
thumb_status = self.toggle_states.get('thumb', False)
debug_mode = self.toggle_states.get('debug_mode', False)
self.logger.log(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}", level=logging.DEBUG)
# if optionIMGTrans_status or detail_IMGTrans_status or thumb_status:
# self.logger.log('이미지번역을 위해 trans_browser를 로드합니다...', level=logging.DEBUG)
# self.whale_translator = WhaleTranslator(self.logger)
# self.whale_translator.start_whale_browser()
# Playwright 시작 및 브라우저 실행
self.playwright = await async_playwright().start()
base_path = self.get_base_dir()
self.logger.log(f"base_path: {base_path}", level=logging.DEBUG)
browser_path = os.path.join(base_path, 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
extension_path = os.path.join(base_path, 'browsers', 'extensions', '1.1.100_0')
user_data_dir = os.path.join(base_path, 'browsers', 'user_data')
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
self.video_dir = os.path.join(base_path, "recorded_videos")
if not os.path.exists(self.video_dir):
os.makedirs(self.video_dir)
self.logger.log(f"동영상 저장 폴더 생성됨: {self.video_dir}", level=logging.DEBUG)
if not os.path.exists(browser_path):
self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG)
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
# User agent 설정
import random
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# 브라우저 시작 및 설정
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=not debug_mode,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--disable-popup-blocking',
f'--disable-extensions-except={extension_path}',
f'--load-extension={extension_path}',
'--start-maximized',
'--window-size=1920,1080'
],
executable_path=browser_path,
user_agent=user_agent,
# record_video_dir=self.video_dir, # 동영상 저장 폴더 지정
# record_video_size={"width": 1280, "height": 720}
)
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page = await self.browser.new_page()
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
# await self.monitor_browser_logs(self.page)
# self.logger.log('자바스크립트의 로그를 모니터링하기 위해', level=logging.INFO)
await self.page.goto('https://percenty.co.kr/signin')
self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
# 첫 번째 기본 탭 닫기
if self.browser.pages:
await self.browser.pages[0].close()
# 페이지 제목을 가져와서 창 제목으로 활용
page_title = await self.page.title()
self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
# self.chrome_hwnd = self.find_window_by_title(page_title)
# if not self.chrome_hwnd:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
# else:
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
await self.login()
await self.close_ad_if_exists_new()
await self.close_ad_if_exists()
id_ed_mode = self.toggle_states.get('ed_mode', False)
if id_ed_mode:
await self.go_to_registered_product_page()
self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
else:
self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
await self.go_to_new_product_page()
group_index = self.toggle_states['group_index']
self.logger.log('선택한 그룹 인덱스로 이동', level=logging.INFO)
if group_index:
await self.select_group_index(group_index=group_index)
# 파싱용 브라우저 인스턴스를 별도로 실행 (headless 모드, 실제 브라우저처럼 보이도록 옵션 추가, 시크릿 모드 포함)
self.parsing_browser = await self.playwright.chromium.launch(
executable_path=browser_path, # 추가: self.browser와 같은 경로
headless=False,
args=[
'--disable-blink-features=AutomationControlled', # 자동화 탐지 우회
'--no-sandbox',
'--disable-infobars',
'--disable-dev-shm-usage',
'--disable-gpu',
'--start-minimized',
'--incognito', # 시크릿 모드 활성화
'--window-position=-32000,-32000',
]
)
# 파싱용 컨텍스트는 기본적으로 시크릿 모드 환경이며, 추가 설정으로 실제 브라우저와 유사한 점수를 줄 수 있음.
import random
self.parsing_context = await self.parsing_browser.new_context(
locale="ko-KR",
user_agent=random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# 다른 UA들...
]),
viewport={"width": 1280, "height": 720},
extra_http_headers={
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"
}
)
# 파싱용 페이지 생성
self.parsing_page = await self.parsing_context.new_page()
# 각 핸들러에 초기화된 page 객체 전달.
self.titleGenerator.update_page(self.page)
self.titleGenerator.update_parsing_page(self.parsing_page)
self.optionHandler.update_page(self.page)
self.tagsHandler.update_page(self.page)
self.thumbnailHandler.update_page(self.page)
self.priceHandler.update_page(self.page)
self.optionHandler.update_whale()
self.thumbnailHandler.update_whale()
# 신호 전송
self.percentyJob_button_Enable.emit(True)
self.browser_started.emit() # 브라우저 시작 신호
except Exception as e:
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
self.browser_error.emit(str(e))
# async def start_browser(self):
# """크롬 브라우저 실행 및 페이지 로딩"""
# self.logger.log('크롬 브라우저 실행 중...', level=logging.DEBUG)
# try:
# # Playwright를 수동으로 실행하여 브라우저 유지
# self.playwright = await async_playwright().start()
# # cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정
# if getattr(sys, 'frozen', False):
# browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
# extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0')
# user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data')
# else:
# browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
# extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0')
# user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data')
# self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
# self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
# self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
# # 사용자 데이터 디렉토리가 존재하지 않으면 생성
# if not os.path.exists(user_data_dir):
# os.makedirs(user_data_dir)
# self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
# # User agent 설정
# user_agent = random.choice([
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
# ])
# self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# # 브라우저 시작 및 설정
# self.browser = await self.playwright.chromium.launch_persistent_context(
# user_data_dir,
# headless=False,
# permissions=["geolocation", "notifications"],
# geolocation={"latitude": 37.5665, "longitude": 126.9780},
# locale="ko-KR",
# args=[
# '--disable-popup-blocking',
# f'--disable-extensions-except={extension_path}',
# f'--load-extension={extension_path}',
# '--start-maximized',
# '--window-size=1920,1080'
# ],
# executable_path=browser_path,
# user_agent=user_agent
# )
# # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
# self.page = await self.browser.new_page()
# self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
# await self.page.goto('https://percenty.co.kr/signin')
# self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
# # 첫 번째 기본 탭 닫기
# if self.browser.pages:
# await self.browser.pages[0].close()
# # 페이지 제목을 가져와서 창 제목으로 활용
# page_title = await self.page.title()
# self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
# self.chrome_hwnd = self.find_window_by_title(page_title)
# if not self.chrome_hwnd:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
# else:
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
# await self.login()
# await self.close_ad_if_exists()
# if self.toggle_states['ed_mode']:
# await self.go_to_registered_product_page()
# self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
# else:
# self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
# await self.go_to_new_product_page()
# self.browser_started.emit() # 브라우저 시작 신호
# except Exception as e:
# self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR)
# self.browser_error.emit(str(e))
async def login(self):
"""로그인 처리"""
is_admin = self.login_infos['is_admin']
self.logger.log(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정', level=logging.INFO)
if is_admin:
# 관리자 로그인 처리
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
await self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
await self.page.click(self.login_button_locator)
else:
# 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화
admin_toggle = self.page.locator(self.admin_toggle_locator)
if await admin_toggle.get_attribute("aria-checked") == "true":
await admin_toggle.click() # 관리자 모드에서 직원 모드로 전환
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
await self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
await self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
await self.page.click(self.staff_login_button_locator)
self.logger.log(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정', level=logging.INFO)
# await self.page.wait_for_load_state('networkidle', timeout=10000)
async def close_browser(self):
"""브라우저 종료"""
if self.browser:
await self.browser.close()
await self.playwright.stop()
self.cleanup()
self.logger.log('브라우저 종료됨.', level=logging.INFO)
# def find_window_by_title(self, window_name):
# """창 제목을 통해 핸들을 찾는 메서드"""
# def enum_windows_callback(hwnd, result):
# if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
# result.append(hwnd)
# result = []
# win32gui.EnumWindows(enum_windows_callback, result)
# return result[0] if result else None
# def switch_to_chrome(self):
# """크롬으로 포커스 전환"""
# if self.chrome_hwnd:
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
# win32gui.SetForegroundWindow(self.chrome_hwnd)
# self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
# else:
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.ERROR)
async def get_total_product_count(self):
total_count = 0
items_per_page = 0
try:
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
total_count_element = await self.page.query_selector(self.total_product_count_locator)
items_per_page_element = await self.page.query_selector(self.items_per_page_locator)
self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG)
if total_count_element:
total_count_text = await total_count_element.inner_text()
if "" in total_count_text and "개 상품" in total_count_text:
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
self.logger.log(f"총 상품수 확인: {total_count}", level=logging.INFO)
# 페이지당 상품 수 추출
if items_per_page_element:
items_per_page_text = await items_per_page_element.get_attribute("title")
if items_per_page_text and "개씩 보기" in items_per_page_text:
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO)
# 결과 반환
if total_count:
return {"total_count": total_count, "items_per_page": items_per_page}
return {"total_count": 0, "items_per_page": 0}
except Exception as e:
self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return {"total_count": 0, "items_per_page": 0}
def fetch_image_urls(self, html_content):
"""
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 모든 <img> 태그를 찾기
image_urls = []
img_tags = soup.find_all('img')
for img in img_tags:
# img 태그에서 src 속성 추출
if 'src' in img.attrs:
image_url = img['src']
image_urls.append(image_url)
self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)}", level=logging.DEBUG)
self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}", level=logging.DEBUG)
return image_urls
async def close_ad_if_exists(self):
"""ESC 키를 두 번 전송하여 다이얼로그를 닫는 메서드"""
try:
# ESC 키를 한 번 전송
time.sleep(2)
await self.page.keyboard.press("Escape")
# 잠시 대기 (필요시, 다이얼로그 애니메이션 등 고려)
await self.page.wait_for_timeout(100)
await self.page.dispatch_event("body", "keydown", {
"key": "Escape",
"code": "Escape",
"keyCode": 27,
"which": 27
})
await self.page.wait_for_timeout(50)
await self.page.dispatch_event("body", "keyup", {
"key": "Escape",
"code": "Escape",
"keyCode": 27,
"which": 27
})
await self.page.wait_for_timeout(100)
# ESC 키를 두 번째 전송
esc_script = """
() => {
const dispatchEsc = () => {
const eventDown = new KeyboardEvent('keydown', {
key: 'Escape',
code: 'Escape',
keyCode: 27,
which: 27,
bubbles: true
});
document.dispatchEvent(eventDown);
const eventUp = new KeyboardEvent('keyup', {
key: 'Escape',
code: 'Escape',
keyCode: 27,
which: 27,
bubbles: true
});
document.dispatchEvent(eventUp);
};
dispatchEsc();
}
"""
# 첫 번째 ESC 전송
await self.page.evaluate(esc_script)
self.logger.log("ESC 키를 두 번 전송하여 다이얼로그를 닫았습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"ESC 키 전송 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def close_ad_if_exists_new(self):
"""
다이얼로그가 존재하면 해당 다이얼로그 내부의 "닫기" 버튼을 클릭하고,
다이얼로그를 찾지 못하거나 내부에 닫기 버튼이 없으면 페이지 전체에서 "닫기" 버튼을 검색하여 클릭하는 메서드
"""
try:
# 다이얼로그가 나타날 때까지 최대 3초 대기
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
self.logger.log("다이얼로그가 발견되었습니다. 내부의 닫기 버튼을 찾습니다.", level=logging.INFO)
# 다이얼로그 엘리먼트를 먼저 가져옴
dialog_element = await self.page.query_selector(self.close_ad_dialog_locator)
if dialog_element:
# 다이얼로그 내부에서 "닫기" 텍스트를 가진 버튼을 xpath로 찾음
close_button = await dialog_element.query_selector("xpath=.//button[span[text()='닫기']]")
if close_button:
await close_button.click()
self.logger.log("다이얼로그 내부의 닫기 버튼을 클릭하여 닫았습니다.", level=logging.INFO)
return # 닫기 성공 시 함수 종료
else:
self.logger.log("다이얼로그 내부에서 닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
else:
self.logger.log("대기 후에도 다이얼로그 엘리먼트를 찾지 못했습니다.", level=logging.WARNING)
except TimeoutError:
self.logger.log("다이얼로그가 나타나지 않았습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"다이얼로그 닫기 시도 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 다이얼로그에서 닫기 버튼을 찾지 못한 경우, 페이지 전체에서 "닫기" 버튼을 검색
self.logger.log("페이지 전체에서 '닫기' 버튼을 검색합니다.", level=logging.INFO)
try:
global_close_button = await self.page.query_selector("xpath=//button[span[text()='닫기']]")
if global_close_button:
await global_close_button.click()
self.logger.log("페이지 전체에서 '닫기' 버튼을 클릭하였습니다.", level=logging.INFO)
else:
self.logger.log("페이지 전체에서 '닫기' 버튼을 찾지 못했습니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"글로벌 '닫기' 버튼 검색 및 클릭 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def close_ad_if_exists_ori(self):
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
try:
# 광고 다이얼로그가 나타날 때까지 기다림
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
self.logger.log("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.", level=logging.INFO)
# 닫기 버튼 클릭
close_button = await self.page.query_selector(self.close_ad_button_locator)
if close_button:
await close_button.click()
self.logger.log("다이얼로그를 성공적으로 닫았습니다.", level=logging.INFO)
else:
self.logger.log("닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
except TimeoutError:
# 다이얼로그가 없을 때: info 수준의 로그로 기록
self.logger.log("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.", level=logging.INFO)
except Exception as e:
# 다른 예외 상황 발생 시 error로 기록
self.logger.log(f"다이얼로그 닫기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def go_to_new_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
await self.page.click(new_product_page_locator)
self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def go_to_registered_product_page(self):
"""신규 상품 등록 페이지로 이동"""
try:
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
await self.page.click(registered_product_page_locator)
self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def is_button_disabled(self, button):
"""버튼이 disabled 상태인지 확인"""
try:
# 버튼의 disabled 속성 확인
is_disabled = await button.get_attribute('disabled')
return is_disabled is not None # disabled 속성이 있으면 True 반환
except Exception as e:
self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
async def select_group_index_ori(self, group_index: int):
"""그룹 드롭다운 열고 옵션 선택"""
try:
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
await self.page.evaluate("""
const targetElement = Array.from(document.querySelectorAll('span')).find(el => el.textContent.trim() === '수집 상품 목록');
if (targetElement) {
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
""")
self.logger.log(f"그룹박스 요소로 스크롤", level=logging.INFO)
await asyncio.sleep(1)
# await self.page.wait_for_load_state("networkidle")
group_option_locator = self.group_index_template.format(index=group_index)
# 드롭다운 열기
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
# 드롭다운 열림 상태 확인
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
# 옵션 선택
# await self.page.wait_for_selector(group_option_locator, timeout=3000)
await self.page.click(group_option_locator, timeout=3000)
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
# 선택된 그룹 이름을 시그널로 전달
self.selected_group_name_signal.emit(selected_group_name)
except TimeoutError:
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def select_group_index(self, group_index: int):
"""그룹 드롭다운 열고 옵션 선택"""
try:
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
await self.page.evaluate("""
const targetElement = Array.from(document.querySelectorAll('span'))
.find(el => el.textContent.trim() === '수집 상품 목록');
if (targetElement) {
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
""")
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
await asyncio.sleep(1)
group_option_locator = self.group_index_template.format(index=group_index)
# 드롭다운 열기
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
# 드롭다운 열림 상태 확인
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
# 옵션 선택
await self.page.click(group_option_locator, timeout=3000)
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
# 선택된 그룹 이름을 시그널로 전달
self.selected_group_name_signal.emit(selected_group_name)
except TimeoutError:
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
# 그룹 선택 실패 시 빈 문자열 시그널 전달
self.selected_group_name_signal.emit("")
except Exception as e:
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.selected_group_name_signal.emit("")
async def get_product_edit_buttons_by_template(self):
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
# 버튼 선택자 설정
# buttons = self.page.locator(self.product_edit_button_template)
# memos = self.page.locator(self.product_memo_button_template)
"""
현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환.
각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다.
"""
try:
# 수정 버튼 선택자 설정
# edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]')
# memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only')
edit_buttons = self.page.locator(self.product_edit_buttons)
memo_buttons = self.page.locator(self.product_memo_buttons)
# 수정 버튼 개수 확인
edit_button_count = await edit_buttons.count()
if edit_button_count == 0:
self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING)
return []
# 메모 버튼 개수 확인
memo_button_count = await memo_buttons.count()
if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외
self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING)
return []
# 첫 번째 버튼 제외하고 나머지 버튼을 처리
valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)]
# 상품별 해외배송비 수정 버튼과 메모 버튼 매칭
product_buttons = []
index = 0 # valid_memo_buttons에서의 현재 인덱스
for i in range(edit_button_count):
if index + 1 >= len(valid_memo_buttons):
self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING)
break
# 상품별 버튼 매칭
product_buttons.append({
"edit_button": edit_buttons.nth(i),
"shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼
"memo_button": valid_memo_buttons[index + 1] # 메모 버튼
})
index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모
# 최종 결과 로그
self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO)
return product_buttons
except Exception as e:
self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True)
return []
async def get_product_edit_buttons_by_template_new(self):
"""
템플릿 방식의 CSS 선택자를 이용해 현재 페이지의 수정 및 메모 버튼을 동적으로 찾습니다.
"""
product_buttons = []
# 예시로, 상품 항목들이 li.product-item 형태로 존재한다고 가정합니다.
product_items = self.page.locator("div#root li.product-item")
count = await product_items.count()
self.logger.log(f"현재 페이지에 {count}개의 상품 항목을 발견했습니다.", level=logging.INFO)
for i in range(1, count + 1):
edit_selector = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template').format(index=i)
memo_selector = self.locator_manager.get_locator('BrowserControl', 'product_memo_button_template').format(index=i)
edit_button = self.page.locator(edit_selector)
memo_button = self.page.locator(memo_selector)
if await edit_button.count() > 0:
product_buttons.append({
"edit_button": edit_button,
"memo_button": memo_button
})
else:
self.logger.log(f"상품 인덱스 {i}에 대해 수정 버튼을 찾지 못했습니다.", level=logging.WARNING)
return product_buttons
async def open_product_edit_dialog(self, button):
"""상품 수정 다이얼로그 열기"""
try:
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
await button.scroll_into_view_if_needed()
self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG)
await self.page.keyboard.press("Escape")
self.logger.log("이전 다이얼로그 닫기 완료.", level=logging.INFO)
await button.click()
self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO)
# await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림
except Exception as e:
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
timestamp = int(time.time())
screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
await self.page.screenshot(path=screenshot_path)
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류 발생으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
async def click_detail_tab(self):
"""상세페이지 탭 클릭"""
try:
await self.page.click(self.detail_tab_locator)
self.logger.log("상세페이지 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상세페이지 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_option_tab(self):
"""옵션 탭 클릭"""
try:
await self.page.click(self.option_tab_locator)
self.logger.log("옵션 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"옵션 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_price_tab(self):
"""가격 탭 클릭"""
try:
await self.page.click(self.price_tab_locator)
self.logger.log("가격 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"가격 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_thumb_tab(self):
"""썸네일 탭 클릭"""
try:
await self.page.click(self.thumb_tab_locator)
self.logger.log("썸네일 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"썸네일 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_tags_tab(self):
"""키워드 태그 탭 클릭"""
try:
await self.page.click(self.tag_tab_locator)
self.logger.log("키워드 태그 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"키워드 태그 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_title_tab(self):
"""상품명 탭 클릭"""
try:
await self.page.click(self.title_tab_locator)
self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
html_content = '<p>&nbsp;</p>'
for url in urls:
html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
return html_content
def generate_restored_html(self, urls):
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
html_content = '<p>&nbsp;</p>\n'
for url in urls:
width, height = self.get_image_size(url)
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
if width and height:
html_content += (
f'<figure class="image">'
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
f'</figure>\n'
)
else:
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
return html_content
def get_image_size(self, url):
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
try:
# URL에서 불필요한 따옴표 제거
cleaned_url = url.strip("'\"")
response = requests.get(cleaned_url, timeout=5)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
return image.width, image.height
except Exception as e:
self.logger.log(f"이미지 크기 확인 실패 - {cleaned_url}: {e}", level=logging.WARNING)
return None, None
# async def extract_image_urls(self, optionHandler, is_option_data=False):
# """상세페이지에서 이미지 URL 추출"""
# try:
# # 소스 편집 모드로 전환
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# # 소스 편집 모드로 전환
# await self.page.click(source_button_locator)
# self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# # 'data-value' 속성 값을 추출 (textarea 요소)
# textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
# data_value = await textarea.get_attribute("data-value")
# # HTML 소스에서 이미지 URL 추출
# image_urls = self.fetch_image_urls(data_value)
# self.logger.log(f'추출된 이미지 URL 수: {len(image_urls)}', level=logging.INFO)
# # HTML 소스에서 이미지 URL 삭제
# self.logger.log('img 태그를 삭제 중...', level=logging.DEBUG)
# data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
# new_value = ""
# if data_value_element:
# await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
# updated_value = await data_value_element.get_attribute('data-value')
# self.logger.log(f'Updated data-value: {updated_value}', level=logging.DEBUG)
# else:
# self.logger.log('Element with data-value not found.', level=logging.DEBUG)
# self.logger.log('img 태그 삭제 완료.', level=logging.DEBUG)
# # img 태그의 class 삭제 후 다시 소스 버튼 클릭
# await self.page.click(source_button_locator)
# self.logger.log('소스 버튼 재 클릭 완료.', level=logging.DEBUG)
# # 텍스트 입력 필드 선택
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# await input_field.press('Enter')
# # # 선두부 텍스트 입력
# # for key in sorted(self.text_templates.keys()):
# # leading_text = self.text_templates[key]
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# # await input_field.type(leading_text)
# # await input_field.press('Enter')
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
# # 선두부 텍스트 입력 (self.detail_text_widget의 get_lines 메서드 사용)
# for leading_text in self.detail_text_widget.get_lines():
# if leading_text: # 내용이 있는 경우에만
# await input_field.type(leading_text)
# await input_field.press('Enter')
# self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.INFO)
# option_data = optionHandler.get_selected_translated_options()
# self.logger.log(f'option_data : {option_data}', level=logging.DEBUG)
# # if is_option_data:
# if option_data or option_data != {}:
# self.logger.log('옵션 데이터 입력 시작', level=logging.DEBUG)
# # option_data = {} # option_data 초기화
# is_single = optionHandler.option_info['is_single_option']
# # option_data = optionHandler.get_selected_translated_options()
# # is_single = True # 옵션입력 일단 제외
# # self.logger.log('옵션입력 일단 제외', level=logging.DEBUG)
# self.logger.log('가져온 옵션 데이터', level=logging.DEBUG)
# self.logger.log(f'{option_data}', level=logging.DEBUG)
# # # 옵션 입력 필드 선택
# # input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
# # await input_field.press('Enter')
# # # 선두부 텍스트 입력
# # for key in sorted(self.text_templates.keys()):
# # leading_text = self.text_templates[key]
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
# # await input_field.type(leading_text)
# # await input_field.press('Enter')
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
# if not is_single:
# self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
# # 각 옵션을 한 줄씩 입력
# await input_field.type("# 옵션 목록")
# await input_field.press('Enter')
# # 첫 번째 옵션의 번역된 옵션명만 입력
# first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
# first_value = option_data[first_key] # key는 옵션이름 value는 가격
# await input_field.type(f"- 1. {first_key}")
# await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
# # 나머지 옵션도 번역된 옵션명만 입력
# for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
# await input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
# await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
# # 목록 끝을 알리기 위해 엔터 두 번 입력
# await input_field.press('Enter')
# await input_field.press('Enter')
# # 후두부 텍스트 입력
# await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
# await input_field.press('Enter')
# await input_field.type('---')
# await input_field.press('Enter')
# self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
# return image_urls
# except Exception as e:
# self.logger.log(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
# return image_urls if image_urls else []
# def paste_image_in_chrome(self, url, is_success_translated, toggle_states, is_watermark=False, watermark_text= ""):
# """크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
# self.logger.log("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력", level=logging.DEBUG)
# try:
# # self.switch_to_chrome() # 크롬으로 포커스 이동
# self.clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated, toggle_states=toggle_states) # 클립보드 내용을 처리
# # clipboard_content = pyperclip.paste()
# if self.clipboardImageManager.is_clipboard_image():
# pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
# self.logger.log("이미지 붙여넣기 완료.", level=logging.INFO)
# pyautogui.press('right') # 오른쪽 입력
# self.logger.log("이미지 붙여넣기 완료.", level=logging.DEBUG)
# self.clipboardImageManager.clear_clipboard()
# self.logger.log("이미지 붙여넣기 완료로 클립보드 비우기.", level=logging.INFO)
# return True
# else:
# self.logger.log("클립보드가 비어있습니다.", level=logging.WARNING)
# return False
# except Exception as e:
# self.logger.log(f"이미지 붙여넣기 중 오류: {e}", level=logging.ERROR, exc_info=True)
# return False
async def save_and_ecs_product_edit_ori(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
await self.page.keyboard.press("Escape")
self.logger.log("상품 수정 내용 저장 및 ECS 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
def reset_detail_trans_state(self):
"""detail_trans에 사용되는 상태를 초기화합니다."""
self.logger.log("BrowserController detail_trans 상태 초기화", level=logging.DEBUG)
# 임시 파일 처리
import os
import glob
try:
# 임시 디렉토리의 모든 파일 삭제
for file_path in glob.glob(os.path.join(self.temp_dir, "*.png")):
try:
os.remove(file_path)
self.logger.log(f"상세페이지 임시 파일 삭제: {file_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"상세페이지 임시 파일 삭제 실패: {file_path}, 오류: {e}", level=logging.WARNING)
except Exception as e:
self.logger.log(f"상세페이지 임시 파일 처리 중 오류 발생: {e}", level=logging.ERROR)
async def save_and_ecs_product_edit(self):
"""상품 수정 후 저장 버튼 클릭 및 ESC 전송, 그리고 다이얼로그 닫힘 버튼 클릭을 통한 다이얼로그 종료 처리"""
try:
# 저장 버튼 클릭
await self.page.click(self.save_button_locator)
self.logger.log("저장 버튼 클릭 완료.", level=logging.INFO)
# 저장 동작 완료를 위한 잠시 대기
await asyncio.sleep(0.5)
# ESC 키를 최대 3회 전송하여 다이얼로그가 닫혔는지 확인
max_attempts = 3
for attempt in range(max_attempts):
await self.page.keyboard.press("Escape")
self.logger.log(f"ESC 키 전송 시도 {attempt+1}/{max_attempts}", level=logging.DEBUG)
await asyncio.sleep(0.5)
if not await self.page.is_visible(self.save_button_locator):
self.logger.log("상품 수정 다이얼로그가 성공적으로 닫혔습니다.", level=logging.INFO)
break
else:
self.logger.log("상품 수정 다이얼로그가 여전히 열려있습니다.", level=logging.WARNING)
else:
# ESC 전송 후에도 다이얼로그가 닫히지 않은 경우, product_dialog_close_btn을 클릭
self.logger.log("ESC 시도 후에도 다이얼로그가 닫히지 않아, 닫힘 버튼 클릭 시도합니다.", level=logging.INFO)
if await self.page.is_visible(self.product_dialog_close_btn):
await self.page.click(self.product_dialog_close_btn)
self.logger.log("닫힘 버튼 클릭 완료.", level=logging.INFO)
else:
self.logger.log("닫힘 버튼을 찾지 못했습니다.", level=logging.ERROR)
except Exception as e:
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
timestamp = int(time.time())
screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
await self.page.screenshot(path=screenshot_path)
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
async def save_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def go_to_next_page_ori(self):
"""다음 페이지로 이동"""
try:
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
current_page = await self.page.query_selector(self.current_page_locator)
if not current_page:
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
return False
# 현재 활성화된 페이지 번호를 가져옴
current_page_number = int(await current_page.get_attribute("title"))
self.logger.log(f"현재페이지 : [{current_page_number}]", level=logging.INFO)
next_page_number = current_page_number + 1
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
next_page_button = await self.page.query_selector(next_page_button_locator)
if next_page_button:
await next_page_button.click() # 페이지 버튼 클릭
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
time.sleep(3)
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
return True
else:
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def go_to_next_page(self):
"""
수정된 CSS 선택자를 사용하여 다음 페이지로 이동합니다.
"""
try:
current_page_element = await self.page.query_selector(self.current_page_locator)
if not current_page_element:
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
return False
current_page_number = int(await current_page_element.get_attribute("title"))
self.logger.log(f"현재 페이지 번호: {current_page_number}", level=logging.INFO)
next_page_number = current_page_number + 1
next_page_button_locator = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template').format(page_number=next_page_number)
self.logger.log(f"다음 페이지 선택자: {next_page_button_locator}", level=logging.DEBUG)
next_page_button = await self.page.query_selector(next_page_button_locator)
if next_page_button:
await next_page_button.click()
await self.page.wait_for_load_state('domcontentloaded', timeout=5000)
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
return True
else:
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
def switch_to_chrome(self):
"""크롬으로 포커스 전환"""
try:
if not self.chrome_hwnd:
self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name)
if self.chrome_hwnd:
win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(self.chrome_hwnd)
self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
else:
self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
except Exception as e:
self.logger.log(f"크롬 포커스 전환 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
self.logger.log(f"스크롤 시작", level=logging.DEBUG)
# 현재 페이지 높이 가져오기
last_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG)
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, 1000);")
elif direction == "up":
# 위로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, -1000);")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG)
await asyncio.sleep(pause_time)
# 새로운 페이지 높이 가져오기
new_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG)
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
if direction == "down" and new_height == last_height:
self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
elif direction == "up" and new_height == 0:
self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG)
last_height = new_height
scroll_count += 1
self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG)
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤 (Page Down 키 사용)
await self.page.keyboard.press("PageDown")
elif direction == "up":
# 위로 스크롤 (Page Up 키 사용)
await self.page.keyboard.press("PageUp")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
await asyncio.sleep(pause_time)
scroll_count += 1
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def collect_product_info(self, items_per_page, ed_mode):
"""
상품 정보를 수집하는 메서드
"""
try:
product_infos = []
product_name_elements = [] # product_name_element를 저장할 리스트
# ed_mode에 따라 product_elements 설정
if ed_mode:
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
product_elements = [
{
"name": self.product_name_for_ed_template.format(index=i),
"price": self.product_price_for_ed_template.format(index=i),
"image": self.product_image_for_ed_template.format(index=i)
}
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
]
else:
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
product_elements = await self.page.query_selector_all(self.product_parent_locator)
for i, element in enumerate(product_elements[:items_per_page], start=1):
try:
if ed_mode:
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
else:
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
# 요소가 존재하면 정보 추출
self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG)
self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG)
self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG)
if product_name_element and product_price_element and product_image_element:
# await의 결과를 각 변수에 저장
product_name_text = (await product_name_element.inner_text()).strip()
product_price_text = (await product_price_element.inner_text()).strip()
product_image_url = await product_image_element.get_attribute('src')
# product_info 딕셔너리에 결과 저장
product_info = {
"name": product_name_text,
"price": product_price_text,
"image_url": product_image_url
}
self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG)
product_infos.append(product_info)
product_name_elements.append(product_name_element) # 각 product_name_element 추가
except Exception as e:
self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
continue
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
except Exception as e:
self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return []
async def scroll_page_to_bottom(self, pause_time=0.2):
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
self.logger.log('페이지 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
while True:
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => document.body.scrollHeight")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 스크롤 완료.', level=logging.INFO)
async def scroll_page_to_top(self, pause_time=0.2):
"""페이지의 맨 위까지 스크롤"""
self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => window.pageYOffset")
while previous_height > 0:
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => window.pageYOffset")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO)
async def start_Percenty_task(self):
self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG)
self.running = True # 번역 작업이 시작됨
self.translation_started.emit()
try:
# 금지어 목록 업데이트
self.forbidden_word_manager.refresh_forbidden_words()
self.whale_translator.start_trans_browser()
# 1. 총 상품 수 수집
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = await self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
self.logger.log(f"총 상품 수: {total_products}, 페이지당 상품 수: {items_per_page}", level=logging.DEBUG)
self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG)
if total_products == 0:
self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
while self.running and completed_count < total_products:
self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG)
# if not page_number == 1:
# await self.scroll_page_to_top()
# self.logger.log(f'1페이지가 아니므로 동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
await self.scroll_page_to_top()
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
if not self.toggle_states['ed_mode']:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
product_buttons = await self.get_product_edit_buttons_by_template()
else:
self.logger.log('상품정보 수집', level=logging.DEBUG)
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
if not product_buttons:
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
break
# 5. 각 상품에 대해 상품수정작업 수행
for index, button_set in enumerate(product_buttons, start=1):
edit_button = button_set.get("edit_button")
memo_button = button_set.get("memo_button")
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
# 상태 초기화 코드 추가
self.optionHandler.reset_state()
self.thumbnailHandler.reset_state()
self.titleGenerator.reset_state()
self.clipboardImageManager.reset_state() # 클립보드 초기화
self.reset_detail_trans_state() # detail_trans 관련 상태 초기화
# 그 외 임시 변수 초기화
self.current_options_info = None
# 상품명 수집 오류 처리
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
is_disabled = await self.is_button_disabled(edit_button)
if is_disabled:
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
completed_count += 1
self.total_progressbar_signal.emit(completed_count, total_products)
continue
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
# 상품 수정 다이얼로그 열기
await self.open_product_edit_dialog(edit_button)
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag, use_lens=self.toggle_states['use_lens'], use_api=self.toggle_states['use_API'])
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
if title_infos.get("is_banned_category", False):
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
is_set = await self.titleGenerator.set_product_name(banned_title)
if is_set:
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
else:
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
await self.save_and_ecs_product_edit()
completed_count += 1
self.total_progressbar_signal.emit(completed_count, total_products)
continue # 다음 상품으로 넘어감
# 정상상품이면 상품명 수정과 카테고리 수집
is_title = self.toggle_states['title']
if is_title:
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
self.start_stage_signal.emit(0)
title_infos["generated_name"] = await self.titleGenerator.process_title()
self.complete_stage_signal.emit(0)
# 옵션 수정
self.start_stage_signal.emit(1)
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
await self.edit_option(title_infos.get("original_name", None))
self.complete_stage_signal.emit(1)
# 가격 수정
self.start_stage_signal.emit(2)
is_price = self.toggle_states.get('price')
if is_price:
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
await self.edit_price(title_infos)
self.complete_stage_signal.emit(2)
# 썸네일 수정
self.start_stage_signal.emit(3)
thumb = self.toggle_states.get('thumb')
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
if thumb:
await self.edit_thumb()
self.complete_stage_signal.emit(3)
# 태그 수정
tag = self.toggle_states.get('tag')
if tag:
self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG)
self.start_stage_signal.emit(4)
await self.edit_tags(title_infos)
self.complete_stage_signal.emit(4)
# 상세페이지 수정 수정
detail_Option = self.toggle_states.get('detail_Option')
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
if detail_Option or detail_IMGTrans:
self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG)
# 상세페이지 수정
self.start_stage_signal.emit(5)
await self.detail_trans(detail_Option, detail_IMGTrans)
self.complete_stage_signal.emit(5)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
await self.save_and_ecs_product_edit()
# 메모 입력
if memo_button:
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
await self.insert_product_infos_to_memo(memo_button, title_infos)
completed_count += 1
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
self.total_progressbar_signal.emit(completed_count, total_products)
self.log_memory_usage(f"{completed_count} 개 상품 수정 후 ")
if (completed_count % 10) == 0:
gc.collect()
mem = psutil.virtual_memory()
self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
if completed_count >= total_products:
self.logger.log(f'[{total_products}]개 상품 수정이 완료되었습니다.', level=logging.INFO)
self.translation_completed.emit(total_products)
return
# 6. 다음 페이지로 이동 (있으면)
if not await self.go_to_next_page():
self.logger.log('더 이상 페이지가 없습니다. 작업을 종료합니다.', level=logging.INFO)
break
page_number += 1
if self.running:
self.logger.log('모든 상품 번역 및 저장 완료.', level=logging.INFO)
self.whale_translator.close_trans_browser()
self.logger.log('웨일 종료', level=logging.INFO)
except Exception as e:
self.logger.log(f"번역 작업 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.translation_error.emit(str(e))
async def insert_product_infos_to_memo(self, memo_button, title_infos):
try:
# 1. 메모 버튼 유효성 확인
is_disabled = await self.is_button_disabled(memo_button)
if is_disabled:
self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 2. title_infos 유효성 확인
top_5_titles = title_infos.get("top_5_titles", [])
top_5_prices = title_infos.get("top_5_prices", [])
if not top_5_titles or not top_5_prices:
self.logger.log("title_infos의 top_5_titles 또는 top_5_prices가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 3. 텍스트 생성
text = self.generate_titles_with_prices(title_infos)
if not text:
self.logger.log("생성된 텍스트가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
return
# 4. 메모 버튼 클릭
await memo_button.click()
self.logger.log("메모 버튼을 클릭했습니다.", level=logging.DEBUG)
# 5. 텍스트 입력란 대기
# memo_input = await self.page.query_selector(self.memo_input_locator)
memo_input = self.page.locator(self.memo_input_locator)
await memo_input.wait_for(state="visible")
self.logger.log("메모 입력란이 나타났습니다.", level=logging.DEBUG)
# 6. 텍스트 입력.
await memo_input.click()
await asyncio.sleep(0.5)
await memo_input.fill(text)
self.logger.log(f"메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
# 7. 저장 버튼 대기 및 클릭
# save_button = self.page.locator('div.ant-modal-footer > button[type="button"].ant-btn.css-1li46mu.ant-btn-primary')
# save_button = await self.page.query_selector(self.memo_save_btn_locator)
save_button = self.page.locator(self.memo_save_btn_locator)
await save_button.wait_for(state="visible")
self.logger.log("메모 입력란의 저장 버튼이 나타났습니다.", level=logging.DEBUG)
await save_button.click()
self.logger.log("메모 입력란의 저장 버튼을 클릭했습니다.", level=logging.DEBUG)
await asyncio.sleep(0.5)
except Exception as e:
self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
timestamp = int(time.time())
screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
await self.page.screenshot(path=screenshot_path)
self.logger.log(f"메모 입력 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
# ESC 키 2번 전송하여 팝업 제거 시도
self.logger.log("ESC 키를 2번 전송하여 팝업 제거 후 메모 입력 재시도합니다.", level=logging.WARNING)
await self.page.keyboard.press("Escape")
await self.page.keyboard.press("Escape")
await asyncio.sleep(0.5) # 잠시 대기
# 메모 입력 재시도
try:
await memo_button.click()
self.logger.log("재시도: 메모 버튼을 클릭했습니다.", level=logging.DEBUG)
memo_input = self.page.locator(self.memo_input_locator)
await memo_input.wait_for(state="visible")
await memo_input.fill(text)
self.logger.log(f"재시도: 메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
save_button = await self.page.query_selector(self.memo_save_btn_locator)
save_button = self.page.locator(self.memo_save_btn_locator)
await save_button.wait_for(state="visible")
await save_button.click()
self.logger.log("재시도: 저장 버튼을 클릭했습니다.", level=logging.DEBUG)
await asyncio.sleep(0.5)
except Exception as e2:
self.logger.log(f"재시도 후에도 메모 입력 중 오류 발생: {e2}", level=logging.ERROR, exc_info=True)
# ESC 키 2번 전송하여 팝업 제거 후 다음 상품으로 넘어감
self.logger.log("재시도 실패: ESC 키를 2번 전송하여 창을 닫고 다음 상품으로 넘어갑니다.", level=logging.WARNING)
await self.page.keyboard.press("Escape")
await self.page.keyboard.press("Escape")
# 다음 상품으로 넘어가기 위해 예외를 재발생하지 않고 그냥 리턴합니다.
return
# async def insert_product_infos_to_memo_ori(self, memo_button, title_infos):
# """메모 버튼을 클릭하고 텍스트를 입력한 뒤 저장"""
# try:
# # 1. 메모 버튼 유효성 확인
# is_disabled = await self.is_button_disabled(memo_button)
# if is_disabled:
# self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 2. title_infos 유효성 확인
# top_5_titles = title_infos.get("top_5_titles", [])
# top_5_prices = title_infos.get("top_5_prices", [])
# if not top_5_titles or not top_5_prices:
# self.logger.log("title_infos의 top_5_titles 또는 top_5_prices가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 3. 텍스트 생성
# text = self.generate_titles_with_prices(title_infos)
# if not text:
# self.logger.log("생성된 텍스트가 비어 있어 작업을 건너뜁니다.", level=logging.WARNING)
# return
# # 4. 메모 버튼 클릭
# await memo_button.click()
# self.logger.log("메모 버튼을 클릭했습니다.", level=logging.DEBUG)
# # 5. 텍스트 입력란 대기
# memo_input = self.page.locator('[placeholder="상품에 대한 메모를 작성해주세요"]')
# await memo_input.wait_for(state="visible")
# self.logger.log("메모 입력란이 나타났습니다.", level=logging.DEBUG)
# # 6. 텍스트 입력
# await memo_input.fill(text)
# self.logger.log(f"메모 입력란에 텍스트를 입력했습니다: {text}", level=logging.DEBUG)
# # # 7. 체크박스 활성화 확인 및 체크 상태 설정
# # # # role="dialog" 내의 체크박스 입력 요소 찾기
# # dialog_locator = self.page.locator('[role="dialog"]')
# # checkbox_input = dialog_locator.locator('input.ant-checkbox-input')
# # checkbox_label = dialog_locator.locator('label.ant-checkbox-wrapper')
# # # 체크박스가 비활성화된 경우, 활성화 후 클릭
# # is_disabled = await checkbox_input.is_disabled()
# # if is_disabled:
# # self.logger.log("체크박스가 비활성화 상태입니다. 활성화를 시도합니다.", level=logging.DEBUG)
# # await self.page.evaluate('(checkbox) => checkbox.removeAttribute("disabled")', checkbox_input)
# # # # 체크 상태 확인
# # is_checked = await checkbox_label.get_attribute("class")
# # if "ant-checkbox-wrapper-checked" not in is_checked:
# # self.logger.log("체크박스가 체크되어 있지 않습니다. 체크를 수행합니다.", level=logging.DEBUG)
# # await checkbox_input.click()
# # else:
# # self.logger.log("체크박스가 이미 체크 상태입니다.", level=logging.DEBUG)
# # await self.page.evaluate('(checkbox) => checkbox.checked = true', checkbox_input)
# # 8. 저장 버튼 대기
# save_button = self.page.locator('div.ant-modal-footer > button[type="button"].ant-btn.css-1li46mu.ant-btn-primary')
# await save_button.wait_for(state="visible")
# self.logger.log("저장 버튼이 나타났습니다.", level=logging.DEBUG)
# # 9. 저장 버튼 클릭
# await save_button.click()
# self.logger.log("저장 버튼을 클릭했습니다.", level=logging.DEBUG)
# await asyncio.sleep(0.5)
# except Exception as e:
# self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# # 에러 발생 시 스크린샷 저장
# timestamp = int(time.time())
# screenshot_path = os.path.join(self.temp_dir, f"screenshot_error_{timestamp}.png")
# await self.page.screenshot(path=screenshot_path)
# self.logger.log(f"에러 발생 시 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
# # 동영상 기록이 활성화되어 있다면, 페이지에 연결된 동영상 경로 확인 (페이지가 닫히면 video 파일이 최종 저장됩니다)
# if self.page.video:
# video_path = await self.page.video.path()
# self.logger.log(f"동영상 파일 경로: {video_path}", level=logging.INFO)
# # 필요하다면, 여기서 ffmpeg를 호출해 에러 전후 30초를 추출하는 추가 작업을 할 수 있습니다.
# # 예) os.system(f"ffmpeg -i {video_path} -ss {start_time} -t 60 {clipped_video_path}")
# raise e # 예외 재발생
def generate_titles_with_prices(self, title_infos):
"""top_5_titles와 top_5_prices를 매칭하여 텍스트 생성"""
try:
titles = title_infos.get("top_5_titles", [])
prices = title_infos.get("top_5_prices", [])
if not titles or not prices:
self.logger.log("top_5_titles 또는 top_5_prices가 비어 있습니다.", level=logging.WARNING)
return ""
# 매칭된 텍스트 생성
formatted_lines = []
for title, price in zip(titles, prices):
formatted_price = f"{int(price):,}" # 3자리수마다 콤마, "원" 추가
formatted_lines.append(f"{title} - {formatted_price}")
# 최종 텍스트 생성
result_text = "\n".join(formatted_lines)
self.logger.log(f"Generated titles with prices:\n{result_text}", level=logging.DEBUG)
return result_text
except Exception as e:
self.logger.log(f"텍스트 생성 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return ""
def is_valid_html(self, html_data: str) -> bool:
"""
HTML 데이터가 정상적인지 확인하는 메서드
조건: <img> 태그가 포함되어 있고 다수의 이미지 URL이 존재해야 함
"""
try:
# <img> 태그 내의 src 속성 값을 추출
img_urls = re.findall(r'<img[^>]+src=["\'](.*?)["\']', html_data)
# 최소 이미지 URL 개수를 기준으로 판단 (예: 5개 이상)
if len(img_urls) >= 5:
self.logger.log(f"HTML에 포함된 이미지 URL: {img_urls}", level=logging.DEBUG)
return True
else:
self.logger.log(f"HTML에 포함된 이미지 URL이 부족합니다. ({len(img_urls)}개)", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"HTML 유효성 검사 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def detail_trans(self, detail_Option, detail_IMGTrans):
try:
# 상세페이지 탭 클릭
await self.click_detail_tab()
await asyncio.sleep(0.5)
html_data = await self.get_ckeditor_data()
if not html_data or len(html_data) < 10:
self.logger.log(f"HTML 데이터가 없거나 너무 짧습니다: {html_data}", level=logging.INFO)
return
# 이미지 URL을 추출
image_urls = self.fetch_image_urls(html_data)
original_image_count = len(image_urls)
self.logger.log(f"추출된 이미지 URL 수: {original_image_count}", level=logging.INFO)
if not image_urls:
self.logger.log("이미지가 없습니다.", level=logging.INFO)
return
# OCR 결과를 저장할 딕셔너리
ocr_results = {}
# OCR 처리가 활성화된 경우
if self.toggle_states.get('ocr', False):
self.logger.log("OCR 기능을 사용하여 이미지 필터링을 시작합니다.", level=logging.INFO)
# OCR 프로세서 초기화
ocr_processor = ChineseImageOCRProcessor(logger=self.logger, use_gpu=False, det_enabled=True, horizontal_only=False)
# unwanted_words 가져오기 (TEXT 필드에서 JSON 형식으로 저장됨)
unwanted_words_json = self.toggle_states.get('unwanted_words', '{}')
unwanted_words = {}
# JSON 문자열을 파이썬 딕셔너리로 변환
try:
if isinstance(unwanted_words_json, str) and unwanted_words_json.startswith('{'):
unwanted_words = json.loads(unwanted_words_json)
else:
# 이미 딕셔너리인 경우
unwanted_words = unwanted_words_json if isinstance(unwanted_words_json, dict) else {}
self.logger.log(f"unwanted_words 로드 완료: {unwanted_words}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"unwanted_words 처리 중 오류 발생: {str(e)}", level=logging.ERROR)
unwanted_words = {}
if not unwanted_words:
self.logger.log("unwanted_words가 설정되지 않았습니다. 기본값을 사용합니다.", level=logging.WARNING)
# OCR 처리
try:
# OCR 결과 받기 - True: 번역 대상, False: 원본 유지
ocr_results = ocr_processor.check_image_in_zhcn(image_urls, unwanted_words)
self.logger.log(f"OCR 결과: {ocr_results}", level=logging.INFO)
translate_count = sum(1 for result in ocr_results.values() if result)
original_count = sum(1 for result in ocr_results.values() if not result)
self.logger.log(f"필터링 결과: 원본 {original_image_count}개, 번역 대상 {translate_count}개, 원본 유지 {original_count}", level=logging.INFO)
except Exception as e:
self.logger.log(f"OCR 처리 중 오류 발생: {str(e)}", level=logging.ERROR)
# OCR 처리 실패 시 기본값으로 모든 이미지 번역 대상 설정
ocr_results = {url: True for url in image_urls}
else:
# OCR 비활성화 시 모든 이미지 번역 대상으로 설정
ocr_results = {url: True for url in image_urls}
# 상세페이지 옵션에 따라 처리
if detail_Option or detail_IMGTrans:
if detail_IMGTrans:
self.logger.log("이미지 번역 시작...", level=logging.INFO)
self.set_progress_visible_signal.emit(True)
# CKEditor 내용 지우기
await self.clear_ckeditor_data()
# 텍스트 입력 필드 선택
input_field = self.page.locator(self.cke_text_editing_area_locator)
await input_field.click()
self.logger.log("입력 필드 선택", level=logging.DEBUG)
# 소개글과 옵션데이터 입력
if detail_Option:
await self.input_detail_text(self.optionHandler)
# 이미지 번역 처리
success_count = 0
translate_count = 0
original_count = 0
for idx, (img_url, should_translate) in enumerate(ocr_results.items()):
try:
# 진행률 업데이트
self.update_detail_progress_signal.emit(idx + 1, len(ocr_results))
# 이미지 번역을 위한 임시 파일 경로 생성
img_path = os.path.join(self.temp_dir, f"translated_detail_image_{idx+1}.png")
self.logger.log(f"[{idx+1}]번째 이미지 처리 시작: {img_url}", level=logging.DEBUG)
# 디렉토리 존재 확인 및 생성
os.makedirs(os.path.dirname(img_path), exist_ok=True)
if should_translate:
# 번역 대상인 경우 (OCR 결과가 True)
self.logger.log(f"웨일 번역기로 이미지 번역 시작: {img_url}", level=logging.INFO)
# 웨일 브라우저로 이미지 번역
is_translated = self.whale_translator.translate_image(img_url)
translate_count += 1
# 번역된 이미지를 임시 파일로 저장
translated_img_path = self.clipboardImageManager.process_clipboard_to_save_path(
original_url=img_url,
is_success_translated=is_translated,
toggle_states=self.toggle_states,
path=img_path
)
if translated_img_path:
# 번역된 이미지 업로드
await self.upload_image(translated_img_path)
self.clipboardImageManager.clear_clipboard()
success_count += 1
self.logger.log(f"{idx+1}번째 이미지 번역 성공", level=logging.INFO)
else:
self.logger.log(f"{idx+1}번째 이미지 번역 실패", level=logging.WARNING)
# 번역 실패 시 원본 이미지 업로드
downloaded_img_path = await self.download_image_direct(img_url, img_path)
if downloaded_img_path:
await self.upload_image(downloaded_img_path)
self.logger.log(f"{idx+1}번째 이미지 원본으로 대체 업로드 성공", level=logging.INFO)
else:
# 번역하지 않는 경우 (OCR 결과가 False)
self.logger.log(f"원본 이미지 유지 (직접 다운로드): {img_url}", level=logging.DEBUG)
original_count += 1
# 원본 이미지 직접 다운로드 후 업로드
downloaded_img_path = await self.download_image_direct(img_url, img_path)
if downloaded_img_path:
# 다운로드한 이미지 업로드
await self.upload_image(downloaded_img_path)
self.logger.log(f"{idx+1}번째 이미지 원본 업로드 성공", level=logging.INFO)
else:
self.logger.log(f"{idx+1}번째 이미지 다운로드 실패", level=logging.WARNING)
await asyncio.sleep(0.2) # 과부하 방지
except Exception as img_e:
self.logger.log(f"이미지 처리 중 오류 발생: {str(img_e)}", level=logging.ERROR)
self.logger.log(f"이미지 처리 완료: 번역 {translate_count}개 (성공 {success_count}개), 원본 유지 {original_count}", level=logging.INFO)
# 번역 완료 후 프로그레스바 숨김
self.set_progress_visible_signal.emit(False)
# 추가 줄바꿈 입력
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
# 작업 완료
return True
else:
# 상세페이지 옵션이 비활성화된 경우
return False
except Exception as e:
self.logger.log(f"detail_trans 중 오류 발생: {str(e)}", level=logging.ERROR)
self.set_progress_visible_signal.emit(False)
return False
async def download_image_direct(self, url, save_path):
"""
이미지 URL에서 직접 이미지를 다운로드하는 메서드
Args:
url (str): 다운로드할 이미지 URL
save_path (str): 저장할 파일 경로
Returns:
str: 저장된 이미지 경로 또는 실패 시 None
"""
try:
# HTTP 요청 헤더 설정
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "image/webp,image/apng,image/*,*/*;q=0.8",
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
"Referer": "https://www.example.com/", # 필요에 따라 변경
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
# 이미지 다운로드
response = requests.get(url, headers=headers, stream=True, timeout=10)
response.raise_for_status()
# 이미지 저장
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
self.logger.log(f"이미지 다운로드 성공: {url} -> {save_path}", level=logging.DEBUG)
return save_path
except Exception as e:
self.logger.log(f"이미지 다운로드 실패: {url}, 오류: {e}", level=logging.ERROR)
return None
async def upload_image(self, img_path):
"""
이미지를 CKEditor에 업로드하고 가장 첫 번째 이미지 URL을 추출 후 삭제합니다.
:param img_path: 업로드할 이미지 파일 경로
:return: 업로드된 이미지의 URL
"""
try:
# 업로드 이전 이미지 리스트 저장
existing_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
# 1. 파일 업로드
file_input = self.page.locator(self.cke_img_file_input_locator)
await file_input.set_input_files(img_path)
self.logger.log(f"이미지가 성공적으로 업로드되었습니다: {img_path}", level=logging.INFO)
# # 2. 옆으로
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("ArrowRight")
# 업로드된 이미지 URL 확인
current_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
new_images = list(set(current_images) - set(existing_images))
if new_images:
self.logger.log(f"업로드된 이미지 확인", level=logging.INFO)
return new_images[0]
self.logger.log("업로드된 이미지를 찾을 수 없습니다.", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"이미지 업로드 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def input_detail_text(self, optionHandler):
"""
상세페이지에 소개글과 옵션 데이터를 입력하는 메서드
Args:
optionHandler (OptionHandler): 옵션 데이터 처리 객체
Returns:
None
"""
try:
# 텍스트 입력 필드 선택
input_field = await self.page.wait_for_selector(self.cke_text_editing_area_locator, timeout=5000)
if not input_field:
self.logger.log("텍스트 입력 필드를 찾을 수 없습니다.", level=logging.ERROR)
return
# 선두부 텍스트 입력 (detail_text_widget의 get_lines 메서드 사용)
for leading_text in self.detail_text_widget.get_lines():
if leading_text: # 내용이 있는 경우에만
await input_field.type(leading_text)
await input_field.press('Enter')
self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.DEBUG)
# 옵션 데이터 입력 (단일 옵션이 아닌 경우에만)
option_data = optionHandler.get_selected_translated_options()
if option_data and len(option_data) > 0:
is_single = optionHandler.option_info.get('is_single_option', True)
if not is_single:
self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
# 옵션 목록 헤더 입력
await input_field.type("# 옵션 목록")
await input_field.press('Enter')
# 첫 번째 옵션 입력
first_key = list(option_data.keys())[0]
await input_field.type(f"- 1. {first_key}")
await input_field.press('Enter')
# 나머지 옵션 입력
for i, key in enumerate(list(option_data.keys())[1:], start=2):
await input_field.type(f"- {i}. {key}")
await input_field.press('Enter')
# 목록 종료 및 후두부 텍스트 입력
await input_field.press('Enter')
await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
await input_field.press('Enter')
await input_field.type('---')
await input_field.press('Enter')
self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
except Exception as e:
self.logger.log(f"상세페이지 텍스트 입력 중 오류 발생: {e}", level=logging.ERROR)
async def edit_option(self, product_name):
# 상세페이지 탭 클릭
await self.click_option_tab()
# 옵션 최대선택갯수
max_option_count = self.toggle_states.get('max_option_count', 0)
self.current_options_info = await self.optionHandler.process_options(product_name, self.forbidden_word_manager, max_option_count, self.toggle_states)
# 수정 후 저장
await self.save_product_edit()
async def edit_price(self, title_infos):
# 상세페이지 탭 클릭
await self.click_price_tab()
# 가격 수정 프로세스
await self.priceHandler.process_price(title_infos=title_infos)
# 수정 후 저장
await self.save_product_edit()
async def edit_thumb(self):
# 상세페이지 탭 클릭
await self.click_thumb_tab()
# 가격 수정 프로세스
await self.thumbnailHandler.process_thumbnails(self.toggle_states)
# 수정 후 저장
await self.save_product_edit()
async def edit_tags(self, title_infos):
# 상세페이지 탭 클릭
await self.click_tags_tab()
# 가격 수정 프로세스
await self.tagsHandler.process_tags(self.forbidden_word_manager, title_infos=title_infos)
# 수정 후 저장
await self.save_product_edit()
# def start_browser_task(self):
# """브라우저 시작을 위한 스레드 시작"""
# self.start() # QThread의 start() 호출로 run()을 실행
# def run(self):
# """QThread의 run 메서드에서 이벤트 루프를 생성하고 비동기 메서드 실행"""
# self.loop = asyncio.new_event_loop() # 새로운 이벤트 루프 생성
# asyncio.set_event_loop(self.loop) # 이 스레드에서 사용할 이벤트 루프로 설정
# self.loop.run_forever() # 스레드 종료 시까지 이벤트 루프 실행 유지
def run(self):
"""QThread의 run 메서드에서 이벤트 루프 생성 및 실행"""
self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG)
# 이벤트 루프가 없거나 종료된 경우에만 초기화
if not self.loop or self.loop.is_closed():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
# 이벤트 루프를 유지하여 외부에서 작업 추가 가능
self.loop.run_forever()
def start_browser_task(self):
"""이벤트 루프에서 브라우저 초기화 작업 추가"""
self.logger.log("start_browser_task - 브라우저 초기화 작업 시작", level=logging.DEBUG)
# 실행 중인 이벤트 루프에 비동기 작업 추가
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop)
self.browser_task_started = True # 작업 실행 플래그 설정
self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
else:
self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
def start_PercentyJob_task(self):
"""번역 작업을 이벤트 루프에서 실행"""
# 이벤트 루프가 없거나 닫혀 있으면 새로 생성
# self.initialize_event_loop()
if self.loop and not self.loop.is_closed():
# 이미 실행 중인 이벤트 루프에 번역 작업 추가
asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop)
else:
self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR)
def initialize_event_loop(self):
"""이벤트 루프가 없거나 닫힌 경우 새로 생성"""
if not self.loop or self.loop.is_closed():
self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
else:
self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG)
def stop(self):
"""Playwright를 안전하게 종료"""
try:
# 비동기 함수 호출을 동기식으로 처리
asyncio.run(self._stop_playwright())
except RuntimeError as e:
self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR)
async def _stop_playwright(self):
"""스레드 종료 시 Playwright 리소스 정리 및 이벤트 루프 종료"""
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
if self.loop and not self.loop.is_closed():
self.loop.call_soon_threadsafe(self.loop.stop) # 이벤트 루프 종료
if self.whale_translator:
self.logger.log("Whale 브라우저 창 닫기 시도 중...", level=logging.INFO)
self.whale_translator.close_trans_browser()
def force_terminate_browser(self):
"""Playwright 브라우저 프로세스를 강제로 종료"""
try:
if self.browser:
browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid
browser_process = psutil.Process(browser_pid)
for child in browser_process.children(recursive=True):
child.kill()
browser_process.kill()
self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING)
if self.whale_translator:
self.whale_translator.close_trans_browser()
except Exception as e:
self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR)
def terminate(self):
self.logger.log("크롬 스레드 종료", level=logging.INFO)
self.cleanup() # 종료 시 추가 정리 작업 호출
if self.whale_translator:
self.whale_translator.close_trans_browser()
super().terminate()
def cleanup(self):
pass
def check_pause(self):
"""일시 정지 상태라면 재개될 때까지 대기"""
self.pause_mutex.lock()
if self.is_paused:
self.pause_condition.wait(self.pause_mutex)
self.pause_mutex.unlock()
def pause(self):
"""스레드 일시 정지"""
self.pause_mutex.lock()
self.is_paused = True
self.pause_mutex.unlock()
self.logger.log("스레드가 일시 정지되었습니다.", level=logging.DEBUG)
def resume(self):
"""스레드 재개"""
self.pause_mutex.lock()
self.is_paused = False
self.pause_condition.wakeAll()
self.pause_mutex.unlock()
self.logger.log("스레드가 재개되었습니다.", level=logging.DEBUG)
def log_memory_usage(self, message=""):
mem = psutil.virtual_memory()
self.logger.log(f"{message} 메모리 사용: 총 {mem.total/1024/1024:.1f}MB, 사용 {mem.used/1024/1024:.1f}MB, 사용률 {mem.percent}%", level=logging.DEBUG)
async def get_ckeditor_data(self):
"""CKEditor에서 HTML 데이터를 가져옵니다."""
try:
# 소스 버튼 locator
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# 소스 편집 영역 locator
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# 데이터 가져오기
textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
html_data = await textarea.get_attribute("data-value")
# 다시 에디터 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 재클릭 완료.", level=logging.DEBUG)
return html_data
except Exception as e:
self.logger.log(f"CKEditor 데이터 가져오기 실패: {e}", level=logging.ERROR)
return ""
async def clear_ckeditor_data(self):
"""CKEditor의 내용을 지웁니다."""
try:
# 소스 버튼 locator
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# 소스 편집 영역 locator
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# 데이터 지우기
await self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "")')
# 다시 에디터 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 재클릭 완료.", level=logging.DEBUG)
return True
except Exception as e:
self.logger.log(f"CKEditor 데이터 지우기 실패: {e}", level=logging.ERROR)
return False
async def set_ckeditor_data(self, html_data):
"""CKEditor에 HTML 데이터를 설정합니다."""
try:
# 소스 버튼 locator
source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# 소스 편집 영역 locator
ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# 데이터 넣기 - JS 인젝션으로 특수 문자 처리 (JSON.stringify로 escape)
escaped_html = html_data.replace('\\', '\\\\').replace('`', '\\`').replace('$', '\\$')
await self.page.evaluate(f'document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", `{escaped_html}`)')
# 다시 에디터 모드로 전환
await self.page.click(source_button_locator)
self.logger.log("소스 버튼 재클릭 완료.", level=logging.DEBUG)
return True
except Exception as e:
self.logger.log(f"CKEditor 데이터 설정 실패: {e}", level=logging.ERROR)
return False
async def remove_original_images(self, html_data):
"""HTML에서 이미지 태그를 찾아 웨일 번역 URL로 변경합니다.
Args:
html_data (str): 원본 HTML 데이터
Returns:
str: 수정된 HTML 데이터
"""
try:
# HTML 파싱
soup = BeautifulSoup(html_data, 'html.parser')
# 이미지 태그들 찾기
img_tags = soup.find_all('img')
# 각 이미지에 대해 처리
for img in img_tags:
if 'src' in img.attrs:
original_url = img['src']
# 웨일 번역 URL로 변경
translated_url = original_url.replace("http://", "https://translate.whale.naver.net/image-translate/v1/apis/lt/url?target=ko&url=http://")
translated_url = translated_url.replace("https://", "https://translate.whale.naver.net/image-translate/v1/apis/lt/url?target=ko&url=https://")
if translated_url == original_url:
self.logger.log(f"URL 변환 실패: {original_url}", level=logging.WARNING)
else:
img['src'] = translated_url
self.logger.log(f"이미지 번역 URL로 변경: {original_url} -> {translated_url}", level=logging.DEBUG)
# 수정된 HTML 반환
return str(soup)
except Exception as e:
self.logger.log(f"이미지 URL 변경 실패: {e}", level=logging.ERROR)
return html_data