from playwright.async_api import async_playwright, TimeoutError from PySide6.QtCore import QThread, Signal, QMutex, QWaitCondition, Slot import re # import pyautogui import time import win32gui, win32con import asyncio import os, sys, random import requests import gc import psutil import shutil from datetime import datetime, timedelta import math # from PIL import Image # from src.wh_search import WhaleSearchParser # from src.wh_img_trans import WhaleImageTranslator # from src.whale_new import WhaleTranslator # from src.modules.clipboardImageManager import ClipboardImageManager # from src.contents.option import OptionHandler # from src.contents.price import PriceHandler # from src.contents.details import DetailHandler # from src.contents.titleGenerator import TitleGenerator # from src.contents.thumb import ThumbnailHandler # from src.modules.image_processor import ImageProcessor # from src.contents.tags import TagsHandler # from src.titleManager.sp_ForbiddenM import SupabaseForbiddenWordManager from src.titleManager.gpt_client import GPTClient from locatorManager_by_SP import LocatorManager import logging # 새로운 모듈 import import string class BrowserController(QThread): def __init__(self, app, logger, base_path, login_infos, toggle_states_for_limited, biz_dbManager, sp_user_id, supabase_manager): super().__init__() self.app = app # AutoPercentyGUI 객체 저장 self.logger = logger self.base_path = base_path self.login_infos = login_infos self.biz_dbManager = biz_dbManager self.sp_user_id = sp_user_id self.supabase_manager = supabase_manager self.toggle_states_for_limited = toggle_states_for_limited self.parsing_page = None self.chrome_hwnd = None # 2. 에러 스크린샷 폴더 (장기 보관) self.ERROR_SCREENSHOT_DIR = os.path.join(self.base_path, 'error_screenshots') os.makedirs(self.ERROR_SCREENSHOT_DIR, exist_ok=True) self.logger.log(f"error 디렉토리 생성: {self.ERROR_SCREENSHOT_DIR}", level=logging.INFO) self.clear_old_screenshot_dirs(days=14) self.logger.log(f"error 디렉토리 삭제 완료", level=logging.INFO) self.playwright = None self.browser = None self.page = None self.loop = None # 이벤트 루프를 저장할 변수 self.gpt_client = GPTClient(logger=self.logger) self.whale_translator = None self.forbidden_word_manager =None self.locator_manager = LocatorManager(self.supabase_manager, self.logger) self.titleGenerator = TitleGenerator(self.locator_manager, self, self.logger, self.whale_translator, self.toggle_states_for_limited, self.gpt_client, self.forbidden_word_manager, self.sp_user_id, self.supabase_manager) # BrowserController에 해당하는 모든 locator를 정의 self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name') self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator') self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator') self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator') self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator') self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator') self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator') self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator') self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator') self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator') self.total_product_count_for_registed_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_for_registed_locator') self.items_per_page_locator = self.locator_manager.get_locator('BrowserControl', 'items_per_page_locator') self.product_edit_dialog_open_locator = self.locator_manager.get_locator('BrowserControl', 'product_edit_dialog_open_locator') self.product_dialog_close_btn = self.locator_manager.get_locator('BrowserControl', 'product_dialog_close_btn') self.product_parent_locator= self.locator_manager.get_locator('BrowserControl', 'product_parent_locator') self.product_name_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_name_inner_locator') self.product_price_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_price_inner_locator') self.product_image_inner_locator = self.locator_manager.get_locator('BrowserControl', 'product_image_inner_locator') self.product_name_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_name_for_ed_template') self.product_price_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_price_for_ed_template') self.product_image_for_ed_template = self.locator_manager.get_locator('BrowserControl', 'product_image_for_ed_template') self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page') self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template') self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator') self.registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator') self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator') self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator') self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator') self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator') self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator') self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator') self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator') self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator') # self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator') self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator') self.group_dropdown_locator = self.locator_manager.get_locator('BrowserControl', 'group_dropdown_locator') self.group_index_template = self.locator_manager.get_locator('BrowserControl', 'group_index_template') self.group_options_selector_locator = self.locator_manager.get_locator('BrowserControl', 'group_options_selector_locator') self.dropdown_openstatus_locator = self.locator_manager.get_locator('BrowserControl', 'dropdown_openstatus_locator') self.selected_group_name_locator = self.locator_manager.get_locator('BrowserControl', 'selected_group_name_locator') self.product_edit_buttons = self.locator_manager.get_locator('BrowserControl', 'product_edit_buttons') self.product_memo_buttons = self.locator_manager.get_locator('BrowserControl', 'product_memo_buttons') self.memo_input_locator = self.locator_manager.get_locator('BrowserControl', 'memo_input_locator') self.memo_exposer_locator = self.locator_manager.get_locator('BrowserControl', 'memo_exposer_locator') self.memo_save_btn_locator = self.locator_manager.get_locator('BrowserControl', 'memo_save_btn_locator') # self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {}) # # 스레드 종료 시 close_whale_window_if_exists 호출 # self.finished.connect(self.cleanup) def get_page(self): return self.page def get_parsing_page(self): return self.parsing_page def get_whale(self): return self.whale_translator def generate_random_suffix(self, length=4): """랜덤한 영문+숫자 조합 문자열 생성""" chars = string.ascii_uppercase + string.digits return ''.join(random.choices(chars, k=length)) async def start_browser_async(self): """비동기 Playwright 초기화 및 로그인 수행""" try: try: self.logger.log('알바생 브라우저를 실행합니다...', level=logging.DEBUG) debug_mode = True # Playwright 시작 및 브라우저 실행 self.playwright = await async_playwright().start() browser_path = os.path.join(self.base_path, 'src', 'browsers', 'chromium-1140', 'chrome-win','chrome.exe') user_data_dir = os.path.join(self.base_path, 'src', 'browsers', 'user_data') self.logger.log(f"브라우저 경로: {browser_path}", level=logging.DEBUG) self.logger.log(f"사용자 폴더 경로: {user_data_dir}", level=logging.DEBUG) if not os.path.exists(browser_path): self.logger.log(f"브라우저 실행 파일이 없습니다: {browser_path}", level=logging.DEBUG) raise FileNotFoundError(f"브라우저 실행 파일이 없습니다: {browser_path}") # 사용자 데이터 디렉토리가 존재하지 않으면 생성 if not os.path.exists(user_data_dir): os.makedirs(user_data_dir) self.logger.log(f"{user_data_dir} 디렉토리가 생성되었습니다.", level=logging.DEBUG) # User agent 설정 import random user_agent = random.choice([ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0", ]) self.logger.log(f"user_agent: {user_agent}", level=logging.DEBUG) # 브라우저 시작 및 설정 self.browser = await self.playwright.chromium.launch_persistent_context( user_data_dir, headless=not debug_mode, permissions=["geolocation", "notifications"], geolocation={"latitude": 37.5665, "longitude": 126.9780}, locale="ko-KR", args=[ '--disable-popup-blocking', '--start-maximized', '--window-size=1920,1080' ], executable_path=browser_path, user_agent=user_agent, ) # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성 self.page_for_upload = await self.browser.new_page() self.logger.log('새 페이지 로딩 중...', level=logging.INFO) await self.page_for_upload.goto('https://percenty.co.kr/signin') self.logger.log('percenty.co.kr/signin 로딩 완료', level=logging.INFO) # 첫 번째 기본 탭 닫기 if self.browser.pages: await self.browser.pages[0].close() # 페이지 제목을 가져와서 창 제목으로 활용 page_title = await self.page_for_upload.title() self.logger.log(f'페이지 제목: {page_title}', level=logging.DEBUG) except Exception as e: self.logger.log(f"브라우저 객체 생성 오류: {str(e)}", level=logging.ERROR, exc_info=True) return try: is_login_success = await self.login(self.page_for_upload) if not is_login_success: self.logger.log("로그인 실패", level=logging.ERROR) return except Exception as e: self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True) return # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성 self.page_for_market = await self.browser.new_page() self.logger.log('새 페이지 로딩 중...', level=logging.INFO) await self.page_for_market.goto('https://percenty.co.kr') self.logger.log('percenty.co.kr 로딩 완료', level=logging.INFO) try: await self.close_ant_modal_dialogs(self.page_for_upload) except Exception as e: self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True) return self.logger.log('등록 상품 관리 페이지로 이동 중...', level=logging.INFO) try: registered_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'registered_product_page_locator') await self.page_for_upload.click(registered_product_page_locator) self.logger.log("등록 상품 관리 페이지로 이동 완료.", level=logging.INFO) except Exception as e: self.logger.log(f"등록 상품 관리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True) return self.logger.log('마켓관리리 페이지로 이동 중...', level=logging.INFO) # await self.go_to_new_product_page() try: market_management_page_locator = self.locator_manager.get_locator('BrowserControl', 'market_management_page_locator') await self.page_for_market.click(market_management_page_locator) await self.close_ad_if_exists_with_ESC_Key(self.page_for_market) await self.close_ant_modal_dialogs(self.page_for_market) self.logger.log("마켓관리리 페이지로 이동 완료.", level=logging.INFO) except Exception as e: self.logger.log(f"마켓관리리 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True) return try: # 각 핸들러에 초기화된 page 객체 전달. self.titleGenerator.update_page(self.page_for_upload) except Exception as e: self.logger.log(f"핸들러 업데이트 오류: {str(e)}", level=logging.ERROR, exc_info=True) return except Exception as e: self.logger.log(f"브라우저 시작 오류: {str(e)}", level=logging.ERROR, exc_info=True) return async def login(self) -> bool: """로그인 처리: 다양한 로그인 성공 지표를 확인합니다.""" is_admin = self.login_infos.get('is_admin', False) who = "관리자" if is_admin else "직원" self.logger.log(f'로그인 시도 중: {who} 계정', level=logging.INFO) # 필수 정보 누락 체크 required = ['admin_id', 'admin_pw'] if is_admin else ['admin_id', 'user_id', 'user_pw'] missing = [k for k in required if not self.login_infos.get(k)] if missing: msg = f'로그인 정보 누락: {", ".join(missing)}' self.logger.log(msg, level=logging.ERROR) self.browser_login_error.emit(msg) return False try: try: initial_body_html = await self.page.evaluate('() => document.body.innerHTML') initial_body_length = len(initial_body_html) self.logger.log(f'로그인 전 페이지 body 길이: {initial_body_length}', level=logging.DEBUG) except: initial_body_length = 0 # 1) 자격증명 입력 & 로그인 클릭 if is_admin: await self.page.fill(self.login_email_locator, self.login_infos['admin_id']) await self.page.fill(self.login_password_locator, self.login_infos['admin_pw']) await self.page.click(self.login_button_locator) else: admin_toggle = self.page.locator(self.admin_toggle_locator) if await admin_toggle.get_attribute("aria-checked") == "true": await admin_toggle.click() await self.page.fill(self.login_email_locator, self.login_infos['admin_id']) await self.page.fill(self.staff_id_locator, self.login_infos['user_id']) await self.page.fill(self.login_password_locator, self.login_infos['user_pw']) await self.page.click(self.staff_login_button_locator) self.logger.log(f'로그인 버튼 클릭 완료: {who} 계정', level=logging.DEBUG) # 2) 다양한 방법으로 로그인 성공 확인 # 방법 1: 기존 선택자들로 확인 success_selectors = [ '[role="menu"]', # ul[role="menu"]보다 더 포괄적 '[role="dialog"]', 'span:text("신규 상품 등록")', # 신규 상품 등록 텍스트 'span:text("등록 상품 관리")' # 등록 상품 관리 텍스트 ] try: # 여러 선택자를 동시에 기다림 (짧은 시간) selector_string = ', '.join(success_selectors) self.logger.log(f'로그인 성공 요소 대기 중... 선택자: {selector_string}', level=logging.DEBUG) locator = await self.page.wait_for_selector( selector_string, timeout=5000 # 5초로 단축 ) if locator: self.logger.log(f'로그인 성공: {who} 계정', level=logging.INFO) return True except Exception as wait_error: self.logger.log(f'메뉴 요소 대기 실패: {str(wait_error)}', level=logging.DEBUG) # DOM 변경 감지 try: new_body_html = await self.page.evaluate('() => document.body.innerHTML') new_body_length = len(new_body_html) body_change_ratio = abs(new_body_length - initial_body_length) / max(initial_body_length, 1) self.logger.log(f'로그인 후 페이지 body 길이: {new_body_length} (변경률: {body_change_ratio:.2%})', level=logging.DEBUG) # 페이지 내용이 10% 이상 변경되었다면 로그인 성공으로 간주 if body_change_ratio > 0.1: self.logger.log(f'페이지 내용 변경으로 로그인 성공 확인: {who} 계정 (변경률: {body_change_ratio:.2%})', level=logging.INFO) return True except Exception as dom_error: self.logger.log(f'DOM 변경 감지 중 오류: {dom_error}', level=logging.DEBUG) # 방법 5: 로그인 폼 사라짐 확인 try: login_form_exists = await self.page.query_selector(self.login_button_locator) is not None if not login_form_exists: self.logger.log(f'로그인 폼 사라짐으로 로그인 성공 확인: {who} 계정', level=logging.INFO) return True except: pass # 모든 방법으로 확인 실패 msg = f'로그인 실패: {who} 계정' self.logger.log(msg, level=logging.ERROR) self.logger.log(f'대기 오류 상세: {str(wait_error)}', level=logging.DEBUG) self.browser_login_error.emit(msg) await self.save_error_screenshot(f'login_failed_{who}') return False except Exception as e: # 클릭/입력 중 예외 msg = f'로그인 처리 중 오류: {who} 계정 – {e}' self.logger.log(msg, level=logging.ERROR, exc_info=True) self.browser_login_error.emit(msg) await self.save_error_screenshot(f'login_exception_{who}') return False async def close_browser(self): """브라우저 종료""" if self.browser: await self.browser.close() await self.playwright.stop() self.cleanup() self.logger.log('브라우저 종료됨.', level=logging.INFO) async def get_total_product_count(self): total_count = 0 items_per_page = 0 try: # total_count_elements = await self.page.query_selector_all(".sc-dOvA-dm.jqRNYf") total_count_element = await self.page.query_selector(self.total_product_count_locator) items_per_page_element = await self.page.query_selector(self.items_per_page_locator) self.logger.log(f"total_count_element : {total_count_element}", level=logging.DEBUG) if total_count_element: total_count_text = await total_count_element.inner_text() if "총" in total_count_text and "개 상품" in total_count_text: total_count = int(''.join(re.findall(r'\d+', total_count_text))) self.logger.log(f"총 상품수 확인: {total_count} 개", level=logging.INFO) # 페이지당 상품 수 추출 if items_per_page_element: items_per_page_text = await items_per_page_element.get_attribute("title") if items_per_page_text and "개씩 보기" in items_per_page_text: items_per_page = int(''.join(re.findall(r'\d+', items_per_page_text))) self.logger.log(f"페이지당 상품수 확인: {items_per_page} 개씩 보기", level=logging.INFO) # 결과 반환 if total_count: return {"total_count": total_count, "items_per_page": items_per_page} return {"total_count": 0, "items_per_page": 0} except Exception as e: self.logger.log(f"상품 수를 가져오는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) return {"total_count": 0, "items_per_page": 0} async def close_ant_modal_dialogs(self, page, max_loops=3): """ antd 모달 다이얼로그에서 '다시 보지 않기', '닫기' 버튼을 최대 max_loops번 반복 클릭합니다. self.page, self.logger 사용 """ total_closed = 0 for loop in range(max_loops): closed_this_round = 0 # 다이얼로그 생성될 때까지 3초간 대기 (한 번만) try: await page.wait_for_selector('.ant-modal-root', timeout=3000) except Exception: self.logger.log("3초 대기 내에 .ant-modal-root 다이얼로그 미등장", level=logging.WARNING) # '다시 보지 않기' 버튼 (span 텍스트 포함) dont_show_btns = page.locator( '.ant-modal-root button span:text("다시 보지 않기")' ).locator('..') # span의 부모 button count1 = await dont_show_btns.count() for i in range(count1): await dont_show_btns.nth(i).click() closed_this_round += 1 self.logger.log(f'"다시 보지 않기" 버튼 클릭 (loop {loop+1}, {i+1}/{count1})', level=logging.INFO) await page.wait_for_timeout(200) # '닫기' 버튼 (span 텍스트 포함) close_btns = page.locator( '.ant-modal-root button span:text("닫기")' ).locator('..') count2 = await close_btns.count() for i in range(count2): await close_btns.nth(i).click() closed_this_round += 1 self.logger.log(f'"닫기" 버튼 클릭 (loop {loop+1}, {i+1}/{count2})', level=logging.INFO) await page.wait_for_timeout(200) total_closed += closed_this_round if closed_this_round == 0: break await page.wait_for_timeout(300) self.logger.log(f"총 {total_closed}개 다이얼로그 버튼 클릭 완료", level=logging.INFO) return total_closed > 0 async def close_ad_if_exists_with_ESC_Key(self, page): """ESC 키를 두 번 전송하여 다이얼로그를 닫는 메서드""" try: # JavaScript로 ESC 키 이벤트 발생 await page.evaluate(""" (() => { const escEvent = new KeyboardEvent('keydown', { key: 'Escape', code: 'Escape', keyCode: 27, which: 27, bubbles: true, cancelable: true }); document.dispatchEvent(escEvent); return true; })() """) time.sleep(1) await page.evaluate(""" (() => { const escEvent = new KeyboardEvent('keydown', { key: 'Escape', code: 'Escape', keyCode: 27, which: 27, bubbles: true, cancelable: true }); document.dispatchEvent(escEvent); return true; })() """) self.logger.log("ESC 키를 전송하여 다이얼로그를 닫았습니다.", level=logging.INFO) except Exception as e: self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.ERROR, exc_info=True) async def close_ad_if_exists_new(self): """ 다이얼로그가 존재하면 해당 다이얼로그 내부의 "닫기" 버튼을 클릭하고, 다이얼로그를 찾지 못하거나 내부에 닫기 버튼이 없으면 페이지 전체에서 "닫기" 버튼을 검색하여 클릭하는 메서드 """ # 새로운 다이얼로그의 닫기(X) 버튼 먼저 시도 self.logger.log("새로운 다이얼로그의 닫기(X) 버튼 먼저 시도", level=logging.INFO) # 1. 모달 다이얼로그 처리 (ant-modal-wrap) try: # 모달 처리 JavaScript 코드 modal_js = """ () => { // 모달 찾기 const modalWraps = document.querySelectorAll('.ant-modal-wrap.ant-modal-centered'); if (modalWraps.length === 0) return 0; console.log(`${modalWraps.length}개의 모달 발견`); let closed = 0; // 각 모달에 대해 for (const modal of modalWraps) { try { // 1. 닫기 버튼 찾기 const closeBtn = modal.querySelector('.ant-modal-close'); if (closeBtn) { closeBtn.click(); console.log('모달 닫기 버튼 클릭'); closed++; continue; } // 2. '닫기' 텍스트 버튼 찾기 const buttons = modal.querySelectorAll('button'); let found = false; for (const btn of buttons) { if (btn.textContent.includes('닫기')) { btn.click(); console.log('닫기 텍스트 버튼 클릭'); closed++; found = true; break; } } if (found) continue; // 3. 마지막 수단: 모달 숨기기 modal.style.display = 'none'; const mask = document.querySelector('.ant-modal-mask'); if (mask) mask.style.display = 'none'; console.log('모달 강제 숨김'); closed++; } catch (e) { console.error('모달 처리 오류:', e); } } return closed; } """ modal_count = await self.page.evaluate(modal_js) if modal_count > 0: self.logger.log(f"{modal_count}개의 모달 다이얼로그 닫기 처리됨", level=logging.INFO) await self.page.wait_for_timeout(1000) # 모달이 사라질 때까지 대기 return True except Exception as e: self.logger.log(f"모달 다이얼로그 처리 중 오류: {e}", level=logging.INFO) # 2. drawer-close 버튼 처리 try: drawer_js = """ () => { const drawerCloseButtons = Array.from(document.querySelectorAll('button.ant-drawer-close')); let count = 0; drawerCloseButtons.forEach(button => { try { button.click(); console.log('drawer-close 버튼 클릭'); count++; } catch (e) { console.error('drawer-close 클릭 오류:', e); } }); return count; } """ drawer_count = await self.page.evaluate(drawer_js) if drawer_count > 0: self.logger.log(f"{drawer_count}개의 drawer-close 버튼 클릭됨", level=logging.INFO) await self.page.wait_for_timeout(1000) return True except Exception as e: self.logger.log(f"drawer-close 버튼 처리 중 오류: {e}", level=logging.INFO) # 3. 배경 클릭 시도 try: backdrop_js = """ () => { const backdropSelectors = [ '.ant-modal-mask', '.ant-drawer-mask', '.modal-backdrop', '.dialog-backdrop', '[role="dialog"] + div', '.overlay', '.modal-overlay' ]; let clicked = 0; for (const selector of backdropSelectors) { const backdrops = document.querySelectorAll(selector); for (const backdrop of backdrops) { try { backdrop.click(); console.log(`${selector} 배경 클릭`); clicked++; } catch (e) { console.error(`${selector} 클릭 오류:`, e); } } } return clicked; } """ backdrop_count = await self.page.evaluate(backdrop_js) if backdrop_count > 0: self.logger.log(f"{backdrop_count}개의 배경 요소 클릭됨", level=logging.INFO) await self.page.wait_for_timeout(1000) return True except Exception as e: self.logger.log(f"배경 클릭 처리 중 오류: {e}", level=logging.INFO) # 4. ESC 키 시도 try: await self.page.keyboard.down("Escape") await self.page.wait_for_timeout(100) await self.page.keyboard.up("Escape") self.logger.log("ESC 키 전송 (down-up 방식)", level=logging.INFO) await self.page.wait_for_timeout(1000) except Exception as e: self.logger.log(f"ESC 키 전송 중 오류: {e}", level=logging.INFO) # 5. 기존 다이얼로그 처리 (이전 코드) try: # 다이얼로그가 나타날 때까지 최대 3초 대기 dialog_element = await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=3000, state='visible') self.logger.log("다이얼로그가 발견되었습니다. 내부의 닫기 버튼을 찾습니다.", level=logging.INFO) if dialog_element: # 다이얼로그 내부에서 "닫기" 텍스트를 가진 버튼을 xpath로 찾음 close_button = await dialog_element.query_selector("xpath=.//button[span[text()='닫기']]") if close_button: await close_button.click() self.logger.log("다이얼로그 내부의 닫기 버튼을 클릭하여 닫았습니다.", level=logging.INFO) return # 닫기 성공 시 함수 종료 else: self.logger.log("다이얼로그 내부에서 닫기 버튼을 찾지 못했습니다.", level=logging.WARNING) else: self.logger.log("대기 후에도 다이얼로그 엘리먼트를 찾지 못했습니다.", level=logging.WARNING) try: await self.page.keyboard.press("Escape") await self.page.wait_for_timeout(100) await self.page.dispatch_event("body", "keydown", { "key": "Escape", "code": "Escape", "keyCode": 27, "which": 27 }) await self.page.wait_for_timeout(50) await self.page.dispatch_event("body", "keyup", { "key": "Escape", "code": "Escape", "keyCode": 27, "which": 27 }) await self.page.wait_for_timeout(100) self.logger.log("ECS를 전송하여 닫았습니다.", level=logging.INFO) return except Exception as e: self.logger.log(f"닫기 중 오류: {e}", level=logging.ERROR, exc_info=True) except TimeoutError: self.logger.log("다이얼로그가 나타나지 않았습니다.", level=logging.INFO) except Exception as e: self.logger.log(f"다이얼로그 닫기 시도 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) async def is_button_disabled(self, button): """버튼이 disabled 상태인지 확인""" try: # 버튼의 disabled 속성 확인 is_disabled = await button.get_attribute('disabled') return is_disabled is not None # disabled 속성이 있으면 True 반환 except Exception as e: self.logger.log(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) return False # 오류 발생 시 기본적으로 활성화된 것으로 처리 async def select_group_index(self, group_index: int): """그룹 드롭다운 열고 옵션 선택""" try: self.logger.log(f"group_index : {group_index}", level=logging.INFO) await self.page.evaluate(""" const targetElement = Array.from(document.querySelectorAll('span')) .find(el => el.textContent.trim() === '수집 상품 목록'); if (targetElement) { targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' }); } """) self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO) await asyncio.sleep(1) group_option_locator = self.group_index_template.format(index=group_index+1) # 드롭다운 열기 await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible') await self.page.click(self.group_dropdown_locator, timeout=3000, force=True) self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO) # 드롭다운 열림 상태 확인 await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000) self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO) # 옵션 선택 self.logger.log(f"[{group_option_locator}] : group_option_locator", level=logging.INFO) await self.page.click(group_option_locator, timeout=3000) self.logger.log(f"[{group_index}]번 그룹 선택 완료", level=logging.INFO) selected_group_name = await self.page.inner_text(self.selected_group_name_locator) self.logger.log(f"선택된 그룹 이름 : [{selected_group_name}]", level=logging.INFO) # 총 상품 개수 가져오기 product_count_info = await self.get_total_product_count() total_products = product_count_info.get("total_count", 0) items_per_page = product_count_info.get("items_per_page", 0) self.logger.log(f"총 상품 개수 : [{total_products}]", level=logging.INFO) except TimeoutError: self.logger.log("드롭다운 또는 옵션 선택 중 타임아웃이 발생했습니다.", level=logging.WARNING) screenshot_path = await self.save_error_screenshot("group_select_timeout") self.logger.log(f"그룹 선택 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {screenshot_path}", level=logging.WARNING) except Exception as e: self.logger.log(f"그룹 선택 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) async def get_group_names_list(self): """그룹 드롭다운을 열고 모든 그룹 이름 목록을 가져오기""" try: self.logger.log("그룹 이름 목록 가져오기 시작", level=logging.INFO) # 수집 상품 목록 요소로 스크롤 await self.page.evaluate(""" const targetElement = Array.from(document.querySelectorAll('span')) .find(el => el.textContent.trim() === '수집 상품 목록'); if (targetElement) { targetElement.scrollIntoView({ behavior: 'smooth', block: 'center' }); } """) self.logger.log("그룹박스 요소로 스크롤", level=logging.INFO) await asyncio.sleep(1) # 드롭다운 열기 await self.page.wait_for_selector(self.group_dropdown_locator, timeout=3000, state='visible') await self.page.click(self.group_dropdown_locator, timeout=3000, force=True) self.logger.log("드롭다운을 성공적으로 클릭했습니다.", level=logging.INFO) # 드롭다운 열림 상태 확인 await self.page.wait_for_selector(self.dropdown_openstatus_locator, timeout=3000) self.logger.log("드롭다운이 열렸습니다.", level=logging.INFO) # rc-virtual-list가 로딩될 때까지 잠시 대기 await asyncio.sleep(0.5) # 그룹 옵션들이 로딩될 때까지 대기 await self.page.wait_for_selector(self.group_options_selector_locator, timeout=3000) # 모든 그룹 이름 텍스트 가져오기 group_names = await self.page.evaluate(f""" () => {{ const elements = document.querySelectorAll('{self.group_options_selector_locator}'); return Array.from(elements).map(el => el.textContent.trim()); }} """) self.logger.log(f"발견된 그룹 목록: {group_names}", level=logging.INFO) # 드롭다운 닫기 (ESC 키 또는 다른 곳 클릭) await self.page.keyboard.press("Escape") self.logger.log("드롭다운 닫기 완료", level=logging.INFO) return group_names except TimeoutError: self.logger.log("그룹 목록 가져오기 중 타임아웃이 발생했습니다.", level=logging.WARNING) # 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도 save_screenshot_path = await self.save_error_screenshot("group_list_timeout") self.logger.log(f"self.group_options_selector_locator: {self.group_options_selector_locator}", level=logging.WARNING) self.logger.log(f"그룹 목록 가져오기 중 타임아웃이 발생했습니다. 스크린샷 저장됨: {save_screenshot_path}", level=logging.WARNING) try: await self.page.keyboard.press("Escape") except: pass return [] except Exception as e: self.logger.log(f"그룹 목록 가져오기 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) # 드롭다운이 열려있을 수 있으므로 ESC로 닫기 시도 try: await self.page.keyboard.press("Escape") except: pass return [] async def get_product_edit_buttons_by_template(self): """현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기""" # 버튼 선택자 설정 # buttons = self.page.locator(self.product_edit_button_template) # memos = self.page.locator(self.product_memo_button_template) """ 현재 페이지의 세부사항 수정 및 업로드 버튼과 메모 버튼을 매칭하여 반환. 각 상품에 대해 해외배송비 수정 버튼과 메모 버튼을 추출합니다. """ try: # 수정 버튼 선택자 설정 # edit_buttons = self.page.locator('//button[span[text()="세부사항 수정 및 업로드"]]') # memo_buttons = self.page.locator('button.ant-btn.css-1li46mu.ant-btn-default.ant-btn-icon-only') edit_buttons = self.page.locator(self.product_edit_buttons) memo_buttons = self.page.locator(self.product_memo_buttons) # 수정 버튼 개수 확인 edit_button_count = await edit_buttons.count() if edit_button_count == 0: self.logger.log("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.", level=logging.WARNING) return [] # 메모 버튼 개수 확인 memo_button_count = await memo_buttons.count() if memo_button_count <= 2: # 첫 번째 버튼과 최소 상품 메모 버튼 제외 self.logger.log("메모 버튼이 유효하지 않습니다. 최소 3개 이상의 버튼에서 메모버튼이 유효합니다.", level=logging.WARNING) return [] # 첫 번째 버튼 제외하고 나머지 버튼을 처리 valid_memo_buttons = [memo_buttons.nth(i) for i in range(1, memo_button_count)] # 상품별 해외배송비 수정 버튼과 메모 버튼 매칭 product_buttons = [] index = 0 # valid_memo_buttons에서의 현재 인덱스 for i in range(edit_button_count): if index + 1 >= len(valid_memo_buttons): self.logger.log("메모 버튼의 개수가 부족합니다. 매칭에 실패했습니다.", level=logging.WARNING) break # 상품별 버튼 매칭 product_buttons.append({ "edit_button": edit_buttons.nth(i), "shipping_button": valid_memo_buttons[index], # 해외배송비 수정 버튼 "memo_button": valid_memo_buttons[index + 1] # 메모 버튼 }) index += 2 # 한 상품에 대해 두 개의 버튼(해외배송비 + 메모 버튼)을 소모 # 최종 결과 로그 self.logger.log(f"현재 페이지의 수정할 상품 개수: {len(product_buttons)}", level=logging.INFO) return product_buttons except Exception as e: self.logger.log(f"상품 수정 버튼을 찾는 중 오류: {e}", level=logging.ERROR, exc_info=True) return [] async def get_product_edit_buttons_by_template_new(self): """ 템플릿 방식의 CSS 선택자를 이용해 현재 페이지의 수정 및 메모 버튼을 동적으로 찾습니다. """ product_buttons = [] # 예시로, 상품 항목들이 li.product-item 형태로 존재한다고 가정합니다. product_items = self.page.locator("div#root li.product-item") count = await product_items.count() self.logger.log(f"현재 페이지에 {count}개의 상품 항목을 발견했습니다.", level=logging.INFO) for i in range(1, count + 1): edit_selector = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template').format(index=i) memo_selector = self.locator_manager.get_locator('BrowserControl', 'product_memo_button_template').format(index=i) edit_button = self.page.locator(edit_selector) memo_button = self.page.locator(memo_selector) if await edit_button.count() > 0: product_buttons.append({ "edit_button": edit_button, "memo_button": memo_button }) else: self.logger.log(f"상품 인덱스 {i}에 대해 수정 버튼을 찾지 못했습니다.", level=logging.WARNING) return product_buttons async def open_product_edit_dialog(self, button): """상품 수정 다이얼로그 열기""" try: # 요소가 화면에 없을 경우 스크롤하여 보이도록 함 await button.scroll_into_view_if_needed() self.logger.log("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.", level=logging.DEBUG) await self.page.keyboard.press("Escape") self.logger.log("이전 다이얼로그 닫기 완료.", level=logging.INFO) await button.click() self.logger.log("세부사항 수정 다이얼로그 열기 완료.", level=logging.INFO) # await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림 await self.page.wait_for_selector(self.product_edit_dialog_open_locator) # 다이얼로그가 완전히 로딩될 때까지 기다림 except Exception as e: self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", level=logging.ERROR, exc_info=True) # 에러 발생 시 스크린샷 저장 screenshot_path = await self.save_error_screenshot() self.logger.log(f"세부사항 수정 다이얼로그 열기 중 오류 발생으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO) async def save_error_screenshot(self, error_tag="error"): """에러 발생시 에러스크린샷 저장""" today = datetime.now().strftime('%Y%m%d') date_dir = os.path.join(self.ERROR_SCREENSHOT_DIR, today) os.makedirs(date_dir, exist_ok=True) timestamp = int(time.time()) filename = f"screenshot_{error_tag}_{timestamp}.png" path = os.path.join(date_dir, filename) await self.page.screenshot(path=path) return path def clear_old_screenshot_dirs(self, days=14): """에러스크린샷 디렉토리에서 지정일(기본 14일) 지난 하위폴더 삭제""" now = datetime.now() for dir_name in os.listdir(self.ERROR_SCREENSHOT_DIR): dir_path = os.path.join(self.ERROR_SCREENSHOT_DIR, dir_name) if os.path.isdir(dir_path): try: # 폴더명 YYYYMMDD로 파싱 dir_date = datetime.strptime(dir_name, '%Y%m%d') if now - dir_date > timedelta(days=days): # 오래된 폴더 삭제 import shutil shutil.rmtree(dir_path) self.logger.log(f"[스크린샷] {dir_path} 삭제됨", level=logging.INFO) except Exception as e: self.logger.log(f"[스크린샷] {dir_path} 삭제 중 오류: {e}", level=logging.ERROR, exc_info=True) async def click_title_tab(self): """상품명 탭 클릭""" try: await self.page.click(self.title_tab_locator) self.logger.log("상품명 탭 클릭 완료.", level=logging.INFO) except Exception as e: self.logger.log(f"상품명 탭 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True) async def save_and_ecs_product_edit(self): """상품 수정 후 저장 버튼 클릭 및 ESC 전송, 그리고 다이얼로그 닫힘 버튼 클릭을 통한 다이얼로그 종료 처리""" try: # 저장 버튼 클릭 await self.page.click(self.save_button_locator) self.logger.log("저장 버튼 클릭 완료.", level=logging.INFO) # 저장 동작 완료를 위한 잠시 대기 await asyncio.sleep(0.5) # ESC 키를 최대 3회 전송하여 다이얼로그가 닫혔는지 확인 max_attempts = 3 for attempt in range(max_attempts): await self.page.keyboard.press("Escape") self.logger.log(f"ESC 키 전송 시도 {attempt+1}/{max_attempts}", level=logging.DEBUG) await asyncio.sleep(0.5) if not await self.page.is_visible(self.save_button_locator): self.logger.log("상품 수정 다이얼로그가 성공적으로 닫혔습니다.", level=logging.INFO) break else: self.logger.log("상품 수정 다이얼로그가 여전히 열려있습니다.", level=logging.WARNING) else: # ESC 전송 후에도 다이얼로그가 닫히지 않은 경우, product_dialog_close_btn을 클릭 self.logger.log("ESC 시도 후에도 다이얼로그가 닫히지 않아, 닫힘 버튼 클릭 시도합니다.", level=logging.INFO) if await self.page.is_visible(self.product_dialog_close_btn): await self.page.click(self.product_dialog_close_btn) self.logger.log("닫힘 버튼 클릭 완료.", level=logging.INFO) else: self.logger.log("닫힘 버튼을 찾지 못했습니다.", level=logging.ERROR) except Exception as e: self.logger.log(f"저장 및 다이얼로그 종료 처리 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) # 에러 발생 시 스크린샷 저장 screenshot_path = await self.save_error_screenshot() self.logger.log(f"저장 및 다이얼로그 종료 처리 중 에러 발생 으로 스크린샷 저장됨: {screenshot_path}", level=logging.INFO) async def save_product_edit(self): """상품 수정 후 저장 버튼 클릭""" try: await self.page.click(self.save_button_locator) self.logger.log("상품 수정 내용 저장 완료.", level=logging.INFO) except Exception as e: self.logger.log(f"저장 버튼 클릭 중 오류: {e}", level=logging.ERROR, exc_info=True) async def go_to_next_page(self): """ 수정된 CSS 선택자를 사용하여 다음 페이지로 이동합니다. """ try: current_page_element = await self.page.query_selector(self.current_page_locator) if not current_page_element: self.logger.log("현재 페이지 정보를 찾을 수 없습니다.", level=logging.WARNING) return False current_page_number = int(await current_page_element.get_attribute("title")) self.logger.log(f"현재 페이지 번호: {current_page_number}", level=logging.INFO) next_page_number = current_page_number + 1 next_page_button_locator = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template').format(page_number=next_page_number) self.logger.log(f"다음 페이지 선택자: {next_page_button_locator}", level=logging.DEBUG) next_page_button = await self.page.query_selector(next_page_button_locator) if next_page_button: await next_page_button.click() await self.page.wait_for_load_state('domcontentloaded', timeout=5000) self.logger.log(f"페이지 {next_page_number}로 이동 완료.", level=logging.INFO) return True else: self.logger.log("다음 페이지가 없습니다.", level=logging.WARNING) return False except Exception as e: self.logger.log(f"다음 페이지로 이동 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) return False async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50): """ 휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤. Parameters: - direction: 스크롤 방향 ("down"은 아래로, "up"은 위로). - pause_time: 스크롤 사이의 대기 시간 (초). - max_scrolls: 최대 스크롤 횟수. """ scroll_count = 0 self.logger.log(f"스크롤 시작", level=logging.DEBUG) # 현재 페이지 높이 가져오기 last_height = await self.page.evaluate("document.body.scrollHeight") self.logger.log(f"현재 페이지 높이 가져오기 - {last_height}", level=logging.DEBUG) while scroll_count < max_scrolls: if direction == "down": # 아래로 스크롤 self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px", level=logging.DEBUG) await self.page.evaluate("window.scrollBy(0, 1000);") elif direction == "up": # 위로 스크롤 self.logger.log(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px", level=logging.DEBUG) await self.page.evaluate("window.scrollBy(0, -1000);") else: raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.") self.logger.log(f"pause_time 슬립 : {pause_time}", level=logging.DEBUG) await asyncio.sleep(pause_time) # 새로운 페이지 높이 가져오기 new_height = await self.page.evaluate("document.body.scrollHeight") self.logger.log(f"새로운 페이지 높이 가져오기 - {new_height}", level=logging.DEBUG) # 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달) if direction == "down" and new_height == last_height: self.logger.log(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG) break elif direction == "up" and new_height == 0: self.logger.log(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}", level=logging.DEBUG) break self.logger.log(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}", level=logging.DEBUG) last_height = new_height scroll_count += 1 self.logger.log(f"스크롤 카운트 + 1", level=logging.DEBUG) if scroll_count == max_scrolls: self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG) async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50): """ 키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤. Parameters: - direction: 스크롤 방향 ("down"은 아래로, "up"은 위로). - pause_time: 스크롤 사이의 대기 시간 (초). - max_scrolls: 최대 스크롤 횟수. """ scroll_count = 0 while scroll_count < max_scrolls: if direction == "down": # 아래로 스크롤 (Page Down 키 사용) await self.page.keyboard.press("PageDown") elif direction == "up": # 위로 스크롤 (Page Up 키 사용) await self.page.keyboard.press("PageUp") else: raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.") await asyncio.sleep(pause_time) scroll_count += 1 if scroll_count == max_scrolls: self.logger.log("최대 스크롤 횟수에 도달했습니다.", level=logging.DEBUG) async def collect_product_info(self, items_per_page, ed_mode): """ 상품 정보를 수집하는 메서드 """ try: product_infos = [] product_name_elements = [] # product_name_element를 저장할 리스트 # ed_mode에 따라 product_elements 설정 if ed_mode: # 각 상품의 이름, 가격, 이미지를 위한 선택자 리스트 구성 (index가 2부터 시작) product_elements = [ { "name": self.product_name_for_ed_template.format(index=i), "price": self.product_price_for_ed_template.format(index=i), "image": self.product_image_for_ed_template.format(index=i) } for i in range(2, items_per_page + 2) # index가 2부터 시작하도록 설정 ] else: # ed_mode=False일 때는 각 상품의 부모 요소를 모두 선택 product_elements = await self.page.query_selector_all(self.product_parent_locator) for i, element in enumerate(product_elements[:items_per_page], start=1): try: if ed_mode: # ed_mode=True일 때는 각 상품의 개별 선택자 사용 product_name_element = await self.page.wait_for_selector(element["name"], timeout=3000, state="attached") product_price_element = await self.page.wait_for_selector(element["price"], timeout=3000, state="attached") product_image_element = await self.page.wait_for_selector(element["image"], timeout=3000, state="attached") else: # ed_mode=False일 때 부모 요소 내의 선택자를 사용 product_name_element = await self.page.wait_for_selector(self.product_name_inner_locator, timeout=3000, state="attached") product_price_element = await self.page.wait_for_selector(self.product_price_inner_locator, timeout=3000, state="attached") product_image_element = await self.page.wait_for_selector(self.product_image_inner_locator, timeout=3000, state="attached") # 요소가 존재하면 정보 추출 self.logger.log(f"product_name_element : {product_name_element}", level=logging.DEBUG) self.logger.log(f"product_price_element : {product_price_element}", level=logging.DEBUG) self.logger.log(f"product_image_element : {product_image_element}", level=logging.DEBUG) if product_name_element and product_price_element and product_image_element: # await의 결과를 각 변수에 저장 product_name_text = (await product_name_element.inner_text()).strip() product_price_text = (await product_price_element.inner_text()).strip() product_image_url = await product_image_element.get_attribute('src') # product_info 딕셔너리에 결과 저장 product_info = { "name": product_name_text, "price": product_price_text, "image_url": product_image_url } self.logger.log(f"상품 {i}: {product_info}", level=logging.DEBUG) product_infos.append(product_info) product_name_elements.append(product_name_element) # 각 product_name_element 추가 except Exception as e: self.logger.log(f"상품 {i} 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) continue return product_infos, product_name_elements # product_infos와 product_name_elements 함께 반환 except Exception as e: self.logger.log(f"상품 정보 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True) return [] async def scroll_page_to_bottom(self, pause_time=0.2): """페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드""" self.logger.log('페이지 스크롤 시작...', level=logging.INFO) previous_height = await self.page.evaluate("() => document.body.scrollHeight") while True: await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤 await asyncio.sleep(pause_time) # 페이지 로딩 대기 current_height = await self.page.evaluate("() => document.body.scrollHeight") if current_height == previous_height: break # 더 이상 스크롤할 내용이 없으면 종료 previous_height = current_height self.logger.log('페이지 스크롤 완료.', level=logging.INFO) async def scroll_page_to_top(self, pause_time=0.2): """페이지의 맨 위까지 스크롤""" self.logger.log('페이지 위로 스크롤 시작...', level=logging.INFO) try: # 타임아웃 설정으로 API 대기 중에도 안전하게 동작 previous_height = await asyncio.wait_for( self.page.evaluate("() => window.pageYOffset"), timeout=5.0 ) retry_count = 0 max_retries = 10 # 최대 재시도 횟수 while previous_height > 0 and retry_count < max_retries: try: # 스크롤 실행에도 타임아웃 적용 await asyncio.wait_for( self.page.evaluate("window.scrollBy(0, -window.innerHeight);"), timeout=3.0 ) await asyncio.sleep(pause_time) # 현재 높이 체크에도 타임아웃 적용 current_height = await asyncio.wait_for( self.page.evaluate("() => window.pageYOffset"), timeout=3.0 ) if current_height == previous_height: break # 더 이상 스크롤할 내용이 없으면 종료 previous_height = current_height retry_count += 1 except asyncio.TimeoutError: self.logger.log(f'스크롤 중 타임아웃 발생 (시도 {retry_count + 1}/{max_retries})', level=logging.WARNING) retry_count += 1 await asyncio.sleep(1.0) # 타임아웃 후 잠시 대기 # 페이지 상태 재확인 try: previous_height = await asyncio.wait_for( self.page.evaluate("() => window.pageYOffset"), timeout=2.0 ) except asyncio.TimeoutError: self.logger.log('페이지 상태 확인 실패, 스크롤 중단', level=logging.WARNING) break if retry_count >= max_retries: self.logger.log('최대 재시도 횟수 도달, 스크롤 강제 종료', level=logging.WARNING) else: self.logger.log('페이지 위로 스크롤 완료.', level=logging.INFO) except asyncio.TimeoutError: self.logger.log('페이지 스크롤 초기화 중 타임아웃 발생, 스크롤 건너뛰기', level=logging.WARNING) except Exception as e: self.logger.log(f'페이지 스크롤 중 예외 발생: {e}', level=logging.ERROR) # 에러가 발생해도 작업을 계속 진행 async def start_Percenty_task(self): self.logger.log('퍼센티 상품수정 작업을 시작합니다...', level=logging.DEBUG) self.running = True # 번역 작업이 시작됨 try: # 금지어 목록 업데이트 self.forbidden_word_manager.refresh_forbidden_words() # 웨일 브라우저 시작 - 최대 3번 재시도 max_retries = 3 retry_count = 0 whale_window = None while retry_count < max_retries: self.check_pause() # 일시중지 상태 확인 self.logger.log(f'웨일 브라우저 시작 시도 {retry_count + 1}/{max_retries}...', level=logging.INFO) whale_window = self.whale_translator.start_trans_browser() if whale_window: self.logger.log('웨일 브라우저가 성공적으로 시작되었습니다.', level=logging.INFO) break retry_count += 1 self.logger.log(f'웨일 브라우저 시작 실패. {retry_count}/{max_retries}', level=logging.WARNING) await asyncio.sleep(1) # 잠시 대기 후 재시도 if not whale_window: error_msg = "웨일 브라우저를 시작할 수 없습니다. 상품수정을 중단합니다." self.logger.log(error_msg, level=logging.ERROR) return if whale_window and self.toggle_states['collect_method_combo'] == "lens": self.whale_translator.check_capcha() # 1. 총 상품 수 수집 self.check_pause() # 일시중지 상태 확인 await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤 # total_products = await self.browser_controller.get_total_product_count(ed_mode=self.toggle_states['ed_mode']) # get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음 result = await self.get_total_product_count() # 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출 total_products = result.get("total_count", 0) items_per_page = result.get("items_per_page", 0) total_pages = math.ceil(total_products / items_per_page) self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG) self.logger.log(f"[ self.toggle_states 상태 ]\n{self.toggle_states}", level=logging.DEBUG) if total_products == 0: self.logger.log('수집할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG) return completed_count = 0 page_number = 1 # 3. 총 상품 수만큼 반복 작업 수행 # while self.running and completed_count < total_products: for page_number in range(1, total_pages + 1): self.check_pause() # 일시중지 상태 확인 # self.logger.log(f'현재 페이지: {page_number}', level=logging.DEBUG) self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO) await self.scroll_page_to_top() self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG) if not self.toggle_states['ed_mode']: # 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기 self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG) product_buttons = await self.get_product_edit_buttons_by_template() else: self.logger.log('상품정보 수집', level=logging.DEBUG) product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode']) self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG) self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG) product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements] self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG) if not product_buttons: self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG) break # 5. 각 상품에 대해 상품수정작업 수행 for index, button_set in enumerate(product_buttons, start=1): try: self.check_pause() # 일시중지 상태 확인 edit_button = button_set.get("edit_button") memo_button = button_set.get("memo_button") shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼 # 상태 초기화 코드 추가 self.titleGenerator.reset_state() # 그 외 임시 변수 초기화 self.current_options_info = None # 상품명 수집 오류 처리 self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG) is_disabled = await self.is_button_disabled(edit_button) if is_disabled: self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG) # completed_count += 1 continue self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG) # 상품 수정 다이얼로그 열기 await self.open_product_edit_dialog(edit_button) self.check_pause() # 일시중지 상태 확인 title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag) self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG) # 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감. if title_infos.get("is_banned_category", False): banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info")) self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO) # 랜덤 4자리 붙이기 random_suffix = self.generate_random_suffix() # 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해) is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix) if is_set: self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO) else: self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING) await self.save_and_ecs_product_edit() # completed_count += 1 continue # 다음 상품으로 넘어감 # 정상상품이면 상품명 수정과 카테고리 수집 is_title = self.toggle_states['title'] is_title_shuffle = self.toggle_states['title_shuffle'] if is_title or is_title_shuffle: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG) title_infos["generated_name"] = await self.titleGenerator.process_title() # 옵션 수정 is_optionTrnas = self.toggle_states.get('optionTrnas') is_optionIMGTrans = self.toggle_states.get('optionIMGTrans') is_optionAutoSelect = self.toggle_states.get('optionAutoSelect') if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG) await self.edit_option(title_infos.get("original_name", None)) # 가격 수정 is_price = self.toggle_states.get('price') if is_price: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG) await self.edit_price(title_infos) # 썸네일 수정 thumb = self.toggle_states.get('thumb') self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG) if thumb: self.check_pause() # 일시중지 상태 확인 await self.edit_thumb() # 태그 수정 tag = self.toggle_states.get('tag') if tag: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG) await self.edit_tags(title_infos) # 상세페이지 수정 수정 detail_Option = self.toggle_states.get('detail_Option') detail_IMGTrans = self.toggle_states.get('detail_IMGTrans') if detail_Option or detail_IMGTrans: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG) # 상세페이지 수정 await self.edit_detail(detail_Option, detail_IMGTrans) # 수정 후 저장 self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG) await self.save_and_ecs_product_edit() # 메모 입력 if memo_button and self.toggle_states['memo']: self.check_pause() # 일시중지 상태 확인 self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG) await self.insert_product_infos_to_memo(memo_button, title_infos) except Exception as item_err: # 이 상품만 실패로 기록하고, 다음 상품으로 self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True) finally: # 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트 completed_count += 1 self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO) if (completed_count % 10) == 0: gc.collect() mem = psutil.virtual_memory() self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG) # 목표 도달 체크 if completed_count >= total_products: break # (3) 다음 페이지로 이동 if page_number < total_pages: success = await self.go_to_next_page() if not success: err = f"페이지 {page_number} → {page_number+1} 이동 실패" self.logger.log(err, level=logging.ERROR) return else: self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO) except Exception as fatal: # 전체 작업 중에 발생한 치명적 예외 self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True) else: # 예외 없이 ‘정상적으로' 전체 루프를 마쳤을 때 self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO) finally: # 브라우저나 리소스는 항상 닫아 주기 try: self.whale_translator.close_trans_browser() except: pass async def start_shuffle_upload_task(self): self.logger.log('셔플업로드 작업을 시작합니다...', level=logging.DEBUG) self.running = True # 번역 작업이 시작됨 try: await self.scroll_page_to_bottom() # 동적 로딩을 위해 끝까지 스크롤 # get_total_product_count 메서드 호출 후 결과를 딕셔너리로 받음 result = await self.get_total_product_count() # 딕셔너리에서 총 상품 수와 페이지당 상품 수를 추출 total_products = result.get("total_count", 0) items_per_page = result.get("items_per_page", 0) total_pages = math.ceil(total_products / items_per_page) self.logger.log(f"총 상품: {total_products}, 페이지당: {items_per_page}, 총 페이지: {total_pages}", level=logging.DEBUG) if total_products == 0: self.logger.log('셔플할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG) return completed_count = 0 page_number = 1 # 3. 총 상품 수만큼 반복 작업 수행 for page_number in range(1, total_pages + 1): self.logger.log(f'=== 페이지 {page_number}/{total_pages} 시작 ===', level=logging.INFO) if page_number != 1: await self.scroll_page_to_top() self.logger.log(f'동적로딩을 위해 휠 스크롤 업', level=logging.DEBUG) self.logger.log('등록 상품 등록 페이지로 이동 중...', level=logging.INFO) # await self.go_to_new_product_page() try: current_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_product_page_locator') await self.page.click(current_product_page_locator) await self.close_ad_if_exists_with_ESC_Key() await self.close_ant_modal_dialogs() self.logger.log("등록 상품 등록 페이지로 이동 완료.", level=logging.INFO) # self.logger.log(f"신규 상품 등록 페이지 이동 중 오류: {e}", level=logging.ERROR, exc_info=True) except Exception as e: self.logger.log(f"광고 닫기 오류: {str(e)}", level=logging.ERROR, exc_info=True) return # 그룹선택 버튼 클릭 group_select_button = self.toggle_states['group_select_button'] if group_select_button: self.logger.log('그룹선택 버튼 클릭 중...', level=logging.INFO) await self.page.click(group_select_button) self.logger.log('그룹선택 버튼 클릭 완료.', level=logging.INFO) if not self.toggle_states['ed_mode']: # 4. 현재 페이지의 모든 "세부사항 수정 및 업로드" 버튼 찾기 self.logger.log('수정모드가 아니므로 상품수정 버튼 elements를 수집합니다.', level=logging.DEBUG) product_buttons = await self.get_product_edit_buttons_by_template() else: self.logger.log('상품정보 수집', level=logging.DEBUG) product_infos, product_name_elements = await self.collect_product_info(items_per_page, ed_mode=self.toggle_states['ed_mode']) self.logger.log(f"product_infos : {product_infos}", level=logging.DEBUG) self.logger.log('수정모드이므로 상품명 elements를 수정버튼으로 활용합니다.', level=logging.DEBUG) product_buttons = [{"edit_button": name_element, "memo_button": None, "shipping_button": None} for name_element in product_name_elements] self.logger.log(f"product_buttons 갯수 : [{len(product_buttons)}]개", level=logging.DEBUG) if not product_buttons: self.logger.log('수정할 상품이 없습니다. 작업을 종료합니다.', level=logging.DEBUG) break # 5. 각 상품에 대해 상품수정작업 수행 for index, button_set in enumerate(product_buttons, start=1): try: self.check_pause() # 일시중지 상태 확인 edit_button = button_set.get("edit_button") memo_button = button_set.get("memo_button") shipping_button = button_set.get("shipping_button") # 해외배송비 수정 버튼 # 상태 초기화 코드 추가 self.titleGenerator.reset_state() # 그 외 임시 변수 초기화 self.current_options_info = None # 상품명 수집 오류 처리 self.logger.log(f'{index}/{len(product_buttons)} 버튼의 활성상태 확인 중...', level=logging.DEBUG) is_disabled = await self.is_button_disabled(edit_button) if is_disabled: self.logger.log(f'{index}/{len(product_buttons)}: 상품의 수정버튼이 비활성화되어 있어 작업을 건너뜁니다.', level=logging.DEBUG) # completed_count += 1 continue self.logger.log(f'{index}/{len(product_buttons)}: 세부사항 수정 작업 중...', level=logging.DEBUG) # 상품 수정 다이얼로그 열기 await self.open_product_edit_dialog(edit_button) self.check_pause() # 일시중지 상태 확인 title_infos = await self.titleGenerator.get_initial_info(self.price_setting_diag) self.logger.log(f"title_infos : {title_infos}", level=logging.DEBUG) # 금지카테고리 여부가 True이면, 금지카테고리 제목 설정 후 저장하고 다음 상품으로 넘어감. # if title_infos.get("is_banned_category", False): if title_infos.get("is_banned_category", False): banned_title = self.titleGenerator.set_banned_category_title(title_infos.get("banned_category_info")) self.logger.log(f"금지카테고리 상품 처리: 새 제목: {banned_title}", level=logging.INFO) # 랜덤 4자리 붙이기 random_suffix = self.generate_random_suffix() # 상품명 설정 (TitleGenerator 내부 또는 별도 메서드를 통해) is_set = await self.titleGenerator.set_product_name(banned_title + random_suffix) if is_set: self.logger.log("금지카테고리 상품명 설정 완료.", level=logging.INFO) else: self.logger.log("금지카테고리 상품명 설정 실패.", level=logging.WARNING) await self.save_and_ecs_product_edit() # completed_count += 1 continue # 다음 상품으로 넘어감 # 정상상품이면 상품명 수정과 카테고리 수집 is_title = self.toggle_states['title'] is_title_shuffle = self.toggle_states['title_shuffle'] if is_title or is_title_shuffle: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"상품명 수정 : {is_title} ", level=logging.DEBUG) title_infos["generated_name"] = await self.titleGenerator.process_title() # 옵션 수정 is_optionTrnas = self.toggle_states.get('optionTrnas') is_optionIMGTrans = self.toggle_states.get('optionIMGTrans') is_optionAutoSelect = self.toggle_states.get('optionAutoSelect') if is_optionTrnas or is_optionIMGTrans or is_optionAutoSelect: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"옵션수정 : optionTrnas={is_optionTrnas} + optionIMGTrans={is_optionIMGTrans} + optionAutoSelect={is_optionAutoSelect}", level=logging.DEBUG) await self.edit_option(title_infos.get("original_name", None)) # 가격 수정 is_price = self.toggle_states.get('price') if is_price: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"가격수정 : {is_price} ", level=logging.DEBUG) await self.edit_price(title_infos) # 썸네일 수정 thumb = self.toggle_states.get('thumb') self.logger.log(f"썸네일수정 : {thumb} ", level=logging.DEBUG) if thumb: self.check_pause() # 일시중지 상태 확인 await self.edit_thumb() # 태그 수정 tag = self.toggle_states.get('tag') if tag: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"키워드태그 수정 : {tag} ", level=logging.DEBUG) await self.edit_tags(title_infos) # 상세페이지 수정 수정 detail_Option = self.toggle_states.get('detail_Option') detail_IMGTrans = self.toggle_states.get('detail_IMGTrans') if detail_Option or detail_IMGTrans: self.check_pause() # 일시중지 상태 확인 self.logger.log(f"상세페이지 수정 : {detail_Option} + {detail_IMGTrans}", level=logging.DEBUG) # 상세페이지 수정 await self.edit_detail(detail_Option, detail_IMGTrans) # 수정 후 저장 self.logger.log('상품 세부사항 저장 중...', level=logging.DEBUG) await self.save_and_ecs_product_edit() # 메모 입력 if memo_button and self.toggle_states['memo']: self.check_pause() # 일시중지 상태 확인 self.logger.log(f'{index}/{len(product_buttons)}: 메모 입력 중...', level=logging.DEBUG) await self.insert_product_infos_to_memo(memo_button, title_infos) except Exception as item_err: # 이 상품만 실패로 기록하고, 다음 상품으로 self.logger.log(f" ▶ 상품 {index}/{len(product_buttons)} 처리중 오류: {item_err}", level=logging.ERROR, exc_info=True) finally: # 성공·실패 관계없이 진행 카운트와 프로그레스바 업데이트 completed_count += 1 self.logger.log(f'{completed_count}/[{total_products}]개 상품 수정 완료.', level=logging.INFO) if (completed_count % 10) == 0: gc.collect() mem = psutil.virtual_memory() self.logger.log(f"GC 호출 후, 메모리 사용량: {mem.percent}% 사용중", level=logging.DEBUG) # 목표 도달 체크 if completed_count >= total_products: break # (3) 다음 페이지로 이동 if page_number < total_pages: success = await self.go_to_next_page() if not success: err = f"페이지 {page_number} → {page_number+1} 이동 실패" self.logger.log(err, level=logging.ERROR) return else: self.logger.log("마지막 페이지까지 모두 처리했습니다.", level=logging.INFO) except Exception as fatal: # 전체 작업 중에 발생한 치명적 예외 self.logger.log(f"전체 작업 중 오류: {fatal}", level=logging.ERROR, exc_info=True) else: # 예외 없이 '정상적으로' 전체 루프를 마쳤을 때 self.logger.log(f"모든 상품({completed_count}/{total_products}) 처리 완료", level=logging.INFO) finally: # 브라우저나 리소스는 항상 닫아 주기 try: self.whale_translator.close_trans_browser() except: pass def clear_temp_folder(self, folder_path): """ 폴더 내 모든 파일 삭제, 폴더 없으면 생성 """ if os.path.exists(folder_path): # 폴더 내 파일/폴더 삭제 for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: self.logger.log(f"파일 삭제 실패: {file_path}, 에러: {e}", level=logging.ERROR, exc_info=True) else: os.makedirs(folder_path, exist_ok=True) def run(self): """QThread의 run 메서드에서 이벤트 루프 생성 및 실행""" self.logger.log("run() - 이벤트 루프 초기화 시작", level=logging.DEBUG) # 이벤트 루프가 없거나 종료된 경우에만 초기화 if not self.loop or self.loop.is_closed(): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.logger.log("run() - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG) # 이벤트 루프를 유지하여 외부에서 작업 추가 가능 self.loop.run_forever() def start_browser_task(self): """이벤트 루프에서 브라우저 초기화 작업 추가""" self.logger.log(f"start_browser_task - 브라우저 초기화 작업 시작 : {self.toggle_states_for_limited}", level=logging.DEBUG) # 실행 중인 이벤트 루프에 비동기 작업 추가 if self.loop and not self.loop.is_closed(): asyncio.run_coroutine_threadsafe(self.start_browser_async(), self.loop) self.browser_task_started = True # 작업 실행 플래그 설정 self.logger.log("start_browser_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG) else: self.logger.log("start_browser_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR) def start_PercentyJob_task(self): """번역 작업을 이벤트 루프에서 실행""" # 이벤트 루프가 없거나 닫혀 있으면 새로 생성 # self.initialize_event_loop() if self.loop and not self.loop.is_closed(): # 이미 실행 중인 이벤트 루프에 번역 작업 추가 asyncio.run_coroutine_threadsafe(self.start_Percenty_task(), self.loop) else: self.logger.log("이벤트 루프가 초기화되지 않았거나 이미 종료되었습니다.", level=logging.ERROR) def start_shuffle_upload_task(self): """이벤트 루프에서 셔플업로드 작업 추가""" self.logger.log(f"start_shuffle_upload_task - 셔플업로드 작업 시작 : {self.toggle_states}", level=logging.DEBUG) # 실행 중인 이벤트 루프에 비동기 작업 추가 if self.loop and not self.loop.is_closed(): asyncio.run_coroutine_threadsafe(self.start_shuffle_upload_async(), self.loop) self.shuffle_upload_task_started = True # 작업 실행 플래그 설정 self.logger.log("start_shuffle_upload_task - 비동기 작업이 추가되었습니다.", level=logging.DEBUG) else: self.logger.log("start_shuffle_upload_task - 실행 중인 이벤트 루프가 없습니다.", level=logging.ERROR) def initialize_event_loop(self): """이벤트 루프가 없거나 닫힌 경우 새로 생성""" if not self.loop or self.loop.is_closed(): self.logger.log("initialize_event_loop - 새로운 이벤트 루프 생성 중", level=logging.DEBUG) self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.logger.log("initialize_event_loop - 이벤트 루프가 생성되었습니다.", level=logging.DEBUG) else: self.logger.log("initialize_event_loop - 기존 이벤트 루프 사용 중", level=logging.DEBUG) def stop(self): """Playwright를 안전하게 종료""" try: # 비동기 함수 호출을 동기식으로 처리 asyncio.run(self._stop_playwright()) except RuntimeError as e: self.logger.log(f"Playwright 종료 중 오류 발생: {e}", level=logging.ERROR) async def _stop_playwright(self): """모든 페이지/브라우저/Playwright 종료""" # 1. 페이지 닫기 if self.browser: for page in self.browser.pages: try: await page.close() except: pass try: await self.browser.close() except: pass # 2. Playwright 종료 if self.playwright: try: await self.playwright.stop() except: pass # 3. WhaleTranslator 등 추가 리소스 if self.whale_translator: self.whale_translator.close_trans_browser() # 4. 기타 핸들러 정리 if hasattr(self, 'imageProcessor') and self.imageProcessor: self.imageProcessor.cleanup() # 5. 루프 종료 if self.loop and not self.loop.is_closed(): self.loop.call_soon_threadsafe(self.loop.stop) def force_terminate_browser(self): """Playwright 브라우저 프로세스를 강제로 종료""" try: if self.browser: browser_pid = self.browser.contexts[0]._channel.owner._impl_obj._browser_pid browser_process = psutil.Process(browser_pid) for child in browser_process.children(recursive=True): child.kill() browser_process.kill() self.logger.log("브라우저 프로세스를 강제로 종료했습니다.", level=logging.WARNING) if self.whale_translator: self.whale_translator.close_trans_browser() except Exception as e: self.logger.log(f"브라우저 프로세스 강제 종료 중 오류 발생: {e}", level=logging.ERROR) def terminate(self): """QThread 종료시 정리""" self.logger.log("크롬 스레드 종료", level=logging.INFO) self.request_cleanup() # QThread 이벤트루프에서 정리 super().terminate() def cleanup(self): """BrowserController 종료 시 리소스 정리""" try: # ImageProcessor 리소스 정리 if hasattr(self, 'imageProcessor') and self.imageProcessor: self.logger.log("ImageProcessor 리소스 정리 중...", level=logging.INFO) self.imageProcessor.cleanup() except Exception as e: self.logger.log(f"리소스 정리 중 오류 발생: {e}", level=logging.ERROR) def request_cleanup(self): """브라우저 리소스 정리 명령을 QThread 이벤트루프에 올림""" if self.loop and not self.loop.is_closed(): asyncio.run_coroutine_threadsafe(self._stop_playwright(), self.loop) else: self.logger.log("정리 요청 시 루프가 없습니다.", level=logging.ERROR) def log_memory_usage(self, message=""): mem = psutil.virtual_memory() self.logger.log(f"{message} 메모리 사용: 총 {mem.total/1024/1024:.1f}MB, 사용 {mem.used/1024/1024:.1f}MB, 사용률 {mem.percent}%", level=logging.DEBUG) def pause_task(self): """작업을 일시정지합니다""" self.logger.log("작업 일시정지 요청...", level=logging.INFO) # 일시정지 상태 설정 self.pause() # 추가 로직이 필요한 경우 여기에 구현 def resume_task(self): """일시정지된 작업을 재개합니다""" self.logger.log("작업 재개 요청...", level=logging.INFO) # 재개 상태 설정 self.resume() # 추가 로직이 필요한 경우 여기에 구현