# from playwright.sync_api import sync_playwright from playwright.async_api import async_playwright import re import pyautogui import time import win32gui, win32con from bs4 import BeautifulSoup import asyncio import os, sys, random class BrowserController: def __init__(self, app, logger, locator_manager): self.app = app self.logger = logger self.locator_manager = locator_manager # self.chrome_window_name = "퍼센티 - 셀러들을 위한 AI 구매대행 솔루션 - Chrome" # self.whale_window_name = "새 시크릿 탭 - Whale" self.chrome_hwnd = None self.whale_hwnd = None self.playwright = None self.browser = None self.page = None # BrowserController에 해당하는 모든 locator를 정의 self.chrome_window_name = self.locator_manager.get_locator('BrowserControl', 'chrome_window_name') self.login_email_locator = self.locator_manager.get_locator('BrowserControl', 'login_email_locator') self.login_password_locator = self.locator_manager.get_locator('BrowserControl', 'login_password_locator') self.login_button_locator = self.locator_manager.get_locator('BrowserControl', 'login_button_locator') self.admin_toggle_locator = self.locator_manager.get_locator('BrowserControl', 'admin_toggle_locator') self.staff_id_locator = self.locator_manager.get_locator('BrowserControl', 'staff_id_locator') self.staff_login_button_locator = self.locator_manager.get_locator('BrowserControl', 'staff_login_button_locator') self.close_ad_dialog_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_dialog_locator') self.close_ad_button_locator = self.locator_manager.get_locator('BrowserControl', 'close_ad_button_locator') self.total_product_count_locator = self.locator_manager.get_locator('BrowserControl', 'total_product_count_locator') self.product_name_template = self.locator_manager.get_locator('BrowserControl', 'product_name_template') self.product_price_template = self.locator_manager.get_locator('BrowserControl', 'product_price_template') self.product_image_template = self.locator_manager.get_locator('BrowserControl', 'product_image_template') self.product_edit_button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template') self.current_page = self.locator_manager.get_locator('BrowserControl', 'current_page') self.next_page_button_template = self.locator_manager.get_locator('BrowserControl', 'next_page_button_template') self.new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator') self.current_page_locator = self.locator_manager.get_locator('BrowserControl', 'current_page_locator') self.source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator') self.ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator') self.option_input_field_locator = self.locator_manager.get_locator('BrowserControl', 'option_input_field_locator') self.title_tab_locator = self.locator_manager.get_locator('BrowserControl', 'title_tab_locator') self.option_tab_locator = self.locator_manager.get_locator('BrowserControl', 'option_tab_locator') self.price_tab_locator = self.locator_manager.get_locator('BrowserControl', 'price_tab_locator') self.tag_tab_locator = self.locator_manager.get_locator('BrowserControl', 'tag_tab_locator') self.thumb_tab_locator = self.locator_manager.get_locator('BrowserControl', 'thumb_tab_locator') self.detail_tab_locator = self.locator_manager.get_locator('BrowserControl', 'detail_tab_locator') self.upload_tab_locator = self.locator_manager.get_locator('BrowserControl', 'upload_tab_locator') self.save_button_locator = self.locator_manager.get_locator('BrowserControl', 'save_button_locator') self.text_templates = self.locator_manager.selectors.get('DetailPageTextTemplates', {}) def get_page(self): return self.page async def start_browser(self): """크롬 브라우저 실행 및 페이지 로딩""" self.logger.debug('크롬 브라우저 실행 중...') # Playwright를 수동으로 실행하여 브라우저 유지 self.playwright = await async_playwright().start() # cx_Freeze로 패키징된 경우와 일반 Python 실행 환경 구분하여 경로 설정 if getattr(sys, 'frozen', False): browser_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe') extension_path = os.path.join(os.path.dirname(sys.executable), 'browsers', 'extensions', '1.1.100_0') user_data_dir = os.path.join(os.path.dirname(sys.executable), 'browsers', 'user_data') else: browser_path = os.path.join(os.path.dirname(__file__), 'browsers', 'chromium-1112', 'chrome-win','chrome.exe') extension_path = os.path.join(os.path.dirname(__file__), 'browsers', 'extensions', '1.1.100_0') user_data_dir = os.path.join(os.path.dirname(__file__), 'browsers', 'user_data') self.logger.debug(f"브라우저 경로: {browser_path}") self.logger.debug(f"확장 프로그램 경로: {extension_path}") self.logger.debug(f"사용자 폴더 경로: {user_data_dir}") # 사용자 데이터 디렉토리가 존재하지 않으면 생성 if not os.path.exists(user_data_dir): os.makedirs(user_data_dir) self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.") # User agent 설정 user_agent = random.choice([ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0", ]) self.logger.debug(f"user_agent: {user_agent}") # 브라우저 시작 및 설정 self.browser = await self.playwright.chromium.launch_persistent_context( user_data_dir, headless=False, permissions=["geolocation", "notifications"], geolocation={"latitude": 37.5665, "longitude": 126.9780}, locale="ko-KR", args=[ '--disable-popup-blocking', f'--disable-extensions-except={extension_path}', f'--load-extension={extension_path}', '--start-maximized', '--window-size=1920,1080' ], executable_path=browser_path, user_agent=user_agent ) # 기본 페이지가 없을 수 있으므로 새로운 페이지 생성 self.page = await self.browser.new_page() self.logger.info('새 페이지 로딩 중...') await self.page.goto('https://percenty.co.kr/signin') self.logger.info('percenty.co.kr/signin 로딩 완료') # 첫 번째 기본 탭 닫기 if self.browser.pages: await self.browser.pages[0].close() # 페이지 제목을 가져와서 창 제목으로 활용 page_title = await self.page.title() self.logger.debug(f'페이지 제목: {page_title}') # 창 핸들 찾기 (동적으로 얻은 페이지 제목 사용) self.chrome_hwnd = self.find_window_by_title(page_title) if not self.chrome_hwnd: self.logger.warning('크롬 창을 찾을 수 없습니다.') else: self.logger.debug(f'크롬 창 핸들: {self.chrome_hwnd}') # await self.page.wait_for_load_state('networkidle', timeout=10000) async def login(self, admin_id, user_id, admin_password, user_password, is_admin=False): """로그인 처리""" self.logger.info(f'로그인 시도 중: {"관리자" if is_admin else "직원"} 계정') if is_admin: # 관리자 로그인 처리 await self.page.fill(self.login_email_locator, admin_id) await self.page.fill(self.login_password_locator, admin_password) await self.page.click(self.login_button_locator) else: # 관리자 토글 버튼을 클릭해서 직원 로그인 화면 활성화 admin_toggle = self.page.locator(self.admin_toggle_locator) if await admin_toggle.get_attribute("aria-checked") == "true": await admin_toggle.click() # 관리자 모드에서 직원 모드로 전환 await self.page.fill(self.login_email_locator, admin_id) await self.page.fill(self.staff_id_locator, user_id) await self.page.fill(self.login_password_locator, user_password) await self.page.click(self.staff_login_button_locator) self.logger.info(f'로그인 완료: {"관리자" if is_admin else "직원"} 계정') # await self.page.wait_for_load_state('networkidle', timeout=10000) await self.close_ad_if_exists() async def close_browser(self): """브라우저 종료""" if self.browser: await self.browser.close() await self.playwright.stop() self.logger.info('브라우저 종료됨.') def find_window_by_title(self, window_name): """창 제목을 통해 핸들을 찾는 메서드""" def enum_windows_callback(hwnd, result): if win32gui.IsWindowVisible(hwnd) and window_name in win32gui.GetWindowText(hwnd): result.append(hwnd) result = [] win32gui.EnumWindows(enum_windows_callback, result) return result[0] if result else None def switch_to_chrome(self): """크롬으로 포커스 전환""" if self.chrome_hwnd: win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE) win32gui.SetForegroundWindow(self.chrome_hwnd) self.logger.debug('크롬 창으로 포커스 이동.') else: self.logger.error('크롬 창을 찾을 수 없습니다.') async def get_total_product_count_ori(self): try: # JavaScript로 해당 요소의 텍스트를 가져옴 element_text = await self.page.evaluate('''() => { let element = document.querySelector('#root > div > div > div > div > main > div > div.sc-ezreuY.kYrYVh > div.sc-dChVcU.cRrUlt > div.sc-izQBue.dxiUJm > div > div:nth-child(1) > label > span:nth-child(2)'); return element ? element.innerText : null; }''') if element_text: self.logger.debug(f"가져온 텍스트: {element_text}") # 텍스트 확인용 로그 # "총 xx개 상품"에서 숫자만 추출 count = int(''.join(filter(str.isdigit, element_text))) return count else: self.logger.debug("요소를 찾을 수 없습니다.") return 0 except Exception as e: self.logger.debug(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True) return 0 async def get_total_product_count(self): try: # Python 변수를 JavaScript로 전달하여 요소의 텍스트를 가져옴 element_text = await self.page.evaluate(f'''(selector) => {{ let element = document.querySelector(selector); return element ? element.innerText : null; }}''', self.total_product_count_locator) if element_text: self.logger.info(f"총 상품수 확인: {element_text}") # 텍스트 확인용 로그 # "총 xx개 상품"에서 숫자만 추출 count = int(''.join(filter(str.isdigit, element_text))) return count else: self.logger.warning("요소를 찾을 수 없습니다.") return 0 except Exception as e: self.logger.error(f"상품 수를 가져오는 중 오류 발생: {e}", exc_info=True) return 0 # def fetch_image_urls_ori(self, html_content): # """ # HTML 콘텐츠에서 모든 태그의 URL을 순서대로 추출하고 중복 제거. # """ # soup = BeautifulSoup(html_content, 'html.parser') # # 순서를 유지하면서 중복을 제거하기 위해 리스트 사용 # image_urls = [] # seen_urls = set() # #
내부의 모든 태그 찾기 # figures = soup.find_all('figure', class_='image') # for figure in figures: # img_tag = figure.find('img') # if img_tag and 'src' in img_tag.attrs: # url = img_tag['src'] # if url not in seen_urls: # image_urls.append(url) # seen_urls.add(url) # 중복 방지 # # class="image_resized"를 가진 모든 태그 찾기 # images_resized = soup.find_all('img', class_='image_resized') # for img in images_resized: # if img and 'src' in img.attrs: # url = img['src'] # if url not in seen_urls: # image_urls.append(url) # seen_urls.add(url) # 중복 방지 # return image_urls def fetch_image_urls(self, html_content): """ HTML 콘텐츠에서 모든 태그의 URL을 추출하는 함수.
안의 태그와 독립된 태그 모두 처리. """ soup = BeautifulSoup(html_content, 'html.parser') # 모든 태그를 찾기 image_urls = [] img_tags = soup.find_all('img') for img in img_tags: # img 태그에서 src 속성 추출 if 'src' in img.attrs: image_url = img['src'] image_urls.append(image_url) self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 갯수 : {len(image_urls)} 개") self.logger.debug(f"fetch_image_urls 에서 추출한 이미지URL 목록 : {image_urls}") return image_urls async def close_ad_if_exists(self): """광고 다이얼로그가 있으면 닫기 버튼을 클릭하는 메서드""" try: # 광고 다이얼로그가 나타날 때까지 기다림 await self.page.wait_for_selector(self.close_ad_dialog_locator, timeout=5000, state='visible') self.logger.info("다이얼로그가 발견되었습니다. 닫기 버튼을 클릭합니다.") # 닫기 버튼 클릭 close_button = await self.page.query_selector(self.close_ad_button_locator) if close_button: await close_button.click() self.logger.info("다이얼로그를 성공적으로 닫았습니다.") else: self.logger.warning("닫기 버튼을 찾지 못했습니다.") except Exception as e: # 다이얼로그가 없거나 다른 문제가 발생한 경우 self.logger.error(f"다이얼로그가 발견되지 않았거나 오류 발생: {e}", exc_info=True) async def go_to_new_product_page(self): """신규 상품 등록 페이지로 이동""" try: new_product_page_locator = self.locator_manager.get_locator('BrowserControl', 'new_product_page_locator') await self.page.click(new_product_page_locator) self.logger.info("신규 상품 등록 페이지로 이동 완료.") except Exception as e: self.logger.error(f"신규 상품 등록 페이지 이동 중 오류: {e}", exc_info=True) # async def get_product_edit_buttons(self): # """현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기""" # try: # # 버튼 선택자를 가져옴 # edit_button_selector = self.product_edit_button # if not edit_button_selector: # self.logger.warning("상품 수정 버튼의 선택자를 찾을 수 없습니다.") # return [] # # 선택자를 사용해 버튼 객체를 찾음 # buttons = self.page.locator(edit_button_selector) # # 버튼이 존재하는지 확인 # if await buttons.count() == 0: # self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.") # return [] # count = await buttons.count() # self.logger.info(f"수정할 상품 개수: {count}") # # 모든 버튼을 리스트로 반환 # return [buttons.nth(i) for i in range(count)] # except Exception as e: # self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True) # return [] async def is_button_disabled(self, button): """버튼이 disabled 상태인지 확인""" try: # 버튼의 disabled 속성 확인 is_disabled = await button.get_attribute('disabled') return is_disabled is not None # disabled 속성이 있으면 True 반환 except Exception as e: self.logger.error(f"상품 수정 버튼 상태 확인 중 오류 발생: {e}", exc_info=True) return False # 오류 발생 시 기본적으로 활성화된 것으로 처리 async def get_product_edit_buttons_by_templete(self): """현재 페이지의 세부사항 수정 및 업로드 버튼을 찾기""" try: # 버튼 선택자 설정 # edit_button_selector_template = f'//button[span[text()="세부사항 수정 및 업로드"]]' self.product_edit_button_template # 선택자를 사용해 버튼 객체를 찾음 buttons = self.page.locator(self.product_edit_button_template) # 버튼이 존재하는지 확인 button_count = await buttons.count() if button_count == 0: self.logger.warning("세부사항 수정 및 업로드 버튼을 찾을 수 없습니다.") return [] self.logger.info(f"현재 페이지의 수정할 상품 개수: {button_count}") # 모든 버튼을 리스트로 반환 return [buttons.nth(i) for i in range(button_count)] except Exception as e: self.logger.error(f"상품 수정 버튼을 찾는 중 오류: {e}", exc_info=True) return [] async def click_modify_button_by_text(self, index): """인덱스에 해당하는 '세부사항 수정 및 업로드' 버튼 클릭""" try: # config.ini에서 선택자 가져오기 button_template = self.locator_manager.get_locator('BrowserControl', 'product_edit_button_template') button_selector = f'({button_template})[{index}]' button = await self.page.query_selector(button_selector) # 버튼이 화면에 보이도록 스크롤 후 클릭 if button: await button.scroll_into_view_if_needed() await self.page.evaluate('arguments[0].click();', button) self.logger.info(f'{index}번째 상품의 수정 버튼 클릭 완료') else: self.logger.warning(f'{index}번째 상품의 수정 버튼을 찾지 못했습니다.') except Exception as e: self.logger.error(f'{index}번째 상품의 수정 버튼 클릭 중 오류: {str(e)}') async def open_product_edit_dialog(self, button): """상품 수정 다이얼로그 열기""" try: # 요소가 화면에 없을 경우 스크롤하여 보이도록 함 await button.scroll_into_view_if_needed() self.logger.debug("상품의 '세부사항 수정 및 업로드' 버튼을 화면에 보이도록 스크롤.") await button.click() self.logger.info("세부사항 수정 다이얼로그 열기 완료.") await self.page.wait_for_selector('div.ant-tabs-nav') # 다이얼로그가 완전히 로딩될 때까지 기다림 except Exception as e: self.logger.error(f"세부사항 수정 다이얼로그 열기 중 오류: {e}", exc_info=True) async def click_detail_tab(self): """상세페이지 탭 클릭""" try: await self.page.click(self.detail_tab_locator) self.logger.info("상세페이지 탭 클릭 완료.") except Exception as e: self.logger.error(f"상세페이지 탭 클릭 중 오류: {e}", exc_info=True) async def click_option_tab(self): """옵션 탭 클릭""" try: await self.page.click(self.option_tab_locator) self.logger.info("옵션 탭 클릭 완료.") except Exception as e: self.logger.error(f"옵션 탭 클릭 중 오류: {e}", exc_info=True) async def click_price_tab(self): """가격 탭 클릭""" try: await self.page.click(self.price_tab_locator) self.logger.info("가격 탭 클릭 완료.") except Exception as e: self.logger.error(f"가격 탭 클릭 중 오류: {e}", exc_info=True) async def click_title_tab(self): """상품명 탭 클릭""" try: await self.page.click(self.title_tab_locator) self.logger.info("상품명 탭 클릭 완료.") except Exception as e: self.logger.error(f"상품명 탭 클릭 중 오류: {e}", exc_info=True) async def extract_image_urls(self, optionHandler, is_option_data=False): """상세페이지에서 이미지 URL 추출""" try: # 소스 편집 모드로 전환 source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator') ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator') # 소스 편집 모드로 전환 await self.page.click(source_button_locator) self.logger.debug("소스 버튼 클릭 완료.") # 'data-value' 속성 값을 추출 (textarea 요소) textarea = await self.page.wait_for_selector(ck_source_editing_area_locator, timeout=5000) data_value = await textarea.get_attribute("data-value") # HTML 소스에서 이미지 URL 추출 image_urls = self.fetch_image_urls(data_value) self.logger.info(f'추출된 이미지 URL 수: {len(image_urls)}') # HTML 소스에서 이미지 URL 삭제 self.logger.debug('img 태그를 삭제 중...') data_value_element = await self.page.query_selector(ck_source_editing_area_locator) new_value = "" if data_value_element: await self.page.evaluate(f'() => document.querySelector("{ck_source_editing_area_locator}").setAttribute("data-value", "{new_value}")') updated_value = await data_value_element.get_attribute('data-value') self.logger.debug(f'Updated data-value: {updated_value}') else: self.logger.debug('Element with data-value not found.') self.logger.debug('img 태그 삭제 완료.') # img 태그의 class 삭제 후 다시 소스 버튼 클릭 await self.page.click(source_button_locator) self.logger.debug('소스 버튼 재 클릭 완료.') if is_option_data: self.logger.debug('옵션 데이터 입력 시작') option_data = {} # option_data 초기화 option_data = optionHandler.get_selected_translated_options() is_single = optionHandler.option_info['is_single_option'] is_single = True # 옵션입력 일단 제외 self.logger.debug('옵션입력 일단 제외') self.logger.debug('가져온 옵션 데이터') self.logger.debug(f'{option_data}') # 옵션 입력 필드 선택 input_field = await self.page.wait_for_selector(self.option_input_field_locator, timeout=5000) await input_field.press('Enter') # 선두부 텍스트 입력 for key in sorted(self.text_templates.keys()): leading_text = self.text_templates[key] if 'leading_text' in key and leading_text: # leading_text 항목만 가져오기 await input_field.type(leading_text) await input_field.press('Enter') self.logger.info(f"{key} 텍스트 입력 완료: {leading_text}") if not is_single: self.logger.info('단일옵션이 아니므로 옵션목록을 입력') # 각 옵션을 한 줄씩 입력 await input_field.type("# 옵션 목록") await input_field.press('Enter') # 첫 번째 옵션의 번역된 옵션명만 입력 first_key = list(option_data.keys())[0] first_value = option_data[first_key] await input_field.type(f"- 1. {first_value}") await input_field.press('Enter') # 첫 번째 옵션 이후 엔터로 줄바꿈 # 나머지 옵션도 번역된 옵션명만 입력 for i, (key, value) in enumerate(list(option_data.items())[1:], start=2): await input_field.type(f"{i}. {value}") # 옵션 번호와 번역된 옵션명만 입력 await input_field.press('Enter') # 엔터 키를 입력하여 줄바꿈 # 목록 끝을 알리기 위해 엔터 두 번 입력 await input_field.press('Enter') await input_field.press('Enter') # 후두부 텍스트 입력 await input_field.type('### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.') await input_field.press('Enter') await input_field.type('---') await input_field.press('Enter') self.logger.info('옵션 데이터 입력 완료') return image_urls except Exception as e: self.logger.error(f"이미지 URL 추출 & 옵션데이터 입력 처리 중 오류: {e}", exc_info=True) return image_urls if image_urls else [] def paste_image_in_chrome(self, clipboardImageManager, url, is_success_translated): """크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력""" self.logger.debug("크롬으로 포커스를 옮기고 클립보드의 이미지를 붙여넣고 엔터 입력") try: self.switch_to_chrome() # 크롬으로 포커스 이동 clipboardImageManager.process_clipboard(original_url=url, is_success_translated=is_success_translated) # 클립보드 내용을 처리 # clipboard_content = pyperclip.paste() if clipboardImageManager.is_clipboard_image(): pyautogui.hotkey('ctrl', 'v') # 클립보드 이미지 붙여넣기 self.logger.info("이미지 붙여넣기 완료.") pyautogui.press('right') # 오른쪽 입력 self.logger.debug("이미지 붙여넣기 완료.") clipboardImageManager.clear_clipboard() self.logger.info("이미지 붙여넣기 완료로 클립보드 비우기.") return True else: self.logger.warning("클립보드가 비어있습니다.") return False except Exception as e: self.logger.error(f"이미지 붙여넣기 중 오류: {e}", exc_info=True) return False async def save_and_ecs_product_edit(self): """상품 수정 후 저장 버튼 클릭""" try: await self.page.click(self.save_button_locator) await self.page.keyboard.press("Escape") self.logger.info("상품 수정 내용 저장 및 ECS 완료.") except Exception as e: self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True) async def save_product_edit(self): """상품 수정 후 저장 버튼 클릭""" try: await self.page.click(self.save_button_locator) self.logger.info("상품 수정 내용 저장 완료.") except Exception as e: self.logger.error(f"저장 버튼 클릭 중 오류: {e}", exc_info=True) async def go_to_next_page(self): """다음 페이지로 이동""" try: # 현재 페이지가 몇 번째 페이지인지 확인 (클래스에 'ant-pagination-item-active'가 있는 요소) current_page = await self.page.query_selector(self.current_page_locator) if not current_page: self.logger.warning("현재 페이지 정보를 찾을 수 없습니다.") return False # 현재 활성화된 페이지 번호를 가져옴 current_page_number = int(await current_page.get_attribute("title")) next_page_number = current_page_number + 1 # 다음 페이지 버튼을 찾음 (title 속성으로 다음 페이지를 찾음) next_page_button_locator = self.next_page_button_template.format(page_number=next_page_number) next_page_button = await self.page.query_selector(next_page_button_locator) if next_page_button: await next_page_button.click() # 페이지 버튼 클릭 # await self.page.wait_for_load_state('domcontentloaded') # 페이지 로딩이 완료될 때까지 대기 self.logger.info(f"페이지 {next_page_number}로 이동 완료.") return True else: self.logger.warning("다음 페이지가 없습니다.") return False except Exception as e: self.logger.error(f"다음 페이지로 이동 중 오류 발생: {e}", exc_info=True) return False def switch_to_chrome(self): """크롬으로 포커스 전환""" try: if not self.chrome_hwnd: self.chrome_hwnd = self.find_window_by_title(self.chrome_window_name) if self.chrome_hwnd: win32gui.ShowWindow(self.chrome_hwnd, win32con.SW_RESTORE) win32gui.SetForegroundWindow(self.chrome_hwnd) self.logger.debug('크롬 창으로 포커스 이동.') else: self.logger.warning('크롬 창을 찾을 수 없습니다.') except Exception as e: self.logger.error(f"크롬 포커스 전환 중 오류: {e}", exc_info=True) async def scroll_with_wheel(self, direction="down", pause_time=0.5, max_scrolls=50): """ 휠 스크롤을 사용하여 페이지를 위나 아래로 천천히 스크롤. Parameters: - direction: 스크롤 방향 ("down"은 아래로, "up"은 위로). - pause_time: 스크롤 사이의 대기 시간 (초). - max_scrolls: 최대 스크롤 횟수. """ scroll_count = 0 self.logger.debug(f"스크롤 시작") # 현재 페이지 높이 가져오기 last_height = await self.page.evaluate("document.body.scrollHeight") self.logger.debug(f"현재 페이지 높이 가져오기 - {last_height}") while scroll_count < max_scrolls: if direction == "down": # 아래로 스크롤 self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 아래로 1000px") await self.page.evaluate("window.scrollBy(0, 1000);") elif direction == "up": # 위로 스크롤 self.logger.debug(f"scroll_count[{scroll_count}]회 : 휠 위로 1000px") await self.page.evaluate("window.scrollBy(0, -1000);") else: raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.") self.logger.debug(f"pause_time 슬립 : {pause_time}") await asyncio.sleep(pause_time) # 새로운 페이지 높이 가져오기 new_height = await self.page.evaluate("document.body.scrollHeight") self.logger.debug(f"새로운 페이지 높이 가져오기 - {new_height}") # 스크롤이 더 이상 필요 없는 경우(페이지 끝에 도달) if direction == "down" and new_height == last_height: self.logger.debug(f"페이지 끝에 도달했습니다. 스크롤 횟수: {scroll_count}") break elif direction == "up" and new_height == 0: self.logger.debug(f"페이지 시작에 도달했습니다. 스크롤 횟수: {scroll_count}") break self.logger.debug(f"새로운 페이지 높이를 현재높이로 재설정 - {new_height}") last_height = new_height scroll_count += 1 self.logger.debug(f"스크롤 카운트 + 1") if scroll_count == max_scrolls: self.logger.debug("최대 스크롤 횟수에 도달했습니다.") async def scroll_with_keyboard(self, direction="down", pause_time=0.5, max_scrolls=50): """ 키보드를 사용하여 페이지를 위나 아래로 천천히 스크롤. Parameters: - direction: 스크롤 방향 ("down"은 아래로, "up"은 위로). - pause_time: 스크롤 사이의 대기 시간 (초). - max_scrolls: 최대 스크롤 횟수. """ scroll_count = 0 while scroll_count < max_scrolls: if direction == "down": # 아래로 스크롤 (Page Down 키 사용) await self.page.keyboard.press("PageDown") elif direction == "up": # 위로 스크롤 (Page Up 키 사용) await self.page.keyboard.press("PageUp") else: raise ValueError("direction 인자는 'down' 또는 'up'만 허용됩니다.") await asyncio.sleep(pause_time) scroll_count += 1 if scroll_count == max_scrolls: self.logger.debug("최대 스크롤 횟수에 도달했습니다.") # async def collect_product_info(self): # """ # 상품 정보를 수집하는 메서드 # """ # try: # # 페이지를 아래로 스크롤하여 모든 상품 로드 # await self.scroll_with_wheel('down') # await self.scroll_with_wheel('up') # product_infos = [] # for i in range(1, 51): # 1부터 최대 50까지 상품 처리 # try: # # 각 상품의 CSS 선택자를 동적으로 생성하여 접근 # product_name_locator = self.product_name_template.format(i=i) # product_price_locator = self.product_price_template.format(i=i) # product_image_locator = self.product_image_template.format(i=i) # product_name_element = await self.page.query_selector(product_name_locator) # product_price_element = await self.page.query_selector(product_price_locator) # product_image_element = await self.page.query_selector(product_image_locator) # if product_name_element and product_price_element and product_image_element: # product_info = { # "name": await product_name_element.text_content().strip(), # "price": await product_price_element.text_content().strip(), # "image_url": await product_image_element.get_attribute('src') # } # self.logger.debug(f"상품 {i}: {product_info}") # product_infos.append(product_info) # except Exception as e: # self.logger.error(f"상품 {i} 정보 수집 중 오류 발생: {e}", exc_info=True) # continue # return product_infos # except Exception as e: # self.logger.error(f"상품 정보 수집 중 오류 발생: {e}", exc_info=True) # return [] async def scroll_page_to_bottom(self, pause_time=0.2): """페이지의 맨 아래까지 스크롤하여 모든 동적 요소를 로드""" self.logger.info('페이지 스크롤 시작...') previous_height = await self.page.evaluate("() => document.body.scrollHeight") while True: await self.page.evaluate("window.scrollBy(0, window.innerHeight);") # 한 화면씩 스크롤 await asyncio.sleep(pause_time) # 페이지 로딩 대기 current_height = await self.page.evaluate("() => document.body.scrollHeight") if current_height == previous_height: break # 더 이상 스크롤할 내용이 없으면 종료 previous_height = current_height self.logger.info('페이지 스크롤 완료.') async def scroll_page_to_top(self, pause_time=0.2): """페이지의 맨 위까지 스크롤""" self.logger.info('페이지 위로 스크롤 시작...') previous_height = await self.page.evaluate("() => window.pageYOffset") while previous_height > 0: await self.page.evaluate("window.scrollBy(0, -window.innerHeight);") # 한 화면씩 위로 스크롤 await asyncio.sleep(pause_time) # 페이지 로딩 대기 current_height = await self.page.evaluate("() => window.pageYOffset") if current_height == previous_height: break # 더 이상 스크롤할 내용이 없으면 종료 previous_height = current_height self.logger.info('페이지 위로 스크롤 완료.')