IMG_Worker/modules/client/details.py

1432 lines
73 KiB
Python

import numpy as np
import asyncio, time, math
# import pywinauto
import os
import logging
import random
import glob
import markdown
from bs4 import BeautifulSoup
import re
import sqlite3
import json
from datetime import datetime, timedelta
class DetailHandler:
def __init__(self, locator_manager, browser_controller, imageProcessor, detail_text_widget, TEMP_IMAGE_DIR, logger, gpt_client, update_detail_progress_signal, set_progress_visible_signal, toggle_states):
self.update_detail_progress_signal = update_detail_progress_signal
self.set_progress_visible_signal = set_progress_visible_signal
self.TEMP_IMAGE_DIR = TEMP_IMAGE_DIR
self.locator_manager = locator_manager
self.browser_controller = browser_controller
self.page = self.browser_controller.page
# self.clipboardImageManager = clipboardImageManager
self.detail_text_widget = detail_text_widget
self.logger = logger
self.imageProcessor = imageProcessor
self.toggle_states = toggle_states
self.gpt_client = gpt_client
# self.whale_translator = whale_translator
# 선택자 로드
self.source_button_locator = self.locator_manager.get_locator('DetailLocators', 'source_button_locator')
self.ck_source_editing_area_locator = self.locator_manager.get_locator('DetailLocators', 'ck_source_editing_area_locator')
self.cke_text_editing_area_locator = self.locator_manager.get_locator('DetailLocators', 'cke_text_editing_area_locator')
self.cke_img_file_input_locator = self.locator_manager.get_locator('DetailLocators', 'cke_img_file_input_locator')
self.oneclick_trans_img_btn_locator = self.locator_manager.get_locator('DetailLocators', 'oneclick_trans_img_btn_locator')
self.editor_modal_locator = self.locator_manager.get_locator('DetailLocators', 'editor_modal_locator')
self.save_btn_locator = self.locator_manager.get_locator('DetailLocators', 'save_btn_locator')
self.dialog_save_btn_locator = self.locator_manager.get_locator('DetailLocators', 'dialog_save_btn_locator')
self.detail_img_trans_btn_locator = self.locator_manager.get_locator('DetailLocators', 'detail_img_trans_btn_locator')
self.detail_img_trans_dialog_locator = self.locator_manager.get_locator('DetailLocators', 'detail_img_trans_dialog_locator')
self.detail_img_trans_dialog_closeBTN_locator = self.locator_manager.get_locator('DetailLocators', 'detail_img_trans_dialog_closeBTN_locator')
self.detail_img_trans_editBTN_locator = self.locator_manager.get_locator('DetailLocators', 'detail_img_trans_editBTN_locator')
self.detail_images_locator = self.locator_manager.get_locator('DetailLocators', 'detail_images_locator')
def update_page(self, page1, toggle_states, imageProcessor):
self.page = page1
self.toggle_states = toggle_states
self.imageProcessor = imageProcessor
self.logger.log(f"page객체 업데이트 : {page1}", level=logging.DEBUG)
def reset_state(self):
"""상세페이지 핸들러의 상태를 초기화합니다. (detail 관련 임시파일만 정리)"""
self.logger.log("DetailHandler 상태 초기화 - detail 관련 임시파일 정리", level=logging.DEBUG)
try:
# detail 관련 임시 디렉토리의 파일들만 삭제 (더 포괄적인 패턴 사용)
patterns = [
"detail_*.png",
"detail_*.jpg",
"detail_*.webp",
"translated_detail_*"
]
all_detail_files = []
for pattern in patterns:
detail_pattern = os.path.join(self.TEMP_IMAGE_DIR, pattern)
all_detail_files.extend(glob.glob(detail_pattern))
if all_detail_files:
for file_path in all_detail_files:
try:
os.remove(file_path)
self.logger.log(f"detail 임시 파일 삭제: {file_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"detail 임시 파일 삭제 실패: {file_path}, 오류: {e}", level=logging.WARNING)
self.logger.log(f"{len(all_detail_files)}개의 detail 임시 파일 정리 완료", level=logging.INFO)
else:
self.logger.log("정리할 detail 임시 파일이 없습니다.", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"detail 임시 파일 처리 중 오류 발생: {e}", level=logging.ERROR)
async def get_ckeditor_data(self):
"""CKEditor에서 HTML 데이터를 가져옵니다."""
try:
# # 소스 버튼 locator
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# # 소스 편집 영역 locator
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 모드로 전환
source_button = await self.page.wait_for_selector(self.source_button_locator, timeout=8000)
await source_button.click()
self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# 데이터 가져오기
textarea = await self.page.wait_for_selector(self.ck_source_editing_area_locator, timeout=5000)
html_data = await textarea.get_attribute("data-value")
# 다시 에디터 모드로 전환
await self.page.click(self.source_button_locator)
self.logger.log("소스 버튼 재클릭 완료.", level=logging.DEBUG)
return html_data
except Exception as e:
self.logger.log(f"CKEditor 데이터 가져오기 실패: {e}", level=logging.ERROR)
return ""
async def clear_ckeditor_data(self):
"""CKEditor의 내용을 지웁니다."""
try:
# # 소스 버튼 locator
# source_button_locator = self.locator_manager.get_locator('BrowserControl', 'source_button_locator')
# # 소스 편집 영역 locator
# ck_source_editing_area_locator = self.locator_manager.get_locator('BrowserControl', 'ck_source_editing_area_locator')
# 소스 모드로 전환
await self.page.click(self.source_button_locator)
self.logger.log("소스 버튼 클릭 완료.", level=logging.DEBUG)
# 데이터 지우기
await self.page.evaluate(f'document.querySelector("{self.ck_source_editing_area_locator}").setAttribute("data-value", "")')
self.logger.log("CKEditor 데이터 지우기 완료.", level=logging.DEBUG)
# 다시 에디터 모드로 전환
await self.page.click(self.source_button_locator)
self.logger.log("소스 버튼 재클릭 완료.", level=logging.DEBUG)
return True
except Exception as e:
self.logger.log(f"CKEditor 데이터 지우기 실패: {e}", level=logging.ERROR)
return False
def is_valid_html(self, html_data: str) -> bool:
"""
HTML 데이터가 정상적인지 확인하는 메서드
조건: <img> 태그가 포함되어 있고 다수의 이미지 URL이 존재해야 함
"""
try:
# <img> 태그 내의 src 속성 값을 추출
img_urls = re.findall(r'<img[^>]+src=["\'](.*?)["\']', html_data)
# 최소 이미지 URL 개수를 기준으로 판단 (예: 1개 이상)
if len(img_urls) >= 1:
self.logger.log(f"HTML에 포함된 이미지 URL: {img_urls}", level=logging.DEBUG)
return True
else:
self.logger.log(f"HTML에 포함된 이미지 URL이 부족합니다. ({len(img_urls)}개)", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"HTML 유효성 검사 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
def fetch_image_urls(self, html_content):
"""
HTML 콘텐츠에서 모든 <img> 태그의 URL을 추출하는 함수.
<figure> 안의 <img> 태그와 독립된 <img> 태그 모두 처리하며,
지정된 도메인으로 시작하는 이미지는 제외한다.
"""
soup = BeautifulSoup(html_content, "html.parser")
# 제외할 URL 접두사 모음 (필요 시 여기서만 손보면 됨)
EXCLUDE_PREFIXES: tuple[str, ...] = (
"https://assets.alicdn.com",
"https://gtms01.alicdn.com",
)
image_urls: list[str] = [] # 최종 반환 목록
excluded_urls: list[str] = [] # 필터링된 URL 목록 (디버깅용)
# 모든 <img> 태그를 순회
for img in soup.find_all("img"):
if "src" not in img.attrs:
continue # src 없는 태그는 건너뜀
image_url: str = img["src"]
# 특정 도메인 시작이면 제외
if image_url.startswith(EXCLUDE_PREFIXES):
excluded_urls.append(image_url)
continue
image_urls.append(image_url)
# 디버깅 로그
self.logger.log(
f"fetch_image_urls ▶ 추출 이미지 URL 수 : {len(image_urls)}", level=logging.DEBUG
)
self.logger.log(
f"fetch_image_urls ▶ 추출 이미지 URL 목록 : {image_urls}", level=logging.DEBUG
)
self.logger.log(
f"fetch_image_urls ▶ 제외된 이미지 URL 수 : {len(excluded_urls)}", level=logging.DEBUG
)
self.logger.log(
f"fetch_image_urls ▶ 제외된 이미지 URL 목록 : {excluded_urls}",
level=logging.DEBUG,
)
return image_urls
async def upload_image(self, img_path):
"""
이미지를 CKEditor에 업로드합니다.
Args:
img_path (str): 업로드할 이미지 파일 경로
Returns:
bool: 업로드 성공 여부
"""
try:
# (1) 업로드 시도 전에 파일 경로가 로컬에 존재하는지 확인
if (not img_path) or (not os.path.isfile(img_path)):
self.logger.log(f"유효하지 않은 이미지 경로로 업로드를 건너뜀: {img_path}", level=logging.WARNING)
return False
# 업로드 이전 이미지 리스트 저장
existing_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
# 1. 파일 업로드
file_input = self.page.locator(self.cke_img_file_input_locator)
await file_input.set_input_files(img_path)
self.logger.log(f"이미지가 성공적으로 업로드되었습니다: {img_path}", level=logging.INFO)
# 2. 커서 이동
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("ArrowRight")
# 업로드된 이미지 URL 확인
current_images = await self.page.evaluate("""
() => Array.from(document.querySelectorAll('img')).map(img => img.src)
""")
new_images = list(set(current_images) - set(existing_images))
if new_images:
# self.logger.log(f"업로드된 이미지 확인: {new_images[0]}", level=logging.DEBUG)
return True
else:
self.logger.log("업로드된 이미지를 찾을 수 없습니다.", level=logging.WARNING)
return False
except Exception as e:
self.logger.log(f"이미지 업로드 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def input_option_list(self, option_data, input_field):
lines = ["# 옵션 목록"]
for i, key in enumerate(option_data.keys(), 1):
lines.append(f"- {i}. {key}")
lines += ["", "### 나열된 옵션목록 이외의 옵션이 필요하실 경우 고객센터로 연락주세요.", "---", ""]
await input_field.fill("\n".join(lines))
async def input_detail_text(self, optionHandler, input_field):
"""
상세페이지에 소개글과 옵션 데이터를 입력하는 메서드
"""
try:
if not input_field:
self.logger.log("텍스트 입력 필드를 찾을 수 없습니다.", level=logging.ERROR)
return
# --------------------------------------------------
# 1. VIP 등록모드 랜덤 소개 문구 (맨 윗줄)
# --------------------------------------------------
vip_intro_block = ""
if (self.toggle_states.get('ed_mode', False) and
self.toggle_states.get('vip_detail_edit', False)):
try:
random_intro = self._get_random_biz_intro()
vip_intro_block = f"{random_intro}\n\n---\n"
self.logger.log("VIP 등록모드: 랜덤 소개 문구 추가됨", level=logging.INFO)
except Exception as e:
self.logger.log(f"VIP 랜덤 소개 문구 생성 실패: {e}", level=logging.WARNING)
# --------------------------------------------------
# 2. 소개글 (기존 유지)
# --------------------------------------------------
leading_lines = [t for t in self.detail_text_widget.get_lines() if t]
leading_block = "\n".join(leading_lines)
# --------------------------------------------------
# 2. 옵션 목록 (범용적인 고급 마크다운)
# --------------------------------------------------
option_data = optionHandler.get_all_translated_options()
is_single = optionHandler.option_info.get("is_single_option", True)
options_block = ""
if option_data and (len(option_data) > 1 or not is_single):
lines = [
"",
"### 🚀 **구매대행 특별 혜택**",
"",
"> 🎯 **전문 구매대행 서비스**",
"- **해외직구 전문가**가 직접 상품 선별 및 구매배송",
"**철저한 검수** 및 **꼼꼼한 포장**",
"**빠른 배송** 및 **고객 맞춤형 서비스**",
"",
"💬 **문의 및 주문 안내**",
"나열된 옵션목록 이외의 옵션이 필요하시거나, ",
"**특별 주문**이 필요하시면 언제든 고객센터로 연락주세요!",
"⚡ **빠른 응답**으로 고객님의 만족을 최우선으로 합니다.",
"",
"",
"---",
"### 📋 **옵션 목록**",
"",
""
]
# 이미 넘버링이 적용된 옵션명을 일관된 이모지와 함께 사용 (마크다운 자동 넘버링 방지)
# 상품별로 하나의 이모지를 선택하여 모든 옵션에 동일하게 사용
selected_emoji = self._get_option_emoji(0, style="random")
for i, option in enumerate(option_data.keys()):
lines.append(f"{selected_emoji} **{option}**")
lines.extend([
"",
"---",
"",
""
])
options_block = "\n".join(lines)
# --------------------------------------------------
# 3. 과거 형식 옵션 목록 자동 정리 (선택적)
# --------------------------------------------------
# 기존 내용에서 과거 형식 옵션 목록이 있다면 정리 시도
if leading_block and option_data:
cleaned_content = self._replace_or_append_options_section(leading_block, options_block, option_data)
if cleaned_content and cleaned_content != leading_block:
leading_block = cleaned_content
self.logger.log("기존 소개글 내의 과거 옵션 목록을 정리했습니다.", level=logging.INFO)
# --------------------------------------------------
# 4. 최종 조합 & 입력
# --------------------------------------------------
# VIP 소개 문구, 기존 소개글, 옵션 목록 순서로 조합
text_blocks = []
if vip_intro_block:
text_blocks.append(vip_intro_block.strip())
if leading_block:
text_blocks.append(leading_block)
if options_block:
text_blocks.append(options_block)
bulk_text = "\n".join(text_blocks).strip()
try:
await input_field.click()
await input_field.fill("") # 내용 초기화
await input_field.press("Enter")
# 청크 단위 입력으로 변경
await self.input_detail_text_chunked(input_field, bulk_text)
except Exception as e:
self.logger.log(f"CKEditor HTML 입력 실패: {e}", level=logging.ERROR, exc_info=True)
self.logger.log("상세페이지 텍스트(소개 + 옵션) 일괄 입력 완료", level=logging.INFO)
except Exception as e:
self.logger.log(f"상세페이지 텍스트 입력 중 오류 발생: {e}", level=logging.ERROR, exc_info=True)
async def input_detail_text_chunked(self, input_field, bulk_text):
"""청크 단위로 텍스트를 입력하는 메서드"""
# 긴 텍스트를 청크로 나누어 입력
chunk_size = 1000 # 한 번에 입력할 문자 수
chunks = [bulk_text[i:i+chunk_size] for i in range(0, len(bulk_text), chunk_size)]
for i, chunk in enumerate(chunks):
try:
await input_field.type(chunk, delay=1, timeout=60000)
self.logger.log(f"청크 {i+1}/{len(chunks)} 입력 완료", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"청크 {i+1} 입력 실패: {e}", level=logging.ERROR)
raise
async def process_detail(self, optionHandler, toggle_states):
"""상세페이지 번역을 처리합니다.
Args:
optionHandler: 옵션 핸들러 객체
Returns:
bool: 처리 성공 여부
"""
self.logger.log("상세페이지 처리를 시작합니다.", level=logging.INFO)
# 이전 상품의 detail 관련 임시파일들 정리
self.reset_state()
# 사용값 설정
self.interval = toggle_states.get('interval', 3.0)
self.watingTime = toggle_states.get('watingTime', 20)
self.detail_Option = toggle_states.get('detail_Option', False)
self.detail_IMGTrans = toggle_states.get('detail_IMGTrans', False)
self.detail_IMGTrans_type = toggle_states.get('detail_IMGTrans_type', True)
self.unwanted_words = toggle_states.get('unwanted_words', False)
self.logger.log(f"self.toggle_states : {self.toggle_states}", level=logging.DEBUG)
try:
# 현재 HTML 가져오기
current_html = await self.get_ckeditor_data()
if not current_html or not self.is_valid_html(current_html):
self.logger.log("유효한 HTML을 가져오지 못했습니다.", level=logging.WARNING)
return False
# 이미지 URL 추출
image_urls = self.fetch_image_urls(current_html)
original_image_count = len(image_urls)
self.logger.log(f"추출된 이미지 URL 수: {original_image_count}", level=logging.INFO)
if not image_urls:
self.logger.log("변환할 이미지가 없습니다.", level=logging.INFO)
return False
# 텍스트 입력 필드 선택
input_field = self.page.locator(self.cke_text_editing_area_locator)
# 상세페이지 옵션에 따라 처리
self.logger.log(f"self.detail_Option : {self.detail_Option}", level=logging.DEBUG)
self.logger.log(f"self.detail_IMGTrans : {self.detail_IMGTrans}", level=logging.DEBUG)
if (self.detail_Option or self.detail_IMGTrans) and (self.browser_controller.image_worker_mgr is not None):
# if self.detail_Option:
# self.logger.log("소개글 입력 시작...", level=logging.INFO)
# await self.input_detail_text(optionHandler, input_field)
if self.detail_IMGTrans:
self.logger.log("이미지 번역 시작...", level=logging.INFO)
self.set_progress_visible_signal.emit(True)
self.logger.log(f"이미지 번역 엔진 : {self.detail_IMGTrans_type}", level=logging.DEBUG)
# if (self.detail_IMGTrans_type in ('CPU', '자체서버')) and (self.imageProcessor is not None):
if self.detail_IMGTrans_type and (self.browser_controller.image_worker_mgr is not None):
# CKEditor 내용 지우기
await self.clear_ckeditor_data()
# 텍스트 입력 필드 선택
await input_field.click()
self.logger.log("입력 필드 선택", level=logging.DEBUG)
# 소개글과 옵션데이터 입력
if self.detail_Option:
await self.input_detail_text(optionHandler, input_field)
# 이미지 처리 통계
success_count = 0
excluded_count = 0
error_count = 0
translated_count = 0
# 임시 파일 정리를 위한 리스트
processed_files = []
# 모든 이미지 처리
self.logger.log("이미지 처리 및 업로드 시작...", level=logging.INFO)
# OCR 데이터 수집을 위한 리스트 (상품 요약 생성용)
all_ocr_data = []
# 번역 파라미터 준비
font_type = self.browser_controller.toggle_states.get('font_type', '')
unwanted_texts = list(self.browser_controller.unwanted_words.keys()) if hasattr(self.browser_controller, 'unwanted_words') else []
is_member_valid = self.browser_controller.user_membership_level == "premium"
authenticated_by_admin = self.browser_controller.authenticated_by_admin
for idx, image_url in enumerate(image_urls):
try:
# detail 구분자를 포함한 임시파일명으로 처리
try:
final_image_result = await self.browser_controller.image_processor.process_image_url(
original_image_url=image_url, index=idx, file_prefix="detail",
font_type=font_type,
unwanted_texts=unwanted_texts,
is_member_valid=is_member_valid,
authenticated_by_admin=authenticated_by_admin
)
except Exception:
raise
# 서버가 반환한 child_results(분할 조각) 처리
child_results = (final_image_result.get('child_results')
if isinstance(final_image_result, dict) else None) or []
if child_results:
for split_idx, split_result in enumerate(child_results, 1):
split_status = split_result.get('status')
split_path = split_result.get('path')
if split_status == 'translated' and split_path:
translated_count += 1
split_upload_success = await self.upload_image(split_path)
if split_upload_success:
success_count += 1
self.logger.log(f"분할 이미지 {idx+1}-{split_idx+1} 업로드 성공", level=logging.INFO)
else:
error_count += 1
if split_path:
processed_files.append(split_path)
else:
# 구버전 서버 호환: 내부 분할 목록이 있을 경우 기존 보조 경로 사용
split_results_fallback = await self.process_split_images_if_any(image_url, idx)
if split_results_fallback:
for split_idx, split_result in enumerate(split_results_fallback, 1):
split_status = split_result.get('status')
split_path = split_result.get('path')
if split_status == 'translated' and split_path:
translated_count += 1
split_upload_success = await self.upload_image(split_path)
if split_upload_success:
success_count += 1
self.logger.log(f"분할 이미지 {idx+1}-{split_idx+1} 업로드 성공", level=logging.INFO)
else:
error_count += 1
if split_path:
processed_files.append(split_path)
status = final_image_result.get('status')
translated_image_path = final_image_result.get('path')
# 🔧 개선: 모든 경우에 생성된 파일을 정리 목록에 추가 (한 번만)
if translated_image_path and os.path.exists(translated_image_path):
processed_files.append(translated_image_path)
self.logger.log(f"이미지 {idx+1} 처리 결과 파일 정리 예약: {translated_image_path}", level=logging.DEBUG)
if status == 'translated':
translated_count += 1
upload_success = await self.upload_image(translated_image_path)
elif status in ('failed', 'original'):
# 실패/원본인 경우 로컬 파일이 실제 존재할 때만 업로드 시도
if translated_image_path and os.path.isfile(translated_image_path):
upload_success = await self.upload_image(translated_image_path)
else:
upload_success = False
self.logger.log(f"이미지 {idx+1} 실패/원본 처리: 로컬 파일이 존재하지 않아 업로드를 건너뜀 → {translated_image_path}", level=logging.WARNING)
elif status == 'exclude':
excluded_count += 1
self.logger.log(f"이미지 {idx+1} 제외됨: {image_url}", level=logging.INFO)
# 제외된 경우에는 업로드하지 않음
continue
elif isinstance(final_image_result, str) and final_image_result:
# (이전 방식과의 호환성)
upload_success = await self.upload_image(final_image_result)
else:
# 예상하지 못한 반환값
error_count += 1
self.logger.log(f"이미지 {idx+1} 처리 결과 예상하지 못한 값: {final_image_result}", level=logging.WARNING)
continue
# 인페인팅 방식/장치 집계
try:
if isinstance(final_image_result, dict):
used = final_image_result.get('inpaint_method')
dev = (final_image_result.get('inpaint_device') or '').lower()
if hasattr(self.browser_controller, 'add_image_edit_stats'):
if used == 'cv':
self.browser_controller.add_image_edit_stats(inpaint_cv=1)
elif used == 'migan':
self.browser_controller.add_image_edit_stats(inpaint_migan=1)
elif used == 'request':
self.browser_controller.add_image_edit_stats(inpaint_request=1)
if dev == 'cpu':
self.browser_controller.add_image_edit_stats(inpaint_device_cpu=1)
elif dev == 'cuda':
self.browser_controller.add_image_edit_stats(inpaint_device_cuda=1)
elif dev == 'directml':
self.browser_controller.add_image_edit_stats(inpaint_device_directml=1)
elif dev == 'server':
self.browser_controller.add_image_edit_stats(inpaint_device_server=1)
except Exception:
pass
# 성공/실패 카운트 및 로그 처리
if upload_success:
success_count += 1
self.logger.log(f"이미지 {idx+1} 업로드 성공: {translated_image_path}", level=logging.INFO)
else:
error_count += 1
self.logger.log(f"이미지 {idx+1} 업로드 실패: {translated_image_path}", level=logging.WARNING)
except Exception as e:
error_count += 1
self.logger.log(f"이미지 {idx+1} 처리 중 오류: {e}", level=logging.ERROR)
# 프로그레스 업데이트
self.update_detail_progress_signal.emit(idx+1, len(image_urls))
self.logger.log(f"이미지 처리 진행률: {idx+1}/{len(image_urls)}", level=logging.INFO)
# 모든 이미지 처리 완료 후 OCR 데이터 수집
store_ocr_to_db = self.toggle_states.get('store_ocr_data_to_db', False)
if store_ocr_to_db:
all_ocr_data = await self.collect_ocr_data_from_db()
else:
# 메모리에서 OCR 데이터 수집 (기본 방식)
if self.imageProcessor and hasattr(self.imageProcessor, 'get_session_ocr_data'):
all_ocr_data = self.imageProcessor.get_session_ocr_data()
self.logger.log(f"세션 OCR 데이터 수집 완료: {len(all_ocr_data) if all_ocr_data else 0}", level=logging.DEBUG)
else:
self.logger.log("⚠️ ImageProcessor가 초기화되지 않아 OCR 데이터 수집 건너뜀", level=logging.WARNING)
all_ocr_data = []
# 처리 결과 로깅
self.logger.log(f"이미지 처리 완료 - 성공: {success_count}, 제외: {excluded_count}, 오류: {error_count}, 총: {len(image_urls)}", level=logging.INFO)
# 모든 이미지 처리 완료 후 임시 파일 배치 정리
if processed_files:
self.logger.log(f"임시 파일 배치 정리 시작: {len(processed_files)}개 파일", level=logging.INFO)
deleted_count = 0
for file_path in processed_files:
try:
if file_path and os.path.exists(file_path):
os.remove(file_path)
deleted_count += 1
self.logger.log(f"임시 파일 삭제: {file_path}", level=logging.DEBUG)
except Exception as e:
self.logger.log(f"파일 삭제 실패 (무시): {file_path} - {e}", level=logging.DEBUG)
self.logger.log(f"임시 파일 정리 완료: {deleted_count}/{len(processed_files)}개 삭제", level=logging.INFO)
# 번역 완료 후 프로그레스바 숨김
self.set_progress_visible_signal.emit(False)
# OCR 데이터 수집 완료 로깅
if all_ocr_data:
self.logger.log(f"OCR 데이터 수집 완료: {len(all_ocr_data)}개 이미지에서 데이터 추출", level=logging.INFO)
else:
self.logger.log("OCR 데이터가 수집되지 않았습니다.", level=logging.WARNING)
# 세션 OCR 데이터 정리 (메모리 모드인 경우)
if self.imageProcessor and hasattr(self.imageProcessor, 'clear_session_ocr_data'):
self.imageProcessor.clear_session_ocr_data()
self.logger.log("세션 OCR 데이터 정리 완료", level=logging.DEBUG)
# 상세이미지 통계 요약 및 집계 반영
try:
self.logger.log(
f"📊 상세페이지 이미지 통계: 총={original_image_count}, 번역됨={translated_count}, 제외={excluded_count}, 오류={error_count}",
level=logging.INFO,
)
if hasattr(self.browser_controller, 'add_image_edit_stats'):
self.browser_controller.add_image_edit_stats(
detail_total=original_image_count,
detail_translated=translated_count,
)
except Exception:
pass
# 추가 줄바꿈 입력
await self.page.keyboard.press("ArrowRight")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
else:
# 상세페이지 옵션이 비활성화된 경우
self.logger.log("상세페이지 옵션이 비활성화되어 있습니다.", level=logging.INFO)
return False
except Exception as e:
self.logger.log(f"process_detail 중 오류 발생: {str(e)}", level=logging.ERROR)
self.set_progress_visible_signal.emit(False)
return False
# async def update_detail_for_ed_mode(self, optionHandler):
# """
# ED 모드에서 옵션명이 변경되었을 때 상세페이지의 옵션 목록만 업데이트하는 메서드
# """
# try:
# self.logger.log("ED 모드: 상세페이지 옵션 목록 업데이트 시작", level=logging.INFO)
# # 텍스트 입력 필드 선택
# input_field = self.page.locator(self.cke_text_editing_area_locator)
# # 기존 내용 가져오기
# current_content = await self.get_ckeditor_data()
# if not current_content:
# self.logger.log("기존 상세페이지 내용을 가져올 수 없습니다.", level=logging.WARNING)
# return False
# # 옵션 목록 부분만 새로 생성
# option_data = optionHandler.get_all_translated_options()
# if not option_data:
# self.logger.log("업데이트할 옵션 데이터가 없습니다.", level=logging.WARNING)
# return False
# # 새로운 옵션 목록 섹션 생성
# new_options_section = self._generate_options_section_for_ed_mode(option_data)
# # 기존 옵션 목록 섹션을 찾아서 교체 시도
# updated_content = self._replace_or_append_options_section(current_content, new_options_section, option_data)
# if updated_content is None:
# # 패턴이 없으면 기존 내용을 건드리지 않음
# self.logger.log("ED 모드: 기존 옵션 목록 패턴이 없어 상세페이지를 업데이트하지 않습니다.", level=logging.INFO)
# return False
# # 패턴이 있으면 해당 부분만 교체된 내용으로 CKEditor 업데이트
# await input_field.click()
# await input_field.fill("") # 전체 내용을 새로운 내용으로 교체
# await self.input_detail_text_chunked(input_field, updated_content)
# self.logger.log("ED 모드: 상세페이지 옵션 목록 업데이트 완료", level=logging.INFO)
# return True
# except Exception as e:
# self.logger.log(f"ED 모드 상세페이지 업데이트 중 오류: {e}", level=logging.ERROR, exc_info=True)
# return False
def _generate_options_section_for_ed_mode(self, option_data, clean_legacy=False):
"""ED 모드용 옵션 섹션 생성"""
lines = [
"",
"---",
"### 📋 **옵션 목록** ",
"",
""
]
if clean_legacy:
# 과거 형식 정리 시에는 번호 없이 깔끔하게
lines.append("")
for i, option in enumerate(option_data.keys()):
# 번호 제거한 깔끔한 형태
clean_option = self._remove_numbering_from_option(option)
emoji = self._get_option_emoji(i, style="geometric") # 정리된 옵션은 일관된 스타일
lines.append(f"{emoji} **{clean_option}**")
else:
# 일반적인 경우: 이미 넘버링이 적용된 옵션명을 다양한 이모지와 함께 사용
for i, option in enumerate(option_data.keys()):
emoji = self._get_option_emoji(i, style="geometric") # ED 모드는 기하학적 스타일
lines.append(f"{emoji} **{option}**")
lines.extend([
"",
"---",
"",
""
])
return "\n".join(lines)
def _remove_numbering_from_option(self, option_text):
"""옵션명에서 넘버링을 제거하고 깔끔한 텍스트만 반환"""
# 앞쪽 넘버링 패턴들 제거
patterns = [
r'^[A-Z]\.\s*', # A. B. C.
r'^[a-z]\.\s*', # a. b. c.
r'^[ㄱ-ㅎ]\.\s*', # ㄱ. ㄴ. ㄷ.
r'^[가-하]\.\s*', # 가. 나. 다.
r'^\d+\.\s*', # 1. 2. 3.
r'^[IVX]+\.\s*', # I. II. III.
r'^[①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳]\s*', # 동그라미 숫자
r'^\$\d+\.\s*', # $1. $2.
r'^\(\d+\)\s*', # (1) (2)
r'^\[\d+\]\s*', # [1] [2]
r'^\d+\)\s*', # 1) 2)
]
clean_text = option_text
for pattern in patterns:
clean_text = re.sub(pattern, '', clean_text)
return clean_text.strip()
def _get_option_emoji(self, index=0, style="geometric"):
"""옵션 목록용 이모지를 반환합니다"""
emoji_sets = {
"check": ["", "☑️", "✔️", "🗸"], # 체크 계열
"geometric": ["🔸", "🔹", "", "", "▪️", "▫️"], # 기하학적 모양
"star": ["", "🌟", "", "💫", "🌠"], # 별 계열
"arrow": ["▶️", "➡️", "👉", "📌", "🔻"], # 화살표/포인터 계열
"colorful": ["🔴", "🟠", "🟡", "🟢", "🔵", "🟣"], # 색상 계열
}
if style == "mixed":
# 일관성을 위해 geometric 스타일로 통일 (상품 내 모든 옵션에 동일한 이모지 사용)
chosen_set = emoji_sets["geometric"]
return chosen_set[0] # 항상 첫 번째 이모지 사용
elif style == "random":
# 완전 랜덤 선택 (모든 이모지 중에서 랜덤하게 하나 선택)
all_emojis = []
for emoji_list in emoji_sets.values():
all_emojis.extend(emoji_list)
return random.choice(all_emojis)
elif style == "random_style":
# 랜덤 스타일 선택 후 첫 번째 이모지만 사용
random.seed(42) # 고정 시드로 스타일 선택
chosen_style = random.choice(list(emoji_sets.keys()))
chosen_set = emoji_sets[chosen_style]
return chosen_set[0] # 해당 스타일의 첫 번째 이모지만 사용
elif style in emoji_sets:
# 특정 스타일의 첫 번째 이모지만 사용
chosen_set = emoji_sets[style]
return chosen_set[0] # 항상 첫 번째 이모지 사용
else:
# 기본값 (geometric의 첫 번째 이모지)
return emoji_sets["geometric"][0]
def _replace_or_append_options_section(self, content, new_options_section, current_option_data=None):
"""기존 옵션 섹션을 찾아서 교체. 패턴이 없으면 None 반환 (기존 내용 보존)"""
# 1. 현재 형식의 옵션 목록 패턴 찾기
current_options_pattern = r'---\s*\n### 📋 \*\*옵션 목록\*\*[^\n]*\n.*?\n---'
if re.search(current_options_pattern, content, re.DOTALL):
# 현재 형식 옵션 섹션이 있으면 교체
updated_content = re.sub(current_options_pattern, new_options_section.strip(), content, flags=re.DOTALL)
self.logger.log("현재 형식의 옵션 섹션을 새로운 내용으로 교체했습니다.", level=logging.DEBUG)
return updated_content
# # 2. 과거 형식의 다양한 옵션 목록 패턴들 찾기
# legacy_patterns = [
# r'옵션\s*목록[^\n]*\n(.*?)(?=\n\n|\n---|\n###|$)', # 일반적인 "옵션 목록" 헤딩
# r'###\s*옵션\s*목록[^\n]*\n(.*?)(?=\n\n|\n---|\n###|$)', # ### 옵션 목록
# r'## 옵션\s*목록[^\n]*\n(.*?)(?=\n\n|\n---|\n###|$)', # ## 옵션 목록
# r'# 옵션\s*목록[^\n]*\n(.*?)(?=\n\n|\n---|\n###|$)', # # 옵션 목록
# r'\*\*옵션\s*목록\*\*[^\n]*\n(.*?)(?=\n\n|\n---|\n###|$)', # **옵션 목록**
# ]
# for pattern in legacy_patterns:
# match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
# if match:
# self.logger.log(f"과거 형식의 옵션 목록 패턴을 발견했습니다: {pattern[:30]}...", level=logging.INFO)
# # 과거 옵션 목록에서 번호 제거 및 정리
# legacy_options_content = match.group(1) if len(match.groups()) > 0 else match.group(0)
# cleaned_options = self._clean_legacy_options(legacy_options_content)
# if cleaned_options:
# # 과거 형식 발견: 현재 옵션 데이터가 있으면 우선 사용, 없으면 과거 옵션 정리
# if current_option_data and len(current_option_data) > 0:
# # 현재 옵션 데이터가 있으면 기존 new_options_section 사용
# updated_content = re.sub(pattern, new_options_section.strip(), content, flags=re.DOTALL | re.IGNORECASE)
# self.logger.log(f"과거 형식 옵션 섹션을 현재 옵션 데이터로 업데이트했습니다.", level=logging.INFO)
# else:
# # 현재 옵션 데이터가 없으면 과거 옵션을 정리해서 사용
# clean_new_section = self._generate_options_section_for_ed_mode({}, clean_legacy=True)
# # 과거 옵션들을 정리해서 새로운 섹션에 추가
# lines = clean_new_section.split('\n')
# insert_index = -4 # "---" 앞에 삽입
# # 다양한 이모지로 옵션 정리 (과거 옵션은 특별한 스타일로)
# for i, cleaned_option in enumerate(cleaned_options):
# emoji = self._get_option_emoji(i, style="geometric") # 과거 옵션 정리는 일관된 스타일
# lines.insert(insert_index, f"{emoji} **{cleaned_option}**")
# insert_index += 1
# final_new_section = '\n'.join(lines)
# updated_content = re.sub(pattern, final_new_section.strip(), content, flags=re.DOTALL | re.IGNORECASE)
# self.logger.log(f"과거 형식의 옵션 섹션을 번호 제거하여 정리했습니다: {len(cleaned_options)}개 옵션", level=logging.INFO)
# return updated_content
# 옵션 섹션이 없으면 기존 내용을 건드리지 않음
self.logger.log("기존 옵션 섹션 패턴을 찾을 수 없어 내용을 변경하지 않습니다.", level=logging.INFO)
return None
def _clean_legacy_options(self, legacy_content):
"""과거 형식의 옵션 목록에서 번호와 특수 기호를 제거하고 정리"""
if not legacy_content or not legacy_content.strip():
return []
# 줄별로 분리
lines = legacy_content.split('\n')
cleaned_options = []
for line in lines:
line = line.strip()
if not line:
continue
# 다양한 번호 패턴 제거
patterns_to_remove = [
r'^\d+\.\s*', # 1. 2. 3. 형태
r'^\d+\)\s*', # 1) 2) 3) 형태
r'^-\d+[\.,]?\s*', # -1, -2. -3 형태
r'^[-*•]\s*\d+[\.,]?\s*', # - 1. * 2. • 3. 형태
r'^[○●◦◉]\s*-?\d+[\.,]?\s*', # ○-1. ●2. ◦3 형태 (동그라미 등)
r'^[①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳]\s*', # 동그라미 숫자
r'^[가나다라마바사아자차카타파하]\.\s*', # 가. 나. 다. 형태
r'^[ㄱ-ㅎ]\.\s*', # ㄱ. ㄴ. ㄷ. 형태
r'^[A-Z]\.\s*', # A. B. C. 형태
r'^[a-z]\.\s*', # a. b. c. 형태
r'^[IVX]+\.\s*', # I. II. III. 로마숫자 형태
r'^[-*•]\s*', # 단순 리스트 마커
r'^\s*[\-\*\+]\s*', # 마크다운 리스트 마커
r'^\s*>\s*', # 인용 마커
]
cleaned_line = line
for pattern in patterns_to_remove:
cleaned_line = re.sub(pattern, '', cleaned_line, flags=re.IGNORECASE)
# 남은 내용이 있으면 추가
cleaned_line = cleaned_line.strip()
if cleaned_line:
# HTML 태그나 마크다운 볼드 제거
cleaned_line = re.sub(r'<[^>]+>', '', cleaned_line) # HTML 태그 제거
cleaned_line = re.sub(r'\*\*(.*?)\*\*', r'\1', cleaned_line) # **텍스트** → 텍스트
cleaned_line = re.sub(r'\*(.*?)\*', r'\1', cleaned_line) # *텍스트* → 텍스트
cleaned_line = cleaned_line.strip()
if cleaned_line and len(cleaned_line) > 1: # 최소 길이 확인
cleaned_options.append(cleaned_line)
# 중복 제거
unique_options = []
seen = set()
for option in cleaned_options:
if option.lower() not in seen:
unique_options.append(option)
seen.add(option.lower())
self.logger.log(f"과거 옵션 목록 정리 완료: {len(legacy_content.split())}개 라인 → {len(unique_options)}개 옵션", level=logging.DEBUG)
return unique_options
def _get_random_biz_intro(self):
"""사업자 정보에서 랜덤 소개 문구를 가져옵니다 (VIP 등록모드용)"""
try:
from src.limited_contents.bizDBManager import BizDBManager
import random
# bizinfo.db에서 사업자 정보 가져오기
biz_db_path = "user_data/bizinfo.db" # 실제 경로에 맞게 수정 필요
biz_manager = BizDBManager(biz_db_path)
# 선택마켓 중 스마트스토어 마켓의 사업자 정보 가져오기
smartstore_biz = self._get_smartstore_selected_biz(biz_manager)
if not smartstore_biz:
self.logger.log("설정된 선택마켓이 없어 기본 문구를 사용합니다.", level=logging.WARNING)
return self._get_default_biz_intro()
# 선택마켓의 사업자 이름으로 소개 문구 생성
biz_name = smartstore_biz.get('name', '').strip()
if biz_name:
default_intros = [
f"🏪 **{biz_name}**에서 직접 선별한 프리미엄 상품입니다.",
f"✨ **{biz_name}** 전문 큐레이션으로 엄선된 고품질 제품을 만나보세요.",
f"🎯 **{biz_name}**가 추천하는 베스트 아이템으로 특별한 경험을 선사합니다.",
f"💎 **{biz_name}**의 전문성과 신뢰를 바탕으로 한 최고 품질의 상품입니다.",
f"🛒 **{biz_name}**의 엄격한 품질 관리를 통과한 검증된 상품입니다.",
f"🌟 **{biz_name}**에서 고객님을 위해 특별히 준비한 프리미엄 아이템입니다."
]
selected_intro = random.choice(default_intros)
self.logger.log(f"사업자({biz_name}) 기반 소개 문구 생성: {selected_intro}", level=logging.DEBUG)
return selected_intro
else:
return self._get_default_biz_intro()
except Exception as e:
self.logger.log(f"사업자 정보 조회 중 오류: {e}", level=logging.ERROR, exc_info=True)
return self._get_default_biz_intro()
def _get_smartstore_selected_biz(self, biz_manager):
"""선택마켓 중 스마트스토어로 지정된 마켓의 사업자 정보를 가져옵니다"""
try:
# 선택마켓 매핑 정보 가져오기
market_selection = biz_manager.get_market_selection()
# 스마트스토어 관련 마켓 타입들 확인
smartstore_types = ['smartstore', '스마트스토어', 'naver_smartstore', 'naver']
selected_biz_id = None
for market_type, biz_id in market_selection.items():
# 스마트스토어 관련 마켓 타입 찾기
if any(stype in market_type.lower() for stype in ['smartstore', '스마트스토어', 'naver']):
selected_biz_id = biz_id
self.logger.log(f"스마트스토어 선택마켓 발견: {market_type} -> 사업자 ID {biz_id}", level=logging.DEBUG)
break
if not selected_biz_id:
self.logger.log("선택마켓에 스마트스토어가 설정되지 않았습니다.", level=logging.INFO)
return None
# 해당 사업자 정보 조회
conn = biz_manager.conn
cursor = conn.cursor()
cursor.execute("SELECT * FROM biz WHERE id = ?", (selected_biz_id,))
result = cursor.fetchone()
if result:
biz_info = dict(result)
self.logger.log(f"스마트스토어 선택마켓의 사업자 정보: {biz_info.get('name', 'Unknown')}", level=logging.DEBUG)
return biz_info
else:
self.logger.log(f"사업자 ID {selected_biz_id}에 해당하는 사업자 정보를 찾을 수 없습니다.", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"스마트스토어 선택마켓 조회 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
def _get_default_biz_intro(self):
"""기본 소개 문구들 중 랜덤 선택"""
import random
default_intros = [
"🌟 **전문 구매대행 서비스**로 엄선된 고품질 제품을 안전하게 배송해드립니다.",
"💯 **신뢰할 수 있는 품질 보증**과 함께 최상의 쇼핑 경험을 제공합니다.",
"🚀 **빠르고 안전한 배송**으로 고객님의 소중한 시간을 절약해드립니다.",
"🎁 **특별 할인 혜택**과 함께 프리미엄 상품을 합리적인 가격에 만나보세요.",
"⭐ **고객 만족도 1위** 구매대행 서비스의 검증된 품질을 경험해보세요."
]
selected = random.choice(default_intros)
self.logger.log(f"기본 소개 문구 선택: {selected}", level=logging.DEBUG)
return selected
async def save_shortcut(self):
"""저장 단축키(Ctrl+S)를 실행하는 메서드"""
try:
# 저장 확인 다이얼로그가 있는지 먼저 확인
await self.handle_save_dialog_if_exists()
self.logger.log("이미지 에디터 저장버튼 클릭", level=logging.DEBUG)
await self.page.click(self.save_btn_locator)
self.logger.log("수정사항 저장 버튼 클릭 완료", level=logging.DEBUG)
editor_locator = self.page.locator(self.editor_modal_locator)
if await editor_locator.is_visible():
# self.logger.log("이미지 에디터 모달 오픈 감지", level=logging.DEBUG)
# await self.page.click(self.save_btn_locator)
# self.logger.log("수정사항 저장 버튼 클릭 완료", level=logging.DEBUG)
try:
# 닫힘 대기 (예: 최대 5초)
await self.page.wait_for_selector(self.editor_modal_locator, state="detached", timeout=5000)
self.logger.log("에디터 정상 종료", level=logging.INFO)
except Exception:
self.logger.log("에디터 닫힘 timeout! ESC로 강제 종료 시도", level=logging.WARNING)
await self.page.keyboard.press("1")
await asyncio.sleep(1)
await self.page.keyboard.press("Escape")
await self.page.wait_for_selector(self.editor_modal_locator, state="detached", timeout=5000)
except Exception as e:
self.logger.log(f"저장 단축키 실행 실패: {e}", level=logging.ERROR, exc_info=True)
async def handle_save_dialog_if_exists(self) -> bool:
"""
저장 확인 다이얼로그가 있는지 확인하고 있으면 처리합니다.
다이얼로그가 없으면 아무 작업도 하지 않습니다.
Returns:
bool: 다이얼로그를 처리했으면 True, 없었으면 False
"""
try:
dialog_locator = self.page.locator(self.dialog_save_btn_locator)
try:
is_visible = await dialog_locator.is_visible(timeout=1000)
except Exception:
is_visible = False
if is_visible:
self.logger.log("저장 확인 다이얼로그 감지됨 - 저장 버튼 클릭 시도", level=logging.INFO)
await dialog_locator.click()
await asyncio.sleep(0.2)
return True
else:
self.logger.log("저장 확인 다이얼로그 없음", level=logging.DEBUG)
return False
except Exception as e:
self.logger.log(f"저장 다이얼로그 확인 중 오류: {e}", level=logging.WARNING)
return False
async def click_and_wait_complete(self, btn_locator: str, timeout: float = 20.0, check_interval: float = 0.5) -> bool:
"""
로딩 버튼 클릭 후 ant-btn-loading 상태 해제까지 대기.
성공시 True, 실패(타임아웃 등)시 False 반환.
"""
try:
await self.page.click(btn_locator)
self.logger.log(f"버튼 클릭: {btn_locator}", level=logging.INFO)
# 로딩 상태 진입 대기 (최대 2초)
loading_detected = False
for attempt in range(10):
try:
is_loading = await asyncio.wait_for(
self.page.locator(btn_locator).evaluate(
"el => el.classList.contains('ant-btn-loading')"
),
timeout=3.0
)
if is_loading:
self.logger.log(f"로딩 상태 진입 감지: {btn_locator}", level=logging.DEBUG)
loading_detected = True
break
except asyncio.TimeoutError:
self.logger.log(f"로딩 상태 확인 타임아웃, 재시도 중... ({attempt+1}/10)", level=logging.WARNING)
continue
except (KeyError, Exception) as e:
# Playwright 내부 에러 및 네트워크 에러 처리
if "data" in str(e) or isinstance(e, KeyError) or "ENOTFOUND" in str(e) or "getaddrinfo" in str(e):
self.logger.log(f"Playwright/네트워크 에러 발생, 재시도 중... ({attempt+1}/10): {e}", level=logging.WARNING)
await asyncio.sleep(0.5) # 약간 더 긴 대기
continue
else:
self.logger.log(f"예상치 못한 에러 발생: {e}", level=logging.ERROR)
continue
await asyncio.sleep(0.2)
# 만약 2초 내에 로딩 진입을 못하면, 즉시 완료된 것으로 간주(성공 반환)
if not loading_detected:
self.logger.log("로딩 상태 미감지(즉시 완료로 판단)", level=logging.INFO)
return True
# 로딩 해제(완료)까지 대기
start = asyncio.get_event_loop().time()
while True:
try:
is_loading = await asyncio.wait_for(
self.page.locator(btn_locator).evaluate(
"el => el.classList.contains('ant-btn-loading')"
),
timeout=3.0
)
if not is_loading:
self.logger.log(f"로딩 해제 감지(작업 완료): {btn_locator}", level=logging.INFO)
return True
except asyncio.TimeoutError:
self.logger.log(f"로딩 해제 확인 타임아웃, 재시도 중...", level=logging.WARNING)
# 타임아웃이 발생해도 계속 진행
except (KeyError, Exception) as e:
# Playwright 내부 에러 및 네트워크 에러 처리
if "data" in str(e) or isinstance(e, KeyError) or "ENOTFOUND" in str(e) or "getaddrinfo" in str(e):
self.logger.log(f"로딩 해제 확인 중 Playwright/네트워크 에러: {e}", level=logging.WARNING)
await asyncio.sleep(0.5)
# 내부 에러 발생 시에도 계속 진행
else:
self.logger.log(f"로딩 해제 확인 중 예상치 못한 에러: {e}", level=logging.ERROR)
if asyncio.get_event_loop().time() - start > timeout:
self.logger.log(f"완료 대기 시간 초과: {btn_locator}", level=logging.WARNING)
return False
await asyncio.sleep(check_interval)
except Exception as e:
self.logger.log(f"버튼 클릭 또는 대기 중 예외 발생: {e}", level=logging.ERROR, exc_info=True)
return False
async def process_split_images_if_any(self, image_url: str, index: int) -> list:
"""
매우 큰 원본이미지가 있을 경우에 대한 처리
분할된 이미지들이 있는 경우 각각을 처리
Args:
image_url: 원본 이미지 URL
index: 이미지 인덱스
Returns:
list: 분할된 이미지 처리 결과들
"""
try:
# 서버가 분할 및 처리 결과를 child_results로 반환하므로,
# 메인앱에서는 별도 분할 처리를 수행하지 않습니다.
self.logger.log("분할 이미지 처리는 서버에서 수행됩니다(child_results 사용).", level=logging.DEBUG)
return []
except Exception as e:
self.logger.log(f"분할 이미지 처리 확인 중 오류: {e}", level=logging.ERROR, exc_info=True)
return []
async def generate_and_add_product_summary(self, ocr_data_list: list, input_field):
"""
수집된 OCR 데이터를 바탕으로 GPT를 이용해 상품 요약을 생성하고 상세페이지에 추가
Args:
ocr_data_list: 모든 이미지에서 수집된 OCR 데이터 리스트
input_field: 텍스트 입력 필드
"""
try:
if not ocr_data_list or not self.gpt_client:
return
# OCR 데이터를 GPT 프롬프트용으로 정리
product_text_summary = self._prepare_ocr_data_for_gpt(ocr_data_list)
if not product_text_summary:
self.logger.log("GPT 요약용 OCR 데이터가 없습니다.", level=logging.DEBUG)
return
# GPT를 이용한 상품 요약 생성
summary = await self._generate_product_summary_with_gpt(product_text_summary)
if summary:
# 상세페이지에 요약 추가
await self._add_summary_to_detail_page(summary, input_field)
self.logger.log("상품 요약이 상세페이지에 추가되었습니다.", level=logging.INFO)
else:
self.logger.log("GPT 상품 요약 생성에 실패했습니다.", level=logging.WARNING)
except Exception as e:
self.logger.log(f"상품 요약 생성 및 추가 중 오류: {e}", level=logging.ERROR, exc_info=True)
def _prepare_ocr_data_for_gpt(self, ocr_data_list: list) -> str:
"""
OCR 데이터를 GPT 프롬프트용 텍스트로 정리
Args:
ocr_data_list: OCR 데이터 리스트
Returns:
str: GPT 프롬프트용 정리된 텍스트
"""
try:
# 모든 OCR 데이터를 카테고리별로 병합
all_specs = []
all_colors = []
all_materials = []
all_warnings = []
all_features = []
all_usage = []
all_others = []
for data in ocr_data_list:
classified_info = data.get('classified_info', {})
all_specs.extend(classified_info.get('specs', []))
all_colors.extend(classified_info.get('colors', []))
all_materials.extend(classified_info.get('materials', []))
all_warnings.extend(classified_info.get('warnings', []))
all_features.extend(classified_info.get('features', []))
all_usage.extend(classified_info.get('usage', []))
all_others.extend(classified_info.get('others', []))
# 중복 제거
all_specs = list(set(all_specs))
all_colors = list(set(all_colors))
all_materials = list(set(all_materials))
all_warnings = list(set(all_warnings))
all_features = list(set(all_features))
all_usage = list(set(all_usage))
all_others = list(set(all_others))
# GPT 프롬프트용 텍스트 생성
summary_parts = []
if all_specs:
summary_parts.append(f"제품 사양: {'; '.join(all_specs[:10])}") # 최대 10개
if all_features:
summary_parts.append(f"주요 특징: {'; '.join(all_features[:10])}")
if all_materials:
summary_parts.append(f"소재/재질: {'; '.join(all_materials[:5])}")
if all_colors:
summary_parts.append(f"색상: {'; '.join(all_colors[:5])}")
if all_warnings:
summary_parts.append(f"주의사항: {'; '.join(all_warnings[:5])}")
if all_usage:
summary_parts.append(f"사용법: {'; '.join(all_usage[:5])}")
return '\n'.join(summary_parts) if summary_parts else ''
except Exception as e:
self.logger.log(f"OCR 데이터 정리 중 오류: {e}", level=logging.ERROR, exc_info=True)
return ''
async def _generate_product_summary_with_gpt(self, product_info: str) -> str:
"""
GPT를 이용해 상품 요약 생성
Args:
product_info: 정리된 상품 정보 텍스트
Returns:
str: 생성된 상품 요약
"""
try:
prompt = f"""
다음은 상품 이미지에서 추출한 정보입니다. 이 정보를 바탕으로 고객이 구매 결정에 도움이 되는 상품 요약을 작성해주세요.
상품 정보:
{product_info}
요구사항:
1. 마크다운 형식으로 작성
2. 주요 특징, 사양, 주의사항 등을 포함
3. 구매대행 서비스임을 고려한 안내문구 포함
4. 300자 이내로 간결하게 작성
5. 고객 친화적이고 신뢰감 있는 톤앤매너
형식:
## 📋 상품 정보 요약
### 🎯 주요 특징
- [특징 1]
- [특징 2]
### 📏 제품 사양
- [사양 정보]
### ⚠️ 주의사항
- [주의사항]
### 💬 구매 안내
전문 구매대행 서비스로 안전하고 신속한 배송을 보장합니다.
"""
# GPT API 호출
response = await self.gpt_client.chat_completion(
messages=[{"role": "user", "content": prompt}],
model="gpt-4",
max_tokens=500,
temperature=0.3
)
if response and 'choices' in response and len(response['choices']) > 0:
return response['choices'][0]['message']['content'].strip()
else:
self.logger.log("GPT 응답이 올바르지 않습니다.", level=logging.WARNING)
return None
except Exception as e:
self.logger.log(f"GPT 상품 요약 생성 중 오류: {e}", level=logging.ERROR, exc_info=True)
return None
async def _add_summary_to_detail_page(self, summary: str, input_field):
"""
생성된 상품 요약을 상세페이지 마지막에 추가
Args:
summary: 생성된 상품 요약
input_field: 텍스트 입력 필드
"""
try:
# 커서를 마지막으로 이동
await self.page.keyboard.press("End")
await self.page.keyboard.press("Enter")
await self.page.keyboard.press("Enter")
# 구분선 추가
separator = "\n---\n\n"
await input_field.type(separator, delay=1)
# 요약 텍스트 청크 단위로 입력
await self.input_detail_text_chunked(input_field, summary)
self.logger.log("상품 요약이 상세페이지에 성공적으로 추가되었습니다.", level=logging.INFO)
except Exception as e:
self.logger.log(f"상품 요약을 상세페이지에 추가하는 중 오류: {e}", level=logging.ERROR, exc_info=True)
async def collect_ocr_data_from_db(self) -> list:
"""
데이터베이스에서 최근 저장된 OCR 데이터를 수집
Returns:
list: OCR 데이터 리스트
"""
try:
# 데이터베이스 파일 경로
if not self.imageProcessor or not hasattr(self.imageProcessor, 'base_dir'):
self.logger.log("⚠️ ImageProcessor가 초기화되지 않아 OCR 데이터베이스 접근 불가", level=logging.WARNING)
return []
db_path = os.path.join(self.imageProcessor.base_dir, "user_data", "product_ocr_data.db")
if not os.path.exists(db_path):
self.logger.log("OCR 데이터베이스 파일이 존재하지 않습니다.", level=logging.DEBUG)
return []
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 최근 10분 내 저장된 데이터를 조회 (현재 처리 세션의 데이터)
recent_time = datetime.now() - timedelta(minutes=10)
cursor.execute('''
SELECT raw_texts, text_count
FROM ocr_raw_data
WHERE created_at >= ?
ORDER BY created_at DESC
''', (recent_time.strftime('%Y-%m-%d %H:%M:%S'),))
rows = cursor.fetchall()
conn.close()
# 데이터 파싱 (단순화됨)
ocr_data_list = []
for row in rows:
try:
raw_texts = json.loads(row[0]) if row[0] else []
text_count = row[1] if len(row) > 1 else len(raw_texts)
ocr_data_list.append({
'raw_texts': raw_texts,
'text_count': text_count
})
except json.JSONDecodeError as e:
self.logger.log(f"OCR 데이터 파싱 오류: {e}", level=logging.WARNING)
continue
self.logger.log(f"데이터베이스에서 {len(ocr_data_list)}개의 OCR 데이터를 수집했습니다.", level=logging.DEBUG)
return ocr_data_list
except Exception as e:
self.logger.log(f"OCR 데이터 수집 중 오류: {e}", level=logging.ERROR, exc_info=True)
return []
# GPT 관련 기능들은 현재 비활성화됨 (gpt_client 사용 불가)
# 향후 필요시 다시 활성화 가능