Skip to content

Commit

Permalink
refactor(scheduler): strict mode (#180)
Browse files Browse the repository at this point in the history
- improvement on SchedulerInterface::getDueTasks, the scheduler now check the current date in a strict approach

- Improvement on scheduler:consume to use lazy and strict options

- Improvement on the worker to handle strict call
  • Loading branch information
Guikingone authored Aug 31, 2021
1 parent 75558a7 commit 6ea8bb5
Show file tree
Hide file tree
Showing 16 changed files with 626 additions and 162 deletions.
18 changes: 17 additions & 1 deletion doc/command.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,31 @@ This command allows using multiple options to filter consumed tasks (each one ca
- `--limit`: Define the maximum amount of due tasks to consume.
- `--time-limit`: Define the maximum amount in seconds before the worker stop.
- `--failure-limit`: Define the maximum amount of tasks that can fails during consumation.
- `--wait`: Set the worker to a "infinite" wait loop where tasks are consumed then the worker wait until next minute.
- `--wait`: Set the worker to an "infinite" wait loop where tasks are consumed then the worker wait until next minute.
- `--force`: Force the worker to wait for tasks even if no tasks are currently available.
- `--lazy`: Force the scheduler to retrieve the tasks using lazy-loading.
- `--strict`: Force the scheduler to check the date before retrieving the tasks.

### Extra informations

- The scheduler will only return tasks that haven't been executed since the last minute.

- The command filter tasks returned by the scheduler by checking if each task is not paused
(the worker will do this if the `--wait` option is set).

- The output of each executed task can be displayed if the `-vv` option is used.

- By default, when using `--wait`, the command will stop if no tasks can be found, thanks to `--force`,
you can ask the worker to wait without stopping the command for the next minute and check again the tasks.

- When using `--lazy`, tasks are retrieve using a `LazyTaskList`, this can improve performances
BUT keep in mind that using this approach can trigger edge cases when checking due tasks.

- When using `--strict`, keep in mind that `SchedulerInterface::getDueTasks()` is called twice,
first, the command will check if tasks can be found using a strict comparison on the current date
then the worker will retrieve the tasks using the same approach, if the worker didn't receive any tasks,
it will start the sleeping phase until next minute.

## Executing tasks

_Introduced in `0.5`_
Expand Down
16 changes: 10 additions & 6 deletions doc/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ The scheduler provides several methods to help interact with tasks during the wh
if `true` is passed, a [LazyTaskList](../src/Task/LazyTaskList.php) is returned.

- `getDueTasks`: Return the tasks that are dues regarding the current date (thanks to each task expression),
a [TaskList](../src/Task/TaskList.php) is returned.
If `true` is passed, the due tasks are returned using a [LazyTaskList](../src/Task/LazyTaskList.php).
This method can lock each tasks before returning it (using the `$lock` argument),
the idea is to prevent a concurrent usage, keep in mind that the lock factory
[must be able to handle the serialization](https://symfony.com/doc/current/components/lock.html#serializing-locks)
of the key to use this feature.
a [TaskList](../src/Task/TaskList.php) is returned:

```text
If "true" is passed as the first argument, the due tasks are returned using a LazyTaskList.
If "true" is passed as the second argument, the tasks are retrieved only if the current scheduler date
is equal to the machine / os date regarding the seconds precision.
Example: If the call is performed at 10:00:00 and the synchronized date does not match exactly, the tasks aren't retrieved.
```

- `next`: Return the next due task, if none, an exception is thrown.
If `true` is used, the due tasks are retrieved using a [LazyTaskList](../src/Task/LazyTaskList.php).
Expand Down
17 changes: 14 additions & 3 deletions src/Command/ConsumeTasksCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ protected function configure(): void
new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'Limit the amount of task allowed to fail'),
new InputOption('wait', 'w', InputOption::VALUE_NONE, 'Set the worker to wait for tasks every minutes'),
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the worker to wait for tasks even if no tasks are currently available'),
new InputOption('lazy', null, InputOption::VALUE_NONE, 'Force the scheduler to retrieve the tasks using lazy-loading'),
new InputOption('strict', null, InputOption::VALUE_NONE, 'Force the scheduler to check the date before retrieving the tasks'),
])
->setHelp(
<<<'EOF'
Expand All @@ -93,6 +95,12 @@ protected function configure(): void
Use the --force option to force the worker to wait for tasks every minutes even if no tasks are currently available:
<info>php %command.full_name% --force</info>
Use the --lazy option to force the scheduler to retrieve the tasks using lazy-loading:
<info>php %command.full_name% --lazy</info>
Use the --strict option to force the scheduler to check the date before retrieving the tasks:
<info>php %command.full_name% --strict</info>
EOF
)
;
Expand All @@ -109,8 +117,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$wait = $input->getOption('wait');
$force = $input->getOption('force');
$lazy = $input->getOption('lazy');
$strict = $input->getOption('strict');

$dueTasks = $this->scheduler->getDueTasks()->filter(fn (TaskInterface $task): bool => !$task instanceof ProbeTask);
$dueTasks = $this->scheduler->getDueTasks(true === $lazy, true === $strict)->filter(fn (TaskInterface $task): bool => !$task instanceof ProbeTask);
if (0 === $dueTasks->count() && false === $wait) {
$symfonyStyle->warning('No due tasks found');

Expand All @@ -132,7 +142,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$stopOptions = [];

if (null !== $limit = $input->getOption('limit')) {
$stopOptions[] = sprintf('%s tasks has been consumed', $limit);
$stopOptions[] = sprintf('%s task%s %s been consumed', $limit, (int) $limit > 1 ? 's' : '', (int) $limit > 1 ? 'have' : 'has');
$this->eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber((int) $limit, $this->logger));
}

Expand All @@ -142,7 +152,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
}

if (null !== $failureLimit = $input->getOption('failure-limit')) {
$stopOptions[] = sprintf('%d task%s have failed', $failureLimit, (int) $failureLimit > 1 ? 's' : '');
$stopOptions[] = sprintf('%d task%s %s failed', $failureLimit, (int) $failureLimit > 1 ? 's' : '', (int) $failureLimit > 1 ? 'have' : 'has');
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitSubscriber((int) $failureLimit, $this->logger));
}

Expand Down Expand Up @@ -174,6 +184,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

try {
$this->worker->execute([
'mustStrictlyCheckDate' => true === $strict,
'sleepUntilNextMinute' => true === $wait,
]);
} catch (Throwable $throwable) {
Expand Down
2 changes: 1 addition & 1 deletion src/EventListener/TaskLifecycleSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function onTaskExecuted(TaskExecutedEvent $taskExecutedEvent): void

public function onTaskFailed(TaskFailedEvent $taskFailedEvent): void
{
$this->logger->info('A task execution has failed', [
$this->logger->error('A task execution has failed', [
'task' => $taskFailedEvent->getTask()->getTask()->getName(),
]);
}
Expand Down
4 changes: 2 additions & 2 deletions src/LazyScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public function getTasks(bool $lazy = false): TaskListInterface
/**
* {@inheritdoc}
*/
public function getDueTasks(bool $lazy = false): TaskListInterface
public function getDueTasks(bool $lazy = false, bool $strict = false): TaskListInterface
{
$this->initialize();

return $this->scheduler->getDueTasks($lazy);
return $this->scheduler->getDueTasks($lazy, $strict);
}

/**
Expand Down
28 changes: 18 additions & 10 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use SchedulerBundle\Messenger\TaskToYieldMessage;
use SchedulerBundle\Middleware\SchedulerMiddlewareStack;
use SchedulerBundle\Task\LazyTask;
use SchedulerBundle\Task\TaskList;
use Symfony\Component\Messenger\MessageBusInterface;
use SchedulerBundle\Event\SchedulerRebootedEvent;
use SchedulerBundle\Event\TaskScheduledEvent;
Expand Down Expand Up @@ -161,10 +162,14 @@ public function getTasks(bool $lazy = false): TaskListInterface
/**
* {@inheritdoc}
*/
public function getDueTasks(bool $lazy = false): TaskListInterface
public function getDueTasks(bool $lazy = false, bool $strict = false): TaskListInterface
{
$synchronizedCurrentDate = $this->getSynchronizedCurrentDate();

if ($synchronizedCurrentDate->format('s') !== '00' && $strict) {
return new TaskList();
}

$dueTasks = $this->getTasks($lazy)->filter(function (TaskInterface $task) use ($synchronizedCurrentDate): bool {
$timezone = $task->getTimezone() ?? $this->getTimezone();
$lastExecution = $task->getLastExecution();
Expand All @@ -181,28 +186,31 @@ public function getDueTasks(bool $lazy = false): TaskListInterface
});

return $dueTasks->filter(function (TaskInterface $task) use ($synchronizedCurrentDate): bool {
if ($task->getExecutionStartDate() instanceof DateTimeImmutable && $task->getExecutionEndDate() instanceof DateTimeImmutable) {
if ($task->getExecutionStartDate() === $synchronizedCurrentDate) {
return $task->getExecutionEndDate() > $synchronizedCurrentDate;
$executionStartDate = $task->getExecutionStartDate();
$executionEndDate = $task->getExecutionEndDate();

if ($executionStartDate instanceof DateTimeImmutable && $executionEndDate instanceof DateTimeImmutable) {
if ($executionStartDate === $synchronizedCurrentDate) {
return $executionEndDate > $synchronizedCurrentDate;
}

if ($task->getExecutionStartDate() < $synchronizedCurrentDate) {
return $task->getExecutionEndDate() > $synchronizedCurrentDate;
if ($executionStartDate < $synchronizedCurrentDate) {
return $executionEndDate > $synchronizedCurrentDate;
}

return false;
}

if ($task->getExecutionStartDate() instanceof DateTimeImmutable) {
if ($executionStartDate instanceof DateTimeImmutable) {
if ($task->getExecutionStartDate() === $synchronizedCurrentDate) {
return true;
}

return $task->getExecutionStartDate() < $synchronizedCurrentDate;
return $executionStartDate < $synchronizedCurrentDate;
}

if ($task->getExecutionEndDate() instanceof DateTimeImmutable) {
return $task->getExecutionEndDate() > $synchronizedCurrentDate;
if ($executionEndDate instanceof DateTimeImmutable) {
return $executionEndDate > $synchronizedCurrentDate;
}

return true;
Expand Down
7 changes: 4 additions & 3 deletions src/SchedulerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ public function getTasks(bool $lazy = false): TaskListInterface;
/**
* Allow to retrieve every due tasks, the logic used to build the TaskList is own to the scheduler.
*
* Can lazy-load the task list if @param bool $lazy is used
* If the @param bool $lazy is used, the tasks are lazy-loaded.
* If the @param bool $strict is used, the current date will assert that the seconds are equals to '00'.
*
* @throws Throwable {@see TransportInterface::list()}
*/
public function getDueTasks(bool $lazy = false): TaskListInterface;
public function getDueTasks(bool $lazy = false, bool $strict = false): TaskListInterface;

/**
* Return the next task that must be executed (based on {@see SchedulerInterface::getDueTasks()})
Expand All @@ -79,7 +80,7 @@ public function getDueTasks(bool $lazy = false): TaskListInterface;
public function next(bool $lazy = false): TaskInterface;

/**
* Remove every tasks except the ones that use the '@reboot' expression.
* Remove every task except the ones that use the '@reboot' expression.
*
* The "reboot" tasks are re-scheduled and MUST be executed as soon as possible.
*
Expand Down
32 changes: 22 additions & 10 deletions src/Task/AbstractTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ protected function defineOptions(array $options = [], array $additionalOptions =
$optionsResolver->setAllowedValues('execution_state', fn (string $executionState = null): bool => $this->validateExecutionState($executionState));

$optionsResolver->setNormalizer('expression', fn (Options $options, string $value): string => Expression::createFromString($value)->getExpression());
$optionsResolver->setNormalizer('execution_end_date', fn (Options $options, ?string $value): ?DateTimeImmutable => null !== $value ? new DateTimeImmutable($value, $options['timezone'] ?? $this->getTimezone() ?? new DateTimeZone('UTC')) : null);
$optionsResolver->setNormalizer('execution_start_date', fn (Options $options, ?string $value): ?DateTimeImmutable => null !== $value ? new DateTimeImmutable($value, $options['timezone'] ?? $this->getTimezone() ?? new DateTimeZone('UTC')) : null);

$optionsResolver->setInfo('arrival_time', '[INTERNAL] The time when the task is retrieved in order to execute it');
$optionsResolver->setInfo('access_lock_bag', '[INTERNAL] Used to store the key that hold the task lock state');
Expand Down Expand Up @@ -188,25 +190,37 @@ protected function defineOptions(array $options = [], array $additionalOptions =
$this->options = $optionsResolver->resolve($options);
}

/**
* {@inheritdoc}
*/
public function setName(string $name): TaskInterface
{
$this->name = $name;

return $this;
}

/**
* {@inheritdoc}
*/
public function getName(): string
{
return $this->name;
}

/**
* {@inheritdoc}
*/
public function setArrivalTime(DateTimeImmutable $dateTimeImmutable = null): TaskInterface
{
$this->options['arrival_time'] = $dateTimeImmutable;

return $this;
}

/**
* {@inheritdoc}
*/
public function getArrivalTime(): ?DateTimeImmutable
{
return $this->options['arrival_time'] ?? null;
Expand Down Expand Up @@ -442,14 +456,10 @@ public function setExecutionStartDate(string $executionStartDate = null): TaskIn
}

/**
* @throws Exception {@see DateTimeImmutable::__construct()}
* {@inheritdoc}
*/
public function getExecutionStartDate(): ?DateTimeImmutable
{
if (!$this->options['execution_start_date'] instanceof DateTimeImmutable) {
return null;
}

return $this->options['execution_start_date'];
}

Expand All @@ -468,14 +478,10 @@ public function setExecutionEndDate(string $executionEndDate = null): TaskInterf
}

/**
* @throws Exception {@see DateTimeImmutable::__construct()}
* {@inheritdoc}
*/
public function getExecutionEndDate(): ?DateTimeImmutable
{
if (!$this->options['execution_end_date'] instanceof DateTimeImmutable) {
return null;
}

return $this->options['execution_end_date'];
}

Expand All @@ -486,6 +492,9 @@ public function setExecutionStartTime(DateTimeImmutable $dateTimeImmutable = nul
return $this;
}

/**
* {@inheritdoc}
*/
public function getExecutionStartTime(): ?DateTimeImmutable
{
return $this->options['execution_start_time'] instanceof DateTimeImmutable ? $this->options['execution_start_time'] : null;
Expand All @@ -498,6 +507,9 @@ public function setExecutionEndTime(DateTimeImmutable $dateTimeImmutable = null)
return $this;
}

/**
* {@inheritdoc}
*/
public function getExecutionEndTime(): ?DateTimeImmutable
{
return $this->options['execution_end_time'] instanceof DateTimeImmutable ? $this->options['execution_end_time'] : null;
Expand Down
24 changes: 24 additions & 0 deletions src/Task/TaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,24 @@ interface TaskInterface
self::TO_RETRY,
];

/**
* Return the name of the task.
*/
public function getName(): string;

/**
* Define the name of the task.
*/
public function setName(string $name): self;

/**
* Set the date from which the task has been retrieved by the worker and started its execution.
*/
public function setArrivalTime(DateTimeImmutable $dateTimeImmutable = null): self;

/**
* Return the date from which the task has been retrieved by the worker and started its execution.
*/
public function getArrivalTime(): ?DateTimeImmutable;

public function setBackground(bool $background): self;
Expand Down Expand Up @@ -179,18 +191,30 @@ public function setExecutionRelativeDeadline(DateInterval $dateInterval = null):

public function setExecutionStartDate(string $executionStartDate = null): self;

/**
* Return the date from which the task is allowed to be executed / considered as "due".
*/
public function getExecutionStartDate(): ?DateTimeImmutable;

public function setExecutionEndDate(string $executionEndDate = null): self;

/**
* Return the date until which the task is allowed to be executed / considered as "due".
*/
public function getExecutionEndDate(): ?DateTimeImmutable;

public function setExecutionStartTime(DateTimeImmutable $dateTimeImmutable = null): self;

/**
* Return the date since the task started to being executed.
*/
public function getExecutionStartTime(): ?DateTimeImmutable;

public function setExecutionEndTime(DateTimeImmutable $dateTimeImmutable = null): self;

/**
* Return the date since the task stopped to being executed.
*/
public function getExecutionEndTime(): ?DateTimeImmutable;

/**
Expand Down
Loading

0 comments on commit 6ea8bb5

Please sign in to comment.