AutoPercenty3/limited_browser_control.py

1820 lines
95 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from playwright.async_api import async_playwright, TimeoutError
from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition, Slot
import re
# import pyautogui
import time
import win32gui, win32con
from bs4 import BeautifulSoup
import asyncio
import os, sys, random
import requests
import json
import gc
import psutil
import shutil
from datetime import datetime, timedelta
import math
from PIL import Image
from io import BytesIO
# from src.wh_search import WhaleSearchParser
# from src.wh_img_trans import WhaleImageTranslator
from src.whale_new import WhaleTranslator
from src.modules.clipboardImageManager import ClipboardImageManager
from src.contents.option import OptionHandler
from src.contents.price import PriceHandler
from src.contents.details import DetailHandler
from src.contents.titleGenerator import TitleGenerator
from src.contents.thumb import ThumbnailHandler
from src.modules.image_processor import ImageProcessor
from src.contents.tags import TagsHandler
from src.titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager
from src.titleManager.gpt_client import GPTClient
from locatorManager_by_SP import LocatorManager
import tempfile
import logging
# 새로운 모듈 import
import string
class BrowserController(QThread):
def __init__(self, app, logger, base_path, login_infos, toggle_states_for_limited, biz_dbManager, sp_user_id, supabase_manager):
super().__init__()
self.app = app # AutoPercentyGUI 객체 저장
self.logger = logger
self.base_path = base_path
self.login_infos = login_infos
self.biz_dbManager = biz_dbManager
self.sp_user_id = sp_user_id
self.supabase_manager = supabase_manager
self.toggle_states_for_limited = toggle_states_for_limited
self.parsing_page = None
self.chrome_hwnd = None
# 2. 에러 스크린샷 폴더 (장기 보관)
self.ERROR_SCREENSHOT_DIR = os.path.join(self.base_path, 'error_screenshots')
os.makedirs(self.ERROR_SCREENSHOT_DIR, exist_ok=True)
self.logger.log(f"error 디렉토리 생성: {self.ERROR_SCREENSHOT_DIR}", level=logging.INFO)
self.clear_old_screenshot_dirs(days=14)
self.logger.log(f"error 디렉토리 삭제 완료", level=logging.INFO)
self.playwright = None
self.browser = None
self.page = None
self.loop = None # 이벤트 루프를 저장할 변수
self.gpt_client = GPTClient(logger=self.logger)
self.whale_translator = None
self.forbidden_word_manager =None
self.locator_manager = LocatorManager(self.supabase_manager, self.logger)
self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states_for_limited, self.gpt_client, self.forbidden_word_manager, self.sp_user_id, self.supabase_manager)
# BrowserController에 해당하는 모든 locator를 정의
self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name')
self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator')
self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator')
self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator')
self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator')
self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator')
self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator')
self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator')
self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator')
self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator')
self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator')
self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator')
self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator')
self.product_dialog_close_btn = self.locator_manager.get_locator('BrowserControl', 'product_dialog_close_btn')
self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator')
self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator')
self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator')
self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator')
self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template')
self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template')
self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template')
self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page')
self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template')
self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator')
self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator')
self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator')
self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator')
self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator')
self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator')
self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator')
self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator')
self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator')
# self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator')
self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator')
self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator')
self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template')
self.group_options_selector_locator = self.locator_manager.get_locator('BrowserControl', 'group_options_selector_locator')
self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator')
self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator')
self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons')
self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons')
self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator')
self.memo_exposer_locator = self.locator_manager.get_locator('BrowserControl', 'memo_exposer_locator')
self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator')
# self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {})
# # 스레드 종료 시 close_whale_window_if_exists 호출
# self.finished.connect(self.cleanup)
def get_page(self):
return self.page
def get_parsing_page(self):
return self.parsing_page
def get_whale(self):
return self.whale_translator
def generate_random_suffix(self, length=4):
"""랜덤한 영문+숫자 조합 문자열 생성"""
chars = string.ascii_uppercase + string.digits
return ''.join(random.choices(chars, k=length))
async def start_browser_async(self):
"""비동기 Playwright 초기화 및 로그인 수행"""
try:
try:
self.logger.log('알바생 브라우저를 실행합니다...', level=logging.DEBUG)
debug_mode = True
# Playwright 시작 및 브라우저 실행
self.playwright = await async_playwright().start()
browser_path = os.path.join(self.base_path, 'src', 'browsers', 'chromium-1140', 'chrome-win','chrome.exe')
user_data_dir = os.path.join(self.base_path, 'src', 'browsers', 'user_data')
self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG)
self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG)
if not os.path.exists(browser_path):
self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG)
raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}")
# 사용자 데이터 디렉토리가 존재하지 않으면 생성
if not os.path.exists(user_data_dir):
os.makedirs(user_data_dir)
self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG)
# User agent 설정
import random
user_agent = random.choice([
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
])
self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG)
# 브라우저 시작 및 설정
self.browser = await self.playwright.chromium.launch_persistent_context(
user_data_dir,
headless=not debug_mode,
permissions=["geolocation", "notifications"],
geolocation={"latitude": 37.5665, "longitude": 126.9780},
locale="ko-KR",
args=[
'--disable-popup-blocking',
'--start-maximized',
'--window-size=1920,1080'
],
executable_path=browser_path,
user_agent=user_agent,
)
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page_for_upload = await self.browser.new_page()
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
await self.page_for_upload.goto('https://percenty.co.kr/signin')
self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO)
# 첫 번째 기본 탭 닫기
if self.browser.pages:
await self.browser.pages[0].close()
# 페이지 제목을 가져와서 창 제목으로 활용
page_title = await self.page_for_upload.title()
self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG)
except Exception as e:
self.logger.log(f"브라우저 객체 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
try:
is_login_success = await self.login(self.page_for_upload)
if not is_login_success:
self.logger.log("로그인 실패", level=logging.ERROR)
return
except Exception as e:
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
# 기본 페이지가 없을 수 있으므로 새로운 페이지 생성
self.page_for_market = await self.browser.new_page()
self.logger.log('새 페이지 로딩 중...', level=logging.INFO)
await self.page_for_market.goto('https://percenty.co.kr')
self.logger.log('percenty.co.kr 로딩 완료', level=logging.INFO)
try:
await self.close_ant_modal_dialogs(self.page_for_upload)
except Exception as e:
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO)
try:
registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator')
await self.page_for_upload.click(registered_product_page_locator)
self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
return
self.logger.log('마켓관리리 페이지로 이동 중...', level=logging.INFO)
# await self.go_to_new_product_page()
try:
market_management_page_locator = self.locator_manager.get_locator('BrowserControl', 'market_management_page_locator')
await self.page_for_market.click(market_management_page_locator)
await self.close_ad_if_exists_with_ESC_Key(self.page_for_market)
await self.close_ant_modal_dialogs(self.page_for_market)
self.logger.log("마켓관리리 페이지로 이동 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"마켓관리리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
return
try:
# 각 핸들러에 초기화된 page 객체 전달.
self.titleGenerator.update_page(self.page_for_upload)
except Exception as e:
self.logger.log(f"핸들러 업데이트 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
except Exception as e:
self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
async def login(self, page) -> bool:
"""로그인 처리: menu 또는 dialog 요소만 기다립니다."""
is_admin = True
who = "관리자" if is_admin else "직원"
self.logger.log(f'로그인 시도 중: {who} 계정', level=logging.INFO)
# 필수 정보 누락 체크
required = ['admin_id', 'admin_pw'] if is_admin else ['admin_id', 'user_id', 'user_pw']
missing = [k for k in required if not self.login_infos.get(k)]
if missing:
msg = f'로그인 정보 누락: {", ".join(missing)}'
self.logger.log(msg, level=logging.ERROR)
return False
try:
# 1) 자격증명 입력 & 로그인 클릭
if is_admin:
await page.fill(self.login_email_locator, self.login_infos['admin_id'])
await page.fill(self.login_password_locator, self.login_infos['admin_pw'])
await page.click(self.login_button_locator)
else:
admin_toggle = page.locator(self.admin_toggle_locator)
if await admin_toggle.get_attribute("aria-checked") == "true":
await admin_toggle.click()
await page.fill(self.login_email_locator, self.login_infos['admin_id'])
await page.fill(self.staff_id_locator, self.login_infos['user_id'])
await page.fill(self.login_password_locator, self.login_infos['user_pw'])
await page.click(self.staff_login_button_locator)
# 2) 로그인 성공 요소만 기다리기 (5초)
try:
locator = await page.wait_for_selector(
'ul[role="menu"], [role="dialog"]',
timeout=5000
)
role = await locator.get_attribute("role")
if role == "menu":
self.logger.log(f'로그인 성공: {who} 계정 (menu 발견)', level=logging.INFO)
else:
self.logger.log(f'로그인 성공: {who} 계정 (dialog 발견)', level=logging.INFO)
return True
except Exception:
# menu/dialog 둘 다 못 찾은 경우
msg = f'로그인 실패: {who} 계정 (menu/dialog 미발견)'
self.logger.log(msg, level=logging.ERROR)
await self.save_error_screenshot(f'login_failed_{who}')
return False
except Exception as e:
# 클릭/입력 중 예외
msg = f'로그인 처리 중 오류: {who} 계정 {e}'
self.logger.log(msg, level=logging.ERROR, exc_info=True)
await self.save_error_screenshot(f'login_exception_{who}')
return False
async def close_browser(self):
"""브라우저 종료"""
if self.browser:
await self.browser.close()
await self.playwright.stop()
self.cleanup()
self.logger.log('브라우저 종료됨.', level=logging.INFO)
async def get_total_product_count(self):
total_count = 0
items_per_page = 0
try:
# total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf")
total_count_element = await self.page.query_selector(self.total_product_count_locator)
items_per_page_element = await self.page.query_selector(self.items_per_page_locator)
self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG)
if total_count_element:
total_count_text = await total_count_element.inner_text()
if "" in total_count_text and "개 상품" in total_count_text:
total_count = int(''.join(re.findall(r'\d+', total_count_text)))
self.logger.log(f"총 상품수 확인: {total_count}", level=logging.INFO)
# 페이지당 상품 수 추출
if items_per_page_element:
items_per_page_text = await items_per_page_element.get_attribute("title")
if items_per_page_text and "개씩 보기" in items_per_page_text:
items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text)))
self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO)
# 결과 반환
if total_count:
return {"total_count": total_count, "items_per_page": items_per_page}
return {"total_count": 0, "items_per_page": 0}
except Exception as e:
self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return {"total_count": 0, "items_per_page": 0}
async def close_ant_modal_dialogs(self, page, max_loops=3):
"""
antd 모달 다이얼로그에서 '다시 보지 않기', '닫기' 버튼을 최대 max_loops번 반복 클릭합니다.
self.page, self.logger 사용
"""
total_closed = 0
for loop in range(max_loops):
closed_this_round = 0
# 다이얼로그 생성될 때까지 3초간 대기 (한 번만)
try:
await page.wait_for_selector('.ant-modal-root', timeout=3000)
except Exception:
self.logger.log("3초 대기 내에 .ant-modal-root 다이얼로그 미등장", level=logging.WARNING)
# '다시 보지 않기' 버튼 (span 텍스트 포함)
dont_show_btns = page.locator(
'.ant-modal-root button span:text("다시 보지 않기")'
).locator('..') # span의 부모 button
count1 = await dont_show_btns.count()
for i in range(count1):
await dont_show_btns.nth(i).click()
closed_this_round += 1
self.logger.log(f'"다시 보지 않기" 버튼 클릭 (loop {loop+1}, {i+1}/{count1})', level=logging.INFO)
await page.wait_for_timeout(200)
# '닫기' 버튼 (span 텍스트 포함)
close_btns = page.locator(
'.ant-modal-root button span:text("닫기")'
).locator('..')
count2 = await close_btns.count()
for i in range(count2):
await close_btns.nth(i).click()
closed_this_round += 1
self.logger.log(f'"닫기" 버튼 클릭 (loop {loop+1}, {i+1}/{count2})', level=logging.INFO)
await page.wait_for_timeout(200)
total_closed += closed_this_round
if closed_this_round == 0:
break
await page.wait_for_timeout(300)
self.logger.log(f"{total_closed}개 다이얼로그 버튼 클릭 완료", level=logging.INFO)
return total_closed > 0
async def close_ad_if_exists_with_ESC_Key(self, page):
"""ESC 키를 두 번 전송하여 다이얼로그를 닫는 메서드"""
try:
# JavaScript로 ESC 키 이벤트 발생
await page.evaluate("""
(() => {
const escEvent = new KeyboardEvent('keydown', {
key: 'Escape',
code: 'Escape',
keyCode: 27,
which: 27,
bubbles: true,
cancelable: true
});
document.dispatchEvent(escEvent);
return true;
})()
""")
time.sleep(1)
await page.evaluate("""
(() => {
const escEvent = new KeyboardEvent('keydown', {
key: 'Escape',
code: 'Escape',
keyCode: 27,
which: 27,
bubbles: true,
cancelable: true
});
document.dispatchEvent(escEvent);
return true;
})()
""")
self.logger.log("ESC 키를 전송하여 다이얼로그를 닫았습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def close_ad_if_exists_new(self):
"""
다이얼로그가 존재하면 해당 다이얼로그 내부의 "닫기" 버튼을 클릭하고,
다이얼로그를 찾지 못하거나 내부에 닫기 버튼이 없으면 페이지 전체에서 "닫기" 버튼을 검색하여 클릭하는 메서드
"""
# 새로운 다이얼로그의 닫기(X) 버튼 먼저 시도
self.logger.log("새로운 다이얼로그의 닫기(X) 버튼 먼저 시도", level=logging.INFO)
# 1. 모달 다이얼로그 처리 (ant-modal-wrap)
try:
# 모달 처리 JavaScript 코드
modal_js = """
() => {
// 모달 찾기
const modalWraps = document.querySelectorAll('.ant-modal-wrap.ant-modal-centered');
if (modalWraps.length === 0) return 0;
console.log(`${modalWraps.length}개의 모달 발견`);
let closed = 0;
// 각 모달에 대해
for (const modal of modalWraps) {
try {
// 1. 닫기 버튼 찾기
const closeBtn = modal.querySelector('.ant-modal-close');
if (closeBtn) {
closeBtn.click();
console.log('모달 닫기 버튼 클릭');
closed++;
continue;
}
// 2. '닫기' 텍스트 버튼 찾기
const buttons = modal.querySelectorAll('button');
let found = false;
for (const btn of buttons) {
if (btn.textContent.includes('닫기')) {
btn.click();
console.log('닫기 텍스트 버튼 클릭');
closed++;
found = true;
break;
}
}
if (found) continue;
// 3. 마지막 수단: 모달 숨기기
modal.style.display = 'none';
const mask = document.querySelector('.ant-modal-mask');
if (mask) mask.style.display = 'none';
console.log('모달 강제 숨김');
closed++;
} catch (e) {
console.error('모달 처리 오류:', e);
}
}
return closed;
}
"""
modal_count = await self.page.evaluate(modal_js)
if modal_count > 0:
self.logger.log(f"{modal_count}개의 모달 다이얼로그 닫기 처리됨", level=logging.INFO)
await self.page.wait_for_timeout(1000) # 모달이 사라질 때까지 대기
return True
except Exception as e:
self.logger.log(f"모달 다이얼로그 처리 중 오류: {e}", level=logging.INFO)
# 2. drawer-close 버튼 처리
try:
drawer_js = """
() => {
const drawerCloseButtons = Array.from(document.querySelectorAll('button.ant-drawer-close'));
let count = 0;
drawerCloseButtons.forEach(button => {
try {
button.click();
console.log('drawer-close 버튼 클릭');
count++;
} catch (e) {
console.error('drawer-close 클릭 오류:', e);
}
});
return count;
}
"""
drawer_count = await self.page.evaluate(drawer_js)
if drawer_count > 0:
self.logger.log(f"{drawer_count}개의 drawer-close 버튼 클릭됨", level=logging.INFO)
await self.page.wait_for_timeout(1000)
return True
except Exception as e:
self.logger.log(f"drawer-close 버튼 처리 중 오류: {e}", level=logging.INFO)
# 3. 배경 클릭 시도
try:
backdrop_js = """
() => {
const backdropSelectors = [
'.ant-modal-mask',
'.ant-drawer-mask',
'.modal-backdrop',
'.dialog-backdrop',
'[role="dialog"] + div',
'.overlay',
'.modal-overlay'
];
let clicked = 0;
for (const selector of backdropSelectors) {
const backdrops = document.querySelectorAll(selector);
for (const backdrop of backdrops) {
try {
backdrop.click();
console.log(`${selector} 배경 클릭`);
clicked++;
} catch (e) {
console.error(`${selector} 클릭 오류:`, e);
}
}
}
return clicked;
}
"""
backdrop_count = await self.page.evaluate(backdrop_js)
if backdrop_count > 0:
self.logger.log(f"{backdrop_count}개의 배경 요소 클릭됨", level=logging.INFO)
await self.page.wait_for_timeout(1000)
return True
except Exception as e:
self.logger.log(f"배경 클릭 처리 중 오류: {e}", level=logging.INFO)
# 4. ESC 키 시도
try:
await self.page.keyboard.down("Escape")
await self.page.wait_for_timeout(100)
await self.page.keyboard.up("Escape")
self.logger.log("ESC 키 전송 (down-up 방식)", level=logging.INFO)
await self.page.wait_for_timeout(1000)
except Exception as e:
self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.INFO)
# 5. 기존 다이얼로그 처리 (이전 코드)
try:
# 다이얼로그가 나타날 때까지 최대 3초 대기
dialog_element = await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible')
self.logger.log("다이얼로그가 발견되었습니다. 내부의 닫기 버튼을 찾습니다.", level=logging.INFO)
if dialog_element:
# 다이얼로그 내부에서 "닫기" 텍스트를 가진 버튼을 xpath로 찾음
close_button = await dialog_element.query_selector("xpath=.//button[span[text()='닫기']]")
if close_button:
await close_button.click()
self.logger.log("다이얼로그 내부의 닫기 버튼을 클릭하여 닫았습니다.", level=logging.INFO)
return # 닫기 성공 시 함수 종료
else:
self.logger.log("다이얼로그 내부에서 닫기 버튼을 찾지 못했습니다.", level=logging.WARNING)
else:
self.logger.log("대기 후에도 다이얼로그 엘리먼트를 찾지 못했습니다.", level=logging.WARNING)
try:
await self.page.keyboard.press("Escape")
await self.page.wait_for_timeout(100)
await self.page.dispatch_event("body", "keydown", {
"key": "Escape",
"code": "Escape",
"keyCode": 27,
"which": 27
})
await self.page.wait_for_timeout(50)
await self.page.dispatch_event("body", "keyup", {
"key": "Escape",
"code": "Escape",
"keyCode": 27,
"which": 27
})
await self.page.wait_for_timeout(100)
self.logger.log("ECS를 전송하여 닫았습니다.", level=logging.INFO)
return
except Exception as e:
self.logger.log(f"닫기 중 오류: {e}", level=logging.ERROR, exc_info=True)
except TimeoutError:
self.logger.log("다이얼로그가 나타나지 않았습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"다이얼로그 닫기 시도 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def is_button_disabled(self, button):
"""버튼이 disabled 상태인지 확인"""
try:
# 버튼의 disabled 속성 확인
is_disabled = await button.get_attribute('disabled')
return is_disabled is not None # disabled 속성이 있으면 True 반환
except Exception as e:
self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False # 오류 발생 시 기본적으로 활성화된 것으로 처리
async def select_group_index(self, group_index: int):
"""그룹 드롭다운 열고 옵션 선택"""
try:
self.logger.log(f"group_index : {group_index}", level=logging.INFO)
await self.page.evaluate("""
const targetElement = Array.from(document.querySelectorAll('span'))
.find(el => el.textContent.trim() === '수집 상품 목록');
if (targetElement) {
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
""")
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
await asyncio.sleep(1)
group_option_locator = self.group_index_template.format(index=group_index+1)
# 드롭다운 열기
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
# 드롭다운 열림 상태 확인
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
# 옵션 선택
self.logger.log(f"[{group_option_locator}] : group_option_locator", level=logging.INFO)
await self.page.click(group_option_locator, timeout=3000)
self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO)
selected_group_name = await self.page.inner_text(self.selected_group_name_locator)
self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO)
# 총 상품 개수 가져오기
product_count_info = await self.get_total_product_count()
total_products = product_count_info.get("total_count", 0)
items_per_page = product_count_info.get("items_per_page", 0)
self.logger.log(f"총 상품 개수 : [{total_products}]", level=logging.INFO)
except TimeoutError:
self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING)
screenshot_path = await self.save_error_screenshot("group_select_timeout")
self.logger.log(f"그룹 선택 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {screenshot_path}", level=logging.WARNING)
except Exception as e:
self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def get_group_names_list(self):
"""그룹 드롭다운을 열고 모든 그룹 이름 목록을 가져오기"""
try:
self.logger.log("그룹 이름 목록 가져오기 시작", level=logging.INFO)
# 수집 상품 목록 요소로 스크롤
await self.page.evaluate("""
const targetElement = Array.from(document.querySelectorAll('span'))
.find(el => el.textContent.trim() === '수집 상품 목록');
if (targetElement) {
targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
""")
self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO)
await asyncio.sleep(1)
# 드롭다운 열기
await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible')
await self.page.click(self.group_dropdown_locator, timeout=3000, force=True)
self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO)
# 드롭다운 열림 상태 확인
await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000)
self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO)
# rc-virtual-list가 로딩될 때까지 잠시 대기
await asyncio.sleep(0.5)
# 그룹 옵션들이 로딩될 때까지 대기
await self.page.wait_for_selector(self.group_options_selector_locator, timeout=3000)
# 모든 그룹 이름 텍스트 가져오기
group_names = await self.page.evaluate(f"""
() => {{
const elements = document.querySelectorAll('{self.group_options_selector_locator}');
return Array.from(elements).map(el => el.textContent.trim());
}}
""")
self.logger.log(f"발견된 그룹 목록: {group_names}", level=logging.INFO)
# 드롭다운 닫기 (ESC 키 또는 다른 곳 클릭)
await self.page.keyboard.press("Escape")
self.logger.log("드롭다운 닫기 완료", level=logging.INFO)
return group_names
except TimeoutError:
self.logger.log("그룹 목록 가져오기 중 타임아웃이 발생했습니다.", level=logging.WARNING)
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
save_screenshot_path = await self.save_error_screenshot("group_list_timeout")
self.logger.log(f"self.group_options_selector_locator: {self.group_options_selector_locator}", level=logging.WARNING)
self.logger.log(f"그룹 목록 가져오기 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {save_screenshot_path}", level=logging.WARNING)
try:
await self.page.keyboard.press("Escape")
except:
pass
return []
except Exception as e:
self.logger.log(f"그룹 목록 가져오기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도
try:
await self.page.keyboard.press("Escape")
except:
pass
return []
async def get_product_edit_buttons_by_template(self):
"""현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기"""
# 버튼 선택자 설정
# buttons = self.page.locator(self.product_edit_button_template)
# memos = self.page.locator(self.product_memo_button_template)
"""
현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환.
각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다.
"""
try:
# 수정 버튼 선택자 설정
# edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]')
# memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only')
edit_buttons = self.page.locator(self.product_edit_buttons)
memo_buttons = self.page.locator(self.product_memo_buttons)
# 수정 버튼 개수 확인
edit_button_count = await edit_buttons.count()
if edit_button_count == 0:
self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING)
return []
# 메모 버튼 개수 확인
memo_button_count = await memo_buttons.count()
if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외
self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING)
return []
# 첫 번째 버튼 제외하고 나머지 버튼을 처리
valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)]
# 상품별 해외배송비 수정 버튼과 메모 버튼 매칭
product_buttons = []
index = 0 # valid_memo_buttons에서의 현재 인덱스
for i in range(edit_button_count):
if index + 1 >= len(valid_memo_buttons):
self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING)
break
# 상품별 버튼 매칭
product_buttons.append({
"edit_button": edit_buttons.nth(i),
"shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼
"memo_button": valid_memo_buttons[index + 1] # 메모 버튼
})
index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모
# 최종 결과 로그
self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO)
return product_buttons
except Exception as e:
self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True)
return []
async def get_product_edit_buttons_by_template_new(self):
"""
템플릿 방식의 CSS 선택자를 이용해 현재 페이지의 수정 및 메모 버튼을 동적으로 찾습니다.
"""
product_buttons = []
# 예시로, 상품 항목들이 li.product-item 형태로 존재한다고 가정합니다.
product_items = self.page.locator("div#root li.product-item")
count = await product_items.count()
self.logger.log(f"현재 페이지에 {count}개의 상품 항목을 발견했습니다.", level=logging.INFO)
for i in range(1, count + 1):
edit_selector = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template').format(index=i)
memo_selector = self.locator_manager.get_locator('BrowserControl', 'product_memo_button_template').format(index=i)
edit_button = self.page.locator(edit_selector)
memo_button = self.page.locator(memo_selector)
if await edit_button.count() > 0:
product_buttons.append({
"edit_button": edit_button,
"memo_button": memo_button
})
else:
self.logger.log(f"상품 인덱스 {i}에 대해 수정 버튼을 찾지 못했습니다.", level=logging.WARNING)
return product_buttons
async def open_product_edit_dialog(self, button):
"""상품 수정 다이얼로그 열기"""
try:
# 요소가 화면에 없을 경우 스크롤하여 보이도록 함
await button.scroll_into_view_if_needed()
self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG)
await self.page.keyboard.press("Escape")
self.logger.log("이전 다이얼로그 닫기 완료.", level=logging.INFO)
await button.click()
self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO)
# await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림
await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림
except Exception as e:
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
screenshot_path = await self.save_error_screenshot()
self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류 발생으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
async def save_error_screenshot(self, error_tag="error"):
"""에러 발생시 에러스크린샷 저장"""
today = datetime.now().strftime('%Y%m%d')
date_dir = os.path.join(self.ERROR_SCREENSHOT_DIR, today)
os.makedirs(date_dir, exist_ok=True)
timestamp = int(time.time())
filename = f"screenshot_{error_tag}_{timestamp}.png"
path = os.path.join(date_dir, filename)
await self.page.screenshot(path=path)
return path
def clear_old_screenshot_dirs(self, days=14):
"""에러스크린샷 디렉토리에서 지정일(기본 14일) 지난 하위폴더 삭제"""
now = datetime.now()
for dir_name in os.listdir(self.ERROR_SCREENSHOT_DIR):
dir_path = os.path.join(self.ERROR_SCREENSHOT_DIR, dir_name)
if os.path.isdir(dir_path):
try:
# 폴더명 YYYYMMDD로 파싱
dir_date = datetime.strptime(dir_name, '%Y%m%d')
if now - dir_date > timedelta(days=days):
# 오래된 폴더 삭제
import shutil
shutil.rmtree(dir_path)
self.logger.log(f"[스크린샷] {dir_path} 삭제됨", level=logging.INFO)
except Exception as e:
self.logger.log(f"[스크린샷] {dir_path} 삭제 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def click_title_tab(self):
"""상품명 탭 클릭"""
try:
await self.page.click(self.title_tab_locator)
self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def save_and_ecs_product_edit(self):
"""상품 수정 후 저장 버튼 클릭 및 ESC 전송, 그리고 다이얼로그 닫힘 버튼 클릭을 통한 다이얼로그 종료 처리"""
try:
# 저장 버튼 클릭
await self.page.click(self.save_button_locator)
self.logger.log("저장 버튼 클릭 완료.", level=logging.INFO)
# 저장 동작 완료를 위한 잠시 대기
await asyncio.sleep(0.5)
# ESC 키를 최대 3회 전송하여 다이얼로그가 닫혔는지 확인
max_attempts = 3
for attempt in range(max_attempts):
await self.page.keyboard.press("Escape")
self.logger.log(f"ESC 키 전송 시도 {attempt+1}/{max_attempts}", level=logging.DEBUG)
await asyncio.sleep(0.5)
if not await self.page.is_visible(self.save_button_locator):
self.logger.log("상품 수정 다이얼로그가 성공적으로 닫혔습니다.", level=logging.INFO)
break
else:
self.logger.log("상품 수정 다이얼로그가 여전히 열려있습니다.", level=logging.WARNING)
else:
# ESC 전송 후에도 다이얼로그가 닫히지 않은 경우, product_dialog_close_btn을 클릭
self.logger.log("ESC 시도 후에도 다이얼로그가 닫히지 않아, 닫힘 버튼 클릭 시도합니다.", level=logging.INFO)
if await self.page.is_visible(self.product_dialog_close_btn):
await self.page.click(self.product_dialog_close_btn)
self.logger.log("닫힘 버튼 클릭 완료.", level=logging.INFO)
else:
self.logger.log("닫힘 버튼을 찾지 못했습니다.", level=logging.ERROR)
except Exception as e:
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# 에러 발생 시 스크린샷 저장
screenshot_path = await self.save_error_screenshot()
self.logger.log(f"저장 및 다이얼로그 종료 처리 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO)
async def save_product_edit(self):
"""상품 수정 후 저장 버튼 클릭"""
try:
await self.page.click(self.save_button_locator)
self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO)
except Exception as e:
self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def go_to_next_page(self):
"""
수정된 CSS 선택자를 사용하여 다음 페이지로 이동합니다.
"""
try:
current_page_element = await self.page.query_selector(self.current_page_locator)
if not current_page_element:
self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING)
return False
current_page_number = int(await current_page_element.get_attribute("title"))
self.logger.log(f"현재 페이지 번호: {current_page_number}", level=logging.INFO)
next_page_number = current_page_number + 1
next_page_button_locator = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template').format(page_number=next_page_number)
self.logger.log(f"다음 페이지 선택자: {next_page_button_locator}", level=logging.DEBUG)
next_page_button = await self.page.query_selector(next_page_button_locator)
if next_page_button:
await next_page_button.click()
await self.page.wait_for_load_state('domcontentloaded', timeout=5000)
self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO)
return True
else:
self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
self.logger.log(f"스크롤 시작", level=logging.DEBUG)
# 현재 페이지 높이 가져오기
last_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG)
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, 1000);")
elif direction == "up":
# 위로 스크롤
self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG)
await self.page.evaluate("window.scrollBy(0, -1000);")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG)
await asyncio.sleep(pause_time)
# 새로운 페이지 높이 가져오기
new_height = await self.page.evaluate("document.body.scrollHeight")
self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG)
# 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달)
if direction == "down" and new_height == last_height:
self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
elif direction == "up" and new_height == 0:
self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG)
break
self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG)
last_height = new_height
scroll_count += 1
self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG)
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50):
"""
키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤.
Parameters:
- direction: 스크롤 방향 ("down"은 아래로, "up"은 위로).
- pause_time: 스크롤 사이의 대기 시간 (초).
- max_scrolls: 최대 스크롤 횟수.
"""
scroll_count = 0
while scroll_count < max_scrolls:
if direction == "down":
# 아래로 스크롤 (Page Down 키 사용)
await self.page.keyboard.press("PageDown")
elif direction == "up":
# 위로 스크롤 (Page Up 키 사용)
await self.page.keyboard.press("PageUp")
else:
raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.")
await asyncio.sleep(pause_time)
scroll_count += 1
if scroll_count == max_scrolls:
self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG)
async def collect_product_info(self, items_per_page, ed_mode):
"""
상품 정보를 수집하는 메서드
"""
try:
product_infos = []
product_name_elements = [] # product_name_element를 저장할 리스트
# ed_mode에 따라 product_elements 설정
if ed_mode:
# 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작)
product_elements = [
{
"name": self.product_name_for_ed_template.format(index=i),
"price": self.product_price_for_ed_template.format(index=i),
"image": self.product_image_for_ed_template.format(index=i)
}
for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정
]
else:
# ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택
product_elements = await self.page.query_selector_all(self.product_parent_locator)
for i, element in enumerate(product_elements[:items_per_page], start=1):
try:
if ed_mode:
# ed_mode=True일 때는 각 상품의 개별 선택자 사용
product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached")
else:
# ed_mode=False일 때 부모 요소 내의 선택자를 사용
product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached")
product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached")
product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached")
# 요소가 존재하면 정보 추출
self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG)
self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG)
self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG)
if product_name_element and product_price_element and product_image_element:
# await의 결과를 각 변수에 저장
product_name_text = (await product_name_element.inner_text()).strip()
product_price_text = (await product_price_element.inner_text()).strip()
product_image_url = await product_image_element.get_attribute('src')
# product_info 딕셔너리에 결과 저장
product_info = {
"name": product_name_text,
"price": product_price_text,
"image_url": product_image_url
}
self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG)
product_infos.append(product_info)
product_name_elements.append(product_name_element) # 각 product_name_element 추가
except Exception as e:
self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
continue
return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환
except Exception as e:
self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return []
async def scroll_page_to_bottom(self, pause_time=0.2):
"""페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드"""
self.logger.log('페이지 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => document.body.scrollHeight")
while True:
await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => document.body.scrollHeight")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 스크롤 완료.', level=logging.INFO)
async def scroll_page_to_top(self, pause_time=0.2):
"""페이지의 맨 위까지 스크롤"""
self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO)
previous_height = await self.page.evaluate("() => window.pageYOffset")
while previous_height > 0:
await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤
await asyncio.sleep(pause_time) # 페이지 로딩 대기
current_height = await self.page.evaluate("() => window.pageYOffset")
if current_height == previous_height:
break # 더 이상 스크롤할 내용이 없으면 종료
previous_height = current_height
self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO)
async def start_Percenty_task(self):
self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG)
self.running = True # 번역 작업이 시작됨
try:
# 금지어 목록 업데이트
self.forbidden_word_manager.refresh_forbidden_words()
# 웨일 브라우저 시작 - 최대 3번 재시도
max_retries = 3
retry_count = 0
whale_window = None
while retry_count < max_retries:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f'웨일 브라우저 시작 시도 {retry_count + 1}/{max_retries}...', level=logging.INFO)
whale_window = self.whale_translator.start_trans_browser()
if whale_window:
self.logger.log('웨일 브라우저가 성공적으로 시작되었습니다.', level=logging.INFO)
break
retry_count += 1
self.logger.log(f'웨일 브라우저 시작 실패. {retry_count}/{max_retries}', level=logging.WARNING)
await asyncio.sleep(1) # 잠시 대기 후 재시도
if not whale_window:
error_msg = "웨일 브라우저를 시작할 수 없습니다. 상품수정을 중단합니다."
self.logger.log(error_msg, level=logging.ERROR)
return
if whale_window and self.toggle_states['collect_method_combo'] == "lens":
self.whale_translator.check_capcha()
# 1. 총 상품 수 수집
self.check_pause() # 일시중지 상태 확인
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode'])
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = await self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
total_pages = math.ceil(total_products / items_per_page)
self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG)
self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG)
if total_products == 0:
self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
# while self.running and completed_count < total_products:
for page_number in range(1, total_pages + 1):
self.check_pause() # 일시중지 상태 확인
# self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG)
self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO)
await self.scroll_page_to_top()
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
if not self.toggle_states['ed_mode']:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
product_buttons = await self.get_product_edit_buttons_by_template()
else:
self.logger.log('상품정보 수집', level=logging.DEBUG)
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
if not product_buttons:
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
break
# 5. 각 상품에 대해 상품수정작업 수행
for index, button_set in enumerate(product_buttons, start=1):
try:
self.check_pause() # 일시중지 상태 확인
edit_button = button_set.get("edit_button")
memo_button = button_set.get("memo_button")
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
# 상태 초기화 코드 추가
self.titleGenerator.reset_state()
# 그 외 임시 변수 초기화
self.current_options_info = None
# 상품명 수집 오류 처리
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
is_disabled = await self.is_button_disabled(edit_button)
if is_disabled:
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
# completed_count += 1
continue
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
# 상품 수정 다이얼로그 열기
await self.open_product_edit_dialog(edit_button)
self.check_pause() # 일시중지 상태 확인
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag)
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
if title_infos.get("is_banned_category", False):
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
# 랜덤 4자리 붙이기
random_suffix = self.generate_random_suffix()
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix)
if is_set:
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
else:
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
await self.save_and_ecs_product_edit()
# completed_count += 1
continue # 다음 상품으로 넘어감
# 정상상품이면 상품명 수정과 카테고리 수집
is_title = self.toggle_states['title']
is_title_shuffle = self.toggle_states['title_shuffle']
if is_title or is_title_shuffle:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
title_infos["generated_name"] = await self.titleGenerator.process_title()
# 옵션 수정
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
await self.edit_option(title_infos.get("original_name", None))
# 가격 수정
is_price = self.toggle_states.get('price')
if is_price:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
await self.edit_price(title_infos)
# 썸네일 수정
thumb = self.toggle_states.get('thumb')
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
if thumb:
self.check_pause() # 일시중지 상태 확인
await self.edit_thumb()
# 태그 수정
tag = self.toggle_states.get('tag')
if tag:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG)
await self.edit_tags(title_infos)
# 상세페이지 수정 수정
detail_Option = self.toggle_states.get('detail_Option')
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
if detail_Option or detail_IMGTrans:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG)
# 상세페이지 수정
await self.edit_detail(detail_Option, detail_IMGTrans)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
await self.save_and_ecs_product_edit()
# 메모 입력
if memo_button and self.toggle_states['memo']:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
await self.insert_product_infos_to_memo(memo_button, title_infos)
except Exception as item_err:
# 이 상품만 실패로 기록하고, 다음 상품으로
self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True)
finally:
# 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트
completed_count += 1
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
if (completed_count % 10) == 0:
gc.collect()
mem = psutil.virtual_memory()
self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
# 목표 도달 체크
if completed_count >= total_products:
break
# (3) 다음 페이지로 이동
if page_number < total_pages:
success = await self.go_to_next_page()
if not success:
err = f"페이지 {page_number}{page_number+1} 이동 실패"
self.logger.log(err, level=logging.ERROR)
return
else:
self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO)
except Exception as fatal:
# 전체 작업 중에 발생한 치명적 예외
self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True)
else:
# 예외 없이 ‘정상적으로’ 전체 루프를 마쳤을 때
self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO)
finally:
# 브라우저나 리소스는 항상 닫아 주기
try:
self.whale_translator.close_trans_browser()
except:
pass
async def start_shuffle_upload_task(self):
self.logger.log('셔플업로드 작업을 시작합니다...', level=logging.DEBUG)
self.running = True # 번역 작업이 시작됨
try:
await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤
# get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음
result = await self.get_total_product_count()
# 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출
total_products = result.get("total_count", 0)
items_per_page = result.get("items_per_page", 0)
total_pages = math.ceil(total_products / items_per_page)
self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG)
if total_products == 0:
self.logger.log('셔플할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
return
completed_count = 0
page_number = 1
# 3. 총 상품 수만큼 반복 작업 수행
for page_number in range(1, total_pages + 1):
self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO)
if page_number != 1:
await self.scroll_page_to_top()
self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG)
self.logger.log('등록 상품 등록 페이지로 이동 중...', level=logging.INFO)
# await self.go_to_new_product_page()
try:
current_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_product_page_locator')
await self.page.click(current_product_page_locator)
await self.close_ad_if_exists_with_ESC_Key()
await self.close_ant_modal_dialogs()
self.logger.log("등록 상품 등록 페이지로 이동 완료.", level=logging.INFO)
# self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True)
except Exception as e:
self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True)
return
# 그룹선택 버튼 클릭
group_select_button = self.toggle_states['group_select_button']
if group_select_button:
self.logger.log('그룹선택 버튼 클릭 중...', level=logging.INFO)
await self.page.click(group_select_button)
self.logger.log('그룹선택 버튼 클릭 완료.', level=logging.INFO)
if not self.toggle_states['ed_mode']:
# 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기
self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG)
product_buttons = await self.get_product_edit_buttons_by_template()
else:
self.logger.log('상품정보 수집', level=logging.DEBUG)
product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode'])
self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG)
self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG)
product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements]
self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG)
if not product_buttons:
self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG)
break
# 5. 각 상품에 대해 상품수정작업 수행
for index, button_set in enumerate(product_buttons, start=1):
try:
self.check_pause() # 일시중지 상태 확인
edit_button = button_set.get("edit_button")
memo_button = button_set.get("memo_button")
shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼
# 상태 초기화 코드 추가
self.titleGenerator.reset_state()
# 그 외 임시 변수 초기화
self.current_options_info = None
# 상품명 수집 오류 처리
self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG)
is_disabled = await self.is_button_disabled(edit_button)
if is_disabled:
self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG)
# completed_count += 1
continue
self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG)
# 상품 수정 다이얼로그 열기
await self.open_product_edit_dialog(edit_button)
self.check_pause() # 일시중지 상태 확인
title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag)
self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG)
# 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감.
# if title_infos.get("is_banned_category", False):
if title_infos.get("is_banned_category", False):
banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info"))
self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO)
# 랜덤 4자리 붙이기
random_suffix = self.generate_random_suffix()
# 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해)
is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix)
if is_set:
self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO)
else:
self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING)
await self.save_and_ecs_product_edit()
# completed_count += 1
continue # 다음 상품으로 넘어감
# 정상상품이면 상품명 수정과 카테고리 수집
is_title = self.toggle_states['title']
is_title_shuffle = self.toggle_states['title_shuffle']
if is_title or is_title_shuffle:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG)
title_infos["generated_name"] = await self.titleGenerator.process_title()
# 옵션 수정
is_optionTrnas = self.toggle_states.get('optionTrnas')
is_optionIMGTrans = self.toggle_states.get('optionIMGTrans')
is_optionAutoSelect = self.toggle_states.get('optionAutoSelect')
if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG)
await self.edit_option(title_infos.get("original_name", None))
# 가격 수정
is_price = self.toggle_states.get('price')
if is_price:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG)
await self.edit_price(title_infos)
# 썸네일 수정
thumb = self.toggle_states.get('thumb')
self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG)
if thumb:
self.check_pause() # 일시중지 상태 확인
await self.edit_thumb()
# 태그 수정
tag = self.toggle_states.get('tag')
if tag:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG)
await self.edit_tags(title_infos)
# 상세페이지 수정 수정
detail_Option = self.toggle_states.get('detail_Option')
detail_IMGTrans = self.toggle_states.get('detail_IMGTrans')
if detail_Option or detail_IMGTrans:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG)
# 상세페이지 수정
await self.edit_detail(detail_Option, detail_IMGTrans)
# 수정 후 저장
self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG)
await self.save_and_ecs_product_edit()
# 메모 입력
if memo_button and self.toggle_states['memo']:
self.check_pause() # 일시중지 상태 확인
self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG)
await self.insert_product_infos_to_memo(memo_button, title_infos)
except Exception as item_err:
# 이 상품만 실패로 기록하고, 다음 상품으로
self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True)
finally:
# 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트
completed_count += 1
self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO)
if (completed_count % 10) == 0:
gc.collect()
mem = psutil.virtual_memory()
self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG)
# 목표 도달 체크
if completed_count >= total_products:
break
# (3) 다음 페이지로 이동
if page_number < total_pages:
success = await self.go_to_next_page()
if not success:
err = f"페이지 {page_number}{page_number+1} 이동 실패"
self.logger.log(err, level=logging.ERROR)
return
else:
self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO)
except Exception as fatal:
# 전체 작업 중에 발생한 치명적 예외
self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True)
else:
# 예외 없이 ‘정상적으로’ 전체 루프를 마쳤을 때
self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO)
finally:
# 브라우저나 리소스는 항상 닫아 주기
try:
self.whale_translator.close_trans_browser()
except:
pass
def clear_temp_folder(self, folder_path):
"""
폴더 내 모든 파일 삭제, 폴더 없으면 생성
"""
if os.path.exists(folder_path):
# 폴더 내 파일/폴더 삭제
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
self.logger.log(f"파일 삭제 실패: {file_path}, 에러: {e}", level=logging.ERROR, exc_info=True)
else:
os.makedirs(folder_path, exist_ok=True)
def run(self):
"""QThread의 run 메서드에서 이벤트 루프 생성 및 실행"""
self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG)
# 이벤트 루프가 없거나 종료된 경우에만 초기화
if not self.loop or self.loop.is_closed():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
# 이벤트 루프를 유지하여 외부에서 작업 추가 가능
self.loop.run_forever()
def start_browser_task(self):
"""이벤트 루프에서 브라우저 초기화 작업 추가"""
self.logger.log(f"start_browser_task - 브라우저 초기화 작업 시작 : {self.toggle_states_for_limited}", level=logging.DEBUG)
# 실행 중인 이벤트 루프에 비동기 작업 추가
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop)
self.browser_task_started = True # 작업 실행 플래그 설정
self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
else:
self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
def start_PercentyJob_task(self):
"""번역 작업을 이벤트 루프에서 실행"""
# 이벤트 루프가 없거나 닫혀 있으면 새로 생성
# self.initialize_event_loop()
if self.loop and not self.loop.is_closed():
# 이미 실행 중인 이벤트 루프에 번역 작업 추가
asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop)
else:
self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR)
def start_shuffle_upload_task(self):
"""이벤트 루프에서 셔플업로드 작업 추가"""
self.logger.log(f"start_shuffle_upload_task - 셔플업로드 작업 시작 : {self.toggle_states}", level=logging.DEBUG)
# 실행 중인 이벤트 루프에 비동기 작업 추가
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(self.start_shuffle_upload_async(), self.loop)
self.shuffle_upload_task_started = True # 작업 실행 플래그 설정
self.logger.log("start_shuffle_upload_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG)
else:
self.logger.log("start_shuffle_upload_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR)
def initialize_event_loop(self):
"""이벤트 루프가 없거나 닫힌 경우 새로 생성"""
if not self.loop or self.loop.is_closed():
self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG)
else:
self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG)
def stop(self):
"""Playwright를 안전하게 종료"""
try:
# 비동기 함수 호출을 동기식으로 처리
asyncio.run(self._stop_playwright())
except RuntimeError as e:
self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR)
async def _stop_playwright(self):
"""모든 페이지/브라우저/Playwright 종료"""
# 1. 페이지 닫기
if self.browser:
for page in self.browser.pages:
try: await page.close()
except: pass
try: await self.browser.close()
except: pass
# 2. Playwright 종료
if self.playwright:
try: await self.playwright.stop()
except: pass
# 3. WhaleTranslator 등 추가 리소스
if self.whale_translator:
self.whale_translator.close_trans_browser()
# 4. 기타 핸들러 정리
if hasattr(self, 'imageProcessor') and self.imageProcessor:
self.imageProcessor.cleanup()
# 5. 루프 종료
if self.loop and not self.loop.is_closed():
self.loop.call_soon_threadsafe(self.loop.stop)
def force_terminate_browser(self):
"""Playwright 브라우저 프로세스를 강제로 종료"""
try:
if self.browser:
browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid
browser_process = psutil.Process(browser_pid)
for child in browser_process.children(recursive=True):
child.kill()
browser_process.kill()
self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING)
if self.whale_translator:
self.whale_translator.close_trans_browser()
except Exception as e:
self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR)
def terminate(self):
"""QThread 종료시 정리"""
self.logger.log("크롬 스레드 종료", level=logging.INFO)
self.request_cleanup() # QThread 이벤트루프에서 정리
super().terminate()
def cleanup(self):
"""BrowserController 종료 시 리소스 정리"""
try:
# ImageProcessor 리소스 정리
if hasattr(self, 'imageProcessor') and self.imageProcessor:
self.logger.log("ImageProcessor 리소스 정리 중...", level=logging.INFO)
self.imageProcessor.cleanup()
except Exception as e:
self.logger.log(f"리소스 정리 중 오류 발생: {e}", level=logging.ERROR)
def request_cleanup(self):
"""브라우저 리소스 정리 명령을 QThread 이벤트루프에 올림"""
if self.loop and not self.loop.is_closed():
asyncio.run_coroutine_threadsafe(self._stop_playwright(), self.loop)
else:
self.logger.log("정리 요청 시 루프가 없습니다.", level=logging.ERROR)
def log_memory_usage(self, message=""):
mem = psutil.virtual_memory()
self.logger.log(f"{message} 메모리 사용: 총 {mem.total/1024/1024:.1f}MB, 사용 {mem.used/1024/1024:.1f}MB, 사용률 {mem.percent}%", level=logging.DEBUG)
def pause_task(self):
"""작업을 일시정지합니다"""
self.logger.log("작업 일시정지 요청...", level=logging.INFO)
# 일시정지 상태 설정
self.pause()
# 추가 로직이 필요한 경우 여기에 구현
def resume_task(self):
"""일시정지된 작업을 재개합니다"""
self.logger.log("작업 재개 요청...", level=logging.INFO)
# 재개 상태 설정
self.resume()
# 추가 로직이 필요한 경우 여기에 구현