# KiprisAPI/web_scraper.py
#
# 200 lines
# 11 KiB
# Python

# import asyncio
from playwright.sync_api import sync_playwright
import random, requests, json
from PIL import Image
from io import BytesIO
class WebScraper:
    """Drive a headless Chromium page to search the KIPRIS trademark site.

    Typical use: ``setup_browser()`` -> ``search_for_term(term)`` ->
    ``close_browser()``.  Category codes found in the results are mapped to
    human-readable descriptions loaded from a JSON file at construction time.
    """

    # User agents to pick from at random when creating the browser context.
    USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
    )

    def __init__(self, category_file='categories.json'):
        """Initialise empty browser handles and load the category table.

        Args:
            category_file: Path of the JSON file mapping category codes to
                descriptions.  Defaults to ``categories.json`` resolved
                against the current working directory.

        Raises:
            OSError: If the category file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        self.results = {}
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.category_description = self.load_category_descriptions(category_file)
        self.url = "http://kdtj.kipris.or.kr/kdtj/searchLogina.do?method=loginTM#page1"

    def setup_browser(self):
        """Start Playwright, open a headless Chromium page and load ``self.url``.

        If creating the browser, context or page fails part-way, everything
        that was already started is shut down again before the exception is
        re-raised, so no browser process is left behind.
        """
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(headless=True)
            self.context = self.browser.new_context(
                user_agent=random.choice(self.USER_AGENTS)
            )
            self.page = self.context.new_page()
        except Exception:
            self.close_browser()
            raise
        self.navigate_to_page(self.url)

    def navigate_to_page(self, url):
        """Navigate the current page to ``url``."""
        self.page.goto(url)

    def search_for_term(self, term):
        """Search for ``term`` and collect the listed results.

        Args:
            term: Text typed into the ``#queryText`` search box.

        Returns:
            ``self.results``: a dict with a ``total_count`` entry plus one
            ``result_<n>`` entry (1-based position in the listing) for every
            article whose status is not "소멸".  ``None`` when the search
            yields no data, the total-count element is missing, or any error
            occurs (the error is printed, not raised).
        """
        # Start from a clean slate so entries of an earlier search with more
        # hits cannot leak into this one.
        self.results = {}
        try:
            self.page.fill("#queryText", term)
            self.page.click(".input_btn")

            # Results are rendered by JavaScript; block until an article exists.
            # NOTE(review): this waits for an <article> only, so a search with
            # no hits presumably ends in a timeout (handled below) before the
            # ".nodata_info" check can run - confirm against the live page.
            loaded = self.page.wait_for_function(
                "document.querySelector('form#listForm section.search_section article') != null"
            )
            if not loaded:
                print("검색 결과가 시간 내에 로드되지 않았습니다.")
                return None

            if self.page.query_selector(".nodata_info"):
                print("검색 결과가 없습니다.")
                return None

            self.page.wait_for_selector(
                "form#listForm section.search_section", state="visible", timeout=10000
            )
            total_count = self.page.text_content(
                "form#listForm section.search_section div p span.total"
            )
            if not total_count:
                print("No total count element found, possibly incorrect selector or page structure has changed.")
                return None
            self.results['total_count'] = total_count.strip()

            articles = self.page.query_selector_all(
                "form#listForm section.search_section article"
            )
            for position, article in enumerate(articles, start=1):
                record = self._parse_article(article)
                # Entries with status "소멸" are left out of the results.
                if record["admin_status"] != "소멸":
                    self.results[f"result_{position}"] = record
            return self.results
        except Exception as e:
            # Best-effort scraper: report the problem and signal failure.
            print(f"오류 발생 : {e}")
            return None

    def _parse_article(self, article):
        """Extract one result record (a dict) from a single article element."""
        # The checkbox carries both the name (title) and the ID (value).
        checkbox = article.query_selector("div:nth-child(1) span input[type='checkbox']")
        if checkbox:
            trademark_name = checkbox.get_attribute("title")
            applno = checkbox.get_attribute("value")
        else:
            trademark_name = "No name found"
            applno = "No ID found"

        # Only build the ID-based image URL from a real ID; always assign a
        # value so a missing ID can neither raise nor reuse a previous one.
        if checkbox and applno:
            id_image_url = f"http://kdtj.kipris.or.kr/kdtj/remoteFile.do?method=bigImageTM&applno={applno}&no={applno}_tm000001.jpg"
        else:
            id_image_url = "No image found"

        product_category = self._text_of(
            article, "div:nth-child(2) ul li:nth-child(1) a span", "No category found"
        )

        return {
            "ID": applno,
            "title": trademark_name,
            "admin_status": self._text_of(
                article, "div:nth-child(1) h1 a:nth-child(1) span", "No status found"
            ),
            "imageURL": self._attr_of(
                article, "div:nth-child(2) div a img", "src", "No image found"
            ),
            "IDimageURL": id_image_url,
            "product_category": product_category,
            # Unknown codes fall back to the lookup's own default message.
            "category_description": self.add_category_description(product_category),
            "applicant": self._attr_of(
                article, "div:nth-child(2) ul li:nth-child(2) span[title]", "title",
                "No applicant found",
            ),
            "publication_date": self._text_of(
                article, "div:nth-child(2) ul li:nth-child(8)",
                "No publication date found", strip=True,
            ),
            "registration_date": self._text_of(
                article, "div:nth-child(2) ul li:nth-child(6)",
                "No registration date found", strip=True,
            ),
        }

    @staticmethod
    def _text_of(article, selector, default, strip=False):
        """Return the text of the first match of ``selector``, else ``default``."""
        element = article.query_selector(selector)
        if not element:
            return default
        text = element.text_content()
        if text is None:
            return default
        return text.strip() if strip else text

    @staticmethod
    def _attr_of(article, selector, attribute, default):
        """Return ``attribute`` of the first match of ``selector``, else ``default``."""
        element = article.query_selector(selector)
        return element.get_attribute(attribute) if element else default

    @staticmethod
    def download_image(url, applno, timeout=10):
        """Download ``url`` and save it as ``<applno>.jpeg`` in the working directory.

        Declared static so that both ``WebScraper.download_image(url, applno)``
        and ``scraper.download_image(url, applno)`` bind the arguments correctly.

        Args:
            url: Address of the image.
            applno: ID used as the file name.
            timeout: Seconds to wait for the server before giving up.
        """
        response = requests.get(url, timeout=timeout)
        if response.status_code == 200:
            filename = f"{applno}.jpeg"  # the ID doubles as the file name
            with open(filename, 'wb') as file:
                file.write(response.content)
            print(f"이미지가 성공적으로 저장되었습니다: {filename}")
        else:
            print(f"이미지 다운로드 실패: HTTP {response.status_code}")

    def load_category_descriptions(self, filename):
        """Load and return the category descriptions stored in a JSON file."""
        with open(filename, 'r', encoding='utf-8') as file:
            return json.load(file)

    def add_category_description(self, category_code):
        """Return the description for ``category_code`` or a fallback message."""
        return self.category_description.get(category_code, "카테고리 설명을 찾을 수 없습니다.")

    def fetch_image_data(self, url, timeout=10):
        """Fetch ``url`` and return the image bytes, or ``None`` on failure.

        When the response is declared as an image its body is returned as is;
        otherwise the body is decoded with Pillow and re-encoded as JPEG.

        Args:
            url: Address of the image.
            timeout: Seconds to wait for the server before giving up.
        """
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            print(f"이미지 다운로드 실패: HTTP {response.status_code}")
            return None

        content_type = response.headers.get('Content-Type', '')
        if 'image' in content_type:
            return response.content

        try:
            image = Image.open(BytesIO(response.content))
            # JPEG cannot store alpha or palette data; normalise those modes
            # first so the save below does not fail on such images.
            if image.mode not in ("RGB", "L", "CMYK"):
                image = image.convert("RGB")
            with BytesIO() as buffer:
                image.save(buffer, 'JPEG')
                return buffer.getvalue()
        except Exception as e:
            print(f"이미지 변환 실패: {e}")
            return None

    def close_browser(self):
        """Release the context, browser and Playwright driver.

        Each resource is closed independently of the others, so a partially
        completed setup is still cleaned up.  References are cleared first,
        which makes repeated calls harmless.
        """
        context, browser, playwright = self.context, self.browser, self.playwright
        self.page = self.context = self.browser = self.playwright = None
        try:
            if context is not None:
                context.close()
        finally:
            try:
                if browser is not None:
                    browser.close()
            finally:
                if playwright is not None:
                    playwright.stop()