렌즈 final

This commit is contained in:
9700X_PC 2025-01-09 11:30:47 +09:00
parent f6e001cad9
commit ebd8f58fc3
7 changed files with 444 additions and 709 deletions

View File

@ -58,12 +58,12 @@ class CategoryManager:
for category in self.category_list:
if (
category["category1Name"] == detailed_category[0]
and category["category2Name"] == detailed_category[1]
and category["category3Name"] == detailed_category[2]
and category["category4Name"] == detailed_category[3]
and category["category2Name"] == (detailed_category[1] if len(detailed_category) > 1 else None)
and category["category3Name"] == (detailed_category[2] if len(detailed_category) > 2 else None)
and category["category4Name"] == (detailed_category[3] if len(detailed_category) > 3 else None)
):
# # 디버깅 로그 추가
# self.logger.log(f"매칭된 카테고리: {category['category_code']}, is_allowed: {category['is_allowed']}", level=logging.DEBUG)
# 디버깅 로그 추가
self.logger.log(f"매칭된 카테고리: {category['category_code']}, is_allowed: {category['is_allowed']}", level=logging.DEBUG)
# 허용된 카테고리만 반환
category_hierarchy = "-".join(
@ -154,21 +154,28 @@ class CategoryManager:
self.logger.log(f"3번째 카테고리까지 일치하는 항목을 찾을 수 없습니다: {detailed_category}", level=logging.ERROR, exc_info=True)
return None
# def is_category_allowed(self, category: List[Optional[str]]) -> bool:
# """카테고리 리스트를 받아 해당 카테고리가 허용인지 금지인지 확인."""
# for cat in self.category_list:
# if (
# cat["category1Name"] == category[0]
# and cat["category2Name"] == (category[1] if len(category) > 1 else None)
# and cat["category3Name"] == (category[2] if len(category) > 2 else None)
# and cat["category4Name"] == (category[3] if len(category) > 3 else None)
# ):
# # 디버깅 로그 추가
# self.logger.log(f"매칭된 카테고리: [{category['category_code']}], is_allowed: {cat['is_allowed']}", level=logging.DEBUG)
def is_category_allowed(self, category: List[Optional[str]]) -> bool:
"""카테고리 리스트를 받아 해당 카테고리가 허용인지 금지인지 확인."""
category = [cat if cat else None for cat in category] # 빈 문자열을 None으로 변환
# return bool(cat["is_allowed"]) # 허용이면 True, 금지면 False
# # 카테고리를 찾지 못하면 기본적으로 금지(False)로 간주
# return False
for cat in self.category_list:
if (
cat["category1Name"] == category[0]
and cat["category2Name"] == (category[1] if len(category) > 1 else None)
and cat["category3Name"] == (category[2] if len(category) > 2 else None)
and cat["category4Name"] == (category[3] if len(category) > 3 else None)
):
# 디버깅 로그 추가
self.logger.log(
f"매칭된 카테고리: [{cat['category_code']}], is_allowed: {cat['is_allowed']}",
level=logging.DEBUG,
)
return bool(cat["is_allowed"]) # 허용이면 True, 금지면 False
# 카테고리를 찾지 못하면 기본적으로 허용(True)으로 간주
self.logger.log(f"매칭 카테고리를 찾지못함 - 기본설정으로 허용으로 간주", level=logging.WARNING)
return True
def is_allowed_by_category_code(self, category_with_hierarchy: str) -> bool:
"""
@ -197,20 +204,36 @@ class CategoryManager:
self.logger.log(f"카테고리 코드를 찾을 수 없습니다: {category_code}", level=logging.DEBUG)
return True
def find_most_common_category(self, detailed_products: List[Dict]) -> Optional[str]:
"""상위 제품 정보에서 가장 빈도가 높은 카테고리 추출."""
category_list = [
product["category_code"]
# def find_most_common_category(self, detailed_products: List[Dict]) -> Optional[str]:
# """상위 제품 정보에서 가장 빈도가 높은 카테고리 추출."""
# category_list = [
# product["category_code"]
# for product in detailed_products
# if product.get("category_code")
# ]
# if not category_list:
# return None
# # 빈도 계산
# most_common = Counter(category_list).most_common(1)
# return most_common[0][0] if most_common else None
def find_most_common_category(self, detailed_products: List[Dict]) -> Optional[List[str]]:
"""상품 정보에서 가장 빈도가 높은 카테고리 경로 반환."""
# 모든 카테고리 경로를 리스트로 수집
category_paths = [
tuple(product["category"]) # 튜플로 변환하여 불변 객체로 사용 가능
for product in detailed_products
if product.get("category_code")
if product.get("category") and isinstance(product["category"], list)
]
if not category_list:
# 카테고리 경로가 비어 있다면 None 반환
if not category_paths:
return None
# 빈도 계산
most_common = Counter(category_list).most_common(1)
return most_common[0][0] if most_common else None
# 카테고리 경로 빈도 계산
most_common_path = Counter(category_paths).most_common(1)
return list(most_common_path[0][0]) if most_common_path else None
# # 사용 예제
# if __name__ == "__main__":

View File

@ -4,15 +4,16 @@ from typing import Dict, List
import os, sys
from PySide6.QtWidgets import QFileDialog
from pywinauto import Application, findwindows, timings
from pywinauto.controls.hwndwrapper import HwndWrapper
# from pywinauto import Application, findwindows, timings
# from pywinauto.controls.hwndwrapper import HwndWrapper
import configparser
from src.shoppingLens import ShoppingLensScraper
# from src.shoppingLens import ShoppingLensScraper
from src.titleManager import TitleManager
from src.categoryManager import CategoryManager
from src.naver_parser import NaverParser
# from src.naver_parser import NaverParser
from src.gpt_client import GPTClient
from src.xlsSerachThread import XlsSerachThread
from src.wh_con import WhaleController
import requests
from bs4 import BeautifulSoup
@ -27,15 +28,13 @@ class PostProcessor:
base_xls_path = 'baseXLS_Percenty.xlsx'
config_path = 'config.ini'
client_id='Uq5c9J_WdQYF8e2wOQT4'
client_secret='y0CnrADAae'
self.gpt = GPTClient(self.logger, api_key='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')
self.shopping_lens = ShoppingLensScraper(self.logger)
# self.shopping_lens = ShoppingLensScraper(self.logger)
self.wh_con = WhaleController(self.logger)
self.title_manager = TitleManager(self.logger, self.gpt)
self.categoryManager = CategoryManager(self.logger, base_xls_path)
self.naver_parser = NaverParser(self.logger, client_id=client_id, client_secret=client_secret)
# self.naver_parser = NaverParser(self.logger, client_id=client_id, client_secret=client_secret)
self.xlThread = XlsSerachThread(self.logger, self.db_manager)
@ -75,50 +74,50 @@ class PostProcessor:
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
def start_whale_browser(self):
base_path = self.get_base_dir()
whale_exe_path = os.path.join(base_path, "browsers", "whale", "whale.exe")
user_data_dir = os.path.join(base_path, "browsers", "whale", "user_data")
cache_dir = os.path.join(base_path, "browsers", "whale", "cache")
extension_path = os.path.join(base_path, "browsers", "whale", "extensions", "gadfmnjdnhkncfcibhfleoojcdimdcbd", "1.1.11_0")
# def start_whale_browser(self):
# base_path = self.get_base_dir()
# whale_exe_path = os.path.join(base_path, "browsers", "whale", "whale.exe")
# user_data_dir = os.path.join(base_path, "browsers", "whale", "user_data")
# cache_dir = os.path.join(base_path, "browsers", "whale", "cache")
# extension_path = os.path.join(base_path, "browsers", "whale", "extensions", "gadfmnjdnhkncfcibhfleoojcdimdcbd", "1.1.11_0")
whale_app = Application(backend="uia").start(
f'"{whale_exe_path}" --user-data-dir="{user_data_dir}" --disk-cache-dir="{cache_dir}" --load-extension="{extension_path}"'
)
# whale_app = Application(backend="uia").start(
# f'"{whale_exe_path}" --user-data-dir="{user_data_dir}" --disk-cache-dir="{cache_dir}" --load-extension="{extension_path}"'
# )
# 창이 완전히 생성될 때까지 대기
whale_window = self.find_whale_window(whale_app)
# # 창이 완전히 생성될 때까지 대기
# whale_window = self.find_whale_window(whale_app)
if whale_window:
self.logger.log(f"웨일 시작 완료.", level=logging.DEBUG)
else:
self.logger.log(f"웨일 창을 찾을 수 없습니다.", level=logging.DEBUG)
# if whale_window:
# self.logger.log(f"웨일 시작 완료.", level=logging.DEBUG)
# else:
# self.logger.log(f"웨일 창을 찾을 수 없습니다.", level=logging.DEBUG)
return whale_window
# return whale_window
def find_whale_window(self, whale_app):
try:
# 최대 10초 동안 '새 시크릿 탭 - Whale' 창이 나타나기를 기다림
timings.wait_until(10, 0.5, lambda: any(window.name == '새 탭 - Whale' for window in findwindows.find_elements()))
# def find_whale_window(self, whale_app):
# try:
# # 최대 10초 동안 '새 시크릿 탭 - Whale' 창이 나타나기를 기다림
# timings.wait_until(10, 0.5, lambda: any(window.name == '새 탭 - Whale' for window in findwindows.find_elements()))
windows = findwindows.find_elements()
for window in windows:
if window.name == '새 탭 - Whale':
whale_pid = window.process_id
whale_app = Application(backend="uia").connect(process=whale_pid)
whale_window = whale_app.top_window()
# windows = findwindows.find_elements()
# for window in windows:
# if window.name == '새 탭 - Whale':
# whale_pid = window.process_id
# whale_app = Application(backend="uia").connect(process=whale_pid)
# whale_window = whale_app.top_window()
# 위치 및 크기 조절
hwnd_wrapper = HwndWrapper(whale_window.handle)
hwnd_wrapper.move_window(x=1, y=1, width=1280, height=720)
whale_window.set_focus()
# # 위치 및 크기 조절
# hwnd_wrapper = HwndWrapper(whale_window.handle)
# hwnd_wrapper.move_window(x=1, y=1, width=1280, height=720)
# whale_window.set_focus()
self.logger.log(f"웨일 창을 성공적으로 찾았습니다.", level=logging.DEBUG)
return whale_window
self.logger.log(f"'새 탭 - Whale' 창을 찾을 수 없습니다.", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"웨일 창 탐색 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return None
# self.logger.log(f"웨일 창을 성공적으로 찾았습니다.", level=logging.DEBUG)
# return whale_window
# self.logger.log(f"'새 탭 - Whale' 창을 찾을 수 없습니다.", level=logging.DEBUG)
# except Exception as e:
# self.logger.log(f"웨일 창 탐색 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
# return None
async def post_by_DB(self):
# 1. DB에서 처리되지 않은 상품 가져오기
@ -301,7 +300,8 @@ class PostProcessor:
async def process_products(self, products):
# 쇼핑렌즈를 위한 웹브라우저 준비
whale_window = self.start_whale_browser()
# whale_window = self.start_whale_browser()
self.wh_con.start_whale_Browser()
# time.sleep(600)
@ -313,27 +313,21 @@ class PostProcessor:
try:
# 2. 쇼핑렌즈로 상품 정보 수집
self.logger.log(f"상품 {product['id']}에 대한 쇼핑렌즈 검색 시작", level=logging.DEBUG)
scraped_data, most_common_category = self.shopping_lens.search(whale_window=whale_window, categoryManager=self.categoryManager, image_url=product['image_url'], original_name=product['name'], min_price=30000, max_product=5)
self.logger.log(f"결정된 카테고리 : {most_common_category}", level=logging.DEBUG)
scraped_data = self.wh_con.search_and_parse(image_url=product['image_url'])
if not scraped_data:
self.logger.log(f"상품 {product['id']}에 대한 쇼핑렌즈 데이터 없음, 스킵", level=logging.WARNING)
continue
titles = [product["title"] for product in scraped_data if "title" in product]
# 4. 상품명 생성
final_title = self.title_manager.generate_product_name(titles, product['name'])
self.logger.log(f"상품명 생성 완료: {final_title}", level=logging.DEBUG)
most_common_category = self.categoryManager.find_most_common_category(scraped_data)
self.logger.log(f"결정된 카테고리 : {most_common_category}", level=logging.DEBUG)
if not final_title and most_common_category:
continue
category_code = self.categoryManager.find_category_code(most_common_category)
self.logger.log(f"결정된 카테고리 코드 : {category_code}", level=logging.DEBUG)
# 금지 카테고리 확인
tags = None
if scraped_data:
isvalid_category = self.categoryManager.is_allowed_by_category_code(most_common_category)
isvalid_category = self.categoryManager.is_category_allowed(most_common_category)
self.logger.log(f"isvalid_category : {isvalid_category}", level=logging.DEBUG)
if not isvalid_category:
@ -342,8 +336,14 @@ class PostProcessor:
else:
product['is_valid'] = 1
# 태그 필터링 및 병합
tags = self.filter_and_merge_tags(scraped_data)
titles = [product["title"] for product in scraped_data if "title" in product]
# 4. 상품명 생성
final_title = self.title_manager.generate_product_name(titles, product['name'])
self.logger.log(f"상품명 생성 완료: {final_title}", level=logging.DEBUG)
# 태그 필터링 및 병합
tags = self.filter_and_merge_tags(scraped_data)
additional_margin = self.calculate_additional_margin(scraped_data)
self.logger.log(f"더하기마진(=팔린가격) : {additional_margin}", level=logging.DEBUG)
@ -351,7 +351,7 @@ class PostProcessor:
# 6. 정보 업데이트
product.update({
"generated_Title": final_title,
"category_code": most_common_category,
"category_code": category_code,
"tags": tags,
"margin_price": additional_margin,
"memo": self.generate_memo(scraped_data)
@ -388,15 +388,34 @@ class PostProcessor:
result = ", ".join(filtered_tags)
self.logger.log(f"태그 필터링 : {result}", level=logging.DEBUG)
return result
except Exception as e:
self.logger.log(f"태그 필터링 및 병합 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return ""
# def calculate_additional_margin(self, scraped_data):
# prices = [item["price"] for item in scraped_data]
# return max(sum(prices) / len(prices), max(prices))
def calculate_additional_margin(self, scraped_data):
prices = [item["price"] for item in scraped_data]
return max(sum(prices) / len(prices), max(prices))
try:
# 가격 리스트를 추출 및 정수형으로 변환
prices = [int(item["price"]) for item in scraped_data if item.get("price")]
if not prices: # 가격 리스트가 비어 있는 경우 예외 처리
self.logger.log("가격 정보가 비어 있음", level=logging.WARNING)
return 0
# 평균값과 최대값 중 더 큰 값 반환
average_price = sum(prices) / len(prices)
max_price = max(prices)
result = max(average_price, max_price)
self.logger.log(f"추가 마진 계산: 평균값 {average_price}, 최대값 {max_price}, 최종 값 {result}", level=logging.DEBUG)
return result
except Exception as e:
self.logger.log(f"추가 마진 계산 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return 0
def generate_memo(self, scraped_data: List[Dict]) -> str:
"""

View File

@ -1,9 +1,7 @@
from pywinauto import Application, findwindows, timings
from pywinauto.timings import wait_until
from pywinauto import timings
from pywinauto.keyboard import send_keys
import time, logging, os, re, json
from deep_translator import GoogleTranslator
from collections import Counter
import logging
import pyperclip
@ -13,27 +11,6 @@ from bs4 import BeautifulSoup
class ShoppingLensScraper:
def __init__(self, logger=None):
self.logger = logger
self.gtranslator = GoogleTranslator(source="zh-CN", target="ko")
def translate_name(self, text: str) -> str:
"""
중국어 텍스트를 한국어로 번역하는 메서드
:param text: 번역할 텍스트
:return: 번역된 한국어 텍스트
"""
if not text.strip():
self.logger.log(f"빈 텍스트가 입력되었습니다.", level=logging.WARNING)
return ""
try:
# 번역 수행
result = self.gtranslator.translate(text)
self.logger.log(f"번역 성공: {text} -> {result}", level=logging.DEBUG)
return result
except Exception as e:
self.logger.log(f"번역 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return "번역 실패"
def save_control_identifiers(self, window, output_file="debug_controls.txt"):
"""
@ -52,9 +29,6 @@ class ShoppingLensScraper:
except Exception as e:
self.logger.log(f"컨트롤 식별자를 저장하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def scroll_into_view(self, whale_window, element_title, control_type="Text"):
"""
지정된 요소가 화면에 나타날 때까지 스크롤을 수행합니다.
@ -88,167 +62,91 @@ class ShoppingLensScraper:
except Exception as e:
self.logger.log(f"스크롤 중 오류 발생: {e}", level=logging.ERROR)
def search(self, whale_window, categoryManager, image_url, original_name, max_retries=3, min_price: int = 30000, max_product: int = 5):
def lens_Search(self, whale_window, image_url):
"""네이버 웨일을 이용하여 쇼핑 렌즈 검색 수행"""
for attempt in range(max_retries):
try:
self.logger.log(f"검색 시도 {attempt + 1}/{max_retries}", level=logging.INFO)
try:
pyperclip.copy(image_url)
pyperclip.copy(image_url)
# URL 이동
self.logger.log(f"주소창으로 이동", level=logging.INFO)
address_bar = whale_window.child_window(title="주소창 및 검색창", control_type="Edit")
address_bar.click_input()
address_bar.type_keys("^v") # 복사한 URL 붙여넣기
address_bar.type_keys("{ENTER}")
time.sleep(1) # 페이지 로딩 대기
# URL 이동
self.logger.log(f"주소창으로 이동", level=logging.INFO)
address_bar = whale_window.child_window(title="주소창 및 검색창", control_type="Edit")
address_bar.click_input()
address_bar.type_keys("^v") # 복사한 URL 붙여넣기
address_bar.type_keys("{ENTER}")
time.sleep(1) # 페이지 로딩 대기
# 이미지 우클릭 및 쇼핑 렌즈 실행
image = whale_window.child_window(control_type="Image")
image.right_click_input()
# 이미지 우클릭 및 쇼핑 렌즈 실행
image = whale_window.child_window(control_type="Image")
image.right_click_input()
self.logger.log(f"쇼핑 렌즈 검색", level=logging.INFO)
menu_item = whale_window.child_window(title="쇼핑렌즈로 검색하기", control_type="MenuItem")
menu_item.click_input()
self.logger.log(f"쇼핑 렌즈 검색", level=logging.INFO)
menu_item = whale_window.child_window(title="쇼핑렌즈로 검색하기", control_type="MenuItem")
menu_item.click_input()
self.logger.log(f"쇼핑 렌즈 검색 결과 기다리기", level=logging.INFO)
timings.wait_until(10, 0.5, lambda: whale_window.child_window(control_type="List", found_index=0).exists())
self.logger.log(f"쇼핑 렌즈 검색 결과 기다리기", level=logging.INFO)
timings.wait_until(10, 0.5, lambda: whale_window.child_window(control_type="List", found_index=0).exists())
time.sleep(0.5)
time.sleep(0.5)
# control_type="Document" 요소 검색
document_elements = whale_window.descendants(control_type="Document")
if not document_elements:
raise RuntimeError("Document 요소를 찾을 수 없습니다.")
# control_type="Document" 요소 검색
document_elements = whale_window.descendants(control_type="Document")
if not document_elements:
raise RuntimeError("Document 요소를 찾을 수 없습니다.")
# self.save_control_identifiers(whale_window, "document_elements.txt", self.logger)
# self.logger.log(f"컨트롤 식별자가 저장되었습니다.", level=logging.INFO)
type_index = None
for document in document_elements:
title_text = document.window_text() # Document의 title 텍스트 가져오기
if "쇼핑렌즈 검색결과" in title_text:
type_index = 1 # "쇼핑렌즈 검색결과" 포함 시 type_index=1
self.logger.log(f"쇼핑렌즈 검색결과 : type_index: {type_index}", level=logging.INFO)
# 요소를 화면에 표시
lens_result = self.scroll_into_view(
whale_window, element_title="쇼핑렌즈 검색결과", control_type="Text"
)
lens_result = whale_window.child_window(title="쇼핑렌즈 검색결과", control_type="Text")
lens_result.right_click_input()
time.sleep(0.5)
URL_COPY_menu_item = whale_window.child_window(title="URL 복사", control_type="MenuItem")
URL_COPY_menu_item.click_input()
time.sleep(0.5)
# 클립보드에서 복사한 URL 가져오기
page_url = pyperclip.paste()
if not page_url.startswith("http"):
raise ValueError(f"잘못된 URL이 복사되었습니다: {page_url}")
self.logger.log(f"복사된 URL: {page_url}", level=logging.INFO)
# view-source: URL 생성 및 이동
# source_page_url = f"view-source:{page_url}"
# self.logger.log(f"view-source URL 생성: {source_page_url}", level=logging.INFO)
return page_url
# break
elif "본문 바로가기" in title_text:
type_index = 0 # "본문 바로가기" 포함 시 type_index=0
self.logger.log(f"본문 바로가기 : type_index: {type_index}", level=logging.INFO)
source_page_url = self.extract_docu_title(document)
self.logger.log(f"source_page_url 생성완료", level=logging.DEBUG)
return source_page_url
# break
else:
type_index = None
if type_index is None:
raise RuntimeError("적절한 type_index를 결정할 수 없습니다.")
# self.logger.log(f"결정된 type_index: {type_index}", level=logging.INFO)
# # URL 복사 및 주소창에 붙여넣기
# self.logger.log(f"URL 복사 및 주소창에 붙여넣기", level=logging.INFO)
# pyperclip.copy(source_page_url)
# address_bar.click_input()
# address_bar.type_keys("^v") # 복사한 URL 붙여넣기
# address_bar.type_keys("{ENTER}")
# time.sleep(1.5) # 페이지 로딩 대기
# # 페이지 소스 저장
# document_element = whale_window.child_window(control_type="Document")
# page_source = document_element.window_text()
# self.logger.log(f"page_source extracted", level=logging.INFO)
# products = self.extract_next_data(page_source, type_index, min_price, max_product)
# self.logger.log(f"extract_next_data : {products}", level=logging.INFO)
# for product in products:
# category_code = categoryManager.find_category_code(product["category"])
# product["category_code"] = category_code
# most_common_category = categoryManager.find_most_common_category(products)
# back_button = whale_window.child_window(title="뒤로", control_type="Button")
# self.logger.log(f"뒤로가기 버튼을 클릭합니다.", level=logging.DEBUG)
# back_button.click_input()
type_index = None
for document in document_elements:
title_text = document.window_text() # Document의 title 텍스트 가져오기
# self.logger.log(f"스크래핑 완료. 추출된 데이터 : {len(products)}개", level=logging.INFO)
if "쇼핑렌즈 검색결과" in title_text:
type_index = 1 # "쇼핑렌즈 검색결과" 포함 시 type_index=1
# return products, most_common_category
return
except RuntimeError as e:
self.logger.log(f"검색 결과 없음: {e}", level=logging.DEBUG)
# 기본값 반환
original_title =self.translate_name(original_name)
self.logger.log(f"쇼핑렌즈 검색결과 : type_index: {type_index}", level=logging.INFO)
# 공백 또는 쉼표로 분리
words = re.split(r"[,\s]+", original_title)
# 요소를 화면에 표시
lens_result = self.scroll_into_view(
whale_window, element_title="쇼핑렌즈 검색결과", control_type="Text"
)
# 첫 8단어 추출
new_title = " ".join(words[:8])
lens_result = whale_window.child_window(title="쇼핑렌즈 검색결과", control_type="Text")
lens_result.right_click_input()
time.sleep(0.5)
return [{
"상품명": new_title, # 원본 상품명을 번역하여 사용
"상품가격": 10000, # 더하기마진 기본값
"배송비": 0 # 해외배송비 기본값
}]
URL_COPY_menu_item = whale_window.child_window(title="URL 복사", control_type="MenuItem")
URL_COPY_menu_item.click_input()
time.sleep(0.5)
except Exception as e:
self.logger.log(f"검색 시도 {attempt + 1}/{max_retries} 실패: {e}", level=logging.ERROR, exc_info=True)
if attempt == max_retries - 1:
self.logger.log(f"최대 재시도 횟수에 도달했습니다.", level=logging.INFO)
try:
# 검색실패시 해당시점의 컨트롤식별자 저장
self.save_control_identifiers(whale_window, "debug_controls.txt", self.logger)
self.logger.log(f"컨트롤 식별자가 저장되었습니다.", level=logging.INFO)
except Exception as ee:
self.logger.log(f"컨트롤 식별자 저장 중 오류 발생: {ee}", level=logging.ERROR, exc_info=True)
return [], None
# 클립보드에서 복사한 URL 가져오기
page_url = pyperclip.paste()
if not page_url.startswith("http"):
raise ValueError(f"잘못된 URL이 복사되었습니다: {page_url}")
time.sleep(2) # 재시도 전 대기
self.logger.log(f"복사된 URL: {page_url}", level=logging.INFO)
# self.click_back_and_img_buttons(window=whale_window)
return page_url, type_index
elif "본문 바로가기" in title_text:
type_index = 0 # "본문 바로가기" 포함 시 type_index=0
self.logger.log(f"본문 바로가기 : type_index: {type_index}", level=logging.INFO)
source_page_url = self.extract_docu_title(document)
self.logger.log(f"source_page_url 생성완료", level=logging.DEBUG)
# self.click_back_and_img_buttons(window=whale_window)
return source_page_url, type_index
except Exception as e:
self.logger.log(f"렌즈 검색 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def extract_docu_title(self, document):
try:
# 네이버 쇼핑 검색 기본 URL
base_url = "https://msearch.shopping.naver.com/search/all?bt=-1&frm=MOSCPRO&query="
mobile_base_url = "https://msearch.shopping.naver.com/search/all?bt=-1&frm=MOSCPRO&query="
pc_base_url = "https://search.shopping.naver.com/search/all?bt=-1&frm=NVSCPRO&query="
if document.is_visible:
# Document 텍스트 가져오기
@ -275,7 +173,7 @@ class ShoppingLensScraper:
query_text = quote(extracted_text)
# 기본 URL과 결합하여 page_url 생성
page_url = f"{base_url}{query_text}"
page_url = f"{mobile_base_url}{query_text}"
self.logger.log(f"생성된 URL: {page_url}", level=logging.DEBUG)
return page_url
@ -288,295 +186,6 @@ class ShoppingLensScraper:
except Exception as e:
self.logger.log(f"컨트롤 식별자를 저장하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def extract_next_data(self, page_source: str, type_index, min_price, max_product):
"""HTML 페이지 소스에서 __NEXT_DATA__를 추출."""
try:
# BeautifulSoup로 HTML 파싱
soup = BeautifulSoup(page_source, "html.parser")
# __NEXT_DATA__ 스크립트 태그 찾기
next_data_script = soup.find("script", id="__NEXT_DATA__")
if not next_data_script or not next_data_script.string:
raise ValueError("__NEXT_DATA__를 찾을 수 없습니다.")
# JSON 디코딩
next_data = json.loads(next_data_script.string)
initial_state = next_data["props"]["pageProps"]["initialState"]
# self.logger.log(f"initial_state : {initial_state}", level=logging.DEBUG)
products = self.extract_product_details(initial_state, type_index, min_price, max_product)
self.logger.log(f"products : {len(products)}개 상품 추출 성공", level=logging.DEBUG)
return products
except json.JSONDecodeError as e:
raise ValueError(f"JSON 디코딩 실패: {e}")
except Exception as e:
raise ValueError(f"데이터 추출 중 오류 발생: {e}")
def extract_product_details(self, json_data: str, type_index: int = 1, min_price: int = 30000, max_product: int = 5):
"""
주어진 JSON 데이터를 기반으로 상품 정보를 추출하고, 공통 필드 데이터를 반환합니다.
:param json_data: JSON 형식의 문자열 데이터
:param type_index: 데이터 구조 선택 (1: 기존 방식, 0: 새로운 방식)
:param min_price: 필터링 기준이 되는 최소 가격 (기본값: 30000)
:param max_product: 반환할 최대 상품 (기본값: 5)
:return: 공통 필드를 포함한 상품 정보를 담은 딕셔너리 리스트
"""
try:
data = json.loads(json_data)
except json.JSONDecodeError as e:
self.logger.log(f"JSON 파싱 오류: {e}", level=logging.ERROR)
return []
products = []
# 데이터 추출 분기
if type_index == 1: # 기존 방식
items = data.get('imageSearch', {}).get('searchResult', {}).get('similarImages', [])
self.logger.log(f"기존 방식으로 데이터 추출: {len(items)}개의 항목", level=logging.INFO)
elif type_index == 0: # 새로운 방식
composite_products = data.get('compositeProducts', {}).get('list', [])
items = [product_data.get("item", {}) for product_data in composite_products]
self.logger.log(f"새로운 방식으로 데이터 추출: {len(items)}개의 항목", level=logging.INFO)
else:
self.logger.log("유효하지 않은 type_index입니다. 0 또는 1만 지원됩니다.", level=logging.ERROR)
return []
# 공통 필드 추출
for item in items:
try:
price = int(item.get("price", 0))
if price >= min_price:
product = {
"title": item.get("productName"),
"price": price,
"rank": int(item.get("rank", 0)),
"manuTag": item.get("manuTag"),
"imageUrl": item.get("src"),
"category": [
item.get("category1Name"),
item.get("category2Name"),
item.get("category3Name"),
item.get("category4Name")
],
"review_count": item.get("reviewCount"),
"relevance": item.get("relevance"),
"similarity": item.get("similarity"),
"overseaTp": item.get("overseaTp"),
"openDate": item.get("openDate"),
"purchaseCnt": item.get("purchaseCnt"),
}
products.append(product)
self.logger.log(f"상품 추가됨: {product}", level=logging.DEBUG)
except ValueError:
self.logger.log(f"가격 변환 오류: {item.get('price')}", level=logging.WARNING)
# 필터링된 상품을 rank 기준으로 정렬 후 상위 max_product 개 추출
products = sorted(products, key=lambda x: x["rank"])[:max_product]
self.logger.log(f"최종 필터링된 상품: {len(products)}개 반환", level=logging.INFO)
return products
def search_ori(self, whale_window, image_url, original_name, max_retries=2):
"""
네이버 웨일을 이용하여 쇼핑 렌즈 검색 수행
"""
keywords = ["무료", "쿠팡", "11번가", "해외", "네이버쇼핑", "G마켓", "옥션", "스마트스토어", "플러스스토어", "인터파크", "롯데온"]
for attempt in range(max_retries):
try:
self.logger.log(f"검색 시도 {attempt + 1}/{max_retries}", level=logging.DEBUG)
# URL 이동
address_bar = whale_window.child_window(title="주소창 및 검색창", control_type="Edit")
address_bar.click_input()
for chunk in [image_url[i:i + 10] for i in range(0, len(image_url), 10)]:
address_bar.type_keys(chunk, with_spaces=True)
address_bar.type_keys("{ENTER}")
time.sleep(2) # 페이지 로딩 대기
# 이미지 우클릭 및 쇼핑 렌즈 실행
image = whale_window.child_window(control_type="Image")
image.right_click_input()
menu_item = whale_window.child_window(title="쇼핑렌즈로 검색하기", control_type="MenuItem")
menu_item.click_input()
self.logger.log(f"쇼핑 렌즈 검색 결과 기다리기", level=logging.DEBUG)
timings.wait_until(10, 0.5, lambda: any(
whale_window.child_window(control_type="List", found_index=i).exists()
for i in range(2)
))
# found_index=0과 1에서 데이터 수집
all_texts = []
selected_index = None
selected_texts = None
for found_index in [0, 1]:
try:
list_box = whale_window.child_window(control_type="List", found_index=found_index)
if list_box.exists():
texts = list_box.texts()
if texts:
# 평탄화 처리
flattened_texts = [item for sublist in texts for item in sublist] if isinstance(texts[0], list) else texts
self.logger.log(f"found_index={found_index}에서 텍스트 수집: {flattened_texts}", level=logging.DEBUG)
all_texts.append((found_index, flattened_texts))
# 키워드가 포함된 텍스트 확인
for text in flattened_texts:
if any(keyword in text for keyword in keywords):
selected_index = found_index
selected_texts = texts
break
if selected_index is not None:
break # 필요한 리스트박스가 확인되면 루프 종료
except Exception as e:
self.logger.log(f"found_index={found_index}에서 텍스트 수집 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
if selected_index is None or not selected_texts:
raise RuntimeError("유효한 found_index를 결정할 수 없습니다. 키워드가 포함된 텍스트가 없습니다.")
self.logger.log(f"선택된 found_index: {selected_index}", level=logging.DEBUG)
self.logger.log(f"선택된 텍스트: {selected_texts}", level=logging.INFO)
# 이미 수집된 텍스트를 가공하여 처리
products_infos = self.process_listbox_data(selected_texts, type_index=selected_index)
if not products_infos or all(product["상품명"] == "" for product in products_infos):
raise RuntimeError("유효한 상품 정보를 추출할 수 없습니다.")
self.logger.log(f"스크래핑 완료. 추출된 데이터 : {len(products_infos)}\n {products_infos}", level=logging.DEBUG)
return products_infos
except RuntimeError as e:
self.logger.log(f"검색 결과 없음: {e}", level=logging.DEBUG)
# 기본값 반환
original_title =self.translate_name(original_name)
# 공백 또는 쉼표로 분리
words = re.split(r"[,\s]+", original_title)
# 첫 8단어 추출
new_title = " ".join(words[:8])
return [{
"상품명": new_title, # 원본 상품명을 번역하여 사용
"상품가격": 10000, # 더하기마진 기본값
"배송비": 0 # 해외배송비 기본값
}]
except Exception as e:
self.logger.log(f"검색 시도 {attempt + 1}/{max_retries} 실패: {e}", level=logging.DEBUG)
if attempt == max_retries - 1:
self.logger.log(f"최대 재시도 횟수에 도달했습니다.", level=logging.DEBUG)
return []
time.sleep(1) # 재시도 전 대기
def process_listbox_data(self, listbox_texts, type_index, max_product_count=5):
"""
list_box.texts() 데이터를 가공하여 상품 정보를 추출합니다.
:param listbox_texts: ListBox 텍스트 리스트
:param type_index: 0 또는 1 구분된 데이터 타입
:return: 가공된 상품 정보 리스트 (최대 5)
"""
processed_data = []
if type_index == 0:
for idx, text_list in enumerate(listbox_texts):
if idx >= max_product_count: # 최대 max_product_count개의 상품만 처리
break
try:
full_text = text_list[0]
# 전체 상품명 추출
match = re.search(r"(.*?)\d{1,3}(,\d{3})*원", full_text)
if match:
full_name = match.group(1).strip()
# 단어별 빈도 계산
words = full_name.split()
first_word = words[0] # 첫 단어는 무조건 포함
word_counts = Counter(words)
repeated_words = [word for word, count in word_counts.items() if count > 1]
# '해외+첫단어' 처리 및 중복 제거
if f"해외{first_word}" in repeated_words:
repeated_words.remove(f"해외{first_word}")
filtered_words = list(dict.fromkeys([first_word] + repeated_words)) # 첫 단어 추가 및 중복 제거
product_name = " ".join(filtered_words)
self.logger.log(f"product_name : {product_name}", level=logging.DEBUG)
# 가격 정보 추출
prices = [int(price.replace(",", "")) for price in re.findall(r"(\d{1,3}(?:,\d{3})*)원", full_text)]
product_price = prices[0] if prices else 0
shipping_price = prices[1] if len(prices) > 1 else 0
self.logger.log(f"product_price : {product_price}", level=logging.DEBUG)
self.logger.log(f"shipping_price : {shipping_price}", level=logging.DEBUG)
processed_data.append({
"상품명": product_name,
"상품가격": product_price,
"배송비": shipping_price
})
except Exception as e:
self.logger.log(f"데이터 처리 중 오류 발생 (type_index=0): {text_list} -> {e}", level=logging.ERROR, exc_info=True)
elif type_index == 1:
for idx, text_list in enumerate(listbox_texts):
if idx >= max_product_count: # 최대 max_product_count 개의 상품만 처리
break
try:
if isinstance(text_list, list):
product_name = re.sub(r"[^\w\s]", "", text_list[0]).strip()
if text_list[2] == '최저 ':
price_element = text_list[3]
else:
price_element = text_list[2]
product_price = int(price_element.replace(",", ""))
else:
raise ValueError("type_index=1 데이터가 리스트 형식이 아님")
shipping_price = 0
processed_data.append({
"상품명": product_name,
"상품가격": product_price,
"배송비": shipping_price
})
except Exception as e:
self.logger.log(f"데이터 처리 중 오류 발생 (type_index=1): {text_list} -> {e}", level=logging.ERROR, exc_info=True)
else:
raise ValueError(f"알 수 없는 type_index 값: {type_index}")
return processed_data
def click_back_and_img_buttons(self, window):
"""
뒤로가기 버튼을 클릭하는 함수

View File

@ -1,38 +1,30 @@
import os, sys
from pywinauto import Application, findwindows
from pywinauto.controls.hwndwrapper import HwndWrapper
from pywinauto.timings import wait_until
# from pywinauto.timings import wait_until
from pywinauto import Application, findwindows, timings
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
# from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import time
from shoppingLens import ShoppingLensScraper
from src.categoryManager import CategoryManager
from loggerModule import Logger
from src.shoppingLens import ShoppingLensScraper
# from loggerModule import Logger
import logging
import json
from typing import Dict
from bs4 import BeautifulSoup
class WhaleBrowserController:
class WhaleController:
def __init__(self, logger=None):
self.logger = Logger(level=logging.DEBUG)
self.logger = logger
self.driver = None
self.browser_pid = None
self.whale_window = None
self.lens = ShoppingLensScraper(self.logger)
base_xls_path = 'baseXLS_Percenty.xlsx'
self.categoryManager = CategoryManager(self.logger, base_xls_path)
def default_logger(self, message, level="INFO", exc_info=False):
"""기본 로거. 별도 로거가 없을 경우 사용."""
self.logger.log(f"[{level}] {message}", level=logging.INFO)
def control_with_pywinauto(self):
def find_whale(self):
"""pywinauto를 사용하여 웨일 브라우저 제어"""
try:
if not self.browser_pid:
@ -41,7 +33,6 @@ class WhaleBrowserController:
# 최대 10초 동안 '새 시크릿 탭 - Whale' 창이 나타나기를 기다림
timings.wait_until(10, 0.5, lambda: any(window.name == 'whale://new-tab-page-third-party/ - Whale' for window in findwindows.find_elements()))
windows = findwindows.find_elements()
for window in windows:
@ -55,18 +46,18 @@ class WhaleBrowserController:
# pywinauto로 PID를 기반으로 애플리케이션에 연결
whale_app = Application(backend="uia").connect(process=window.process_id)
whale_window = whale_app.top_window()
self.whale_window = whale_app.top_window()
# 창 위치 및 크기 조정
hwnd_wrapper = HwndWrapper(whale_window.handle)
hwnd_wrapper = HwndWrapper(self.whale_window.handle)
hwnd_wrapper.move_window(x=1, y=1, width=1280, height=720)
whale_window.set_focus()
self.whale_window.set_focus()
# self.logger.log("웨일 브라우저를 pywinauto로 제어했습니다.", level=logging.INFO)
return whale_window
# return whale_window
except Exception as e:
self.logger.log(f"pywinauto로 웨일 제어 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.logger.log(f"웨일 제어 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def get_base_dir(self):
"""
@ -83,8 +74,8 @@ class WhaleBrowserController:
base_dir = os.path.dirname(os.path.abspath(__file__))
return base_dir
def control_with_selenium(self, url):
"""웨일 브라우저에서 Selenium으로 페이지 제어"""
def start_whale_Browser(self):
"""웨일 제어"""
try:
base_path = self.get_base_dir()
@ -93,11 +84,11 @@ class WhaleBrowserController:
cache_dir = os.path.join(base_path, "browsers", "whale", "cache")
extension_path = os.path.join(base_path, "browsers", "whale", "extensions", "gadfmnjdnhkncfcibhfleoojcdimdcbd", "1.1.11_0")
# Selenium WebDriver 설정
# chromedriver_path = r"./chromedriver_130.0.6723.31.exe"
chromedriver_path = r"./chromedriver_128.0.6613.137.exe"
# chromedriver_path = r"./chromedriver_128.0.6613.137.exe"
chromedriver_path = os.path.join(base_path, "browsers", "chromedriver_128.0.6613.137.exe")
chrome_service = Service(chromedriver_path)
chrome_options = Options()
@ -130,32 +121,217 @@ class WhaleBrowserController:
self.browser_pid = self.driver.service.process.pid
self.logger.log(f"웨일 브라우저 PID: {self.browser_pid}", level=logging.INFO)
whale_window = self.control_with_pywinauto()
self.driver.get(url)
self.find_whale()
page_url = self.lens.search(whale_window=whale_window, categoryManager=self.categoryManager, image_url=url, original_name="product['name']", min_price=30000, max_product=5)
self.logger.log(f"page_url: {page_url}", level=logging.INFO)
# return whale_window, self.driver
except Exception as e:
self.logger.log(f"웨일 브라우저 시작 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.driver.get(page_url)
def get_product_list(self, data: Dict, type_index, min_price: int = 50000, top_n: int = 5) -> list:
"""
검색 결과에서 제품 리스트를 추출하고 필터링, 정렬 결과 구성.
:param data: 검색 결과 JSON 데이터
:param min_price: 최소 가격
:param top_n: 상위 N개 제품 추출
:return: 요청된 형식의 제품 정보 (상품별 딕셔너리 리스트)
"""
try:
self.logger.log(f"data 타입 : {type(data)}", level=logging.INFO)
products = []
related_tags = []
if type_index == 0:
# 1번 타입 데이터 처리
self.logger.log(f"1번 타입 데이터 처리", level=logging.INFO)
products_list = data["compositeProducts"]["list"]
products = [product["item"] for product in products_list if "item" in product]
# 1번 타입에서만 관련 태그 추출
related_tags = data.get("props", {}).get("pageProps", {}).get("relatedTags", [])
related_tags = [tag.strip() for tag in related_tags if tag]
elif type_index == 1:
# 2번 타입 데이터 처리
self.logger.log(f"1번 타입 데이터 처리", level=logging.INFO)
products_list = data["imageSearch"]["searchResult"]["similarImages"]
products = products_list # 유사 상품 리스트
# # 최소 가격 필터링
# self.logger.log(f"최소 가격 필터링 : {min_price}", level=logging.INFO)
# filtered_products = [
# product for product in products if int(product.get("price", 0)) >= min_price
# ]
# # 상위 N개 정렬 (랭킹 기준)
# self.logger.log(f"상위 [{top_n}]개 정렬 (랭킹 기준)", level=logging.INFO)
# sorted_products = sorted(filtered_products, key=lambda p: int(p.get("rank", 0)))[:top_n]
self.logger.log(f"상위 [{top_n}]개 ", level=logging.INFO)
sorted_products = products[:top_n]
# 상품별 딕셔너리 구성
product_data = []
self.logger.log(f"상품별 딕셔너리 구성", level=logging.INFO)
for product in sorted_products:
product_info = {
"title": product.get("productTitle", "") or product.get("title", ""),
"price": product.get("price", ""),
"category": [
product.get("category1Name", ""),
product.get("category2Name", ""),
product.get("category3Name", ""),
product.get("category4Name", ""),
],
"review_count": product.get("reviewCount", 0) or product.get("reviewCountSum", 0),
"purchaseCnt": product.get("purchaseCnt", 0),
"overseaTp": product.get("overseaTp", 0),
"rank": product.get("rank", 0),
"manuTag": product.get("manuTag", ""),
}
product_data.append(product_info)
self.logger.log(f"{len(product_data)}개의 제품 정보가 처리됨.", level=logging.DEBUG)
return product_data
except KeyError as e:
self.logger.log(f"제품 리스트를 처리하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return []
def get_product_list_for_list(self, data: Dict, type_index, min_price: int = 50000, top_n: int = 5) -> Dict:
"""
검색 결과에서 제품 리스트를 추출하고 필터링, 정렬 결과 구성.
:param data: 검색 결과 JSON 데이터
:param min_price: 최소 가격
:param top_n: 상위 N개 제품 추출
:return: 요청된 형식의 제품 정보
"""
try:
self.logger.log(f"data 타입 : {type(data)}", level=logging.INFO)
products = []
related_tags = []
if type_index == 0:
# 1번 타입 데이터 처리
products_list = data["compositeProducts"]["list"]
products = [product["item"] for product in products_list if "item" in product]
elif type_index == 1:
# 2번 타입 데이터 처리
products_list = data["imageSearch"]["searchResult"]["similarImages"]
products = products_list # 유사 상품 리스트
# 연관검색어 태그 추출
related_tags = data.get("props", {}).get("pageProps", {}).get("relatedTags", [])
related_tags = [tag.strip() for tag in related_tags if tag]
# # 최소 가격 필터링
# products = [
# product for product in products if int(product.get("price", 0)) >= min_price
# ]
# # 상위 N개 정렬 (랭킹 기준)
# products = sorted(products, key=lambda p: int(p.get("rank", 0)))[:top_n]
self.logger.log(f"상위 [{top_n}]개 ", level=logging.INFO)
sorted_products = products[:top_n]
# 결과 데이터 구성
product_data = {
"title": [
product.get("productTitle", "") or product.get("title", "")
for product in sorted_products
],
"price": [
product.get("price", "")
for product in sorted_products
],
"category": [
[
product.get("category1Name", ""),
product.get("category2Name", ""),
product.get("category3Name", ""),
product.get("category4Name", ""),
]
for product in sorted_products
],
"review_count": [
product.get("reviewCount", 0) or product.get("reviewCountSum", 0)
for product in sorted_products
],
"purchaseCnt": [
product.get("purchaseCnt", 0) or product.get("purchaseCnt", 0)
for product in sorted_products
],
"overseaTp": [
product.get("overseaTp", 0) or product.get("overseaTp", 0)
for product in sorted_products
],
"rank": [
product.get("rank", 0) or product.get("rank", 0)
for product in sorted_products
],
"manuTag": [
product.get("manuTag", "")
for product in sorted_products
],
"related_tags": related_tags if related_tags else [],
}
self.logger.log(f"{len(sorted_products)}개의 제품 정보가 처리됨.", level=logging.DEBUG)
return product_data
except KeyError as e:
self.logger.log(f"제품 리스트를 처리하는 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return {
"title": [],
"price": [],
"category": [],
"review_count": [],
"purchaseCnt": [],
"overseaTp": [],
"rank": [],
"manuTag": [],
"related_tags": [],
}
def search_and_parse(self, image_url: str, min_price: int = 30000, top_n: int = 5) -> Dict:
"""
URL로 검색 필터링된 상위 제품 정보 반환.
:param image_url: 스크래핑할 이미지 URL
:param min_price: 최소 가격 필터
:param top_n: 상위 N개 제품 정보
:return: 요청된 형식의 제품 정보
"""
try:
self.driver.get(image_url)
result_page_url, type_index = self.lens.lens_Search(whale_window=self.whale_window,image_url=image_url)
self.logger.log(f"result_page_url: {result_page_url}", level=logging.INFO)
self.driver.get(result_page_url)
time.sleep(1)
# self.logger.log(f"scraped_data: {scraped_data}", level=logging.INFO)
# self.logger.log(f"most_common_category: {most_common_category}", level=logging.INFO)
# 페이지 소스 출력
page_source = self.driver.page_source
self.logger.log("현재 페이지 소스:", level=logging.INFO)
# self.logger.log(page_source, level=logging.INFO)
from bs4 import BeautifulSoup
self.logger.log("페이지소스 가져오기", level=logging.DEBUG)
# BeautifulSoup로 HTML 파싱
soup = BeautifulSoup(page_source, "html.parser")
# __NEXT_DATA__ 스크립트 태그 찾기
next_data_script = soup.find("script", id="__NEXT_DATA__")
@ -166,130 +342,38 @@ class WhaleBrowserController:
next_data = json.loads(next_data_script.string)
initialState = next_data["props"]["pageProps"]["initialState"]
self.logger.log(initialState, level=logging.INFO)
# self.logger.log(initialState, level=logging.INFO)
return initialState
result = json.loads(initialState)
# # __NEXT_DATA__ 추출
# try:
# next_data_element = self.driver.find_element(By.ID, "__NEXT_DATA__")
# next_data = next_data_element.get_attribute("innerHTML")
products_data = self.get_product_list(result, type_index, min_price, top_n)
# next_data_json = json.loads(next_data)
# self.logger.log("추출된 __NEXT_DATA__ JSON 데이터:", level=logging.INFO)
# self.logger.log(next_data_json, level=logging.INFO)
# return next_data_json
return products_data
# except Exception as e:
# self.logger.log(f"__NEXT_DATA__를 찾을 수 없습니다: {e}", level=logging.ERROR, exc_info=True)
# 브라우저 종료
self.driver.quit()
except Exception as e:
self.logger.log(f"Selenium 제어 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
self.logger.log(f"렌즈 검색 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
def get_product_list(self, data: Dict, min_price: int = 50000, top_n: int = 5) -> Dict:
"""
검색 결과에서 제품 리스트를 추출하고 필터링, 정렬 결과 구성.
:param data: 검색 결과 JSON 데이터
:param min_price: 최소 가격
:param top_n: 상위 N개 제품 추출
:return: 요청된 형식의 제품 정보
"""
def close_whale_br(self):
# 브라우저 종료
try:
products = []
related_tags = []
self.logger.log(f"브라우저를 종료합니다.", level=logging.DEBUG)
if "products" in data:
# 1번 타입 데이터 처리
products_list = data["products"]["list"]
products = [product["item"] for product in products_list if "item" in product]
if self.driver:
self.driver.quit()
else:
self.logger.log(f"브라우저 객체를 찾을 수 없습니다.", level=logging.WARNING)
# 1번 타입에서만 관련 태그 추출
related_tags = data.get("props", {}).get("pageProps", {}).get("relatedTags", [])
related_tags = [tag.strip() for tag in related_tags if tag]
except Exception as e:
self.logger.log(f"브라우저 종료 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
elif "imageSearch" in data:
# 2번 타입 데이터 처리
products_list = data["imageSearch"]["searchResult"]["similarImages"]
products = products_list # 유사 상품 리스트
# if __name__ == "__main__":
# # 웨일 브라우저 컨트롤러 인스턴스 생성
# whale_controller = WhaleBrowserController()
# 최소 가격 필터링
filtered_products = [
product for product in products if int(product.get("price", 0)) >= min_price
]
# 상위 N개 정렬 (랭킹 기준)
sorted_products = sorted(filtered_products, key=lambda p: int(p.get("rank", 0)))[:top_n]
# 결과 데이터 구성
product_data = {
"title": [product.get("productTitle") or product.get("title") for product in sorted_products],
"price": [product.get("price") for product in sorted_products],
"category": [
[
product.get("category1Name"),
product.get("category2Name"),
product.get("category3Name"),
product.get("category4Name"),
]
for product in sorted_products
],
"review_count": [product.get("reviewCount") or product.get("reviewCountSum") for product in sorted_products],
"manu_tag": [product.get("manuTag") for product in sorted_products],
"related_tags": related_tags if related_tags else [], # 2번 타입에서는 빈 리스트 반환
}
self.logger(f"{len(sorted_products)}개의 제품 정보가 처리됨.", level="DEBUG")
return product_data
except KeyError as e:
self.logger(f"제품 리스트를 처리하는 중 오류 발생: {e}", level="ERROR")
return {
"title": [],
"price": [],
"category": [],
"review_count": [],
"manu_tag": [],
"related_tags": [],
}
def search_and_parse(self, url: str, min_price: int = 50000, top_n: int = 5) -> Dict:
"""
URL로 검색 필터링된 상위 제품 정보 반환.
:param url: 스크래핑할 전체 URL
:param min_price: 최소 가격 필터
:param top_n: 상위 N개 제품 정보
:return: 요청된 형식의 제품 정보
"""
json_data = self.control_with_selenium(url)
if json_data:
return self.get_product_list(json_data, min_price, top_n)
return {
"title": [],
"price": [],
"category": [],
"review_count": [],
"manu_tag": [],
"related_tags": [],
}
# target_url = "https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/6774440131d56e408c286cd4/04ae3cd9-4fdd-4677-8da8-d37dbf240107.jpg"
# # target_url = 'https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/677443d531d56e408c286cca/b136631a-66c7-42c5-88a5-fa720215f259.jpg'
# # whale_controller.control_with_selenium(target_url)
if __name__ == "__main__":
# 웨일 브라우저 컨트롤러 인스턴스 생성
whale_controller = WhaleBrowserController()
# target_url = "https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/6774440131d56e408c286cd4/04ae3cd9-4fdd-4677-8da8-d37dbf240107.jpg"
target_url = 'https://file.percenty.co.kr/public/652bed8e865b1f32ea62bf1f/products/677443d531d56e408c286cca/b136631a-66c7-42c5-88a5-fa720215f259.jpg'
# whale_controller.control_with_selenium(target_url)
results = whale_controller.search_and_parse(url=target_url, min_price=50000, top_n=5)
print("결과 데이터:", json.dumps(results, indent=4, ensure_ascii=False))
# results = whale_controller.search_and_parse(url=target_url, min_price=50000, top_n=5)
# print("결과 데이터:", json.dumps(results, indent=4, ensure_ascii=False))

Binary file not shown.