from collections import Counter from datetime import datetime import numpy as np import asyncio, time, math import tempfile, os class OptionHandler: def __init__(self, locator_manager, browser_controller, whale_translator, clipboardImageManager, logger, vertexAI, debug_flag=False): self.locator_manager = locator_manager self.browser_controller = browser_controller self.page = self.browser_controller.page self.clipboardImageManager = clipboardImageManager self.logger = logger self.debug_flag = debug_flag self.vertexAItranslator = vertexAI self.whale_translator = whale_translator self.is_percenty_success = False self.is_vertext_success = False self.init_option_info() # 선택자 로드 self.option_excluded_selector_template = self.locator_manager.get_locator('OptionLocators', 'option_excluded_selector_template') self.option_input_selector_template = self.locator_manager.get_locator('OptionLocators', 'option_input_selector_template') self.single_option_locator = self.locator_manager.get_locator('OptionLocators', 'single_option_locator') self.option_product_locator = self.locator_manager.get_locator('OptionLocators', 'option_product_locator') self.total_options_selector = self.locator_manager.get_locator('OptionLocators', 'total_options_selector') self.is_all_option_checked_selector = self.locator_manager.get_locator('OptionLocators', 'is_all_option_checked_selector') self.ai_option_btn_selector = self.locator_manager.get_locator('OptionLocators', 'ai_option_btn_selector') self.original_name_selector_template = self.locator_manager.get_locator('OptionLocators', 'original_name_selector_template') self.edit_field_selector_template = self.locator_manager.get_locator('OptionLocators', 'edit_field_selector_template') self.checkbox_selector_template = self.locator_manager.get_locator('OptionLocators', 'checkbox_selector_template') self.image_selector_template = self.locator_manager.get_locator('OptionLocators', 'image_selector_template') self.price_selector_template = self.locator_manager.get_locator('OptionLocators', 'price_selector_template') self.option_box_selector = self.locator_manager.get_locator('OptionLocators', 'option_box_selector') self.excluded_option_marker = self.locator_manager.get_locator('OptionLocators', 'excluded_option_marker') self.delete_button_selector_template = self.locator_manager.get_locator('OptionLocators', 'delete_button_selector_template') self.fallback1_delete_button_selector_template = self.locator_manager.get_locator('OptionLocators', 'fallback1_delete_button_selector_template') self.delete_dialog_selector = self.locator_manager.get_locator('OptionLocators', 'delete_dialog_selector') self.confirm_delete_button_selector = self.locator_manager.get_locator('OptionLocators', 'confirm_delete_button_selector') self.add_button_selector = self.locator_manager.get_locator('OptionLocators', 'add_button_selector') self.file_upload_button_selector = self.locator_manager.get_locator('OptionLocators', 'file_upload_button_selector') self.confirm_upload_button_selector = self.locator_manager.get_locator('OptionLocators', 'confirm_upload_button_selector') self.low_order_button_locator = self.locator_manager.get_locator('OptionLocators', 'low_order_button_locator') self.AtoZ_button_locator = self.locator_manager.get_locator('OptionLocators', 'AtoZ_button_locator') self.one_to_nine_button_locator = self.locator_manager.get_locator('OptionLocators', 'one_to_nine_button_locator') def update_page(self, page1): self.page = page1 self.logger.debug(f"page객체 업데이트 : {page1}") def update_whale(self, whale1): self.whale_translator = whale1 self.logger.debug(f"whale_translator 객체 업데이트 : {whale1}") def init_option_info(self): self.option_info = { 'is_single_option': False, 'is_completed_option': False, 'original_names': {}, 'translated_names': {}, 'selected_translated_options': {}, 'edit_fields': {}, 'checkboxes': [], 'images': {}, 'prices': {}, # 가격 정보 추가 'checked_states': {} # 체크된 상태를 저장하는 추가 변수 } def get_selected_translated_options(self): """클래스 변수에 저장된 선택된 번역된 옵션들을 반환""" return self.option_info.get('selected_translated_options', []) def filter_bait_items_with_price_distribution(self, options, lower_z=-0.9, upper_z=1.5, base_ratio=2): """ 옵션들에서 미끼 상품을 제외하고 본품 옵션들을 필터링하는 메서드. 표준편차 및 가격 범위를 고려하여 옵션을 필터링한다. Parameters: options (list): {'name': 옵션명, 'price': 가격} 형식의 옵션 리스트 std_threshold (float): 표준편차를 기준으로 필터링할 Z-점수 임계값 base_ratio (int): 가격 범위를 계산할 때 사용되는 비율. 최소가격의 2배 (기본: 2) min_main_item_count (int): 본품 가격대 옵션이 몰려 있는 최소 개수 (기본: 3) Returns: list: 필터링된 옵션 리스트 (가격 분포 및 기준에 맞게 필터링된 옵션들) """ prices = np.array([option['price'] for option in options]) # 1. 옵션의 개수가 적고 가격 차이가 큰 경우, 가장 낮은 가격의 옵션 1개만 제외 if len(prices) <= 3 and (max(prices) / min(prices) > 3): self.logger.debug("옵션 개수가 적고 가격 차이가 크므로 최저가 1개를 제외합니다.") final_options = [option for option in options if option['price'] != min(prices)] return final_options # 2. 표준편차 기반으로 낮은 가격대만 제거 (높은 가격대는 제외하지 않음) mean_price = np.mean(prices) std_price = np.std(prices) self.logger.debug(f"최저옵션: {mean_price}, 표준편차: {std_price}") if std_price == 0: # 모든 옵션의 가격이 동일한 경우 그대로 반환 self.logger.debug("모든 옵션의 가격이 동일합니다. 필터링 없이 모든 옵션을 반환합니다.") return options filtered_options = [] for option in options: z_score = (option['price'] - mean_price) / std_price self.logger.debug(f"Option {option['price']} z-score: {z_score}") if lower_z <= z_score <= upper_z: # Z-score 범위 내에서 필터링 filtered_options.append(option) self.logger.debug(f"Z-스코어 필터링: {[opt['price'] for opt in filtered_options]}") # 3. 본품 가격대 선택 후, 가격 범위 재설정 if filtered_options: min_price_after_filtering = min([option['price'] for option in filtered_options]) lower_bound = min_price_after_filtering / base_ratio upper_bound = min_price_after_filtering * base_ratio self.logger.debug(f"필터링된 본품옵션의 최저가: {min_price_after_filtering}, 마켓상한선: {upper_bound}") # 최종적으로 필터링된 가격 범위 내에 있는 옵션만 선택 final_options = [option for option in filtered_options if lower_bound <= option['price'] <= upper_bound] self.logger.debug(f"최종 선택된 옵션: {[(opt['name'], opt['price']) for opt in final_options]}") return final_options async def store_selected_options(self): """현재 페이지에서 선택된 옵션을 수집하여, 가격 낮은 순으로 정렬한 후 클래스 변수에 저장""" try: selected_translated_options = {} total_options_count = len(self.option_info['original_names']) # checked_states 딕셔너리에서 값이 True인 항목의 개수를 계산 checked_count = sum(value is True for value in self.option_info['checked_states'].values()) self.logger.debug(f"체크된 옵션 수: {checked_count}") await self.low_order_click() for i in range(1, total_options_count + 1): option_excluded_selector = self.option_excluded_selector_template.format(index=i) option_input_selector = self.option_input_selector_template.format(index=i) option_excluded_element = await self.page.query_selector(option_excluded_selector) if not option_excluded_element: option_input_element = await self.page.query_selector(option_input_selector) if option_input_element: option_name_value = (await option_input_element.get_attribute('value')).strip() selected_translated_options[option_name_value] = self.option_info['prices'].get(option_name_value, {}).get('low_price', 0) self.option_info['selected_translated_options'] = selected_translated_options self.logger.debug(f"선택된 옵션 저장 완료: {selected_translated_options}") except Exception as e: self.logger.error(f"선택된 옵션 저장 중 오류 발생: {e}", exc_info=True) async def process_options(self, product_name, max_option_count=20, toggle_states=False): """ 옵션 처리 로직. 옵션을 번역하고 이미지를 업데이트함. :param product_name: 상품명 (str 형태). :param max_option_count: 최대 옵션 갯수 (기본값 20). """ try: self.logger.debug(f"상품명: {product_name}에 대한 옵션을 처리 중...") self.logger.debug(f"이전 상품의 옵션정보를 초기화합니다.") self.init_option_info() # self.logger.debug(f"포커스를 위해 클릭1회") # self.low_order_click() # self.logger.debug(f"휠스크롤 Down") # await self.browser_controller.scroll_with_wheel('down') # self.logger.debug(f"휠스크롤 Up") # await self.browser_controller.scroll_with_wheel('up') # await self.page.wait_for_load_state('domcontentloaded') # self.logger.debug(f"동적요소 로딩완료") await asyncio.sleep(2) # 1. 단일 옵션인지 판단 if await self.is_single_option(): self.logger.debug("단일 옵션 상품입니다. 옵션 수정 과정을 생략합니다.") self.option_info['is_single_option'] = True return self.option_info # 2. 전체 옵션 체크박스 상태 확인 click_to_check_to_all = True self.logger.debug(f"일부 체크된 옵션상품에 대한 처리 방법 : {'전체체크에서 시작' if click_to_check_to_all else '일부 체크시 수정완료로 판단'}") if click_to_check_to_all: self.logger.debug("옵션이 일부만 체크된 상태입니다. 전체 체크로 바꿉니다.") await self.is_all_options_checked(click_to_check_to_all) else: self.logger.debug("옵션이 일부만 체크된 상태입니다. 옵션 수정이 완료된 상품으로 판단하여 패스합니다.") self.option_info['is_completed_option'] = True return self.option_info # 3. 가격 낮은 순 정렬 클릭 await self.low_order_click() try: self.logger.info(f"옵션 정보 수집") self.option_info = await self.collect_options_info() self.logger.debug(f"수집된 옵션 정보 : {self.option_info}") except Exception as e: # 옵션 처리 중 오류 발생 시 전체 로그 출력 self.logger.error(f"옵션 정보 수집 중 오류 발생: {e}", exc_info=True) # 4. 옵션 정보 수집 및 번역 if toggle_states['optionTrnas']: self.logger.debug(f"옵션 AI번역 : {toggle_states['optionTrnas']}") try: # Vertex AI를 통한 번역 시도 translated_options = await self.vertexAItranslator.translate_options(self.option_info['original_names'], product_name) self.logger.debug(f"번역된 옵션 입력") await self.apply_translated_options(translated_options, self.option_info['edit_fields']) self.is_vertext_success = True # 번역 성공 except ValueError as ve: # 안전 필터 예외 처리 if "SAFETY" in str(ve): self.logger.error(f"안전 필터에 의해 번역 요청이 차단되었습니다. {ve}") self.logger.debug("퍼센티 자체 AI번역 사용 시도") await self.page.click(self.ai_option_btn_selector) self.logger.debug("번역을 위한 5초간 대기") await asyncio.sleep(5) self.is_vertext_success = False self.is_percenty_success = True # except google.api_core.exceptions.ResourceExhausted as re: # # 할당량 초과 예외 처리 # self.logger.error(f"Vertex AI 할당량 초과: {re}") # self.logger.debug("퍼센티 자체 AI번역 사용 시도") # pyautogui.hotkey('alt', 'q') # self.logger.debug("번역을 위한 5초간 대기") # time.sleep(5) # translation_success = False # 번역 실패 except Exception as e: # 기타 예외 처리 self.logger.error(f"번역 처리 중 알 수 없는 오류 발생: {e}", exc_info=True) self.logger.debug("퍼센티 자체 AI번역 사용 시도") await self.page.click(self.ai_option_btn_selector) self.logger.debug("번역을 위한 5초간 대기") await asyncio.sleep(5) self.is_vertext_success = False # 번역 실패 self.is_percenty_success = True # 번역 성공 여부에 따른 로그 self.logger.debug(f"[{'VertexAI' if self.is_vertext_success else '퍼센티AI'}] 를 이용한 옵션번역 성공") # 5. 옵션 필터링 및 조정 if toggle_states['optionAutoSelect']: self.logger.debug(f"옵션 필터링 및 조정 : {toggle_states['optionAutoSelect']}") await self.filter_and_adjust_options(max_option_count) # 가격 낮은 순 재정렬 클릭 await self.low_order_click() # 6. 선택된 옵션정보 재수집 if self.is_percenty_success: self.logger.debug(f"퍼센티 옵션번역으로 인해 선택된 옵션정보 재수집") await self.store_selected_options() # 페이지에서 실제 선택된 옵션을 수집하여 저장 # 7. 옵션 이미지 업데이트 (옵션 이미지가 있는 경우) if toggle_states.get('optionIMGTrans'): self.logger.debug(f"옵션 이미지 번역을 시작합니다.") await self.update_option_image(toggle_states, debug_flag=self.debug_flag) # 8. A-Z or 1-99 button 클릭 what_prefix_button = '1-99' if what_prefix_button == 'A-Z': await self.AtoZ_button_click() elif what_prefix_button == '1-99': # # 9. A-Z 버튼 클릭 await self.one_to_nine_button_click() await self.low_order_click() # 9. 저장 버튼 클릭 self.logger.debug("저장 버튼을 클릭합니다.") await self.page.click('button:has-text("저장하기")') self.logger.debug("옵션 처리 완료.") return self.option_info except Exception as e: self.logger.debug(f"옵션 처리 중 오류 발생: {e}", exc_info=True) return None async def is_single_option(self): """단일 상품 상태 여부를 확인하는 메서드""" try: # 단일 상품 등록 버튼이 선택되었는지 확인 single_option_checked = await self.page.query_selector(self.single_option_locator) is not None # 옵션 상품 등록 버튼이 선택되었는지 확인 option_product_checked = await self.page.query_selector(self.option_product_locator) is not None # 두 요소의 상태를 기반으로 단일 상품 여부 결정 is_single = single_option_checked and not option_product_checked self.logger.debug(f"단일 상품 여부: {'단일 상품입니다' if is_single else '옵션 상품입니다'}") return is_single except Exception as e: self.logger.error(f"단일 옵션 확인 중 예외 발생: {e}", exc_info=True) return False async def is_all_options_checked(self, click_to_check_to_all=False): """ 전체 옵션 체크박스 상태를 확인하고 필요한 경우 전체 체크 상태로 변경. Args: click_to_check_to_all (bool): 전체 옵션을 체크할지 여부. True면 전체 체크 상태로 변경. Returns: bool: 전체 체크 상태일 경우 True, 일부 또는 전체 체크 해제 상태일 경우 False. """ try: # 전체 체크박스 요소 가져오기 checkbox_element = await self.page.query_selector(self.is_all_option_checked_selector) if not checkbox_element: self.logger.error("전체 체크박스 요소를 찾을 수 없습니다.") return False # 체크박스 상태 확인 aria_checked = await checkbox_element.get_attribute('aria-checked') self.logger.debug(f"aria_checked : {aria_checked}----------------") # 체크박스 상태 판단 if aria_checked == 'mixed': self.logger.debug("전체 체크박스가 일부만 체크되어 있음") if click_to_check_to_all: # 전체 체크 수행 await checkbox_element.click() self.logger.debug("전체 체크박스를 전체 체크 상태로 변경") return True # 전체 체크 후 True 반환 else: return False # 일부 체크 상태만 확인 시 False 반환 elif aria_checked == 'true' or aria_checked is None: self.logger.debug("전체 체크박스가 완전 체크 상태임") return True else: self.logger.debug("전체 체크박스가 체크 해제 상태임") return False except Exception as e: self.logger.error(f"전체 옵션 체크박스 상태 확인 중 오류 발생: {e}", exc_info=True) return False async def collect_options_info(self): """옵션 정보를 수집 (이미지, 옵션명, 편집 필드, 가격, 체크박스 정보 포함)""" try: # 총 옵션 갯수 수집 total_options_element = await self.page.query_selector(self.total_options_selector) total_options_count = int(''.join(filter(str.isdigit, await total_options_element.inner_text()))) if total_options_element else 0 self.logger.debug(f"총 옵션 갯수: {total_options_count}") # 옵션 정보를 비동기로 수집 (각 항목 병렬 처리) for i in range(1, total_options_count + 1): try: original_name_selector = self.original_name_selector_template.format(index=i) edit_field_selector = self.edit_field_selector_template.format(index=i) checkbox_selector = self.checkbox_selector_template.format(index=i) image_selector = self.image_selector_template.format(index=i) price_selector = self.price_selector_template.format(index=i) tasks = [ self.page.query_selector(original_name_selector), self.page.query_selector(edit_field_selector), self.page.query_selector(checkbox_selector), self.page.query_selector(image_selector), self.page.query_selector(price_selector) ] elements = await asyncio.gather(*tasks) original_name_element, edit_field_element, checkbox_element, image_element, price_element = elements self.logger.debug(f"{i}번째 original_name_element : {original_name_element}") self.logger.debug(f"{i}번째 edit_field_element : {edit_field_element}") self.logger.debug(f"{i}번째 checkbox_element : {checkbox_element}") self.logger.debug(f"{i}번째 image_element : {image_element}") self.logger.debug(f"{i}번째 price_element : {price_element}") if original_name_element: original_name = await original_name_element.inner_text() self.option_info['original_names'][f'origin_option_{i}'] = original_name self.option_info['edit_fields'][original_name] = edit_field_element if edit_field_element else None self.option_info['checkboxes'].append(checkbox_element if checkbox_element else None) # 체크박스 상태 수집 (체크 여부는 클래스 이름으로 판단) checkbox_state = None if checkbox_element: checkbox_classes = await checkbox_element.get_attribute('class') if 'ant-checkbox-checked' in checkbox_classes: checkbox_state = True elif 'ant-checkbox' in checkbox_classes: checkbox_state = False self.option_info['checkboxes'].append({'option_name': original_name, 'checked': checkbox_state}) self.logger.debug(f"=============================================") self.logger.debug(f"{i}번째 옵션 checkbox_state : {checkbox_state}") self.logger.debug(f"=============================================") if image_element: self.option_info['images'][original_name] = await image_element.get_attribute('src') if price_element: price_text = await price_element.inner_text() price = [int(p.replace(",", "")) for p in price_text.replace("원", "").split(" - ")] self.option_info['prices'][original_name] = {'low_price': price[0], 'high_price': price[-1]} self.logger.debug(f"{i}번째 옵션 정보 수집 완료") except Exception as e: self.logger.error(f"{i}번째 옵션 수집 중 오류 발생: {e}", exc_info=True) except Exception as e: self.logger.error(f"옵션 정보 수집 중 오류 발생: {e}", exc_info=True) return self.option_info # async def collect_options_info_ori(self): # """옵션 정보를 수집 (이미지, 옵션명, 편집 필드, 가격, 체크박스 정보 포함)""" # self.init_option_info() # try: # # 총 옵션 갯수 수집 # total_options_selector = '#productMainContentContainerId label.ant-checkbox-wrapper' # total_options_element = await self.page.query_selector(total_options_selector) # if total_options_element: # total_options_text = await total_options_element.inner_text() # total_options_count = int(''.join(filter(str.isdigit, total_options_text))) # 숫자만 추출 # else: # total_options_count = 0 # 옵션 갯수를 찾지 못할 경우 기본값 # self.logger.debug(f"총 옵션 갯수: {total_options_count}") # # 옵션 정보를 수집 (총 옵션 갯수만큼 반복) # for i in range(1, total_options_count + 1): # try: # # 원본옵션명 수집 # original_name_selector = f"div#productMainContentContainerId li:nth-child({i}) > div > div:nth-child(1) > div > div:nth-child(3) > div:nth-child(3) > span" # original_name_element = await self.page.query_selector(original_name_selector) # original_name = await original_name_element.inner_text() if original_name_element else None # if original_name: # # 옵션명 기준으로 수집 항목 구성 # self.logger.debug(f"{i}번째 옵션명 수집완료. 나머지 필드 수집중...") # self.option_info['original_names'][f'origin_option_{i}'] = original_name # # 옵션 편집 필드 수집 # edit_field_selector = f"div#productMainContentContainerId li:nth-child({i}) > div > div:nth-child(1) > div > div:nth-child(3) > div:nth-child(2) > div:nth-child(1) > span > input" # edit_field_element = await self.page.query_selector(edit_field_selector) # if edit_field_element: # self.option_info['edit_fields'][original_name] = edit_field_element # self.logger.debug(f"{i}번째 옵션편집필드 수집 완료 : {edit_field_element}") # else: # self.logger.debug(f"{i}번째 옵션편집필드 수집 실패▣ edit_field_element : {edit_field_element}") # # 옵션 체크박스 수집 # checkbox_selector = f'#productMainContentContainerId li:nth-child({i}) input[type="checkbox"]' # # f"div#productMainContentContainerId li:nth-child({i}) > div > div:nth-child(1) > div > div:nth-child(1) > label > span > input" # checkbox_element = await self.page.query_selector(checkbox_selector) # if checkbox_element: # self.option_info['checkboxes'].append(checkbox_element) # self.logger.debug(f"{i}번째 옵션 체크박스 수집 완료 : {checkbox_element}") # # 체크 상태 수집 # self.logger.debug(f"{i}번째 옵션 체크박스 상태 수집") # is_checked = await checkbox_element.is_checked() # original_name = self.option_info['original_names'].get(f'origin_option_{i}') # self.option_info['checked_states'][original_name] = is_checked # self.logger.debug(f"{i}번째 옵션 체크 상태: {is_checked}") # else: # self.logger.debug(f"{i}번째 옵션 체크박스 수집 실패▣ checkbox_element : {checkbox_element}") # # 옵션 이미지 수집 # image_selector = f'#productMainContentContainerId li:nth-child({i}) img.sc-gbvfcU.ezktkd' # # "div#productMainContentContainerId li:nth-child(1) > div > div:nth-child(1) > div > div:nth-child(2) > div > img" # # "div#productMainContentContainerId li:nth-child(2) > div > div:nth-child(1) > div > div:nth-child(2) > div > img" # image_element = await self.page.query_selector(image_selector) # if image_element: # image_url = await image_element.get_attribute('src') # self.option_info['images'][original_name] = image_url # self.logger.debug(f"{i}번째 옵션 이미지 수집 완료 : {image_element}") # else: # self.option_info['images'][original_name] = None # 이미지가 없으면 None. # self.logger.debug(f"{i}번째 옵션 이미지 수집 실패▣ image_element : {image_element}") # # 가격 정보 수집 # price_selector = f'#productMainContentContainerId li:nth-child({i}) sup' # price_element = await self.page.query_selector(price_selector) # if price_element: # price_text = await price_element.inner_text() # price_text = price_text.replace(",", "").replace("원", "").strip() # if " - " in price_text: # low_price, high_price = map(int, price_text.split(" - ")) # else: # low_price = high_price = int(price_text) # self.option_info['prices'][original_name] = {'low_price': low_price, 'high_price': high_price} # self.logger.debug(f"{i}번째 옵션 가격정보 수집 완료 : {low_price} - {high_price}") # else: # self.logger.debug(f"{i}번째 옵션 가격정보 수집 실패▣ price_element : {price_element}") # except Exception as e: # self.logger.error(f"{i}번째 옵션 수집 중 오류 발생: {e}", exc_info=True) # except Exception as e: # self.logger.error(f"옵션 정보 수집 중 오류 발생: {e}", exc_info=True) # return self.option_info async def apply_translated_options(self, translated_options, edit_fields): """번역된 옵션명을 편집 필드에 입력하고 selected_translated_options을 한 번에 업데이트""" try: updated_translations = {} # 업데이트된 번역 옵션을 저장할 딕셔너리 for key, translated_name in translated_options.items(): self.logger.debug(f"{key}번째 translated_name : {translated_name}") # 원본 옵션명을 기준으로 참조 origin_option_key = key.replace('trans_', 'origin_') # 'trans_option_1'을 'origin_option_1'로 변환 original_name = self.option_info['original_names'].get(origin_option_key) if original_name: edit_field = edit_fields.get(original_name) # 원본 옵션명으로 필드 참조 self.logger.debug(f"{key}번째 번역옵션 필드 : {edit_field}") if edit_field: await edit_field.fill(translated_name) # 필드에 번역된 옵션명 입력 self.logger.info(f"{key}번째 translated_name : [{translated_name}] 입력 완료") # 업데이트할 번역된 이름을 임시 딕셔너리에 저장 updated_translations[original_name] = translated_name else: self.logger.error(f"{key}번째 옵션 필드가 없습니다.") else: self.logger.error(f"원본 옵션명을 찾을 수 없습니다: {origin_option_key}") # 모든 번역이 끝난 후 한 번에 업데이트 self.option_info['selected_translated_options'].update(updated_translations) self.logger.debug(f"selected_translated_options 일괄 업데이트: {updated_translations}") except Exception as e: self.logger.error(f"번역된 옵션명을 입력하는 중 오류 발생: {e}", exc_info=True) def round_to_UP(self, number, nearest=1000): """ 숫자를 주어진 'nearest' 값에 맞춰 올림합니다. Parameters: - number (float or int): 올림할 숫자 - nearest (int): 올림할 단위 (기본값 1000) Returns: - rounded_number (int): 올림된 숫자 """ # nearest 값으로 나눈 후 math.ceil을 사용하여 올림 rounded_number = math.ceil(number / nearest) * nearest return rounded_number async def filter_and_adjust_options(self, max_option_count): """가격 필터링을 적용하고 옵션을 조정""" try: # 옵션 가격 정보 수집 prices = { name: {'low_price': info['low_price'], 'high_price': info['high_price']} for name, info in self.option_info['prices'].items() } # 가격 정보 형식에 맞게 옵션들을 리스트로 변환 options_list = [ { "name": name, "price": self.round_to_UP((info['low_price'] + info['high_price']) / 2) # 중간 값을 사용하여 필터링 } for name, info in prices.items() ] # 미끼 상품 필터링 및 가격 범위 재조정 filtered_options = self.filter_bait_items_with_price_distribution(options_list) # 필터링된 옵션명만 추출 filtered_option_names = {option['name'] for option in filtered_options} # # 필터링된 옵션들만 체크박스에서 남기고 나머지 체크박스 해제 # checkboxes = [ # self.option_info['checkboxes'][i] # for i, name in enumerate(self.option_info['original_names'].values()) # if name in filtered_option_names # ] # 체크박스 상태 조정 await self.adjust_options(filtered_option_names, max_option_count) except Exception as e: self.logger.error(f"옵션 필터링 및 조정 중 오류 발생: {e}", exc_info=True) async def adjust_options(self, filtered_option_names, max_option_count): """ 필터링된 옵션에 맞게 체크박스 상태를 조정하는 메서드. :param filtered_option_names: 필터링된 옵션 리스트 :param max_option_count: 최대 선택 가능한 옵션 수 (0이면 제한 없음) """ try: selected_count = 0 # 현재까지 선택된 옵션 수 self.logger.info(f"최대 선택 가능 옵션수 설정값 : {max_option_count}") for i, name in enumerate(self.option_info['original_names'].values()): # 최대 옵션 수에 도달하면 더 이상 선택하지 않음 if max_option_count > 0 and selected_count >= max_option_count: break checkbox_selector = self.checkbox_selector_template.format(index=i+1) checkbox_element = await self.page.query_selector(checkbox_selector) # 디버깅 로그: 현재 옵션 이름과 필터링된 옵션 이름 확인 self.logger.debug(f"옵션 이름: {name}, 필터링된 옵션에 포함 여부: {name in filtered_option_names}") if checkbox_element: # 필터링된 옵션에 포함되고, 선택 가능한 수량 내라면 선택 if name in filtered_option_names and (max_option_count == 0 or selected_count < max_option_count): # await checkbox_element.click() self.logger.debug(f"옵션 '{name}' 체크함") self.option_info['checked_states'][name] = True selected_count += 1 # 필터링된 옵션에 포함되지 않으면 선택 해제 else: await checkbox_element.click() self.logger.debug(f"옵션 '{name}' 체크 해제함") self.option_info['checked_states'][name] = False self.logger.debug(f"옵션 체크 상태 조정 완료. 선택된 옵션 수: {selected_count}/{max_option_count if max_option_count > 0 else '무제한'}") except Exception as e: self.logger.error(f"옵션 체크 상태 조정 중 오류 발생: {e}", exc_info=True) async def adjust_options_without_max_count(self, filtered_option_names, max_option_count): """ 필터링된 옵션에 맞게 체크박스 상태를 조정하는 메서드. :param filtered_options: 필터링된 옵션 리스트 :param max_option_count: 최대 선택 가능한 옵션 수 """ try: # 옵션 체크 상태를 수집한 정보에서 필터링된 옵션들만 체크 상태로 유지 for i, name in enumerate(self.option_info['original_names'].values()): checkbox_selector = self.checkbox_selector_template.format(index=i+1) checkbox_element = await self.page.query_selector(checkbox_selector) # is_checked = self.option_info['checked_states'].get(name, False) # 디버깅 로그: 현재 옵션 이름과 필터링된 옵션 이름 확인 self.logger.debug(f"옵션 이름: {name}, 필터링된 옵션에 포함 여부: {name in filtered_option_names}") if checkbox_element: # 필터링된 이름에 포함되는 경우 if name in filtered_option_names: # await checkbox_element.click() self.logger.debug(f"옵션 '{name}' 체크함") self.option_info['checked_states'][name] = True # 필터링된 이름에 포함되지 않는 경우 else: await checkbox_element.click() self.logger.debug(f"옵션 '{name}' 체크 해제함") self.option_info['checked_states'][name] = False self.logger.debug(f"옵션 체크 상태 조정 완료.") except Exception as e: self.logger.error(f"옵션 체크 상태 조정 중 오류 발생: {e}", exc_info=True) async def AtoZ_button_click(self): self.logger.debug("A-Z 버튼을 클릭합니다.") await self.page.click(self.AtoZ_button_locator) async def one_to_nine_button_click(self): self.logger.debug("1-99 버튼을 클릭합니다.") await self.page.click(self.one_to_nine_button_locator) async def low_order_click(self): self.logger.debug("가격 낮은 순 정렬을 클릭합니다.") await self.page.click(self.low_order_button_locator) async def update_option_image(self, toggle_states, debug_flag=False): """ 옵션 이미지가 존재할 경우, 제외된 옵션이 아닌 경우 번역하여 업데이트하는 메서드. :param debug_flag: 디버그 모드일 경우 임시 이미지를 삭제하지 않음 (기본값 False). """ # main.py 실행 폴더의 tmp_images 폴더 경로 설정 base_dir = os.path.dirname(os.path.abspath(__file__)) # main.py 위치 확인 temp_dir = os.path.join(base_dir, "tmp_images") # tmp_images 폴더 경로 설정 os.makedirs(temp_dir, exist_ok=True) # 폴더가 없으면 생성 try: # 모든 옵션 상자 요소 가져오기 option_boxes = await self.page.query_selector_all(self.option_box_selector) # option_boxes = await self.page.query_selector_all("div#productMainContentContainerId li > div > div:nth-child(1) > div > div:nth-child(2) > div") # option_image_element = await self.page.locator("xpath=//*[@id='productMainContentContainerId']/div[1]/div[2]/div/div/div[2]/div/div[1]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[1]/div/div[1]/div/div[2]/div/img").element_handle() total_options = len(option_boxes) self.logger.debug(f"총 {total_options}개의 옵션 이미지 번역을 시작합니다.") # 실제 옵션 이미지가 존재하는 항목에만 인덱스를 적용하기 위해 별도 카운터 사용 translated_index = 1 # 각 옵션 상자를 순회 for index, option_box in enumerate(option_boxes, start=1): # 선택자에서 인덱스를 반영해 동적 선택자를 생성 add_button_selector = self.add_button_selector.format(index=index) # 옵션 박스 내부의 텍스트 확인하여 "제외된 옵션" 포함 여부 검사 option_text_content = await option_box.inner_text() if "제외된 옵션" in option_text_content: self.logger.debug(f"{index}번째 옵션은 제외된 옵션입니다. 번역을 생략합니다.") continue # 제외된 옵션이므로 다음 옵션으로 이동 # 옵션 이미지가 존재하는지 확인 option_image = await option_box.query_selector("img") if option_image is None: self.logger.debug(f"{index}번째 옵션에 이미지가 없습니다. 다음 옵션으로 이동합니다.") continue option_image_url = await option_image.get_attribute("src") self.logger.debug(f"{index}번째 옵션 이미지 URL: {option_image_url}") # 이미지가 SVG 형식일 경우 번역을 건너뜀 if option_image_url.endswith(".svg"): self.logger.debug(f"{index}번째 옵션은 SVG 이미지입니다. 번역을 생략합니다.") continue # 이미지 번역 및 업로드 translated_image_path = os.path.join(temp_dir, f"translated_option_{translated_index}.png") # 이미지 저장 경로 설정 try: # 이미지 번역 self.logger.debug(f"{index}번째 옵션의 이미지 번역 시도") is_success_translated = self.whale_translator.translate_image(option_image_url) self.clipboardImageManager.process_clipboard(option_image_url, is_success_translated, toggle_states, translated_image_path) self.logger.debug(f"{index}번째 옵션의 번역 이미지 저장 완료: {translated_image_path}") self.browser_controller.switch_to_chrome() # 크롬으로 포커스 이동 if is_success_translated and os.path.exists(translated_image_path): # 삭제 버튼 클릭 # delete_button = await self.page.query_selector(delete_button_selector) self.logger.debug(f"{index}번째 옵션의 이미지 삭제 버튼 가져오기") try: # 기본 선택자로 삭제 버튼 찾기 delete_button = self.page.locator(f'{self.delete_button_selector_template.format(index=index)}') await delete_button.wait_for(state="attached", timeout=5000) # 타임아웃 설정 if not await delete_button.is_visible(): # fallback으로 재시도 delete_button = self.page.locator(f'xpath={self.fallback1_delete_button_selector_template.format(index=index)}') await delete_button.wait_for(state="attached", timeout=5000) if await delete_button.is_visible(): await delete_button.click() self.logger.debug(f"{index}번째 옵션의 삭제 버튼 클릭") # 다이알로그 확인 후 삭제 버튼 클릭 try: self.logger.debug(f"{index}번째 옵션의 삭제 다이알로그 확인 중...") dialog = await self.page.wait_for_selector(self.delete_dialog_selector, timeout=5000) # 다이알로그 클래스 확인 if dialog: self.logger.debug(f"{index}번째 옵션의 삭제 다이알로그 확인됨") # 삭제 확인 버튼 찾기 confirm_delete_button = await dialog.query_selector(self.confirm_delete_button_selector) if confirm_delete_button: await confirm_delete_button.click() self.logger.debug(f"{index}번째 옵션의 삭제 확인 버튼 클릭됨") else: self.logger.error(f"{index}번째 옵션의 삭제 확인 버튼이 보이지 않습니다.") else: self.logger.error(f"{index}번째 옵션의 삭제 다이알로그가 나타나지 않았습니다.") except Exception as e: self.logger.error(f"{index}번째 옵션의 삭제 다이알로그를 찾는 중 오류 발생: {e}", exc_info=True) except Exception as e: self.logger.error(f"{index}번째 옵션의 삭제 버튼을 찾는 중 오류 발생: {e}", exc_info=True) try: # '+ 버튼' 클릭 후 파일 업로드 self.logger.debug(f"{index}번째 옵션의 이미지추가 버튼 가져오기") add_button = await self.page.query_selector(add_button_selector) if add_button: await add_button.click() self.logger.debug(f"{index}번째 옵션의 이미지추가 버튼 클릭") # 파일 업로드 영역의 input 요소 직접 선택 (수정된 부분) file_input = await self.page.query_selector(self.file_upload_button_selector) # Ant Design의 클래스 사용 if file_input: # Playwright의 set_input_files를 사용하여 파일 업로드 처리 await file_input.set_input_files(translated_image_path) self.logger.debug(f"{index}번째 옵션의 파일 업로드 완료") # '이미지 삽입' 버튼 클릭 confirm_upload_button = await self.page.wait_for_selector(self.confirm_upload_button_selector) await confirm_upload_button.click() self.logger.debug(f"{index}번째 옵션에 이미지가 업로드되었습니다.") else: self.logger.error(f"{index}번째 옵션의 파일 입력 요소를 찾을 수 없습니다.") except Exception as e: self.logger.error(f"{index}번째 옵션의 이미지를 추가하는 중 오류 발생: {e}", exc_info=True) except Exception as e: self.logger.error(f"{index}번째 옵션 이미지 번역 중 오류 발생: {e}", exc_info=True) finally: # 파일 사용 후 0.5초 대기하여 접근 완료 보장 time.sleep(0.5) # 디버그 모드가 아닐 경우 임시 파일 삭제 if not debug_flag and os.path.exists(translated_image_path): os.remove(translated_image_path) self.logger.debug(f"{index}번째 옵션의 임시 번역 이미지 파일 삭제 완료: {translated_image_path}") # 실제 번역이 완료된 경우에만 인덱스 증가 translated_index += 1 except Exception as e: self.logger.error(f"옵션 이미지 업데이트 중 오류 발생: {e}", exc_info=True)