1
0
Fork 0

2차대규모 수정

This commit is contained in:
K.H.CHOI 2024-04-01 23:51:39 +09:00
parent e4e8f3a873
commit 9e768e589b
13 changed files with 826 additions and 398 deletions

170
database_with_mongo.py Normal file
View File

@ -0,0 +1,170 @@
from datetime import datetime
# 로거 인스턴스 가져오기
import logging
logger = logging.getLogger('default_logger')
class AutoPercentyProductsDB:
def __init__(self, mongo_config):
self.mongo_config = mongo_config
self.db = mongo_config.client['taobao_project']
self.collection = self.db['AutoPercenty_products']
def initialize_process_steps(self, product_id, user):
steps = {'tag_modification': 'incomplete', 'option_modification': 'incomplete',
'detail_page_modification': 'incomplete', 'thumbnail_modification': 'incomplete',
'price_modification': 'incomplete', 'title_modification': 'incomplete'}
document = {'product_id': product_id, 'process_steps': steps, 'user': user, 'created_at': datetime.now()}
self.collection.insert_one(document)
logger.info(f"Initialized process steps for product_id {product_id} by user {user}.")
# def mark_product_processed(self, product_id, current_user):
# # 상품 처리 정보가 이미 존재하는지 확인
# if not self.is_product_processed(product_id):
# document = {
# 'product_id': product_id,
# 'user_name': current_user,
# 'process_time': datetime.now(),
# # 초기 단계 정보는 여기에 포함하지 않음
# }
# self.collection.insert_one(document)
# print(f"상품 ID {product_id}의 처리가 시작되었습니다.")
# else:
# print(f"상품 ID {product_id}에 대한 처리 정보는 이미 존재합니다.")
def finalize_product_processing(self, product_id, current_user, product_info):
incomplete_steps = self.get_incomplete_steps(product_id)
if not incomplete_steps:
self.collection.update_one(
{'product_id': product_id},
{'$set': {
'finalized': True,
'finalized_time': datetime.now(),
'finalized_by': current_user,
'product_info': product_info.to_dict() # 상품 정보 딕셔너리로 저장
}}
)
logger.info(f"Product processing finalized for {product_id}")
else:
logger.warning(f"Cannot finalize product {product_id} with incomplete steps: {incomplete_steps}")
def is_product_processed(self, product_id):
# 상품의 현재 처리 단계 정보를 가져옴
process_steps = self.get_current_step(product_id)
# 상품에 대한 기본 정보 조회
product_info = self.collection.find_one({'product_id': product_id}, {'_id': 0, 'user_name': 1, 'process_time': 1})
if product_info:
# 모든 처리 단계가 완료되었는지 확인
all_steps_completed = all(status == 'complete' for status in process_steps.values()) if process_steps else False
# 조회 결과에 처리 단계의 완료 여부 추가
product_info['all_steps_completed'] = all_steps_completed
return product_info
else:
return False
def update_process_step(self, product_id, step_name, user):
update_result = self.collection.update_one(
{'product_id': product_id, f'process_steps.{step_name}': 'incomplete'},
{'$set': {f'process_steps.{step_name}': 'complete', 'last_updated_by': user, 'last_updated_at': datetime.now()}}
)
if update_result.modified_count == 0:
logger.info(f"Step {step_name} for product_id {product_id} is already complete or product does not exist.")
else:
logger.info(f"Updated step {step_name} to complete for product_id {product_id} by user {user}.")
def get_incomplete_steps(self, product_id):
product = self.collection.find_one({'product_id': product_id}, {'process_steps': 1, '_id': 0})
if product:
logger.debug("완료되지 않은 스텝 찾음")
return [step for step, status in product['process_steps'].items() if status == 'incomplete']
else:
logger.warning(f"해당 ID의 상품을 찾을 수 없습니다. : {product_id}.")
return []
def is_step_incomplete(self, product_id):
"""특정 상품의 단계가 완료되지 않았는지 여부를 반환합니다."""
product = self.collection.find_one({'product_id': product_id}, {'finalized': 1})
if product:
incomplete_steps = self.get_incomplete_steps(product_id)
if product.get('finalized'):
logger.debug(f"상품 ID {product_id}: 완료")
return True
elif len(incomplete_steps) < 6:
logger.debug(f"상품 ID {product_id}: 미완성 목록을 반환")
return incomplete_steps
else:
logger.debug(f"상품 ID {product_id}: 기록없음")
return False
def get_current_step(self, product_id):
result = self.collection.find_one({'product_id': product_id}, {'_id': 0, 'process_steps': 1})
if result and 'process_steps' in result:
return result['process_steps']
else:
return {}
def toggle_process_step(self, product_id, step_name):
# 상품의 현재 처리 단계 상태를 확인
product = self.collection.find_one({'product_id': product_id}, {'process_steps': 1})
if not product or 'process_steps' not in product:
print(f"상품 ID {product_id}에 대한 정보가 없거나, 처리 단계 정보가 없습니다.")
return
current_status = product['process_steps'].get(step_name, 'incomplete')
new_status = 'complete' if current_status == 'incomplete' else 'incomplete'
# 처리 단계 상태 전환
self.collection.update_one(
{'product_id': product_id},
{'$set': {f'process_steps.{step_name}': new_status}}
)
print(f"상품 ID {product_id}{step_name} 단계가 {new_status} 상태로 전환되었습니다.")
def reset_product_by_id(self, product_id):
# 상품 ID에 해당하는 문서를 데이터베이스에서 삭제
result = self.collection.delete_one({'product_id': product_id})
if result.deleted_count > 0:
print(f"상품 ID {product_id}에 대한 정보가 성공적으로 초기화되었습니다.")
else:
print(f"상품 ID {product_id}에 대한 정보를 찾을 수 없습니다.")
def complete_all_steps_for_product(self, product_id):
# 모든 처리 단계를 'complete'로 설정
steps = {
'tag_modification': 'complete',
'option_modification': 'complete',
'detail_page_modification': 'complete',
'thumbnail_modification': 'complete',
'price_modification': 'complete',
'title_modification': 'complete'
}
# 현재 시간
current_time = datetime.now()
# 상품 ID에 해당하는 문서를 업데이트하여 모든 단계를 완료로 설정하고, 최종 완료 표시
result = self.collection.update_one(
{'product_id': product_id},
{'$set': {
'process_steps': steps,
'finalized': True,
'finalized_time': current_time
}}
)
if result.matched_count > 0:
print(f"상품 ID {product_id}의 모든 처리 단계가 완료되었으며, 최종 처리가 완료되었습니다.")
else:
print(f"상품 ID {product_id}에 대한 정보를 찾을 수 없습니다.")

58
db_transport.py Normal file
View File

@ -0,0 +1,58 @@
import sqlite3
from pymongo import MongoClient
from datetime import datetime
def sqlite_to_mongodb(sqlite_db_path, mongo_uri, mongo_db_name, mongo_collection_name):
# SQLite 데이터베이스 연결
sqlite_conn = sqlite3.connect(sqlite_db_path)
sqlite_cursor = sqlite_conn.cursor()
# MongoDB 연결
mongo_client = MongoClient(mongo_uri)
mongo_db = mongo_client[mongo_db_name]
mongo_collection = mongo_db[mongo_collection_name]
# SQLite에서 데이터 조회
sqlite_cursor.execute("SELECT product_id FROM processed_products")
sqlite_products = sqlite_cursor.fetchall()
# 데이터 가공 및 MongoDB에 저장
for sqlite_product in sqlite_products:
product_id_raw = sqlite_product[0]
# 'ID:' 부분 제거
product_id = product_id_raw.replace('ID:', '').strip()
steps = {
'tag_modification': 'complete',
'option_modification': 'complete',
'detail_page_modification': 'complete',
'thumbnail_modification': 'complete',
'price_modification': 'complete',
'title_modification': 'complete'
}
document = {
'product_id': product_id,
'user_name': 'leensoo1nt@gmail.com', # 처리를 시작하는 사용자
'process_time': datetime.now(), # 처리 시작 시간
'process_steps': steps, # 처리 단계 및 상태
'finalized': True, # 최종 완료 여부
'finalized_by':'leensoo1nt@gmail.com',
'finalized_time':datetime.now()
}
# MongoDB에 문서 저장
mongo_collection.insert_one(document)
print("SQLite 데이터가 MongoDB로 이전되었습니다.")
# 연결 닫기
sqlite_conn.close()
mongo_client.close()
# 스크립트 실행 예제
sqlite_db_path = 'products.db'
mongo_uri = 'mongodb://root:1234@cckb9998.synology.me:27017'
mongo_db_name = 'taobao_project'
mongo_collection_name = 'AutoPercenty_products'
sqlite_to_mongodb(sqlite_db_path, mongo_uri, mongo_db_name, mongo_collection_name)

View File

@ -88,3 +88,22 @@ def return_element(driver, element_type, element_identifier, wait_time=10):
except Exception as e:
logger.debug(f"예상치 못한 오류 발생: {e}")
return None
def wait_element(driver, element_type, element_identifier, wait_time=10):
try:
element = WebDriverWait(driver, wait_time).until(
EC.visibility_of_element_located((getattr(By, element_type), element_identifier))
)
if element:
logger.debug(f"'{element_identifier}' 요소를 {wait_time} 초 이내에 찾음")
return True
except TimeoutException:
logger.error(f"에러 : '{element_identifier}' 요소를 {wait_time} 초 이내에 찾지 못함.")
return False
except Exception as e:
logger.error(f"예상치 못한 오류 발생: {e}")
return False

View File

@ -298,6 +298,10 @@ def NS_info_with_HTML_ori(products):
def naver_prices(products):
prices = []
for product in products:
if product["price"] is None:
logger.debug("가격 정보가 없는 제품이 있습니다.")
continue # 가격 정보가 없으면, 이 제품을 건너뛰고 다음 제품으로 넘어갑니다.
if isinstance(product["price"], str):
price = int(product["price"].replace(",", ""))
logger.debug(f"정수변환 가격 : {price}")
@ -307,12 +311,17 @@ def naver_prices(products):
prices.append(price)
if not prices: # prices 리스트가 비어 있는 경우를 처리
logger.error("가격 정보가 없습니다.")
return None, None, None
low_price = min(prices)
avg_price = sum(prices) / len(prices)
high_price = max(prices)
return low_price, avg_price, high_price
def create_product_card(product_name, image_url, price, rank, purchase, review):
# 카드의 크기를 150x150px로 조정
product_card = f"""
@ -382,7 +391,7 @@ def NS_info_markdown(products):
return product_info_md
def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver_codes):
def modify_detail_page(driver, product_info, gemini, delv_collection, json_naver_codes):
product_title = product_info.init_title
product_low_cost = product_info.tao_low_price
product_high_cost = product_info.tao_high_price
@ -407,6 +416,8 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
# 상세페이지 탭으로 이동
# detail_tab = driver.find_element(By.ID, "rc-tabs-0-tab-5")
try:
detail_tab_xpath = ".ant-tabs-tab:nth-child(6)"
detail_tab = WebDriverWait(driver, 10).until(
@ -432,19 +443,19 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
# detail_content = driver.find_element(By.XPATH, "//div[@id='productMainContentContainerId']/div/div/div[2]/div[2]/div[2]/div")
# detail_content = driver.find_element(By.ID, "rc-tabs-0-tab-5")
try:
ActionChains(driver).move_to_element(detail_content).click().perform()
time.sleep(1)
logger.debug("AI 이미지 처리중")
# contents = bard_img(image_src)
# contents = gemini.generate_description(image_src, product_title)
contents = safe_generate_content(gemini, image_src, product_title)
logger.debug(f"{contents}")
product_info.ai_contents = contents
aicontents = safe_generate_content(gemini, image_src, product_title)
logger.debug(f"{aicontents}")
product_info.ai_contents = aicontents
logger.debug("AI 이미지 처리 완료")
weight = extract_weight(contents)
weight = extract_weight(aicontents)
logger.debug(f"무게 추정 : {weight}")
product_info.weight = weight
@ -472,6 +483,8 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
product_info_text = NS_info_with_HTML(products)
# logger.debug(f"수집된 정보 \n {product_info_text} \n")
logger.debug("네이버쇼핑 파싱 완료")
except Exception as e:
logger.debug(f"상세페이지 편집 중 에러발생 : {e}")
try:
html_btn_by_contains_xpath="//button[contains(.,'소스')]"
@ -503,6 +516,7 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
except Exception as e:
logger.debug(f"textarea버튼 요소를 찾을 수 없습니다. : {e}")
try:
# # data-value 속성 값을 가져옵니다.
current_value = textarea.get_attribute("data-value")
# logger.debug(f"현재 속성값 :{current_value}")
@ -576,9 +590,10 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
logger.debug("HTML 수정 버튼 다시 클릭하여 기본편집페이지로 돌아감.")
time.sleep(0.2)
except Exception as e:
logger.debug(f"HTML 전환 수정 중 에러발생 : {e}")
try:
# 상세페이지 내용에 텍스트 추가
logger.debug("커서위치 맞추기")
detail_content.send_keys(Keys.LEFT)
@ -592,8 +607,21 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
detail_content.send_keys(Keys.HOME)
detail_content.send_keys(Keys.HOME)
# contents 변수의 값이 None이 아닐 때만 send_keys 메서드를 호출합니다.
if aicontents is not None:
detail_content.send_keys(aicontents)
else:
# contents 변수가 None일 때의 대체 처리
# 예: detail_content.send_keys("") 또는 아무 동작도 수행하지 않음
pass
time.sleep(0.5)
detail_content.send_keys(Keys.ENTER)
detail_content.send_keys(Keys.ENTER)
detail_content.send_keys(Keys.ENTER)
except Exception as e:
logger.error(f"AI 컨텐츠 입력 중 에러발생 : {e}")
logger.debug("상세페이지 이미지 번역 시작")
@ -634,16 +662,7 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
# quote_button.click()
# time.sleep(0.5)
# contents 변수의 값이 None이 아닐 때만 send_keys 메서드를 호출합니다.
if contents is not None:
detail_content.send_keys(contents)
else:
# contents 변수가 None일 때의 대체 처리
# 예: detail_content.send_keys("") 또는 아무 동작도 수행하지 않음
pass
time.sleep(0.5)
try:
# 가격 정보 계산
low_cost, low_margin = selling_price(product_low_cost, delv_fee, 0.29)
high_cost, high_margin = selling_price(product_high_cost, delv_fee, 0.29)
@ -679,6 +698,8 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
detail_content.send_keys(add_text)
time.sleep(2)
except Exception as e:
logger.debug(f"가격정보 생성 중 에러 : {e}")
# 저장 버튼 클릭
# try:
@ -690,7 +711,10 @@ def modify_detail_page(driver, gemini, product_info, delv_collection, json_naver
# logger.debug(f"저장 버튼 요소를 찾을 수 없습니다. : {e}")
# save_button = driver.find_element(By.XPATH, "//button[contains(.,'저장하기')]")
# save_button.click()
logger.debug("상세페이지 편집 저장")
click_element(driver, "XPATH", save_button_xpath, 5, 'js')
time.sleep(0.5)
logger.debug("상세페이지 편집 완료")

View File

@ -58,17 +58,17 @@ def modify_option_page(driver, product_info):
# 현재 전체 옵션갯수 가져오기
# 각 옵션 타입별로 XPath를 문자열 포맷을 사용하여 한 줄로 작성
option_num_xpaths = [
"//div[@id='productMainContentContainerId']/div/div[2]/div/div/div[2]/div/div[{}]/div/div/div[2]/div/div/div[5]/div/div/label/span[2]".format(idx)
"//div[@id='productMainContentContainerId']/div/div[2]/div/div/div[2]/div/div[{}]/div/div/div[2]/div/div/div[4]/div/div/label/span[2]".format(idx)
for idx in range(1, 4) # 옵션 타입이 3개까지 있음
]
# 각 옵션 타입의 갯수를 가져와서 options_info에 업데이트
for idx, option_num_xpath in enumerate(option_num_xpaths, start=1):
for idx, option_num_xpath in enumerate(option_num_xpaths[:option_type_nums], start=1):
try:
option_num_element = return_element(driver, 'XPATH', option_num_xpath, 3)
option_num_element = return_element(driver, 'XPATH', option_num_xpath, 5)
if option_num_element:
option_number = int(re.search(r'\d+', option_num_element.text).group())
logger.debug(f"옵션타입 {idx} 갯수 : {option_number}")
logger.debug(f"옵션타입 [{idx}] 의 갯수 : {option_number}")
options_info[f'option_type_{idx}']['options_count'] = option_number
else:
logger.debug(f"옵션타입 {idx} 요소를 찾을 수 없음.")
@ -142,7 +142,7 @@ def modify_option_page(driver, product_info):
try:
logger.debug(f"옵션타입 {option_idx}에 대한 처리 시작")
logger.debug("가격 낮은 순으로 정렬")
low_price_order_xpath = f"//div[{option_idx}]/div/div/div[2]/div/div/div[5]/div/div[3]/button"
low_price_order_xpath = f"//div[{option_idx}]/div/div/div[2]/div/div/div[4]/div[2]/div[2]/div/div[3]/button"
click_element(driver, 'XPATH', low_price_order_xpath, 10, 'js')
edit_option(driver, option_idx, options_info[f'option_type_{option_idx}']['options_count']) # 각 옵션의 전체 체크박스 해제
# 가격 낮은 순으로 재정렬
@ -189,7 +189,9 @@ def edit_option(driver, option_type, option_count):
try:
# 전체 체크박스 선택 해제
logger.debug("전체 체크박스 선택 해제")
select_all_checkbox_xpath = f"//div[@id='productMainContentContainerId']/div/div[2]/div/div/div[2]/div/div[{option_type}]/div/div/div[2]/div/div/div[5]/div/div/label/span"
# select_all_checkbox_xpath = f"//div[@id='productMainContentContainerId']/div/div[2]/div/div/div[2]/div/div[{option_type}]/div/div/div[2]/div/div/div[5]/div/div/label/span"
select_all_checkbox_xpath = f"//div[{option_type}]/div/div/div[2]/div/div/div[4]/div[2]/div[1]/label/span[1]"
click_element(driver, 'XPATH', select_all_checkbox_xpath, 10, 'js')
logger.debug(f"옵션타입 {option_type} 전체 선택 체크박스 해제")
time.sleep(1)
@ -199,14 +201,14 @@ def edit_option(driver, option_type, option_count):
if option_count <= 2:
# 옵션 갯수가 2개 이하면 모든 옵션 체크
for i in range(1, option_count + 1):
option_xpath = f"//div[{option_type}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[1]/label/span"
option_xpath = f"//div[{option_type}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[1]/label/span"
click_element(driver, 'XPATH', option_xpath, 10, 'ac')
logger.debug(f"옵션 {i} 체크")
time.sleep(0.5)
else:
# 옵션 갯수가 3개 이상인 경우, 첫 번째 옵션 제외 최대 5개 옵션 체크
for i in range(2, min(option_count, 6) + 1): # 첫 번째 옵션 제외, 최대 5개 선택
option_xpath = f"//div[{option_type}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[1]/label/span"
option_xpath = f"//div[{option_type}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[1]/label/span"
click_element(driver, 'XPATH', option_xpath, 10, 'ac')
logger.debug(f"옵션 {i} 체크")
time.sleep(0.5)
@ -235,11 +237,11 @@ def option_name_trans(driver, option_type_number, option_count, allowed_special_
try:
for i in range(1, option_count + 1):
if option_type_number == 1:
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[3]/div[3]/span"
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[3]/div[3]/span"
elif option_type_number == 2:
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[3]/span"
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[3]/span"
else: # option_type_number == 3
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[4]/div/span"
ori_optionName_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[4]/div/span"
ori_optionName_element = driver.find_element(By.XPATH, ori_optionName_xpath)
ori_optionName = ori_optionName_element.text.strip()
@ -272,8 +274,7 @@ def option_name_trans(driver, option_type_number, option_count, allowed_special_
# 번역된 옵션명을 각 input 요소에 입력
try:
for i, trans_optionName in enumerate(trans_optionNames, 1):
optionName_input_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[3]/div[2]/div[1]/span/input" if option_type_number != 2 else f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[2]/div[1]/span/input"
optionName_input_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[3]/div[2]/div[1]/span/input" if option_type_number != 2 else f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[2]/div[2]/div[1]/span/input"
optionName_input_element = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, optionName_input_xpath)))
@ -333,8 +334,7 @@ def update_price_range(driver, options_info, product_info):
for i in range(1, option_number + 1):
option_type_number = [int(s) for s in option_type_key.split('_') if s.isdigit()][0]
middle_index = '3' if option_type_number == 1 else '2'
price_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[6]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[{middle_index}]/div[1]/div[2]/span/sup"
price_xpath = f"//div[{option_type_number}]/div/div/div[2]/div/div/div[5]/div[1]/div/div/ul/li[{i}]/div/div[1]/div/div[{middle_index}]/div[1]/div[2]/span/sup"
try:
price_element = return_element(driver, 'XPATH', price_xpath, 5)
price_text = re.sub(r'[^\d\-]', '', price_element.text.strip())

View File

@ -15,10 +15,21 @@ logger = logging.getLogger('default_logger')
def modify_price_page(driver, product_infos):
# tao_high_price = product_infos.option_high_price
# tao_low_price = product_infos.option_low_price
tao_high_price = product_infos.tao_high_price # 디버깅을 위해 선택된 옵션가격이 아닌 기존의 원래가격을 가져옴
tao_low_price = product_infos.tao_low_price # 디버깅을 위해 선택된 옵션가격이 아닌 기존의 원래가격을 가져옴
# 가격 탭으로 이동
logger.debug("가격탭으로 이동")
option_tab_XPATH = "//div[@id='rc-tabs-0-tab-3']"
keyword_tab_CSS = ".ant-tabs-tab:nth-child(3)"
click_element(driver, 'CSS_SELECTOR', keyword_tab_CSS, 10, 'js')
logger.debug("가격탭으로 이동 완료")
logger.debug("페이지 로딩 대기")
time.sleep(2) # 페이지 로딩 대기.
option_high_price = product_infos.option_high_price
option_low_price = product_infos.option_low_price
# tao_high_price = product_infos.tao_high_price # 디버깅을 위해 선택된 옵션가격이 아닌 기존의 원래가격을 가져옴
# tao_low_price = product_infos.tao_low_price # 디버깅을 위해 선택된 옵션가격이 아닌 기존의 원래가격을 가져옴
naver_low_price = product_infos.naver_low_price
naver_avg_price = product_infos.naver_avg_price
@ -27,15 +38,19 @@ def modify_price_page(driver, product_infos):
packing_fee = product_infos.packing_fee
inputs = {
"tao_low_price": tao_high_price,
"tao_high_price": tao_low_price,
"option_low_price": option_low_price,
"option_high_price": option_high_price,
"delv_fee": w_delv_fee,
"packing_fee": packing_fee,
"ns_low_price": naver_low_price,
"ns_high_price": naver_high_price
}
logger.debug(f"가격계산 INPUT 요소 : {inputs}")
outputs = calculate_margin_and_price(inputs)
logger.debug(f"가격계산 OUTPUT 요소 : {outputs}")
selling_price = outputs["selling_price"]
final_margin_rate = outputs["final_margin_rate"]
seller_cost = outputs["seller_cost"]
@ -43,6 +58,7 @@ def modify_price_page(driver, product_infos):
ns_avg_price = outputs["ns_avg_price"]
return_fee = selling_price/2
init_delv_fee = seller_cost
exchange_fee = return_fee + init_delv_fee

View File

@ -1,7 +1,14 @@
import logging
# 로거 인스턴스 가져오기
logger = logging.getLogger('default_logger')
def calculate_margin_and_price(inputs):
logger.debug("calculate_margin_and_price 계산 시작")
# 입력값 처리
tao_low_price = inputs["tao_low_price"]
tao_high_price = inputs["tao_high_price"]
option_low_price = inputs["option_low_price"]
option_high_price = inputs["option_high_price"]
delv_fee = inputs["delv_fee"]
packing_fee = inputs["packing_fee"]
ns_low_price = inputs["ns_low_price"]
@ -10,6 +17,9 @@ def calculate_margin_and_price(inputs):
# 계산 로직
ns_avg_price = (ns_low_price + ns_high_price) / 2
logger.debug(f"ns_avg_price : {ns_avg_price}")
option_avg_price = (option_low_price + option_high_price) / 2
logger.debug(f"option_avg_price : {option_avg_price}")
tao_cost = lambda price: price * 190 * 1.035
base_margin = lambda cost: cost * 0.12
@ -28,7 +38,9 @@ def calculate_margin_and_price(inputs):
return selling_price, final_margin_rate
seller_cost = tao_cost(tao_low_price) + delv_fee + packing_fee
seller_cost = tao_cost(option_avg_price) + delv_fee + packing_fee
logger.debug(f"seller_cost : {seller_cost}")
mall_cost = seller_cost * 0.13
selling_price = seller_cost + plus_margin + mall_cost
selling_price, final_margin_rate = adjust_plus_margin_for_min_margin(seller_cost, mall_cost, selling_price)
@ -38,7 +50,7 @@ def calculate_margin_and_price(inputs):
"selling_price": selling_price,
"final_margin_rate": final_margin_rate,
"seller_cost": seller_cost,
"base_margin": base_margin(tao_cost(tao_low_price)),
"base_margin": base_margin(tao_cost(option_avg_price)),
"plus_margin": plus_margin,
"ns_avg_price": ns_avg_price
}

View File

@ -46,3 +46,41 @@ class ProductInfo:
def add_detail(self, key, value):
self.details[key] = value
def to_dict(self):
return {
'id': self.id,
'init_title': self.init_title,
'modify_title': self.modify_title,
'trans_title': self.trans_title,
'tao_high_price': self.tao_high_price,
'tao_low_price': self.tao_low_price,
'option_high_price': self.option_high_price,
'option_low_price': self.option_low_price,
'main_image_url': self.main_image_url,
'per_cat_code': self.per_cat_code,
'naver_code': self.naver_code,
'naver_low_price': self.naver_low_price,
'naver_avg_price': self.naver_avg_price,
'naver_high_price': self.naver_high_price,
'weight': self.weight,
'w_delv_fee': self.w_delv_fee,
'packing_fee': self.packing_fee,
'plus_fee': self.plus_fee,
'return_fee': self.return_fee,
'init_delv_fee': self.init_delv_fee,
'exchange_fee': self.exchange_fee,
'ai_contents': self.ai_contents,
'current_value': self.current_value,
'new_value': self.new_value,
'naver_products': self.naver_products,
'option_names': self.option_names,
'trans_option_names': self.trans_option_names,
'trans_option_name_parts': self.trans_option_name_parts,
'option_image_urls': self.option_image_urls,
'trans_option_image_urls': self.trans_option_image_urls,
'detail_image_urls': self.detail_image_urls,
'trans_detail_image_urls': self.trans_detail_image_urls,
'thumb_image_urls': self.thumb_image_urls,
'trans_thumb_image_urls': self.trans_thumb_image_urls,
}

View File

@ -32,8 +32,8 @@ def image_trans(image_url, convert_type):
logger.debug("텍스트가 없는 이미지")
pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
return pil_img
logger.debug("Detected texts:", detected_texts)
# logger.debug("Detected texts:", detected_texts)
logger.debug("Detected texts: %s", detected_texts)
# 처리된 이미지 경로 설정
img_name = "output"
thumb_image_path = f'img/{img_name}_thumbnail.jpg'

View File

@ -3,18 +3,21 @@ from PIL import Image # PIL 라이브러리가 필요합니다.
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
import os, sys
import logging
# 로거 인스턴스 가져오기
logger = logging.getLogger('default_logger')
# 현재 작업 디렉터리를 동적으로 받아오기
current_path = os.getcwd()
try:
# 실행 파일이 PyInstaller로 패키징된 경우와 개발 중인 경우를 모두 고려하여 경로 설정
base_path = getattr(sys, '_MEIPASS', os.path.dirname(os.path.abspath(__file__)))
# 모델 파일을 저장할 경로 설정
det_model_path = os.path.join(current_path, "models", "det")
rec_model_path = os.path.join(current_path, "models", "rec")
cls_model_path = os.path.join(current_path, "models", "cls") # 분류 모델 경로 추가
det_model_path = os.path.join(base_path, "models", "det")
rec_model_path = os.path.join(base_path, "models", "rec")
cls_model_path = os.path.join(base_path, "models", "cls") # 분류 모델 경로 추가
# models 폴더가 없으면 자동으로 생성
os.makedirs(det_model_path, exist_ok=True)
@ -27,6 +30,8 @@ ocr = PaddleOCR(use_angle_cls=True, lang="ch",
rec_model_dir=rec_model_path,
cls_model_dir=cls_model_path) # 분류 모델 경로 지정
except Exception as e:
logger.error(f"PaddleOCR 설정 중 에러발생 : {e}")
def detect_text(image):
# 이미지에서 텍스트 감지 및 인식

22
main.py
View File

@ -20,13 +20,16 @@ from PyQt5 import QtCore, QtWidgets
from logger_module import setup_logger
import logging
mongo_config = MongoConfig() # MongoDB 설정 관리 인스턴스 생성
mongo_config.try_connect(*mongo_config.load_config()) # MongoDB에 연결 시도
logger = setup_logger('default_logger', 'application.log', level=logging.DEBUG)
def DB_setting():
QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_ShareOpenGLContexts)
app = QtWidgets.QApplication(sys.argv)
# MongoDB 설정 및 연결 시도
mongo_config = MongoConfig()
if not mongo_config.try_connect(*mongo_config.load_config()):
logger.debug("MongoDB에 연결할 수 없습니다. 설정을 확인해주세요.")
sys.exit(-1) # MongoDB 연결 실패 시, 애플리케이션 종료
login_widget = LoginWidget(mongo_config) # MongoDB 설정을 인자로 전달하려면 여기에 추가하세요.
login_widget.show()
result = app.exec_() # 이벤트 루프를 실행하고 종료 코드를 반환
@ -42,10 +45,10 @@ def DB_setting():
"login_result": login_widget.result # 로그인 결과도 함께 반환
}
return login_info, result
return login_info, result, mongo_config
def main():
login_info, app_result = DB_setting() # 로그인 정보와 QApplication 실행 결과를 받음
login_info, app_result, mongo_config = DB_setting() # 로그인 정보와 QApplication 실행 결과를 받음
if not login_info["login_result"]:
logger.debug("사용자 인증 실패로 프로그램을 종료합니다.")
@ -93,8 +96,8 @@ def main():
# 웨일 WebDriver를 사용하여 네이버 웨일 브라우저 실행
#driver = webdriver.Chrome(service=service, options=chrome_options)
# 데이터베이스 설정
setup_database()
# # 데이터베이스 설정
# setup_database()
# 로그인
driver.get(WEBSITE_URL)
@ -102,10 +105,11 @@ def main():
login(driver, login_info) # login 함수에 로그인 정보를 전달하여 호출
#상품 수정 작업 수행
modify_products(driver, gemini, mongo_config, set_num_modify)
modify_products(driver, gemini, mongo_config, login_info, set_num_modify)
#웹 드라이버 종료
driver.quit()
if __name__ == "__main__":
logger = setup_logger('default_logger', 'application.log', level=logging.DEBUG)
main()

View File

@ -1,25 +1,26 @@
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time, re, math
import json
# from utils import log
from database import is_product_processed, mark_product_processed
from database_with_mongo import AutoPercentyProductsDB
from edit.detail1 import modify_detail_page
from edit.tag import edit_tag
from edit.options import modify_option_page
from edit.price import modify_price_page
from edit.title import modify_product_title
from edit.thumbnail import modify_thumb_page
from edit.action_elements import click_element, return_element
from edit.action_elements import click_element, return_element, wait_element
import logging
from edit.product_info import ProductInfo
# 로거 인스턴스 가져오기
logger = logging.getLogger('default_logger')
# def set_product_info():
# '''
# 상품 정보 product_info 딕셔너리 초기화
@ -50,6 +51,31 @@ logger = logging.getLogger('default_logger')
# return product_info
# 함수 이름과 해당하는 작업 함수 매핑
# function_mapping = {
# 'tag_modification': edit_tag,
# 'option_modification': modify_option_page,
# 'detail_page_modification': modify_detail_page,
# 'thumbnail_modification': modify_thumb_page,
# 'price_modification': modify_price_page,
# 'title_modification': modify_product_title,
# }
def perform_step(db, step_name, action, current_user, *args):
"""
단계별 작업을 수행하는 함수.
"""
product_info = args[1]
try:
action(*args) # action 함수에 product_info 및 *args를 전달합니다.
db.update_process_step(product_info.id, step_name, current_user)
logger.debug(f"Step {step_name} completed for product {product_info.id}.")
except Exception as e:
logger.error(f"Error performing step {step_name} for product {product_info.id}: {e}")
def load_json_naver_codes(filename):
codes = []
with open(filename, "r", encoding='utf-8') as file:
@ -67,16 +93,33 @@ def wait_for_javascript(driver, timeout=20):
except TimeoutException:
logger.debug("페이지 로딩 실패: 시간 초과")
def modify_products(driver, gemini, mongo_config, set_num_modify):
def modify_products(driver, gemini, mongo_config, login_info, set_num_modify):
# product_info = set_product_info() # 상품정보리스트 생성
autoPercentyProductsDB = AutoPercentyProductsDB(mongo_config)
try:
if login_info['per_mode']:
current_user = login_info['per_email']
else:
current_user = login_info['per_em_email']
logger.debug(f"현재 작업중인 사용자 지정 : {current_user}")
except Exception as e:
logger.debug(f"현재 작업중인 사용자 지정 중 에러발생 : {e}")
# MongoDB에 연결
try:
client = mongo_config.client
# self.login_db = self.mongoConfig.get_db() # mongo_config 인스턴스를 통해 데이터베이스 객체를 가져옴
db = client['taobao_project']
# products_collection = db['AutoPercenty_Produtcs']
delv_collection = db['delv_fee']
except Exception as e:
logger.debug(f"DB연결 중 에러발생 : {e}")
set_num_modify = int(set_num_modify)
logger.debug(f"수정대상 상품 수 설정 : {set_num_modify}")
# 한 번만 호출하여 메모리에 로드
json_naver_codes = load_json_naver_codes("Percenty_SS_code.json")
@ -120,22 +163,52 @@ def modify_products(driver, gemini, mongo_config, set_num_modify):
product_infos.append(ProductInfo())
# 1번 상품과 나머지 상품들에 대한 XPATH 처리
steps_and_actions = [
('tag_modification', edit_tag, (driver, product_infos[i-1])),
('thumbnail_modification', modify_thumb_page, (driver, product_infos[i-1])),
('option_modification', modify_option_page, (driver, product_infos[i-1])),
('detail_page_modification', modify_detail_page, (driver, product_infos[i-1], gemini, delv_collection, json_naver_codes)),
('price_modification', modify_price_page, (driver, product_infos[i-1])),
# 'title_modification' 단계와 관련된 함수 호출은 여기에 추가합니다.
]
# 상품ID 복사
try:
product_id_element = return_element(driver,"XPATH",f"//div[{i}]/li/div/div/div[2]/div/div/div/div[3]/div[3]/span[2]/span",10)
product_id = product_id_element.get_attribute('innerText')
product_id_set = f"ID:{product_id}"
product_infos[i-1].id = product_id_set
logger.debug(f"상품ID : {product_id_set}")
# product_id = f"ID:{product_id}"
product_infos[i-1].id = product_id
logger.debug(f"상품ID : {product_id}")
except Exception as e:
logger.debug(f"상품ID 복사 중 오류 발생: 요소를 찾을 수 없습니다. : {e}")
# 상품ID 처리 여부 판단
if is_product_processed(product_id_set):
logger.debug(f"상품 {product_id_set}는 이미 처리됨.")
# close_button = driver.find_element(By.CSS_SELECTOR, ".anticon-close path")
# close_button.click()
continue
incomplete_steps = autoPercentyProductsDB.is_step_incomplete(product_id)
if incomplete_steps is True:
pass
elif isinstance(incomplete_steps, list) and len(incomplete_steps) < 6:
# incomplete_steps가 리스트 타입이고, 길이가 6 미만인 경우에만 미완성된 단계들에 대해 처리합니다.
logger.debug(f"총 [{len(incomplete_steps)}]개의 미완성 목록을 발견")
logger.debug(f"미완성 스텝 목록 : {incomplete_steps}")
# 완료되지 않은 단계에 해당하는 작업을 수행합니다.
click_to_product_title_for_modify_xpath = f"//div[{i}]/li/div/div/div[2]/div/div/div/div"
click_element(driver, 'XPATH', click_to_product_title_for_modify_xpath, 10, 'js')
logger.debug("상품 수정 페이지 클릭")
time.sleep(2)
for step_name, action, args in steps_and_actions:
if step_name in incomplete_steps:
perform_step(autoPercentyProductsDB, step_name, action, current_user, *args)
autoPercentyProductsDB.finalize_product_processing(product_id, current_user, product_infos[i-1])
logger.info(f"상품 ID {product_id}의 미완성 목록을 완성했습니다.")
pass
else:
# 여기에는 incomplete_steps가 False이거나 리스트의 길이가 6인 경우.
# 이는 상품이 수정된 적 없거나 모든 단계가 미완성인 상태를 의미.
autoPercentyProductsDB.initialize_process_steps(product_id, current_user)
logger.info(f"상품 ID {product_id}를 초기화 하고 상품수정을 시작합니다.")
try:
p_main_title_xpath = f"//div[{i}]/li/div/div/div[2]/div/div/div[1]/div[1]/span[2]"
@ -266,53 +339,62 @@ def modify_products(driver, gemini, mongo_config, set_num_modify):
product_infos[i-1].main_image_url = product_image_url
logger.debug("키워드 수정작업 시작")
edit_tag(driver, product_infos[i-1])
logger.debug("키워드 수정작업 완료")
# 각 단계별로 수정 작업 수행
logger.debug("옵션 수정작업 시작")
modify_option_page(driver, product_infos[i-1])
logger.debug("옵션 수정작업 완료")
for step_name, action, args in steps_and_actions:
perform_step(autoPercentyProductsDB, step_name, action, current_user, *args)
# 상품 수정 작업 수행
logger.debug("상세페이지 수정작업 시작")
modify_detail_page(driver, gemini, product_infos[i-1], delv_collection, json_naver_codes)
logger.debug("상세페이지 수정작업 완료")
# 모든 단계가 완료되었다면, 상품 처리 최종화
autoPercentyProductsDB.finalize_product_processing(product_infos[i-1].id, current_user, product_infos[i-1])
logger.debug("썸네일 수정작업 시작")
modify_thumb_page(driver, product_infos[i-1])
logger.debug("썸네일 수정작업 완료")
# try:
# # 각 단계별 작업 수행
# perform_step(autoPercentyProductsDB, 'tag_modification', product_infos[i-1], current_user, driver)
# perform_step(autoPercentyProductsDB, 'option_modification', product_infos[i-1], current_user, driver)
# perform_step(autoPercentyProductsDB, 'detail_page_modification', product_infos[i-1], current_user, driver, gemini, delv_collection, json_naver_codes)
# perform_step(autoPercentyProductsDB, 'thumbnail_modification', product_infos[i-1], current_user, driver)
# perform_step(autoPercentyProductsDB, 'price_modification', product_infos[i-1], current_user, driver)
# logger.debug("모든 단계의 수정 작업이 완료되었습니다.")
# except Exception as e:
# logger.error(f"수정 작업 중 예외 발생: {e}")
logger.debug("가격 수정작업 시작")
modify_price_page(driver, product_infos[i-1])
logger.debug("가격 수정작업 완료")
# logger.debug("키워드 수정작업 시작")
# edit_tag(driver, product_infos[i-1])
# autoPercentyProductsDB.update_process_step(product_id, step_name="step1")
# logger.debug("키워드 수정작업 완료")
logger.debug("상품명 수정작업 시작")
# modify_product_title(driver, product_infos[i-1])
logger.debug("......상품명 메서드 작성 중......")
logger.debug("상품명 수정작업 완료")
# logger.debug("옵션 수정작업 시작")
# modify_option_page(driver, product_infos[i-1])
# logger.debug("옵션 수정작업 완료")
# # 상품 수정 작업 수행
# logger.debug("상세페이지 수정작업 시작")
# modify_detail_page(driver, gemini, product_infos[i-1], delv_collection, json_naver_codes)
# logger.debug("상세페이지 수정작업 완료")
# logger.debug("썸네일 수정작업 시작")
# modify_thumb_page(driver, product_infos[i-1])
# logger.debug("썸네일 수정작업 완료")
# logger.debug("가격 수정작업 시작")
# modify_price_page(driver, product_infos[i-1])
# logger.debug("가격 수정작업 완료")
# logger.debug("상품명 수정작업 시작")
# # modify_product_title(driver, product_infos[i-1])
# logger.debug("......상품명 메서드 작성 중......")
# logger.debug("상품명 수정작업 완료")
# 상품수정페이지 저장 버튼 클릭
click_element(driver, 'CSS_SELECTOR', '.ant-col:nth-child(9) > .ant-btn', wait_time=10, click_type='normal')
logger.debug("상품 수정 저장 중......")
click_element(driver, 'CSS_SELECTOR', '.ant-col:nth-child(9) > .ant-btn', wait_time=10, click_type='js')
# 상품수정페이지 닫기 버튼 클릭
click_element(driver, 'CSS_SELECTOR', '.anticon-close > svg', wait_time=10, click_type='normal')
time.sleep(1)
ActionChains(driver).send_keys(Keys.ESCAPE).perform()
# try:
# # 'close' 버튼이 나타날 때까지 최대 10초간 대기
# close_button = WebDriverWait(driver, 10).until(
# EC.visibility_of_element_located((By.CSS_SELECTOR, ".anticon-close > svg"))
# )
# # 'close' 버튼이 나타나면 클릭
# close_button.click()
# logger.debug("Close button was clicked.")
# except TimeoutException:
# # 지정된 시간 내에 'close' 버튼이 나타나지 않으면
# logger.debug("Close button was not found within the given time.")
mark_product_processed(product_id)
logger.debug(f"상품 {product_id} 처리 완료.")
completed_products += 1

0
product.db Normal file
View File