2692 lines
138 KiB
Python
2692 lines
138 KiB
Python
from playwright.async_api import async_playwright, TimeoutError
|
||
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition, Slot
|
||
import re
|
||
# import pyautogui
|
||
import time
|
||
import win32gui, win32con
|
||
from bs4 import BeautifulSoup
|
||
import asyncio
|
||
import os, sys, random
|
||
import requests
|
||
import inspect
|
||
import gc
|
||
import psutil
|
||
import shutil
|
||
from datetime import datetime, timedelta
|
||
import math
|
||
from PIL import Image
|
||
from io import BytesIO
|
||
|
||
# from src.wh_search import WhaleSearchParser
|
||
# from src.wh_img_trans import WhaleImageTranslator
|
||
from src.whale_new import WhaleTranslator
|
||
|
||
from src.modules.clipboardImageManager import ClipboardImageManager
|
||
from src.contents.option import OptionHandler
|
||
from src.contents.price import PriceHandler
|
||
from src.contents.details import DetailHandler
|
||
from src.contents.titleGenerator import TitleGenerator
|
||
from src.contents.thumb import ThumbnailHandler
|
||
|
||
from src.modules.image_processor import ImageProcessor
|
||
|
||
from src.contents.tags import TagsHandler
|
||
from src.titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager
|
||
from src.titleManager.gpt_client import GPTClient
|
||
|
||
import tempfile
|
||
import logging
|
||
|
||
# 새로운 모듈 import
|
||
import string
|
||
|
||
class BrowserController(QThread):
|
||
|
||
# 브라우저 시작 시그널 정의
|
||
browser_started = Signal()
|
||
|
||
unknown_browser_error = Signal(str)
|
||
browser_create_error = Signal(str)
|
||
browser_login_error = Signal(str)
|
||
browser_ad1_close_error = Signal(str)
|
||
browser_ad2_close_error = Signal(str)
|
||
browser_group_list_error = Signal(str)
|
||
browser_handler_update_error = Signal(str)
|
||
browser_parsing_page_error = Signal(str)
|
||
browser_registered_product_page_error = Signal(str)
|
||
browser_new_product_page_error = Signal(str)
|
||
|
||
# 번역 작업 시작, 완료, 오류 시그널 정의
|
||
translation_started = Signal()
|
||
translation_completed = Signal(int)
|
||
translation_error = Signal(str)
|
||
|
||
start_stage_signal = Signal(int)
|
||
complete_stage_signal = Signal(int)
|
||
|
||
total_progressbar_signal = Signal(int, int) # (현재 값, 총 값)
|
||
|
||
update_detail_progress_signal = Signal(int, int) # (현재 값, 총 값)
|
||
set_progress_visible_signal = Signal(bool) # ProgressBar 표시/숨김
|
||
|
||
percentyJob_button_Enable = Signal(bool)
|
||
|
||
selected_group_name_signal = Signal(str, int) # 선택된 그룹 이름 시그널
|
||
group_list_signal = Signal(list) # 그룹 리스트 시그널
|
||
|
||
# 일시정지 확인 시그널
|
||
pause_confirmed = Signal()
|
||
|
||
def __init__(self, app, logger, locator_manager, price_setting_diag, detail_text_widget, login_infos, toggle_states, user_id, supabase_manager):
|
||
super().__init__()
|
||
self.app = app # AutoPercentyGUI 객체 저장
|
||
self.logger = logger
|
||
self.locator_manager = locator_manager
|
||
self.price_setting_diag = price_setting_diag
|
||
self.detail_text_widget = detail_text_widget
|
||
self.toggle_states = toggle_states
|
||
self.login_infos = login_infos
|
||
self.user_id = user_id
|
||
self.supabase_manager = supabase_manager
|
||
|
||
self.avx_supported = False
|
||
|
||
self.parsing_page = None
|
||
self.current_options_info = None
|
||
self.chrome_hwnd = None
|
||
|
||
self.base_path = self.get_base_dir()
|
||
self.logger.log(f"base_path: {self.base_path}", level=logging.DEBUG)
|
||
|
||
# 1. 임시 이미지 폴더 (작업 중)
|
||
self.TEMP_IMAGE_DIR = os.path.join(self.base_path, 'temp_images')
|
||
os.makedirs(self.TEMP_IMAGE_DIR, exist_ok=True)
|
||
self.logger.log(f"임시 디렉토리 생성: {self.TEMP_IMAGE_DIR}", level=logging.INFO)
|
||
|
||
# 2. 에러 스크린샷 폴더 (장기 보관)
|
||
self.ERROR_SCREENSHOT_DIR = os.path.join(self.base_path, 'error_screenshots')
|
||
os.makedirs(self.ERROR_SCREENSHOT_DIR, exist_ok=True)
|
||
self.logger.log(f"error 디렉토리 생성: {self.ERROR_SCREENSHOT_DIR}", level=logging.INFO)
|
||
|
||
self.clear_old_screenshot_dirs(days=14)
|
||
self.logger.log(f"error 디렉토리 삭제 완료", level=logging.INFO)
|
||
|
||
self.whale_translator = WhaleTranslator(logger, debug_flag=self.toggle_states['debug_mode'])
|
||
|
||
self.playwright = None
|
||
self.browser = None
|
||
self.page = None
|
||
|
||
self.loop = None # 이벤트 루프를 저장할 변수
|
||
|
||
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
|
||
self.pause_mutex = QMutex()
|
||
self.pause_condition = QWaitCondition()
|
||
self.is_paused = False # 일시 정지 상태 플래그
|
||
|
||
self.gpt_client = GPTClient(logger=self.logger)
|
||
|
||
self.forbidden_word_manager = SupabaseForbiddenWordManager(self.logger, self.supabase_manager, self.user_id)
|
||
|
||
self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
|
||
|
||
# ImageProcessor를 먼저 초기화
|
||
self.imageProcessor = ImageProcessor(self.logger, self.page, self.whale_translator, self.clipboardImageManager, self.TEMP_IMAGE_DIR, self.toggle_states)
|
||
|
||
# ImageProcessor를 포함하여 다른 핸들러들 초기화
|
||
self.optionHandler = OptionHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.TEMP_IMAGE_DIR, self.logger, self.gpt_client, self.update_detail_progress_signal, self.set_progress_visible_signal, toggle_states=self.toggle_states, imageProcessor=self.imageProcessor)
|
||
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.price_setting_diag, self.toggle_states, debug_flag=self.toggle_states['debug_mode'])
|
||
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.whale_translator, self.clipboardImageManager, self.toggle_states, self.update_detail_progress_signal, self.set_progress_visible_signal, self.base_path)
|
||
self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states, self.gpt_client, self.forbidden_word_manager, self.user_id, self.supabase_manager)
|
||
self.tagsHandler = TagsHandler(self.locator_manager, self, self.logger, self.toggle_states)
|
||
self.detailHandler = DetailHandler(self.locator_manager, self, self.whale_translator, self.clipboardImageManager, self.detail_text_widget, self.TEMP_IMAGE_DIR, self.imageProcessor, self.logger, self.gpt_client, self.update_detail_progress_signal, self.set_progress_visible_signal, self.toggle_states)
|
||
|
||
# BrowserController에 해당하는 모든 locator를 정의
|
||
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
|
||
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
|
||
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
|
||
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
|
||
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
|
||
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
|
||
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
|
||
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
|
||
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
|
||
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
|
||
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
|
||
self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator')
|
||
self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator')
|
||
self.product_dialog_close_btn = self.locator_manager.get_locator('BrowserControl', 'product_dialog_close_btn')
|
||
|
||
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
|
||
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
|
||
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
|
||
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
|
||
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
|
||
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
|
||
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
|
||
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
|
||
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
|
||
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
|
||
# self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
||
# self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
||
# self.cke_text_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'cke_text_editing_area_locator')
|
||
# self.cke_img_file_input_locator = self.locator_manager.get_locator('BrowserControl', 'cke_img_file_input_locator')
|
||
|
||
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
|
||
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
|
||
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
|
||
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
|
||
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
|
||
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
|
||
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
|
||
# self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
|
||
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
|
||
self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator')
|
||
self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template')
|
||
self.group_options_selector_locator = self.locator_manager.get_locator('BrowserControl', 'group_options_selector_locator')
|
||
self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator')
|
||
self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
|
||
|
||
self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons')
|
||
self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons')
|
||
|
||
self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator')
|
||
self.memo_exposer_locator = self.locator_manager.get_locator('BrowserControl', 'memo_exposer_locator')
|
||
self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator')
|
||
|
||
# self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
|
||
|
||
# # 스레드 종료 시 close_whale_window_if_exists 호출
|
||
# self.finished.connect(self.cleanup)
|
||
|
||
def get_page(self):
|
||
return self.page
|
||
|
||
def get_parsing_page(self):
|
||
return self.parsing_page
|
||
|
||
def get_whale(self):
|
||
return self.whale_translator
|
||
|
||
def generate_random_suffix(self, length=4):
|
||
"""랜덤한 영문+숫자 조합 문자열 생성"""
|
||
chars = string.ascii_uppercase + string.digits
|
||
return ''.join(random.choices(chars, k=length))
|
||
|
||
|
||
# async def monitor_browser_logs(self, page):
|
||
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
|
||
# page.on("console", lambda msg: self.logger.info(f"JS 콘솔 로그: {msg.type}: {msg.text}"))
|
||
|
||
# async def monitor_browser_logs(self, page):
|
||
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
|
||
# level_map = {
|
||
# "log": logging.INFO,
|
||
# "info": logging.INFO,
|
||
# "warning": logging.WARNING,
|
||
# "error": logging.ERROR,
|
||
# "debug": logging.DEBUG,
|
||
# }
|
||
|
||
# page.on(
|
||
# "console",
|
||
# lambda msg: self.logger.log(
|
||
# f"JS 콘솔 로그: {msg.type}: {msg.text}",
|
||
# level=level_map.get(msg.type, logging.INFO)
|
||
# )
|
||
# )
|
||
|
||
def get_base_dir(self):
|
||
"""
|
||
실행 환경에 따라 base_dir을 설정하는 메서드.
|
||
cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
|
||
"""
|
||
if getattr(sys, 'frozen', False): # 패키징된 경우
|
||
base_dir = os.path.dirname(sys.executable)
|
||
internal_dir = os.path.join(base_dir, 'lib', 'src') # lib 디렉토리 포함
|
||
if os.path.exists(internal_dir): # lib 디렉토리가 존재하면 base_dir로 설정
|
||
return internal_dir
|
||
|
||
else: # 일반 Python 실행 환경
|
||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
debug_dir = os.path.join(base_dir, 'src') # lib 디렉토리 포함
|
||
|
||
return debug_dir
|
||
|
||
async def start_browser_async(self):
|
||
"""비동기 Playwright 초기화 및 로그인 수행"""
|
||
try:
|
||
# self.whale_translator.start_trans_browser()
|
||
# # 바뀐 검색주소
|
||
# # 'https://search.shopping.naver.com/search/all?adQuery=주차차단기&agency=true&frm=MOSCPRO&origQuery=주차차단기&pagingIndex=1&pagingSize=40&productSet=overseas&query=주차차단기&sort=rel×tamp=&viewType=list'
|
||
# self.whale_translator.lens_Search(image_url="https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/6847c740ecbe0d32a37a8570/b637c29c-80eb-43eb-975f-58279bc5fde1.jpg")
|
||
# time.sleep(1000000)
|
||
|
||
try:
|
||
self.logger.log('알바생 브라우저를 실행합니다...', level=logging.DEBUG)
|
||
|
||
# WhaleTranslator 필요 여부 확인 및 초기화
|
||
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
|
||
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
|
||
thumb_status = self.toggle_states.get('thumb', False)
|
||
debug_mode = self.toggle_states.get('debug_mode', False)
|
||
|
||
self.logger.log(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}", level=logging.DEBUG)
|
||
|
||
# if optionIMGTrans_status or detail_IMGTrans_status or thumb_status:
|
||
# self.logger.log('이미지번역을 위해 trans_browser를 로드합니다...', level=logging.DEBUG)
|
||
# self.whale_translator = WhaleTranslator(self.logger)
|
||
# self.whale_translator.start_whale_browser()
|
||
|
||
# Playwright 시작 및 브라우저 실행
|
||
self.playwright = await async_playwright().start()
|
||
|
||
browser_path = os.path.join(self.base_path, 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
|
||
extension_path = os.path.join(self.base_path, 'browsers', 'extensions', '1.1.100_0')
|
||
user_data_dir = os.path.join(self.base_path, 'browsers', 'user_data')
|
||
|
||
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
|
||
self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
|
||
self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
|
||
|
||
self.video_dir = os.path.join(self.base_path, "recorded_videos")
|
||
if not os.path.exists(self.video_dir):
|
||
os.makedirs(self.video_dir)
|
||
self.logger.log(f"동영상 저장 폴더 생성됨: {self.video_dir}", level=logging.DEBUG)
|
||
|
||
if not os.path.exists(browser_path):
|
||
self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG)
|
||
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
|
||
|
||
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
|
||
if not os.path.exists(user_data_dir):
|
||
os.makedirs(user_data_dir)
|
||
self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
# User agent 설정
|
||
import random
|
||
user_agent = random.choice([
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
|
||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
|
||
])
|
||
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
|
||
|
||
# 브라우저 시작 및 설정
|
||
self.browser = await self.playwright.chromium.launch_persistent_context(
|
||
user_data_dir,
|
||
headless=not debug_mode,
|
||
permissions=["geolocation", "notifications"],
|
||
geolocation={"latitude": 37.5665, "longitude": 126.9780},
|
||
locale="ko-KR",
|
||
args=[
|
||
'--disable-popup-blocking',
|
||
f'--disable-extensions-except={extension_path}',
|
||
f'--load-extension={extension_path}',
|
||
'--start-maximized',
|
||
'--window-size=1920,1080'
|
||
],
|
||
executable_path=browser_path,
|
||
user_agent=user_agent,
|
||
|
||
# record_video_dir=self.video_dir, # 동영상 저장 폴더 지정
|
||
# record_video_size={"width": 1280, "height": 720}
|
||
|
||
)
|
||
|
||
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
|
||
self.page = await self.browser.new_page()
|
||
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
|
||
|
||
|
||
await self.page.goto('https://percenty.co.kr/signin')
|
||
self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
|
||
|
||
# 첫 번째 기본 탭 닫기
|
||
if self.browser.pages:
|
||
await self.browser.pages[0].close()
|
||
|
||
# 페이지 제목을 가져와서 창 제목으로 활용
|
||
page_title = await self.page.title()
|
||
self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 객체 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_create_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
is_login_success = await self.login()
|
||
if not is_login_success:
|
||
self.logger.log("로그인 실패", level=logging.ERROR)
|
||
return
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_login_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
# await self.close_ad_if_exists_with_ESC_Key()
|
||
# await self.close_ad_if_exists_new()
|
||
await self.close_ant_modal_dialogs()
|
||
except Exception as e:
|
||
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_ad1_close_error.emit(str(e))
|
||
return
|
||
|
||
id_ed_mode = self.toggle_states.get('ed_mode', False)
|
||
if id_ed_mode:
|
||
# await self.go_to_registered_product_page()
|
||
self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
|
||
try:
|
||
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
await self.page.click(registered_product_page_locator)
|
||
self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_registered_product_page_error.emit(str(e))
|
||
return
|
||
|
||
else:
|
||
self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
|
||
# await self.go_to_new_product_page()
|
||
try:
|
||
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
await self.page.click(new_product_page_locator)
|
||
await self.close_ad_if_exists_with_ESC_Key()
|
||
await self.close_ant_modal_dialogs()
|
||
self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
|
||
|
||
# self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
except Exception as e:
|
||
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_ad2_close_error.emit(str(e))
|
||
return
|
||
|
||
# group_index = self.toggle_states['group_index']
|
||
# self.logger.log('선택한 그룹 인덱스로 이동', level=logging.INFO)
|
||
# if group_index:
|
||
# await self.select_group_index(group_index=group_index)
|
||
|
||
try:
|
||
# 그룹 이름 리스트 가져오기
|
||
group_names_list = await self.get_group_names_list()
|
||
self.group_list_signal.emit(group_names_list)
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 이름 리스트 가져오기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_group_list_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
# 파싱용 브라우저 인스턴스를 별도로 실행 (headless 모드, 실제 브라우저처럼 보이도록 옵션 추가, 시크릿 모드 포함)
|
||
self.parsing_browser = await self.playwright.chromium.launch(
|
||
executable_path=browser_path, # 추가: self.browser와 같은 경로
|
||
headless=False,
|
||
args=[
|
||
'--disable-blink-features=AutomationControlled', # 자동화 탐지 우회
|
||
'--no-sandbox',
|
||
'--disable-infobars',
|
||
'--disable-dev-shm-usage',
|
||
'--disable-gpu',
|
||
'--start-minimized',
|
||
'--incognito', # 시크릿 모드 활성화
|
||
'--window-position=-32000,-32000',
|
||
]
|
||
)
|
||
|
||
# 파싱용 컨텍스트는 기본적으로 시크릿 모드 환경이며, 추가 설정으로 실제 브라우저와 유사한 점수를 줄 수 있음.
|
||
import random
|
||
self.parsing_context = await self.parsing_browser.new_context(
|
||
locale="ko-KR",
|
||
user_agent=random.choice([
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
]),
|
||
viewport={"width": 1280, "height": 720},
|
||
extra_http_headers={
|
||
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"
|
||
}
|
||
)
|
||
|
||
# 파싱용 페이지 생성
|
||
self.parsing_page = await self.parsing_context.new_page()
|
||
except Exception as e:
|
||
self.logger.log(f"파싱용 페이지 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_parsing_page_error.emit(str(e))
|
||
return
|
||
try:
|
||
# 각 핸들러에 초기화된 page 객체 전달.
|
||
self.titleGenerator.update_page(self.page)
|
||
self.titleGenerator.update_parsing_page(self.parsing_page)
|
||
self.optionHandler.update_page(self.page)
|
||
self.tagsHandler.update_page(self.page)
|
||
self.thumbnailHandler.update_page(self.page)
|
||
self.priceHandler.update_page(self.page)
|
||
self.detailHandler.update_page(self.page)
|
||
self.imageProcessor.update_page(self.parsing_page)
|
||
|
||
self.optionHandler.update_whale()
|
||
self.thumbnailHandler.update_whale()
|
||
self.detailHandler.update_whale()
|
||
except Exception as e:
|
||
self.logger.log(f"핸들러 업데이트 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_handler_update_error.emit(str(e))
|
||
return
|
||
|
||
|
||
# 신호 전송
|
||
self.browser_started.emit() # 브라우저 시작 신호
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.unknown_browser_error.emit(str(e))
|
||
return
|
||
# async def start_browser(self):
|
||
# """크롬 브라우저 실행 및 페이지 로딩"""
|
||
# self.logger.log('크롬 브라우저 실행 중...', level=logging.DEBUG)
|
||
# try:
|
||
# # Playwright를 수동으로 실행하여 브라우저 유지
|
||
# self.playwright = await async_playwright().start()
|
||
|
||
# # cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정
|
||
# if getattr(sys, 'frozen', False):
|
||
# browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
|
||
# extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0')
|
||
# user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data')
|
||
# else:
|
||
# browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
|
||
# extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0')
|
||
# user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data')
|
||
|
||
# self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
|
||
# self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
|
||
# self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
|
||
|
||
# # 사용자 데이터 디렉토리가 존재하지 않으면 생성
|
||
# if not os.path.exists(user_data_dir):
|
||
# os.makedirs(user_data_dir)
|
||
# self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
# # User agent 설정
|
||
# user_agent = random.choice([
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
|
||
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
|
||
# ])
|
||
# self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
|
||
|
||
# # 브라우저 시작 및 설정
|
||
# self.browser = await self.playwright.chromium.launch_persistent_context(
|
||
# user_data_dir,
|
||
# headless=False,
|
||
# permissions=["geolocation", "notifications"],
|
||
# geolocation={"latitude": 37.5665, "longitude": 126.9780},
|
||
# locale="ko-KR",
|
||
# args=[
|
||
# '--disable-popup-blocking',
|
||
# f'--disable-extensions-except={extension_path}',
|
||
# f'--load-extension={extension_path}',
|
||
# '--start-maximized',
|
||
# '--window-size=1920,1080'
|
||
# ],
|
||
# executable_path=browser_path,
|
||
# user_agent=user_agent
|
||
# )
|
||
|
||
# # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
|
||
# self.page = await self.browser.new_page()
|
||
# self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
|
||
|
||
# await self.page.goto('https://percenty.co.kr/signin')
|
||
# self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
|
||
|
||
# # 첫 번째 기본 탭 닫기
|
||
# if self.browser.pages:
|
||
# await self.browser.pages[0].close()
|
||
|
||
# # 페이지 제목을 가져와서 창 제목으로 활용
|
||
# page_title = await self.page.title()
|
||
# self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
|
||
|
||
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
|
||
# self.chrome_hwnd = self.find_window_by_title(page_title)
|
||
# if not self.chrome_hwnd:
|
||
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
|
||
# else:
|
||
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
|
||
|
||
# await self.login()
|
||
# await self.close_ad_if_exists()
|
||
|
||
# if self.toggle_states['ed_mode']:
|
||
# await self.go_to_registered_product_page()
|
||
# self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
|
||
# else:
|
||
# self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
|
||
# await self.go_to_new_product_page()
|
||
|
||
# self.browser_started.emit() # 브라우저 시작 신호
|
||
# except Exception as e:
|
||
# self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR)
|
||
# self.browser_error.emit(str(e))
|
||
async def login(self) -> bool:
|
||
"""로그인 처리: menu 또는 dialog 요소만 기다립니다."""
|
||
is_admin = self.login_infos.get('is_admin', False)
|
||
who = "관리자" if is_admin else "직원"
|
||
self.logger.log(f'로그인 시도 중: {who} 계정', level=logging.INFO)
|
||
|
||
# 필수 정보 누락 체크
|
||
required = ['admin_id', 'admin_pw'] if is_admin else ['admin_id', 'user_id', 'user_pw']
|
||
missing = [k for k in required if not self.login_infos.get(k)]
|
||
if missing:
|
||
msg = f'로그인 정보 누락: {", ".join(missing)}'
|
||
self.logger.log(msg, level=logging.ERROR)
|
||
self.browser_login_error.emit(msg)
|
||
return False
|
||
|
||
try:
|
||
# 1) 자격증명 입력 & 로그인 클릭
|
||
if is_admin:
|
||
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
||
await self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
|
||
await self.page.click(self.login_button_locator)
|
||
else:
|
||
admin_toggle = self.page.locator(self.admin_toggle_locator)
|
||
if await admin_toggle.get_attribute("aria-checked") == "true":
|
||
await admin_toggle.click()
|
||
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
||
await self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
|
||
await self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
|
||
await self.page.click(self.staff_login_button_locator)
|
||
|
||
# 2) 로그인 성공 요소만 기다리기 (5초)
|
||
try:
|
||
locator = await self.page.wait_for_selector(
|
||
'ul[role="menu"], [role="dialog"]',
|
||
timeout=5000
|
||
)
|
||
role = await locator.get_attribute("role")
|
||
if role == "menu":
|
||
self.logger.log(f'로그인 성공: {who} 계정 (menu 발견)', level=logging.INFO)
|
||
else:
|
||
self.logger.log(f'로그인 성공: {who} 계정 (dialog 발견)', level=logging.INFO)
|
||
return True
|
||
|
||
except Exception:
|
||
# menu/dialog 둘 다 못 찾은 경우
|
||
msg = f'로그인 실패: {who} 계정 (menu/dialog 미발견)'
|
||
self.logger.log(msg, level=logging.ERROR)
|
||
self.browser_login_error.emit(msg)
|
||
await self.save_error_screenshot(f'login_failed_{who}')
|
||
return False
|
||
|
||
except Exception as e:
|
||
# 클릭/입력 중 예외
|
||
msg = f'로그인 처리 중 오류: {who} 계정 – {e}'
|
||
self.logger.log(msg, level=logging.ERROR, exc_info=True)
|
||
self.browser_login_error.emit(msg)
|
||
await self.save_error_screenshot(f'login_exception_{who}')
|
||
return False
|
||
|
||
async def close_browser(self):
|
||
"""브라우저 종료"""
|
||
if self.browser:
|
||
await self.browser.close()
|
||
await self.playwright.stop()
|
||
self.cleanup()
|
||
self.logger.log('브라우저 종료됨.', level=logging.INFO)
|
||
|
||
# def find_window_by_title(self, window_name):
|
||
# """창 제목을 통해 핸들을 찾는 메서드"""
|
||
# def enum_windows_callback(hwnd, result):
|
||
# if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
|
||
# result.append(hwnd)
|
||
# result = []
|
||
# win32gui.EnumWindows(enum_windows_callback, result)
|
||
# return result[0] if result else None
|
||
|
||
# def switch_to_chrome(self):
|
||
# """크롬으로 포커스 전환"""
|
||
# if self.chrome_hwnd:
|
||
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
|
||
# win32gui.SetForegroundWindow(self.chrome_hwnd)
|
||
# self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
|
||
# else:
|
||
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.ERROR)
|
||
|
||
async def get_total_product_count(self):
|
||
total_count = 0
|
||
items_per_page = 0
|
||
|
||
try:
|
||
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
|
||
total_count_element = await self.page.query_selector(self.total_product_count_locator)
|
||
items_per_page_element = await self.page.query_selector(self.items_per_page_locator)
|
||
|
||
self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG)
|
||
|
||
if total_count_element:
|
||
total_count_text = await total_count_element.inner_text()
|
||
if "총" in total_count_text and "개 상품" in total_count_text:
|
||
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
|
||
self.logger.log(f"총 상품수 확인: {total_count} 개", level=logging.INFO)
|
||
|
||
# 페이지당 상품 수 추출
|
||
if items_per_page_element:
|
||
items_per_page_text = await items_per_page_element.get_attribute("title")
|
||
if items_per_page_text and "개씩 보기" in items_per_page_text:
|
||
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
|
||
self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO)
|
||
|
||
# 결과 반환
|
||
if total_count:
|
||
return {"total_count": total_count, "items_per_page": items_per_page}
|
||
|
||
return {"total_count": 0, "items_per_page": 0}
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
return {"total_count": 0, "items_per_page": 0}
|
||
|
||
# def fetch_image_urls(self, html_content):
|
||
# """
|
||
# HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
|
||
# <figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
|
||
# """
|
||
# soup = BeautifulSoup(html_content, 'html.parser')
|
||
|
||
# # 모든 <img> 태그를 찾기
|
||
# image_urls = []
|
||
# img_tags = soup.find_all('img')
|
||
|
||
# for img in img_tags:
|
||
# # img 태그에서 src 속성 추출
|
||
# if 'src' in img.attrs:
|
||
# image_url = img['src']
|
||
# image_urls.append(image_url)
|
||
# self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)} 개", level=logging.DEBUG)
|
||
|
||
# self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}", level=logging.DEBUG)
|
||
|
||
# return image_urls
|
||
|
||
async def close_ant_modal_dialogs(self, max_loops=3):
|
||
"""
|
||
antd 모달 다이얼로그에서 '다시 보지 않기', '닫기' 버튼을 최대 max_loops번 반복 클릭합니다.
|
||
self.page, self.logger 사용
|
||
"""
|
||
total_closed = 0
|
||
for loop in range(max_loops):
|
||
closed_this_round = 0
|
||
|
||
# 다이얼로그 생성될 때까지 3초간 대기 (한 번만)
|
||
try:
|
||
await self.page.wait_for_selector('.ant-modal-root', timeout=3000)
|
||
except Exception:
|
||
# screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("3초 대기 내에 .ant-modal-root 다이얼로그 미등장", level=logging.WARNING)
|
||
|
||
# '다시 보지 않기' 버튼 (span 텍스트 포함)
|
||
dont_show_btns = self.page.locator(
|
||
'.ant-modal-root button span:text("다시 보지 않기")'
|
||
).locator('..') # span의 부모 button
|
||
count1 = await dont_show_btns.count()
|
||
for i in range(count1):
|
||
await dont_show_btns.nth(i).click()
|
||
closed_this_round += 1
|
||
self.logger.log(f'"다시 보지 않기" 버튼 클릭 (loop {loop+1}, {i+1}/{count1})', level=logging.INFO)
|
||
await self.page.wait_for_timeout(200)
|
||
|
||
# '닫기' 버튼 (span 텍스트 포함)
|
||
close_btns = self.page.locator(
|
||
'.ant-modal-root button span:text("닫기")'
|
||
).locator('..')
|
||
count2 = await close_btns.count()
|
||
for i in range(count2):
|
||
await close_btns.nth(i).click()
|
||
closed_this_round += 1
|
||
self.logger.log(f'"닫기" 버튼 클릭 (loop {loop+1}, {i+1}/{count2})', level=logging.INFO)
|
||
await self.page.wait_for_timeout(200)
|
||
|
||
total_closed += closed_this_round
|
||
if closed_this_round == 0:
|
||
break
|
||
await self.page.wait_for_timeout(300)
|
||
self.logger.log(f"총 {total_closed}개 다이얼로그 버튼 클릭 완료", level=logging.INFO)
|
||
return total_closed > 0
|
||
|
||
|
||
|
||
async def close_ad_if_exists_with_ESC_Key(self):
|
||
"""ESC 키를 두 번 전송하여 다이얼로그를 닫는 메서드"""
|
||
try:
|
||
# JavaScript로 ESC 키 이벤트 발생
|
||
await self.page.evaluate("""
|
||
(() => {
|
||
const escEvent = new KeyboardEvent('keydown', {
|
||
key: 'Escape',
|
||
code: 'Escape',
|
||
keyCode: 27,
|
||
which: 27,
|
||
bubbles: true,
|
||
cancelable: true
|
||
});
|
||
document.dispatchEvent(escEvent);
|
||
return true;
|
||
})()
|
||
""")
|
||
time.sleep(1)
|
||
await self.page.evaluate("""
|
||
(() => {
|
||
const escEvent = new KeyboardEvent('keydown', {
|
||
key: 'Escape',
|
||
code: 'Escape',
|
||
keyCode: 27,
|
||
which: 27,
|
||
bubbles: true,
|
||
cancelable: true
|
||
});
|
||
document.dispatchEvent(escEvent);
|
||
return true;
|
||
})()
|
||
""")
|
||
self.logger.log("ESC 키를 전송하여 다이얼로그를 닫았습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
|
||
async def close_ad_if_exists_new(self):
|
||
"""
|
||
다이얼로그가 존재하면 해당 다이얼로그 내부의 "닫기" 버튼을 클릭하고,
|
||
다이얼로그를 찾지 못하거나 내부에 닫기 버튼이 없으면 페이지 전체에서 "닫기" 버튼을 검색하여 클릭하는 메서드
|
||
"""
|
||
# 새로운 다이얼로그의 닫기(X) 버튼 먼저 시도
|
||
self.logger.log("새로운 다이얼로그의 닫기(X) 버튼 먼저 시도", level=logging.INFO)
|
||
|
||
# 1. 모달 다이얼로그 처리 (ant-modal-wrap)
|
||
try:
|
||
# 모달 처리 JavaScript 코드
|
||
modal_js = """
|
||
() => {
|
||
// 모달 찾기
|
||
const modalWraps = document.querySelectorAll('.ant-modal-wrap.ant-modal-centered');
|
||
if (modalWraps.length === 0) return 0;
|
||
|
||
console.log(`${modalWraps.length}개의 모달 발견`);
|
||
let closed = 0;
|
||
|
||
// 각 모달에 대해
|
||
for (const modal of modalWraps) {
|
||
try {
|
||
// 1. 닫기 버튼 찾기
|
||
const closeBtn = modal.querySelector('.ant-modal-close');
|
||
if (closeBtn) {
|
||
closeBtn.click();
|
||
console.log('모달 닫기 버튼 클릭');
|
||
closed++;
|
||
continue;
|
||
}
|
||
|
||
// 2. '닫기' 텍스트 버튼 찾기
|
||
const buttons = modal.querySelectorAll('button');
|
||
let found = false;
|
||
for (const btn of buttons) {
|
||
if (btn.textContent.includes('닫기')) {
|
||
btn.click();
|
||
console.log('닫기 텍스트 버튼 클릭');
|
||
closed++;
|
||
found = true;
|
||
break;
|
||
}
|
||
}
|
||
if (found) continue;
|
||
|
||
// 3. 마지막 수단: 모달 숨기기
|
||
modal.style.display = 'none';
|
||
const mask = document.querySelector('.ant-modal-mask');
|
||
if (mask) mask.style.display = 'none';
|
||
console.log('모달 강제 숨김');
|
||
closed++;
|
||
} catch (e) {
|
||
console.error('모달 처리 오류:', e);
|
||
}
|
||
}
|
||
return closed;
|
||
}
|
||
"""
|
||
modal_count = await self.page.evaluate(modal_js)
|
||
if modal_count > 0:
|
||
self.logger.log(f"{modal_count}개의 모달 다이얼로그 닫기 처리됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000) # 모달이 사라질 때까지 대기
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"모달 다이얼로그 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 2. drawer-close 버튼 처리
|
||
try:
|
||
drawer_js = """
|
||
() => {
|
||
const drawerCloseButtons = Array.from(document.querySelectorAll('button.ant-drawer-close'));
|
||
let count = 0;
|
||
drawerCloseButtons.forEach(button => {
|
||
try {
|
||
button.click();
|
||
console.log('drawer-close 버튼 클릭');
|
||
count++;
|
||
} catch (e) {
|
||
console.error('drawer-close 클릭 오류:', e);
|
||
}
|
||
});
|
||
return count;
|
||
}
|
||
"""
|
||
drawer_count = await self.page.evaluate(drawer_js)
|
||
if drawer_count > 0:
|
||
self.logger.log(f"{drawer_count}개의 drawer-close 버튼 클릭됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"drawer-close 버튼 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 3. 배경 클릭 시도
|
||
try:
|
||
backdrop_js = """
|
||
() => {
|
||
const backdropSelectors = [
|
||
'.ant-modal-mask',
|
||
'.ant-drawer-mask',
|
||
'.modal-backdrop',
|
||
'.dialog-backdrop',
|
||
'[role="dialog"] + div',
|
||
'.overlay',
|
||
'.modal-overlay'
|
||
];
|
||
|
||
let clicked = 0;
|
||
for (const selector of backdropSelectors) {
|
||
const backdrops = document.querySelectorAll(selector);
|
||
for (const backdrop of backdrops) {
|
||
try {
|
||
backdrop.click();
|
||
console.log(`${selector} 배경 클릭`);
|
||
clicked++;
|
||
} catch (e) {
|
||
console.error(`${selector} 클릭 오류:`, e);
|
||
}
|
||
}
|
||
}
|
||
return clicked;
|
||
}
|
||
"""
|
||
backdrop_count = await self.page.evaluate(backdrop_js)
|
||
if backdrop_count > 0:
|
||
self.logger.log(f"{backdrop_count}개의 배경 요소 클릭됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"배경 클릭 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 4. ESC 키 시도
|
||
try:
|
||
await self.page.keyboard.down("Escape")
|
||
await self.page.wait_for_timeout(100)
|
||
await self.page.keyboard.up("Escape")
|
||
self.logger.log("ESC 키 전송 (down-up 방식)", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 5. 기존 다이얼로그 처리 (이전 코드)
|
||
try:
|
||
# 다이얼로그가 나타날 때까지 최대 3초 대기
|
||
dialog_element = await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
|
||
self.logger.log("다이얼로그가 발견되었습니다. 내부의 닫기 버튼을 찾습니다.", level=logging.INFO)
|
||
|
||
if dialog_element:
|
||
# 다이얼로그 내부에서 "닫기" 텍스트를 가진 버튼을 xpath로 찾음
|
||
close_button = await dialog_element.query_selector("xpath=.//button[span[text()='닫기']]")
|
||
if close_button:
|
||
await close_button.click()
|
||
self.logger.log("다이얼로그 내부의 닫기 버튼을 클릭하여 닫았습니다.", level=logging.INFO)
|
||
return # 닫기 성공 시 함수 종료
|
||
else:
|
||
self.logger.log("다이얼로그 내부에서 닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
else:
|
||
self.logger.log("대기 후에도 다이얼로그 엘리먼트를 찾지 못했습니다.", level=logging.WARNING)
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
await self.page.wait_for_timeout(100)
|
||
await self.page.dispatch_event("body", "keydown", {
|
||
"key": "Escape",
|
||
"code": "Escape",
|
||
"keyCode": 27,
|
||
"which": 27
|
||
})
|
||
await self.page.wait_for_timeout(50)
|
||
await self.page.dispatch_event("body", "keyup", {
|
||
"key": "Escape",
|
||
"code": "Escape",
|
||
"keyCode": 27,
|
||
"which": 27
|
||
})
|
||
await self.page.wait_for_timeout(100)
|
||
|
||
self.logger.log("ECS를 전송하여 닫았습니다.", level=logging.INFO)
|
||
return
|
||
except Exception as e:
|
||
self.logger.log(f"닫기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
|
||
except TimeoutError:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("다이얼로그가 나타나지 않았습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다이얼로그 닫기 시도 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def close_ad_if_exists_ori(self):
|
||
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
|
||
try:
|
||
# 광고 다이얼로그가 나타날 때까지 기다림
|
||
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
|
||
self.logger.log("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.", level=logging.INFO)
|
||
|
||
# 닫기 버튼 클릭
|
||
close_button = await self.page.query_selector(self.close_ad_button_locator)
|
||
if close_button:
|
||
await close_button.click()
|
||
self.logger.log("다이얼로그를 성공적으로 닫았습니다.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
|
||
except TimeoutError:
|
||
# 다이얼로그가 없을 때: info 수준의 로그로 기록
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 다른 예외 상황 발생 시 error로 기록
|
||
self.logger.log(f"다이얼로그 닫기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# async def go_to_new_product_page(self):
|
||
# """신규 상품 등록 페이지로 이동"""
|
||
# try:
|
||
# new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
# await self.page.click(new_product_page_locator)
|
||
# self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
|
||
# except Exception as e:
|
||
# self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
|
||
# async def go_to_registered_product_page(self):
|
||
# """신규 상품 등록 페이지로 이동"""
|
||
# try:
|
||
# registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
# await self.page.click(registered_product_page_locator)
|
||
# self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
|
||
# except Exception as e:
|
||
# self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def is_button_disabled(self, button):
|
||
"""버튼이 disabled 상태인지 확인"""
|
||
try:
|
||
# 버튼의 disabled 속성 확인
|
||
is_disabled = await button.get_attribute('disabled')
|
||
return is_disabled is not None # disabled 속성이 있으면 True 반환
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
|
||
|
||
# async def select_group_index_ori(self, group_index: int):
|
||
# """그룹 드롭다운 열고 옵션 선택"""
|
||
# try:
|
||
# self.logger.log(f"group_index : {group_index}", level=logging.INFO)
|
||
|
||
|
||
# await self.page.evaluate("""
|
||
# const targetElement = Array.from(document.querySelectorAll('span')).find(el => el.textContent.trim() === '수집 상품 목록');
|
||
# if (targetElement) {
|
||
# targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
# }
|
||
# """)
|
||
|
||
# self.logger.log(f"그룹박스 요소로 스크롤", level=logging.INFO)
|
||
|
||
# await asyncio.sleep(1)
|
||
# # await self.page.wait_for_load_state("networkidle")
|
||
|
||
# group_option_locator = self.group_index_template.format(index=group_index)
|
||
|
||
# # 드롭다운 열기
|
||
# await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
# await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
# self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# # 드롭다운 열림 상태 확인
|
||
# await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
# self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# # 옵션 선택
|
||
# # await self.page.wait_for_selector(group_option_locator, timeout=3000)
|
||
# await self.page.click(group_option_locator, timeout=3000)
|
||
# self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
|
||
|
||
# selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
|
||
# self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
|
||
|
||
# # 선택된 그룹 이름을 시그널로 전달
|
||
# self.selected_group_name_signal.emit(selected_group_name)
|
||
|
||
# except TimeoutError:
|
||
# self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
# except Exception as e:
|
||
# self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def select_group_index(self, group_index: int):
|
||
"""그룹 드롭다운 열고 옵션 선택"""
|
||
try:
|
||
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
|
||
|
||
await self.page.evaluate("""
|
||
const targetElement = Array.from(document.querySelectorAll('span'))
|
||
.find(el => el.textContent.trim() === '수집 상품 목록');
|
||
if (targetElement) {
|
||
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
}
|
||
""")
|
||
|
||
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
|
||
|
||
await asyncio.sleep(1)
|
||
group_option_locator = self.group_index_template.format(index=group_index+1)
|
||
|
||
# 드롭다운 열기
|
||
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# 드롭다운 열림 상태 확인
|
||
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# 옵션 선택
|
||
self.logger.log(f"[{group_option_locator}] : group_option_locator", level=logging.INFO)
|
||
|
||
await self.page.click(group_option_locator, timeout=3000)
|
||
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
|
||
|
||
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
|
||
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
|
||
|
||
# 총 상품 개수 가져오기
|
||
product_count_info = await self.get_total_product_count()
|
||
total_products = product_count_info.get("total_count", 0)
|
||
items_per_page = product_count_info.get("items_per_page", 0)
|
||
self.logger.log(f"총 상품 개수 : [{total_products}]", level=logging.INFO)
|
||
|
||
|
||
# 선택된 그룹 이름을 시그널로 전달
|
||
self.selected_group_name_signal.emit(selected_group_name, total_products)
|
||
|
||
except TimeoutError:
|
||
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"그룹 선택 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {screenshot_path}", level=logging.WARNING)
|
||
|
||
# 그룹 선택 실패 시 빈 문자열 시그널 전달
|
||
self.selected_group_name_signal.emit("")
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.selected_group_name_signal.emit("")
|
||
|
||
async def get_group_names_list(self):
|
||
"""그룹 드롭다운을 열고 모든 그룹 이름 목록을 가져오기"""
|
||
try:
|
||
self.logger.log("그룹 이름 목록 가져오기 시작", level=logging.INFO)
|
||
|
||
# 수집 상품 목록 요소로 스크롤
|
||
await self.page.evaluate("""
|
||
const targetElement = Array.from(document.querySelectorAll('span'))
|
||
.find(el => el.textContent.trim() === '수집 상품 목록');
|
||
if (targetElement) {
|
||
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
}
|
||
""")
|
||
|
||
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
|
||
await asyncio.sleep(1)
|
||
|
||
# 드롭다운 열기
|
||
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# 드롭다운 열림 상태 확인
|
||
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# rc-virtual-list가 로딩될 때까지 잠시 대기
|
||
await asyncio.sleep(0.5)
|
||
|
||
|
||
|
||
# 그룹 옵션들이 로딩될 때까지 대기
|
||
await self.page.wait_for_selector(self.group_options_selector_locator, timeout=3000)
|
||
|
||
# 모든 그룹 이름 텍스트 가져오기
|
||
group_names = await self.page.evaluate(f"""
|
||
() => {{
|
||
const elements = document.querySelectorAll('{self.group_options_selector_locator}');
|
||
return Array.from(elements).map(el => el.textContent.trim());
|
||
}}
|
||
""")
|
||
|
||
|
||
|
||
# await self.page.wait_for_selector("div.ant-select-item-option-content", timeout=6000)
|
||
|
||
# group_names = await self.page.evaluate("""
|
||
# Array.from(document.querySelectorAll('div.ant-select-item-option-content'))
|
||
# .map(el => el.textContent.trim())
|
||
# """)
|
||
|
||
|
||
|
||
self.logger.log(f"발견된 그룹 목록: {group_names}", level=logging.INFO)
|
||
|
||
# 드롭다운 닫기 (ESC 키 또는 다른 곳 클릭)
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log("드롭다운 닫기 완료", level=logging.INFO)
|
||
|
||
return group_names
|
||
|
||
except TimeoutError:
|
||
self.logger.log("그룹 목록 가져오기 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
|
||
save_screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"self.group_options_selector_locator: {self.group_options_selector_locator}", level=logging.WARNING)
|
||
self.logger.log(f"그룹 목록 가져오기 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {save_screenshot_path}", level=logging.WARNING)
|
||
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
except:
|
||
pass
|
||
return []
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 목록 가져오기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
except:
|
||
pass
|
||
return []
|
||
|
||
async def get_product_edit_buttons_by_template(self):
|
||
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
|
||
# 버튼 선택자 설정
|
||
# buttons = self.page.locator(self.product_edit_button_template)
|
||
# memos = self.page.locator(self.product_memo_button_template)
|
||
|
||
"""
|
||
현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환.
|
||
각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다.
|
||
"""
|
||
try:
|
||
# 수정 버튼 선택자 설정
|
||
# edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]')
|
||
# memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only')
|
||
edit_buttons = self.page.locator(self.product_edit_buttons)
|
||
memo_buttons = self.page.locator(self.product_memo_buttons)
|
||
|
||
# 수정 버튼 개수 확인
|
||
edit_button_count = await edit_buttons.count()
|
||
if edit_button_count == 0:
|
||
self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING)
|
||
return []
|
||
|
||
# 메모 버튼 개수 확인
|
||
memo_button_count = await memo_buttons.count()
|
||
if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외
|
||
self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING)
|
||
return []
|
||
|
||
# 첫 번째 버튼 제외하고 나머지 버튼을 처리
|
||
valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)]
|
||
|
||
# 상품별 해외배송비 수정 버튼과 메모 버튼 매칭
|
||
product_buttons = []
|
||
index = 0 # valid_memo_buttons에서의 현재 인덱스
|
||
|
||
for i in range(edit_button_count):
|
||
if index + 1 >= len(valid_memo_buttons):
|
||
self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING)
|
||
break
|
||
|
||
# 상품별 버튼 매칭
|
||
product_buttons.append({
|
||
"edit_button": edit_buttons.nth(i),
|
||
"shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼
|
||
"memo_button": valid_memo_buttons[index + 1] # 메모 버튼
|
||
})
|
||
index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모
|
||
|
||
# 최종 결과 로그
|
||
self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO)
|
||
|
||
return product_buttons
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
return []
|
||
|
||
async def get_product_edit_buttons_by_template_new(self):
|
||
"""
|
||
템플릿 방식의 CSS 선택자를 이용해 현재 페이지의 수정 및 메모 버튼을 동적으로 찾습니다.
|
||
"""
|
||
product_buttons = []
|
||
# 예시로, 상품 항목들이 li.product-item 형태로 존재한다고 가정합니다.
|
||
product_items = self.page.locator("div#root li.product-item")
|
||
count = await product_items.count()
|
||
self.logger.log(f"현재 페이지에 {count}개의 상품 항목을 발견했습니다.", level=logging.INFO)
|
||
for i in range(1, count + 1):
|
||
edit_selector = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template').format(index=i)
|
||
memo_selector = self.locator_manager.get_locator('BrowserControl', 'product_memo_button_template').format(index=i)
|
||
edit_button = self.page.locator(edit_selector)
|
||
memo_button = self.page.locator(memo_selector)
|
||
if await edit_button.count() > 0:
|
||
product_buttons.append({
|
||
"edit_button": edit_button,
|
||
"memo_button": memo_button
|
||
})
|
||
else:
|
||
self.logger.log(f"상품 인덱스 {i}에 대해 수정 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
return product_buttons
|
||
|
||
async def open_product_edit_dialog(self, button):
|
||
"""상품 수정 다이얼로그 열기"""
|
||
try:
|
||
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
|
||
await button.scroll_into_view_if_needed()
|
||
self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG)
|
||
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log("이전 다이얼로그 닫기 완료.", level=logging.INFO)
|
||
|
||
await button.click()
|
||
self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO)
|
||
|
||
# await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
|
||
await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# 에러 발생 시 스크린샷 저장
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류 발생으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
|
||
|
||
async def save_error_screenshot(self, error_tag: str = None) -> str:
|
||
"""
|
||
에러 발생시 스크린샷 저장.
|
||
- error_tag가 None이면 호출한 함수 이름과 줄 번호를 자동으로 사용.
|
||
"""
|
||
# 1) 호출자 정보를 가져온다
|
||
caller = inspect.stack()[1]
|
||
func_name = caller.function
|
||
lineno = caller.lineno
|
||
# 코드 컨텍스트(한 줄) 추출 (없으면 빈 문자열)
|
||
code_line = caller.code_context[0].strip() if caller.code_context else ""
|
||
|
||
# 2) error_tag 미지정 시 자동 생성
|
||
if not error_tag:
|
||
error_tag = f"{func_name}_L{lineno}"
|
||
|
||
# 3) 날짜별 디렉토리 준비
|
||
today = datetime.now().strftime("%Y%m%d")
|
||
date_dir = os.path.join(self.ERROR_SCREENSHOT_DIR, today)
|
||
os.makedirs(date_dir, exist_ok=True)
|
||
|
||
# 4) 타임스탬프
|
||
now = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
filename = f"screenshot_{error_tag}_{now}.png"
|
||
full_path = os.path.join(date_dir, filename)
|
||
|
||
# 5) 스크린샷 찍기
|
||
await self.page.screenshot(path=full_path)
|
||
|
||
# 6) 디버깅용으로 호출 줄과 코드도 로그에 남기기
|
||
self.logger.log(f"스크린샷 저장: {full_path} (called at {func_name} L{lineno}: {code_line})", level=logging.INFO)
|
||
|
||
return full_path
|
||
|
||
def clear_old_screenshot_dirs(self, days=14):
|
||
"""에러스크린샷 디렉토리에서 지정일(기본 14일) 지난 하위폴더 삭제"""
|
||
now = datetime.now()
|
||
for dir_name in os.listdir(self.ERROR_SCREENSHOT_DIR):
|
||
dir_path = os.path.join(self.ERROR_SCREENSHOT_DIR, dir_name)
|
||
if os.path.isdir(dir_path):
|
||
try:
|
||
# 폴더명 YYYYMMDD로 파싱
|
||
dir_date = datetime.strptime(dir_name, '%Y%m%d')
|
||
if now - dir_date > timedelta(days=days):
|
||
# 오래된 폴더 삭제
|
||
import shutil
|
||
shutil.rmtree(dir_path)
|
||
self.logger.log(f"[스크린샷] {dir_path} 삭제됨", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"[스크린샷] {dir_path} 삭제 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
def get_error_screenshot_dir(self):
|
||
"""에러 스크린샷 디렉토리 경로 반환"""
|
||
return self.ERROR_SCREENSHOT_DIR
|
||
|
||
async def click_detail_tab(self):
|
||
"""상세페이지 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.detail_tab_locator)
|
||
self.logger.log("상세페이지 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상세페이지 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_option_tab(self):
|
||
"""옵션 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.option_tab_locator)
|
||
self.logger.log("옵션 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"옵션 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_price_tab(self):
|
||
"""가격 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.price_tab_locator)
|
||
self.logger.log("가격 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"가격 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_thumb_tab(self):
|
||
"""썸네일 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.thumb_tab_locator)
|
||
self.logger.log("썸네일 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"썸네일 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_tags_tab(self):
|
||
"""키워드 태그 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.tag_tab_locator)
|
||
self.logger.log("키워드 태그 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"키워드 태그 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_title_tab(self):
|
||
"""상품명 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.title_tab_locator)
|
||
self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# def generate_restored_html(self, urls):
|
||
# """이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
|
||
# html_content = '<p> </p>'
|
||
# for url in urls:
|
||
# html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
|
||
# return html_content
|
||
|
||
|
||
def generate_restored_html(self, urls):
|
||
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
|
||
html_content = '<p> </p>\n'
|
||
for url in urls:
|
||
width, height = self.get_image_size(url)
|
||
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
|
||
if width and height:
|
||
html_content += (
|
||
f'<figure class="image">'
|
||
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
|
||
f'</figure>\n'
|
||
)
|
||
else:
|
||
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
|
||
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
|
||
return html_content
|
||
|
||
def get_image_size(self, url):
|
||
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
|
||
try:
|
||
# URL에서 불필요한 따옴표 제거
|
||
cleaned_url = url.strip("'\"")
|
||
|
||
response = requests.get(cleaned_url, timeout=5)
|
||
response.raise_for_status()
|
||
image = Image.open(BytesIO(response.content))
|
||
return image.width, image.height
|
||
except Exception as e:
|
||
|
||
self.logger.log(f"이미지 크기 확인 실패 - {cleaned_url}: {e}", level=logging.WARNING)
|
||
return None, None
|
||
|
||
|
||
# async def extract_image_urls(self, optionHandler, is_option_data=False):
|
||
# """상세페이지에서 이미지 URL 추출"""
|
||
# try:
|
||
# # 소스 편집 모드로 전환
|
||
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
||
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
||
|
||
# # 소스 편집 모드로 전환
|
||
# await self.page.click(source_button_locator)
|
||
# self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
|
||
|
||
|
||
# # 'data-value' 속성 값을 추출 (textarea 요소)
|
||
# textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
|
||
# data_value = await textarea.get_attribute("data-value")
|
||
|
||
|
||
# # HTML 소스에서 이미지 URL 추출
|
||
# image_urls = self.fetch_image_urls(data_value)
|
||
# self.logger.log(f'추출된 이미지 URL 수: {len(image_urls)}', level=logging.INFO)
|
||
|
||
# # HTML 소스에서 이미지 URL 삭제
|
||
# self.logger.log('img 태그를 삭제 중...', level=logging.DEBUG)
|
||
# data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
|
||
# new_value = ""
|
||
# if data_value_element:
|
||
# await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
|
||
# updated_value = await data_value_element.get_attribute('data-value')
|
||
# self.logger.log(f'Updated data-value: {updated_value}', level=logging.DEBUG)
|
||
# else:
|
||
# self.logger.log('Element with data-value not found.', level=logging.DEBUG)
|
||
# self.logger.log('img 태그 삭제 완료.', level=logging.DEBUG)
|
||
|
||
# # img 태그의 class 삭제 후 다시 소스 버튼 클릭
|
||
# await self.page.click(source_button_locator)
|
||
# self.logger.log('소스 버튼 재 클릭 완료.', level=logging.DEBUG)
|
||
|
||
|
||
# # 텍스트 입력 필드 선택
|
||
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
||
# await input_field.press('Enter')
|
||
|
||
# # # 선두부 텍스트 입력
|
||
# # for key in sorted(self.text_templates.keys()):
|
||
# # leading_text = self.text_templates[key]
|
||
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
||
# # await input_field.type(leading_text)
|
||
# # await input_field.press('Enter')
|
||
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
# # 선두부 텍스트 입력 (self.detail_text_widget의 get_lines 메서드 사용)
|
||
# for leading_text in self.detail_text_widget.get_lines():
|
||
# if leading_text: # 내용이 있는 경우에만
|
||
# await input_field.type(leading_text)
|
||
# await input_field.press('Enter')
|
||
# self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
|
||
# option_data = optionHandler.get_selected_translated_options()
|
||
# self.logger.log(f'option_data : {option_data}', level=logging.DEBUG)
|
||
|
||
# # if is_option_data:
|
||
# if option_data or option_data != {}:
|
||
# self.logger.log('옵션 데이터 입력 시작', level=logging.DEBUG)
|
||
# # option_data = {} # option_data 초기화
|
||
# is_single = optionHandler.option_info['is_single_option']
|
||
# # option_data = optionHandler.get_selected_translated_options()
|
||
|
||
# # is_single = True # 옵션입력 일단 제외
|
||
# # self.logger.log('옵션입력 일단 제외', level=logging.DEBUG)
|
||
|
||
# self.logger.log('가져온 옵션 데이터', level=logging.DEBUG)
|
||
# self.logger.log(f'{option_data}', level=logging.DEBUG)
|
||
|
||
# # # 옵션 입력 필드 선택
|
||
# # input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
||
# # await input_field.press('Enter')
|
||
|
||
# # # 선두부 텍스트 입력
|
||
# # for key in sorted(self.text_templates.keys()):
|
||
# # leading_text = self.text_templates[key]
|
||
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
||
# # await input_field.type(leading_text)
|
||
# # await input_field.press('Enter')
|
||
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
# if not is_single:
|
||
# self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
|
||
|
||
# # 각 옵션을 한 줄씩 입력
|
||
# await input_field.type("# 옵션 목록")
|
||
# await input_field.press('Enter')
|
||
|
||
# # 첫 번째 옵션의 번역된 옵션명만 입력
|
||
# first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
|
||
# first_value = option_data[first_key] # key는 옵션이름 value는 가격
|
||
# await input_field.type(f"- 1. {first_key}")
|
||
# await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
|
||
|
||
# # 나머지 옵션도 번역된 옵션명만 입력
|
||
# for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
|
||
# await input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
|
||
# await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
|
||
|
||
# # 목록 끝을 알리기 위해 엔터 두 번 입력
|
||
# await input_field.press('Enter')
|
||
# await input_field.press('Enter')
|
||
|
||
# # 후두부 텍스트 입력
|
||
# await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
|
||
# await input_field.press('Enter')
|
||
# await input_field.type('---')
|
||
# await input_field.press('Enter')
|
||
|
||
# self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
|
||
|
||
# return image_urls
|
||
# except Exception as e:
|
||
# self.logger.log(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
# return image_urls if image_urls else []
|
||
|
||
# def paste_image_in_chrome(self, url, is_success_translated, toggle_states, is_watermark=False, watermark_text= ""):
|
||
# """크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
|
||
# self.logger.log("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력", level=logging.DEBUG)
|
||
# try:
|
||
# # self.switch_to_chrome() # 크롬으로 포커스 이동
|
||
# self.clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated, toggle_states=toggle_states) # 클립보드 내용을 처리
|
||
# # clipboard_content = pyperclip.paste()
|
||
# if self.clipboardImageManager.is_clipboard_image():
|
||
# pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
|
||
# self.logger.log("이미지 붙여넣기 완료.", level=logging.INFO)
|
||
# pyautogui.press('right') # 오른쪽 입력
|
||
# self.logger.log("이미지 붙여넣기 완료.", level=logging.DEBUG)
|
||
# self.clipboardImageManager.clear_clipboard()
|
||
# self.logger.log("이미지 붙여넣기 완료로 클립보드 비우기.", level=logging.INFO)
|
||
# return True
|
||
# else:
|
||
# self.logger.log("클립보드가 비어있습니다.", level=logging.WARNING)
|
||
# return False
|
||
# except Exception as e:
|
||
# self.logger.log(f"이미지 붙여넣기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
# return False
|
||
|
||
async def save_and_ecs_product_edit_ori(self):
|
||
"""상품 수정 후 저장 버튼 클릭"""
|
||
try:
|
||
await self.page.click(self.save_button_locator)
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log("상품 수정 내용 저장 및 ECS 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
|
||
async def save_and_ecs_product_edit(self):
|
||
"""상품 수정 후 저장 버튼 클릭 및 ESC 전송, 그리고 다이얼로그 닫힘 버튼 클릭을 통한 다이얼로그 종료 처리"""
|
||
try:
|
||
# 저장 버튼 클릭
|
||
await self.page.click(self.save_button_locator)
|
||
self.logger.log("저장 버튼 클릭 완료.", level=logging.INFO)
|
||
|
||
# 저장 동작 완료를 위한 잠시 대기
|
||
await asyncio.sleep(0.5)
|
||
|
||
# ESC 키를 최대 3회 전송하여 다이얼로그가 닫혔는지 확인
|
||
max_attempts = 3
|
||
for attempt in range(max_attempts):
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log(f"ESC 키 전송 시도 {attempt+1}/{max_attempts}", level=logging.DEBUG)
|
||
await asyncio.sleep(0.5)
|
||
if not await self.page.is_visible(self.save_button_locator):
|
||
self.logger.log("상품 수정 다이얼로그가 성공적으로 닫혔습니다.", level=logging.INFO)
|
||
break
|
||
else:
|
||
self.logger.log("상품 수정 다이얼로그가 여전히 열려있습니다.", level=logging.WARNING)
|
||
else:
|
||
# ESC 전송 후에도 다이얼로그가 닫히지 않은 경우, product_dialog_close_btn을 클릭
|
||
self.logger.log("ESC 시도 후에도 다이얼로그가 닫히지 않아, 닫힘 버튼 클릭 시도합니다.", level=logging.INFO)
|
||
if await self.page.is_visible(self.product_dialog_close_btn):
|
||
await self.page.click(self.product_dialog_close_btn)
|
||
self.logger.log("닫힘 버튼 클릭 완료.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("닫힘 버튼을 찾지 못했습니다.", level=logging.ERROR)
|
||
except Exception as e:
|
||
|
||
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# 에러 발생 시 스크린샷 저장
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
|
||
|
||
async def save_product_edit(self):
|
||
"""상품 수정 후 저장 버튼 클릭"""
|
||
try:
|
||
await self.page.click(self.save_button_locator)
|
||
self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def go_to_next_page_ori(self):
|
||
"""다음 페이지로 이동"""
|
||
try:
|
||
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
|
||
current_page = await self.page.query_selector(self.current_page_locator)
|
||
|
||
if not current_page:
|
||
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
# 현재 활성화된 페이지 번호를 가져옴
|
||
current_page_number = int(await current_page.get_attribute("title"))
|
||
self.logger.log(f"현재페이지 : [{current_page_number}]", level=logging.INFO)
|
||
|
||
next_page_number = current_page_number + 1
|
||
|
||
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
|
||
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
|
||
next_page_button = await self.page.query_selector(next_page_button_locator)
|
||
|
||
if next_page_button:
|
||
await next_page_button.click() # 페이지 버튼 클릭
|
||
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
|
||
time.sleep(3)
|
||
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
|
||
return True
|
||
else:
|
||
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False
|
||
|
||
async def go_to_next_page(self):
|
||
"""
|
||
수정된 CSS 선택자를 사용하여 다음 페이지로 이동합니다.
|
||
"""
|
||
try:
|
||
current_page_element = await self.page.query_selector(self.current_page_locator)
|
||
if not current_page_element:
|
||
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
current_page_number = int(await current_page_element.get_attribute("title"))
|
||
self.logger.log(f"현재 페이지 번호: {current_page_number}", level=logging.INFO)
|
||
next_page_number = current_page_number + 1
|
||
next_page_button_locator = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template').format(page_number=next_page_number)
|
||
self.logger.log(f"다음 페이지 선택자: {next_page_button_locator}", level=logging.DEBUG)
|
||
next_page_button = await self.page.query_selector(next_page_button_locator)
|
||
|
||
if next_page_button:
|
||
await next_page_button.click()
|
||
await self.page.wait_for_load_state('domcontentloaded', timeout=5000)
|
||
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
|
||
return True
|
||
else:
|
||
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
|
||
return False
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False
|
||
|
||
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
|
||
"""
|
||
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
||
|
||
Parameters:
|
||
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
||
- pause_time: 스크롤 사이의 대기 시간 (초).
|
||
- max_scrolls: 최대 스크롤 횟수.
|
||
"""
|
||
scroll_count = 0
|
||
|
||
self.logger.log(f"스크롤 시작", level=logging.DEBUG)
|
||
|
||
# 현재 페이지 높이 가져오기
|
||
last_height = await self.page.evaluate("document.body.scrollHeight")
|
||
self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG)
|
||
|
||
while scroll_count < max_scrolls:
|
||
if direction == "down":
|
||
# 아래로 스크롤
|
||
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG)
|
||
await self.page.evaluate("window.scrollBy(0, 1000);")
|
||
elif direction == "up":
|
||
# 위로 스크롤
|
||
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG)
|
||
await self.page.evaluate("window.scrollBy(0, -1000);")
|
||
else:
|
||
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
||
|
||
self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG)
|
||
await asyncio.sleep(pause_time)
|
||
|
||
# 새로운 페이지 높이 가져오기
|
||
new_height = await self.page.evaluate("document.body.scrollHeight")
|
||
self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG)
|
||
|
||
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
|
||
if direction == "down" and new_height == last_height:
|
||
self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
|
||
break
|
||
elif direction == "up" and new_height == 0:
|
||
self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
|
||
break
|
||
|
||
self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG)
|
||
last_height = new_height
|
||
scroll_count += 1
|
||
self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG)
|
||
|
||
if scroll_count == max_scrolls:
|
||
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
|
||
|
||
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
|
||
"""
|
||
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
||
|
||
Parameters:
|
||
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
||
- pause_time: 스크롤 사이의 대기 시간 (초).
|
||
- max_scrolls: 최대 스크롤 횟수.
|
||
"""
|
||
scroll_count = 0
|
||
|
||
while scroll_count < max_scrolls:
|
||
if direction == "down":
|
||
# 아래로 스크롤 (Page Down 키 사용)
|
||
await self.page.keyboard.press("PageDown")
|
||
elif direction == "up":
|
||
# 위로 스크롤 (Page Up 키 사용)
|
||
await self.page.keyboard.press("PageUp")
|
||
else:
|
||
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
||
|
||
await asyncio.sleep(pause_time)
|
||
|
||
scroll_count += 1
|
||
|
||
if scroll_count == max_scrolls:
|
||
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
|
||
|
||
|
||
async def collect_product_info(self, items_per_page, ed_mode):
|
||
"""
|
||
상품 정보를 수집하는 메서드
|
||
"""
|
||
try:
|
||
product_infos = []
|
||
product_name_elements = [] # product_name_element를 저장할 리스트
|
||
|
||
# ed_mode에 따라 product_elements 설정
|
||
if ed_mode:
|
||
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
|
||
product_elements = [
|
||
{
|
||
"name": self.product_name_for_ed_template.format(index=i),
|
||
"price": self.product_price_for_ed_template.format(index=i),
|
||
"image": self.product_image_for_ed_template.format(index=i)
|
||
}
|
||
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
|
||
]
|
||
else:
|
||
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
|
||
product_elements = await self.page.query_selector_all(self.product_parent_locator)
|
||
|
||
for i, element in enumerate(product_elements[:items_per_page], start=1):
|
||
try:
|
||
if ed_mode:
|
||
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
|
||
product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
|
||
product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
|
||
product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
|
||
else:
|
||
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
|
||
product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
|
||
product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
|
||
product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
|
||
|
||
# 요소가 존재하면 정보 추출
|
||
self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG)
|
||
self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG)
|
||
self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG)
|
||
|
||
if product_name_element and product_price_element and product_image_element:
|
||
# await의 결과를 각 변수에 저장
|
||
product_name_text = (await product_name_element.inner_text()).strip()
|
||
product_price_text = (await product_price_element.inner_text()).strip()
|
||
product_image_url = await product_image_element.get_attribute('src')
|
||
|
||
# product_info 딕셔너리에 결과 저장
|
||
product_info = {
|
||
"name": product_name_text,
|
||
"price": product_price_text,
|
||
"image_url": product_image_url
|
||
}
|
||
self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG)
|
||
product_infos.append(product_info)
|
||
product_name_elements.append(product_name_element) # 각 product_name_element 추가
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
continue
|
||
|
||
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return []
|
||
|
||
|
||
|
||
|
||
async def scroll_page_to_bottom(self, pause_time=0.2):
|
||
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
|
||
self.logger.log('페이지 스크롤 시작...', level=logging.INFO)
|
||
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
|
||
|
||
while True:
|
||
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
|
||
await asyncio.sleep(pause_time) # 페이지 로딩 대기
|
||
current_height = await self.page.evaluate("() => document.body.scrollHeight")
|
||
if current_height == previous_height:
|
||
break # 더 이상 스크롤할 내용이 없으면 종료
|
||
previous_height = current_height
|
||
self.logger.log('페이지 스크롤 완료.', level=logging.INFO)
|
||
|
||
async def scroll_page_to_top(self, pause_time=0.2):
|
||
"""페이지의 맨 위까지 스크롤"""
|
||
self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO)
|
||
previous_height = await self.page.evaluate("() => window.pageYOffset")
|
||
|
||
while previous_height > 0:
|
||
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
|
||
await asyncio.sleep(pause_time) # 페이지 로딩 대기
|
||
current_height = await self.page.evaluate("() => window.pageYOffset")
|
||
if current_height == previous_height:
|
||
break # 더 이상 스크롤할 내용이 없으면 종료
|
||
previous_height = current_height
|
||
|
||
self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO)
|
||
|
||
|
||
|
||
|
||
async def start_Percenty_task(self):
|
||
self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG)
|
||
self.running = True # 번역 작업이 시작됨
|
||
self.translation_started.emit()
|
||
|
||
try:
|
||
# 금지어 목록 업데이트
|
||
self.forbidden_word_manager.refresh_forbidden_words()
|
||
|
||
# 웨일 브라우저 시작 - 최대 3번 재시도
|
||
max_retries = 3
|
||
retry_count = 0
|
||
whale_window = None
|
||
|
||
while retry_count < max_retries:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f'웨일 브라우저 시작 시도 {retry_count + 1}/{max_retries}...', level=logging.INFO)
|
||
whale_window = self.whale_translator.start_trans_browser()
|
||
|
||
if whale_window:
|
||
self.logger.log('웨일 브라우저가 성공적으로 시작되었습니다.', level=logging.INFO)
|
||
break
|
||
|
||
retry_count += 1
|
||
self.logger.log(f'웨일 브라우저 시작 실패. {retry_count}/{max_retries}', level=logging.WARNING)
|
||
await asyncio.sleep(1) # 잠시 대기 후 재시도
|
||
|
||
if not whale_window:
|
||
error_msg = "웨일 브라우저를 시작할 수 없습니다. 상품수정을 중단합니다."
|
||
self.logger.log(error_msg, level=logging.ERROR)
|
||
self.translation_error.emit(error_msg)
|
||
return
|
||
|
||
if whale_window and self.toggle_states['collect_method_combo'] == "lens":
|
||
self.whale_translator.check_capcha()
|
||
|
||
# 1. 총 상품 수 수집
|
||
self.check_pause() # 일시중지 상태 확인
|
||
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
|
||
|
||
# total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
|
||
|
||
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
|
||
result = await self.get_total_product_count()
|
||
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
|
||
total_products = result.get("total_count", 0)
|
||
items_per_page = result.get("items_per_page", 0)
|
||
total_pages = math.ceil(total_products / items_per_page)
|
||
self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG)
|
||
self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG)
|
||
|
||
if total_products == 0:
|
||
self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
|
||
return
|
||
|
||
completed_count = 0
|
||
page_number = 1
|
||
|
||
# 3. 총 상품 수만큼 반복 작업 수행
|
||
# while self.running and completed_count < total_products:
|
||
for page_number in range(1, total_pages + 1):
|
||
self.check_pause() # 일시중지 상태 확인
|
||
# self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG)
|
||
self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO)
|
||
|
||
|
||
# if not page_number == 1:
|
||
# await self.scroll_page_to_top()
|
||
# self.logger.log(f'1페이지가 아니므로 동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
|
||
|
||
await self.scroll_page_to_top()
|
||
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
|
||
|
||
if not self.toggle_states['ed_mode']:
|
||
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
|
||
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
|
||
product_buttons = await self.get_product_edit_buttons_by_template()
|
||
else:
|
||
self.logger.log('상품정보 수집', level=logging.DEBUG)
|
||
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
|
||
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
|
||
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
|
||
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
|
||
|
||
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
|
||
|
||
if not product_buttons:
|
||
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
|
||
break
|
||
|
||
# 5. 각 상품에 대해 상품수정작업 수행
|
||
for index, button_set in enumerate(product_buttons, start=1):
|
||
try:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
edit_button = button_set.get("edit_button")
|
||
memo_button = button_set.get("memo_button")
|
||
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
|
||
|
||
# 상태 초기화 코드 추가
|
||
self.titleGenerator.reset_state()
|
||
self.optionHandler.reset_state()
|
||
self.priceHandler.reset_state()
|
||
self.tagsHandler.reset_state()
|
||
self.thumbnailHandler.reset_state()
|
||
self.detailHandler.reset_state()
|
||
self.clipboardImageManager.reset_state() # 클립보드 초기화
|
||
|
||
# 그 외 임시 변수 초기화
|
||
self.current_options_info = None
|
||
|
||
# 상품명 수집 오류 처리
|
||
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
|
||
|
||
is_disabled = await self.is_button_disabled(edit_button)
|
||
if is_disabled:
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
|
||
# completed_count += 1
|
||
# self.total_progressbar_signal.emit(completed_count, total_products)
|
||
continue
|
||
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
|
||
|
||
# 상품 수정 다이얼로그 열기
|
||
await self.open_product_edit_dialog(edit_button)
|
||
|
||
self.check_pause() # 일시중지 상태 확인
|
||
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag)
|
||
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
|
||
|
||
|
||
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
|
||
# if title_infos.get("is_banned_category", False):
|
||
if title_infos.get("is_banned_category", False):
|
||
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
|
||
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
|
||
|
||
# 랜덤 4자리 붙이기
|
||
random_suffix = self.generate_random_suffix()
|
||
|
||
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
|
||
is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix)
|
||
if is_set:
|
||
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
|
||
await self.save_and_ecs_product_edit()
|
||
# completed_count += 1
|
||
# self.total_progressbar_signal.emit(completed_count, total_products)
|
||
continue # 다음 상품으로 넘어감
|
||
|
||
|
||
# 정상상품이면 상품명 수정과 카테고리 수집
|
||
is_title = self.toggle_states['title']
|
||
is_title_shuffle = self.toggle_states['title_shuffle']
|
||
if is_title or is_title_shuffle:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
|
||
self.start_stage_signal.emit(0)
|
||
title_infos["generated_name"] = await self.titleGenerator.process_title()
|
||
self.complete_stage_signal.emit(0)
|
||
|
||
# 옵션 수정
|
||
self.start_stage_signal.emit(1)
|
||
is_optionTrnas = self.toggle_states.get('optionTrnas')
|
||
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
|
||
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
|
||
|
||
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
|
||
await self.edit_option(title_infos.get("original_name", None))
|
||
|
||
self.complete_stage_signal.emit(1)
|
||
|
||
# 가격 수정
|
||
self.start_stage_signal.emit(2)
|
||
is_price = self.toggle_states.get('price')
|
||
if is_price:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
|
||
await self.edit_price(title_infos)
|
||
self.complete_stage_signal.emit(2)
|
||
|
||
# 썸네일 수정
|
||
self.start_stage_signal.emit(3)
|
||
thumb = self.toggle_states.get('thumb')
|
||
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
|
||
|
||
if thumb:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
await self.edit_thumb()
|
||
self.complete_stage_signal.emit(3)
|
||
|
||
# 태그 수정
|
||
tag = self.toggle_states.get('tag')
|
||
if tag:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG)
|
||
self.start_stage_signal.emit(4)
|
||
await self.edit_tags(title_infos)
|
||
self.complete_stage_signal.emit(4)
|
||
|
||
# 상세페이지 수정 수정
|
||
detail_Option = self.toggle_states.get('detail_Option')
|
||
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
|
||
|
||
if detail_Option or detail_IMGTrans:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG)
|
||
|
||
# 상세페이지 수정
|
||
self.start_stage_signal.emit(5)
|
||
await self.edit_detail(detail_Option, detail_IMGTrans)
|
||
self.complete_stage_signal.emit(5)
|
||
|
||
# 수정 후 저장
|
||
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
|
||
await self.save_and_ecs_product_edit()
|
||
|
||
# 메모 입력
|
||
if memo_button and self.toggle_states['memo']:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
|
||
await self.insert_product_infos_to_memo(memo_button, title_infos)
|
||
|
||
# completed_count += 1
|
||
# self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
|
||
# self.total_progressbar_signal.emit(completed_count, total_products)
|
||
|
||
# self.log_memory_usage(f"{completed_count} 개 상품 수정 후 ")
|
||
|
||
# if (completed_count % 10) == 0:
|
||
# gc.collect()
|
||
# mem = psutil.virtual_memory()
|
||
# self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
|
||
|
||
# if completed_count >= total_products:
|
||
# self.logger.log(f'[{total_products}]개 상품 수정이 완료되었습니다.', level=logging.INFO)
|
||
# # self.translation_completed.emit(total_products)
|
||
# return
|
||
except Exception as item_err:
|
||
# 이 상품만 실패로 기록하고, 다음 상품으로
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True)
|
||
|
||
finally:
|
||
# 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트
|
||
completed_count += 1
|
||
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
|
||
self.total_progressbar_signal.emit(completed_count, total_products)
|
||
|
||
if (completed_count % 10) == 0:
|
||
gc.collect()
|
||
mem = psutil.virtual_memory()
|
||
self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
|
||
|
||
# 목표 도달 체크
|
||
if completed_count >= total_products:
|
||
break
|
||
|
||
# (3) 다음 페이지로 이동
|
||
if page_number < total_pages:
|
||
success = await self.go_to_next_page()
|
||
if not success:
|
||
err = f"페이지 {page_number} → {page_number+1} 이동 실패"
|
||
self.logger.log(err, level=logging.ERROR)
|
||
self.translation_error.emit(err)
|
||
return
|
||
else:
|
||
self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO)
|
||
|
||
except Exception as fatal:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 전체 작업 중에 발생한 치명적 예외
|
||
self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True)
|
||
self.translation_error.emit(str(fatal))
|
||
|
||
else:
|
||
# 예외 없이 ‘정상적으로’ 전체 루프를 마쳤을 때
|
||
self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO)
|
||
self.translation_completed.emit(completed_count)
|
||
|
||
finally:
|
||
# 브라우저나 리소스는 항상 닫아 주기
|
||
try:
|
||
self.whale_translator.close_trans_browser()
|
||
except:
|
||
pass
|
||
|
||
|
||
async def insert_product_infos_to_memo(self, memo_button, title_infos):
|
||
try:
|
||
collect_method = self.toggle_states['collect_method_combo']
|
||
is_new_memo_first = self.toggle_states['memo_toggle_order']
|
||
is_memo_exposure = self.toggle_states['memo_toggle_exposer']
|
||
|
||
# 1. 메모 버튼 유효성 확인
|
||
if await self.is_button_disabled(memo_button):
|
||
self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 2. title_infos 유효성 검사
|
||
top_5_titles = title_infos.get("top_5_titles", [])
|
||
top_5_prices = title_infos.get("top_5_prices", [])
|
||
if not top_5_titles or not top_5_prices:
|
||
self.logger.log("title_infos에 메모할 내용이 없어 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 3. 메모용 텍스트 생성
|
||
new_memo = self.generate_titles_with_prices(title_infos)
|
||
if not new_memo:
|
||
self.logger.log("생성된 메모 텍스트가 비어 있어 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 4. 메모 다이얼로그 열기
|
||
await memo_button.click()
|
||
self.logger.log("메모 버튼 클릭", level=logging.DEBUG)
|
||
|
||
memo_input = self.page.locator(self.memo_input_locator)
|
||
await memo_input.wait_for(state="visible")
|
||
self.logger.log("메모 입력란 대기 완료", level=logging.DEBUG)
|
||
|
||
# 5. 기존 메모 읽어오기
|
||
existing = await memo_input.input_value()
|
||
combined = existing.strip()
|
||
|
||
# 6. 새 메모 조합
|
||
suffix = "\n" + "-"*10 + "\n" + f"수집방법:[{collect_method}]\n"
|
||
if combined:
|
||
if is_new_memo_first:
|
||
combined = new_memo + suffix + combined
|
||
else:
|
||
combined = combined + suffix + new_memo
|
||
else:
|
||
combined = f"수집방법:[{collect_method}]\n" + new_memo
|
||
|
||
# 7. 길이 제한 (2000자)
|
||
if len(combined) > 2000:
|
||
combined = combined[:2000]
|
||
|
||
# 8. 채우기
|
||
await memo_input.fill(combined)
|
||
self.logger.log(f"메모 입력: {combined!r}", level=logging.DEBUG)
|
||
|
||
# 9. 노출 체크박스 처리
|
||
# chk = self.page.locator(
|
||
# 'label:has-text("상품 목록에 메모 내용 노출하기") '
|
||
# 'input[type="checkbox"]'
|
||
# )
|
||
# await chk.wait_for(state="attached")
|
||
# checked = await chk.is_checked()
|
||
# if is_memo_exposure and not checked:
|
||
# await chk.click()
|
||
# self.logger.log("메모 노출 체크 ON", level=logging.DEBUG)
|
||
# elif not is_memo_exposure and checked:
|
||
# await chk.click()
|
||
# self.logger.log("메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
exposer = self.page.locator(self.memo_exposer_locator)
|
||
|
||
# 노출 체크박스 존재 대기
|
||
await exposer.wait_for(state="attached")
|
||
|
||
# 노출 체크박스 활성화 대기
|
||
await exposer.wait_for(state="enabled", timeout=5000)
|
||
|
||
if is_memo_exposure:
|
||
await exposer.check()
|
||
self.logger.log("메모 노출 체크 ON", level=logging.DEBUG)
|
||
|
||
else:
|
||
await exposer.uncheck()
|
||
self.logger.log("메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
# 10. 저장
|
||
save_btn = self.page.locator(self.memo_save_btn_locator)
|
||
await save_btn.wait_for(state="visible")
|
||
await save_btn.click()
|
||
self.logger.log("메모 저장 클릭", level=logging.DEBUG)
|
||
await asyncio.sleep(0.3)
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# ESC 키 2번 전송하여 팝업 제거 시도
|
||
self.logger.log("ESC 키를 2번 전송하여 팝업 제거 후 메모 입력 재시도합니다.", level=logging.WARNING)
|
||
await self.page.keyboard.press("Escape")
|
||
await self.page.keyboard.press("Escape")
|
||
await asyncio.sleep(0.5) # 잠시 대기
|
||
|
||
# 메모 입력 재시도
|
||
try:
|
||
await memo_button.click()
|
||
self.logger.log("재시도: 메모 버튼을 클릭했습니다.", level=logging.DEBUG)
|
||
memo_input = self.page.locator(self.memo_input_locator)
|
||
await memo_input.wait_for(state="visible")
|
||
self.logger.log("재시도: 메모 입력란 대기 완료", level=logging.DEBUG)
|
||
|
||
# 5. 기존 메모 읽어오기
|
||
existing = await memo_input.input_value()
|
||
combined = existing.strip()
|
||
|
||
# 6. 새 메모 조합
|
||
suffix = "\n" + "-"*10 + "\n" + f"수집방법:[{collect_method}]\n"
|
||
if combined:
|
||
if is_new_memo_first:
|
||
combined = new_memo + suffix + combined
|
||
else:
|
||
combined = combined + suffix + new_memo
|
||
else:
|
||
combined = f"수집방법:[{collect_method}]\n" + new_memo
|
||
|
||
# 7. 길이 제한 (2000자)
|
||
if len(combined) > 2000:
|
||
combined = combined[:2000]
|
||
|
||
# 8. 채우기
|
||
await memo_input.fill(combined)
|
||
self.logger.log(f"재시도: 메모 입력: {combined!r}", level=logging.DEBUG)
|
||
|
||
# 9. 노출 체크박스 처리
|
||
# chk = self.page.locator(
|
||
# 'label:has-text("상품 목록에 메모 내용 노출하기") '
|
||
# 'input[type="checkbox"]'
|
||
# )
|
||
# await chk.wait_for(state="attached")
|
||
# checked = await chk.is_checked()
|
||
# if is_memo_exposure and not checked:
|
||
# await chk.click()
|
||
# self.logger.log("재시도: 메모 노출 체크 ON", level=logging.DEBUG)
|
||
# elif not is_memo_exposure and checked:
|
||
# await chk.click()
|
||
# self.logger.log("재시도: 메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
is_memo_exposure = self.toggle_states.get("memo_exposure", False)
|
||
exposer = self.page.locator(self.memo_exposer_locator)
|
||
|
||
# 노출 체크박스 존재 대기
|
||
await exposer.wait_for(state="attached")
|
||
|
||
# 노출 체크박스 활성화 대기
|
||
await exposer.wait_for(state="enabled", timeout=5000)
|
||
|
||
if is_memo_exposure:
|
||
await exposer.check()
|
||
else:
|
||
await exposer.uncheck()
|
||
|
||
# 10. 저장
|
||
save_btn = self.page.locator(self.memo_save_btn_locator)
|
||
await save_btn.wait_for(state="visible")
|
||
await save_btn.click()
|
||
self.logger.log("재시도: 메모 저장 클릭", level=logging.DEBUG)
|
||
await asyncio.sleep(0.3)
|
||
except Exception as e2:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"재시도 후에도 메모 입력 중 오류 발생: {e2}", level=logging.ERROR, exc_info=True)
|
||
# ESC 키 2번 전송하여 팝업 제거 후 다음 상품으로 넘어감
|
||
self.logger.log("재시도 실패: ESC 키를 2번 전송하여 창을 닫고 다음 상품으로 넘어갑니다.", level=logging.WARNING)
|
||
await self.page.keyboard.press("Escape")
|
||
await self.page.keyboard.press("Escape")
|
||
# 다음 상품으로 넘어가기 위해 예외를 재발생하지 않고 그냥 리턴합니다.
|
||
return
|
||
|
||
def generate_titles_with_prices(self, title_infos):
|
||
"""top_5_titles와 top_5_prices를 매칭하여 텍스트 생성"""
|
||
try:
|
||
titles = title_infos.get("top_5_titles", [])
|
||
prices = title_infos.get("top_5_prices", [])
|
||
|
||
if not titles or not prices:
|
||
self.logger.log("top_5_titles 또는 top_5_prices가 비어 있습니다.", level=logging.WARNING)
|
||
return ""
|
||
|
||
# 매칭된 텍스트 생성
|
||
formatted_lines = []
|
||
for title, price in zip(titles, prices):
|
||
formatted_price = f"{int(price):,}원" # 3자리수마다 콤마, "원" 추가
|
||
formatted_lines.append(f"{title} - {formatted_price}")
|
||
|
||
# 최종 텍스트 생성
|
||
result_text = "\n".join(formatted_lines)
|
||
self.logger.log(f"Generated titles with prices:\n{result_text}", level=logging.DEBUG)
|
||
return result_text
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"텍스트 생성 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return ""
|
||
|
||
def clear_temp_folder(self, folder_path):
|
||
"""
|
||
폴더 내 모든 파일 삭제, 폴더 없으면 생성
|
||
"""
|
||
if os.path.exists(folder_path):
|
||
# 폴더 내 파일/폴더 삭제
|
||
for filename in os.listdir(folder_path):
|
||
file_path = os.path.join(folder_path, filename)
|
||
try:
|
||
if os.path.isfile(file_path) or os.path.islink(file_path):
|
||
os.unlink(file_path)
|
||
elif os.path.isdir(file_path):
|
||
shutil.rmtree(file_path)
|
||
except Exception as e:
|
||
self.logger.log(f"파일 삭제 실패: {file_path}, 에러: {e}", level=logging.ERROR, exc_info=True)
|
||
else:
|
||
os.makedirs(folder_path, exist_ok=True)
|
||
|
||
async def edit_option(self, product_name):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_option_tab()
|
||
|
||
# 옵션 최대선택갯수
|
||
self.current_options_info = await self.optionHandler.process_options(product_name, self.forbidden_word_manager, self.toggle_states)
|
||
|
||
# 수정 후 저장
|
||
await self.save_product_edit()
|
||
|
||
async def edit_price(self, title_infos):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_price_tab()
|
||
|
||
# 가격 수정 프로세스
|
||
await self.priceHandler.process_price(title_infos=title_infos)
|
||
|
||
# 수정 후 저장
|
||
await self.save_product_edit()
|
||
|
||
|
||
async def edit_thumb(self):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_thumb_tab()
|
||
|
||
# 가격 수정 프로세스
|
||
await self.thumbnailHandler.process_thumbnails(self.toggle_states)
|
||
|
||
# 수정 후 저장
|
||
await self.save_product_edit()
|
||
|
||
async def edit_tags(self, title_infos):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_tags_tab()
|
||
|
||
# 가격 수정 프로세스
|
||
await self.tagsHandler.process_tags(self.forbidden_word_manager, title_infos=title_infos)
|
||
|
||
# 수정 후 저장
|
||
await self.save_product_edit()
|
||
|
||
async def edit_detail(self, detail_Option, detail_IMGTrans):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_detail_tab()
|
||
|
||
# 상세페이지 수정 프로세스
|
||
await self.detailHandler.process_detail(detail_Option, detail_IMGTrans, self.optionHandler)
|
||
|
||
# 수정 후 저장
|
||
await self.save_product_edit()
|
||
|
||
|
||
|
||
def run(self):
|
||
"""QThread의 run 메서드에서 이벤트 루프 생성 및 실행"""
|
||
self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG)
|
||
|
||
# 이벤트 루프가 없거나 종료된 경우에만 초기화
|
||
if not self.loop or self.loop.is_closed():
|
||
self.loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self.loop)
|
||
self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
# 이벤트 루프를 유지하여 외부에서 작업 추가 가능
|
||
self.loop.run_forever()
|
||
|
||
def start_browser_task(self, avx_supported):
|
||
"""이벤트 루프에서 브라우저 초기화 작업 추가"""
|
||
self.logger.log(f"start_browser_task - 브라우저 초기화 작업 시작 : {self.toggle_states}", level=logging.DEBUG)
|
||
self.avx_supported = avx_supported
|
||
|
||
# 실행 중인 이벤트 루프에 비동기 작업 추가
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop)
|
||
self.browser_task_started = True # 작업 실행 플래그 설정
|
||
self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def select_group_task(self, group_index: int):
|
||
"""이벤트 루프에서 그룹 선택 작업 실행"""
|
||
self.logger.log(f"select_group_task - 그룹 선택 작업 추가: {group_index}", level=logging.DEBUG)
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self.select_group_index(group_index), self.loop)
|
||
self.logger.log("select_group_task - 비동기 그룹선택 작업이 추가됨.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("select_group_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def start_PercentyJob_task(self):
|
||
"""번역 작업을 이벤트 루프에서 실행"""
|
||
# 이벤트 루프가 없거나 닫혀 있으면 새로 생성
|
||
# self.initialize_event_loop()
|
||
|
||
if self.loop and not self.loop.is_closed():
|
||
# 이미 실행 중인 이벤트 루프에 번역 작업 추가
|
||
asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop)
|
||
else:
|
||
self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR)
|
||
|
||
def initialize_event_loop(self):
|
||
"""이벤트 루프가 없거나 닫힌 경우 새로 생성"""
|
||
if not self.loop or self.loop.is_closed():
|
||
self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG)
|
||
self.loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self.loop)
|
||
self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG)
|
||
|
||
def stop(self):
|
||
"""Playwright를 안전하게 종료"""
|
||
try:
|
||
# 비동기 함수 호출을 동기식으로 처리
|
||
asyncio.run(self._stop_playwright())
|
||
except RuntimeError as e:
|
||
self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
async def _stop_playwright(self):
|
||
"""모든 페이지/브라우저/Playwright 종료"""
|
||
# 1. 페이지 닫기
|
||
if self.browser:
|
||
for page in self.browser.pages:
|
||
try: await page.close()
|
||
except: pass
|
||
try: await self.browser.close()
|
||
except: pass
|
||
# 2. Playwright 종료
|
||
if self.playwright:
|
||
try: await self.playwright.stop()
|
||
except: pass
|
||
# 3. WhaleTranslator 등 추가 리소스
|
||
if self.whale_translator:
|
||
self.whale_translator.close_trans_browser()
|
||
# 4. 기타 핸들러 정리
|
||
if hasattr(self, 'imageProcessor') and self.imageProcessor:
|
||
self.imageProcessor.cleanup()
|
||
# 5. 루프 종료
|
||
if self.loop and not self.loop.is_closed():
|
||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||
|
||
def force_terminate_browser(self):
|
||
"""Playwright 브라우저 프로세스를 강제로 종료"""
|
||
try:
|
||
if self.browser:
|
||
browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid
|
||
browser_process = psutil.Process(browser_pid)
|
||
for child in browser_process.children(recursive=True):
|
||
child.kill()
|
||
browser_process.kill()
|
||
self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING)
|
||
|
||
if self.whale_translator:
|
||
self.whale_translator.close_trans_browser()
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
def terminate(self):
|
||
"""QThread 종료시 정리"""
|
||
self.logger.log("크롬 스레드 종료", level=logging.INFO)
|
||
self.request_cleanup() # QThread 이벤트루프에서 정리
|
||
super().terminate()
|
||
|
||
def cleanup(self):
|
||
"""BrowserController 종료 시 리소스 정리"""
|
||
try:
|
||
# ImageProcessor 리소스 정리
|
||
if hasattr(self, 'imageProcessor') and self.imageProcessor:
|
||
self.logger.log("ImageProcessor 리소스 정리 중...", level=logging.INFO)
|
||
self.imageProcessor.cleanup()
|
||
except Exception as e:
|
||
self.logger.log(f"리소스 정리 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
def request_cleanup(self):
|
||
"""브라우저 리소스 정리 명령을 QThread 이벤트루프에 올림"""
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self._stop_playwright(), self.loop)
|
||
else:
|
||
self.logger.log("정리 요청 시 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def check_pause(self):
|
||
"""일시 정지 상태라면 재개될 때까지 대기"""
|
||
self.pause_mutex.lock()
|
||
if self.is_paused:
|
||
# 일시정지 시작 시그널 발생
|
||
self.pause_confirmed.emit()
|
||
self.logger.log("일시정지 상태 확인 - 작업 일시중지됨", level=logging.INFO)
|
||
self.pause_condition.wait(self.pause_mutex)
|
||
self.pause_mutex.unlock()
|
||
|
||
def pause(self):
|
||
"""스레드 일시 정지"""
|
||
self.pause_mutex.lock()
|
||
self.is_paused = True
|
||
self.pause_mutex.unlock()
|
||
self.logger.log("스레드가 일시 정지되었습니다.", level=logging.DEBUG)
|
||
|
||
def resume(self):
|
||
"""스레드 재개"""
|
||
self.pause_mutex.lock()
|
||
self.is_paused = False
|
||
self.pause_condition.wakeAll()
|
||
self.pause_mutex.unlock()
|
||
self.logger.log("스레드가 재개되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
def log_memory_usage(self, message=""):
|
||
mem = psutil.virtual_memory()
|
||
self.logger.log(f"{message} 메모리 사용: 총 {mem.total/1024/1024:.1f}MB, 사용 {mem.used/1024/1024:.1f}MB, 사용률 {mem.percent}%", level=logging.DEBUG)
|
||
|
||
def pause_task(self):
|
||
"""작업을 일시정지합니다"""
|
||
self.logger.log("작업 일시정지 요청...", level=logging.INFO)
|
||
|
||
# 일시정지 상태 설정
|
||
self.pause()
|
||
|
||
# 디테일 프로그레스바 메시지 업데이트 (시그널 발송)
|
||
self.update_detail_progress_signal.emit(0, 0)
|
||
|
||
# 추가 로직이 필요한 경우 여기에 구현
|
||
|
||
def resume_task(self):
|
||
"""일시정지된 작업을 재개합니다"""
|
||
self.logger.log("작업 재개 요청...", level=logging.INFO)
|
||
|
||
# 재개 상태 설정
|
||
self.resume()
|
||
|
||
# 추가 로직이 필요한 경우 여기에 구현
|