TaoSourcerer/naverParser.py

274 lines
16 KiB
Python

import requests, time
from bs4 import BeautifulSoup
import sqlite3
import json
from datetime import datetime
import re
import logging
# 로거 인스턴스 가져오기
logger = logging.getLogger('default_logger')
# def find_codes(base_categories, json_data):
# for base_category in base_categories:
# category_parts = re.split(r'>|-', base_category)
# # category_parts 리스트에서 None 값들로 확장하여 필요한 부분까지 포함시킴
# base_category1Name, base_category2Name, base_category3Name, base_category4Name = (category_parts + [None] * 4)[:4]
# for item in json_data:
# # 'category3Name'과 'category4Name' 모두에 대해 값이 없거나 None일 때 처리를 포함
# if (item.get('category1Name') == base_category1Name and
# item.get('category2Name') == base_category2Name and
# (item.get('category3Name') == base_category3Name or item.get('category3Name') is None) and
# (item.get('category4Name') == base_category4Name or item.get('category4Name') is None)):
# return item['Naver_code'], item.get('cat_code') # naver_code와 cat_code를 반환
# return None, None # 조건에 맞는 항목을 찾지 못한 경우
def parse_naver_shopping(keyword_id, keyword, isBranch, branchCount, json_data, conn, overPrice, sortcount=5):
# naver_code = code[0]
# base_cat_code = code[1]
# if not naver_code:
# naver_code = ""
# 네이버 쇼핑 URL 설정
urlBase = "https://search.shopping.naver.com/search/all?query="
# urlEnd = f"&cat_id={naver_code}&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet=overseas&sort=rel&timestamp=&viewType=list"
urlEnd = f"&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet=overseas&sort=rel&timestamp=&viewType=list"
url = urlBase + keyword + urlEnd
logger.debug(f"검색 주소는 [{url}] 입니다.")
logger.debug(f"대상키워드는 [{keyword}] 입니다.")
# logger.debug(f"대상 카테고리는 [{naver_code}] 입니다.")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부)
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청
"Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청
}
# def fetch_with_retry(url, headers, retries=3, delay=2):
# """네이버 쇼핑에 접속하여 HTML 받아오고 "__NEXT_DATA__"를 찾는 함수.
# Args:
# url (str): 접속할 URL.
# headers (dict): HTTP 요청에 사용될 헤더.
# retries (int): 최대 재시도 횟수.
# delay (int): 재시도 전 대기 시간 (초).
# Returns:
# str or None: 찾은 "__NEXT_DATA__"의 문자열, 또는 실패시 None.
# """
# for attempt in range(retries):
# response = requests.get(url, headers=headers)
# soup = BeautifulSoup(response.text, 'html.parser')
# next_data_tag = soup.find("script", {"id": "__NEXT_DATA__"})
# if next_data_tag:
# return next_data_tag.string
# # "__NEXT_DATA__"를 찾지 못한 경우, 일정 시간 기다린 후 재시도
# time.sleep(delay)
# # 지정된 재시도 횟수 후에도 찾지 못한 경우
# logger.debug("Failed to find '__NEXT_DATA__' after retrying.")
# return None
# next_data_str = fetch_with_retry(url, headers)
# 네이버 쇼핑에 접속하여 HTML 받아오기
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # 만약 요청이 실패하면 예외 발생
except requests.exceptions.RequestException as e:
logger.debug(f"네이버 쇼핑 HTML을 가져오는 중 오류 발생: {e}")
# 예외 처리 코드 추가
soup = BeautifulSoup(response.text, 'html.parser')
time.sleep(1)
# "__NEXT_DATA__" 파싱하기
logger.debug("NEXT_DATA 파싱")
next_data = soup.find("script", {"id": "__NEXT_DATA__"})
if next_data:
next_data_str = next_data.string
else:
# If the tag wasn't found, it might be a dynamic loading issue or the structure has changed.
logger.debug("요소찾기 실패 : __NEXT_DATA__ script tag.")
soup = BeautifulSoup(response.text, 'html.parser')
time.sleep(1)
next_data = soup.find("script", {"id": "__NEXT_DATA__"})
next_data_str = next_data.string
# pass
next_data_json = json.loads(next_data_str)
# products 리스트 가져오기
logger.debug("products 리스트 가져오기")
products_list = next_data_json["props"]["pageProps"]["initialState"]["products"]["list"]
logger.debug(f"products 리스트 [{len(products_list)}] 개 입니다.")
# 가격이 5만원 이하인 제품 제외 후 필터링
filtered_products = [product for product in products_list if int(product.get("item", {}).get("price", 0)) > 50000]
logger.debug(f"가격이 {overPrice}원 이하인 제품을 제외한 후 [{len(filtered_products)}] 개의 상품이 남았습니다.")
# 필터링된 제품의 갯수 확인
if not filtered_products:
# 상품이 없으면 나머지 코드 실행을 중지하고 로그를 남김
logger.debug("필터링된 상품이 없어서 이후 과정은 수행되지 않습니다.")
else:
# 상품이 있으면 나머지 로직 실행
if len(filtered_products) >= 4:
# 상품 정렬: 낮은 가격 순
sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0)))
logger.debug(f"상품 정렬 {len(sorted_products_by_price)}개 선택")
# 가격이 높은 상품 1개와 낮은 상품 1개를 제외한 나머지 상품 추출
remaining_products = sorted_products_by_price[1:-1]
logger.debug(f"낮은가격 1개, 높은가격 1개제외 한 나머지 상품 정렬 {len(remaining_products)}개 선택")
else:
sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0)))
logger.debug(f"총 상품 4개이하 | 상품 정렬 {len(sorted_products_by_price)}개 선택")
remaining_products = filtered_products
logger.debug(f"총 상품 4개이하로 상하위 상품제외 로직 pass {len(remaining_products)}개 선택")
# 추출된 상품을 rank 순서대로 정렬
top5_products = sorted(remaining_products, key=lambda p: int(p.get("item", {}).get("rank", 0)))
logger.debug(f"추출된 상품을 rank 순서대로 정렬 : {len(top5_products)}개 선택")
final_top_5_products = top5_products[:5]
logger.debug(f"RANK 정렬상품 중 마지막으로 상위 제품 [{len(final_top_5_products)}]개 선택")
merged_price = 0
for index, product in enumerate(final_top_5_products):
price = int(product.get("item", {}).get("price"))
merged_price += price # 평균 가격 계산을 위해
merged_price = merged_price / len(final_top_5_products)
c = conn.cursor()
# original relatedTags 리스트 가져오기
related_tags_ori = next_data_json["props"]["pageProps"]["relatedTags"]
# relatedTags 리스트 생성 , 초기화 및 검증
if related_tags_ori:
related_tags = [tag.strip() for tag in related_tags_ori if tag] # related_tags_ori에서 공백 제거 후 related_tags에 할당
else:
related_tags = []
logger.debug(f"현재 키워드인 [{keyword}]에 대한 연관검색어는 [{related_tags}] 입니다.")
# branchCount 값이 문자열로 전달될 가능성이 있으므로, 정수로 변환
try:
branchCount = min(int(branchCount), 5) # 최대 5
except ValueError:
logger.error(f"branchCount 값이 정수로 변환되지 않습니다: {branchCount}")
branchCount = 0 # 기본값 설정
if isBranch:
logger.debug(f"가지치기 설정이 [{isBranch}] 이므로 {branchCount}개 연관검색어 보존")
# related_tags의 길이가 branchCount보다 짧은 경우 "-"로 채워서 5의 길이를 유지
if len(related_tags) < branchCount:
related_tags += ["-"] * (branchCount - len(related_tags))
# 리스트의 길이를 branchCount만큼 자르고, 부족한 부분은 "-"로 채움
related_tags = related_tags[:branchCount] + ["-"] * (5 - branchCount)
else:
logger.debug(f"가지치기 설정이 [{isBranch}] 이므로 연관검색어 초기화")
related_tags = ["-"] * 5 # 5개의 "-" 문자열로 초기화
# DB에 기록하기
for index, product in enumerate(final_top_5_products):
# 첫 번째 제품에 대한 keyword 처리
logger.debug(f"{keyword}의 검색결과 [{index+1}]번째 상품 처리")
# current_keyword = keyword if index == 0 else related_tags[index - 1] if index - 1 < len(related_tags) else keyword
current_keyword = keyword if index == 0 else related_tags[index - 1] if index - 1 < len(related_tags) else ""
logger.debug(f"가지치기 상태 : [{isBranch}] 이므로 {index+1}번째 [current_keyword]에는 [{current_keyword}] 할당 ")
# current_keyword = keyword
price = product.get("item", {}).get("price")
productTitle = product.get("item", {}).get("productTitle")
category1Name = product.get("item", {}).get("category1Name")
category2Name = product.get("item", {}).get("category2Name")
category3Name = product.get("item", {}).get("category3Name")
category4Name = product.get("item", {}).get("category4Name")
openDate = product.get("item", {}).get("openDate")
mallCount = product.get("item", {}).get("mallCount")
keepCnt = product.get("item", {}).get("keepCnt")
overseaTp = product.get("item", {}).get("overseaTp")
reviewCount = product.get("item", {}).get("reviewCount")
reviewCountSum = product.get("item", {}).get("reviewCountSum")
scoreInfo = product.get("item", {}).get("scoreInfo")
naverPayAdAccumulatedDisplayValue = product.get("item", {}).get("naverPayAdAccumulatedDisplayValue")
mobileLowPrice = product.get("item", {}).get("mobileLowPrice")
lowPrice = product.get("item", {}).get("lowPrice")
deliveryFeeContent = product.get("item", {}).get("deliveryFeeContent")
dlvryLowPrice = product.get("item", {}).get("dlvryLowPrice")
imageUrl = product.get("item", {}).get("imageUrl")
imgSz = product.get("item", {}).get("imgSz")
searchKeyword = product.get("item", {}).get("searchKeyword")
mallProductUrl = product.get("item", {}).get("mallProductUrl")
mallPcUrl = product.get("item", {}).get("mallPcUrl")
mallName = product.get("item", {}).get("mallName")
manuTag = product.get("item", {}).get("manuTag")
#mallInfoCache = product.get("item", {}).get("mallInfoCache")
purchaseCnt = product.get("item", {}).get("purchaseCnt")
rank = product.get("item", {}).get("rank")
# 현재 날짜와 시간 가져오기
now = datetime.now()
logger.debug("cat_code 찾기")
c.execute("""
SELECT cat_code FROM SSCategoryCode
WHERE cat1 = ? AND cat2 = ? AND cat3 = ? AND (cat4 = ? OR cat4 IS NULL)
""", (category1Name, category2Name, category3Name, category4Name or None))
cat_code_result = c.fetchone()
cat_code = cat_code_result[0] if cat_code_result else ""
logger.debug(f"[{index+1}/{len(final_top_5_products)}]번째 상품 cat_code 8자리 숫자 : {cat_code}")
# 날짜와 시간을 문자열로 변환하기 (예: '2023-12-19', '14:30:00')
date_created = now.strftime('%Y-%m-%d')
time_created = now.strftime('%H:%M:%S')
# 중복 체크를 위한 고유 식별자 생성 (예: productTitle + imageUrl)
unique_id = productTitle + imageUrl
# 데이터베이스에서 중복되는 제품이 있는지 확인
c.execute("SELECT COUNT(*) FROM NaverShopping WHERE unique_id = ?", (unique_id,))
count = c.fetchone()[0]
if count == 0:
# 중복되는 데이터가 없으면 새로운 데이터 삽입
c.execute("INSERT INTO NaverShopping (keyword_id, keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, cat_code, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, relatedTags, rank, merged_price, unique_id, date_created, time_created) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(keyword_id+1, current_keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, cat_code, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, current_keyword, rank, merged_price, unique_id, date_created, time_created)) # keyword_id, item_name, price, purchase_count, related_keywords, rank는 적절하게 설정해야 합니다.
logger.debug(f"{current_keyword} DB업데이트 완료")
else:
# 중복되는 데이터가 있으면 업데이트 또는 무시 (여기서는 예시로 업데이트 로직을 추가함)
#c.execute("UPDATE NaverShopping SET keyword_id = ?, keyword = ?, price = ?, productTitle = ?, category1Name = ?, category2Name = ?, category3Name = ?, category4Name = ?, openDate = ?, mallCount = ?, keepCnt = ?, overseaTp = ?, reviewCount = ?, reviewCountSum = ?, scoreInfo = ?, naverPayAdAccumulatedDisplayValue = ?, mobileLowPrice = ?, lowPrice = ?, deliveryFeeContent = ?, dlvryLowPrice = ?, imageUrl = ?, imgSz = ?, searchKeyword = ?, mallProductUrl = ?, mallPcUrl = ?, mallName = ?, manuTag = ?, purchaseCnt = ?, relatedTags = ?, rank = ?, date_created = ?, WHERE unique_id = ?",
# (keyword_id, current_keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, current_keyword, rank, date_created, unique_id)) # keyword_id, item_name, price, purchase_count, related_keywords, rank는 적절하게 설정해야 합니다.
pass
# SubKeywords 테이블에 기록하기
# ... (SubKeywords 테이블에 기록하는 코드) ...
logger.debug(f"키워드 검색 결과 상품 [{keyword}]에 대한 [{len(final_top_5_products)}]개의 상품정보수집 완료")
conn.commit() # Commit the transaction
#conn.close() # Close the connection