111 lines
4.7 KiB
Python
111 lines
4.7 KiB
Python
from requests_html import HTMLSession
|
|
import json
|
|
import requests
|
|
|
|
class WebScraper:
|
|
def __init__(self):
|
|
self.session = HTMLSession()
|
|
self.url = "http://kdtj.kipris.or.kr/kdtj/searchLogina.do?method=loginTM#page1"
|
|
self.results = {}
|
|
self.category_description = self.load_category_descriptions('categories.json')
|
|
|
|
def add_category_description(self, category_code):
|
|
"""주어진 카테고리 코드에 따라 설명을 반환합니다."""
|
|
description = self.category_description.get(category_code, "카테고리 설명을 찾을 수 없습니다.")
|
|
return description
|
|
|
|
def load_category_descriptions(self, filename):
|
|
"""JSON 파일에서 카테고리 설명을 로드합니다."""
|
|
with open(filename, 'r', encoding='utf-8') as file:
|
|
return json.load(file)
|
|
|
|
def navigate_to_page(self, url):
|
|
"""지정된 URL로 이동하고 페이지를 로드합니다."""
|
|
response = self.session.get(url)
|
|
response.html.render() # 필요 시 JavaScript 실행
|
|
print("Page loaded successfully.")
|
|
return response
|
|
|
|
def search_for_term(self, term):
|
|
"""검색어로 검색하고 결과를 수집합니다."""
|
|
response = self.navigate_to_page(self.url) # 로그인 페이지 로드
|
|
# await self.page.fill("#queryText", term)
|
|
# 검색어 입력과 폼 제출을 시뮬레이션
|
|
try:
|
|
form = response.html.find('form', containing='검색', first=True)
|
|
if form:
|
|
search_url = form.attrs.get('action')
|
|
if search_url:
|
|
data = {
|
|
'keywordTextarea': term, # form에서 요구하는 필드 이름에 맞추어야 함
|
|
}
|
|
search_response = self.session.post(search_url, data=data)
|
|
search_response.html.render()
|
|
return search_response
|
|
else:
|
|
print("폼의 action URL을 찾을 수 없습니다.")
|
|
else:
|
|
print("검색 폼을 찾을 수 없습니다.")
|
|
except Exception as e:
|
|
print(f"검색 실행 중 오류 발생: {e}")
|
|
|
|
# response.html.render(script=script, reload=False)
|
|
|
|
# # response.html.find('#keywordTextarea', first=True).fill(term)
|
|
# self.page.evaluate(f"document.querySelector('#keywordTextarea').value = '{term}';")
|
|
# print(f"검색어 입력: {term}")
|
|
# input_field = response.html.find('#keywordTextarea', first=True)
|
|
# script = f"document.querySelector('#keywordTextarea').value = '{term}';"
|
|
# response.html.page.evaluate(script) # JavaScript 실행
|
|
|
|
# search_button = response.html.find('.input_btn', first=True)
|
|
# search_button.click()
|
|
response.html.render(wait=10, timeout=20) # 결과 로드를 위해 대기
|
|
|
|
articles = response.html.find('form#listForm section.search_section article')
|
|
if not articles:
|
|
print("검색 결과가 없습니다.")
|
|
return None
|
|
|
|
# Store results in a structured format
|
|
for i, article in enumerate(articles, 1):
|
|
title = article.find('.title', first=True).text
|
|
status = article.find('.status', first=True).text
|
|
image_url = article.find('img', first=True).attrs.get('src', '')
|
|
|
|
self.results[f'result_{i}'] = {
|
|
'title': title,
|
|
'status': status,
|
|
'image_url': image_url
|
|
}
|
|
return self.results
|
|
|
|
def download_image(self, url, applno):
|
|
"""이미지를 다운로드하고 applno를 파일 이름으로 사용하여 저장합니다."""
|
|
response = requests.get(url) # 이미지 URL에 대한 요청
|
|
if response.status_code == 200:
|
|
filename = f"{applno}.jpeg"
|
|
with open(filename, 'wb') as file:
|
|
file.write(response.content)
|
|
print(f"이미지가 성공적으로 저장되었습니다: {filename}")
|
|
else:
|
|
print(f"이미지 다운로드 실패: HTTP {response.status_code}")
|
|
|
|
def fetch_image_data(self, url):
|
|
"""주어진 URL로부터 이미지 데이터를 직접 가져와 반환합니다."""
|
|
response = requests.get(url)
|
|
if response.status_code == 200:
|
|
return response.content
|
|
else:
|
|
print(f"이미지 다운로드 실패: HTTP {response.status_code}")
|
|
return None
|
|
|
|
def close_browser(self):
|
|
"""세션을 종료합니다."""
|
|
self.session.close()
|
|
|
|
# # 사용 예시
|
|
# scraper = WebScraper()
|
|
# results = scraper.search_for_term("특허")
|
|
# print(results)
|