761 lines
38 KiB
Python
761 lines
38 KiB
Python
from collections import Counter
|
|
import pyautogui
|
|
from datetime import datetime
|
|
import numpy as np
|
|
import asyncio
|
|
import math , re, time
|
|
from playwright.async_api import TimeoutError
|
|
|
|
class PriceHandler:
|
|
def __init__(self, locator_manager, browser_controller, logger, vertexAI, debug_flag=False):
|
|
self.locator_manager = locator_manager
|
|
self.browser_controller = browser_controller
|
|
self.page = self.browser_controller.page
|
|
self.logger = logger
|
|
self.debug_flag = debug_flag
|
|
self.vertexAItranslator = vertexAI
|
|
|
|
# Locator들을 미리 로드하여 초기화 - locator_template:동적 로딩, locator:고정로딩
|
|
self.product_cost_locator_template = self.locator_manager.get_locator('PriceLocators', 'product_cost_locator')
|
|
self.standard_selling_price_locator_template = self.locator_manager.get_locator('PriceLocators', 'standard_selling_price_locator')
|
|
self.plus_margin_locator = self.locator_manager.get_locator('PriceLocators', 'plus_margin_locator')
|
|
self.oversea_shipping_locator = self.locator_manager.get_locator('PriceLocators', 'oversea_shipping_locator')
|
|
self.return_fee_input_locator = self.locator_manager.get_locator('PriceLocators', 'return_fee_input_locator')
|
|
self.first_delv_fee_input_locator = self.locator_manager.get_locator('PriceLocators', 'first_delv_fee_input_locator')
|
|
self.exchange_fee_input_locator = self.locator_manager.get_locator('PriceLocators', 'exchange_fee_input_locator')
|
|
self.option_count_text_locator = self.locator_manager.get_locator('PriceLocators', 'option_count_text_locator')
|
|
|
|
self.initial_setting()
|
|
self.initialize_values() # 초기값 설정 메서드
|
|
|
|
def initialize_values(self):
|
|
"""
|
|
가격과 관련된 변수들을 초기화하는 메서드입니다.
|
|
여러 상품을 처리할 때 데이터가 이전 상품에 영향을 주지 않도록 초기화합니다.
|
|
"""
|
|
self.shipping_config = None
|
|
self.margin_config = None
|
|
self.optimal_price_config = None
|
|
self.product_costs = []
|
|
self.option_data = []
|
|
self.sold_price = 0
|
|
self.calculated_price = 0
|
|
self.optimal_price = 0
|
|
self.return_fee = 0
|
|
self.first_delv_fee = 0
|
|
self.exchange_fee = 0
|
|
self.sold_price = 0
|
|
|
|
def initial_setting(self):
|
|
# 설정 초기화
|
|
self.shipping_config = self.set_shipping_config(min_price=50000, thresholds=[50000, 100000, 200000], increment_unit=20000, additional_costs=[5000, 7000, 9000])
|
|
self.margin_config = self.set_margin_config(thresholds=[50000, 70000, 100000, 150000, 200000, 300000, 400000, 500000, 1000000], additional_margins=[5000, 10000, 15000, 25000, 35000, 50000, 70000, 90000, 120000])
|
|
self.optimal_price_config = self.set_optimal_price_config(sold_price=None, cost_price2X=None, calculated_price=None, lower_bound=0.85, upper_bound=1.15, ratios={'sold_price': 0.5, 'cost_price2X': 0.3, 'calculated_price': 0.2})
|
|
|
|
async def process_price(self, sold_price=0, category=None):
|
|
try:
|
|
# 상품정보 초기화
|
|
self.initialize_values()
|
|
|
|
# 1. 페이지에서 가격 정보 수집
|
|
self.logger.debug("초기 더하기마진과 해외배송비 가격 정보를 수집합니다.")
|
|
initial_plusmargin, initial_shipping_price = await self.get_plusmargin_and_shipping_values()
|
|
|
|
# if initial_plusmargin and initial_plusmargin != 10000:
|
|
if (initial_plusmargin != 10000 and initial_shipping_price != 0):
|
|
sold_price = initial_plusmargin
|
|
self.logger.debug(f"더하기마진값{initial_plusmargin}을 팔린가격{sold_price}으로 간주")
|
|
|
|
self.logger.debug("옵션 가격 정보를 수집합니다.")
|
|
option_data, min_cost, max_cost, avg_cost, upper_avg_cost = await self.collect_product_costs_and_prices() # 수집된 옵션정보를 반환
|
|
if option_data is None:
|
|
self.logger.error("상품 옵션 정보를 수집하지 못했습니다.", exc_info=True)
|
|
return
|
|
|
|
# 2. 원가 기반가격 계산
|
|
calculated_price = self.calc_initial_price(upper_avg_cost, self.margin_config, self.shipping_config, 0.04, 0.24)
|
|
|
|
# 3. 적정 판매가 산출
|
|
self.logger.debug("적정 판매가를 계산합니다.")
|
|
self.optimal_price_config['sold_price'] = sold_price # 팔린가격 기본값(10000이 아닌 값이 있을 경우 팔린가격으로 간주)
|
|
self.optimal_price_config['cost_price2X'] = upper_avg_cost # 원가2배 가격 : upper_avg_cost를 기준으로 계산.
|
|
self.optimal_price_config['calculated_price'] = calculated_price # 기준판매가를 기준으로 계산된 값
|
|
optimal_price = self.calculate_optimal_price(self.optimal_price_config)
|
|
self.logger.debug(f"계산된 적정 판매가: {optimal_price}")
|
|
|
|
# 4. 더하기 마진을 적정 판매가 기준으로 재설정
|
|
self.logger.debug("더하기 마진을 적정 판매가에 맞게 조정합니다.")
|
|
additional_margin = self.calculate_adjusted_margin(optimal_price, option_data, self.margin_config)
|
|
additional_margin = self.round_to_UP(additional_margin)
|
|
self.logger.debug(f"조정된 더하기 마진: {additional_margin}")
|
|
|
|
# 5. 해외 배송비 재계산
|
|
shipping_base_price = optimal_price - upper_avg_cost - (upper_avg_cost*0.04) - (upper_avg_cost*0.24) - additional_margin
|
|
shipping_cost = self.calculate_shipping_cost_with_extended_thresholds(10000, shipping_base_price, self.shipping_config)
|
|
shipping_cost = self.round_to_UP(shipping_cost)
|
|
self.logger.debug(f"적정판매가 기준으로 재계산된 해외배송비: {shipping_cost}")
|
|
|
|
# 5. 카테고리별 추가 해외배송비 계산
|
|
extra_shipping = self.calculate_category_extra_shipping(category, optimal_price)
|
|
shipping_cost += extra_shipping
|
|
self.logger.debug(f"카테고리별 추가배송비 기준으로 재계산된 해외배송비: {shipping_cost}")
|
|
|
|
# 5. 계산된 값 입력
|
|
self.logger.debug("계산된 값을 페이지에 입력합니다.")
|
|
await self.input_calculated_values(additional_margin, shipping_cost)
|
|
|
|
# 6. 반품비, 초도배송비, 교환비 계산 및 입력
|
|
return_fee, first_delv_fee, exchange_fee = self.calculate_claim_costs(max_cost)
|
|
self.logger.debug(f"반품비: {return_fee}, 초도배송비: {first_delv_fee}, 교환비: {exchange_fee}")
|
|
await self.input_claim_costs(return_fee, first_delv_fee, exchange_fee)
|
|
|
|
# 7. 저장
|
|
# save_xpath = "//button[contains(.,'저장하기')]"
|
|
# click_element(driver, 'XPATH', save_xpath, 10, 'js')
|
|
# self.logger.debug("가격 정리 후 저장버튼 클릭 완료")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"가격 수정 중 오류 발생: {e}", exc_info=True)
|
|
|
|
|
|
async def input_claim_costs(self, return_fee, first_delv_fee, exchange_fee):
|
|
"""
|
|
계산된 반품비, 초도배송비, 교환비를 입력합니다.
|
|
|
|
Parameters:
|
|
- driver: WebDriver 인스턴스
|
|
- return_fee (int): 반품비
|
|
- first_delv_fee (int): 초도배송비
|
|
- exchange_fee (int): 교환비
|
|
"""
|
|
try:
|
|
# 반품비 수정
|
|
return_fee_element = await self.page.wait_for_selector(self.return_fee_input_locator)
|
|
if return_fee_element:
|
|
return_fee = self.round_to_UP(return_fee)
|
|
await return_fee_element.fill(str(return_fee)) # 기존 내용을 지우고 새 값을 입력
|
|
self.logger.debug(f"반품비 수정 완료: {return_fee}")
|
|
else:
|
|
self.logger.error(f"반품비 입력 중 오류 발생: 요소를 찾을 수 없음", exc_info=True)
|
|
|
|
# 초도배송비 수정
|
|
first_delv_fee_element = await self.page.wait_for_selector(self.first_delv_fee_input_locator)
|
|
if first_delv_fee_element:
|
|
first_delv_fee = self.round_to_UP(first_delv_fee)
|
|
await first_delv_fee_element.fill(str(first_delv_fee)) # 기존 내용을 지우고 새 값을 입력
|
|
self.logger.debug(f"초도배송비 수정 완료: {first_delv_fee}")
|
|
else:
|
|
self.logger.error(f"초도배송비 입력 중 오류 발생: 요소를 찾을 수 없음", exc_info=True)
|
|
|
|
# 교환비 수정
|
|
exchange_fee_element = await self.page.wait_for_selector(self.exchange_fee_input_locator)
|
|
if exchange_fee_element:
|
|
exchange_fee = self.round_to_UP(exchange_fee)
|
|
await exchange_fee_element.fill(str(exchange_fee)) # 기존 내용을 지우고 새 값을 입력
|
|
self.logger.debug(f"교환비 수정 완료: {exchange_fee}")
|
|
else:
|
|
self.logger.error(f"교환비 입력 중 오류 발생: 요소를 찾을 수 없음", exc_info=True)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Claim cost 수정 중 오류 발생: {e}", exc_info=True)
|
|
|
|
|
|
def calculate_claim_costs(self, max_cost: int) -> tuple[int, int, int]:
|
|
|
|
"""
|
|
반품비, 초도배송비, 교환비를 계산합니다.
|
|
|
|
Parameters:
|
|
- max_cost (int): 상품의 최대 원가
|
|
|
|
Returns:
|
|
- return_fee (int): 반품비
|
|
- first_delv_fee (int): 초도배송비
|
|
- exchange_fee (int): 교환비
|
|
"""
|
|
try:
|
|
max_return_fee = 199000
|
|
max_exchange_fee = 499000
|
|
|
|
return_fee = min(max_return_fee, max_cost)
|
|
return_fee = self.round_to_UP(return_fee)
|
|
first_delv_fee = return_fee # 초도배송비는 반품비와 동일
|
|
exchange_fee = min(max_exchange_fee, return_fee * 2)
|
|
exchange_fee = self.round_to_UP(exchange_fee)
|
|
|
|
self.logger.debug(f"계산된 반품비: {return_fee}, 초도배송비: {first_delv_fee}, 교환비: {exchange_fee}")
|
|
return return_fee, first_delv_fee, exchange_fee
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Claim cost 계산 중 오류 발생: {e}", exc_info=True)
|
|
return 199000, 199000, 499000
|
|
|
|
async def input_calculated_values(self, margin, shipping_cost):
|
|
"""
|
|
계산된 마진, 배송비, 교환비를 각 옵션에 입력합니다.
|
|
|
|
Parameters:
|
|
- driver: WebDriver 인스턴스
|
|
- margin (int): 더하기 마진
|
|
- shipping_cost (int): 해외배송비
|
|
"""
|
|
try:
|
|
# 1000원 단위 절상
|
|
margin = self.round_to_UP(margin)
|
|
shipping_cost = self.round_to_UP(shipping_cost)
|
|
|
|
# 더하기 마진 입력 (4번째 인풋박스)
|
|
margin_element = await self.page.wait_for_selector(self.plus_margin_locator)
|
|
await margin_element.fill(str(margin))
|
|
self.logger.debug(f"더하기 마진 입력 완료: {margin}")
|
|
|
|
# 해외 배송비 입력 (5번째 인풋박스)
|
|
oversea_shipping_element = await self.page.wait_for_selector(self.oversea_shipping_locator)
|
|
await oversea_shipping_element.fill(str(shipping_cost))
|
|
self.logger.debug(f"해외 배송비 입력 완료: {shipping_cost}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to input calculated values: {e}", exc_info=True)
|
|
|
|
|
|
def calculate_shipping_cost_with_extended_thresholds(self, base_cost, product_price, config, max_threshold=5000000):
|
|
"""
|
|
점진적으로 증가하는 해외배송비를 계산하는 메서드입니다.
|
|
|
|
Parameters:
|
|
- base_cost (int): 기본 해외배송비.
|
|
- product_price (int): 상품의 가격.
|
|
- config (dict): 구간과 추가 배송비 정보를 포함하는 딕셔너리.
|
|
- max_threshold (int): 추가 구간을 자동 확장할 최대 가격 (기본값: 5000000).
|
|
|
|
Returns:
|
|
- total_shipping_cost (int): 계산된 총 해외배송비.
|
|
"""
|
|
try:
|
|
total_shipping_cost = base_cost
|
|
min_price_for_extra_shipping = config.get('min_price_for_extra_shipping', 0)
|
|
thresholds = config.get('thresholds', [])
|
|
increment_unit = config.get('increment_unit', 0)
|
|
additional_costs = config.get('additional_costs', [])
|
|
|
|
if not thresholds or not additional_costs:
|
|
raise ValueError("기준 구간과 추가 비용 정보는 비워둘 수 없습니다.")
|
|
|
|
self.logger.debug(f"기존 구간: {thresholds}")
|
|
self.logger.debug(f"기존 추가 비용: {additional_costs}")
|
|
|
|
# 주어진 product_price 까지만 확장하도록 변경
|
|
while thresholds[-1] < max_threshold and thresholds[-1] < product_price:
|
|
last_threshold_diff = thresholds[-1] - thresholds[-2]
|
|
new_threshold_diff = last_threshold_diff + (last_threshold_diff - (thresholds[-2] - thresholds[-3]) if len(thresholds) > 2 else last_threshold_diff)
|
|
|
|
last_cost_diff = additional_costs[-1] - additional_costs[-2]
|
|
new_cost_diff = last_cost_diff + (last_cost_diff - (additional_costs[-2] - additional_costs[-3]) if len(additional_costs) > 2 else last_cost_diff)
|
|
|
|
next_threshold = thresholds[-1] + new_threshold_diff
|
|
next_additional_cost = additional_costs[-1] + new_cost_diff
|
|
|
|
thresholds.append(next_threshold)
|
|
additional_costs.append(next_additional_cost)
|
|
|
|
self.logger.debug(f"확장된 '해외배송비' 구간: {next_threshold}")
|
|
self.logger.debug(f"확장된 '해외배송비' 추가 비용: {next_additional_cost}")
|
|
|
|
# 상품 가격이 최소 추가 배송비 적용 금액 이상일 때만 추가 비용을 계산
|
|
if product_price > min_price_for_extra_shipping:
|
|
remaining_price = product_price - min_price_for_extra_shipping
|
|
|
|
for i in range(len(thresholds)):
|
|
if remaining_price > 0:
|
|
if i < len(thresholds) - 1:
|
|
next_threshold = thresholds[i + 1]
|
|
if remaining_price <= next_threshold - min_price_for_extra_shipping:
|
|
increments = remaining_price // increment_unit
|
|
total_shipping_cost += increments * additional_costs[i]
|
|
self.logger.debug(f"적용된 구간: {next_threshold}, 계산된 구간추가배송비: {increments * additional_costs[i]}")
|
|
remaining_price = 0
|
|
else:
|
|
increments = (next_threshold - min_price_for_extra_shipping) // increment_unit
|
|
total_shipping_cost += increments * additional_costs[i]
|
|
self.logger.debug(f"적용된 구간: {next_threshold}, 계산된 구간추가배송비: {increments * additional_costs[i]}")
|
|
remaining_price -= next_threshold - min_price_for_extra_shipping
|
|
min_price_for_extra_shipping = next_threshold
|
|
else:
|
|
increments = remaining_price // increment_unit
|
|
total_shipping_cost += increments * additional_costs[i]
|
|
self.logger.debug(f"최대 구간 적용, 계산된 구간추가배송비: {increments * additional_costs[i]}")
|
|
remaining_price = 0
|
|
|
|
result = self.round_to_UP(total_shipping_cost)
|
|
self.logger.debug(f"최종 계산된 배송비: {result}")
|
|
|
|
return result
|
|
|
|
except ValueError as ve:
|
|
self.logger.error(f"기본 구성 오류: {ve}", exc_info=True)
|
|
return base_cost # 예외 발생 시 기본 배송비만 반환
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"계산된 값 입력 실패: {e}", exc_info=True)
|
|
return max(base_cost, 10000) # 예외 발생 시 최소 기본 배송비 반환
|
|
|
|
def calculate_adjusted_margin(self, optimal_price, option_data, margin_config):
|
|
"""
|
|
적정판매가와 기준판매가를 비교하여 더하기 마진을 조정하는 메서드입니다.
|
|
|
|
Parameters:
|
|
- optimal_price (int): 계산된 적정 판매가.
|
|
- option_data (list): 각 옵션의 데이터.
|
|
- margin_config (dict): 더하기 마진 설정 딕셔너리.
|
|
|
|
Returns:
|
|
- adjusted_margin (int): 적정 판매가에 맞게 조정된 더하기 마진.
|
|
"""
|
|
try:
|
|
total_option_price = math.ceil(sum([option['price'] for option in option_data]) / len(option_data))
|
|
self.logger.debug(f"총 옵션 기준 판매가 평균: {total_option_price}")
|
|
|
|
# 기준판매가와 적정판매가의 차이를 바탕으로 마진을 조정
|
|
price_difference = optimal_price - total_option_price
|
|
self.logger.debug(f"적정 판매가와 기준 판매가 차이: {price_difference}")
|
|
|
|
# 마진 설정: 차이에 따른 더하기 마진 재조정
|
|
adjusted_margin = self.calculate_additional_margin_with_extended_thresholds(optimal_price, margin_config)
|
|
|
|
# 만약 차이가 크다면, 추가로 마진을 더 조정할 수 있음
|
|
if price_difference > 0:
|
|
adjusted_margin += price_difference * 0.1 # 차이의 10%를 추가 마진에 반영
|
|
|
|
return adjusted_margin
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"더하기 마진 조정 중 오류 발생: {e}", exc_info=True)
|
|
return 0 # 기본값 반환
|
|
|
|
|
|
def calculate_optimal_price(self, config):
|
|
"""
|
|
적정 판매가를 계산하는 메서드입니다.
|
|
|
|
Parameters:
|
|
- config (dict): 가격 정보와 비율을 포함하는 딕셔너리.
|
|
|
|
Returns:
|
|
- optimal_price (float): 계산된 적정 판매가
|
|
"""
|
|
try:
|
|
sold_price = config.get('sold_price') # 팔린가격 없으면 무시
|
|
cost_price2X = config['cost_price2X'] * 2 # 원가의 2배
|
|
calculated_price = config['calculated_price'] # 원가기준 계산된 가격
|
|
lower_bound = config['lower_bound'] # 상한선 비율 (기본값 : 원가2배의 15%)
|
|
upper_bound = config['upper_bound'] # 하한선 비율 (기본값 : 원가2배의 -15%)
|
|
ratios = config['ratios'] # 계산비율 (기본값 : 팔린가격 50%, 원가2배가격 30%, 계산된가격 20%)
|
|
|
|
# 비율 기반 계산
|
|
weighted_sum = 0
|
|
total_ratio = 0
|
|
|
|
# if sold_price and sold_price > cost_price2X:
|
|
if sold_price is not None:
|
|
weighted_sum += sold_price * ratios.get('sold_price', 0)
|
|
total_ratio += ratios.get('sold_price', 0)
|
|
if cost_price2X > 0:
|
|
weighted_sum += cost_price2X * ratios.get('cost_price2X', 0) # 원가의 2배
|
|
total_ratio += ratios.get('cost_price2X', 0)
|
|
if calculated_price > 0:
|
|
weighted_sum += calculated_price * ratios.get('calculated_price', 0)
|
|
total_ratio += ratios.get('calculated_price', 0)
|
|
|
|
if total_ratio == 0:
|
|
raise ValueError("유효한 가격 정보가 없습니다.")
|
|
|
|
# 비율에 따른 평균 가격 계산
|
|
optimal_price = weighted_sum / total_ratio
|
|
optimal_price = self.round_to_UP(optimal_price)
|
|
|
|
# 상하한 계산
|
|
lower_limit = cost_price2X * lower_bound
|
|
lower_limit = self.round_to_UP(lower_limit)
|
|
upper_limit = cost_price2X * upper_bound
|
|
upper_limit = self.round_to_UP(upper_limit)
|
|
|
|
self.logger.debug(f"비율 기반 계산된 가격: {optimal_price}, 상한: {upper_limit}, 하한: {lower_limit}")
|
|
|
|
# 상하한 범위 내의 값으로 조정
|
|
if optimal_price < lower_limit:
|
|
self.logger.debug(f"가격이 하한을 밑돌아서 하한으로 조정됨: {lower_limit}")
|
|
return lower_limit
|
|
elif optimal_price > upper_limit:
|
|
self.logger.debug(f"가격이 상한을 넘어서 상한으로 조정됨: {upper_limit}")
|
|
return upper_limit
|
|
else:
|
|
return optimal_price
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"적정 판매가 계산 실패: {e}", exc_info=True)
|
|
return 0 # 예외 발생 시 기본값 0 반환
|
|
|
|
def calculate_additional_margin_with_extended_thresholds(self, upper_average_price, config, max_threshold=5000000):
|
|
"""
|
|
점진적으로 증가하는 더하기 마진을 계산하는 메서드입니다.
|
|
|
|
Parameters:
|
|
- upper_average_price (int): 상품의 산술평균과 최대값의 평균 가격.
|
|
- config (dict): 구간과 추가 마진 정보를 포함하는 딕셔너리.
|
|
- max_threshold (int): 추가 구간을 자동 확장할 최대 가격 (기본값: 5000000).
|
|
|
|
Returns:
|
|
- total_margin (int): 계산된 더하기 마진.
|
|
"""
|
|
try:
|
|
thresholds = config.get('thresholds', [])
|
|
additional_margins = config.get('additional_margins', [])
|
|
|
|
if not thresholds or not additional_margins:
|
|
raise ValueError("기준 구간과 추가 마진 정보는 비워둘 수 없습니다.")
|
|
|
|
self.logger.debug(f"기존 구간: {thresholds}")
|
|
self.logger.debug(f"기존 추가 마진: {additional_margins}")
|
|
|
|
# 주어진 average_price 까지만 확장하도록 변경
|
|
while thresholds[-1] < max_threshold and thresholds[-1] < upper_average_price:
|
|
last_threshold_diff = thresholds[-1] - thresholds[-2]
|
|
new_threshold_diff = last_threshold_diff + (last_threshold_diff - (thresholds[-2] - thresholds[-3]) if len(thresholds) > 2 else last_threshold_diff)
|
|
|
|
last_margin_diff = additional_margins[-1] - additional_margins[-2]
|
|
new_margin_diff = last_margin_diff + (last_margin_diff - (additional_margins[-2] - additional_margins[-3]) if len(additional_margins) > 2 else last_margin_diff)
|
|
|
|
next_threshold = thresholds[-1] + new_threshold_diff
|
|
next_additional_margin = additional_margins[-1] + new_margin_diff
|
|
|
|
thresholds.append(next_threshold)
|
|
additional_margins.append(next_additional_margin)
|
|
|
|
self.logger.debug(f"확장된 '더하기마진' 구간: {next_threshold}")
|
|
self.logger.debug(f"확장된 '더하기마진' 의 추가 마진: {next_additional_margin}")
|
|
|
|
# 해당 구간에 맞는 마진을 계산
|
|
for i in range(len(thresholds)):
|
|
if upper_average_price <= thresholds[i]:
|
|
# 이전 구간과 현재 구간 사이에서 선형 보간법으로 마진 계산
|
|
if i == 0:
|
|
self.logger.debug(f"적용된 구간: {thresholds[0]}, 적용된 마진: {additional_margins[0]}")
|
|
return additional_margins[0]
|
|
prev_threshold = thresholds[i - 1]
|
|
prev_margin = additional_margins[i - 1]
|
|
current_threshold = thresholds[i]
|
|
current_margin = additional_margins[i]
|
|
# 선형 보간
|
|
calculated_margin = prev_margin + (current_margin - prev_margin) * (upper_average_price - prev_threshold) / (current_threshold - prev_threshold)
|
|
calculated_margin = self.round_to_UP(calculated_margin)
|
|
self.logger.debug(f"적용된 구간: {current_threshold}, 계산된 마진: {calculated_margin}")
|
|
return calculated_margin
|
|
|
|
# 구간을 넘어섰을 경우, 마지막 구간의 마진을 적용
|
|
result = self.round_to_UP(additional_margins[-1])
|
|
self.logger.debug(f"최대 구간 초과, 적용된 마진: {result}")
|
|
return result
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"계산된 값 입력 실패: {e}", exc_info=True)
|
|
return 0 # 예외 발생 시 기본값 0 반환
|
|
|
|
|
|
def calc_initial_price(self, upper_avg_cost, margin_config, shipping_config, card = 0.04, min_margin = 0.24):
|
|
"""
|
|
초기에 모은 상품원가와 기준가격표를 기준으로 카드수수료, 최소마진등을 기반으로 계산 기준가격을 산출하고
|
|
이를 기반으로 테이블표에 의한 더하기마진, 해외배송비를 계산하는 함수.
|
|
|
|
Parameters:
|
|
- upper_avg_cost: 수집된 상품원가의 상위평균값
|
|
- card: 카드수수료. 기본값은 상품원가의 4%
|
|
- min_margin: 카드수수료가 반영된 상품원가에 대한 비율로 최소마진 추가. 기본값은 24%.
|
|
|
|
Returns:
|
|
- result (int): 초기값으로 계산된 초기가격
|
|
"""
|
|
try:
|
|
# 기본원가
|
|
initial_cost_price = upper_avg_cost + (upper_avg_cost * card) + (upper_avg_cost * min_margin)
|
|
initial_cost_price = self.round_to_UP(initial_cost_price)
|
|
self.logger.debug(f"원가에 카드수수료 {card*100}%, 기본마진 {min_margin*100}% 적용된 계산원가: {initial_cost_price}")
|
|
|
|
initail_shipping_cost = self.calculate_additional_margin_with_extended_thresholds(initial_cost_price, margin_config)
|
|
initail_shipping_cost = self.round_to_UP(initail_shipping_cost)
|
|
self.logger.debug(f"계산원가 기준 해외배송비: {initail_shipping_cost}")
|
|
|
|
initail_margin_cost = self.calculate_shipping_cost_with_extended_thresholds(10000, initial_cost_price, shipping_config)
|
|
initail_margin_cost = self.round_to_UP(initail_margin_cost)
|
|
self.logger.debug(f"계산원가 기준 더하기마진: {initail_margin_cost}")
|
|
|
|
result = int(initial_cost_price + initail_shipping_cost + initail_margin_cost)
|
|
self.logger.debug(f"원가기반 가격: {result}")
|
|
|
|
return result
|
|
except Exception as e:
|
|
self.logger.error(f"원가기반 가격 계산 중 중 오류 발생: {e}", exc_info=True)
|
|
return 100000 # 오류발생시 기본 가격 100,000 반환
|
|
|
|
async def get_option_count_from_text(self):
|
|
"""
|
|
텍스트 기반으로 옵션 수를 계산합니다. 옵션이 없는 경우 단일 상품으로 간주하여 기본값 1을 반환합니다.
|
|
|
|
Parameters:
|
|
- driver: WebDriver 인스턴스
|
|
|
|
Returns:
|
|
- total_options (int): 계산된 옵션 수, 단일 상품인 경우 1을 반환
|
|
"""
|
|
try:
|
|
|
|
# 요소를 기다림
|
|
option_count_text_element = await self.page.wait_for_selector(self.option_count_text_locator)
|
|
|
|
# 텍스트에서 숫자만 추출
|
|
option_text = await option_count_text_element.text_content()
|
|
match = re.search(r'\d+', option_text)
|
|
|
|
if match:
|
|
total_options = int(match.group()) # 숫자 추출
|
|
self.logger.debug(f"옵션 수: {total_options}")
|
|
return total_options
|
|
else:
|
|
self.logger.debug("옵션 수를 찾을 수 없음. 단일 상품으로 간주.")
|
|
return 1 # 옵션이 없는 경우 단일 상품으로 간주
|
|
|
|
except TimeoutError:
|
|
self.logger.error("옵션 수를 나타내는 텍스트를 찾지 못함. 기본적으로 단일 상품으로 간주.", exc_info=True)
|
|
return 1 # 옵션 수를 찾지 못한 경우 단일 상품으로 간주
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"옵션 수 계산 중 오류 발생: {e}", exc_info=True)
|
|
return 1 # 예외 발생 시 기본값 1 반환
|
|
|
|
|
|
async def collect_product_costs_and_prices(self):
|
|
"""
|
|
상품 원가와 판매가를 수집하여 반환합니다. 단위는 위안화
|
|
|
|
Parameters:
|
|
- driver: WebDriver 인스턴스
|
|
|
|
Returns:
|
|
- option_data (list): 각 옵션의 원가 및 가격 데이터
|
|
- min_cost (int): 최소 원가, 원화, 에러발생시 기본값 20000 반환
|
|
- max_cost (int): 최대 원가, 원화, 에러발생시 기본값 20000 반환
|
|
- avg_cost (int): 평균 원가, 원화, 에러발생시 기본값 20000 반환
|
|
- upper_avg_cost (int): 평균 원가와 최대값 사이의 평균값, 원화, 에러발생시 기본값 20000 반환
|
|
"""
|
|
try:
|
|
# 옵션 개수 가져오기 (두 가지 방법 중 하나 사용)
|
|
total_options = await self.get_option_count_from_text()
|
|
|
|
self.product_costs = [] # 모든 옵션의 상품원가를 저장할 리스트
|
|
self.option_data = [] # 각 옵션의 상품원가와 기준판매가를 저장할 리스트
|
|
|
|
# 옵션 개수만큼 순회하면서 값을 수집
|
|
for i in range(1, total_options + 1):
|
|
product_cost_locator = self.product_cost_locator_template.format(i=i+1)
|
|
standard_selling_price_locator = self.standard_selling_price_locator_template.format(i=i+1)
|
|
|
|
# 각 선택자를 사용하여 요소 찾기
|
|
product_cost_element = await self.page.wait_for_selector(product_cost_locator)
|
|
standard_selling_price_element = await self.page.wait_for_selector(standard_selling_price_locator)
|
|
|
|
cost_value = int(float(await product_cost_element.input_value().replace(",", "")))
|
|
price_value = int(await standard_selling_price_element.input_value().replace(",", ""))
|
|
|
|
self.product_costs.append(cost_value)
|
|
self.option_data.append({
|
|
"option_number": i,
|
|
"cost": cost_value,
|
|
"price": price_value
|
|
})
|
|
|
|
# 상품원가를 가격순으로 정렬
|
|
self.product_costs.sort()
|
|
|
|
# 최소값, 최대값, 평균값 계산
|
|
min_cost = self.convert_cny_to_krw(min(self.product_costs)) # 최소값 - 위안화>원화로 변환
|
|
min_cost = self.round_to_UP(min_cost) # 1000원 절상
|
|
|
|
max_cost = self.convert_cny_to_krw(max(self.product_costs)) # 최대값 - 위안화>원화로 변환
|
|
max_cost = self.round_to_UP(max_cost) # 1000원 절상
|
|
|
|
avg_cost = self.convert_cny_to_krw(sum(self.product_costs) / len(self.product_costs)) # 평균값 - 위안화>원화로 변환
|
|
avg_cost = self.round_to_UP(avg_cost) # 산술평균값 1000원 절상
|
|
|
|
upper_avg_cost = round(((avg_cost + max_cost) / 2)) # 상위평균값 - 위안화>원화로 변환
|
|
upper_avg_cost = self.round_to_UP(upper_avg_cost) # 상위평균값 1000원 절상
|
|
|
|
self.logger.debug(f"상품원가가 모였습니다.: {self.product_costs}")
|
|
self.logger.debug(f"최소원가: {min_cost}, 최대원가: {max_cost}, 평균원가: {avg_cost}, 상위평균원가: {upper_avg_cost}")
|
|
|
|
return self.option_data, min_cost, max_cost, avg_cost, upper_avg_cost
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to collect product costs and prices: {e}", exc_info=True)
|
|
# 오류 발생 시 기본값을 반환
|
|
return self.option_data, 20000, 20000, 20000, 20000
|
|
|
|
|
|
async def get_plusmargin_and_shipping_values(self):
|
|
"""
|
|
더하기 마진과 해외 배송비 값을 가져오는 메서드입니다.
|
|
|
|
Parameters:
|
|
- driver: WebDriver 인스턴스
|
|
|
|
Returns:
|
|
- (tuple): 더하기 마진(int), 해외 배송비(int)
|
|
"""
|
|
try:
|
|
# 더하기 마진 값 가져오기
|
|
margin_element = await self.page.wait_for_selector(self.selectors['plus_margin_locator'])
|
|
margin_value = await margin_element.input_value()
|
|
margin = int(margin_value.replace(",", "")) if margin_value else 0
|
|
self.logger.debug(f"더하기 마진 값: {margin}")
|
|
|
|
shipping_element = await self.page.wait_for_selector(self.selectors['oversea_shipping_locator'])
|
|
shipping_value = await shipping_element.input_value()
|
|
shipping_cost = int(shipping_value.replace(",", "")) if shipping_value else 0
|
|
self.logger.debug(f"해외 배송비 값: {shipping_cost}")
|
|
|
|
return margin, shipping_cost
|
|
except Exception as e:
|
|
self.logger.error(f"해외배송비와 더하기 마진 수집 중 오류 발생: {e}", exc_info=True)
|
|
return 5000, 10000
|
|
|
|
|
|
def set_shipping_config(self, min_price, thresholds, increment_unit, additional_costs):
|
|
"""
|
|
해외배송비 설정을 위한 딕셔너리를 생성합니다.
|
|
|
|
Parameters:
|
|
- min_price (int): 최소 추가 배송비가 적용되는 가격.
|
|
- thresholds (list): 기준 구간 리스트.
|
|
- increment_unit (int): 초과 단위.
|
|
- additional_costs (list): 각 구간에 대한 추가 비용 리스트.
|
|
|
|
Returns:
|
|
- config_shipping (dict): 해외배송비 설정 딕셔너리.
|
|
"""
|
|
return {
|
|
'min_price_for_extra_shipping': min_price,
|
|
'thresholds': thresholds,
|
|
'increment_unit': increment_unit,
|
|
'additional_costs': additional_costs
|
|
}
|
|
|
|
def set_margin_config(self, thresholds, additional_margins):
|
|
"""
|
|
더하기마진 설정을 위한 딕셔너리를 생성합니다.
|
|
|
|
Parameters:
|
|
- thresholds (list): 기준 구간 리스트.
|
|
- additional_margins (list): 각 구간에 대한 추가 마진 리스트.
|
|
|
|
Returns:
|
|
- config_margin (dict): 더하기마진 설정 딕셔너리.
|
|
"""
|
|
return {
|
|
'thresholds': thresholds,
|
|
'additional_margins': additional_margins
|
|
}
|
|
|
|
def set_optimal_price_config(self, sold_price, cost_price2X, calculated_price, lower_bound, upper_bound, ratios):
|
|
"""
|
|
적정판매가 설정을 위한 딕셔너리를 생성합니다.
|
|
|
|
Parameters:
|
|
- sold_price (optional): 팔린 가격.
|
|
- cost_price2X (int): 원가2배
|
|
- calculated_price (int): 계산된 판매가.
|
|
- lower_bound (float): 하한 비율.
|
|
- upper_bound (float): 상한 비율.
|
|
- ratios (dict): 각 가격 요소에 대한 비율.
|
|
|
|
Returns:
|
|
- config_optimal_price (dict): 적정판매가 설정 딕셔너리.
|
|
"""
|
|
return {
|
|
'sold_price': sold_price,
|
|
'cost_price2X': cost_price2X,
|
|
'calculated_price': calculated_price,
|
|
'lower_bound': lower_bound,
|
|
'upper_bound': upper_bound,
|
|
'ratios': ratios
|
|
}
|
|
|
|
def round_to_UP(number, nearest=1000):
|
|
"""
|
|
숫자를 주어진 'nearest' 값에 맞춰 올림합니다.
|
|
|
|
Parameters:
|
|
- number (float or int): 올림할 숫자
|
|
- nearest (int): 올림할 단위 (기본값 1000)
|
|
|
|
Returns:
|
|
- rounded_number (int): 올림된 숫자
|
|
"""
|
|
# nearest 값으로 나눈 후 math.ceil을 사용하여 올림
|
|
rounded_number = math.ceil(number / nearest) * nearest
|
|
return rounded_number
|
|
|
|
def convert_cny_to_krw(cny, exchange_rate=200):
|
|
"""
|
|
위안화 금액을 원화로 변환하는 함수
|
|
|
|
:param yuan: 변환할 위안화 금액
|
|
:param exchange_rate: 위안화에서 원화로 변환할 환율 (기본값: 200)
|
|
:return: 변환된 원화 금액
|
|
"""
|
|
krw = cny * exchange_rate
|
|
return krw
|
|
|
|
def calculate_category_extra_shipping(self, category: str, product_price: int) -> int:
|
|
"""
|
|
카테고리와 상품가를 기반으로 추가 해외배송비를 계산합니다.
|
|
|
|
Parameters:
|
|
- category (str): 카테고리명
|
|
- product_price (int): 상품 가격
|
|
|
|
Returns:
|
|
- total_extra_shipping (int): 계산된 추가 해외배송비
|
|
"""
|
|
total_extra_shipping = 0
|
|
|
|
# locator_manager에서 카테고리별 해외배송비 설정을 불러옴
|
|
category_data = self.locator_manager.get_category_data(category)
|
|
|
|
if category_data: # 카테고리 설정이 있는지 확인
|
|
threshold_index = 1
|
|
|
|
# threshold와 extra_shipping의 dynamic한 처리
|
|
while True:
|
|
threshold_key = f'threshold_{threshold_index}'
|
|
extra_shipping_key = f'extra_shipping_{threshold_index}'
|
|
unit_key = f'unit_{threshold_index}'
|
|
|
|
# threshold_key가 존재하지 않으면 종료
|
|
if threshold_key not in category_data or extra_shipping_key not in category_data or unit_key not in category_data:
|
|
break
|
|
|
|
threshold = int(category_data.get(threshold_key, 0))
|
|
extra_shipping = int(category_data.get(extra_shipping_key, 0))
|
|
unit = int(category_data.get(unit_key, 0))
|
|
|
|
if product_price > threshold:
|
|
excess_amount = product_price - threshold
|
|
increments = excess_amount // unit
|
|
total_extra_shipping += increments * extra_shipping
|
|
self.logger.debug(f"{category} 카테고리, threshold {threshold}을 초과하여 추가 해외배송비 {increments * extra_shipping} 추가 적용")
|
|
|
|
threshold_index += 1 # 다음 구간으로 이동
|
|
else:
|
|
self.logger.debug(f"카테고리 {category}는 추가 해외배송비 규칙이 없습니다. 추가 해외배송비는 0으로 설정됩니다.")
|
|
|
|
self.logger.debug(f"총 추가 해외배송비: {total_extra_shipping}")
|
|
return total_extra_shipping
|