import requests from bs4 import BeautifulSoup # import sqlite3 import json # from datetime import datetime import logging # 로거 인스턴스 가져오기 logger = logging.getLogger('default_logger') def parse_naver_shopping(keyword, naver_code, isOverseas=1, sortcount=10): # 네이버 쇼핑 URL 설정 urlBase = "https://search.shopping.naver.com/search/all?query=" # urlEnd = f"&cat_id={naver_code}&frm=NVSHATC&pagingIndex=1&pagingSize=40&&productSet=overseas&sort=rel×tamp=&viewType=list" urlEnd = "&frm=NVSHATC&pagingIndex=1&pagingSize=40&&productSet=overseas&sort=rel×tamp=&viewType=list" url = urlBase + keyword + urlEnd # logger.debug(f"네이버 카테코드는 [{naver_code}] 입니다.") logger.debug("네이버 카테코드는 [생략]] 입니다.") logger.debug(f"대상키워드는 [{keyword}] 입니다.") headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부) "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청 "Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청 } # # 네이버 쇼핑에 접속하여 HTML 받아오기 # response = requests.get(url, headers=headers) # soup = BeautifulSoup(response.text, 'html.parser') # 네이버 쇼핑에 접속하여 HTML 받아오기 try: response = requests.get(url, headers=headers) response.raise_for_status() # 만약 요청이 실패하면 예외 발생 except requests.exceptions.RequestException as e: logger.debug(f"네이버 쇼핑 HTML을 가져오는 중 오류 발생: {e}", exc_info=True) # 예외 처리 코드 추가 soup = BeautifulSoup(response.text, 'html.parser') # "__NEXT_DATA__" 파싱하기 logger.debug("NEXT_DATA 파싱") try: next_data_str = soup.find("script", {"id": "__NEXT_DATA__"}).string except AttributeError as e: logger.debug("NEXT_DATA 스크립트를 찾을 수 없습니다.") # 예외 처리 코드 추가 else: try: next_data_json = json.loads(next_data_str) except json.JSONDecodeError as e: logger.debug("NEXT_DATA JSON을 파싱하는 중 오류 발생:", e) # 예외 처리 코드 추가 # # "__NEXT_DATA__" 파싱하기 # logger.debug("NEXT_DATA 파싱 완료") # next_data_str = soup.find("script", {"id": "__NEXT_DATA__"}).string # next_data_json = json.loads(next_data_str) # products 리스트 가져오기 logger.debug("products 리스트 가져오기") products_list = next_data_json["props"]["pageProps"]["initialState"]["products"]["list"] logger.debug(f"products 리스트 [{len(products_list)}] 개 입니다.") filtered_products = products_list #logger.debug(products_list) # isOverseas 값에 따라 필터링 #filtered_products = [product for product in products_list if product.get("item", {}).get("overseaTp", 0) == isOverseas] # isOverseas 값에 따라 필터링 # isOverseas = int(isOverseas) # if isOverseas == 1: # filtered_products = [product for product in products_list if int(product.get("item", {}).get("overseaTp")) == 1] # else: # filtered_products = products_list # 모든 제품 포함 #logger.debug(filtered_products) # purchaseCnt, reviewCount, keepCnt를 기준으로 정렬 logger.debug(f"해외배송여부는 [{isOverseas}] 입니다.") if len(filtered_products) >= 4: # 상품 정렬: 낮은 가격 순 sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0))) logger.debug(f"상품 정렬 {len(sorted_products_by_price)}개 선택") # 가격이 높은 상품 3개와 낮은 상품 2개를 제외한 나머지 상품 추출 remaining_products = sorted_products_by_price[1:-1] logger.debug(f"낮은가격 2개, 높은가격 3개제외 한 나머지 상품 정렬 {len(remaining_products)}개 선택") else: sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0))) logger.debug(f"총 상품 4개이하 | 상품 정렬 {len(sorted_products_by_price)}개 선택") remaining_products = filtered_products logger.debug(f"총 상품 4개이하로 상하위 상품제외 로직 pass {len(remaining_products)}개 선택") # 추출된 상품을 rank 순서대로 정렬 top5_products = sorted(remaining_products, key=lambda p: int(p.get("item", {}).get("rank", 0))) logger.debug(f"추출된 상품을 rank 순서대로 정렬 : {len(top5_products)}개 선택") final_top_5_products = top5_products[:5] logger.debug(f"RANK 정렬상품 중 마지막으로 상위 제품 [{len(final_top_5_products)}]개 선택") # pro_num = len(top5_products) # logger.debug(f"정렬상품 중 상위 5[{pro_num}]개 선택") #logger.debug(f"해외배송여부필터링된 상위제품들은 [{filtered_products}] 입니다.") # def sort_key(p): # # 값을 가져오고, 문자열이면 정수로 변환 # def to_int(value): # try: # return int(value) # except (ValueError, TypeError): # return 0 # 숫자로 변환할 수 없는 경우 0을 반환 # rank = to_int(p.get("item", {}).get("rank", 0)) # purchaseCnt = to_int(p.get("item", {}).get("purchaseCnt", 0)) # reviewCount = to_int(p.get("item", {}).get("reviewCount", 0)) # keepCnt = to_int(p.get("item", {}).get("keepCnt", 0)) # # return rank, purchaseCnt, reviewCount, keepCnt # return rank # # 정렬 부분 # sorted_products = sorted(filtered_products, key=sort_key, reverse=False) # logger.debug(f"정렬된 상품 {len(sorted_products)}개") # # 낮은가격이 위로 올라갈 경우 메인키워드와 일치하지않는 부분이 발생함 ex) 유압도끼 검색결과가 정렬 후 수동도끼로 바뀜. # #logger.debug(f"3가지조건으로 정렬된 상품들 : [{sorted_products}]") # #logger.debug(sorted_products) # # 상위 5개 제품 선택 # top_5_products = sorted_products[:int(sortcount)] # logger.debug(f"정렬된 상품 {len(top_5_products)}개") # # 낮은가격이 위로 올라갈 경우 메인키워드와 일치하지않는 부분이 발생함 ex) 유압도끼 검색결과가 정렬 후 수동도끼로 바뀜. # # top_5_products = filtered_products[:int(sortcount)] # #logger.debug(f"정렬상품 중 상위 5개 선택 : [{top_5_products}]") # # SQLite DB에 연결하기 # #conn = sqlite3.connect(db_name) # # c = conn.cursor() # original relatedTags 리스트 가져오기 related_tags_ori = next_data_json["props"]["pageProps"]["relatedTags"] # 상품 정보를 담을 리스트 초기화 products_info = [] # relatedTags 리스트 생성 related_tags = [tag.strip() for tag in related_tags_ori if tag] logger.debug(f"현재 키워드인 [{keyword}]에 대한 연관검색어는 [{related_tags}] 입니다.") for product in final_top_5_products: price = product.get("item", {}).get("price") productTitle = product.get("item", {}).get("productTitle") category1Name = product.get("item", {}).get("category1Name") category2Name = product.get("item", {}).get("category2Name") category3Name = product.get("item", {}).get("category3Name") category4Name = product.get("item", {}).get("category4Name") openDate = product.get("item", {}).get("openDate") mallCount = product.get("item", {}).get("mallCount") keepCnt = product.get("item", {}).get("keepCnt") overseaTp = product.get("item", {}).get("overseaTp") reviewCount = product.get("item", {}).get("reviewCount") reviewCountSum = product.get("item", {}).get("reviewCountSum") scoreInfo = product.get("item", {}).get("scoreInfo") naverPayAdAccumulatedDisplayValue = product.get("item", {}).get("naverPayAdAccumulatedDisplayValue") mobileLowPrice = product.get("item", {}).get("mobileLowPrice") lowPrice = product.get("item", {}).get("lowPrice") deliveryFeeContent = product.get("item", {}).get("deliveryFeeContent") dlvryLowPrice = product.get("item", {}).get("dlvryLowPrice") imageUrl = product.get("item", {}).get("imageUrl") imgSz = product.get("item", {}).get("imgSz") searchKeyword = product.get("item", {}).get("searchKeyword") mallProductUrl = product.get("item", {}).get("mallProductUrl") mallPcUrl = product.get("item", {}).get("mallPcUrl") mallName = product.get("item", {}).get("mallName") manuTag = product.get("item", {}).get("manuTag") #mallInfoCache = product.get("item", {}).get("mallInfoCache") purchaseCnt = product.get("item", {}).get("purchaseCnt") rank = product.get("item", {}).get("rank") # 상품 정보를 딕셔너리로 만들어 리스트에 추가 product_info = { "productTitle": productTitle, "price": price, "imageUrl": imageUrl, "rank": rank, "purchase" : purchaseCnt, "review" : reviewCountSum } products_info.append(product_info) logger.debug(f"키워드 검색 결과 상품 [{keyword}]에 대한 [{len(products_info)}]개의 상품정보수집 완료") return products_info