# AutoPercenty3/test/t.py
#
# (Repository viewer metadata: 500 lines, 22 KiB, Python)

import json
import logging
import sqlite3
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup
# Logger setup: configure the root logger for DEBUG output, then obtain the
# named logger that the rest of this module writes to.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("default_logger")
class TranslateService:
    """Translate text through one of three HTTP translation APIs.

    The backend is chosen by ``service`` ("papago", "google" or "deepl").
    Every ``translate*`` method returns the translated string, or ``None``
    when the request fails, the API answers with a non-200 status, or the
    response carries no translation. Failures are logged, never raised.
    """

    # Seconds to wait for an API answer. Without an explicit timeout
    # ``requests`` may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self, service="papago", papago_key=None, google_key=None, deepl_key=None):
        """Store the backend name and the credentials for each backend.

        Args:
            service: Name of the backend used by :meth:`translate`.
            papago_key: Mapping with ``"client_id"`` and ``"client_secret"``.
            google_key: API key sent as the ``key`` query parameter.
            deepl_key: Key sent in the ``DeepL-Auth-Key`` authorization header.
        """
        self.service = service
        self.papago_key = papago_key
        self.google_key = google_key
        self.deepl_key = deepl_key

    def translate(self, text: str, source_lang="zh", target_lang="ko") -> Optional[str]:
        """Translate ``text`` with the configured backend.

        Returns:
            The translated text, or ``None`` if the backend is unknown or
            the translation failed.
        """
        handlers = {
            "papago": self.translate_with_papago,
            "google": self.translate_with_google,
            "deepl": self.translate_with_deepl,
        }
        handler = handlers.get(self.service)
        if handler is None:
            logger.error(f"지원하지 않는 번역 서비스: {self.service}")
            return None
        return handler(text, source_lang, target_lang)

    def _post_json(self, label, url, **kwargs):
        """POST to ``url`` and return the decoded JSON body.

        ``label`` is the backend name used as the prefix of log messages.
        Returns ``None`` (after logging) on a transport error, a non-200
        status code, or a body that is not valid JSON.
        """
        try:
            response = requests.post(url, timeout=self.REQUEST_TIMEOUT, **kwargs)
        except requests.RequestException as e:
            logger.error(f"{label} 번역 요청 실패: {e}")
            return None
        if response.status_code != 200:
            logger.error(f"{label} 번역 오류: {response.status_code}")
            return None
        try:
            return response.json()
        except ValueError as e:
            logger.error(f"{label} 번역 응답 파싱 실패: {e}")
            return None

    @staticmethod
    def _first_translation(translations, text_key):
        """Return ``text_key`` of the first entry in ``translations``.

        Returns ``None`` when the list is empty or missing (indexing ``[0]``
        directly would raise ``IndexError``), and ``''`` when the first entry
        lacks ``text_key``.
        """
        if not translations:
            return None
        return translations[0].get(text_key, '')

    def translate_with_papago(self, text, source_lang, target_lang):
        """Translate through the Naver Papago NMT endpoint."""
        if not self.papago_key:
            # Subscripting a missing key mapping would raise TypeError.
            logger.error("파파고 API 키가 설정되지 않았습니다.")
            return None
        url = "https://openapi.naver.com/v1/papago/n2mt"
        headers = {
            "X-Naver-Client-Id": self.papago_key["client_id"],
            "X-Naver-Client-Secret": self.papago_key["client_secret"],
        }
        data = {
            "source": source_lang,
            "target": target_lang,
            "text": text,
        }
        payload = self._post_json("파파고", url, headers=headers, data=data)
        if payload is None:
            return None
        return payload.get('message', {}).get('result', {}).get('translatedText', '')

    def translate_with_google(self, text, source_lang, target_lang):
        """Translate through the Google Cloud Translation v2 endpoint."""
        url = "https://translation.googleapis.com/language/translate/v2"
        data = {
            'q': text,
            'source': source_lang,
            'target': target_lang,
            'format': 'text',
        }
        # The key goes through ``params`` so it is URL-encoded by requests
        # instead of being interpolated raw into the URL.
        payload = self._post_json("구글", url, params={"key": self.google_key}, data=data)
        if payload is None:
            return None
        translations = payload.get("data", {}).get("translations", [])
        return self._first_translation(translations, "translatedText")

    def translate_with_deepl(self, text, source_lang, target_lang):
        """Translate through the DeepL free-tier endpoint.

        Language codes are upper-cased before being sent.
        """
        url = "https://api-free.deepl.com/v2/translate"
        headers = {
            "Authorization": f"DeepL-Auth-Key {self.deepl_key}"
        }
        data = {
            "text": text,
            "source_lang": source_lang.upper(),
            "target_lang": target_lang.upper(),
        }
        payload = self._post_json("Deepl", url, headers=headers, data=data)
        if payload is None:
            return None
        return self._first_translation(payload.get("translations", []), "text")
# Forbidden-word database: creation and initialization
class ForbiddenWordsDB:
    """SQLite-backed store of forbidden words and their trademark metadata.

    Opening an instance connects to ``db_name`` and creates the
    ``forbidden_words`` table if it does not exist. The instance can be
    used as a context manager so the connection is always closed::

        with ForbiddenWordsDB() as db:
            db.add_forbidden_word("word")
    """

    def __init__(self, db_name="forbidden_words.db"):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self.create_table()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def create_table(self):
        """Create the ``forbidden_words`` table if it is missing."""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS forbidden_words (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                word TEXT NOT NULL UNIQUE,
                description TEXT,
                applicant_name TEXT,
                agent_name TEXT,
                classification_code TEXT,
                category_description TEXT
            )
        """)
        self.conn.commit()

    def add_forbidden_word(self, word, description=None, applicant_name=None, agent_name=None, classification_code=None, category_description=None):
        """Insert ``word`` unless it is already stored.

        Database errors are logged, not raised.
        """
        if self.is_forbidden(word):
            logger.info(f"'{word}'은 이미 금지어 목록에 존재합니다.")
            return
        try:
            # ``with self.conn`` commits on success and rolls back on error.
            with self.conn:
                self.cursor.execute("""
                    INSERT INTO forbidden_words (word, description, applicant_name, agent_name, classification_code, category_description)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (word, description, applicant_name, agent_name, classification_code, category_description))
        except sqlite3.Error as e:
            logger.error(f"Error inserting forbidden word '{word}': {e}")
            return
        logger.info(f"'{word}'이 금지어 목록에 추가되었습니다.")

    def is_forbidden(self, word):
        """Return ``True`` if ``word`` is stored (exact match on the column)."""
        self.cursor.execute("SELECT id FROM forbidden_words WHERE word = ?", (word,))
        return self.cursor.fetchone() is not None

    def delete_forbidden_word(self, word):
        """Delete ``word``; database errors are logged, not raised."""
        try:
            with self.conn:
                self.cursor.execute("DELETE FROM forbidden_words WHERE word = ?", (word,))
                deleted = self.cursor.rowcount
        except sqlite3.Error as e:
            logger.error(f"Error deleting forbidden word '{word}': {e}")
            return
        # Only report a deletion when a row was actually removed.
        if deleted:
            logger.info(f"'{word}'이 금지어 목록에서 삭제되었습니다.")
        else:
            logger.info(f"'{word}'은 금지어 목록에 없습니다.")

    def update_forbidden_word(self, word, new_word=None, description=None, applicant_name=None, agent_name=None, classification_code=None, category_description=None):
        """Update the row for ``word``.

        Arguments left as ``None`` keep the stored value (``COALESCE``), so a
        column cannot be reset to NULL through this method. Does nothing but
        log when ``word`` is not stored. Database errors are logged.
        """
        if not self.is_forbidden(word):
            logger.info(f"'{word}'은 금지어 목록에 없습니다.")
            return
        try:
            with self.conn:
                self.cursor.execute("""
                    UPDATE forbidden_words
                    SET word = COALESCE(?, word),
                        description = COALESCE(?, description),
                        applicant_name = COALESCE(?, applicant_name),
                        agent_name = COALESCE(?, agent_name),
                        classification_code = COALESCE(?, classification_code),
                        category_description = COALESCE(?, category_description)
                    WHERE word = ?
                """, (new_word, description, applicant_name, agent_name, classification_code, category_description, word))
        except sqlite3.Error as e:
            logger.error(f"Error updating forbidden word '{word}': {e}")
            return
        logger.info(f"'{word}'의 정보가 업데이트되었습니다.")

    def get_all_forbidden_words(self):
        """Return every row as a list of tuples, or ``[]`` on a database error."""
        try:
            self.cursor.execute("SELECT * FROM forbidden_words")
            return self.cursor.fetchall()
        except sqlite3.Error as e:
            logger.error(f"Error fetching all forbidden words: {e}")
            return []

    def close(self):
        """Close the underlying connection."""
        self.conn.close()
# Kipris API integration
class Kipris_API:
    """Client for the KIPRIS trademark word-search endpoint.

    ``run`` / ``check_trademark`` query the endpoint for a keyword, parse the
    XML answer and expose the matching items through ``self.results``: a dict
    with one ``"result_<n>"`` entry per matching item plus a ``"total_count"``
    integer (the number of items seen, matching or not).
    """

    # Seconds to wait for the API before giving up.
    REQUEST_TIMEOUT = 10

    # (result key, XML tag) pairs copied verbatim from each <item>.
    _TEXT_FIELDS = (
        ("index_no", "indexNo"),
        ("application_number", "applicationNumber"),
        ("application_date", "applicationDate"),
        ("publication_number", "publicationNumber"),
        ("publication_date", "publicationDate"),
        ("registration_date", "registrationDate"),
        ("registration_number", "registrationNumber"),
        ("applicant_name", "applicantName"),
        ("agent_name", "agentName"),
        ("title", "title"),
        ("drawing_url", "drawing"),
        ("big_drawing_url", "bigDrawing"),
        ("full_text", "fullText"),
    )

    def __init__(self, apikey=None):
        """Store the API key and load ``kiprisCategories.json``.

        The JSON file is opened relative to the current working directory;
        a missing or invalid file raises from here.
        """
        self.url = 'http://kipo-api.kipi.or.kr/openapi/service/trademarkInfoSearchService/getWordSearch'
        self.apikey = apikey
        self.results = {}
        filename = 'kiprisCategories.json'
        self.category_description = self.load_category_descriptions(filename)

    def fetch_and_decode(self, params):
        """GET the endpoint and return the body decoded as UTF-8.

        Returns ``None`` (after logging) when the request or decoding fails.
        """
        try:
            response = requests.get(self.url, params=params, timeout=self.REQUEST_TIMEOUT)
            return response.content.decode('utf-8')
        except (requests.RequestException, UnicodeDecodeError) as e:
            logger.error(f"키프리스 요청 중 에러발생 : {e}")
            return None

    @staticmethod
    def _element_text(item, tag):
        """Return the text of child ``tag``, or ``None`` if the child is absent."""
        node = item.find(tag)
        return node.text if node is not None else None

    def _collect_results(self, root, status):
        """Walk every ``body/items/item`` under ``root``.

        Returns:
            ``(results, total, registered, published)`` where ``results`` maps
            ``"result_<position>"`` (1-based position among all items) to the
            extracted fields of each item whose application status is in
            ``status``, and the three integers count all items, items with
            status "등록" and items with status "공개".
        """
        results = {}
        total = registered = published = 0
        for position, item in enumerate(root.findall('.//body/items/item'), start=1):
            total += 1
            application_status = self._element_text(item, 'applicationStatus')
            if application_status == "등록":
                registered += 1
            if application_status == "공개":
                published += 1
            if application_status not in status:
                continue
            classification_code = self._element_text(item, 'classificationCode')
            entry = {key: self._element_text(item, tag) for key, tag in self._TEXT_FIELDS}
            entry["application_status"] = application_status
            entry["classification_code"] = classification_code
            entry["category_description"] = (
                self.add_category_description(classification_code) if classification_code else None
            )
            results[f"result_{position}"] = entry
        return results, total, registered, published

    def parse_xml(self, xml_data, status):
        """Parse ``xml_data`` and merge the matching items into ``self.results``.

        Args:
            xml_data: XML document text returned by the API.
            status: Collection of application-status strings to keep.

        Raises:
            xml.etree.ElementTree.ParseError: if ``xml_data`` is not valid XML.
        """
        root = ET.fromstring(xml_data)
        found, total_items, status_registered, status_published = self._collect_results(root, status)
        self.results.update(found)
        logger.debug(f"검색된 item 총 개수: {total_items}")
        self.results['total_count'] = total_items
        logger.debug(f"등록 상태인 item 개수: {status_registered}")
        logger.debug(f"공개 상태인 item 개수: {status_published}")

    def get_results(self):
        """Return the results dict of the most recent search."""
        return self.results

    def run(self, keyword, status):
        """Search ``keyword`` and return the items whose status is in ``status``.

        Never raises: request and parse failures are logged and an empty
        dict is returned.
        """
        params = {
            'serviceKey': self.apikey,
            'searchString': keyword,
            'searchRecentYear': '0',
            'title': '',
            'fullText': '',
            'drawing': '',
            'bigDrawing': ''
        }
        # Keep the service key out of the log output.
        logged_params = dict(params, serviceKey='***')
        logger.debug(f" Search params : {logged_params}")
        # Start every search from a clean slate; otherwise hits from an
        # earlier keyword would be reported for this one as well.
        self.results = {}
        xml_data = self.fetch_and_decode(params)
        if xml_data is not None:
            try:
                self.parse_xml(xml_data, status)
            except Exception as e:  # boundary: callers rely on run() not raising
                logger.error(f"API 요청 중 에러발생 : {e}")
        return self.get_results()

    def close_Kipris(self):
        """No-op; present so callers have a close hook."""
        pass

    def load_category_descriptions(self, filename):
        """Load the category descriptions from a JSON file."""
        with open(filename, 'r', encoding='utf-8') as file:
            return json.load(file)

    def add_category_description(self, category_code):
        """Format each ``|``-separated code as ``"[code] - description"``.

        Codes missing from the loaded descriptions get a fixed
        "description not found" text. Entries are joined with ``"; "``.
        """
        return "; ".join(
            f"[{code}] - {self.category_description.get(code, '카테고리 설명을 찾을 수 없습니다.')}"
            for code in category_code.split('|')
        )

    def check_trademark(self, keyword):
        """Search ``keyword`` keeping only items with status "등록" or "공개"."""
        status = ["등록", "공개"]
        return self.run(keyword, status)
# Naver parser integration
class NaverParser:
    """Scrape Naver Shopping search pages.

    The search page embeds its data as JSON in a ``<script id="__NEXT_DATA__">``
    tag; this class downloads the page, decodes that JSON and extracts the
    product list and related search tags from it.
    """

    # Seconds to wait for the search page before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.base_url = "https://search.shopping.naver.com/search/all?query="
        # NOTE(review): "br" is advertised in Accept-Encoding; requests can
        # only decode Brotli bodies when a brotli package is installed -
        # confirm the deployment environment provides one.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Cache-Control": "max-age=0"
        }

    @staticmethod
    def _to_int(value, default=0):
        """Convert ``value`` to int, returning ``default`` if it is not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _build_search_url(self, keyword, product_set):
        """Build the search URL with ``keyword`` percent-encoded.

        Encoding matters: a raw ``&`` or ``#`` in the keyword would otherwise
        terminate the ``query`` parameter early.
        """
        return (
            f"{self.base_url}{quote(keyword, safe='')}"
            f"&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet={product_set}"
            "&sort=rel&timestamp=&viewType=list"
        )

    def fetch_search_results(self, keyword: str, product_set: str = "overseas") -> Optional[Dict]:
        """Fetch the search page for ``keyword`` and return its embedded JSON.

        Args:
            keyword: Search text.
            product_set: Value of the ``productSet`` query parameter.

        Returns:
            The decoded ``__NEXT_DATA__`` JSON, or ``None`` when the request
            fails, the tag is missing/empty, or its content is not valid JSON.
        """
        url = self._build_search_url(keyword, product_set)
        logger.debug(f"검색 URL: {url}")
        try:
            response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"네이버 쇼핑 HTML 가져오기 실패: {e}")
            return None
        soup = BeautifulSoup(response.text, 'html.parser')
        next_data = soup.find("script", {"id": "__NEXT_DATA__"})
        if next_data is None or not next_data.string:
            logger.error("검색 결과에서 '__NEXT_DATA__' 태그를 찾을 수 없음.")
            return None
        try:
            return json.loads(next_data.string)
        except json.JSONDecodeError as e:
            logger.error(f"'__NEXT_DATA__' JSON 파싱 실패: {e}")
            return None

    def get_product_list(self, data: Dict) -> List[Dict]:
        """Return ``props.pageProps.initialState.products.list`` from ``data``.

        Returns ``[]`` when any level of that path is missing or not a mapping.
        """
        try:
            products_list = data["props"]["pageProps"]["initialState"]["products"]["list"]
            logger.debug(f"{len(products_list)}개의 제품이 검색됨.")
            return products_list
        except (KeyError, TypeError) as e:
            logger.error(f"제품 리스트를 추출하는 중 오류 발생: {e}")
            return []

    def filter_products_by_price(self, products: List[Dict], min_price: int = 50000) -> List[Dict]:
        """Keep the products whose price is at least ``min_price``.

        A missing or non-numeric price counts as 0.
        """
        filtered_products = [
            product for product in products
            if self._to_int(product.get("item", {}).get("price", 0)) >= min_price
        ]
        logger.debug(f"가격이 {min_price}원 이상인 제품 {len(filtered_products)}개 필터링됨.")
        return filtered_products

    def extract_product_info(self, product: Dict) -> Dict:
        """Pick the fields of interest out of a product's ``"item"`` mapping.

        Fields absent from the item come back as ``None``.
        """
        item = product.get("item", {})
        return {
            "title": item.get("productTitle"),
            "price": item.get("price"),
            "mall_name": item.get("mallName"),
            "image_url": item.get("imageUrl"),
            "product_url": item.get("mallProductUrl"),
            "category": [item.get(f"category{level}Name") for level in range(1, 5)],
            "rank": item.get("rank"),
            "review_count": item.get("reviewCount"),
            "review_count_sum": item.get("reviewCountSum"),
            "score_info": item.get("scoreInfo"),
            "mobile_low_price": item.get("mobileLowPrice"),
            "low_price": item.get("lowPrice"),
            "delivery_fee_content": item.get("deliveryFeeContent"),
            "dlvry_low_price": item.get("dlvryLowPrice"),
            "open_date": item.get("openDate"),
            "mall_count": item.get("mallCount"),
            "keep_count": item.get("keepCnt"),
            "oversea_tp": item.get("overseaTp"),
            "purchase_count": item.get("purchaseCnt"),
            "manu_tag": item.get("manuTag"),
            "img_size": item.get("imgSz"),
            "search_keyword": item.get("searchKeyword"),
            "mall_pc_url": item.get("mallPcUrl"),
        }

    def get_top_n_products(self, products: List[Dict], top_n: int = 5) -> List[Dict]:
        """Return extracted info for the ``top_n`` products with the lowest rank.

        A missing or non-numeric rank sorts as 0.
        """
        sorted_products = sorted(
            products, key=lambda p: self._to_int(p.get("item", {}).get("rank", 0))
        )
        top_products = sorted_products[:top_n]
        logger.debug(f"상위 {top_n}개 제품을 추출함.")
        return [self.extract_product_info(product) for product in top_products]

    def get_related_tags(self, data: Dict) -> List[str]:
        """Return the stripped, non-empty ``props.pageProps.relatedTags`` strings.

        Returns ``[]`` when the path is missing or does not hold a list.
        """
        try:
            related_tags = data["props"]["pageProps"]["relatedTags"]
            filtered_tags = [tag.strip() for tag in related_tags if isinstance(tag, str) and tag]
        except (KeyError, TypeError):
            logger.error("연관 검색어를 추출하는 중 오류 발생.")
            return []
        logger.debug(f"연관 검색어: {filtered_tags}")
        return filtered_tags

    def search_and_parse(self, keyword: str, min_price: int = 10000, top_n: int = 5) -> Dict:
        """Search ``keyword`` and return the top products and related tags.

        Returns:
            ``{"top_products": [...], "related_tags": [...]}``, or ``{}`` when
            the search page could not be fetched.
        """
        data = self.fetch_search_results(keyword)
        if not data:
            return {}
        products = self.get_product_list(data)
        filtered_products = self.filter_products_by_price(products, min_price)
        return {
            "top_products": self.get_top_n_products(filtered_products, top_n),
            "related_tags": self.get_related_tags(data),
        }

    def get_top_related_keywords(self, keyword):
        """Return ``(top_products, related_tags)`` for ``keyword``.

        Both are empty lists when the search failed.
        """
        result = self.search_and_parse(keyword)
        # search_and_parse returns {} on failure, so index defensively.
        return result.get('top_products', []), result.get('related_tags', [])
# Product name generator
class ProductNameGenerator:
    """Build Korean product names from a Chinese product name.

    The pipeline translates the name, rejects it if it contains a forbidden
    or trademarked word, then assembles new names from the words used in the
    titles of top-ranked Naver Shopping results.
    """

    def __init__(self, translate_service, kipris_api, naver_parser, forbidden_db, common_words=None):
        """Store the collaborating services.

        Args:
            translate_service: Object with ``translate(text)``.
            kipris_api: Object with ``check_trademark(word)``.
            naver_parser: Object with ``get_top_related_keywords(keyword)``.
            forbidden_db: Object with ``is_forbidden(word)``.
            common_words: Optional collection of words that skip the
                forbidden/trademark checks. When omitted, a module-level
                ``COMMON_WORDS`` is used if one exists, else an empty tuple.
        """
        self.translate_service = translate_service
        self.kipris_api = kipris_api
        self.naver_parser = naver_parser
        self.forbidden_db = forbidden_db
        if common_words is None:
            # COMMON_WORDS is not defined in the visible part of this module;
            # look it up lazily so a missing definition does not raise NameError.
            common_words = globals().get("COMMON_WORDS", ())
        self.common_words = common_words

    def is_common_word(self, word: str) -> bool:
        """Return whether ``word`` is in the configured common-word collection."""
        return word in self.common_words

    @staticmethod
    def _has_trademark_conflict(trademark_check):
        """Return True if any result entry has status "등록" or "공개".

        Non-dict values in the result mapping (such as the integer stored
        under ``"total_count"``) are skipped.
        """
        if not isinstance(trademark_check, dict):
            return False
        return any(
            isinstance(entry, dict) and entry.get("application_status") in ("등록", "공개")
            for entry in trademark_check.values()
        )

    def generate_names(self, chinese_name: str):
        """Generate product names for ``chinese_name``.

        Returns:
            A dict with ``"scored_name"``, ``"random_name"`` and
            ``"related_keywords"`` on success. An empty list when translation
            fails, a forbidden or trademarked word is found, or the Naver
            search result cannot be processed.
        """
        # 1. Translate Chinese to Korean.
        korean_name = self.translate_service.translate(chinese_name)
        if not korean_name:
            logger.error("번역 실패")
            return []

        # 2. Drop common words, then reject the name if any remaining word
        #    is forbidden.
        words_to_check = [word for word in korean_name.split() if not self.is_common_word(word)]
        if not self.filter_forbidden(" ".join(words_to_check)):
            logger.debug("번역된 상품명이 금지어 목록에 포함되어 제외됨.")
            return []

        # 3. Trademark check for every remaining word.
        for word in words_to_check:
            trademark_check = self.kipris_api.check_trademark(word)
            if self._has_trademark_conflict(trademark_check):
                logger.debug(f"상표권 문제로 단어 '{word}'가 제외됨.")
                return []
            logger.debug(f"'{word}' 단어는 상표권이 없는 안전한 단어로 판단됨.")

        # 4. Search Naver and build names from the top product titles.
        try:
            top_products, related_tags = self.naver_parser.get_top_related_keywords(korean_name)
        except (KeyError, TypeError) as e:
            logger.error(f"네이버 검색 결과 처리 실패: {e}")
            return []
        scored_words = self.score_words(top_products)
        return {
            "scored_name": self.construct_name(scored_words),
            "random_name": self.construct_random_name(scored_words),
            "related_keywords": related_tags
        }

    def filter_forbidden(self, name):
        """Return True when NO whitespace-separated word of ``name`` is forbidden."""
        return not any(self.forbidden_db.is_forbidden(word) for word in name.split())

    def score_words(self, top_products):
        """Score the words appearing in the product titles.

        A word at position ``j`` of the title of product ``i`` scores
        ``max(0, 10 - i) + max(0, 20 - 2 * j)``; each word keeps its best
        score. Products without a title are skipped.

        Returns:
            A dict of word -> score ordered by descending score.
        """
        word_scores = {}
        for i, product in enumerate(top_products):
            title = product.get('title') or ""
            for j, word in enumerate(title.split()):
                score = max(0, 10 - i) + max(0, 20 - j * 2)
                if score > word_scores.get(word, -1):
                    word_scores[word] = score
        ranked = sorted(word_scores.items(), key=lambda pair: pair[1], reverse=True)
        return dict(ranked)

    def construct_name(self, scored_words, max_length=30):
        """Join words in iteration order while the name fits in ``max_length``.

        A word that does not fit is skipped and later (shorter) words are
        still tried. The separator space is only counted between words, so a
        word that exactly fills the remaining room is accepted.
        """
        chosen = []
        used = 0
        for word in scored_words:
            if not word:
                continue
            needed = len(word) + (1 if chosen else 0)
            if used + needed <= max_length:
                chosen.append(word)
                used += needed
        return " ".join(chosen)

    def construct_random_name(self, scored_words):
        """Return up to five of the scored words in random order, space-joined."""
        import random
        words = list(scored_words.keys())
        random.shuffle(words)
        return " ".join(words[:5])
# Example run
def main():
    """Run the name generator once on a sample Chinese product name.

    API credentials are read from the environment instead of being embedded
    in the source:

    * ``DEEPL_API_KEY`` (required) - DeepL authentication key
    * ``KIPRIS_API_KEY`` (required) - KIPRIS service key
    * ``PAPAGO_CLIENT_ID`` / ``PAPAGO_CLIENT_SECRET`` (optional)

    Logs an error and returns without doing anything when a required
    variable is missing.
    """
    import os

    deepl_key = os.environ.get("DEEPL_API_KEY")
    kipris_key = os.environ.get("KIPRIS_API_KEY")
    missing = [
        name
        for name, value in (("DEEPL_API_KEY", deepl_key), ("KIPRIS_API_KEY", kipris_key))
        if not value
    ]
    if missing:
        logger.error(f"필수 환경 변수가 설정되지 않았습니다: {', '.join(missing)}")
        return

    papago_id = os.environ.get("PAPAGO_CLIENT_ID")
    papago_secret = os.environ.get("PAPAGO_CLIENT_SECRET")
    papago_key = None
    if papago_id and papago_secret:
        papago_key = {"client_id": papago_id, "client_secret": papago_secret}

    forbidden_db = ForbiddenWordsDB()
    try:
        translate_service = TranslateService(service="deepl", papago_key=papago_key, deepl_key=deepl_key)
        kipris_api = Kipris_API(apikey=kipris_key)
        naver_parser = NaverParser()
        generator = ProductNameGenerator(translate_service, kipris_api, naver_parser, forbidden_db)
        chinese_name = "中国制造的超级胶水"
        result = generator.generate_names(chinese_name)
        logger.debug("생성된 상품명: " + json.dumps(result, ensure_ascii=False, indent=4))
    finally:
        # Release the SQLite connection even when a step above raises.
        forbidden_db.close()


if __name__ == "__main__":
    main()