238 lines
14 KiB
Python
238 lines
14 KiB
Python
import requests, time
|
|
from bs4 import BeautifulSoup
|
|
import sqlite3
|
|
import json
|
|
from datetime import datetime
|
|
import re
|
|
import logging
|
|
|
|
# 로거 인스턴스 가져오기
|
|
logger = logging.getLogger('default_logger')
|
|
|
|
# def find_codes(base_categories, json_data):
|
|
# for base_category in base_categories:
|
|
# category_parts = re.split(r'>|-', base_category)
|
|
# # category_parts 리스트에서 None 값들로 확장하여 필요한 부분까지 포함시킴
|
|
# base_category1Name, base_category2Name, base_category3Name, base_category4Name = (category_parts + [None] * 4)[:4]
|
|
|
|
# for item in json_data:
|
|
# # 'category3Name'과 'category4Name' 모두에 대해 값이 없거나 None일 때 처리를 포함
|
|
# if (item.get('category1Name') == base_category1Name and
|
|
# item.get('category2Name') == base_category2Name and
|
|
# (item.get('category3Name') == base_category3Name or item.get('category3Name') is None) and
|
|
# (item.get('category4Name') == base_category4Name or item.get('category4Name') is None)):
|
|
# return item['Naver_code'], item.get('cat_code') # naver_code와 cat_code를 반환
|
|
# return None, None # 조건에 맞는 항목을 찾지 못한 경우
|
|
|
|
|
|
def parse_naver_shopping(keyword_id, keyword, json_data, conn, overPrice, sortcount=5):
|
|
|
|
# naver_code = code[0]
|
|
# base_cat_code = code[1]
|
|
|
|
# if not naver_code:
|
|
# naver_code = ""
|
|
|
|
# 네이버 쇼핑 URL 설정
|
|
urlBase = "https://search.shopping.naver.com/search/all?query="
|
|
# urlEnd = f"&cat_id={naver_code}&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet=overseas&sort=rel×tamp=&viewType=list"
|
|
urlEnd = f"&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet=overseas&sort=rel×tamp=&viewType=list"
|
|
url = urlBase + keyword + urlEnd
|
|
logger.debug(f"검색 주소는 [{url}] 입니다.")
|
|
logger.debug(f"대상키워드는 [{keyword}] 입니다.")
|
|
# logger.debug(f"대상 카테고리는 [{naver_code}] 입니다.")
|
|
|
|
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
|
|
"Accept-Language": "en-US,en;q=0.9",
|
|
"Accept-Encoding": "gzip, deflate, br",
|
|
"DNT": "1", # Do Not Track 요청 헤더 (사용자의 추적을 거부)
|
|
"Connection": "keep-alive",
|
|
"Upgrade-Insecure-Requests": "1", # https로의 업그레이드를 요청
|
|
"Cache-Control": "max-age=0", # 캐시된 콘텐츠를 재사용하지 않도록 요청
|
|
}
|
|
|
|
# def fetch_with_retry(url, headers, retries=3, delay=2):
|
|
# """네이버 쇼핑에 접속하여 HTML 받아오고 "__NEXT_DATA__"를 찾는 함수.
|
|
|
|
# Args:
|
|
# url (str): 접속할 URL.
|
|
# headers (dict): HTTP 요청에 사용될 헤더.
|
|
# retries (int): 최대 재시도 횟수.
|
|
# delay (int): 재시도 전 대기 시간 (초).
|
|
|
|
# Returns:
|
|
# str or None: 찾은 "__NEXT_DATA__"의 문자열, 또는 실패시 None.
|
|
# """
|
|
# for attempt in range(retries):
|
|
# response = requests.get(url, headers=headers)
|
|
# soup = BeautifulSoup(response.text, 'html.parser')
|
|
|
|
# next_data_tag = soup.find("script", {"id": "__NEXT_DATA__"})
|
|
# if next_data_tag:
|
|
# return next_data_tag.string
|
|
|
|
# # "__NEXT_DATA__"를 찾지 못한 경우, 일정 시간 기다린 후 재시도
|
|
# time.sleep(delay)
|
|
|
|
# # 지정된 재시도 횟수 후에도 찾지 못한 경우
|
|
# logger.debug("Failed to find '__NEXT_DATA__' after retrying.")
|
|
# return None
|
|
|
|
# next_data_str = fetch_with_retry(url, headers)
|
|
|
|
# 네이버 쇼핑에 접속하여 HTML 받아오기
|
|
try:
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status() # 만약 요청이 실패하면 예외 발생
|
|
except requests.exceptions.RequestException as e:
|
|
logger.debug(f"네이버 쇼핑 HTML을 가져오는 중 오류 발생: {e}")
|
|
# 예외 처리 코드 추가
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
|
|
time.sleep(1)
|
|
|
|
# "__NEXT_DATA__" 파싱하기
|
|
logger.debug("NEXT_DATA 파싱")
|
|
next_data = soup.find("script", {"id": "__NEXT_DATA__"})
|
|
|
|
if next_data:
|
|
next_data_str = next_data.string
|
|
else:
|
|
# If the tag wasn't found, it might be a dynamic loading issue or the structure has changed.
|
|
logger.debug("요소찾기 실패 : __NEXT_DATA__ script tag.")
|
|
soup = BeautifulSoup(response.text, 'html.parser')
|
|
time.sleep(1)
|
|
next_data = soup.find("script", {"id": "__NEXT_DATA__"})
|
|
next_data_str = next_data.string
|
|
# pass
|
|
|
|
next_data_json = json.loads(next_data_str)
|
|
|
|
|
|
# products 리스트 가져오기
|
|
logger.debug("products 리스트 가져오기")
|
|
products_list = next_data_json["props"]["pageProps"]["initialState"]["products"]["list"]
|
|
logger.debug(f"products 리스트 [{len(products_list)}] 개 입니다.")
|
|
|
|
# 가격이 5만원 이하인 제품 제외 후 필터링
|
|
filtered_products = [product for product in products_list if int(product.get("item", {}).get("price", 0)) > 50000]
|
|
logger.debug(f"가격이 {overPrice}원 이하인 제품을 제외한 후 [{len(filtered_products)}] 개의 상품이 남았습니다.")
|
|
|
|
# 필터링된 제품의 갯수 확인
|
|
if not filtered_products:
|
|
# 상품이 없으면 나머지 코드 실행을 중지하고 로그를 남김
|
|
logger.debug("필터링된 상품이 없어서 이후 과정은 수행되지 않습니다.")
|
|
else:
|
|
# 상품이 있으면 나머지 로직 실행
|
|
if len(filtered_products) >= 4:
|
|
# 상품 정렬: 낮은 가격 순
|
|
sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0)))
|
|
logger.debug(f"상품 정렬 {len(sorted_products_by_price)}개 선택")
|
|
|
|
# 가격이 높은 상품 1개와 낮은 상품 1개를 제외한 나머지 상품 추출
|
|
remaining_products = sorted_products_by_price[1:-1]
|
|
logger.debug(f"낮은가격 1개, 높은가격 1개제외 한 나머지 상품 정렬 {len(remaining_products)}개 선택")
|
|
|
|
else:
|
|
sorted_products_by_price = sorted(filtered_products, key=lambda p: int(p.get("item", {}).get("price", 0)))
|
|
logger.debug(f"총 상품 4개이하 | 상품 정렬 {len(sorted_products_by_price)}개 선택")
|
|
|
|
remaining_products = filtered_products
|
|
logger.debug(f"총 상품 4개이하로 상하위 상품제외 로직 pass {len(remaining_products)}개 선택")
|
|
|
|
# 추출된 상품을 rank 순서대로 정렬
|
|
top5_products = sorted(remaining_products, key=lambda p: int(p.get("item", {}).get("rank", 0)))
|
|
logger.debug(f"추출된 상품을 rank 순서대로 정렬 : {len(top5_products)}개 선택")
|
|
|
|
final_top_5_products = top5_products[:5]
|
|
logger.debug(f"RANK 정렬상품 중 마지막으로 상위 제품 [{len(final_top_5_products)}]개 선택")
|
|
|
|
c = conn.cursor()
|
|
|
|
# original relatedTags 리스트 가져오기
|
|
related_tags_ori = next_data_json["props"]["pageProps"]["relatedTags"]
|
|
|
|
# relatedTags 리스트 생성
|
|
related_tags = [tag.strip() for tag in related_tags_ori if tag]
|
|
logger.debug(f"현재 키워드인 [{keyword}]에 대한 연관검색어는 [{related_tags}] 입니다.")
|
|
|
|
# DB에 기록하기
|
|
for index, product in enumerate(final_top_5_products):
|
|
# 첫 번째 제품에 대한 keyword 처리
|
|
logger.debug(f"{keyword}의 검색결과 [{index}]번째 상품 처리")
|
|
current_keyword = keyword if index == 0 else related_tags_ori[index - 1] if index - 1 < len(related_tags_ori) else keyword
|
|
# current_keyword = keyword
|
|
price = product.get("item", {}).get("price")
|
|
productTitle = product.get("item", {}).get("productTitle")
|
|
category1Name = product.get("item", {}).get("category1Name")
|
|
category2Name = product.get("item", {}).get("category2Name")
|
|
category3Name = product.get("item", {}).get("category3Name")
|
|
category4Name = product.get("item", {}).get("category4Name")
|
|
openDate = product.get("item", {}).get("openDate")
|
|
mallCount = product.get("item", {}).get("mallCount")
|
|
keepCnt = product.get("item", {}).get("keepCnt")
|
|
overseaTp = product.get("item", {}).get("overseaTp")
|
|
reviewCount = product.get("item", {}).get("reviewCount")
|
|
reviewCountSum = product.get("item", {}).get("reviewCountSum")
|
|
scoreInfo = product.get("item", {}).get("scoreInfo")
|
|
naverPayAdAccumulatedDisplayValue = product.get("item", {}).get("naverPayAdAccumulatedDisplayValue")
|
|
mobileLowPrice = product.get("item", {}).get("mobileLowPrice")
|
|
lowPrice = product.get("item", {}).get("lowPrice")
|
|
deliveryFeeContent = product.get("item", {}).get("deliveryFeeContent")
|
|
dlvryLowPrice = product.get("item", {}).get("dlvryLowPrice")
|
|
imageUrl = product.get("item", {}).get("imageUrl")
|
|
imgSz = product.get("item", {}).get("imgSz")
|
|
searchKeyword = product.get("item", {}).get("searchKeyword")
|
|
mallProductUrl = product.get("item", {}).get("mallProductUrl")
|
|
mallPcUrl = product.get("item", {}).get("mallPcUrl")
|
|
mallName = product.get("item", {}).get("mallName")
|
|
manuTag = product.get("item", {}).get("manuTag")
|
|
#mallInfoCache = product.get("item", {}).get("mallInfoCache")
|
|
purchaseCnt = product.get("item", {}).get("purchaseCnt")
|
|
rank = product.get("item", {}).get("rank")
|
|
# 현재 날짜와 시간 가져오기
|
|
now = datetime.now()
|
|
|
|
|
|
logger.debug("cat_code 찾기")
|
|
|
|
c.execute("""
|
|
SELECT cat_code FROM SSCategoryCode
|
|
WHERE cat1 = ? AND cat2 = ? AND cat3 = ? AND (cat4 = ? OR cat4 IS NULL)
|
|
""", (category1Name, category2Name, category3Name, category4Name or None))
|
|
cat_code_result = c.fetchone()
|
|
cat_code = cat_code_result[0] if cat_code_result else ""
|
|
logger.debug(f"[{index}/{len(final_top_5_products)}]번째 상품 cat_code 8자리 숫자 : {cat_code}")
|
|
|
|
# 날짜와 시간을 문자열로 변환하기 (예: '2023-12-19', '14:30:00')
|
|
date_created = now.strftime('%Y-%m-%d')
|
|
time_created = now.strftime('%H:%M:%S')
|
|
|
|
# 중복 체크를 위한 고유 식별자 생성 (예: productTitle + imageUrl)
|
|
unique_id = productTitle + imageUrl
|
|
# 데이터베이스에서 중복되는 제품이 있는지 확인
|
|
c.execute("SELECT COUNT(*) FROM NaverShopping WHERE unique_id = ?", (unique_id,))
|
|
count = c.fetchone()[0]
|
|
|
|
if count == 0:
|
|
# 중복되는 데이터가 없으면 새로운 데이터 삽입
|
|
c.execute("INSERT INTO NaverShopping (keyword_id, keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, cat_code, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, relatedTags, rank, unique_id, date_created, time_created) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(keyword_id+1, current_keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, cat_code, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, current_keyword, rank, unique_id, date_created, time_created)) # keyword_id, item_name, price, purchase_count, related_keywords, rank는 적절하게 설정해야 합니다.
|
|
logger.debug(f"{current_keyword} DB업데이트 완료")
|
|
else:
|
|
# 중복되는 데이터가 있으면 업데이트 또는 무시 (여기서는 예시로 업데이트 로직을 추가함)
|
|
#c.execute("UPDATE NaverShopping SET keyword_id = ?, keyword = ?, price = ?, productTitle = ?, category1Name = ?, category2Name = ?, category3Name = ?, category4Name = ?, openDate = ?, mallCount = ?, keepCnt = ?, overseaTp = ?, reviewCount = ?, reviewCountSum = ?, scoreInfo = ?, naverPayAdAccumulatedDisplayValue = ?, mobileLowPrice = ?, lowPrice = ?, deliveryFeeContent = ?, dlvryLowPrice = ?, imageUrl = ?, imgSz = ?, searchKeyword = ?, mallProductUrl = ?, mallPcUrl = ?, mallName = ?, manuTag = ?, purchaseCnt = ?, relatedTags = ?, rank = ?, date_created = ?, WHERE unique_id = ?",
|
|
# (keyword_id, current_keyword, price, productTitle, category1Name, category2Name, category3Name, category4Name, openDate, mallCount, keepCnt, overseaTp, reviewCount, reviewCountSum, scoreInfo, naverPayAdAccumulatedDisplayValue, mobileLowPrice, lowPrice, deliveryFeeContent, dlvryLowPrice, imageUrl, imgSz, searchKeyword, mallProductUrl, mallPcUrl, mallName, manuTag, purchaseCnt, current_keyword, rank, date_created, unique_id)) # keyword_id, item_name, price, purchase_count, related_keywords, rank는 적절하게 설정해야 합니다.
|
|
pass
|
|
|
|
# SubKeywords 테이블에 기록하기
|
|
# ... (SubKeywords 테이블에 기록하는 코드) ...
|
|
logger.debug(f"키워드 검색 결과 상품 [{keyword}]에 대한 [{len(final_top_5_products)}]개의 상품정보수집 완료")
|
|
|
|
conn.commit() # Commit the transaction
|
|
#conn.close() # Close the connection
|