Skip to content

Commit

Permalink
Suppressed log spamming, minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mjishnu committed Jan 20, 2025
1 parent b0d7204 commit c4de0f5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pypdl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "1.5.1"
__version__ = "1.5.2"

from .pypdl import Pypdl
4 changes: 3 additions & 1 deletion pypdl/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
auto_cancel_gather,
combine_files,
create_segment_table,
check_main_thread_exception,
)


Expand Down Expand Up @@ -35,14 +36,15 @@ async def process_tasks(self, in_queue, out_queue):
self.logger.debug("Consumer %s started", self.id)
while True:
task = await in_queue.get()
self.logger.debug("Consumer %s received task", self.id)
if task is None:
break
self.logger.debug("Consumer %s received task", self.id)
try:
await self._download(task)
except asyncio.CancelledError:
raise
except Exception as e:
check_main_thread_exception(e)
self.logger.debug("Task %s failed", self.id)
self.logger.exception(e)
await out_queue.put([task[0]])
Expand Down
3 changes: 2 additions & 1 deletion pypdl/producer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from .utils import Size, get_filepath, get_range, get_url
from .utils import Size, get_filepath, get_range, get_url, check_main_thread_exception


class Producer:
Expand Down Expand Up @@ -46,6 +46,7 @@ async def enqueue_tasks(self, in_queue: asyncio.queues, out_queue):
except asyncio.CancelledError:
raise
except Exception as e:
check_main_thread_exception(e)
self.logger.debug(
f"Failed to get header for {task}, skipping task"
)
Expand Down
74 changes: 41 additions & 33 deletions pypdl/pypdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def start(
:param multisegment: Whether to download in multiple segments.
:param segments: Number of segments if multi-segmented.
:param retries: Number of retries for failed downloads.
:param mirrors: List of mirrors (callable/string) or callable/string.
:param overwrite: Overwrite existing files if True.
:param speed_limit: Limit download speed in MB/s if > 0.
:param etag_validation: Validate server-provided ETag if True.
Expand Down Expand Up @@ -184,33 +185,40 @@ async def _download_tasks(self, tasks_dict, display, clear_terminal):
self._producer_queue = asyncio.Queue(self.total_task)
self._consumer_queue = asyncio.Queue(self.total_task)

async with aiohttp.ClientSession() as session:
self._producer = Producer(session, self._logger, tasks_dict)
producer_task = asyncio.create_task(
self._producer.enqueue_tasks(self._producer_queue, self._consumer_queue)
)
coroutines.append(producer_task)
await self._producer_queue.put(list(tasks_dict))

for _id in range(self._max_concurrent):
consumer = Consumer(session, self._logger, _id)
consumer_task = asyncio.create_task(
consumer.process_tasks(
self._consumer_queue,
self._producer_queue,
try:
async with aiohttp.ClientSession() as session:
self._producer = Producer(session, self._logger, tasks_dict)
producer_task = asyncio.create_task(
self._producer.enqueue_tasks(
self._producer_queue, self._consumer_queue
)
)
coroutines.append(consumer_task)
self._consumers.append(consumer)

self._logger.debug("Starting producer and consumer tasks")
self._pool.submit(self._progress_monitor, display, clear_terminal)

res = await utils.auto_cancel_gather(*coroutines)
self.failed.extend(res.pop(0))
for success in res:
self.success.extend(success)
await asyncio.sleep(0.5)
coroutines.append(producer_task)
await self._producer_queue.put(list(tasks_dict))

for _id in range(self._max_concurrent):
consumer = Consumer(session, self._logger, _id)
consumer_task = asyncio.create_task(
consumer.process_tasks(
self._consumer_queue,
self._producer_queue,
)
)
coroutines.append(consumer_task)
self._consumers.append(consumer)

self._logger.debug("Starting producer and consumer tasks")
self._pool.submit(self._progress_monitor, display, clear_terminal)

res = await utils.auto_cancel_gather(*coroutines)
self.failed.extend(res.pop(0))
for success in res:
self.success.extend(success)
await asyncio.sleep(0.5)
except utils.MainThreadException as e:
self._interrupt.set()
self._logger.exception(e)
raise e

self.time_spent = time.time() - start_time
if display:
Expand Down Expand Up @@ -264,13 +272,6 @@ def _progress_monitor(self, display, clear_terminal):
time.sleep(interval)
self._logger.debug("exiting progress monitor")

async def _completed(self):
self._logger.debug("All downloads completed")
await self._producer_queue.put(None)
for _ in range(self._max_concurrent):
await self._consumer_queue.put(None)
self.completed = True

def _calc_values(self, recent_queue, interval):
self.size = self._producer.size
self.current_size = sum(consumer.size for consumer in self._consumers)
Expand Down Expand Up @@ -307,6 +308,13 @@ def _calc_values(self, recent_queue, interval):
)
future.result()

    async def _completed(self):
        """Signal producer and consumers to shut down after all downloads finish."""
        self._logger.debug("All downloads completed")
        # A single None sentinel stops the producer loop.
        await self._producer_queue.put(None)
        # One None sentinel per consumer so every worker exits its loop.
        for _ in range(self._max_concurrent):
            await self._consumer_queue.put(None)
        # Mark the whole session as done for the progress monitor/callers.
        self.completed = True

def _display(self):
utils.cursor_up()
whitespace = " "
Expand All @@ -318,7 +326,7 @@ def _display(self):
else:
info1 = ""

info2 = f"Size: {utils.to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: { utils.seconds_to_hms(self.eta)}"
info2 = f"Size: {utils.to_mb(self.size):.2f} MB, Speed: {self.speed:.2f} MB/s, ETA: {utils.seconds_to_hms(self.eta)}"
print(progress_bar + info1 + info2 + whitespace * 35)
else:
if self.total_task > 1:
Expand Down
29 changes: 19 additions & 10 deletions pypdl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@
CHUNKSIZE = BLOCKSIZE * BLOCKS


class MainThreadException(Exception):
    """Raised when a worker-side exception indicates the main thread's
    executor has shut down (see ``check_main_thread_exception``), so the
    download machinery should abort instead of retrying the task."""

    pass


class Size:
    """An inclusive byte range ``[start, end]`` and its total length.

    :param start: First byte offset of the range (inclusive).
    :param end: Last byte offset of the range (inclusive).
    """

    def __init__(self, start: int, end: int) -> None:
        self.start = start
        self.end = end
        # Both endpoints are included, so 0-99 spans 100 bytes.
        self.value = end - start + 1

    def __repr__(self) -> str:
        # Represent the range by its length only.
        return f"{self.value}"


class Task:
def __init__(
self,
Expand Down Expand Up @@ -105,16 +119,6 @@ def __repr__(self) -> str:
return f"Task(url={self.url}, file_path={self.file_path}, tries={self.tries}, size={self.size})"


class Size:
def __init__(self, start: int, end: int) -> None:
self.start = start
self.end = end
self.value = end - start + 1 # since range is inclusive[0-99 -> 100]

def __repr__(self) -> str:
return str(self.value)


class TEventLoop:
"""A Threaded Eventloop"""

Expand Down Expand Up @@ -279,6 +283,11 @@ def cursor_up() -> None:
sys.stdout.flush()


def check_main_thread_exception(e: Exception) -> None:
    """Re-raise *e* as ``MainThreadException`` if it signals executor shutdown.

    :param e: The exception caught by a worker task.
    :raises MainThreadException: When *e*'s message is the RuntimeError text
        emitted by ``concurrent.futures`` when work is submitted to an
        executor after shutdown; the message string is the only available
        signal, so an exact match is used.
    """
    if str(e) == "cannot schedule new futures after shutdown":
        # Carry the original message so the re-raised exception is
        # self-describing; chain from `e` to preserve the traceback.
        raise MainThreadException(str(e)) from e


async def get_url(url: Union[str, Callable]) -> str:
if callable(url):
if asyncio.iscoroutinefunction(url):
Expand Down

0 comments on commit c4de0f5

Please sign in to comment.