3416 lines
175 KiB
Python
3416 lines
175 KiB
Python
from playwright.async_api import async_playwright, TimeoutError
|
||
|
||
from playwright.__main__ import main as playwright_main
|
||
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition, Slot
|
||
import re
|
||
# import pyautogui
|
||
import time
|
||
import json
|
||
import asyncio
|
||
import os, sys, random
|
||
import requests
|
||
import inspect
|
||
import gc
|
||
import psutil
|
||
import shutil
|
||
from datetime import datetime, timedelta
|
||
import math
|
||
from io import BytesIO
|
||
|
||
# from src.wh_search import WhaleSearchParser
|
||
# from src.wh_img_trans import WhaleImageTranslator
|
||
# from src.whale_new import WhaleTranslator
|
||
|
||
# from src.modules.clipboardImageManager import ClipboardImageManager
|
||
from src.contents.option import OptionHandler
|
||
from src.contents.price import PriceHandler
|
||
# from src.contents.details import DetailHandler
|
||
from src.contents.details import DetailHandler
|
||
from src.contents.titleGenerator import TitleGenerator
|
||
from src.contents.thumb import ThumbnailHandler
|
||
|
||
from src.translator.papago_translator import PapagoTranslator
|
||
|
||
# from src.modules.image_processor import ImageProcessor
|
||
|
||
from src.contents.tags import TagsHandler
|
||
from src.titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager
|
||
from src.titleManager.gpt_client import GPTClient
|
||
|
||
import tempfile
|
||
import logging
|
||
|
||
# 새로운 모듈 import
|
||
import string
|
||
|
||
class BrowserController(QThread):
|
||
|
||
# 브라우저 시작 시그널 정의
|
||
browser_started = Signal()
|
||
browser_create_error = Signal(str)
|
||
|
||
image_processor_error = Signal(str)
|
||
premium_event_started = Signal(str)
|
||
|
||
unknown_browser_error = Signal(str)
|
||
browser_login_error = Signal(str)
|
||
browser_ad1_close_error = Signal(str)
|
||
browser_ad2_close_error = Signal(str)
|
||
browser_group_list_error = Signal(str)
|
||
browser_handler_update_error = Signal(str)
|
||
browser_parsing_page_error = Signal(str)
|
||
browser_registered_product_page_error = Signal(str)
|
||
browser_new_product_page_error = Signal(str)
|
||
|
||
# 번역 작업 시작, 완료, 오류 시그널 정의
|
||
translation_started = Signal()
|
||
translation_completed = Signal(int)
|
||
translation_error = Signal(str)
|
||
|
||
start_stage_signal = Signal(int)
|
||
complete_stage_signal = Signal(int)
|
||
|
||
total_progressbar_signal = Signal(int, int) # (현재 값, 총 값)
|
||
|
||
update_detail_progress_signal = Signal(int, int) # (현재 값, 총 값)
|
||
set_progress_visible_signal = Signal(bool) # ProgressBar 표시/숨김
|
||
|
||
percentyJob_button_Enable = Signal(bool)
|
||
|
||
selected_group_name_signal = Signal(str, int) # 선택된 그룹 이름 시그널
|
||
group_list_signal = Signal(list) # 그룹 리스트 시그널
|
||
|
||
# 일시정지 확인 시그널
|
||
pause_confirmed = Signal()
|
||
|
||
def __init__(self, app, logger, locator_manager, price_setting_diag, detail_text_widget, login_infos, toggle_states, user_id, supabase_manager):
|
||
super().__init__()
|
||
self.app = app # Main_GUI 객체 저장
|
||
self.logger = logger
|
||
self.locator_manager = locator_manager
|
||
self.price_setting_diag = price_setting_diag
|
||
self.detail_text_widget = detail_text_widget
|
||
self.toggle_states = toggle_states
|
||
self.login_infos = login_infos
|
||
self.user_id = user_id
|
||
self.supabase_manager = supabase_manager
|
||
|
||
self.is_trans_enabled = False
|
||
self.authenticated_by_admin = False
|
||
self.user_membership_level = None
|
||
self.is_valid_level = False
|
||
|
||
self.image_processor = None
|
||
self.is_image_processor_init = False
|
||
|
||
self.route_registered = False
|
||
self.browser_task_started = False
|
||
|
||
self.parsing_page = None
|
||
self.current_options_info = None
|
||
self.chrome_hwnd = None
|
||
|
||
self.base_path = self.toggle_states['base_dir']
|
||
self.logger.log(f"base_path: {self.base_path}", level=logging.DEBUG)
|
||
|
||
# 1. 임시 이미지 폴더 (작업 중)
|
||
self.TEMP_IMAGE_DIR = self.toggle_states.get('TEMP_IMAGE_DIR', "")
|
||
|
||
# 2. 에러 스크린샷 폴더 (장기 보관)
|
||
self.ERROR_SCREENSHOT_DIR = self.toggle_states.get('ERROR_SCREENSHOT_DIR', "")
|
||
|
||
self.clear_old_screenshot_dirs(days=14)
|
||
self.logger.log(f"error 디렉토리 삭제 완료", level=logging.INFO)
|
||
|
||
# self.whale_translator = WhaleTranslator(logger, debug_flag=self.toggle_states['debug_mode'])
|
||
|
||
self.playwright = None
|
||
self.browser = None
|
||
self.page = None
|
||
self.parsing_page = None
|
||
|
||
self.loop = None # 이벤트 루프를 저장할 변수
|
||
|
||
# 일시 정지 기능을 위한 QMutex와 QWaitCondition 설정
|
||
self.pause_mutex = QMutex()
|
||
self.pause_condition = QWaitCondition()
|
||
self.is_paused = False # 일시 정지 상태 플래그
|
||
|
||
self.gpt_client = GPTClient(logger=self.logger, supabase_manager=self.supabase_manager, user_id=self.user_id)
|
||
|
||
self.forbidden_word_manager = SupabaseForbiddenWordManager(self.logger, self.supabase_manager, self.user_id)
|
||
|
||
# self.clipboardImageManager = ClipboardImageManager(logger=self.logger, watermark_font_size=36, debug_flag=self.toggle_states['debug_mode'])
|
||
|
||
# ImageProcessor를 먼저 초기화
|
||
# self.imageProcessor = ImageProcessor(self.logger, self.page, self.whale_translator, self.clipboardImageManager, self.TEMP_IMAGE_DIR, self.toggle_states)
|
||
|
||
self.papago_translator = PapagoTranslator(self.logger, self, self.page)
|
||
|
||
# ImageProcessor를 포함하여 다른 핸들러들 초기화
|
||
self.optionHandler = OptionHandler(self.locator_manager, self, self.TEMP_IMAGE_DIR, self.logger, self.gpt_client, imageProcessor=self.image_processor, update_detail_progress_signal=self.update_detail_progress_signal, set_progress_visible_signal=self.set_progress_visible_signal, toggle_states=self.toggle_states, papago_translator=self.papago_translator)
|
||
self.priceHandler = PriceHandler(self.locator_manager, self, self.logger, self.optionHandler, self.price_setting_diag, self.toggle_states, debug_flag=self.toggle_states['debug_mode'])
|
||
self.thumbnailHandler = ThumbnailHandler(self.locator_manager, self, self.logger, self.toggle_states, self.image_processor, self.update_detail_progress_signal, self.set_progress_visible_signal, self.TEMP_IMAGE_DIR)
|
||
# self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states, self.gpt_client, self.forbidden_word_manager, self.user_id, self.supabase_manager)
|
||
self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.toggle_states, self.gpt_client, self.forbidden_word_manager, self.user_id, self.supabase_manager, papago_translator=self.papago_translator)
|
||
self.tagsHandler = TagsHandler(self.locator_manager, self, self.logger, self.toggle_states)
|
||
|
||
self.detailHandler = DetailHandler(self.locator_manager, self, self.image_processor, self.detail_text_widget, self.TEMP_IMAGE_DIR, self.logger, self.gpt_client, self.update_detail_progress_signal, self.set_progress_visible_signal, self.toggle_states)
|
||
|
||
# BrowserController에 해당하는 모든 locator를 정의
|
||
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
|
||
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
|
||
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
|
||
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
|
||
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
|
||
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
|
||
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
|
||
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
|
||
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
|
||
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
|
||
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
|
||
self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator')
|
||
self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator')
|
||
self.product_dialog_close_btn = self.locator_manager.get_locator('BrowserControl', 'product_dialog_close_btn')
|
||
|
||
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
|
||
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
|
||
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
|
||
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
|
||
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
|
||
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
|
||
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
|
||
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
|
||
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
|
||
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
|
||
# self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
||
# self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
||
# self.cke_text_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'cke_text_editing_area_locator')
|
||
# self.cke_img_file_input_locator = self.locator_manager.get_locator('BrowserControl', 'cke_img_file_input_locator')
|
||
|
||
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
|
||
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
|
||
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
|
||
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
|
||
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
|
||
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
|
||
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
|
||
# self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
|
||
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
|
||
self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator')
|
||
self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template')
|
||
self.group_options_selector_locator = self.locator_manager.get_locator('BrowserControl', 'group_options_selector_locator')
|
||
self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator')
|
||
self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
|
||
|
||
self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons')
|
||
self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons')
|
||
|
||
self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator')
|
||
self.memo_exposer_locator = self.locator_manager.get_locator('BrowserControl', 'memo_exposer_locator')
|
||
self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator')
|
||
|
||
self.welcome_popup_closeBTN_selector = self.locator_manager.get_locator('BrowserControl', 'welcome_popup_closeBTN_selector')
|
||
|
||
self.modal_selector = self.locator_manager.get_locator('BrowserControl', 'modal_selector')
|
||
self.dialog_ok_selector = self.locator_manager.get_locator('BrowserControl', 'dialog_ok_selector')
|
||
self.dialog_selector = self.locator_manager.get_locator('BrowserControl', 'dialog_selector')
|
||
|
||
|
||
# self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
|
||
|
||
# # 스레드 종료 시 close_whale_window_if_exists 호출
|
||
# self.finished.connect(self.cleanup)
|
||
|
||
|
||
|
||
def update_image_processor_info(self, is_trans_enabled, authenticated_by_admin, user_membership_level):
|
||
self.is_trans_enabled = is_trans_enabled
|
||
self.authenticated_by_admin = authenticated_by_admin
|
||
self.user_membership_level = user_membership_level
|
||
self.is_valid_level = self.user_membership_level == 'basic' or self.user_membership_level == 'premium' or self.user_membership_level == 'vip'
|
||
|
||
def init_image_processor(self):
|
||
try:
|
||
if self.is_valid_level and self.is_trans_enabled:
|
||
from src.modules.image_processor3 import ImageProcessor3
|
||
self.image_processor = ImageProcessor3(logger=self.logger, page=self.page, toggle_states=self.toggle_states, unwanted_words=self.toggle_states['unwanted_words'], authenticated_by_admin=self.authenticated_by_admin, base_dir=self.base_path, papago_translator=self.papago_translator)
|
||
|
||
self.logger.log(f"이미지 프로세서 초기화 완료", level=logging.INFO)
|
||
if self.image_processor:
|
||
return True
|
||
else:
|
||
return False
|
||
else:
|
||
self.logger.log(f"self.is_valid_level : {self.is_valid_level}, self.is_trans_enabled : {self.is_trans_enabled}", level=logging.INFO)
|
||
self.logger.log(f"이미지 프로세서 초기화 조건이 맞지 않습니다.", level=logging.INFO)
|
||
return False
|
||
except Exception as e:
|
||
self.logger.log(f"ImageProcessor3 초기화 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False
|
||
|
||
|
||
def update_toggle_states(self, toggle_states):
|
||
self.toggle_states = toggle_states
|
||
self.logger.log(f"toggle_states : {self.toggle_states}", level=logging.DEBUG)
|
||
|
||
self.unwanted_words = self.toggle_states['unwanted_words']
|
||
self.logger.log(f"unwanted_words : {self.unwanted_words}", level=logging.DEBUG)
|
||
|
||
async def init_routes(self, page):
|
||
|
||
if not self.route_registered:
|
||
await self.page.route("**/translate-inpaint", self.handle_route)
|
||
self.route_registered = True
|
||
self.logger.log("translate-inpaint 라우트 등록 완료", level=logging.INFO)
|
||
|
||
async def handle_route(self, route, request):
|
||
self.logger.log(f"요청 URL: {request.url}", level=logging.DEBUG)
|
||
if "translate-inpaint" in request.url:
|
||
try:
|
||
# 타임아웃 설정으로 API 응답 지연 방지 (15초)
|
||
response = await asyncio.wait_for(route.fetch(), timeout=15.0)
|
||
# 응답 본문 추출
|
||
body = await response.text()
|
||
# JSON 파싱
|
||
data = json.loads(body)
|
||
|
||
# self.logger.log(f"응답 본문: {body}", level=logging.DEBUG)
|
||
|
||
# unwanted_words = {
|
||
# "보증": "_보증은개뿔_치환용임_",
|
||
# "보상": "", # 빈 문자열은 삭제
|
||
# }
|
||
|
||
for block in data["data"]["translateImage"]["blocks"]:
|
||
text = block["transText"]
|
||
for word, repl in self.unwanted_words.items():
|
||
# "단어 경계" 기준 치환. 한글, 영어, 숫자 모두 경계 처리
|
||
text = re.sub(
|
||
rf'(?<![가-힣A-Za-z0-9]){re.escape(word)}(?![가-힣A-Za-z0-9])',
|
||
repl,
|
||
text
|
||
)
|
||
block["transText"] = text
|
||
|
||
# 조작된 데이터로 응답 생성
|
||
new_body = json.dumps(data)
|
||
|
||
# self.logger.log(f"조작된 데이터: {new_body}", level=logging.DEBUG)
|
||
|
||
await route.fulfill(
|
||
status=response.status,
|
||
headers=response.headers,
|
||
body=new_body
|
||
)
|
||
except asyncio.TimeoutError:
|
||
# API 타임아웃 발생 시 원본 요청을 그대로 통과시킴
|
||
self.logger.log(f"API 응답 타임아웃 발생, 원본 요청 통과: {request.url}", level=logging.WARNING)
|
||
await route.continue_()
|
||
except Exception as e:
|
||
# 기타 오류 발생 시 원본 요청을 그대로 통과시킴
|
||
self.logger.log(f"API 응답 처리 중 오류 발생: {e}, 원본 요청 통과", level=logging.WARNING)
|
||
await route.continue_()
|
||
else:
|
||
# 나머지는 기본 패스
|
||
await route.continue_()
|
||
|
||
|
||
def get_page(self):
|
||
return self.page
|
||
|
||
def get_parsing_page(self):
|
||
return self.parsing_page
|
||
|
||
def get_whale(self):
|
||
return self.whale_translator
|
||
|
||
def generate_random_suffix(self, length=4):
|
||
"""랜덤한 영문+숫자 조합 문자열 생성"""
|
||
chars = string.ascii_uppercase + string.digits
|
||
return ''.join(random.choices(chars, k=length))
|
||
|
||
|
||
# async def monitor_browser_logs(self, page):
|
||
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
|
||
# page.on("console", lambda msg: self.logger.info(f"JS 콘솔 로그: {msg.type}: {msg.text}"))
|
||
|
||
# async def monitor_browser_logs(self, page):
|
||
# """브라우저 콘솔 로그를 Python으로 캡처하여 출력"""
|
||
# level_map = {
|
||
# "log": logging.INFO,
|
||
# "info": logging.INFO,
|
||
# "warning": logging.WARNING,
|
||
# "error": logging.ERROR,
|
||
# "debug": logging.DEBUG,
|
||
# }
|
||
|
||
# page.on(
|
||
# "console",
|
||
# lambda msg: self.logger.log(
|
||
# f"JS 콘솔 로그: {msg.type}: {msg.text}",
|
||
# level=level_map.get(msg.type, logging.INFO)
|
||
# )
|
||
# )
|
||
|
||
# def get_base_dir(self):
|
||
# """
|
||
# 실행 환경에 따라 base_dir을 설정하는 메서드.
|
||
# cx_Freeze로 패키징된 경우 실행 파일의 경로, 일반 Python 환경일 경우 __file__을 기준으로 설정.
|
||
# """
|
||
# if getattr(sys, 'frozen', False): # 패키징된 경우
|
||
# base_dir = os.path.dirname(sys.executable)
|
||
# internal_dir = os.path.join(base_dir, 'lib', 'src') # lib 디렉토리 포함
|
||
# if os.path.exists(internal_dir): # lib 디렉토리가 존재하면 base_dir로 설정
|
||
# return internal_dir
|
||
|
||
# else: # 일반 Python 실행 환경
|
||
# base_dir = os.path.dirname(os.path.abspath(__file__))
|
||
# debug_dir = os.path.join(base_dir, 'src') # lib 디렉토리 포함
|
||
|
||
# return debug_dir
|
||
|
||
async def close_welcome_popup_if_exists(self, page, timeout_sec: int = 3):
|
||
"""
|
||
로그인 직후 간헐적으로 뜨는 '환영합니다' 팝업이 있을 경우 닫아준다.
|
||
일정 시간 동안 팝업이 감지되지 않으면 아무것도 하지 않고 종료한다.
|
||
|
||
Args:
|
||
page (Page): Playwright의 page 객체
|
||
timeout_sec (int): 팝업을 최대 몇 초 동안 기다릴지 (기본값 3초)
|
||
"""
|
||
# shadow-root 내부의 닫기 버튼을 가리키는 선택자
|
||
# self.welcome_popup_closeBTN_selector = '#ch-plugin >>> div:has(span.a11y-hidden:has-text("닫기")) button'
|
||
|
||
try:
|
||
close_btn = page.locator(self.welcome_popup_closeBTN_selector)
|
||
await close_btn.wait_for(state='visible', timeout=timeout_sec * 1000)
|
||
await close_btn.click()
|
||
self.logger.log("환영 다이얼로그를 닫았습니다.", level=logging.INFO)
|
||
|
||
except TimeoutError:
|
||
# timeout 내에 닫기 버튼이 안 뜨면 그냥 넘어감
|
||
self.logger.log("환영 다이얼로그가 감지되지 않았습니다.", level=logging.INFO)
|
||
|
||
finally:
|
||
await page.keyboard.press('Escape')
|
||
await asyncio.sleep(0.53)
|
||
await page.keyboard.press('Escape')
|
||
|
||
async def start_browser_async(self):
|
||
"""비동기 Playwright 초기화 및 로그인 수행"""
|
||
try:
|
||
|
||
self.is_image_processor_init = self.init_image_processor()
|
||
|
||
|
||
if not self.is_image_processor_init:
|
||
self.logger.log(f"이미지 프로세서를 사용하지 않습니다.", level=logging.INFO)
|
||
self.image_processor_error.emit("이미지 프로세서 초기화 오류로 대체번역을 사용합니다.\n 최대 3,000회/일 사용가능하나 사용상 주의 바랍니다.")
|
||
|
||
if self.user_membership_level == "premium" and self.authenticated_by_admin:
|
||
self.logger.log(f"한시적 이벤트로 7월 한달간 프리미엄 이상 회원에게 자체번역을 제공합니다.", level=logging.INFO)
|
||
self.premium_event_started.emit("한시적 이벤트로 7월 한달간 프리미엄 이상 회원에게 자체번역을 제공합니다.\n 자체번역 설정은 저장되지 않으니 실행때마다 설정해주세요")
|
||
|
||
# self.whale_translator.start_trans_browser()
|
||
# # 바뀐 검색주소
|
||
# # 'https://search.shopping.naver.com/search/all?adQuery=주차차단기&agency=true&frm=MOSCPRO&origQuery=주차차단기&pagingIndex=1&pagingSize=40&productSet=overseas&query=주차차단기&sort=rel×tamp=&viewType=list'
|
||
# self.whale_translator.lens_Search(image_url="https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/6847c740ecbe0d32a37a8570/b637c29c-80eb-43eb-975f-58279bc5fde1.jpg")
|
||
# time.sleep(1000000)
|
||
|
||
try:
|
||
self.logger.log('알바생 브라우저를 실행합니다...', level=logging.DEBUG)
|
||
|
||
# WhaleTranslator 필요 여부 확인 및 초기화
|
||
optionIMGTrans_status = self.toggle_states.get('optionIMGTrans', False)
|
||
detail_IMGTrans_status = self.toggle_states.get('detail_IMGTrans', False)
|
||
thumb_status = self.toggle_states.get('thumb', False)
|
||
debug_mode = self.toggle_states.get('debug_mode', False)
|
||
|
||
self.logger.log(f"optionIMGTrans_status: {optionIMGTrans_status}, detail_IMGTrans_status: {detail_IMGTrans_status}, thumb_status: {thumb_status}", level=logging.DEBUG)
|
||
|
||
# Playwright 시작 및 브라우저 실행
|
||
try:
|
||
# os.environ["PLAYWRIGHT_BROWSERS_PATH"] = os.path.expandvars(r"%LOCALAPPDATA%\ms-playwright")
|
||
self.playwright = await async_playwright().start()
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 기동 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
return
|
||
|
||
browser_path = os.path.join(self.base_path, 'browsers', 'chromium-1155', 'chrome-win','chrome.exe')
|
||
extension_path = os.path.join(self.base_path, 'browsers', 'extensions', '1.1.100_0')
|
||
user_data_dir = os.path.join(self.base_path, 'browsers', 'user_data')
|
||
if not os.path.exists(user_data_dir):
|
||
os.makedirs(user_data_dir, exist_ok=True)
|
||
self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
|
||
self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
|
||
self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
|
||
|
||
self.video_dir = os.path.join(self.base_path, "recorded_videos")
|
||
if not os.path.exists(self.video_dir):
|
||
os.makedirs(self.video_dir)
|
||
self.logger.log(f"동영상 저장 폴더 생성됨: {self.video_dir}", level=logging.DEBUG)
|
||
|
||
if not os.path.exists(browser_path):
|
||
self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG)
|
||
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
|
||
|
||
|
||
# # User agent 설정
|
||
# user_agent = random.choice([
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
|
||
# ])
|
||
|
||
user_agent = random.choice([
|
||
# Chrome
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.6422.113 Safari/537.36",
|
||
# Whale (Chrome 기반)
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Whale/3.25.232.15 Chrome/125.0.6422.113 Safari/537.36",
|
||
])
|
||
|
||
|
||
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
|
||
|
||
# 브라우저 시작 및 설정
|
||
self.browser = await self.playwright.chromium.launch_persistent_context(
|
||
user_data_dir,
|
||
headless=not debug_mode,
|
||
permissions=["geolocation", "notifications"],
|
||
geolocation={"latitude": 37.5665, "longitude": 126.9780},
|
||
locale="ko-KR",
|
||
args=[
|
||
'--disable-blink-features=AutomationControlled',
|
||
'--disable-popup-blocking',
|
||
f'--disable-extensions-except={extension_path}',
|
||
f'--load-extension={extension_path}',
|
||
'--start-maximized',
|
||
'--window-size=1920,1080'
|
||
],
|
||
executable_path=browser_path,
|
||
user_agent=user_agent,
|
||
|
||
# record_video_dir=self.video_dir, # 동영상 저장 폴더 지정
|
||
# record_video_size={"width": 1280, "height": 720}
|
||
|
||
)
|
||
|
||
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
|
||
self.page = await self.browser.new_page()
|
||
self.page.set_default_navigation_timeout(60000)
|
||
self.page.set_default_timeout(60000)
|
||
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
|
||
|
||
|
||
await self.page.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||
await self.page.add_init_script("""
|
||
Object.defineProperty(navigator, 'plugins', {
|
||
get: () => [1, 2, 3],
|
||
});
|
||
Object.defineProperty(navigator, 'languages', {
|
||
get: () => ['ko-KR', 'en-US']
|
||
});
|
||
window.chrome = { runtime: {} };
|
||
""")
|
||
|
||
await self.page.goto('https://percenty.co.kr/signin')
|
||
self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
|
||
|
||
# 첫 번째 기본 탭 닫기
|
||
if self.browser.pages:
|
||
await self.browser.pages[0].close()
|
||
|
||
# 페이지 제목을 가져와서 창 제목으로 활용
|
||
page_title = await self.page.title()
|
||
self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
|
||
|
||
# 라우트 등록
|
||
await self.init_routes(self.page)
|
||
|
||
self.logger.log(f"라우트 등록 완료", level=logging.DEBUG)
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 객체 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_create_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
is_login_success = await self.login()
|
||
if not is_login_success:
|
||
self.logger.log("로그인 실패", level=logging.ERROR)
|
||
return
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_login_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
# await self.close_ad_if_exists_with_ESC_Key()
|
||
# await self.close_ad_if_exists_new()
|
||
await self.close_ant_modal_dialogs()
|
||
except Exception as e:
|
||
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_ad1_close_error.emit(str(e))
|
||
return
|
||
|
||
id_ed_mode = self.toggle_states.get('ed_mode', False)
|
||
if id_ed_mode:
|
||
# await self.go_to_registered_product_page()
|
||
self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
|
||
try:
|
||
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
await self.page.click(registered_product_page_locator)
|
||
self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_registered_product_page_error.emit(str(e))
|
||
return
|
||
|
||
else:
|
||
self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
|
||
# await self.go_to_new_product_page()
|
||
try:
|
||
new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
await self.page.click(new_product_page_locator)
|
||
await self.close_ad_if_exists_with_ESC_Key()
|
||
await self.close_ant_modal_dialogs()
|
||
self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
|
||
|
||
# self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
except Exception as e:
|
||
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_ad2_close_error.emit(str(e))
|
||
return
|
||
|
||
# group_index = self.toggle_states['group_index']
|
||
# self.logger.log('선택한 그룹 인덱스로 이동', level=logging.INFO)
|
||
# if group_index:
|
||
# await self.select_group_index(group_index=group_index)
|
||
|
||
try:
|
||
# 그룹 이름 리스트 가져오기
|
||
group_names_list = await self.get_group_names_list()
|
||
self.group_list_signal.emit(group_names_list)
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 이름 리스트 가져오기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_group_list_error.emit(str(e))
|
||
return
|
||
|
||
try:
|
||
# 각 핸들러에 초기화된 page 객체 전달.
|
||
self.titleGenerator.update_page(self.page ,self.toggle_states)
|
||
# self.titleGenerator.update_parsing_page(self.parsing_page)
|
||
self.optionHandler.update_page(self.page ,self.toggle_states, self.image_processor)
|
||
self.tagsHandler.update_page(self.page ,self.toggle_states)
|
||
self.thumbnailHandler.update_page(self.page ,self.toggle_states, self.image_processor)
|
||
self.priceHandler.update_page(self.page ,self.toggle_states)
|
||
self.detailHandler.update_page(self.page ,self.toggle_states, self.image_processor)
|
||
# self.imageProcessor.update_page(self.page ,self.toggle_states)
|
||
|
||
# self.optionHandler.update_whale()
|
||
# self.thumbnailHandler.update_whale()
|
||
# self.detailHandler.update_whale()
|
||
|
||
self.logger.log(f"핸들러 업데이트 완료", level=logging.DEBUG)
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"핸들러 업데이트 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_handler_update_error.emit(str(e))
|
||
return
|
||
|
||
|
||
try:
|
||
# 파싱용 브라우저 인스턴스를 별도로 실행 (headless 모드, 실제 브라우저처럼 보이도록 옵션 추가, 시크릿 모드 포함)
|
||
self.parsing_browser = await self.playwright.chromium.launch(
|
||
executable_path=browser_path, # 추가: 메인 브라우저와 동일한 실행 파일 사용
|
||
headless=False,
|
||
args=[
|
||
'--disable-blink-features=AutomationControlled', # 자동화 탐지 우회
|
||
'--no-sandbox',
|
||
'--disable-infobars',
|
||
'--disable-dev-shm-usage',
|
||
'--disable-gpu',
|
||
'--start-minimized',
|
||
'--incognito', # 시크릿 모드 활성화
|
||
'--window-position=-32000,-32000',
|
||
]
|
||
)
|
||
|
||
# 파싱용 컨텍스트는 기본적으로 시크릿 모드 환경이며, 추가 설정으로 실제 브라우저와 유사한 점수를 줄 수 있음.
|
||
self.parsing_context = await self.parsing_browser.new_context(
|
||
locale="ko-KR",
|
||
user_agent=random.choice([
|
||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
]),
|
||
viewport={"width": 1280, "height": 720},
|
||
extra_http_headers={
|
||
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"
|
||
}
|
||
)
|
||
|
||
# 파싱용 페이지 생성
|
||
self.parsing_page = await self.parsing_context.new_page()
|
||
self.parsing_page.set_default_navigation_timeout(60000)
|
||
self.parsing_page.set_default_timeout(60000)
|
||
|
||
self.papago_translator.update_page(self.parsing_page)
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"파싱용 페이지 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.browser_parsing_page_error.emit(str(e))
|
||
return
|
||
|
||
# 신호 전송
|
||
self.browser_started.emit() # 브라우저 시작 신호
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.unknown_browser_error.emit(str(e))
|
||
return
|
||
|
||
|
||
# async def start_browser(self):
|
||
# """크롬 브라우저 실행 및 페이지 로딩"""
|
||
# self.logger.log('크롬 브라우저 실행 중...', level=logging.DEBUG)
|
||
# try:
|
||
# # Playwright를 수동으로 실행하여 브라우저 유지
|
||
# self.playwright = await async_playwright().start()
|
||
|
||
# # cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정
|
||
# if getattr(sys, 'frozen', False):
|
||
# browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
|
||
# extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0')
|
||
# user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data')
|
||
# else:
|
||
# browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe')
|
||
# extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0')
|
||
# user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data')
|
||
|
||
# self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
|
||
# self.logger.log(f"확장 프로그램 경로: {extension_path}", level=logging.DEBUG)
|
||
# self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
|
||
|
||
# # 사용자 데이터 디렉토리가 존재하지 않으면 생성
|
||
# if not os.path.exists(user_data_dir):
|
||
# os.makedirs(user_data_dir)
|
||
# self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
# # User agent 설정
|
||
# user_agent = random.choice([
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
|
||
# "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
|
||
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
|
||
# ])
|
||
# self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
|
||
|
||
# # 브라우저 시작 및 설정
|
||
# self.browser = await self.playwright.chromium.launch_persistent_context(
|
||
# user_data_dir,
|
||
# headless=False,
|
||
# permissions=["geolocation", "notifications"],
|
||
# geolocation={"latitude": 37.5665, "longitude": 126.9780},
|
||
# locale="ko-KR",
|
||
# args=[
|
||
# '--disable-popup-blocking',
|
||
# f'--disable-extensions-except={extension_path}',
|
||
# f'--load-extension={extension_path}',
|
||
# '--start-maximized',
|
||
# '--window-size=1920,1080'
|
||
# ],
|
||
# executable_path=browser_path,
|
||
# user_agent=user_agent
|
||
# )
|
||
|
||
# # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
|
||
# self.page = await self.browser.new_page()
|
||
# self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
|
||
|
||
# await self.page.goto('https://percenty.co.kr/signin')
|
||
# self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
|
||
|
||
# # 첫 번째 기본 탭 닫기
|
||
# if self.browser.pages:
|
||
# await self.browser.pages[0].close()
|
||
|
||
# # 페이지 제목을 가져와서 창 제목으로 활용
|
||
# page_title = await self.page.title()
|
||
# self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
|
||
|
||
# # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용)
|
||
# self.chrome_hwnd = self.find_window_by_title(page_title)
|
||
# if not self.chrome_hwnd:
|
||
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.WARNING)
|
||
# else:
|
||
# self.logger.log(f'크롬 창 핸들: {self.chrome_hwnd}', level=logging.DEBUG)
|
||
|
||
# await self.login()
|
||
# await self.close_ad_if_exists()
|
||
|
||
# if self.toggle_states['ed_mode']:
|
||
# await self.go_to_registered_product_page()
|
||
# self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
|
||
# else:
|
||
# self.logger.log('신규 상품 등록 페이지로 이동 중...', level=logging.INFO)
|
||
# await self.go_to_new_product_page()
|
||
|
||
# self.browser_started.emit() # 브라우저 시작 신호
|
||
# except Exception as e:
|
||
# self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR)
|
||
# self.browser_error.emit(str(e))
|
||
async def login(self) -> bool:
|
||
"""로그인 처리: 다양한 로그인 성공 지표를 확인합니다."""
|
||
is_admin = self.login_infos.get('is_admin', False)
|
||
who = "관리자" if is_admin else "직원"
|
||
self.logger.log(f'로그인 시도 중: {who} 계정', level=logging.INFO)
|
||
|
||
# 필수 정보 누락 체크
|
||
required = ['admin_id', 'admin_pw'] if is_admin else ['admin_id', 'user_id', 'user_pw']
|
||
missing = [k for k in required if not self.login_infos.get(k)]
|
||
if missing:
|
||
msg = f'로그인 정보 누락: {", ".join(missing)}'
|
||
self.logger.log(msg, level=logging.ERROR)
|
||
self.browser_login_error.emit(msg)
|
||
return False
|
||
|
||
try:
|
||
try:
|
||
initial_body_html = await self.page.evaluate('() => document.body.innerHTML')
|
||
initial_body_length = len(initial_body_html)
|
||
self.logger.log(f'로그인 전 페이지 body 길이: {initial_body_length}', level=logging.DEBUG)
|
||
except:
|
||
initial_body_length = 0
|
||
|
||
# 1) 자격증명 입력 & 로그인 클릭
|
||
if is_admin:
|
||
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
||
await self.page.fill(self.login_password_locator, self.login_infos['admin_pw'])
|
||
await self.page.click(self.login_button_locator)
|
||
else:
|
||
admin_toggle = self.page.locator(self.admin_toggle_locator)
|
||
if await admin_toggle.get_attribute("aria-checked") == "true":
|
||
await admin_toggle.click()
|
||
await self.page.fill(self.login_email_locator, self.login_infos['admin_id'])
|
||
await self.page.fill(self.staff_id_locator, self.login_infos['user_id'])
|
||
await self.page.fill(self.login_password_locator, self.login_infos['user_pw'])
|
||
await self.page.click(self.staff_login_button_locator)
|
||
|
||
self.logger.log(f'로그인 버튼 클릭 완료: {who} 계정', level=logging.DEBUG)
|
||
|
||
# 2) 다양한 방법으로 로그인 성공 확인
|
||
|
||
# 방법 1: 기존 선택자들로 확인
|
||
success_selectors = [
|
||
'[role="menu"]', # ul[role="menu"]보다 더 포괄적
|
||
'[role="dialog"]',
|
||
'span:text("신규 상품 등록")', # 신규 상품 등록 텍스트
|
||
'span:text("등록 상품 관리")' # 등록 상품 관리 텍스트
|
||
]
|
||
|
||
try:
|
||
# 여러 선택자를 동시에 기다림 (짧은 시간)
|
||
selector_string = ', '.join(success_selectors)
|
||
self.logger.log(f'로그인 성공 요소 대기 중... 선택자: {selector_string}', level=logging.DEBUG)
|
||
|
||
locator = await self.page.wait_for_selector(
|
||
selector_string,
|
||
timeout=5000 # 5초로 단축
|
||
)
|
||
|
||
if locator:
|
||
self.logger.log(f'로그인 성공: {who} 계정', level=logging.INFO)
|
||
self.logger.log(f'환영메세지 확인 중...', level=logging.INFO)
|
||
await self.close_welcome_popup_if_exists(self.page)
|
||
return True
|
||
|
||
except Exception as wait_error:
|
||
self.logger.log(f'메뉴 요소 대기 실패: {str(wait_error)}', level=logging.DEBUG)
|
||
|
||
# DOM 변경 감지
|
||
try:
|
||
new_body_html = await self.page.evaluate('() => document.body.innerHTML')
|
||
new_body_length = len(new_body_html)
|
||
body_change_ratio = abs(new_body_length - initial_body_length) / max(initial_body_length, 1)
|
||
|
||
self.logger.log(f'로그인 후 페이지 body 길이: {new_body_length} (변경률: {body_change_ratio:.2%})', level=logging.DEBUG)
|
||
|
||
# 페이지 내용이 10% 이상 변경되었다면 로그인 성공으로 간주
|
||
if body_change_ratio > 0.1:
|
||
self.logger.log(f'페이지 내용 변경으로 로그인 성공 확인: {who} 계정 (변경률: {body_change_ratio:.2%})', level=logging.DEBUG)
|
||
return True
|
||
|
||
except Exception as dom_error:
|
||
self.logger.log(f'DOM 변경 감지 중 오류: {dom_error}', level=logging.DEBUG)
|
||
|
||
# 방법 3: 로그인 폼 사라짐 확인
|
||
try:
|
||
login_form_exists = await self.page.query_selector(self.login_button_locator) is not None
|
||
if not login_form_exists:
|
||
self.logger.log(f'로그인 폼 사라짐으로 로그인 성공 확인: {who} 계정', level=logging.DEBUG)
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 모든 방법으로 확인 실패
|
||
msg = f'로그인은 성공하였으나 페이지 확인 실패: {who} 계정'
|
||
self.logger.log(msg, level=logging.ERROR)
|
||
self.logger.log(f'대기 오류 상세: {str(wait_error)}', level=logging.DEBUG)
|
||
self.browser_login_error.emit(msg)
|
||
await self.save_error_screenshot(f'login_failed_{who}')
|
||
return False
|
||
|
||
except Exception as e:
|
||
# 클릭/입력 중 예외
|
||
msg = f'로그인 처리 중 오류: {who} 계정 – {e}'
|
||
self.logger.log(msg, level=logging.ERROR, exc_info=True)
|
||
self.browser_login_error.emit(msg)
|
||
await self.save_error_screenshot(f'login_exception_{who}')
|
||
return False
|
||
|
||
async def close_browser(self):
|
||
"""브라우저 종료"""
|
||
if self.browser:
|
||
await self.browser.close()
|
||
await self.playwright.stop()
|
||
self.cleanup()
|
||
self.logger.log('브라우저 종료됨.', level=logging.INFO)
|
||
|
||
# def find_window_by_title(self, window_name):
|
||
# """창 제목을 통해 핸들을 찾는 메서드"""
|
||
# def enum_windows_callback(hwnd, result):
|
||
# if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd):
|
||
# result.append(hwnd)
|
||
# result = []
|
||
# win32gui.EnumWindows(enum_windows_callback, result)
|
||
# return result[0] if result else None
|
||
|
||
# def switch_to_chrome(self):
|
||
# """크롬으로 포커스 전환"""
|
||
# if self.chrome_hwnd:
|
||
# win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE)
|
||
# win32gui.SetForegroundWindow(self.chrome_hwnd)
|
||
# self.logger.log('크롬 창으로 포커스 이동.', level=logging.DEBUG)
|
||
# else:
|
||
# self.logger.log('크롬 창을 찾을 수 없습니다.', level=logging.ERROR)
|
||
|
||
async def get_total_product_count(self):
|
||
total_count = 0
|
||
items_per_page = 0
|
||
|
||
try:
|
||
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
|
||
total_count_element = await self.page.query_selector(self.total_product_count_locator)
|
||
items_per_page_element = await self.page.query_selector(self.items_per_page_locator)
|
||
|
||
self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG)
|
||
|
||
if total_count_element:
|
||
total_count_text = await total_count_element.inner_text()
|
||
if "총" in total_count_text and "개 상품" in total_count_text:
|
||
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
|
||
self.logger.log(f"총 상품수 확인: {total_count} 개", level=logging.INFO)
|
||
|
||
# 페이지당 상품 수 추출
|
||
if items_per_page_element:
|
||
items_per_page_text = await items_per_page_element.get_attribute("title")
|
||
if items_per_page_text and "개씩 보기" in items_per_page_text:
|
||
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
|
||
self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO)
|
||
|
||
# 결과 반환
|
||
if total_count:
|
||
return {"total_count": total_count, "items_per_page": items_per_page}
|
||
|
||
return {"total_count": 0, "items_per_page": 0}
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
return {"total_count": 0, "items_per_page": 0}
|
||
|
||
# def fetch_image_urls(self, html_content):
|
||
# """
|
||
# HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
|
||
# <figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리.
|
||
# """
|
||
# soup = BeautifulSoup(html_content, 'html.parser')
|
||
|
||
# # 모든 <img> 태그를 찾기
|
||
# image_urls = []
|
||
# img_tags = soup.find_all('img')
|
||
|
||
# for img in img_tags:
|
||
# # img 태그에서 src 속성 추출
|
||
# if 'src' in img.attrs:
|
||
# image_url = img['src']
|
||
# image_urls.append(image_url)
|
||
# self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)} 개", level=logging.DEBUG)
|
||
|
||
# self.logger.log(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}", level=logging.DEBUG)
|
||
|
||
# return image_urls
|
||
|
||
async def close_ant_modal_dialogs(self, max_loops=3):
|
||
"""
|
||
antd 모달 다이얼로그에서 '다시 보지 않기', '닫기' 버튼을 최대 max_loops번 반복 클릭합니다.
|
||
self.page, self.logger 사용
|
||
"""
|
||
total_closed = 0
|
||
for loop in range(max_loops):
|
||
closed_this_round = 0
|
||
|
||
# 다이얼로그 생성될 때까지 3초간 대기 (한 번만)
|
||
try:
|
||
await self.page.wait_for_selector('.ant-modal-root', timeout=3000)
|
||
except Exception:
|
||
# screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("3초 대기 내에 .ant-modal-root 다이얼로그 미등장", level=logging.WARNING)
|
||
|
||
# '다시 보지 않기' 버튼 (span 텍스트 포함)
|
||
dont_show_btns = self.page.locator(
|
||
'.ant-modal-root button span:text("다시 보지 않기")'
|
||
).locator('..') # span의 부모 button
|
||
count1 = await dont_show_btns.count()
|
||
for i in range(count1):
|
||
await dont_show_btns.nth(i).click()
|
||
closed_this_round += 1
|
||
self.logger.log(f'"다시 보지 않기" 버튼 클릭 (loop {loop+1}, {i+1}/{count1})', level=logging.INFO)
|
||
await self.page.wait_for_timeout(200)
|
||
|
||
# '닫기' 버튼 (span 텍스트 포함)
|
||
close_btns = self.page.locator(
|
||
'.ant-modal-root button span:text("닫기")'
|
||
).locator('..')
|
||
count2 = await close_btns.count()
|
||
for i in range(count2):
|
||
await close_btns.nth(i).click()
|
||
closed_this_round += 1
|
||
self.logger.log(f'"닫기" 버튼 클릭 (loop {loop+1}, {i+1}/{count2})', level=logging.INFO)
|
||
await self.page.wait_for_timeout(200)
|
||
|
||
total_closed += closed_this_round
|
||
if closed_this_round == 0:
|
||
break
|
||
await self.page.wait_for_timeout(300)
|
||
self.logger.log(f"총 {total_closed}개 다이얼로그 버튼 클릭 완료", level=logging.INFO)
|
||
return total_closed > 0
|
||
|
||
|
||
|
||
async def close_ad_if_exists_with_ESC_Key(self):
|
||
"""ESC 키를 두 번 전송하여 다이얼로그를 닫는 메서드"""
|
||
try:
|
||
# JavaScript로 ESC 키 이벤트 발생
|
||
await self.page.evaluate("""
|
||
(() => {
|
||
const escEvent = new KeyboardEvent('keydown', {
|
||
key: 'Escape',
|
||
code: 'Escape',
|
||
keyCode: 27,
|
||
which: 27,
|
||
bubbles: true,
|
||
cancelable: true
|
||
});
|
||
document.dispatchEvent(escEvent);
|
||
return true;
|
||
})()
|
||
""")
|
||
time.sleep(1)
|
||
await self.page.evaluate("""
|
||
(() => {
|
||
const escEvent = new KeyboardEvent('keydown', {
|
||
key: 'Escape',
|
||
code: 'Escape',
|
||
keyCode: 27,
|
||
which: 27,
|
||
bubbles: true,
|
||
cancelable: true
|
||
});
|
||
document.dispatchEvent(escEvent);
|
||
return true;
|
||
})()
|
||
""")
|
||
self.logger.log("ESC 키를 전송하여 다이얼로그를 닫았습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
|
||
async def close_ad_if_exists_new(self):
|
||
"""
|
||
다이얼로그가 존재하면 해당 다이얼로그 내부의 "닫기" 버튼을 클릭하고,
|
||
다이얼로그를 찾지 못하거나 내부에 닫기 버튼이 없으면 페이지 전체에서 "닫기" 버튼을 검색하여 클릭하는 메서드
|
||
"""
|
||
# 새로운 다이얼로그의 닫기(X) 버튼 먼저 시도
|
||
self.logger.log("새로운 다이얼로그의 닫기(X) 버튼 먼저 시도", level=logging.INFO)
|
||
|
||
# 1. 모달 다이얼로그 처리 (ant-modal-wrap)
|
||
try:
|
||
# 모달 처리 JavaScript 코드
|
||
modal_js = """
|
||
() => {
|
||
// 모달 찾기
|
||
const modalWraps = document.querySelectorAll('.ant-modal-wrap.ant-modal-centered');
|
||
if (modalWraps.length === 0) return 0;
|
||
|
||
console.log(`${modalWraps.length}개의 모달 발견`);
|
||
let closed = 0;
|
||
|
||
// 각 모달에 대해
|
||
for (const modal of modalWraps) {
|
||
try {
|
||
// 1. 닫기 버튼 찾기
|
||
const closeBtn = modal.querySelector('.ant-modal-close');
|
||
if (closeBtn) {
|
||
closeBtn.click();
|
||
console.log('모달 닫기 버튼 클릭');
|
||
closed++;
|
||
continue;
|
||
}
|
||
|
||
// 2. '닫기' 텍스트 버튼 찾기
|
||
const buttons = modal.querySelectorAll('button');
|
||
let found = false;
|
||
for (const btn of buttons) {
|
||
if (btn.textContent.includes('닫기')) {
|
||
btn.click();
|
||
console.log('닫기 텍스트 버튼 클릭');
|
||
closed++;
|
||
found = true;
|
||
break;
|
||
}
|
||
}
|
||
if (found) continue;
|
||
|
||
// 3. 마지막 수단: 모달 숨기기
|
||
modal.style.display = 'none';
|
||
const mask = document.querySelector('.ant-modal-mask');
|
||
if (mask) mask.style.display = 'none';
|
||
console.log('모달 강제 숨김');
|
||
closed++;
|
||
} catch (e) {
|
||
console.error('모달 처리 오류:', e);
|
||
}
|
||
}
|
||
return closed;
|
||
}
|
||
"""
|
||
modal_count = await self.page.evaluate(modal_js)
|
||
if modal_count > 0:
|
||
self.logger.log(f"{modal_count}개의 모달 다이얼로그 닫기 처리됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000) # 모달이 사라질 때까지 대기
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"모달 다이얼로그 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 2. drawer-close 버튼 처리
|
||
try:
|
||
drawer_js = """
|
||
() => {
|
||
const drawerCloseButtons = Array.from(document.querySelectorAll('button.ant-drawer-close'));
|
||
let count = 0;
|
||
drawerCloseButtons.forEach(button => {
|
||
try {
|
||
button.click();
|
||
console.log('drawer-close 버튼 클릭');
|
||
count++;
|
||
} catch (e) {
|
||
console.error('drawer-close 클릭 오류:', e);
|
||
}
|
||
});
|
||
return count;
|
||
}
|
||
"""
|
||
drawer_count = await self.page.evaluate(drawer_js)
|
||
if drawer_count > 0:
|
||
self.logger.log(f"{drawer_count}개의 drawer-close 버튼 클릭됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"drawer-close 버튼 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 3. 배경 클릭 시도
|
||
try:
|
||
backdrop_js = """
|
||
() => {
|
||
const backdropSelectors = [
|
||
'.ant-modal-mask',
|
||
'.ant-drawer-mask',
|
||
'.modal-backdrop',
|
||
'.dialog-backdrop',
|
||
'[role="dialog"] + div',
|
||
'.overlay',
|
||
'.modal-overlay'
|
||
];
|
||
|
||
let clicked = 0;
|
||
for (const selector of backdropSelectors) {
|
||
const backdrops = document.querySelectorAll(selector);
|
||
for (const backdrop of backdrops) {
|
||
try {
|
||
backdrop.click();
|
||
console.log(`${selector} 배경 클릭`);
|
||
clicked++;
|
||
} catch (e) {
|
||
console.error(`${selector} 클릭 오류:`, e);
|
||
}
|
||
}
|
||
}
|
||
return clicked;
|
||
}
|
||
"""
|
||
backdrop_count = await self.page.evaluate(backdrop_js)
|
||
if backdrop_count > 0:
|
||
self.logger.log(f"{backdrop_count}개의 배경 요소 클릭됨", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
return True
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"배경 클릭 처리 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 4. ESC 키 시도
|
||
try:
|
||
await self.page.keyboard.down("Escape")
|
||
await self.page.wait_for_timeout(100)
|
||
await self.page.keyboard.up("Escape")
|
||
self.logger.log("ESC 키 전송 (down-up 방식)", level=logging.INFO)
|
||
await self.page.wait_for_timeout(1000)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.INFO)
|
||
|
||
# 5. 기존 다이얼로그 처리 (이전 코드)
|
||
try:
|
||
# 다이얼로그가 나타날 때까지 최대 3초 대기
|
||
dialog_element = await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
|
||
self.logger.log("다이얼로그가 발견되었습니다. 내부의 닫기 버튼을 찾습니다.", level=logging.INFO)
|
||
|
||
if dialog_element:
|
||
# 다이얼로그 내부에서 "닫기" 텍스트를 가진 버튼을 xpath로 찾음
|
||
close_button = await dialog_element.query_selector("xpath=.//button[span[text()='닫기']]")
|
||
if close_button:
|
||
await close_button.click()
|
||
self.logger.log("다이얼로그 내부의 닫기 버튼을 클릭하여 닫았습니다.", level=logging.INFO)
|
||
return # 닫기 성공 시 함수 종료
|
||
else:
|
||
self.logger.log("다이얼로그 내부에서 닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
else:
|
||
self.logger.log("대기 후에도 다이얼로그 엘리먼트를 찾지 못했습니다.", level=logging.WARNING)
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
await self.page.wait_for_timeout(100)
|
||
await self.page.dispatch_event("body", "keydown", {
|
||
"key": "Escape",
|
||
"code": "Escape",
|
||
"keyCode": 27,
|
||
"which": 27
|
||
})
|
||
await self.page.wait_for_timeout(50)
|
||
await self.page.dispatch_event("body", "keyup", {
|
||
"key": "Escape",
|
||
"code": "Escape",
|
||
"keyCode": 27,
|
||
"which": 27
|
||
})
|
||
await self.page.wait_for_timeout(100)
|
||
|
||
self.logger.log("ECS를 전송하여 닫았습니다.", level=logging.INFO)
|
||
return
|
||
except Exception as e:
|
||
self.logger.log(f"닫기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
|
||
except TimeoutError:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("다이얼로그가 나타나지 않았습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다이얼로그 닫기 시도 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def close_ad_if_exists_ori(self):
|
||
"""광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드"""
|
||
try:
|
||
# 광고 다이얼로그가 나타날 때까지 기다림
|
||
await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
|
||
self.logger.log("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.", level=logging.INFO)
|
||
|
||
# 닫기 버튼 클릭
|
||
close_button = await self.page.query_selector(self.close_ad_button_locator)
|
||
if close_button:
|
||
await close_button.click()
|
||
self.logger.log("다이얼로그를 성공적으로 닫았습니다.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
|
||
except TimeoutError:
|
||
# 다이얼로그가 없을 때: info 수준의 로그로 기록
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log("다이얼로그가 발견되지 않았습니다. 타임아웃이 발생했습니다.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 다른 예외 상황 발생 시 error로 기록
|
||
self.logger.log(f"다이얼로그 닫기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# async def go_to_new_product_page(self):
|
||
# """신규 상품 등록 페이지로 이동"""
|
||
# try:
|
||
# new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
|
||
# await self.page.click(new_product_page_locator)
|
||
# self.logger.log("신규 상품 등록 페이지로 이동 완료.", level=logging.INFO)
|
||
# except Exception as e:
|
||
# self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
|
||
# async def go_to_registered_product_page(self):
|
||
# """신규 상품 등록 페이지로 이동"""
|
||
# try:
|
||
# registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
|
||
# await self.page.click(registered_product_page_locator)
|
||
# self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
|
||
# except Exception as e:
|
||
# self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def is_button_disabled(self, button):
|
||
"""버튼이 disabled 상태인지 확인"""
|
||
try:
|
||
# 버튼의 disabled 속성 확인
|
||
is_disabled = await button.get_attribute('disabled')
|
||
return is_disabled is not None # disabled 속성이 있으면 True 반환
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
|
||
|
||
# async def select_group_index_ori(self, group_index: int):
|
||
# """그룹 드롭다운 열고 옵션 선택"""
|
||
# try:
|
||
# self.logger.log(f"group_index : {group_index}", level=logging.INFO)
|
||
|
||
|
||
# await self.page.evaluate("""
|
||
# const targetElement = Array.from(document.querySelectorAll('span')).find(el => el.textContent.trim() === '수집 상품 목록');
|
||
# if (targetElement) {
|
||
# targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
# }
|
||
# """)
|
||
|
||
# self.logger.log(f"그룹박스 요소로 스크롤", level=logging.INFO)
|
||
|
||
# await asyncio.sleep(1)
|
||
# # await self.page.wait_for_load_state("networkidle")
|
||
|
||
# group_option_locator = self.group_index_template.format(index=group_index)
|
||
|
||
# # 드롭다운 열기
|
||
# await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
# await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
# self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# # 드롭다운 열림 상태 확인
|
||
# await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
# self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# # 옵션 선택
|
||
# # await self.page.wait_for_selector(group_option_locator, timeout=3000)
|
||
# await self.page.click(group_option_locator, timeout=3000)
|
||
# self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
|
||
|
||
# selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
|
||
# self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
|
||
|
||
# # 선택된 그룹 이름을 시그널로 전달
|
||
# self.selected_group_name_signal.emit(selected_group_name)
|
||
|
||
# except TimeoutError:
|
||
# self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
# except Exception as e:
|
||
# self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def select_group_index(self, group_index: int):
|
||
"""그룹 드롭다운 열고 옵션 선택"""
|
||
try:
|
||
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
|
||
|
||
await self.page.evaluate("""
|
||
const targetElement = Array.from(document.querySelectorAll('span'))
|
||
.find(el => el.textContent.trim() === '수집 상품 목록');
|
||
if (targetElement) {
|
||
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
}
|
||
""")
|
||
|
||
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
|
||
|
||
await asyncio.sleep(1)
|
||
group_option_locator = self.group_index_template.format(index=group_index+1)
|
||
|
||
# 드롭다운 열기
|
||
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# 드롭다운 열림 상태 확인
|
||
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# 옵션 선택
|
||
self.logger.log(f"[{group_option_locator}] : group_option_locator", level=logging.INFO)
|
||
|
||
await self.page.click(group_option_locator, timeout=3000)
|
||
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
|
||
|
||
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
|
||
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
|
||
|
||
# 총 상품 개수 가져오기
|
||
product_count_info = await self.get_total_product_count()
|
||
total_products = product_count_info.get("total_count", 0)
|
||
items_per_page = product_count_info.get("items_per_page", 0)
|
||
self.logger.log(f"총 상품 개수 : [{total_products}]", level=logging.INFO)
|
||
|
||
|
||
# 선택된 그룹 이름을 시그널로 전달
|
||
self.selected_group_name_signal.emit(selected_group_name, total_products)
|
||
|
||
except TimeoutError:
|
||
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"그룹 선택 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {screenshot_path}", level=logging.WARNING)
|
||
|
||
# 그룹 선택 실패 시 빈 문자열 시그널 전달
|
||
self.selected_group_name_signal.emit("")
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.selected_group_name_signal.emit("")
|
||
|
||
async def get_group_names_list(self):
|
||
"""그룹 드롭다운을 열고 모든 그룹 이름 목록을 가져오기"""
|
||
try:
|
||
self.logger.log("그룹 이름 목록 가져오기 시작", level=logging.INFO)
|
||
|
||
# 수집 상품 목록 요소로 스크롤
|
||
await self.page.evaluate("""
|
||
const targetElement = Array.from(document.querySelectorAll('span'))
|
||
.find(el => el.textContent.trim() === '수집 상품 목록');
|
||
if (targetElement) {
|
||
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
|
||
}
|
||
""")
|
||
|
||
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
|
||
await asyncio.sleep(1)
|
||
|
||
# 드롭다운 열기
|
||
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
|
||
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
|
||
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
|
||
|
||
# 드롭다운 열림 상태 확인
|
||
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
|
||
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
|
||
|
||
# rc-virtual-list가 로딩될 때까지 잠시 대기
|
||
await asyncio.sleep(0.5)
|
||
|
||
|
||
|
||
# 그룹 옵션들이 로딩될 때까지 대기
|
||
await self.page.wait_for_selector(self.group_options_selector_locator, timeout=3000)
|
||
|
||
# 모든 그룹 이름 텍스트 가져오기
|
||
group_names = await self.page.evaluate(f"""
|
||
() => {{
|
||
const elements = document.querySelectorAll('{self.group_options_selector_locator}');
|
||
return Array.from(elements).map(el => el.textContent.trim());
|
||
}}
|
||
""")
|
||
|
||
|
||
|
||
# await self.page.wait_for_selector("div.ant-select-item-option-content", timeout=6000)
|
||
|
||
# group_names = await self.page.evaluate("""
|
||
# Array.from(document.querySelectorAll('div.ant-select-item-option-content'))
|
||
# .map(el => el.textContent.trim())
|
||
# """)
|
||
|
||
|
||
|
||
self.logger.log(f"발견된 그룹 목록: {group_names}", level=logging.INFO)
|
||
|
||
# 드롭다운 닫기 (ESC 키 또는 다른 곳 클릭)
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log("드롭다운 닫기 완료", level=logging.INFO)
|
||
|
||
return group_names
|
||
|
||
except TimeoutError:
|
||
self.logger.log("그룹 목록 가져오기 중 타임아웃이 발생했습니다.", level=logging.WARNING)
|
||
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
|
||
save_screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"self.group_options_selector_locator: {self.group_options_selector_locator}", level=logging.WARNING)
|
||
self.logger.log(f"그룹 목록 가져오기 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {save_screenshot_path}", level=logging.WARNING)
|
||
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
except:
|
||
pass
|
||
return []
|
||
except Exception as e:
|
||
self.logger.log(f"그룹 목록 가져오기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
|
||
try:
|
||
await self.page.keyboard.press("Escape")
|
||
except:
|
||
pass
|
||
return []
|
||
|
||
async def get_product_edit_buttons_by_template(self):
|
||
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
|
||
# 버튼 선택자 설정
|
||
# buttons = self.page.locator(self.product_edit_button_template)
|
||
# memos = self.page.locator(self.product_memo_button_template)
|
||
|
||
"""
|
||
현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환.
|
||
각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다.
|
||
"""
|
||
try:
|
||
# 수정 버튼 선택자 설정
|
||
# edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]')
|
||
# memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only')
|
||
edit_buttons = self.page.locator(self.product_edit_buttons)
|
||
memo_buttons = self.page.locator(self.product_memo_buttons)
|
||
|
||
# 수정 버튼 개수 확인
|
||
edit_button_count = await edit_buttons.count()
|
||
if edit_button_count == 0:
|
||
self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING)
|
||
return []
|
||
|
||
# 메모 버튼 개수 확인
|
||
memo_button_count = await memo_buttons.count()
|
||
if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외
|
||
self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING)
|
||
return []
|
||
|
||
# 첫 번째 버튼 제외하고 나머지 버튼을 처리
|
||
valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)]
|
||
|
||
# 상품별 해외배송비 수정 버튼과 메모 버튼 매칭
|
||
product_buttons = []
|
||
index = 0 # valid_memo_buttons에서의 현재 인덱스
|
||
|
||
for i in range(edit_button_count):
|
||
if index + 1 >= len(valid_memo_buttons):
|
||
self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING)
|
||
break
|
||
|
||
# 상품별 버튼 매칭
|
||
product_buttons.append({
|
||
"edit_button": edit_buttons.nth(i),
|
||
"shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼
|
||
"memo_button": valid_memo_buttons[index + 1] # 메모 버튼
|
||
})
|
||
index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모
|
||
|
||
# 최종 결과 로그
|
||
self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO)
|
||
|
||
return product_buttons
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
return []
|
||
|
||
async def get_product_edit_buttons_by_template_new(self):
|
||
"""
|
||
템플릿 방식의 CSS 선택자를 이용해 현재 페이지의 수정 및 메모 버튼을 동적으로 찾습니다.
|
||
"""
|
||
product_buttons = []
|
||
# 예시로, 상품 항목들이 li.product-item 형태로 존재한다고 가정합니다.
|
||
product_items = self.page.locator("div#root li.product-item")
|
||
count = await product_items.count()
|
||
self.logger.log(f"현재 페이지에 {count}개의 상품 항목을 발견했습니다.", level=logging.INFO)
|
||
for i in range(1, count + 1):
|
||
edit_selector = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template').format(index=i)
|
||
memo_selector = self.locator_manager.get_locator('BrowserControl', 'product_memo_button_template').format(index=i)
|
||
edit_button = self.page.locator(edit_selector)
|
||
memo_button = self.page.locator(memo_selector)
|
||
if await edit_button.count() > 0:
|
||
product_buttons.append({
|
||
"edit_button": edit_button,
|
||
"memo_button": memo_button
|
||
})
|
||
else:
|
||
self.logger.log(f"상품 인덱스 {i}에 대해 수정 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
return product_buttons
|
||
|
||
async def open_product_edit_dialog(self, button):
|
||
"""상품 수정 다이얼로그 열기"""
|
||
try:
|
||
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
|
||
await button.scroll_into_view_if_needed()
|
||
self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG)
|
||
|
||
await self.page.keyboard.press("Escape")
|
||
self.logger.log("이전 다이얼로그 닫기 완료.", level=logging.INFO)
|
||
|
||
await button.click()
|
||
self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO)
|
||
|
||
# await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
|
||
await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# 에러 발생 시 스크린샷 저장
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류 발생으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
|
||
|
||
async def save_error_screenshot(self, error_tag: str = None) -> str:
|
||
"""
|
||
에러 발생시 스크린샷 저장.
|
||
- error_tag가 None이면 호출한 함수 이름과 줄 번호를 자동으로 사용.
|
||
"""
|
||
try:
|
||
if self.page:
|
||
|
||
# 1) 호출자 정보를 가져온다
|
||
caller = inspect.stack()[1]
|
||
func_name = caller.function
|
||
lineno = caller.lineno
|
||
# 코드 컨텍스트(한 줄) 추출 (없으면 빈 문자열)
|
||
code_line = caller.code_context[0].strip() if caller.code_context else ""
|
||
|
||
# 2) error_tag 미지정 시 자동 생성
|
||
if not error_tag:
|
||
error_tag = f"{func_name}_L{lineno}"
|
||
|
||
# 3) 날짜별 디렉토리 준비
|
||
today = datetime.now().strftime("%Y%m%d")
|
||
date_dir = os.path.join(self.ERROR_SCREENSHOT_DIR, today)
|
||
os.makedirs(date_dir, exist_ok=True)
|
||
|
||
# 4) 타임스탬프
|
||
now = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
filename = f"screenshot_{error_tag}_{now}.png"
|
||
full_path = os.path.join(date_dir, filename)
|
||
|
||
# 5) 스크린샷 찍기 (페이지가 살아있는지 추가 확인)
|
||
try:
|
||
# 페이지가 이미 닫혔으면 스크린샷을 건너뜀
|
||
if self.page.is_closed():
|
||
self.logger.log("스크린샷 건너뜀: 페이지가 닫혀 있습니다.", level=logging.WARNING)
|
||
return None
|
||
except Exception:
|
||
# is_closed() 호출 자체가 실패할 수도 있으므로 무시
|
||
pass
|
||
|
||
# full_page=False 로 렌더링 시간 단축, 타임아웃은 60초로 상향
|
||
await self.page.screenshot(path=full_path, full_page=False, timeout=60000)
|
||
|
||
# 6) 디버깅용으로 호출 줄과 코드도 로그에 남기기
|
||
self.logger.log(f"스크린샷 저장: {full_path} (called at {func_name} L{lineno}: {code_line})", level=logging.INFO)
|
||
|
||
return full_path
|
||
else:
|
||
self.logger.log("스크린샷 저장 중 오류: 페이지가 없습니다.", level=logging.ERROR)
|
||
return None
|
||
except Exception as e:
|
||
self.logger.log(f"스크린샷 저장 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
return None
|
||
|
||
def clear_old_screenshot_dirs(self, days=14):
|
||
"""에러스크린샷 디렉토리에서 지정일(기본 14일) 지난 하위폴더 삭제"""
|
||
now = datetime.now()
|
||
for dir_name in os.listdir(self.ERROR_SCREENSHOT_DIR):
|
||
dir_path = os.path.join(self.ERROR_SCREENSHOT_DIR, dir_name)
|
||
if os.path.isdir(dir_path):
|
||
try:
|
||
# 폴더명 YYYYMMDD로 파싱
|
||
dir_date = datetime.strptime(dir_name, '%Y%m%d')
|
||
if now - dir_date > timedelta(days=days):
|
||
# 오래된 폴더 삭제
|
||
import shutil
|
||
shutil.rmtree(dir_path)
|
||
self.logger.log(f"[스크린샷] {dir_path} 삭제됨", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"[스크린샷] {dir_path} 삭제 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
def get_error_screenshot_dir(self):
|
||
"""에러 스크린샷 디렉토리 경로 반환"""
|
||
return self.ERROR_SCREENSHOT_DIR
|
||
|
||
async def click_detail_tab(self):
|
||
"""상세페이지 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.detail_tab_locator)
|
||
self.logger.log("상세페이지 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상세페이지 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_option_tab(self):
|
||
"""옵션 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.option_tab_locator)
|
||
self.logger.log("옵션 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"옵션 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_price_tab(self):
|
||
"""가격 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.price_tab_locator)
|
||
self.logger.log("가격 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"가격 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_thumb_tab(self):
|
||
"""썸네일 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.thumb_tab_locator)
|
||
self.logger.log("썸네일 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"썸네일 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_tags_tab(self):
|
||
"""키워드 태그 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.tag_tab_locator)
|
||
self.logger.log("키워드 태그 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"키워드 태그 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def click_title_tab(self):
|
||
"""상품명 탭 클릭"""
|
||
try:
|
||
await self.page.click(self.title_tab_locator)
|
||
self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# def generate_restored_html(self, urls):
|
||
# """이미지 URL 목록을 HTML 형식으로 변환하는 메서드"""
|
||
# html_content = '<p> </p>'
|
||
# for url in urls:
|
||
# html_content += f'<figure class="image"><img src="{url}" style="aspect-ratio:1/1;"></figure>\n'
|
||
# return html_content
|
||
|
||
|
||
def generate_restored_html(self, urls):
|
||
"""이미지 URL 목록을 HTML 형식으로 변환하는 메서드, 각 이미지의 가로세로 비율 추가"""
|
||
html_content = '<p> </p>\n'
|
||
for url in urls:
|
||
width, height = self.get_image_size(url)
|
||
aspect_ratio = f"{width}/{height}" if width and height else "1/1"
|
||
if width and height:
|
||
html_content += (
|
||
f'<figure class="image">'
|
||
f'<img style="aspect-ratio:{aspect_ratio};" src="{url}" width="{width}" height="{height}">'
|
||
f'</figure>\n'
|
||
)
|
||
else:
|
||
# 이미지 크기를 확인할 수 없을 경우 기본 형식으로 추가
|
||
html_content += f'<figure class="image"><img src="{url}"></figure>\n'
|
||
return html_content
|
||
|
||
def get_image_size(self, url):
|
||
"""이미지 URL로부터 가로와 세로 크기를 가져오는 메서드"""
|
||
try:
|
||
# URL에서 불필요한 따옴표 제거
|
||
cleaned_url = url.strip("'\"")
|
||
|
||
response = requests.get(cleaned_url, timeout=5)
|
||
response.raise_for_status()
|
||
image = Image.open(BytesIO(response.content))
|
||
return image.width, image.height
|
||
except Exception as e:
|
||
|
||
self.logger.log(f"이미지 크기 확인 실패 - {cleaned_url}: {e}", level=logging.WARNING)
|
||
return None, None
|
||
|
||
|
||
# async def extract_image_urls(self, optionHandler, is_option_data=False):
|
||
# """상세페이지에서 이미지 URL 추출"""
|
||
# try:
|
||
# # 소스 편집 모드로 전환
|
||
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
|
||
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
|
||
|
||
# # 소스 편집 모드로 전환
|
||
# await self.page.click(source_button_locator)
|
||
# self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
|
||
|
||
|
||
# # 'data-value' 속성 값을 추출 (textarea 요소)
|
||
# textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000)
|
||
# data_value = await textarea.get_attribute("data-value")
|
||
|
||
|
||
# # HTML 소스에서 이미지 URL 추출
|
||
# image_urls = self.fetch_image_urls(data_value)
|
||
# self.logger.log(f'추출된 이미지 URL 수: {len(image_urls)}', level=logging.INFO)
|
||
|
||
# # HTML 소스에서 이미지 URL 삭제
|
||
# self.logger.log('img 태그를 삭제 중...', level=logging.DEBUG)
|
||
# data_value_element = await self.page.query_selector(ck_source_editing_area_locator)
|
||
# new_value = ""
|
||
# if data_value_element:
|
||
# await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")')
|
||
# updated_value = await data_value_element.get_attribute('data-value')
|
||
# self.logger.log(f'Updated data-value: {updated_value}', level=logging.DEBUG)
|
||
# else:
|
||
# self.logger.log('Element with data-value not found.', level=logging.DEBUG)
|
||
# self.logger.log('img 태그 삭제 완료.', level=logging.DEBUG)
|
||
|
||
# # img 태그의 class 삭제 후 다시 소스 버튼 클릭
|
||
# await self.page.click(source_button_locator)
|
||
# self.logger.log('소스 버튼 재 클릭 완료.', level=logging.DEBUG)
|
||
|
||
|
||
# # 텍스트 입력 필드 선택
|
||
# input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
||
# await input_field.press('Enter')
|
||
|
||
# # # 선두부 텍스트 입력
|
||
# # for key in sorted(self.text_templates.keys()):
|
||
# # leading_text = self.text_templates[key]
|
||
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
||
# # await input_field.type(leading_text)
|
||
# # await input_field.press('Enter')
|
||
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
# # 선두부 텍스트 입력 (self.detail_text_widget의 get_lines 메서드 사용)
|
||
# for leading_text in self.detail_text_widget.get_lines():
|
||
# if leading_text: # 내용이 있는 경우에만
|
||
# await input_field.type(leading_text)
|
||
# await input_field.press('Enter')
|
||
# self.logger.log(f"텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
|
||
# option_data = optionHandler.get_selected_translated_options()
|
||
# self.logger.log(f'option_data : {option_data}', level=logging.DEBUG)
|
||
|
||
# # if is_option_data:
|
||
# if option_data or option_data != {}:
|
||
# self.logger.log('옵션 데이터 입력 시작', level=logging.DEBUG)
|
||
# # option_data = {} # option_data 초기화
|
||
# is_single = optionHandler.option_info['is_single_option']
|
||
# # option_data = optionHandler.get_selected_translated_options()
|
||
|
||
# # is_single = True # 옵션입력 일단 제외
|
||
# # self.logger.log('옵션입력 일단 제외', level=logging.DEBUG)
|
||
|
||
# self.logger.log('가져온 옵션 데이터', level=logging.DEBUG)
|
||
# self.logger.log(f'{option_data}', level=logging.DEBUG)
|
||
|
||
# # # 옵션 입력 필드 선택
|
||
# # input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000)
|
||
# # await input_field.press('Enter')
|
||
|
||
# # # 선두부 텍스트 입력
|
||
# # for key in sorted(self.text_templates.keys()):
|
||
# # leading_text = self.text_templates[key]
|
||
# # if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기
|
||
# # await input_field.type(leading_text)
|
||
# # await input_field.press('Enter')
|
||
# # self.logger.log(f"{key} 텍스트 입력 완료: {leading_text}", level=logging.INFO)
|
||
|
||
# if not is_single:
|
||
# self.logger.log('단일옵션이 아니므로 옵션목록을 입력', level=logging.INFO)
|
||
|
||
# # 각 옵션을 한 줄씩 입력
|
||
# await input_field.type("# 옵션 목록")
|
||
# await input_field.press('Enter')
|
||
|
||
# # 첫 번째 옵션의 번역된 옵션명만 입력
|
||
# first_key = list(option_data.keys())[0] # key는 옵션이름 value는 가격
|
||
# first_value = option_data[first_key] # key는 옵션이름 value는 가격
|
||
# await input_field.type(f"- 1. {first_key}")
|
||
# await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈
|
||
|
||
# # 나머지 옵션도 번역된 옵션명만 입력
|
||
# for i, (key, value) in enumerate(list(option_data.items())[1:], start=2):
|
||
# await input_field.type(f"{key}") # 2번옵션부터 번호 떼고 입력. key는 옵션이름 value는 가격
|
||
# await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈
|
||
|
||
# # 목록 끝을 알리기 위해 엔터 두 번 입력
|
||
# await input_field.press('Enter')
|
||
# await input_field.press('Enter')
|
||
|
||
# # 후두부 텍스트 입력
|
||
# await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.')
|
||
# await input_field.press('Enter')
|
||
# await input_field.type('---')
|
||
# await input_field.press('Enter')
|
||
|
||
# self.logger.log('옵션 데이터 입력 완료', level=logging.INFO)
|
||
|
||
# return image_urls
|
||
# except Exception as e:
|
||
# self.logger.log(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
# return image_urls if image_urls else []
|
||
|
||
# def paste_image_in_chrome(self, url, is_success_translated, toggle_states, is_watermark=False, watermark_text= ""):
|
||
# """크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력"""
|
||
# self.logger.log("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력", level=logging.DEBUG)
|
||
# try:
|
||
# # self.switch_to_chrome() # 크롬으로 포커스 이동
|
||
# self.clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated, toggle_states=toggle_states) # 클립보드 내용을 처리
|
||
# # clipboard_content = pyperclip.paste()
|
||
# if self.clipboardImageManager.is_clipboard_image():
|
||
# pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기
|
||
# self.logger.log("이미지 붙여넣기 완료.", level=logging.INFO)
|
||
# pyautogui.press('right') # 오른쪽 입력
|
||
# self.logger.log("이미지 붙여넣기 완료.", level=logging.DEBUG)
|
||
# self.clipboardImageManager.clear_clipboard()
|
||
# self.logger.log("이미지 붙여넣기 완료로 클립보드 비우기.", level=logging.INFO)
|
||
# return True
|
||
# else:
|
||
# self.logger.log("클립보드가 비어있습니다.", level=logging.WARNING)
|
||
# return False
|
||
# except Exception as e:
|
||
# self.logger.log(f"이미지 붙여넣기 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
# return False
|
||
|
||
async def handle_exist_productname_popup(self, timeout_sec: int = 3) -> bool:
|
||
"""
|
||
이미 존재하는 상품명 알림(다이얼로그/모달) 처리
|
||
|
||
- 다이얼로그가 뜨면 확인버튼 클릭 후 True 반환
|
||
- 다이얼로그가 안 뜨면 모달 감지 후 True 반환
|
||
- 둘 다 없으면 False 반환
|
||
"""
|
||
# 1. 다이얼로그 감지
|
||
try:
|
||
dialog = self.page.locator(self.dialog_selector)
|
||
await dialog.wait_for(state="visible", timeout=timeout_sec * 1000)
|
||
self.logger.log("다이얼로그 팝업 감지됨(이미 존재하는 상품명).", level=logging.INFO)
|
||
ok_btn = self.page.locator(self.dialog_ok_selector)
|
||
await ok_btn.click()
|
||
self.logger.log("다이얼로그 확인버튼 클릭 완료.", level=logging.INFO)
|
||
try:
|
||
await self.page.locator(self.dialog_selector).wait_for(state="detached", timeout=2000)
|
||
return True
|
||
except TimeoutError:
|
||
await ok_btn.click()
|
||
await self.page.locator(self.dialog_selector).wait_for(state="detached", timeout=2000)
|
||
return True
|
||
except TimeoutError:
|
||
self.logger.log("다이얼로그 팝업 감지 안됨.", level=logging.WARNING)
|
||
|
||
return False # 상품명 재수정 필요 없음
|
||
|
||
async def handle_save_with_error_recovery(self, title_infos, context="일반"):
|
||
"""
|
||
저장 시 발생할 수 있는 오류를 처리하는 통합 메서드
|
||
|
||
Args:
|
||
title_infos (dict): 상품 정보 딕셔너리
|
||
context (str): 호출 컨텍스트 (옵션, 가격, 썸네일, 태그, 상세페이지 등)
|
||
|
||
Returns:
|
||
str: "SAVED" - 정상 저장 완료
|
||
"DELETED" - 상품 삭제 완료
|
||
"ERROR" - 처리 실패
|
||
"DUPLICATE_HANDLED" - 중복 상품명 처리 완료
|
||
"""
|
||
try:
|
||
# 저장 버튼 클릭
|
||
await self.page.click(self.save_button_locator)
|
||
self.logger.log(f"[{context}] 저장 버튼 클릭 완료.", level=logging.INFO)
|
||
|
||
# 저장 동작 완료를 위한 잠시 대기
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 중복 상품명 팝업 확인
|
||
rename_need = await self.handle_exist_productname_popup()
|
||
if rename_need:
|
||
self.logger.log(f"[{context}] 상품명 중복발생 : 재설정 필요.", level=logging.INFO)
|
||
if title_infos.get("generated_name"):
|
||
new_title = title_infos["generated_name"]
|
||
else:
|
||
random_suffix = self.generate_random_suffix()
|
||
new_title = "[중복상품]" + title_infos.get("collection_name", "____") + random_suffix
|
||
|
||
duplication_suffix = f'[{context}중복]'
|
||
|
||
# 상품명 탭으로 이동하여 재설정
|
||
await self.edit_product_name()
|
||
await asyncio.sleep(0.5)
|
||
|
||
is_set = await self.titleGenerator.set_product_name(new_title, duplication_suffix)
|
||
if is_set:
|
||
self.logger.log(f"[{context}] 상품명 재설정 완료", level=logging.INFO)
|
||
return "DUPLICATE_HANDLED"
|
||
else:
|
||
self.logger.log(f"[{context}] 상품명 재설정 실패", level=logging.ERROR)
|
||
return "ERROR"
|
||
|
||
# 최종 저장인 경우 다이얼로그 닫기
|
||
if context == "최종저장":
|
||
try:
|
||
if await self.page.is_visible(self.product_dialog_close_btn):
|
||
await self.page.click(self.product_dialog_close_btn)
|
||
self.logger.log(f"[{context}] 다이얼로그 닫기 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"[{context}] 다이얼로그 닫기 중 오류: {e}", level=logging.WARNING)
|
||
|
||
return "SAVED"
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"[{context}] 저장 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# 에러 발생 시 스크린샷 저장
|
||
screenshot_path = await self.save_error_screenshot(f"{context}_save_error")
|
||
self.logger.log(f"[{context}] 저장 오류로 스크린샷 저장: {screenshot_path}", level=logging.INFO)
|
||
|
||
# 1차 시도: 상품명 재설정
|
||
try:
|
||
self.logger.log(f"[{context}] 1차 시도: 상품명 재설정을 시도합니다.", level=logging.INFO)
|
||
|
||
# 상품명 탭으로 이동
|
||
await self.edit_product_name()
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 새로운 상품명 설정
|
||
if title_infos.get("generated_name"):
|
||
new_title = title_infos["generated_name"]
|
||
else:
|
||
random_suffix = self.generate_random_suffix()
|
||
new_title = "[중복상품]" + title_infos.get("collection_name", "====") + random_suffix
|
||
|
||
error_suffix = f'[{context}오류복구]'
|
||
is_set = await self.titleGenerator.set_product_name(new_title, error_suffix)
|
||
|
||
if is_set:
|
||
self.logger.log(f"[{context}] 상품명 재설정 완료, 다시 저장을 시도합니다.", level=logging.INFO)
|
||
|
||
# 다시 저장 시도
|
||
await self.page.click(self.save_button_locator)
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 최종 저장인 경우 다이얼로그 닫기
|
||
if context == "최종저장":
|
||
try:
|
||
if await self.page.is_visible(self.product_dialog_close_btn):
|
||
await self.page.click(self.product_dialog_close_btn)
|
||
self.logger.log(f"[{context}] 다이얼로그 닫기 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
self.logger.log(f"[{context}] 다이얼로그 닫기 중 오류: {e}", level=logging.WARNING)
|
||
|
||
return "SAVED"
|
||
|
||
except Exception as retry_error:
|
||
self.logger.log(f"[{context}] 상품명 재설정 시도 중 오류 발생: {retry_error}", level=logging.ERROR, exc_info=True)
|
||
|
||
# 2차 시도: 상품 삭제 (모든 컨텍스트에서 가능)
|
||
try:
|
||
self.logger.log(f"[{context}] 2차 시도: 상품 삭제를 시도합니다.", level=logging.WARNING)
|
||
|
||
# 먼저 다른 다이얼로그들 처리
|
||
await self.handle_other_dialogs_before_delete()
|
||
|
||
# 삭제 버튼 존재 확인 및 클릭
|
||
delete_button_selector = "div.Product_Edit_Detail_Dialog_Drawer .ant-btn-dangerous[type='button']"
|
||
|
||
await self.page.wait_for_selector(delete_button_selector, timeout=5000)
|
||
self.logger.log(f"[{context}] 삭제 버튼 발견, 클릭합니다.", level=logging.INFO)
|
||
|
||
await self.page.click(delete_button_selector)
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 삭제 확인 다이얼로그의 확인 버튼 클릭
|
||
confirm_delete_selector = "div.ant-modal-centered [role='dialog'] .ant-btn.ant-btn-dangerous[type='button']"
|
||
|
||
await self.page.wait_for_selector(confirm_delete_selector, timeout=5000)
|
||
self.logger.log(f"[{context}] 삭제 확인 다이얼로그 발견, 확인 버튼을 클릭합니다.", level=logging.INFO)
|
||
|
||
await self.page.click(confirm_delete_selector)
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 삭제 버튼이 사라졌는지 확인 (상품수정 다이얼로그가 닫혔는지 확인)
|
||
await self.page.wait_for_selector(delete_button_selector, state="detached", timeout=10000)
|
||
self.logger.log(f"[{context}] 상품 삭제 완료, 다이얼로그가 닫혔습니다.", level=logging.INFO)
|
||
|
||
return "DELETED"
|
||
|
||
except Exception as delete_error:
|
||
self.logger.log(f"[{context}] 상품 삭제 시도 중 오류 발생: {delete_error}", level=logging.ERROR, exc_info=True)
|
||
|
||
return "ERROR"
|
||
|
||
async def handle_other_dialogs_before_delete(self):
|
||
"""
|
||
상품 삭제 전에 다른 다이얼로그들을 처리하는 메서드
|
||
div.ant-modal-confirm[role='dialog'] 형태의 다이얼로그에서
|
||
삭제, 저장, 닫기, 확인, 취소 버튼을 찾아 처리
|
||
"""
|
||
try:
|
||
# 최대 3번 시도
|
||
for attempt in range(3):
|
||
# 다이얼로그가 있는지 확인
|
||
modal_dialogs = await self.page.query_selector_all("div.ant-modal-confirm[role='dialog']")
|
||
|
||
if not modal_dialogs:
|
||
self.logger.log("처리할 다이얼로그가 없습니다.", level=logging.DEBUG)
|
||
break
|
||
|
||
self.logger.log(f"시도 {attempt + 1}: {len(modal_dialogs)}개의 다이얼로그 발견", level=logging.INFO)
|
||
|
||
for i, dialog in enumerate(modal_dialogs):
|
||
try:
|
||
# 버튼 우선순위: 삭제 > 확인 > 저장 > 닫기 > 취소
|
||
button_texts = ["삭제", "확인", "저장", "닫기", "취소"]
|
||
|
||
clicked = False
|
||
for button_text in button_texts:
|
||
# 버튼 찾기
|
||
button = await dialog.query_selector(f"button:has-text('{button_text}')")
|
||
if button:
|
||
await button.click()
|
||
self.logger.log(f"다이얼로그 {i+1}에서 '{button_text}' 버튼 클릭", level=logging.INFO)
|
||
clicked = True
|
||
await asyncio.sleep(0.3)
|
||
break
|
||
|
||
if not clicked:
|
||
self.logger.log(f"다이얼로그 {i+1}에서 처리할 버튼을 찾지 못했습니다.", level=logging.WARNING)
|
||
|
||
except Exception as dialog_error:
|
||
self.logger.log(f"다이얼로그 {i+1} 처리 중 오류: {dialog_error}", level=logging.WARNING)
|
||
|
||
# 다이얼로그 처리 후 잠시 대기
|
||
await asyncio.sleep(0.5)
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"다이얼로그 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def save_product_edit(self):
|
||
"""상품 수정 후 저장 버튼 클릭"""
|
||
try:
|
||
await self.page.click(self.save_button_locator)
|
||
self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO)
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
async def go_to_next_page_ori(self):
|
||
"""다음 페이지로 이동"""
|
||
try:
|
||
# 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소)
|
||
current_page = await self.page.query_selector(self.current_page_locator)
|
||
|
||
if not current_page:
|
||
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
# 현재 활성화된 페이지 번호를 가져옴
|
||
current_page_number = int(await current_page.get_attribute("title"))
|
||
self.logger.log(f"현재페이지 : [{current_page_number}]", level=logging.INFO)
|
||
|
||
next_page_number = current_page_number + 1
|
||
# 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음)
|
||
next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number)
|
||
next_page_button = await self.page.query_selector(next_page_button_locator)
|
||
|
||
if next_page_button:
|
||
await next_page_button.click() # 페이지 버튼 클릭
|
||
# await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기
|
||
time.sleep(3)
|
||
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
|
||
return True
|
||
else:
|
||
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False
|
||
|
||
async def go_to_next_page(self):
|
||
"""
|
||
수정된 CSS 선택자를 사용하여 다음 페이지로 이동합니다.
|
||
"""
|
||
try:
|
||
current_page_element = await self.page.query_selector(self.current_page_locator)
|
||
if not current_page_element:
|
||
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
|
||
return False
|
||
|
||
current_page_number = int(await current_page_element.get_attribute("title"))
|
||
self.logger.log(f"현재 페이지 번호: {current_page_number}", level=logging.INFO)
|
||
next_page_number = current_page_number + 1
|
||
next_page_button_locator = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template').format(page_number=next_page_number)
|
||
self.logger.log(f"다음 페이지 선택자: {next_page_button_locator}", level=logging.DEBUG)
|
||
next_page_button = await self.page.query_selector(next_page_button_locator)
|
||
|
||
if next_page_button:
|
||
await next_page_button.click()
|
||
await self.page.wait_for_load_state('domcontentloaded', timeout=5000)
|
||
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
|
||
return True
|
||
else:
|
||
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
|
||
return False
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return False
|
||
|
||
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
|
||
"""
|
||
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
||
|
||
Parameters:
|
||
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
||
- pause_time: 스크롤 사이의 대기 시간 (초).
|
||
- max_scrolls: 최대 스크롤 횟수.
|
||
"""
|
||
scroll_count = 0
|
||
|
||
self.logger.log(f"스크롤 시작", level=logging.DEBUG)
|
||
|
||
# 현재 페이지 높이 가져오기
|
||
last_height = await self.page.evaluate("document.body.scrollHeight")
|
||
self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG)
|
||
|
||
while scroll_count < max_scrolls:
|
||
if direction == "down":
|
||
# 아래로 스크롤
|
||
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG)
|
||
await self.page.evaluate("window.scrollBy(0, 1000);")
|
||
elif direction == "up":
|
||
# 위로 스크롤
|
||
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG)
|
||
await self.page.evaluate("window.scrollBy(0, -1000);")
|
||
else:
|
||
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
||
|
||
self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG)
|
||
await asyncio.sleep(pause_time)
|
||
|
||
# 새로운 페이지 높이 가져오기
|
||
new_height = await self.page.evaluate("document.body.scrollHeight")
|
||
self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG)
|
||
|
||
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
|
||
if direction == "down" and new_height == last_height:
|
||
self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
|
||
break
|
||
elif direction == "up" and new_height == 0:
|
||
self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
|
||
break
|
||
|
||
self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG)
|
||
last_height = new_height
|
||
scroll_count += 1
|
||
self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG)
|
||
|
||
if scroll_count == max_scrolls:
|
||
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
|
||
|
||
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
|
||
"""
|
||
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
|
||
|
||
Parameters:
|
||
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
|
||
- pause_time: 스크롤 사이의 대기 시간 (초).
|
||
- max_scrolls: 최대 스크롤 횟수.
|
||
"""
|
||
scroll_count = 0
|
||
|
||
while scroll_count < max_scrolls:
|
||
if direction == "down":
|
||
# 아래로 스크롤 (Page Down 키 사용)
|
||
await self.page.keyboard.press("PageDown")
|
||
elif direction == "up":
|
||
# 위로 스크롤 (Page Up 키 사용)
|
||
await self.page.keyboard.press("PageUp")
|
||
else:
|
||
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
|
||
|
||
await asyncio.sleep(pause_time)
|
||
|
||
scroll_count += 1
|
||
|
||
if scroll_count == max_scrolls:
|
||
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
|
||
|
||
|
||
async def collect_product_info(self, items_per_page, ed_mode):
|
||
"""
|
||
상품 정보를 수집하는 메서드
|
||
"""
|
||
try:
|
||
product_infos = []
|
||
product_name_elements = [] # product_name_element를 저장할 리스트
|
||
|
||
# ed_mode에 따라 product_elements 설정
|
||
if ed_mode:
|
||
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
|
||
product_elements = [
|
||
{
|
||
"name": self.product_name_for_ed_template.format(index=i),
|
||
"price": self.product_price_for_ed_template.format(index=i),
|
||
"image": self.product_image_for_ed_template.format(index=i)
|
||
}
|
||
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
|
||
]
|
||
else:
|
||
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
|
||
product_elements = await self.page.query_selector_all(self.product_parent_locator)
|
||
|
||
for i, element in enumerate(product_elements[:items_per_page], start=1):
|
||
try:
|
||
if ed_mode:
|
||
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
|
||
product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
|
||
product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
|
||
product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
|
||
else:
|
||
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
|
||
product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
|
||
product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
|
||
product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
|
||
|
||
# 요소가 존재하면 정보 추출
|
||
self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG)
|
||
self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG)
|
||
self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG)
|
||
|
||
if product_name_element and product_price_element and product_image_element:
|
||
# await의 결과를 각 변수에 저장
|
||
product_name_text = (await product_name_element.inner_text()).strip()
|
||
product_price_text = (await product_price_element.inner_text()).strip()
|
||
product_image_url = await product_image_element.get_attribute('src')
|
||
|
||
# product_info 딕셔너리에 결과 저장
|
||
product_info = {
|
||
"name": product_name_text,
|
||
"price": product_price_text,
|
||
"image_url": product_image_url
|
||
}
|
||
self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG)
|
||
product_infos.append(product_info)
|
||
product_name_elements.append(product_name_element) # 각 product_name_element 추가
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
continue
|
||
|
||
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return []
|
||
|
||
|
||
|
||
|
||
async def scroll_page_to_bottom(self, pause_time=0.2):
|
||
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
|
||
self.logger.log('페이지 스크롤 시작...', level=logging.INFO)
|
||
|
||
try:
|
||
# 타임아웃 설정으로 API 대기 중에도 안전하게 동작
|
||
previous_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => document.body.scrollHeight"),
|
||
timeout=5.0
|
||
)
|
||
|
||
retry_count = 0
|
||
max_retries = 10 # 최대 재시도 횟수
|
||
|
||
while retry_count < max_retries:
|
||
try:
|
||
# 스크롤 실행에도 타임아웃 적용
|
||
await asyncio.wait_for(
|
||
self.page.evaluate("window.scrollBy(0, window.innerHeight);"),
|
||
timeout=3.0
|
||
)
|
||
await asyncio.sleep(pause_time)
|
||
|
||
# 현재 높이 체크에도 타임아웃 적용
|
||
current_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => document.body.scrollHeight"),
|
||
timeout=3.0
|
||
)
|
||
|
||
if current_height == previous_height:
|
||
break # 더 이상 스크롤할 내용이 없으면 종료
|
||
previous_height = current_height
|
||
retry_count += 1
|
||
|
||
except asyncio.TimeoutError:
|
||
self.logger.log(f'스크롤 중 타임아웃 발생 (시도 {retry_count + 1}/{max_retries})', level=logging.WARNING)
|
||
retry_count += 1
|
||
await asyncio.sleep(1.0) # 타임아웃 후 잠시 대기
|
||
|
||
# 페이지 상태 재확인
|
||
try:
|
||
previous_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => document.body.scrollHeight"),
|
||
timeout=2.0
|
||
)
|
||
except asyncio.TimeoutError:
|
||
self.logger.log('페이지 상태 확인 실패, 스크롤 중단', level=logging.WARNING)
|
||
break
|
||
|
||
if retry_count >= max_retries:
|
||
self.logger.log('최대 재시도 횟수 도달, 스크롤 강제 종료', level=logging.WARNING)
|
||
else:
|
||
self.logger.log('페이지 스크롤 완료.', level=logging.INFO)
|
||
|
||
except asyncio.TimeoutError:
|
||
self.logger.log('페이지 스크롤 초기화 중 타임아웃 발생, 스크롤 건너뛰기', level=logging.WARNING)
|
||
except Exception as e:
|
||
self.logger.log(f'페이지 스크롤 중 예외 발생: {e}', level=logging.ERROR)
|
||
# 에러가 발생해도 작업을 계속 진행
|
||
|
||
async def scroll_page_to_top(self, pause_time=0.2):
|
||
"""페이지의 맨 위까지 스크롤"""
|
||
self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO)
|
||
|
||
try:
|
||
# 타임아웃 설정으로 API 대기 중에도 안전하게 동작
|
||
previous_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => window.pageYOffset"),
|
||
timeout=5.0
|
||
)
|
||
|
||
retry_count = 0
|
||
max_retries = 10 # 최대 재시도 횟수
|
||
|
||
while previous_height > 0 and retry_count < max_retries:
|
||
try:
|
||
# 스크롤 실행에도 타임아웃 적용
|
||
await asyncio.wait_for(
|
||
self.page.evaluate("window.scrollBy(0, -window.innerHeight);"),
|
||
timeout=3.0
|
||
)
|
||
await asyncio.sleep(pause_time)
|
||
|
||
# 현재 높이 체크에도 타임아웃 적용
|
||
current_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => window.pageYOffset"),
|
||
timeout=3.0
|
||
)
|
||
|
||
if current_height == previous_height:
|
||
break # 더 이상 스크롤할 내용이 없으면 종료
|
||
previous_height = current_height
|
||
retry_count += 1
|
||
|
||
except asyncio.TimeoutError:
|
||
self.logger.log(f'스크롤 중 타임아웃 발생 (시도 {retry_count + 1}/{max_retries})', level=logging.WARNING)
|
||
retry_count += 1
|
||
await asyncio.sleep(1.0) # 타임아웃 후 잠시 대기
|
||
|
||
# 페이지 상태 재확인
|
||
try:
|
||
previous_height = await asyncio.wait_for(
|
||
self.page.evaluate("() => window.pageYOffset"),
|
||
timeout=2.0
|
||
)
|
||
except asyncio.TimeoutError:
|
||
self.logger.log('페이지 상태 확인 실패, 스크롤 중단', level=logging.WARNING)
|
||
break
|
||
|
||
if retry_count >= max_retries:
|
||
self.logger.log('최대 재시도 횟수 도달, 스크롤 강제 종료', level=logging.WARNING)
|
||
else:
|
||
self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO)
|
||
|
||
except asyncio.TimeoutError:
|
||
self.logger.log('페이지 스크롤 초기화 중 타임아웃 발생, 스크롤 건너뛰기', level=logging.WARNING)
|
||
except Exception as e:
|
||
self.logger.log(f'페이지 스크롤 중 예외 발생: {e}', level=logging.ERROR)
|
||
# 에러가 발생해도 작업을 계속 진행
|
||
|
||
|
||
|
||
|
||
async def start_Percenty_task(self):
|
||
self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG)
|
||
self.running = True # 번역 작업이 시작됨
|
||
self.translation_started.emit()
|
||
|
||
try:
|
||
# 금지어 목록 업데이트
|
||
self.forbidden_word_manager.refresh_forbidden_words()
|
||
|
||
optionIMGTrans_type = self.toggle_states['optionIMGTrans_type']
|
||
detail_IMGTrans_type = self.toggle_states['detail_IMGTrans_type']
|
||
thumb_trans_type = self.toggle_states['thumb_trans_type']
|
||
|
||
self.logger.log(f"optionIMGTrans_type : {optionIMGTrans_type}", level=logging.DEBUG)
|
||
self.logger.log(f"detail_IMGTrans_type : {detail_IMGTrans_type}", level=logging.DEBUG)
|
||
self.logger.log(f"thumb_trans_type : {thumb_trans_type}", level=logging.DEBUG)
|
||
|
||
# 1. 총 상품 수 수집
|
||
self.check_pause() # 일시중지 상태 확인
|
||
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
|
||
|
||
# total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
|
||
|
||
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
|
||
result = await self.get_total_product_count()
|
||
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
|
||
total_products = result.get("total_count", 0)
|
||
items_per_page = result.get("items_per_page", 0)
|
||
total_pages = math.ceil(total_products / items_per_page)
|
||
self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG)
|
||
self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG)
|
||
|
||
if total_products == 0:
|
||
self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
|
||
return
|
||
|
||
completed_count = 0
|
||
page_number = 1
|
||
|
||
# 3. 총 상품 수만큼 반복 작업 수행
|
||
# while self.running and completed_count < total_products:
|
||
for page_number in range(1, total_pages + 1):
|
||
self.check_pause() # 일시중지 상태 확인
|
||
# self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG)
|
||
self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO)
|
||
|
||
|
||
# if not page_number == 1:
|
||
# await self.scroll_page_to_top()
|
||
# self.logger.log(f'1페이지가 아니므로 동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
|
||
|
||
await self.scroll_page_to_top()
|
||
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
|
||
|
||
if not self.toggle_states['ed_mode']:
|
||
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
|
||
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
|
||
product_buttons = await self.get_product_edit_buttons_by_template()
|
||
else:
|
||
self.logger.log('상품정보 수집', level=logging.DEBUG)
|
||
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
|
||
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
|
||
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
|
||
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
|
||
|
||
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
|
||
|
||
if not product_buttons:
|
||
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
|
||
break
|
||
|
||
# 5. 각 상품에 대해 상품수정작업 수행
|
||
for index, button_set in enumerate(product_buttons, start=1):
|
||
try:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
edit_button = button_set.get("edit_button")
|
||
memo_button = button_set.get("memo_button")
|
||
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
|
||
|
||
# 상태 초기화 코드 추가
|
||
self.titleGenerator.reset_state()
|
||
self.optionHandler.reset_state()
|
||
self.priceHandler.reset_state()
|
||
self.tagsHandler.reset_state()
|
||
self.thumbnailHandler.reset_state()
|
||
self.detailHandler.reset_state()
|
||
# self.clipboardImageManager.reset_state() # 클립보드 초기화
|
||
|
||
# 그 외 임시 변수 초기화
|
||
self.current_options_info = None
|
||
|
||
# 상품명 수집 오류 처리
|
||
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
|
||
|
||
is_disabled = await self.is_button_disabled(edit_button)
|
||
if is_disabled:
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
|
||
# completed_count += 1
|
||
# self.total_progressbar_signal.emit(completed_count, total_products)
|
||
continue
|
||
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
|
||
|
||
# 상품 수정 다이얼로그 열기
|
||
await self.open_product_edit_dialog(edit_button)
|
||
|
||
self.check_pause() # 일시중지 상태 확인
|
||
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag)
|
||
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
|
||
|
||
|
||
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
|
||
# if title_infos.get("is_banned_category", False):
|
||
if title_infos.get("is_banned_category", False):
|
||
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
|
||
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
|
||
|
||
# 랜덤 4자리 붙이기
|
||
random_suffix = self.generate_random_suffix()
|
||
|
||
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
|
||
is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix)
|
||
if is_set:
|
||
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
|
||
|
||
# 최종 저장 (다이얼로그 닫기 포함)
|
||
save_result = await self.handle_save_with_error_recovery(title_infos, "최종저장")
|
||
|
||
if save_result == "DUPLICATE_HANDLED":
|
||
self.logger.log("금지카테고리 상품명 중복 처리 완료.", level=logging.INFO)
|
||
elif save_result == "SAVED":
|
||
self.logger.log("금지카테고리 상품 저장 완료.", level=logging.INFO)
|
||
elif save_result == "DELETED":
|
||
self.logger.log("금지카테고리 상품 삭제 완료.", level=logging.INFO)
|
||
else:
|
||
self.logger.log("금지카테고리 상품 처리 실패.", level=logging.ERROR)
|
||
|
||
continue # 다음 상품으로 넘어감
|
||
|
||
await self.random_human_behavior(self.page)
|
||
|
||
# 정상상품이면 상품명 수정과 카테고리 수집
|
||
is_title = self.toggle_states['title']
|
||
is_title_shuffle = self.toggle_states['title_shuffle']
|
||
if is_title or is_title_shuffle:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
|
||
self.start_stage_signal.emit(0)
|
||
title_infos["generated_name"] = await self.titleGenerator.process_title()
|
||
self.complete_stage_signal.emit(0)
|
||
|
||
# 옵션 수정
|
||
self.start_stage_signal.emit(1)
|
||
await self.random_human_behavior(self.page)
|
||
is_optionTrnas = self.toggle_states.get('optionTrnas')
|
||
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
|
||
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
|
||
|
||
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
|
||
option_result = await self.edit_option(title_infos.get("original_name", None), title_infos)
|
||
if option_result == "DELETED":
|
||
self.logger.log("옵션 탭에서 상품이 삭제되었습니다. 다음 상품으로 넘어갑니다.", level=logging.INFO)
|
||
# 총 상품수 감소 및 버튼 재수집
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
continue # 다음 상품으로 넘어감
|
||
|
||
await self.random_human_behavior(self.page)
|
||
|
||
self.complete_stage_signal.emit(1)
|
||
|
||
# 가격 수정
|
||
self.start_stage_signal.emit(2)
|
||
is_price = self.toggle_states.get('price')
|
||
await self.random_human_behavior(self.page)
|
||
if is_price:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
|
||
price_result = await self.edit_price(title_infos)
|
||
if price_result == "DELETED":
|
||
self.logger.log("가격 탭에서 상품이 삭제되었습니다. 다음 상품으로 넘어갑니다.", level=logging.INFO)
|
||
# 총 상품수 감소 및 버튼 재수집
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
continue # 다음 상품으로 넘어감
|
||
self.complete_stage_signal.emit(2)
|
||
|
||
# 썸네일 수정
|
||
self.start_stage_signal.emit(3)
|
||
thumb = self.toggle_states.get('thumb')
|
||
thumb_nukki = self.toggle_states.get('thumb_nukki')
|
||
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
|
||
|
||
if thumb or thumb_nukki:
|
||
await self.random_human_behavior(self.page)
|
||
self.check_pause() # 일시중지 상태 확인
|
||
thumb_result = await self.edit_thumb(title_infos)
|
||
if thumb_result == "DELETED":
|
||
self.logger.log("썸네일 탭에서 상품이 삭제되었습니다. 다음 상품으로 넘어갑니다.", level=logging.INFO)
|
||
# 총 상품수 감소 및 버튼 재수집
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
continue # 다음 상품으로 넘어감
|
||
self.complete_stage_signal.emit(3)
|
||
|
||
# 태그 수정
|
||
tag = self.toggle_states.get('tag')
|
||
tag_ai = self.toggle_states.get('tag_ai')
|
||
|
||
await self.random_human_behavior(self.page)
|
||
if tag or tag_ai:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"키워드태그 수정 : {tag} + {tag_ai} ", level=logging.DEBUG)
|
||
self.start_stage_signal.emit(4)
|
||
tag_result = await self.edit_tags(title_infos, tag, tag_ai)
|
||
if tag_result == "DELETED":
|
||
self.logger.log("태그 탭에서 상품이 삭제되었습니다. 다음 상품으로 넘어갑니다.", level=logging.INFO)
|
||
# 총 상품수 감소 및 버튼 재수집
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
continue # 다음 상품으로 넘어감
|
||
self.complete_stage_signal.emit(4)
|
||
|
||
# 상세페이지 수정 수정
|
||
detail_Option = self.toggle_states.get('detail_Option')
|
||
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
|
||
detail_IMGTrans_type = self.toggle_states.get('detail_IMGTrans_type')
|
||
|
||
if detail_Option or detail_IMGTrans:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans} + {detail_IMGTrans_type}", level=logging.DEBUG)
|
||
|
||
# 상세페이지 수정
|
||
await self.random_human_behavior(self.page)
|
||
self.start_stage_signal.emit(5)
|
||
detail_result = await self.edit_detail(title_infos)
|
||
if detail_result == "DELETED":
|
||
self.logger.log("상세페이지 탭에서 상품이 삭제되었습니다. 다음 상품으로 넘어갑니다.", level=logging.INFO)
|
||
# 총 상품수 감소 및 버튼 재수집
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
continue # 다음 상품으로 넘어감
|
||
self.complete_stage_signal.emit(5)
|
||
await self.random_human_behavior(self.page)
|
||
|
||
# 수정 후 저장
|
||
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
|
||
|
||
save_result = await self.handle_save_with_error_recovery(title_infos, "최종저장")
|
||
|
||
if save_result == "DELETED":
|
||
# 상품이 삭제된 경우 총 상품수 감소
|
||
total_products, product_buttons = await self.handle_product_deletion_in_loop(total_products, items_per_page)
|
||
self.logger.log(f"상품 삭제로 인해 총 상품수 감소: {total_products}", level=logging.INFO)
|
||
self.logger.log(f"상품 삭제 후 재수집된 product_buttons 갯수: [{len(product_buttons)}]개", level=logging.INFO)
|
||
continue # 다음 상품으로 넘어가지 않고 현재 인덱스를 유지
|
||
|
||
elif save_result == "ERROR":
|
||
# 저장도 삭제도 실패한 경우
|
||
self.logger.log("상품 저장 및 삭제 모두 실패했습니다. 다음 상품으로 넘어갑니다.", level=logging.ERROR)
|
||
# 에러가 발생해도 다음 상품으로 진행
|
||
|
||
# save_result == "SAVED"인 경우는 정상 저장이므로 그대로 진행
|
||
|
||
# 메모 입력: 저장이 정상 완료(SAVED) 또는 중복 처리 완료(DUPLICATE_HANDLED)된 경우에만 진행
|
||
if save_result in ("SAVED", "DUPLICATE_HANDLED") and memo_button and self.toggle_states['memo']:
|
||
self.check_pause() # 일시중지 상태 확인
|
||
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
|
||
await self.insert_product_infos_to_memo(memo_button, title_infos)
|
||
await self.random_human_behavior(self.page)
|
||
|
||
except Exception as item_err:
|
||
# 이 상품만 실패로 기록하고, 다음 상품으로
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True)
|
||
|
||
finally:
|
||
# 이전 상품 처리 중 예외로 다이얼로그가 남아 있을 수 있으므로 먼저 닫아준다.
|
||
try:
|
||
if await self.page.is_visible(self.product_dialog_close_btn):
|
||
await self.page.click(self.product_dialog_close_btn)
|
||
await asyncio.sleep(0.2)
|
||
except Exception as close_err:
|
||
self.logger.log(f"잔존 다이얼로그 닫기 실패: {close_err}", level=logging.DEBUG)
|
||
|
||
# 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트
|
||
completed_count += 1
|
||
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
|
||
|
||
title_infos.clear() # 메모 입력 후 초기화
|
||
self.logger.log(f"title_infos 초기화", level=logging.DEBUG)
|
||
|
||
if self.is_image_processor_init:
|
||
is_reset_ocr = self.image_processor.reset_ocr_module()
|
||
if is_reset_ocr:
|
||
self.logger.log(f"상품수정 완료로 인해 OCR 모듈 초기화", level=logging.DEBUG)
|
||
|
||
self.total_progressbar_signal.emit(completed_count, total_products)
|
||
|
||
if (completed_count % 10) == 0:
|
||
gc.collect()
|
||
mem = psutil.virtual_memory()
|
||
self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
|
||
|
||
# 목표 도달 체크
|
||
if completed_count >= total_products:
|
||
break
|
||
|
||
# (3) 다음 페이지로 이동
|
||
await self.random_human_behavior(self.page)
|
||
if page_number < total_pages:
|
||
success = await self.go_to_next_page()
|
||
if not success:
|
||
err = f"페이지 {page_number} → {page_number+1} 이동 실패"
|
||
self.logger.log(err, level=logging.ERROR)
|
||
self.translation_error.emit(err)
|
||
return
|
||
else:
|
||
self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO)
|
||
|
||
except Exception as fatal:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
# 전체 작업 중에 발생한 치명적 예외
|
||
self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True)
|
||
self.translation_error.emit(str(fatal))
|
||
|
||
else:
|
||
# 예외 없이 '정상적으로' 전체 루프를 마쳤을 때
|
||
self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO)
|
||
self.translation_completed.emit(completed_count)
|
||
|
||
async def insert_product_infos_to_memo(self, memo_button, title_infos):
|
||
try:
|
||
collect_method = self.toggle_states['collect_method_combo']
|
||
is_new_memo_first = self.toggle_states['memo_toggle_order']
|
||
is_memo_exposure = self.toggle_states['memo_toggle_exposer']
|
||
|
||
# 1. 메모 버튼 유효성 확인
|
||
if await self.is_button_disabled(memo_button):
|
||
self.logger.log("메모 버튼이 비활성화되어 있어 작업을 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 2. title_infos 유효성 검사
|
||
top_5_titles = title_infos.get("top_5_titles", [])
|
||
top_5_prices = title_infos.get("top_5_prices", [])
|
||
if not top_5_titles or not top_5_prices:
|
||
self.logger.log("title_infos에 메모할 내용이 없어 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 3. 메모용 텍스트 생성
|
||
new_memo = self.generate_titles_with_prices(title_infos)
|
||
if not new_memo:
|
||
self.logger.log("생성된 메모 텍스트가 비어 있어 건너뜁니다.", level=logging.WARNING)
|
||
return
|
||
|
||
# 4. 메모 다이얼로그 열기
|
||
await memo_button.click()
|
||
self.logger.log("메모 버튼 클릭", level=logging.DEBUG)
|
||
|
||
memo_input = self.page.locator(self.memo_input_locator)
|
||
await memo_input.wait_for(state="visible")
|
||
self.logger.log("메모 입력란 대기 완료", level=logging.DEBUG)
|
||
|
||
# 5. 기존 메모 읽어오기
|
||
existing = await memo_input.input_value()
|
||
combined = existing.strip()
|
||
|
||
# 6. 새 메모 조합
|
||
suffix = "\n" + "-"*10 + "\n"
|
||
if combined:
|
||
if is_new_memo_first:
|
||
combined = new_memo + suffix + combined
|
||
else:
|
||
combined = combined + suffix + new_memo
|
||
else:
|
||
combined = new_memo
|
||
|
||
# 7. 길이 제한 (2000자)
|
||
if len(combined) > 2000:
|
||
combined = combined[:2000]
|
||
|
||
# 8. 채우기
|
||
await memo_input.fill(combined)
|
||
self.logger.log(f"메모 입력: {combined!r}", level=logging.DEBUG)
|
||
|
||
# 9. 노출 체크박스 처리
|
||
# chk = self.page.locator(
|
||
# 'label:has-text("상품 목록에 메모 내용 노출하기") '
|
||
# 'input[type="checkbox"]'
|
||
# )
|
||
# await chk.wait_for(state="attached")
|
||
# checked = await chk.is_checked()
|
||
# if is_memo_exposure and not checked:
|
||
# await chk.click()
|
||
# self.logger.log("메모 노출 체크 ON", level=logging.DEBUG)
|
||
# elif not is_memo_exposure and checked:
|
||
# await chk.click()
|
||
# self.logger.log("메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
exposer = self.page.locator(self.memo_exposer_locator)
|
||
|
||
# 노출 체크박스 존재 대기
|
||
await exposer.wait_for(state="attached")
|
||
|
||
# 노출 체크박스가 보이고 활성화될 때까지 대기
|
||
await exposer.wait_for(state="visible", timeout=5000)
|
||
|
||
# 활성화 상태 확인 (disabled 속성이 없으면 활성화된 것)
|
||
max_attempts = 10
|
||
for attempt in range(max_attempts):
|
||
is_disabled = await exposer.get_attribute('disabled')
|
||
if is_disabled is None: # disabled 속성이 없으면 활성화됨
|
||
break
|
||
if attempt < max_attempts - 1:
|
||
await asyncio.sleep(0.5)
|
||
else:
|
||
self.logger.log("노출 체크박스가 활성화되지 않았습니다.", level=logging.WARNING)
|
||
|
||
if is_memo_exposure:
|
||
await exposer.check()
|
||
self.logger.log("메모 노출 체크 ON", level=logging.DEBUG)
|
||
|
||
else:
|
||
await exposer.uncheck()
|
||
self.logger.log("메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
# 10. 저장
|
||
save_btn = self.page.locator(self.memo_save_btn_locator)
|
||
await save_btn.wait_for(state="visible")
|
||
await save_btn.click()
|
||
self.logger.log("메모 저장 클릭", level=logging.DEBUG)
|
||
await asyncio.sleep(0.3)
|
||
|
||
except Exception as e:
|
||
screenshot_path = await self.save_error_screenshot()
|
||
self.logger.log(f"메모 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
|
||
# ESC 키 2번 전송하여 팝업 제거 시도
|
||
# self.logger.log("ESC 키를 2번 전송하여 팝업 제거 후 메모 입력 재시도합니다.", level=logging.WARNING)
|
||
await self.page.keyboard.press("Escape")
|
||
await self.page.keyboard.press("Escape")
|
||
await asyncio.sleep(0.5) # 잠시 대기
|
||
|
||
# # 메모 입력 재시도
|
||
# try:
|
||
# await memo_button.click()
|
||
# self.logger.log("재시도: 메모 버튼을 클릭했습니다.", level=logging.DEBUG)
|
||
# memo_input = self.page.locator(self.memo_input_locator)
|
||
# await memo_input.wait_for(state="visible")
|
||
# self.logger.log("재시도: 메모 입력란 대기 완료", level=logging.DEBUG)
|
||
|
||
# # 5. 기존 메모 읽어오기
|
||
# existing = await memo_input.input_value()
|
||
# combined = existing.strip()
|
||
|
||
# # 6. 새 메모 조합
|
||
# suffix = "\n" + "-"*10 + "\n" + f"수집방법:[{collect_method}]\n"
|
||
# if combined:
|
||
# if is_new_memo_first:
|
||
# combined = new_memo + suffix + combined
|
||
# else:
|
||
# combined = combined + suffix + new_memo
|
||
# else:
|
||
# combined = f"수집방법:[{collect_method}]\n" + new_memo
|
||
|
||
# # 7. 길이 제한 (2000자)
|
||
# if len(combined) > 2000:
|
||
# combined = combined[:2000]
|
||
|
||
# # 8. 채우기
|
||
# await memo_input.fill(combined)
|
||
# self.logger.log(f"재시도: 메모 입력: {combined!r}", level=logging.DEBUG)
|
||
|
||
# # 9. 노출 체크박스 처리
|
||
# # chk = self.page.locator(
|
||
# # 'label:has-text("상품 목록에 메모 내용 노출하기") '
|
||
# # 'input[type="checkbox"]'
|
||
# # )
|
||
# # await chk.wait_for(state="attached")
|
||
# # checked = await chk.is_checked()
|
||
# # if is_memo_exposure and not checked:
|
||
# # await chk.click()
|
||
# # self.logger.log("재시도: 메모 노출 체크 ON", level=logging.DEBUG)
|
||
# # elif not is_memo_exposure and checked:
|
||
# # await chk.click()
|
||
# # self.logger.log("재시도: 메모 노출 체크 OFF", level=logging.DEBUG)
|
||
|
||
# is_memo_exposure = self.toggle_states.get("memo_exposure", False)
|
||
# exposer = self.page.locator(self.memo_exposer_locator)
|
||
|
||
# # 노출 체크박스 존재 대기
|
||
# await exposer.wait_for(state="attached")
|
||
|
||
# # 노출 체크박스가 보이고 활성화될 때까지 대기
|
||
# await exposer.wait_for(state="visible", timeout=5000)
|
||
|
||
# # 활성화 상태 확인 (disabled 속성이 없으면 활성화된 것)
|
||
# max_attempts = 10
|
||
# for attempt in range(max_attempts):
|
||
# is_disabled = await exposer.get_attribute('disabled')
|
||
# if is_disabled is None: # disabled 속성이 없으면 활성화됨
|
||
# break
|
||
# if attempt < max_attempts - 1:
|
||
# await asyncio.sleep(0.5)
|
||
# else:
|
||
# self.logger.log("노출 체크박스가 활성화되지 않았습니다.", level=logging.WARNING)
|
||
|
||
# if is_memo_exposure:
|
||
# await exposer.check()
|
||
# else:
|
||
# await exposer.uncheck()
|
||
|
||
# # 10. 저장
|
||
# save_btn = self.page.locator(self.memo_save_btn_locator)
|
||
# await save_btn.wait_for(state="visible")
|
||
# await save_btn.click()
|
||
# self.logger.log("재시도: 메모 저장 클릭", level=logging.DEBUG)
|
||
# await asyncio.sleep(0.3)
|
||
# except Exception as e2:
|
||
# screenshot_path = await self.save_error_screenshot()
|
||
# self.logger.log(f"재시도 후에도 메모 입력 중 오류 발생: {e2}", level=logging.ERROR, exc_info=True)
|
||
# # ESC 키 2번 전송하여 팝업 제거 후 재시도 로직을 제거하고, 이번 상품의 메모 입력을 건너뛰고 반환합니다.
|
||
# self.logger.log("ESC 키를 2번 전송하여 메모 창을 닫고 해당 상품의 메모 입력을 건너뜁니다.", level=logging.WARNING)
|
||
# await self.page.keyboard.press("Escape")
|
||
# await self.page.keyboard.press("Escape")
|
||
# await asyncio.sleep(0.5) # 잠시 대기
|
||
# await asyncio.sleep(0.5)
|
||
# return
|
||
|
||
def generate_titles_with_prices(self, title_infos):
|
||
"""top_5_titles와 top_5_prices를 매칭하여 텍스트 생성"""
|
||
try:
|
||
titles = title_infos.get("top_5_titles", [])
|
||
prices = title_infos.get("top_5_prices", [])
|
||
|
||
if not titles or not prices:
|
||
self.logger.log("top_5_titles 또는 top_5_prices가 비어 있습니다.", level=logging.WARNING)
|
||
return ""
|
||
|
||
# 매칭된 텍스트 생성
|
||
formatted_lines = []
|
||
for title, price in zip(titles, prices):
|
||
formatted_price = f"{int(price):,}원" # 3자리수마다 콤마, "원" 추가
|
||
formatted_lines.append(f"{title} - {formatted_price}")
|
||
|
||
# 최종 텍스트 생성
|
||
result_text = "\n".join(formatted_lines)
|
||
self.logger.log(f"Generated titles with prices:\n{result_text}", level=logging.DEBUG)
|
||
return result_text
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"텍스트 생성 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
|
||
return ""
|
||
|
||
def clear_temp_folder(self, folder_path):
|
||
"""
|
||
폴더 내 모든 파일 삭제, 폴더 없으면 생성
|
||
"""
|
||
if os.path.exists(folder_path):
|
||
# 폴더 내 파일/폴더 삭제
|
||
for filename in os.listdir(folder_path):
|
||
file_path = os.path.join(folder_path, filename)
|
||
try:
|
||
if os.path.isfile(file_path) or os.path.islink(file_path):
|
||
os.unlink(file_path)
|
||
elif os.path.isdir(file_path):
|
||
shutil.rmtree(file_path)
|
||
except Exception as e:
|
||
self.logger.log(f"파일 삭제 실패: {file_path}, 에러: {e}", level=logging.ERROR, exc_info=True)
|
||
else:
|
||
os.makedirs(folder_path, exist_ok=True)
|
||
|
||
|
||
|
||
async def edit_product_name(self):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_title_tab()
|
||
|
||
|
||
async def edit_option(self, product_name, title_infos=None):
|
||
# 옵션 탭 클릭
|
||
await self.click_option_tab()
|
||
|
||
# 옵션 최대선택갯수
|
||
self.current_options_info = await self.optionHandler.process_options(product_name, self.forbidden_word_manager, self.toggle_states)
|
||
|
||
# 수정 후 저장
|
||
if title_infos:
|
||
return await self.save_tab_changes(title_infos, "옵션")
|
||
return "SAVED"
|
||
|
||
async def edit_price(self, title_infos):
|
||
# 가격 탭 클릭
|
||
await self.click_price_tab()
|
||
|
||
# 가격 수정 프로세스
|
||
await self.priceHandler.process_price(title_infos=title_infos)
|
||
|
||
# 수정 후 저장
|
||
return await self.save_tab_changes(title_infos, "가격")
|
||
|
||
|
||
async def edit_thumb(self, title_infos=None):
|
||
# 썸네일 탭 클릭
|
||
await self.click_thumb_tab()
|
||
|
||
# 썸네일 수정 프로세스
|
||
await self.thumbnailHandler.process_thumbnails(self.toggle_states)
|
||
|
||
# 수정 후 저장
|
||
if title_infos:
|
||
return await self.save_tab_changes(title_infos, "썸네일")
|
||
return "SAVED"
|
||
|
||
async def edit_tags(self, title_infos, tag, tag_ai):
|
||
# 태그 탭 클릭
|
||
await self.click_tags_tab()
|
||
|
||
# 태그 수정 프로세스
|
||
await self.tagsHandler.process_tags(self.forbidden_word_manager, title_infos=title_infos, gpt_client=self.gpt_client, tag=tag, tag_ai=tag_ai)
|
||
|
||
# 수정 후 저장
|
||
return await self.save_tab_changes(title_infos, "태그")
|
||
|
||
async def edit_detail(self, title_infos=None):
|
||
# 상세페이지 탭 클릭
|
||
await self.click_detail_tab()
|
||
|
||
# 상세페이지 수정 프로세스
|
||
await self.detailHandler.process_detail(self.optionHandler, self.toggle_states)
|
||
|
||
# 수정 후 저장
|
||
if title_infos:
|
||
return await self.save_tab_changes(title_infos, "상세페이지")
|
||
return "SAVED"
|
||
|
||
|
||
|
||
def run(self):
|
||
"""QThread의 run 메서드에서 이벤트 루프 생성 및 실행"""
|
||
self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG)
|
||
|
||
# 이벤트 루프가 없거나 종료된 경우에만 초기화
|
||
if not self.loop or self.loop.is_closed():
|
||
self.loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self.loop)
|
||
self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
|
||
|
||
# 이벤트 루프를 유지하여 외부에서 작업 추가 가능
|
||
self.loop.run_forever()
|
||
|
||
def start_browser_task(self):
|
||
"""이벤트 루프에서 브라우저 초기화 작업 추가"""
|
||
self.logger.log(f"start_browser_task - 브라우저 초기화 작업 시작 : {self.toggle_states}", level=logging.DEBUG)
|
||
|
||
# 실행 중인 이벤트 루프에 비동기 작업 추가
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop)
|
||
self.browser_task_started = True # 작업 실행 플래그 설정
|
||
self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def select_group_task(self, group_index: int):
|
||
"""이벤트 루프에서 그룹 선택 작업 실행"""
|
||
self.logger.log(f"select_group_task - 그룹 선택 작업 추가: {group_index}", level=logging.DEBUG)
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self.select_group_index(group_index), self.loop)
|
||
self.logger.log("select_group_task - 비동기 그룹선택 작업이 추가됨.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("select_group_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def start_PercentyJob_task(self):
|
||
"""번역 작업을 이벤트 루프에서 실행"""
|
||
# 이벤트 루프가 없거나 닫혀 있으면 새로 생성
|
||
# self.initialize_event_loop()
|
||
|
||
if self.loop and not self.loop.is_closed():
|
||
# 이미 실행 중인 이벤트 루프에 번역 작업 추가
|
||
asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop)
|
||
else:
|
||
self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR)
|
||
|
||
def initialize_event_loop(self):
|
||
"""이벤트 루프가 없거나 닫힌 경우 새로 생성"""
|
||
if not self.loop or self.loop.is_closed():
|
||
self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG)
|
||
self.loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(self.loop)
|
||
self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
|
||
else:
|
||
self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG)
|
||
|
||
def stop(self):
|
||
"""Playwright를 안전하게 종료"""
|
||
try:
|
||
# 비동기 함수 호출을 동기식으로 처리
|
||
asyncio.run(self._stop_playwright())
|
||
except RuntimeError as e:
|
||
self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
async def _stop_playwright(self):
|
||
"""모든 페이지/브라우저/Playwright 종료"""
|
||
# 1. 페이지 닫기
|
||
if self.browser:
|
||
for page in self.browser.pages:
|
||
try: await self.page.close()
|
||
except: pass
|
||
try: await self.browser.close()
|
||
except: pass
|
||
# 2. Playwright 종료
|
||
if self.playwright:
|
||
try: await self.playwright.stop()
|
||
except: pass
|
||
# 3. WhaleTranslator 등 추가 리소스
|
||
if self.whale_translator:
|
||
self.whale_translator.close_trans_browser()
|
||
# 4. 기타 핸들러 정리
|
||
if hasattr(self, 'imageProcessor') and self.imageProcessor:
|
||
self.imageProcessor.cleanup()
|
||
# 5. 루프 종료
|
||
if self.loop and not self.loop.is_closed():
|
||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||
|
||
def force_terminate_browser(self):
|
||
"""Playwright 브라우저 프로세스를 강제로 종료"""
|
||
try:
|
||
if self.browser:
|
||
browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid
|
||
browser_process = psutil.Process(browser_pid)
|
||
for child in browser_process.children(recursive=True):
|
||
child.kill()
|
||
browser_process.kill()
|
||
self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING)
|
||
|
||
if self.whale_translator:
|
||
self.whale_translator.close_trans_browser()
|
||
|
||
except Exception as e:
|
||
self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
def terminate(self):
|
||
"""QThread 종료시 정리"""
|
||
self.logger.log("크롬 스레드 종료", level=logging.INFO)
|
||
self.request_cleanup() # QThread 이벤트루프에서 정리
|
||
super().terminate()
|
||
|
||
def cleanup(self):
|
||
"""BrowserController 종료 시 리소스 정리"""
|
||
try:
|
||
# ImageProcessor 리소스 정리
|
||
if hasattr(self, 'imageProcessor') and self.imageProcessor:
|
||
self.logger.log("ImageProcessor 리소스 정리 중...", level=logging.INFO)
|
||
self.imageProcessor.cleanup()
|
||
except Exception as e:
|
||
self.logger.log(f"리소스 정리 중 오류 발생: {e}", level=logging.ERROR)
|
||
|
||
def request_cleanup(self):
|
||
"""브라우저 리소스 정리 명령을 QThread 이벤트루프에 올림"""
|
||
if self.loop and not self.loop.is_closed():
|
||
asyncio.run_coroutine_threadsafe(self._stop_playwright(), self.loop)
|
||
else:
|
||
self.logger.log("정리 요청 시 루프가 없습니다.", level=logging.ERROR)
|
||
|
||
def check_pause(self):
|
||
"""일시 정지 상태라면 재개될 때까지 대기"""
|
||
self.pause_mutex.lock()
|
||
if self.is_paused:
|
||
# 일시정지 시작 시그널 발생
|
||
self.pause_confirmed.emit()
|
||
self.logger.log("일시정지 상태 확인 - 작업 일시중지됨", level=logging.INFO)
|
||
self.pause_condition.wait(self.pause_mutex)
|
||
self.pause_mutex.unlock()
|
||
|
||
def pause(self):
|
||
"""스레드 일시 정지"""
|
||
self.pause_mutex.lock()
|
||
self.is_paused = True
|
||
self.pause_mutex.unlock()
|
||
self.logger.log("스레드가 일시 정지되었습니다.", level=logging.DEBUG)
|
||
|
||
def resume(self):
|
||
"""스레드 재개"""
|
||
self.pause_mutex.lock()
|
||
self.is_paused = False
|
||
self.pause_condition.wakeAll()
|
||
self.pause_mutex.unlock()
|
||
self.logger.log("스레드가 재개되었습니다.", level=logging.DEBUG)
|
||
|
||
|
||
def log_memory_usage(self, message=""):
|
||
mem = psutil.virtual_memory()
|
||
self.logger.log(f"{message} 메모리 사용: 총 {mem.total/1024/1024:.1f}MB, 사용 {mem.used/1024/1024:.1f}MB, 사용률 {mem.percent}%", level=logging.DEBUG)
|
||
|
||
def pause_task(self):
|
||
"""작업을 일시정지합니다"""
|
||
self.logger.log("작업 일시정지 요청...", level=logging.INFO)
|
||
|
||
# 일시정지 상태 설정
|
||
self.pause()
|
||
|
||
# 디테일 프로그레스바 메시지 업데이트 (시그널 발송)
|
||
self.update_detail_progress_signal.emit(0, 0)
|
||
|
||
# 추가 로직이 필요한 경우 여기에 구현
|
||
|
||
def resume_task(self):
|
||
"""일시정지된 작업을 재개합니다"""
|
||
self.logger.log("작업 재개 요청...", level=logging.INFO)
|
||
|
||
# 재개 상태 설정
|
||
self.resume()
|
||
|
||
# 추가 로직이 필요한 경우 여기에 구현
|
||
|
||
|
||
|
||
async def get_focused_element(self, page):
|
||
"""
|
||
현재 포커스된 엘리먼트 반환 (없으면 None)
|
||
"""
|
||
handle = await self.page.evaluate_handle("document.activeElement")
|
||
# BODY 태그인 경우 의미 없음 → None 처리
|
||
tagname = await handle.evaluate("el => el.tagName")
|
||
if tagname.lower() == "body":
|
||
return None
|
||
return handle
|
||
|
||
async def restore_focus(self, page, handle):
|
||
"""
|
||
전달받은 handle에 다시 focus (handle이 None이면 무시)
|
||
"""
|
||
if handle:
|
||
try:
|
||
await handle.evaluate("el => el.focus()")
|
||
except Exception:
|
||
pass
|
||
|
||
async def random_human_like_mouse_move(self, page, move_area=None):
|
||
"""
|
||
사람처럼 마우스를 랜덤하게 이동시키는 함수.
|
||
- page: playwright의 page 객체
|
||
- move_area: (x1, y1, x2, y2) tuple. 지정하지 않으면 브라우저 전체창 기준
|
||
"""
|
||
# 브라우저 크기 확인
|
||
viewport = page.viewport_size or {"width": 1280, "height": 720}
|
||
x1, y1 = 0, 0
|
||
x2, y2 = viewport["width"], viewport["height"]
|
||
if move_area:
|
||
x1, y1, x2, y2 = move_area
|
||
# 랜덤 출발점
|
||
sx = random.randint(x1, x2-1)
|
||
sy = random.randint(y1, y2-1)
|
||
ex = random.randint(x1, x2-1)
|
||
ey = random.randint(y1, y2-1)
|
||
|
||
# 점차적으로 이동 (5~20 step)
|
||
steps = random.randint(8, 18)
|
||
await self.page.mouse.move(sx, sy)
|
||
await asyncio.sleep(random.uniform(0.1, 0.3))
|
||
for i in range(steps):
|
||
nx = int(sx + (ex-sx)*i/steps + random.uniform(-2, 2))
|
||
ny = int(sy + (ey-sy)*i/steps + random.uniform(-2, 2))
|
||
await self.page.mouse.move(nx, ny)
|
||
await asyncio.sleep(random.uniform(0.015, 0.04))
|
||
# 마지막 위치
|
||
await self.page.mouse.move(ex, ey)
|
||
await asyncio.sleep(random.uniform(0.1, 0.4))
|
||
|
||
async def random_human_like_keyboard_input(self, page):
|
||
"""
|
||
사람처럼 무의미한 키를 랜덤하게 누르는 함수.
|
||
- tab, shift, alt, ctrl, esc 등은 영향 거의 없음
|
||
- 단, 자동화 중간 입력창 포커스 상태는 주의!
|
||
"""
|
||
keys = ["Shift", "Alt", "Control"] # 영향이 적은 키들 (Ctrl -> Control로 변경)
|
||
key = random.choice(keys)
|
||
await self.page.keyboard.down(key)
|
||
await asyncio.sleep(random.uniform(0.05, 0.13))
|
||
await self.page.keyboard.up(key)
|
||
await asyncio.sleep(random.uniform(0.05, 0.13))
|
||
|
||
async def random_human_behavior(self, page):
|
||
"""
|
||
- 60%: 마우스 이동
|
||
- 20%: 키보드 의미 없는 입력
|
||
- 20%: 아무 행동도 안 함 (휴식/멍 때림)
|
||
|
||
포커스 백업/복원 포함한 랜덤 인간 행동
|
||
"""
|
||
# 포커스 백업
|
||
orig_focus = await self.get_focused_element(page)
|
||
|
||
# 행동
|
||
r = random.random()
|
||
if r < 0.6:
|
||
await self.random_human_like_mouse_move(page)
|
||
elif r < 0.8:
|
||
await self. random_human_like_keyboard_input(page)
|
||
else:
|
||
# 아무것도 안하고 잠시 휴식 (랜덤 대기)
|
||
await asyncio.sleep(random.uniform(0.3, 1.3))
|
||
|
||
# 약간의 랜덤 대기 후 포커스 복원
|
||
await asyncio.sleep(random.uniform(0.03, 0.13))
|
||
await self.restore_focus(page, orig_focus)
|
||
|
||
async def save_tab_changes(self, title_infos, tab_name):
|
||
"""
|
||
각 탭에서 변경사항을 저장할 때 사용하는 메서드
|
||
|
||
Args:
|
||
title_infos (dict): 상품 정보 딕셔너리
|
||
tab_name (str): 탭 이름 (옵션, 가격, 썸네일, 태그, 상세페이지)
|
||
|
||
Returns:
|
||
str: 저장 결과 ("SAVED", "DELETED", "DUPLICATE_HANDLED", "ERROR")
|
||
"""
|
||
save_result = await self.handle_save_with_error_recovery(title_infos, tab_name)
|
||
|
||
if save_result in ["SAVED", "DUPLICATE_HANDLED"]:
|
||
self.logger.log(f"[{tab_name}] 탭 저장 완료.", level=logging.INFO)
|
||
elif save_result == "DELETED":
|
||
self.logger.log(f"[{tab_name}] 탭에서 상품 삭제 완료.", level=logging.INFO)
|
||
else:
|
||
self.logger.log(f"[{tab_name}] 탭 저장 실패: {save_result}", level=logging.ERROR)
|
||
|
||
return save_result
|
||
|
||
async def handle_product_deletion_in_loop(self, total_products, items_per_page):
|
||
"""
|
||
상품 삭제 후 처리를 위한 헬퍼 함수
|
||
- 총 상품수 감소
|
||
- 페이지 스크롤 후 상품 버튼 재수집
|
||
|
||
Returns:
|
||
tuple: (updated_total_products, updated_product_buttons)
|
||
"""
|
||
total_products -= 1
|
||
await asyncio.sleep(1)
|
||
await self.scroll_page_to_top()
|
||
|
||
if not self.toggle_states['ed_mode']:
|
||
product_buttons = await self.get_product_edit_buttons_by_template()
|
||
else:
|
||
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
|
||
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
|
||
|
||
return total_products, product_buttons
|
||
|