401 lines
15 KiB
Python
401 lines
15 KiB
Python
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
from selenium.webdriver.common.keys import Keys
|
|
from google.api_core.exceptions import InternalServerError
|
|
import time
|
|
import sys, re
|
|
from naver_search import parse_naver_shopping
|
|
import numpy as np
|
|
from pymongo import MongoClient
|
|
import logging
|
|
|
|
# 로거 인스턴스 가져오기
|
|
logger = logging.getLogger('default_logger')
|
|
avg_price = 0
|
|
|
|
def try_connect(self, address, port, user, password):
|
|
address, port, user, password = self.load_config()
|
|
# logger.debug(f"{address},{port},{user},{password}")
|
|
if not all([address, port, user, password]):
|
|
logger.debug("Configuration missing. Please check your config.ini file.")
|
|
return False
|
|
try:
|
|
client = MongoClient(f'mongodb://{user}:{password}@{address}:{port}/')
|
|
db = client['taobao_project'] # 여기에서 추가 연결 확인 작업을 수행할 수 있습니다.
|
|
logger.debug("MongoDB 연결 성공.")
|
|
return db['delv_fee']
|
|
except Exception as e:
|
|
logger.debug(f"MongoDB 연결 실패: {e}")
|
|
return False
|
|
|
|
def safe_generate_content(gemini, image_src, product_title, max_retries=3, initial_wait=1):
|
|
retry_count = 0
|
|
wait_time = initial_wait
|
|
|
|
while retry_count < max_retries:
|
|
try:
|
|
return gemini.generate_description(image_src, product_title)
|
|
except InternalServerError as e:
|
|
logger.debug(f"Retry {retry_count + 1}/{max_retries} for InternalServerError")
|
|
time.sleep(wait_time)
|
|
wait_time *= 2 # Exponential backoff
|
|
retry_count += 1
|
|
|
|
# Final attempt without catching the exception
|
|
return gemini.generate_content(image_src, product_title)
|
|
|
|
def selling_price(tao_bao_price, shipping_fee=6500, target_margin_rate=0.29, market_fee_rate=0.13,
|
|
card_fee_rate=0.035, exchange_rate=200):
|
|
"""
|
|
판매가를 계산하는 함수
|
|
|
|
Args:
|
|
tao_bao_price: float, 타오바오 제품 원가
|
|
target_margin_rate: float, 목표 마진율
|
|
market_fee_rate: float, 마켓 수수료율
|
|
card_fee_rate: float, 카드 결제 수수료율
|
|
exchange_rate: float, 환율
|
|
shipping_fee: float, 배송비
|
|
|
|
Returns:
|
|
float, 판매가
|
|
"""
|
|
|
|
# 제품 원가 계산
|
|
product_cost = tao_bao_price * exchange_rate
|
|
|
|
# 목표 마진 계산
|
|
target_margin = product_cost * target_margin_rate
|
|
|
|
# 총 수수료율 계산
|
|
total_fee_rate = card_fee_rate + market_fee_rate
|
|
|
|
# 판매가 계산
|
|
selling_price = (product_cost + target_margin) / (1 - total_fee_rate - market_fee_rate) + shipping_fee
|
|
|
|
# 마켓 수수료 계산
|
|
market_fee = selling_price * market_fee_rate
|
|
|
|
# 마진 계산
|
|
margin = selling_price - product_cost - market_fee - shipping_fee
|
|
|
|
return selling_price, margin
|
|
|
|
def extract_weight(text):
|
|
"""
|
|
텍스트에서 무게 값만 추출하는 함수. 유효한 값이 없을 경우 기본값을 적용합니다.
|
|
|
|
Args:
|
|
text: str, 텍스트. None이거나 빈 문자열일 경우 기본값 반환.
|
|
|
|
Returns:
|
|
float, 추출된 무게 값 (kg) 또는 기본값.
|
|
"""
|
|
default_weight = 1.0 # 기본 무게값 설정
|
|
|
|
# 입력값 검증
|
|
if not text:
|
|
return default_weight
|
|
|
|
# 정규표현식을 사용하여 숫자와 단위를 추출합니다.
|
|
pattern = r'(\d+(\.\d+)?)\s*(kg|g)'
|
|
match = re.search(pattern, text)
|
|
|
|
if match:
|
|
weight = float(match.group(1)) # 숫자를 실수형으로 변환합니다.
|
|
unit = match.group(3) # 단위를 가져옵니다.
|
|
|
|
if unit == 'g':
|
|
weight /= 1000 # 그램을 킬로그램으로 변환합니다.
|
|
|
|
return weight if weight > 0 else default_weight
|
|
else:
|
|
return default_weight
|
|
|
|
# 무게배송비 추정
|
|
def find_delivery_fee(weight, delv_collection):
|
|
# 가장 가까운 무게값 찾기
|
|
cursor = delv_collection.find({}, {'_id': 0, 'weight': 1, 'fee': 1}).sort('weight', 1)
|
|
weights = [record['weight'] for record in cursor]
|
|
cursor.rewind()
|
|
fees = [int(record['fee']) for record in cursor] # 콤마 제거 후 정수로 변환
|
|
|
|
|
|
# fees = [record['fee'] for record in cursor]
|
|
|
|
|
|
# 입력된 무게와 가장 가까운 무게 찾기
|
|
closest_weight_index = np.argmin(np.abs(np.array(weights) - weight))
|
|
closest_weight = weights[closest_weight_index]
|
|
logger.debug(f"가장 가까운 무게 : {closest_weight}")
|
|
logger.debug(f"closest_weight_index: {closest_weight_index}")
|
|
logger.debug(f"fees 길이 : {len(fees)}")
|
|
logger.debug(f"weights 길이 : {len(weights)}")
|
|
|
|
|
|
# 가장 가까운 무게에 해당하는 요금 반환
|
|
return fees[closest_weight_index]
|
|
|
|
|
|
#네이버쇼핑 함수
|
|
def NS_info(products):
|
|
# 각 상품 정보에 접근하여 텍스트로 가공
|
|
product_info_text = ""
|
|
global avg_price
|
|
avg_price = 0
|
|
|
|
for index, product in enumerate(products):
|
|
idx = index + 1
|
|
product_info_text += '<div class="card">\n' # 카드 시작 태그
|
|
product_info_text += f"=================={idx} 번째 네이버 상품========================"
|
|
# 이미지 태그
|
|
product_info_text += f'<img src="{product["imageUrl"]}" width="200" alt="상품 이미지">\n'
|
|
|
|
# 상품 세부 정보 태그 시작
|
|
product_info_text += '<div class="product-details">\n'
|
|
|
|
# 상품명 태그
|
|
# product_info_text += f'<div class="product-name"><strong>상품명 : {product["productTitle"]}</strong></div>\n'
|
|
product_info_text += f'<div class="product-name"><h3>상품명 : {product["productTitle"]}</h3></div>\n'
|
|
# 가격 태그
|
|
# 상품 가격을 숫자로 변환하고 포맷
|
|
if isinstance(product["price"], str):
|
|
# 문자열인 경우 숫자로 변환
|
|
price = int(product["price"].replace(",", "")) # 쉼표 제거 후 정수 변환
|
|
else:
|
|
price = product["price"] # 이미 숫자인 경우 변환 없음
|
|
|
|
formatted_price = format(price, ",") + "원" # 천 단위 구분으로 포맷
|
|
product_info_text += f'<div class="price">가격: {formatted_price}</div>'
|
|
|
|
# 순위 태그
|
|
product_info_text += f'<div class="price">순위: {product["rank"]}</div>\n'
|
|
|
|
product_info_text += "=================================================================="
|
|
|
|
# 상품 세부 정보 태그 종료
|
|
product_info_text += '</div>\n'
|
|
|
|
product_info_text += '</div>\n' # 카드 종료 태그
|
|
avg_price += int(product["price"])
|
|
|
|
# avg_price = round(avg_price/len(products),0)
|
|
|
|
if products: # products가 비어 있지 않은 경우에만 평균 가격 계산
|
|
avg_price = round(avg_price / len(products), 0)
|
|
product_info_text += f'네이버 상품의 평균 가격은 [{avg_price}]'
|
|
|
|
return product_info_text
|
|
|
|
|
|
def NS_info_with_HTML(products):
|
|
# 상품 정보를 담을 5x1 표 생성
|
|
product_info_text = "<table><tr>"
|
|
global avg_price
|
|
avg_price = 0
|
|
|
|
for product in products:
|
|
if isinstance(product["price"], str):
|
|
price = int(product["price"].replace(",", ""))
|
|
else:
|
|
price = product["price"]
|
|
formatted_price = format(price, ",") + "원"
|
|
|
|
# 각 상품 카드를 생성하고 테이블의 셀로 추가
|
|
product_card = create_product_card(product["productTitle"], product["imageUrl"], formatted_price, product["rank"], product["purchase"], product["review"])
|
|
product_info_text += f"<td style='width: 170px; height: 170px;'>{product_card}</td>"
|
|
avg_price += price
|
|
|
|
product_info_text += "</tr></table>"
|
|
|
|
if products:
|
|
avg_price = round(avg_price / len(products), 0)
|
|
product_info_text += f'<div><h3>네이버 상품의 평균 가격은 {avg_price:,.0f}원입니다.</h3></div>'
|
|
|
|
return product_info_text
|
|
|
|
|
|
#네이버쇼핑 함수 HTML
|
|
def NS_info_with_HTML_ori(products):
|
|
product_info_text = ""
|
|
global avg_price
|
|
avg_price = 0
|
|
|
|
for product in products:
|
|
if isinstance(product["price"], str):
|
|
price = int(product["price"].replace(",", ""))
|
|
else:
|
|
price = product["price"]
|
|
formatted_price = format(price, ",") + "원"
|
|
|
|
# 각 상품 카드를 생성하여 테이블의 셀로 추가
|
|
product_card = create_product_card(product["productTitle"], product["imageUrl"], formatted_price, product["rank"])
|
|
product_info_text += f"<td>{product_card}</td>"
|
|
avg_price += price
|
|
|
|
product_info_text += "</tr></table>"
|
|
|
|
if products:
|
|
avg_price = round(avg_price / len(products), 0)
|
|
product_info_text += f'<div>네이버 상품의 평균 가격은 {avg_price}원입니다.</div>'
|
|
|
|
return product_info_text
|
|
|
|
def create_product_card(product_name, image_url, price, rank, purchase, review):
|
|
# 카드의 크기를 150x150px로 조정
|
|
product_card = f"""
|
|
<div style="width: 150px; height: 150px; overflow: hidden; text-align: center;">
|
|
<img src="{image_url}" style="width: 150px; height: 150px;" alt="상품 이미지">
|
|
<div style="font-size: 12px;"><strong>{product_name}</strong></div>
|
|
<div style="font-size: 12px;">가격: {price}</div>
|
|
<div style="font-size: 12px;">순위: {rank}</div>
|
|
<div style="font-size: 12px;">구매건수: {purchase}</div>
|
|
<div style="font-size: 12px;">리뷰수: {review}</div>
|
|
</div>
|
|
"""
|
|
return product_card
|
|
|
|
|
|
|
|
#네이버쇼핑 함수 마크다운
|
|
def NS_info_markdown(products):
|
|
product_info_md = ""
|
|
global avg_price
|
|
avg_price = 0
|
|
|
|
for index, product in enumerate(products):
|
|
idx = index + 1
|
|
# 제품명을 헤더로 표시
|
|
product_info_md += f"### {idx}번상품. {product['productTitle']}\n"
|
|
|
|
# 이미지 추가
|
|
product_info_md += f"\n\n"
|
|
|
|
# 가격과 순위 정보
|
|
if isinstance(product["price"], str):
|
|
price = int(product["price"].replace(",", "")) # 쉼표 제거 후 정수 변환
|
|
else:
|
|
price = product["price"]
|
|
formatted_price = format(price, ",") + "원"
|
|
|
|
product_info_md += f"- **가격:** {formatted_price}\n"
|
|
product_info_md += f"- **순위:** {product['rank']}\n\n"
|
|
|
|
avg_price += price # 평균 가격 계산을 위해 가격 추가
|
|
|
|
if products: # products가 비어 있지 않은 경우에만 평균 가격 계산
|
|
avg_price = round(avg_price / len(products), 0)
|
|
product_info_md += f"**네이버 상품의 평균 가격은 {avg_price}원입니다.**\n"
|
|
|
|
return product_info_md
|
|
|
|
|
|
def modify_detail_page(driver, image_src, gemini, product_title, delv_collection, product_low_cost, product_high_cost):
|
|
|
|
# testt = find_delivery_fee(7,delv_collection)
|
|
# logger.debug(f"값 출력 : {testt}")
|
|
|
|
# 상세페이지 탭으로 이동
|
|
# detail_tab = driver.find_element(By.ID, "rc-tabs-0-tab-5")
|
|
try:
|
|
detail_tab = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.CSS_SELECTOR, ".ant-tabs-tab:nth-child(6)"))
|
|
)
|
|
save_button = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, "//button[contains(.,'저장하기')]"))
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"상세페이지 탭으로 이동 중 오류 발생: 요소를 찾을 수 없습니다. : {e}")
|
|
# detail_tab = driver.find_element(By.CSS_SELECTOR, ".ant-tabs-tab:nth-child(6)")
|
|
detail_tab.click()
|
|
time.sleep(2) # 페이지 로딩 대기
|
|
|
|
# 상세페이지 내용 수정
|
|
try:
|
|
detail_content = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, "//div[@id='productMainContentContainerId']/div/div/div[2]/div[2]/div[2]/div"))
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"상세페이지 내용 수정 중 오류 발생: 요소를 찾을 수 없습니다. : {e}")
|
|
# detail_content = driver.find_element(By.XPATH, "//div[@id='productMainContentContainerId']/div/div/div[2]/div[2]/div[2]/div")
|
|
# detail_content = driver.find_element(By.ID, "rc-tabs-0-tab-5")
|
|
|
|
|
|
ActionChains(driver).move_to_element(detail_content).click().perform()
|
|
time.sleep(1)
|
|
|
|
logger.debug("AI 이미지 처리중")
|
|
# contents = bard_img(image_src)
|
|
# contents = gemini.generate_description(image_src, product_title)
|
|
contents = safe_generate_content(gemini, image_src, product_title)
|
|
logger.debug(f"{contents}")
|
|
logger.debug("AI 이미지 처리 완료")
|
|
|
|
weight = extract_weight(contents)
|
|
logger.debug(f"무게 추정 : {weight}")
|
|
delv_fee = find_delivery_fee(weight, delv_collection)
|
|
logger.debug(f"무게배송비 추정 : {delv_fee}")
|
|
|
|
logger.debug("네이버쇼핑 파싱 중")
|
|
products = parse_naver_shopping(product_title, isOverseas=1, sortcount=5)
|
|
# product_info_text = NS_info(products)
|
|
product_info_text = NS_info_with_HTML(products)
|
|
# logger.debug(f"수집된 정보 \n {product_info_text} \n")
|
|
logger.debug("네이버쇼핑 파싱 완료")
|
|
|
|
|
|
# 가격 정보 계산
|
|
low_cost, low_margin = selling_price(product_low_cost, delv_fee, 0.29)
|
|
high_cost, high_margin = selling_price(product_high_cost, delv_fee, 0.29)
|
|
low_delv_fee = avg_price - low_cost
|
|
high_delv_fee = avg_price - high_cost
|
|
|
|
# 마진율 계산
|
|
low_margin_r = round((low_margin / low_cost) * 100, 2)
|
|
high_margin_r = round((high_margin / high_cost) * 100, 2)
|
|
|
|
# 텍스트 조합
|
|
add_text = f"""
|
|
============= 가격과 마진율 =============
|
|
해당 제품의 네이버 평균가격은 {avg_price:,.0f}원입니다.
|
|
해당 제품의 무게배송비는 {delv_fee:,.0f}원입니다.
|
|
|
|
**마진율 확보를 위한 판매가 범위:** {low_cost:,.0f}원 ~ {high_cost:,.0f}원
|
|
|
|
**저가:**
|
|
* 무게 배송비 대비: {low_delv_fee:,.0f}원 {('낮은' if low_delv_fee < 0 else '높은')} 편
|
|
* 마진율: {low_margin_r:,.2f}%
|
|
|
|
**고가:**
|
|
* 무게 배송비 대비: {high_delv_fee:,.0f}원 {('낮은' if high_delv_fee < 0 else '높은')} 편
|
|
* 마진율: {high_margin_r:,.2f}%
|
|
|
|
**적정 배송비:** {low_delv_fee:,.0f}원 ~ {high_delv_fee:,.0f}원
|
|
|
|
**추가 정보:**
|
|
* 해당 제품의 파손 여부 및 무게 오차를 고려하여 더하기 마진을 설정하세요.
|
|
=======================================
|
|
"""
|
|
|
|
detail_content.send_keys(add_text)
|
|
time.sleep(2)
|
|
|
|
# 저장 버튼 클릭
|
|
try:
|
|
save_button = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, "//button[contains(.,'저장하기')]"))
|
|
)
|
|
logger.debug("저장 버튼을 성공적으로 찾았습니다.")
|
|
except Exception as e:
|
|
logger.debug(f"저장 버튼 요소를 찾을 수 없습니다. : {e}")
|
|
# save_button = driver.find_element(By.XPATH, "//button[contains(.,'저장하기')]")
|
|
save_button.click()
|
|
time.sleep(0.5)
|
|
|
|
|
|
|
|
|
|
|
|
|