DeliveryFeeCalc/src/kiprisThread.py

76 lines
2.9 KiB
Python

from PyQt5.QtCore import QThread, pyqtSignal, QObject
import time
from src.web_scraper_async import *
class APISearchWorker(QObject):
finished = pyqtSignal(list, float)
def __init__(self, kiprisObject, keyword, set_status):
super().__init__()
self.kiprisObject = kiprisObject
self.keyword = keyword
self.set_status = set_status
def run(self):
start_time = time.time() # 검색 시작 시간
print(f"Search keyword : [{self.keyword}], self.set_status : [{self.set_status}]")
result = self.kiprisObject.run(self.keyword, self.set_status)
print(f"Search finished | result \n [{result}]")
elapsed_time = time.time() - start_time # 경과 시간 계산
self.finished.emit(result, elapsed_time)
class SearchThread(QThread):
def __init__(self, worker):
super().__init__()
self.worker = worker
self.worker.moveToThread(self)
self.worker.finished.connect(self.handle_finished)
def run(self):
self.worker.run()
# self.quit()
self.exec_()
def start_search(self):
self.start()
def handle_finished(self, result, elapsed_time):
# 검색 작업이 완료되면 이 시그널이 발생합니다.
print(f"Search finished : [{elapsed_time}]초 경과")
class AsyncWebSearchWorker(QObject):
finished = pyqtSignal(object, float) # 결과와 소요 시간을 전달할 신호 정의
def __init__(self, term, set_status):
super().__init__()
self.term = term
self.set_status = set_status
self.scraper = WebScraper() # WebScraper 인스턴스 생성
self.thread = QThread() # 별도의 스레드 생성
self.moveToThread(self.thread) # 이 객체를 새 스레드로 이동
self.thread.started.connect(self.run) # 스레드가 시작되면 run 메서드 실행
self.start_time = None
def start(self):
self.thread.start() # 스레드 시작
def run(self):
self.start_time = time.time() # 검색 시작 시간 기록
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_run())
async def async_run(self):
try:
await self.scraper.setup_browser() # 브라우저 설정
result = await self.scraper.run(self.term, self.set_status) # 검색어를 사용하여 검색 실행
elapsed_time = time.time() - self.start_time # 소요 시간 계산
self.finished.emit(result, elapsed_time) # 작업 완료 신호 전송
except Exception as e:
print(f"Error in AsyncWebSearchWorker: {e}")
self.finished.emit(None, 0) # 에러 발생 시 None 전송
finally:
await self.scraper.close_Kipris() # 리소스 정리
self.thread.quit() # 스레드 종료