From c4de0f5d8fd139b2be24289a40ba58dc7bf29168 Mon Sep 17 00:00:00 2001 From: Jishnu M Date: Mon, 20 Jan 2025 22:57:10 +0530 Subject: [PATCH] supressed log spaming, minor refactoring --- pypdl/__init__.py | 2 +- pypdl/consumer.py | 4 ++- pypdl/producer.py | 3 +- pypdl/pypdl.py | 74 ++++++++++++++++++++++++++--------------------- pypdl/utils.py | 29 ++++++++++++------- 5 files changed, 66 insertions(+), 46 deletions(-) diff --git a/pypdl/__init__.py b/pypdl/__init__.py index ba90622..6e61bb3 100644 --- a/pypdl/__init__.py +++ b/pypdl/__init__.py @@ -1,3 +1,3 @@ -__version__ = "1.5.1" +__version__ = "1.5.2" from .pypdl import Pypdl diff --git a/pypdl/consumer.py b/pypdl/consumer.py index e29286d..72cf91d 100644 --- a/pypdl/consumer.py +++ b/pypdl/consumer.py @@ -8,6 +8,7 @@ auto_cancel_gather, combine_files, create_segment_table, + check_main_thread_exception, ) @@ -35,14 +36,15 @@ async def process_tasks(self, in_queue, out_queue): self.logger.debug("Consumer %s started", self.id) while True: task = await in_queue.get() - self.logger.debug("Consumer %s received task", self.id) if task is None: break + self.logger.debug("Consumer %s received task", self.id) try: await self._download(task) except asyncio.CancelledError: raise except Exception as e: + check_main_thread_exception(e) self.logger.debug("Task %s failed", self.id) self.logger.exception(e) await out_queue.put([task[0]]) diff --git a/pypdl/producer.py b/pypdl/producer.py index 6bf976c..50864eb 100644 --- a/pypdl/producer.py +++ b/pypdl/producer.py @@ -1,6 +1,6 @@ import asyncio -from .utils import Size, get_filepath, get_range, get_url +from .utils import Size, get_filepath, get_range, get_url, check_main_thread_exception class Producer: @@ -46,6 +46,7 @@ async def enqueue_tasks(self, in_queue: asyncio.queues, out_queue): except asyncio.CancelledError: raise except Exception as e: + check_main_thread_exception(e) self.logger.debug( f"Failed to get header for {task}, skipping task" ) diff --git a/pypdl/pypdl.py b/pypdl/pypdl.py index 072a684..16d476c 100644 --- a/pypdl/pypdl.py +++ b/pypdl/pypdl.py @@ -104,6 +104,7 @@ def start( :param multisegment: Whether to download in multiple segments. :param segments: Number of segments if multi-segmented. :param retries: Number of retries for failed downloads. + :param mirrors: List of mirrors (callable/string) or callable/string. :param overwrite: Overwrite existing files if True. :param speed_limit: Limit download speed in MB/s if > 0. :param etag_validation: Validate server-provided ETag if True. @@ -184,33 +185,40 @@ async def _download_tasks(self, tasks_dict, display, clear_terminal): self._producer_queue = asyncio.Queue(self.total_task) self._consumer_queue = asyncio.Queue(self.total_task) - async with aiohttp.ClientSession() as session: - self._producer = Producer(session, self._logger, tasks_dict) - producer_task = asyncio.create_task( - self._producer.enqueue_tasks(self._producer_queue, self._consumer_queue) - ) - coroutines.append(producer_task) - await self._producer_queue.put(list(tasks_dict)) - - for _id in range(self._max_concurrent): - consumer = Consumer(session, self._logger, _id) - consumer_task = asyncio.create_task( - consumer.process_tasks( - self._consumer_queue, - self._producer_queue, + try: + async with aiohttp.ClientSession() as session: + self._producer = Producer(session, self._logger, tasks_dict) + producer_task = asyncio.create_task( + self._producer.enqueue_tasks( + self._producer_queue, self._consumer_queue ) ) - coroutines.append(consumer_task) - self._consumers.append(consumer) - - self._logger.debug("Starting producer and consumer tasks") - self._pool.submit(self._progress_monitor, display, clear_terminal) - - res = await utils.auto_cancel_gather(*coroutines) - self.failed.extend(res.pop(0)) - for success in res: - self.success.extend(success) - await asyncio.sleep(0.5) + coroutines.append(producer_task) + await self._producer_queue.put(list(tasks_dict)) + + for _id in range(self._max_concurrent): + consumer = Consumer(session, self._logger, _id) + consumer_task = asyncio.create_task( + consumer.process_tasks( + self._consumer_queue, + self._producer_queue, + ) + ) + coroutines.append(consumer_task) + self._consumers.append(consumer) + + self._logger.debug("Starting producer and consumer tasks") + self._pool.submit(self._progress_monitor, display, clear_terminal) + + res = await utils.auto_cancel_gather(*coroutines) + self.failed.extend(res.pop(0)) + for success in res: + self.success.extend(success) + await asyncio.sleep(0.5) + except utils.MainThreadException as e: + self._interrupt.set() + self._logger.exception(e) + raise e self.time_spent = time.time() - start_time if display: @@ -264,13 +272,6 @@ def _progress_monitor(self, display, clear_terminal): time.sleep(interval) self._logger.debug("exiting progress monitor") - async def _completed(self): - self._logger.debug("All downloads completed") - await self._producer_queue.put(None) - for _ in range(self._max_concurrent): - await self._consumer_queue.put(None) - self.completed = True - def _calc_values(self, recent_queue, interval): self.size = self._producer.size self.current_size = sum(consumer.size for consumer in self._consumers) @@ -307,6 +308,13 @@ def _calc_values(self, recent_queue, interval): ) future.result() + async def _completed(self): + self._logger.debug("All downloads completed") + await self._producer_queue.put(None) + for _ in range(self._max_concurrent): + await self._consumer_queue.put(None) + self.completed = True + def _display(self): utils.cursor_up() whitespace = " " @@ -318,7 +326,7 @@ def _display(self): else: info1 = "" - info2 = f"Size: {utils.to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: { utils.seconds_to_hms(self.eta)}" + info2 = f"Size: {utils.to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: {utils.seconds_to_hms(self.eta)}" print(progress_bar + info1 + info2 + whitespace * 35) else: if self.total_task > 1: diff --git a/pypdl/utils.py b/pypdl/utils.py index 9c00ff6..5ab9562 100644 --- a/pypdl/utils.py +++ b/pypdl/utils.py @@ -19,6 +19,20 @@ CHUNKSIZE = BLOCKSIZE * BLOCKS +class MainThreadException(Exception): + pass + + +class Size: + def __init__(self, start: int, end: int) -> None: + self.start = start + self.end = end + self.value = end - start + 1 # since range is inclusive[0-99 -> 100] + + def __repr__(self) -> str: + return str(self.value) + + class Task: def __init__( self, @@ -105,16 +119,6 @@ def __repr__(self) -> str: return f"Task(url={self.url}, file_path={self.file_path}, tries={self.tries}, size={self.size})" -class Size: - def __init__(self, start: int, end: int) -> None: - self.start = start - self.end = end - self.value = end - start + 1 # since range is inclusive[0-99 -> 100] - - def __repr__(self) -> str: - return str(self.value) - - class TEventLoop: """A Threaded Eventloop""" @@ -279,6 +283,11 @@ def cursor_up() -> None: sys.stdout.flush() +def check_main_thread_exception(e: Exception) -> None: + if str(e) == "cannot schedule new futures after shutdown": + raise MainThreadException from e + + async def get_url(url: Union[str, Callable]) -> str: if callable(url): if asyncio.iscoroutinefunction(url):