"""Generate Korean product names from Chinese product titles.

Pipeline (see ``ProductNameGenerator.generate_names``): translate the title,
drop common / forbidden words, check every remaining word against the KIPRIS
trademark word-search API, then build names from the titles of the top Naver
Shopping results for the translated phrase.
"""

import json
import logging
import os
import random
import sqlite3
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup

# Logger setup.
logger = logging.getLogger('default_logger')
logging.basicConfig(level=logging.DEBUG)

# Seconds to wait on any outbound HTTP call, so a stalled remote service
# cannot hang the pipeline indefinitely.
REQUEST_TIMEOUT = 10

# Words considered too generic to need forbidden-word / trademark checks.
# This name was referenced but never defined, which raised NameError on first
# use; it starts empty, so no word is skipped until it is populated.
COMMON_WORDS = frozenset()


def _safe_int(value, default: int = 0) -> int:
    """Return ``int(value)``, or *default* when *value* cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


class TranslateService:
    """Thin client over the Papago, Google Translate and DeepL HTTP APIs.

    ``service`` selects the backend used by :meth:`translate`; only the key
    matching that backend has to be supplied. ``papago_key`` is a dict with
    ``client_id`` and ``client_secret`` entries, the other keys are strings.
    """

    def __init__(self, service="papago", papago_key=None, google_key=None, deepl_key=None):
        self.service = service
        self.papago_key = papago_key
        self.google_key = google_key
        self.deepl_key = deepl_key

    def translate(self, text: str, source_lang="zh", target_lang="ko") -> Optional[str]:
        """Translate *text* with the configured backend.

        Returns the translated string, or ``None`` when the backend is
        unknown or the request fails.
        """
        handlers = {
            "papago": self.translate_with_papago,
            "google": self.translate_with_google,
            "deepl": self.translate_with_deepl,
        }
        handler = handlers.get(self.service)
        if handler is None:
            logger.error(f"지원하지 않는 번역 서비스: {self.service}")
            return None
        return handler(text, source_lang, target_lang)

    @staticmethod
    def _post_json(label, url, **kwargs):
        """POST to *url* and return the decoded JSON body, or ``None`` on failure.

        *label* is the backend name used as the prefix of the error log line.
        Connection errors, non-200 responses and undecodable bodies are all
        logged and reported as ``None`` so callers have one failure path.
        """
        try:
            response = requests.post(url, timeout=REQUEST_TIMEOUT, **kwargs)
        except requests.exceptions.RequestException as e:
            logger.error(f"{label} 번역 오류: {e}")
            return None
        if response.status_code != 200:
            logger.error(f"{label} 번역 오류: {response.status_code}")
            return None
        try:
            return response.json()
        except ValueError as e:
            # requests raises a ValueError subclass for a non-JSON body.
            logger.error(f"{label} 번역 오류: {e}")
            return None

    def translate_with_papago(self, text, source_lang, target_lang):
        """Translate via Naver Papago; returns the text, '' if absent, or None on error."""
        if not self.papago_key:
            # Subscripting a missing key dict would raise TypeError below.
            logger.error("파파고 번역 오류: API 키가 설정되지 않았습니다.")
            return None
        url = "https://openapi.naver.com/v1/papago/n2mt"
        headers = {
            "X-Naver-Client-Id": self.papago_key["client_id"],
            "X-Naver-Client-Secret": self.papago_key["client_secret"],
        }
        data = {
            "source": source_lang,
            "target": target_lang,
            "text": text,
        }
        payload = self._post_json("파파고", url, headers=headers, data=data)
        if payload is None:
            return None
        return payload.get('message', {}).get('result', {}).get('translatedText', '')

    def translate_with_google(self, text, source_lang, target_lang):
        """Translate via the Google Translation v2 API; returns None on error."""
        url = f"https://translation.googleapis.com/language/translate/v2?key={self.google_key}"
        data = {
            'q': text,
            'source': source_lang,
            'target': target_lang,
            'format': 'text',
        }
        payload = self._post_json("구글", url, data=data)
        if payload is None:
            return None
        translations = payload.get("data", {}).get("translations", [])
        if not translations:
            # An empty list used to raise IndexError on the [0] lookup.
            logger.error("구글 번역 오류: 번역 결과가 비어 있습니다.")
            return None
        return translations[0].get("translatedText", '')

    def translate_with_deepl(self, text, source_lang, target_lang):
        """Translate via the DeepL free-tier API; returns None on error."""
        url = "https://api-free.deepl.com/v2/translate"
        headers = {
            "Authorization": f"DeepL-Auth-Key {self.deepl_key}"
        }
        data = {
            "text": text,
            # The request is sent with upper-cased language codes.
            "source_lang": source_lang.upper(),
            "target_lang": target_lang.upper(),
        }
        payload = self._post_json("Deepl", url, headers=headers, data=data)
        if payload is None:
            return None
        translations = payload.get("translations", [])
        if not translations:
            logger.error("Deepl 번역 오류: 번역 결과가 비어 있습니다.")
            return None
        return translations[0].get("text", '')


# Forbidden-word database creation and initialisation.
class ForbiddenWordsDB:
    """SQLite-backed store of words that must not appear in product names."""

    def __init__(self, db_name="forbidden_words.db"):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self.create_table()

    def create_table(self):
        """Create the ``forbidden_words`` table if it does not exist yet."""
        # The table carries descriptive fields alongside the word itself.
        with self.conn:
            self.cursor.execute("""
                CREATE TABLE IF NOT EXISTS forbidden_words (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    word TEXT NOT NULL UNIQUE,
                    description TEXT,
                    applicant_name TEXT,
                    agent_name TEXT,
                    classification_code TEXT,
                    category_description TEXT
                )
            """)

    def add_forbidden_word(self, word, description=None, applicant_name=None, agent_name=None,
                           classification_code=None, category_description=None):
        """Insert *word* unless it is already stored; database errors are logged."""
        if self.is_forbidden(word):
            logger.info(f"'{word}'은 이미 금지어 목록에 존재합니다.")
            return
        try:
            # ``with self.conn`` commits on success and rolls back on error.
            with self.conn:
                self.cursor.execute("""
                    INSERT INTO forbidden_words
                        (word, description, applicant_name, agent_name,
                         classification_code, category_description)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (word, description, applicant_name, agent_name,
                      classification_code, category_description))
        except sqlite3.Error as e:
            logger.error(f"Error inserting forbidden word '{word}': {e}")
            return
        logger.info(f"'{word}'이 금지어 목록에 추가되었습니다.")

    def is_forbidden(self, word):
        """Return True when *word* exists in the table (exact match)."""
        self.cursor.execute("SELECT id FROM forbidden_words WHERE word = ?", (word,))
        return self.cursor.fetchone() is not None

    def delete_forbidden_word(self, word):
        """Delete *word* from the table; database errors are logged."""
        try:
            with self.conn:
                self.cursor.execute("DELETE FROM forbidden_words WHERE word = ?", (word,))
        except sqlite3.Error as e:
            logger.error(f"Error deleting forbidden word '{word}': {e}")
            return
        logger.info(f"'{word}'이 금지어 목록에서 삭제되었습니다.")

    def update_forbidden_word(self, word, new_word=None, description=None, applicant_name=None,
                              agent_name=None, classification_code=None,
                              category_description=None):
        """Update the row for *word*; arguments left as ``None`` keep their stored value."""
        if not self.is_forbidden(word):
            logger.info(f"'{word}'은 금지어 목록에 없습니다.")
            return
        try:
            with self.conn:
                # COALESCE(?, column) keeps the current value for NULL parameters.
                self.cursor.execute("""
                    UPDATE forbidden_words
                    SET word = COALESCE(?, word),
                        description = COALESCE(?, description),
                        applicant_name = COALESCE(?, applicant_name),
                        agent_name = COALESCE(?, agent_name),
                        classification_code = COALESCE(?, classification_code),
                        category_description = COALESCE(?, category_description)
                    WHERE word = ?
                """, (new_word, description, applicant_name, agent_name,
                      classification_code, category_description, word))
        except sqlite3.Error as e:
            logger.error(f"Error updating forbidden word '{word}': {e}")
            return
        logger.info(f"'{word}'의 정보가 업데이트되었습니다.")

    def get_all_forbidden_words(self):
        """Return every row of the table as a list of tuples ([] on error)."""
        try:
            self.cursor.execute("SELECT * FROM forbidden_words")
            return self.cursor.fetchall()
        except sqlite3.Error as e:
            logger.error(f"Error fetching all forbidden words: {e}")
            return []

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()


# KIPRIS API integration.
class Kipris_API:
    """Client for the KIPRIS trademark word-search endpoint."""

    # (result key, XML child tag) pairs copied verbatim from each <item>.
    _ITEM_FIELDS = (
        ("index_no", "indexNo"),
        ("application_number", "applicationNumber"),
        ("application_date", "applicationDate"),
        ("publication_number", "publicationNumber"),
        ("publication_date", "publicationDate"),
        ("registration_date", "registrationDate"),
        ("registration_number", "registrationNumber"),
        ("applicant_name", "applicantName"),
        ("agent_name", "agentName"),
        ("title", "title"),
        ("drawing_url", "drawing"),
        ("big_drawing_url", "bigDrawing"),
        ("full_text", "fullText"),
    )

    def __init__(self, apikey=None):
        self.url = 'http://kipo-api.kipi.or.kr/openapi/service/trademarkInfoSearchService/getWordSearch'
        self.apikey = apikey
        self.results = {}
        filename = 'kiprisCategories.json'
        self.category_description = self.load_category_descriptions(filename)

    def fetch_and_decode(self, params):
        """GET the search endpoint and return the body decoded as UTF-8.

        Returns ``None`` (after logging) when the request or decoding fails.
        """
        try:
            response = requests.get(self.url, params=params, timeout=REQUEST_TIMEOUT)
            return response.content.decode('utf-8')
        except (requests.exceptions.RequestException, UnicodeDecodeError) as e:
            logger.error(f"키프리스 요청 중 에러발생 : {e}")
            return None

    @staticmethod
    def _text(item, tag):
        """Return the text of child *tag* of *item*, or None if the child is absent."""
        element = item.find(tag)
        return element.text if element is not None else None

    def parse_xml(self, xml_data, status):
        """Parse a search response and store matching items in ``self.results``.

        Items whose ``applicationStatus`` is contained in *status* are stored
        under ``result_<n>`` (1-based position among all items). The number of
        items seen is stored under ``total_count``.
        """
        root = ET.fromstring(xml_data)
        total_items = 0
        status_registered = 0
        status_published = 0

        # Walk every 'item' element under 'body/items'.
        for i, item in enumerate(root.findall('.//body/items/item')):
            total_items += 1
            application_status = self._text(item, 'applicationStatus')
            product_category = self._text(item, 'classificationCode')
            category_desc = (
                self.add_category_description(product_category)
                if product_category else None
            )

            # Count the two statuses reported in the debug summary.
            if application_status == "등록":
                status_registered += 1
            if application_status == "공개":
                status_published += 1

            if application_status in status:
                record = {key: self._text(item, tag) for key, tag in self._ITEM_FIELDS}
                record["application_status"] = application_status
                record["classification_code"] = product_category
                record["category_description"] = category_desc
                self.results[f"result_{i+1}"] = record

        logger.debug(f"검색된 item 총 개수: {total_items}")
        self.results['total_count'] = total_items
        logger.debug(f"등록 상태인 item 개수: {status_registered}")
        logger.debug(f"공개 상태인 item 개수: {status_published}")

    def get_results(self):
        """Return the results of the most recent :meth:`run`."""
        return self.results

    def run(self, keyword, status):
        """Search *keyword* and return the items whose status is in *status*.

        Never raises: request and parse failures are logged and whatever was
        collected (possibly an empty dict) is returned.
        """
        # Start from a clean slate; otherwise hits for a previous keyword
        # would be reported as hits for this one.
        self.results = {}
        params = {
            'serviceKey': self.apikey,
            'searchString': keyword,
            'searchRecentYear': '0',
            'title': '',
            'fullText': '',
            'drawing': '',
            'bigDrawing': '',
        }
        # Mask the service key so the credential is not written to the log.
        masked_params = dict(params, serviceKey='***')
        logger.debug(f" Search params : {masked_params}")

        xml_data = self.fetch_and_decode(params)
        if xml_data is None:
            return self.get_results()
        try:
            self.parse_xml(xml_data, status)
        except Exception as e:
            # Boundary handler: malformed XML must not abort name generation.
            logger.error(f"API 요청 중 에러발생 : {e}")
        return self.get_results()

    def close_Kipris(self):
        """No-op kept for interface compatibility."""
        pass

    def load_category_descriptions(self, filename):
        """Load the category-code descriptions from the JSON file *filename*."""
        with open(filename, 'r', encoding='utf-8') as file:
            return json.load(file)

    def add_category_description(self, category_code):
        """Format each '|'-separated classification code with its description."""
        descriptions = []
        for code in category_code.split('|'):
            description = self.category_description.get(code, "카테고리 설명을 찾을 수 없습니다.")
            descriptions.append(f"[{code}] - {description}")
        return "; ".join(descriptions)

    def check_trademark(self, keyword):
        """Return KIPRIS items for *keyword* with status '등록' or '공개'."""
        status = ["등록", "공개"]
        return self.run(keyword, status)


# Naver Shopping parser integration.
class NaverParser:
    """Scrapes Naver Shopping search pages via their ``__NEXT_DATA__`` JSON."""

    def __init__(self):
        self.base_url = "https://search.shopping.naver.com/search/all?query="
        # NOTE(review): advertising 'br' only works when a brotli decoder is
        # installed for the HTTP stack — confirm in the deployment environment.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Cache-Control": "max-age=0",
        }

    def fetch_search_results(self, keyword: str, product_set: str = "overseas") -> Optional[Dict]:
        """Fetch the Naver Shopping results page for *keyword*.

        *product_set* selects the product type sent as ``productSet``.
        Returns the parsed ``__NEXT_DATA__`` JSON, or ``None`` on any failure.
        """
        # Percent-encode the keyword so characters such as '&' or '#' cannot
        # corrupt the query string. The '&timestamp=' parameter had been
        # mangled to '×tamp=' by HTML-entity decoding and is restored here.
        url = (
            f"{self.base_url}{quote(keyword, safe='')}"
            f"&frm=NVSHATC&pagingIndex=1&pagingSize=40&productSet={product_set}"
            f"&sort=rel&timestamp=&viewType=list"
        )
        logger.debug(f"검색 URL: {url}")

        try:
            response = requests.get(url, headers=self.headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"네이버 쇼핑 HTML 가져오기 실패: {e}")
            return None

        soup = BeautifulSoup(response.text, 'html.parser')
        next_data = soup.find("script", {"id": "__NEXT_DATA__"})
        if not next_data:
            logger.error("검색 결과에서 '__NEXT_DATA__' 태그를 찾을 수 없음.")
            return None
        try:
            return json.loads(next_data.string)
        except (TypeError, ValueError) as e:
            # Empty script tag (string is None) or invalid JSON.
            logger.error(f"'__NEXT_DATA__' JSON 파싱 실패: {e}")
            return None

    def get_product_list(self, data: Dict) -> List[Dict]:
        """Extract the product list from the page data ([] if the path is missing)."""
        try:
            products_list = data["props"]["pageProps"]["initialState"]["products"]["list"]
        except (KeyError, TypeError) as e:
            logger.error(f"제품 리스트를 추출하는 중 오류 발생: {e}")
            return []
        logger.debug(f"총 {len(products_list)}개의 제품이 검색됨.")
        return products_list

    def filter_products_by_price(self, products: List[Dict], min_price: int = 50000) -> List[Dict]:
        """Keep products whose price is strictly greater than *min_price*.

        A missing or non-numeric price counts as 0.
        """
        filtered_products = [
            product for product in products
            if _safe_int((product.get("item") or {}).get("price")) > min_price
        ]
        logger.debug(f"가격이 {min_price}원 이상인 제품 {len(filtered_products)}개 필터링됨.")
        return filtered_products

    def extract_product_info(self, product: Dict) -> Dict:
        """Pick the fields of interest out of one raw product entry."""
        item = product.get("item") or {}
        return {
            "title": item.get("productTitle"),
            "price": item.get("price"),
            "mall_name": item.get("mallName"),
            "image_url": item.get("imageUrl"),
            "product_url": item.get("mallProductUrl"),
            "category": [
                item.get("category1Name"),
                item.get("category2Name"),
                item.get("category3Name"),
                item.get("category4Name"),
            ],
            "rank": item.get("rank"),
            "review_count": item.get("reviewCount"),
            "review_count_sum": item.get("reviewCountSum"),
            "score_info": item.get("scoreInfo"),
            "mobile_low_price": item.get("mobileLowPrice"),
            "low_price": item.get("lowPrice"),
            "delivery_fee_content": item.get("deliveryFeeContent"),
            "dlvry_low_price": item.get("dlvryLowPrice"),
            "open_date": item.get("openDate"),
            "mall_count": item.get("mallCount"),
            "keep_count": item.get("keepCnt"),
            "oversea_tp": item.get("overseaTp"),
            "purchase_count": item.get("purchaseCnt"),
            "manu_tag": item.get("manuTag"),
            "img_size": item.get("imgSz"),
            "search_keyword": item.get("searchKeyword"),
            "mall_pc_url": item.get("mallPcUrl"),
        }

    def get_top_n_products(self, products: List[Dict], top_n: int = 5) -> List[Dict]:
        """Return extracted info for the *top_n* products with the lowest rank value."""
        sorted_products = sorted(
            products,
            key=lambda p: _safe_int((p.get("item") or {}).get("rank")),
        )
        top_products = sorted_products[:top_n]
        logger.debug(f"상위 {top_n}개 제품을 추출함.")
        return [self.extract_product_info(product) for product in top_products]

    def get_related_tags(self, data: Dict) -> List[str]:
        """Extract the related search terms from the page data ([] if missing)."""
        try:
            related_tags = data["props"]["pageProps"]["relatedTags"]
            filtered_tags = [tag.strip() for tag in related_tags if tag]
        except (KeyError, TypeError):
            logger.error("연관 검색어를 추출하는 중 오류 발생.")
            return []
        logger.debug(f"연관 검색어: {filtered_tags}")
        return filtered_tags

    def search_and_parse(self, keyword: str, min_price: int = 10000, top_n: int = 5) -> Dict:
        """Search *keyword* and return ``top_products`` and ``related_tags``.

        Returns an empty dict when the search page could not be fetched.
        """
        data = self.fetch_search_results(keyword)
        if not data:
            return {}
        products = self.get_product_list(data)
        filtered_products = self.filter_products_by_price(products, min_price)
        return {
            "top_products": self.get_top_n_products(filtered_products, top_n),
            "related_tags": self.get_related_tags(data),
        }

    def get_top_related_keywords(self, keyword):
        """Return ``(top_products, related_tags)`` for *keyword*.

        Both are empty lists when the search failed; the direct key lookups
        used previously raised KeyError in that case.
        """
        result = self.search_and_parse(keyword)
        return result.get('top_products', []), result.get('related_tags', [])


# Product-name generator.
class ProductNameGenerator:
    """Combines translation, trademark checks and Naver results into names."""

    def __init__(self, translate_service, kipris_api, naver_parser, forbidden_db):
        self.translate_service = translate_service
        self.kipris_api = kipris_api
        self.naver_parser = naver_parser
        self.forbidden_db = forbidden_db

    def is_common_word(self, word: str) -> bool:
        """Return True when *word* is listed in ``COMMON_WORDS``."""
        return word in COMMON_WORDS

    def generate_names(self, chinese_name: str):
        """Generate product-name candidates for *chinese_name*.

        Returns a dict with ``scored_name``, ``random_name`` and
        ``related_keywords`` on success. Returns an empty list when the
        translation fails, a forbidden word is present, or a word has a
        registered/published trademark.
        """
        # 1. Translate Chinese to Korean.
        korean_name = self.translate_service.translate(chinese_name)
        if not korean_name:
            logger.error("번역 실패")
            return []

        # 2. Drop common words, then reject the name if any word is forbidden.
        words_to_check = [word for word in korean_name.split() if not self.is_common_word(word)]
        if not self.filter_forbidden(" ".join(words_to_check)):
            logger.debug("번역된 상품명이 금지어 목록에 포함되어 제외됨.")
            return []

        # 3. Trademark check through the KIPRIS API.
        for word in words_to_check:
            trademark_check = self.kipris_api.check_trademark(word)
            # The result dict also holds non-item entries (e.g. the integer
            # 'total_count'), so only dict values are inspected.
            hits = (
                [item for item in trademark_check.values() if isinstance(item, dict)]
                if isinstance(trademark_check, dict) else []
            )
            if any(item.get("application_status") in ["등록", "공개"] for item in hits):
                logger.debug(f"상표권 문제로 단어 '{word}'가 제외됨.")
                return []
            # No matching items means the word is free to use in a name.
            logger.debug(f"'{word}' 단어는 상표권이 없는 안전한 단어로 판단됨.")

        # 4. Search Naver and build names from the top product titles.
        top_products, related_tags = self.naver_parser.get_top_related_keywords(korean_name)
        scored_words = self.score_words(top_products)
        final_name = self.construct_name(scored_words)
        random_name = self.construct_random_name(scored_words)

        return {
            "scored_name": final_name,
            "random_name": random_name,
            "related_keywords": related_tags,
        }

    def filter_forbidden(self, name):
        """Return True when no whitespace-separated word of *name* is forbidden."""
        return not any(self.forbidden_db.is_forbidden(word) for word in name.split())

    def score_words(self, top_products):
        """Score title words by product position and word position.

        A word in product ``i`` at position ``j`` scores
        ``max(0, 10 - i) + max(0, 20 - 2*j)``; each word keeps its best score.
        Returns a dict ordered by descending score.
        """
        word_scores = {}
        for i, product in enumerate(top_products):
            # A product without a title contributes no words.
            words = (product.get('title') or '').split()
            for j, word in enumerate(words):
                score = max(0, 10 - i) + max(0, 20 - j * 2)
                if word not in word_scores or score > word_scores[word]:
                    word_scores[word] = score
        return dict(sorted(word_scores.items(), key=lambda x: -x[1]))

    def construct_name(self, scored_words, max_length=30):
        """Join words in score order, skipping any that would exceed *max_length*."""
        name = ""
        for word in scored_words:
            # ``name`` carries a trailing space after every appended word.
            if len(name) + len(word) + 1 <= max_length:
                name += f"{word} "
        return name.strip()

    def construct_random_name(self, scored_words):
        """Return up to five of the scored words in random order."""
        words = list(scored_words.keys())
        random.shuffle(words)
        return " ".join(words[:5])


# Example run.
def main():
    """Run the pipeline once on a sample Chinese product title."""
    # NOTE(review): the literal credentials below were committed in source.
    # They remain only as fallbacks so this example behaves as before; they
    # should be rotated and supplied exclusively through the environment.
    papago_key = {
        "client_id": os.environ.get("PAPAGO_CLIENT_ID", "V1UyIry1TNhzj4ln1UJ7"),
        "client_secret": os.environ.get("PAPAGO_CLIENT_SECRET", "YV3EsIWlTH"),
    }
    deepl_key = os.environ.get("DEEPL_API_KEY", '6f07317d-f155-46f9-84a0-033ed942c9c6:fx')
    kipris_key = os.environ.get(
        "KIPRIS_API_KEY",
        "X9Tz3JqC/JcCwxnNewA6qdloIN6QFIitVBgS1a2KVDYk1AmddaDTvzr6+t3dyLZV3gh2TPXdNhxsRQwaKP673Q==",
    )

    forbidden_db = ForbiddenWordsDB()
    try:
        translate_service = TranslateService(service="deepl", papago_key=papago_key, deepl_key=deepl_key)
        kipris_api = Kipris_API(apikey=kipris_key)
        naver_parser = NaverParser()

        generator = ProductNameGenerator(translate_service, kipris_api, naver_parser, forbidden_db)

        chinese_name = "中国制造的超级胶水"
        result = generator.generate_names(chinese_name)
        logger.debug("생성된 상품명: " + json.dumps(result, ensure_ascii=False, indent=4))
    finally:
        # Release the SQLite connection even when the pipeline fails.
        forbidden_db.close()


if __name__ == "__main__":
    main()