Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(distro): add new queue per distro #2

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions arq/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import functools
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from operator import attrgetter
Expand Down Expand Up @@ -85,6 +86,9 @@ def __repr__(self) -> str:
else:
BaseRedis = Redis

enqueued_jobs = 0



class ArqRedis(BaseRedis):
"""
Expand Down Expand Up @@ -126,6 +130,7 @@ async def enqueue_job(
_defer_by: Union[None, int, float, timedelta] = None,
_expires: Union[None, int, float, timedelta] = None,
_job_try: Optional[int] = None,
distribution: str = None, # example 5:2
**kwargs: Any,
) -> Optional[Job]:
"""
Expand All @@ -143,8 +148,18 @@ async def enqueue_job(
:param kwargs: any keyword arguments to pass to the function
:return: :class:`arq.jobs.Job` instance or ``None`` if a job with this ID already exists
"""
global enqueued_jobs

if _queue_name is None:
_queue_name = self.default_queue_name

if distribution:
queue_index = self._get_queue_index(distribution)
_queue_name = f'{_queue_name}_{queue_index}'
if enqueued_jobs >= sys.maxsize:
enqueued_jobs = 0
enqueued_jobs += 1

job_id = _job_id or uuid4().hex
job_key = job_key_prefix + job_id
if _defer_until and _defer_by:
Expand Down Expand Up @@ -180,6 +195,20 @@ async def enqueue_job(
return None
return Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer)

def _get_queue_index(self, distribution) -> int:
ratios = list(map(lambda x: int(x), distribution.split(':')))
ratios_sum = sum(ratios)
up_to_ratio = ratios[0]
queue_index = 0
for i, _ in enumerate(ratios, 1):
if enqueued_jobs % ratios_sum >= up_to_ratio:
up_to_ratio = up_to_ratio + ratios[i]
queue_index = i
else:
break

return queue_index

async def _get_job_result(self, key: bytes) -> JobResult:
job_id = key[len(result_key_prefix) :].decode()
job = Job(job_id, self, _deserializer=self.job_deserializer)
Expand Down
5 changes: 5 additions & 0 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def __init__(
self,
functions: Sequence[Union[Function, 'WorkerCoroutine']] = (),
*,
distribution_index: int = None,
queue_name: Optional[str] = default_queue_name,
cron_jobs: Optional[Sequence[CronJob]] = None,
redis_settings: Optional[RedisSettings] = None,
Expand Down Expand Up @@ -224,6 +225,10 @@ def __init__(
queue_name = redis_pool.default_queue_name
else:
raise ValueError('If queue_name is absent, redis_pool must be present.')

if distribution_index is not None:
queue_name = f'{queue_name}_{distribution_index}'

self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
if cron_jobs is not None:
Expand Down
Loading