import json
import logging
import os
import random
import re
import sys
import time

import numpy as np
import requests
from bs4 import BeautifulSoup
from PIL import Image, ImageOps
from playwright.sync_api import sync_playwright
from skimage.metrics import structural_similarity as ssim


class BaiduImageSearcher:
    """Automate Baidu's image-search page with Playwright and scrape product cards.

    Typical call order, as far as this class itself shows:
    ``start_browser`` -> ``goto_initialPage`` -> ``upload_image`` ->
    ``expand_results`` -> ``extract_product_data`` -> ``close_browser``.

    Most page-driving methods first call ``check_capcha`` and return ``False``
    when the browser has been redirected to Baidu's error page.
    """

    # Landing page of the image search.
    INITIAL_URL = 'https://graph.baidu.com/pcpage/index?tpl_from=pc'
    # Value returned by get_original_url when the link cannot be resolved.
    FAILED_URL = "변환 실패"
    # Seconds to wait for a purchase link to resolve; without a timeout
    # ``requests`` would block indefinitely on an unresponsive host.
    REQUEST_TIMEOUT = 10
    # Locates the ``window.cardData = [{...}];`` assignment inside a script tag.
    _CARD_DATA_RE = re.compile(r"window\.cardData\s*=\s*(\[\{.*\}\]);", re.DOTALL)
    # Pool from which the browser context's user agent is picked at random.
    USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
    )

    def __init__(self, base_path, sources=None, image_downloader=None, db_manager=None, logger=None):
        """Store collaborators and initialise browser state.

        :param base_path: root directory; the bundled browser and the user-data
            directory are resolved beneath ``<base_path>/src/browsers``.
        :param sources: iterable of product sources to keep; when empty or
            ``None`` the default set ``{'淘宝', 'tmall', '1688'}`` is used.
        :param image_downloader: object providing
            ``download_image_for_searchResult(imgurl, product_id=..., index=...)``.
        :param db_manager: stored on the instance; not used by the methods in
            this class.
        :param logger: logger to use; defaults to ``logging.getLogger(__name__)``
            so that omitting it no longer crashes on the first log call.
        """
        self.base_path = base_path
        self.filtered_sources = set(sources) if sources else {'淘宝', 'tmall', '1688'}
        self.image_downloader = image_downloader
        self.db_manager = db_manager
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        # Initialised here so close_browser()/goto_initialPage() are safe to
        # reference these attributes even before start_browser() has run.
        self.playwright = None
        self.browser = None
        self.page = None
        self.initial_url = self.INITIAL_URL
        self.is_first_search = True  # tracks whether a search has been made yet
        self.logger.info("ImageSearcher initialized.")

    def start_browser(self):
        """Launch the bundled Chromium with a persistent profile and open a page.

        Does not navigate anywhere; call ``goto_initialPage`` afterwards.

        :raises Exception: whatever Playwright raises when the launch fails.
            Partially created resources are released before re-raising.
        """
        self.logger.info("start_browser.")
        # A second call would otherwise leak the previous driver process.
        if self.browser or self.playwright:
            self.close_browser()

        browsers_dir = os.path.join(self.base_path, 'src', 'browsers')
        browser_path = os.path.join(browsers_dir, 'chromium-1112', 'chrome-win', 'chrome.exe')
        user_data_dir = os.path.join(browsers_dir, 'user_data')
        self.logger.debug(f"브라우저 경로: {browser_path}")
        self.logger.debug(f"사용자 폴더 경로: {user_data_dir}")

        # Create the user-data directory on first use.
        if not os.path.exists(user_data_dir):
            os.makedirs(user_data_dir, exist_ok=True)
            self.logger.debug(f"{user_data_dir} 디렉토리가 생성되었습니다.")

        user_agent = random.choice(self.USER_AGENTS)
        self.logger.debug(f"user_agent: {user_agent}")

        self.playwright = sync_playwright().start()
        try:
            self.browser = self.playwright.chromium.launch_persistent_context(
                user_data_dir,
                headless=True,
                permissions=["geolocation", "notifications"],
                geolocation={"latitude": 37.5665, "longitude": 126.9780},
                locale="ko-KR",
                args=[
                    '--disable-popup-blocking',
                    '--start-maximized',
                    '--window-size=1920,1080'
                ],
                executable_path=browser_path,
                user_agent=user_agent
            )
            self.page = self.browser.new_page()

            # Close the default first tab, but never the page we just created.
            open_pages = self.browser.pages
            if open_pages and open_pages[0] is not self.page:
                open_pages[0].close()

            # NOTE(review): the fixed "User-Agent" header below overrides the
            # randomly chosen context user agent on outgoing requests; kept
            # as-is because changing it alters what the server sees - confirm
            # which of the two is actually intended.
            self.page.set_extra_http_headers({
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36",
                "Referer": "https://www.baidu.com/",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "DNT": "1",  # Do Not Track request header
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Cache-Control": "max-age=0"
            })
        except Exception:
            # Do not leave a half-started driver/browser behind.
            self.close_browser()
            raise

        self.logger.info("Start Complete B")

    def goto_initialPage(self):
        """Navigate the page to the image-search landing URL."""
        self.page.goto(self.initial_url)

    def close_browser(self):
        """Close the browser context and stop Playwright.

        Safe to call when the browser was never started. The Playwright driver
        is stopped even if closing the browser context raises.
        """
        try:
            if self.browser:
                self.browser.close()
        finally:
            self.browser = None
            self.page = None
            try:
                if self.playwright:
                    self.playwright.stop()
            finally:
                self.playwright = None

    def check_capcha(self):
        """Report whether the page currently shows Baidu's error page.

        This method only detects the condition; it performs no navigation.

        :return: ``True`` when the current URL starts with
            ``https://graph.baidu.com/errpage``; ``False`` otherwise, including
            when the URL could not be read.
        """
        try:
            current_url = self.page.evaluate("() => window.location.href")
            self.logger.info(f"Current URL: {current_url}")
            if current_url.startswith('https://graph.baidu.com/errpage'):
                self.logger.warning("Error page detected. Navigating back.")
                return True
            return False
        except Exception as e:
            self.logger.error(f"Finding Error page occured ERROR: {e}", exc_info=True)
            return False

    def upload_image(self, image_path):
        """Open the upload widget and submit ``image_path`` as the query image.

        :param image_path: path of the image file to upload.
        :return: ``True`` when the file was handed to the upload input;
            ``False`` when the error page was detected or any step failed.
        """
        try:
            if self.check_capcha():
                return False

            self.logger.info("is_first_search")
            upload_button_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/span[1]/span[1]'
            upload_input_xpath = '//*[@id="app"]/div/div[1]/div[7]/div/div/div[2]/div[2]/div/form/input'
            self.is_first_search = False
            # NOTE: an earlier revision used a different selector pair for
            # searches after the first one (currently disabled):
            #   button: //*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/span[1]/span[1]
            #   input:  //*[@id="app"]/div/div[1]/div/div[1]/div/div/div[1]/div/div[2]/div[2]/div/form/input

            # Click the upload button, then feed the file to the revealed input.
            self.page.wait_for_selector(upload_button_xpath)
            self.page.click(upload_button_xpath)
            if self.check_capcha():
                return False
            self.page.wait_for_selector(upload_input_xpath)
            self.page.set_input_files(upload_input_xpath, image_path)
            return True
        except Exception as e:
            self.logger.error(f"Error in upload_image: {e}", exc_info=True)
            # Explicit failure value, consistent with the other failure paths.
            return False

    def expand_results(self):
        """Click the expand tab and wait for product-card images to appear.

        :return: ``True`` once a product-card image is present; ``False`` when
            the error page was detected at any step.
        :raises Exception: Playwright errors (e.g. selector timeouts) are not
            caught here and propagate to the caller.
        """
        if self.check_capcha():
            return False

        self.logger.info("expand_results")
        expand_button_xpath = '//*[@id="app"]/div/div[2]/div/div[1]/ul/li[2]'
        self.page.wait_for_selector(expand_button_xpath)
        self.page.click(expand_button_xpath)
        if self.check_capcha():
            return False

        # Wait until the expanded product-card image elements are loaded.
        self.logger.info("확장된 상품 카드 이미지 요소가 로드될 때까지 대기")
        product_card_selector = 'div.graph-product-list-img img'
        self.page.wait_for_selector(product_card_selector)
        if self.check_capcha():
            return False

        return True

    @staticmethod
    def _parse_price(text):
        """Convert a price label such as ``"¥1,234.5"`` to a float.

        :return: the numeric price, or ``float('inf')`` when the label is
            missing or not a plain number, so such items sort last and are
            never picked as the cheapest.
        """
        raw = "" if text is None else str(text)
        try:
            price = float(raw.replace("¥", "").replace(",", ""))
        except ValueError:
            return float("inf")
        # NaN would break ordering comparisons; treat it as unknown.
        return float("inf") if price != price else price

    def _find_card_data_json(self, html):
        """Return the JSON text assigned to ``window.cardData`` in ``html``.

        The second ``<script>`` in ``<head>`` is tried first (the historical
        location); every other ``<script>`` is then searched as a fallback.

        :return: the JSON array text, or ``None`` when no script contains it.
        """
        soup = BeautifulSoup(html, 'html.parser')
        candidates = []
        primary = soup.select_one("html > head > script:nth-of-type(2)")
        if primary is not None:
            self.logger.info("script_tag를 찾음")
            candidates.append(primary)
        candidates.extend(soup.find_all("script"))

        for tag in candidates:
            # ``tag.string`` is None for empty/external scripts.
            match = self._CARD_DATA_RE.search(tag.string or "")
            if match:
                return match.group(1)
        return None

    def extract_product_data(self, product_id):
        """Extract product listings from the result page's embedded JSON.

        For every card named ``"product"`` the items are filtered by
        ``self.filtered_sources``, sorted by ascending price, their images are
        downloaded through ``self.image_downloader``, and the cheapest item of
        the card is flagged with ``is_selected = 1``.

        ``find_most_similar_product`` exists as an alternative selection
        strategy but is not wired in here.

        :param product_id: identifier forwarded to the image downloader.
        :return: ``False`` when the error page was detected; otherwise a list
            of dicts with keys ``title``, ``source``, ``price``, ``imgurl``,
            ``saved_img_path``, ``encrypted_url``, ``original_url`` and
            ``is_selected``. The list is empty when no data was found or an
            error occurred while processing it.
        """
        if self.check_capcha():
            return False

        self.logger.info("검색 결과 페이지에서 JSON 데이터 추출")
        json_data_str = self._find_card_data_json(self.page.content())
        if json_data_str is None:
            return []

        try:
            data = json.loads(json_data_str)
            product_info = []

            for idx, card in enumerate(data):
                if card.get("cardName") != "product":
                    continue

                # A card without the expected payload is skipped rather than
                # discarding the results of every other card.
                listed = (card.get("tplData") or {}).get("list") or []
                filtered_products = [
                    product for product in listed
                    if product.get("source", "") in self.filtered_sources
                ]
                if not filtered_products:
                    self.logger.warning(f"No products left after filtering by source for card index {idx}. Skipping this card.")
                    continue

                # Cheapest first; unparsable prices go last. Sorting can no
                # longer fail, so processing never falls back to the
                # unfiltered list.
                products = sorted(
                    filtered_products,
                    key=lambda item: self._parse_price(item.get("text", ""))
                )
                self.logger.debug("낮은 가격순 정렬")

                # Download each image and remember where it was stored.
                for position, product in enumerate(products):
                    product["saved_img_path"] = self.image_downloader.download_image_for_searchResult(
                        product.get("imgurl", ""),
                        product_id=product_id,
                        index=position
                    )

                # The first (cheapest) product is selected by default.
                products[0]["is_selected"] = 1
                self.logger.debug("first products is_selected : 1")
                self.logger.debug(f"card {idx}: processed {len(products)} products")

                for product in products:
                    buyurl = product.get("buyurl", "")
                    product_info.append({
                        "title": product.get("desc", ""),
                        "source": product.get("source", ""),
                        "price": product.get("text", ""),
                        "imgurl": product.get("imgurl", ""),
                        "saved_img_path": product.get("saved_img_path", ""),
                        "encrypted_url": buyurl,
                        "original_url": self.get_original_url(buyurl),
                        "is_selected": product.get("is_selected", 0)
                    })

            self.logger.info("product_info 추출 완료")
            self.logger.info(f"{product_info}")
            return product_info

        except json.JSONDecodeError as e:
            self.logger.debug(f"JSON 디코딩 오류: {e}", exc_info=True)
        except Exception as e:
            self.logger.error(f"extract_product_data 오류: {e}", exc_info=True)

        return []

    def get_original_url(self, encrypted_url):
        """Resolve an encrypted purchase link to its final URL.

        Issues a GET request (following redirects) and reports where it ended.

        :param encrypted_url: the link taken from the product's ``buyurl``.
        :return: the final URL when the response status is 200, otherwise the
            literal string ``"변환 실패"``.
        """
        try:
            response = requests.get(
                encrypted_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"
                },
                timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException:
            self.logger.info("get_original_url 변환 실패")
            return self.FAILED_URL

        if response.status_code == 200:
            self.logger.info("get_original_url 변환 완료")
            return response.url

        self.logger.info("get_original_url 변환 실패")
        return self.FAILED_URL

    def find_most_similar_product(self, original_img_path, products, similarity_threshold=0.2):
        """Pick the cheapest product whose image is similar enough to the original.

        Similarity is the SSIM score between the grayscale original and each
        product's saved image (resized/cropped to the original's size). When a
        product is chosen, ``is_selected`` is reset to 0 on every product and
        set to 1 on the chosen one.

        :param original_img_path: path of the original image.
        :param products: list of product dicts; each is expected to carry
            ``saved_img_path`` and a price label under ``text``.
        :param similarity_threshold: minimum SSIM score for a product to be
            considered. Defaults to 0.2, the value the code has always used.
        :return: the selected product dict, or ``None`` when nothing qualified
            or the original image could not be processed.
        """
        min_price = float('inf')
        most_similar_product = None

        try:
            # Load the original once, as grayscale; the file handle is
            # released as soon as the converted copy exists.
            with Image.open(original_img_path) as opened:
                original_img = opened.convert("L")
            original_arr = np.array(original_img)

            for product in products:
                try:
                    saved_img_path = product.get("saved_img_path")
                    if not saved_img_path:
                        self.logger.debug("No saved image path for product.")
                        continue

                    with Image.open(saved_img_path) as opened:
                        saved_img = opened.convert("L")

                    # SSIM needs both images to have identical dimensions.
                    if original_img.size != saved_img.size:
                        saved_img = ImageOps.fit(
                            saved_img,
                            original_img.size,
                            method=Image.Resampling.LANCZOS
                        )

                    similarity = ssim(original_arr, np.array(saved_img))
                    if similarity < similarity_threshold:
                        continue

                    # Unparsable prices become +inf and therefore never win.
                    price = self._parse_price(product.get("text", ""))
                    if price < min_price:
                        min_price = price
                        most_similar_product = product

                except Exception as e:
                    self.logger.debug(f"Error processing product image: {e}", exc_info=True)
                    continue

            if most_similar_product is not None:
                # Clear every flag, then mark only the chosen product.
                for product in products:
                    product["is_selected"] = 0
                most_similar_product["is_selected"] = 1

        except Exception as e:
            self.logger.debug(f"Error loading original image: {e}", exc_info=True)

        return most_similar_product