229 lines
12 KiB
Python
229 lines
12 KiB
Python
import asyncio, aiofiles, aiohttp
|
|
from playwright.async_api import async_playwright
|
|
import random, requests, json
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
class WebScraper:
    """Playwright-driven scraper for the KIPRIS trademark search page.

    Typical use: create an instance, ``await setup_browser()``, call
    ``await search_for_term(term)`` as often as needed, then
    ``await close_browser()``.

    Attributes:
        results: Dict filled by the most recent ``search_for_term`` call.
        playwright / browser / context / page: Playwright handles, ``None``
            until ``setup_browser`` has run (and again after
            ``close_browser``).
        category_description: Mapping loaded from the category JSON file and
            used to look up a description for a product category.
        url: Search page that the scraper opens and returns to.
        is_page_loaded: Whether the last navigation attempt succeeded.
    """

    # Pool of desktop user agents; one is picked at random per browser context.
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.0.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 OPR/85.0.0.0",
    )

    # Administrative statuses whose entries are left out of the results.
    _EXCLUDED_STATUSES = frozenset({"소멸", "거절"})

    # Image modes Pillow can write as JPEG without conversion.
    _JPEG_MODES = ("RGB", "L", "CMYK")

    def __init__(self, category_file='categories.json'):
        """Initialise state and load the category descriptions.

        Args:
            category_file: Path of the JSON file holding the category
                descriptions. Defaults to ``categories.json`` in the current
                working directory.

        Raises:
            OSError: If the category file cannot be opened.
            json.JSONDecodeError: If the category file is not valid JSON.
        """
        self.results = {}
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.category_description = self.load_category_descriptions(category_file)
        self.url = "http://kdtj.kipris.or.kr/kdtj/searchLogina.do?method=loginTM#page1"
        self.is_page_loaded = False

    async def setup_browser(self, headless=False):
        """Start Playwright, open a Chromium page and load the search page.

        Args:
            headless: Passed to ``chromium.launch``. Defaults to ``False``
                (visible browser window).

        Navigation failures are reported through ``is_page_loaded`` rather
        than raised; see ``navigate_to_page``.
        """
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=headless)
        self.context = await self.browser.new_context(
            user_agent=random.choice(self._USER_AGENTS)
        )
        self.page = await self.context.new_page()
        # Abort requests for images and stylesheets; only the DOM is scraped.
        await self.page.route('**/*.{png,jpg,jpeg,svg,gif,css}', lambda route: route.abort())
        await self.navigate_to_page(self.url)

    async def navigate_to_page(self, url):
        """Navigate the page to *url* and record the outcome.

        Sets ``is_page_loaded`` to ``True`` on success and ``False`` on any
        failure. Exceptions are printed, not propagated.
        """
        try:
            await self.page.goto(url, wait_until='networkidle')
            self.is_page_loaded = True
            print("Page loaded successfully.")
        except Exception as e:
            print(f"Failed to load the page: {e}")
            self.is_page_loaded = False

    async def search_for_term(self, term):
        """Run a search for *term* and collect the listed trademarks.

        Returns:
            ``self.results`` — a dict with ``'total_count'`` plus one
            ``'result_<n>'`` entry per listed article whose status is not in
            ``_EXCLUDED_STATUSES`` — or ``None`` when there are no results,
            the total count is missing/zero, or any error occurs.
        """
        try:
            self.results = {}
            await self.page.fill("#keywordTextarea", term)

            print(f"검색어 입력 : {term}")
            await self.page.click(".input_btn")
            print("검색버튼 클릭")

            # Results are rendered by JavaScript, so wait for the first article.
            # NOTE(review): a timeout here raises and is handled by the
            # ``except`` below; if a zero-result page renders no article, the
            # ``.nodata_info`` check further down is only reached after that
            # timeout — confirm against the live page.
            loaded = await self.page.wait_for_function(
                "document.querySelector('form#listForm section.search_section article') != null"
            )
            if not loaded:
                print("검색 결과가 시간 내에 로드되지 않았습니다.")
                return None
            print(f"결과가 동적으로 로드되기를 기다림 : {loaded}")

            nodata_info = await self.page.query_selector(".nodata_info")
            if nodata_info:
                print("검색 결과가 없습니다.")
                return None

            await self.page.wait_for_selector("form#listForm section.search_section", state="visible", timeout=10000)
            total_text = await self.page.text_content("form#listForm section.search_section div p span.total")
            # text_content may yield None; treat that like a zero count
            # instead of failing on .strip().
            total_count = int(total_text.strip().replace(',', '')) if total_text else 0
            print(f"total_count : {total_count}")

            if not total_count:
                print("No total count element found, possibly incorrect selector or page structure has changed.")
                return None

            self.results['total_count'] = total_count
            articles = await self.page.query_selector_all("form#listForm section.search_section article")
            print(f"articles : {len(articles)} 개")

            for position, article in enumerate(articles, start=1):
                record = await self._extract_article(article)
                # Compare on the stripped text so surrounding whitespace in
                # the DOM cannot defeat the status filter.
                status = (record["admin_status"] or "").strip()
                if status not in self._EXCLUDED_STATUSES:
                    self.results[f"result_{position}"] = record

            # Return to the search form so the next search starts clean.
            await self.navigate_to_page(self.url)
            return self.results
        except Exception as e:
            print(f"오류 발생 : {e}")
            return None

    async def _extract_article(self, article):
        """Read every reported field from one result ``article`` element."""
        checkbox = await article.query_selector("div:nth-child(1) span input[type='checkbox']")
        trademark_name = await self._attribute_or(checkbox, "title", "No name found")
        print(f"trademark_name : {trademark_name}")

        applno = await self._attribute_or(checkbox, "value", "No ID found")
        # Only build the URL from a real application number, never from the
        # "No ID found" placeholder.
        trademark_image_url_by_id = self._build_image_url(applno if checkbox else None)
        print(f"trademark_image_url_by_id : {trademark_image_url_by_id}")

        img_element = await article.query_selector("div:nth-child(2) div a img")
        trademark_image = await self._attribute_or(img_element, "src", "No image found")
        print(f"trademark_image : {trademark_image}")

        status_element = await article.query_selector("div:nth-child(1) h1 a:nth-child(1) span")
        admin_status = await status_element.text_content() if status_element else "No status found"
        print(f"admin_status : {admin_status}")

        category_element = await article.query_selector("div:nth-child(2) ul li:nth-child(1) a span")
        product_category = await category_element.text_content() if category_element else "No category found"
        category_desc = self.add_category_description(product_category) if product_category else "No category description"
        print(f"product_category : {product_category}")
        print(f"category_desc : {category_desc}")

        applicant_element = await article.query_selector("div:nth-child(2) ul li:nth-child(2) span[title]")
        applicant = await self._attribute_or(applicant_element, "title", "No applicant found")
        print(f"applicant : {applicant}")

        publication_date = await self._stripped_text_or(
            await article.query_selector("div:nth-child(2) ul li:nth-child(8)"),
            "No publication date found",
        )
        print(f"publication_date : {publication_date}")

        registration_date = await self._stripped_text_or(
            await article.query_selector("div:nth-child(2) ul li:nth-child(6)"),
            "No registration date found",
        )
        print(f"registration_date : {registration_date}")

        return {
            "ID": applno,
            "title": trademark_name,
            "admin_status": admin_status,
            "imageURL": trademark_image,
            "IDimageURL": trademark_image_url_by_id,
            "product_category": product_category,
            "category_description": category_desc,
            "applicant": applicant,
            "publication_date": publication_date,
            "registration_date": registration_date,
        }

    @staticmethod
    def _build_image_url(applno):
        """Return the image URL built from *applno*, or ``None`` without one."""
        if not applno:
            return None
        return f"http://kdtj.kipris.or.kr/kdtj/remoteFile.do?method=bigImageTM&applno={applno}&no={applno}_tm000001.jpg"

    @staticmethod
    async def _attribute_or(element, name, default):
        """Return attribute *name* of *element*, or *default* if it is missing."""
        return await element.get_attribute(name) if element else default

    @staticmethod
    async def _stripped_text_or(element, default):
        """Return the stripped text of *element*, or *default* if absent/empty."""
        if not element:
            return default
        content = await element.text_content()
        return content.strip() if content else default

    async def download_image(self, url, applno):
        """Download *url* and save the body as ``<applno>.jpeg``.

        Nothing is written unless the server answers with HTTP 200.
        """
        async with aiohttp.ClientSession() as session:
            print("download_image session Start!!")
            async with session.get(url) as response:
                print(f"download_image url : {url}")
                if response.status != 200:
                    print(f"이미지 다운로드 실패: HTTP {response.status}")
                    return
                # Read the body before opening the file so a failed read
                # does not leave an empty file behind.
                content = await response.read()

        filename = f"{applno}.jpeg"
        async with aiofiles.open(filename, 'wb') as file:
            await file.write(content)
        print(f"이미지가 성공적으로 저장되었습니다: {filename}")

    async def fetch_image_data(self, url):
        """Fetch *url* and return image bytes.

        Returns:
            The raw response body when the ``Content-Type`` header contains
            ``image`` or ``octet-stream``; otherwise the body re-encoded as
            JPEG; ``None`` on a non-200 status or when re-encoding fails.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                print(f"download_image url : {url}")
                if response.status != 200:
                    print(f"이미지 다운로드 실패: HTTP {response.status}")
                    return None

                content_type = response.headers.get('Content-Type', '')
                print(f"content_type : {content_type}")
                if 'image' in content_type or 'octet-stream' in content_type:
                    return await response.read()

                try:
                    # Not declared as an image: try to decode the payload
                    # and re-encode it as JPEG.
                    data = await response.read()
                    image = Image.open(BytesIO(data))
                    # JPEG cannot store alpha or palette modes, so convert
                    # those to RGB before saving.
                    if image.mode not in self._JPEG_MODES:
                        image = image.convert("RGB")
                    with BytesIO() as buffer:
                        image.save(buffer, 'JPEG')
                        print("image 를 JPEG로 저장")
                        return buffer.getvalue()
                except Exception as e:
                    print(f"이미지 변환 실패: {e}")
                    return None

    def load_category_descriptions(self, filename):
        """Load and return the parsed JSON content of *filename* (UTF-8)."""
        with open(filename, 'r', encoding='utf-8') as file:
            return json.load(file)

    def add_category_description(self, category_code):
        """Return the description for *category_code*, or a fallback message."""
        print(f"add_category_description => category_code: {category_code}")
        return self.category_description.get(category_code, "카테고리 설명을 찾을 수 없습니다.")

    async def close_browser(self):
        """Close the context, the browser and the Playwright session.

        Handles that were never created are skipped, so this is safe to call
        even if ``setup_browser`` did not run or failed part-way. All handles
        are reset to ``None`` afterwards, which makes repeated calls harmless.
        """
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
        self.page = None
        self.context = None
        self.browser = None
        self.playwright = None
        self.is_page_loaded = False
|