diff --git a/src/DependencyInjection/SchedulerBundleExtension.php b/src/DependencyInjection/SchedulerBundleExtension.php index 86c9a908..ab1ab88f 100644 --- a/src/DependencyInjection/SchedulerBundleExtension.php +++ b/src/DependencyInjection/SchedulerBundleExtension.php @@ -1387,7 +1387,7 @@ private function registerMiddlewareStacks(ContainerBuilder $container, array $co $container->register(SingleRunTaskMiddleware::class, SingleRunTaskMiddleware::class) ->setArguments([ - new Reference(SchedulerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE), + new Reference(TransportInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE), new Reference(LoggerInterface::class, ContainerInterface::NULL_ON_INVALID_REFERENCE), ]) ->setPublic(false) @@ -1400,7 +1400,7 @@ private function registerMiddlewareStacks(ContainerBuilder $container, array $co $container->register(TaskUpdateMiddleware::class, TaskUpdateMiddleware::class) ->setArguments([ - new Reference(SchedulerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE), + new Reference(TransportInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE), ]) ->setPublic(false) ->addTag(self::SCHEDULER_MIDDLEWARE_TAG) diff --git a/src/Middleware/SingleRunTaskMiddleware.php b/src/Middleware/SingleRunTaskMiddleware.php index f999ff70..50efd6f8 100644 --- a/src/Middleware/SingleRunTaskMiddleware.php +++ b/src/Middleware/SingleRunTaskMiddleware.php @@ -6,8 +6,8 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use SchedulerBundle\SchedulerInterface; use SchedulerBundle\Task\TaskInterface; +use SchedulerBundle\Transport\TransportInterface; use SchedulerBundle\Worker\WorkerInterface; use function in_array; use function sprintf; @@ -20,7 +20,7 @@ final class SingleRunTaskMiddleware implements PostExecutionMiddlewareInterface, private LoggerInterface $logger; public function __construct( - private SchedulerInterface $scheduler, + private TransportInterface $transport, ?LoggerInterface $logger = null ) { $this->logger = $logger ?? new NullLogger(); @@ -42,12 +42,12 @@ public function postExecute(TaskInterface $task, WorkerInterface $worker): void } if ($task->isDeleteAfterExecute()) { - $this->scheduler->unschedule($task->getName()); + $this->transport->delete($task->getName()); return; } - $this->scheduler->pause($task->getName()); + $this->transport->pause($task->getName()); } /** diff --git a/src/Middleware/TaskUpdateMiddleware.php b/src/Middleware/TaskUpdateMiddleware.php index afb4f849..31cad668 100644 --- a/src/Middleware/TaskUpdateMiddleware.php +++ b/src/Middleware/TaskUpdateMiddleware.php @@ -4,8 +4,8 @@ namespace SchedulerBundle\Middleware; -use SchedulerBundle\SchedulerInterface; use SchedulerBundle\Task\TaskInterface; +use SchedulerBundle\Transport\TransportInterface; use SchedulerBundle\Worker\WorkerInterface; /** @@ -13,7 +13,7 @@ */ final class TaskUpdateMiddleware implements PostExecutionMiddlewareInterface, OrderedMiddlewareInterface, RequiredMiddlewareInterface { - public function __construct(private SchedulerInterface $scheduler) + public function __construct(private TransportInterface $transport) { } @@ -22,7 +22,7 @@ public function __construct(private SchedulerInterface $scheduler) */ public function postExecute(TaskInterface $task, WorkerInterface $worker): void { - $this->scheduler->update($task->getName(), $task); + $this->transport->update($task->getName(), $task); } /** diff --git a/tests/Command/DebugMiddlewareCommandTest.php b/tests/Command/DebugMiddlewareCommandTest.php index e6b36b0a..d072f34d 100644 --- a/tests/Command/DebugMiddlewareCommandTest.php +++ b/tests/Command/DebugMiddlewareCommandTest.php @@ -13,7 +13,10 @@ use SchedulerBundle\Middleware\TaskCallbackMiddleware; use SchedulerBundle\Middleware\TaskLockBagMiddleware; use SchedulerBundle\Middleware\WorkerMiddlewareStack; -use SchedulerBundle\SchedulerInterface; +use SchedulerBundle\SchedulePolicy\FirstInFirstOutPolicy; +use SchedulerBundle\SchedulePolicy\SchedulePolicyOrchestrator; +use SchedulerBundle\Transport\Configuration\InMemoryConfiguration; +use SchedulerBundle\Transport\InMemoryTransport; use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\Lock\LockFactory; use Symfony\Component\Lock\Store\InMemoryStore; @@ -92,12 +95,12 @@ public function testCommandCanDisplaySchedulingPhaseMiddlewareList(): void public function testCommandCanDisplayExecutionPhaseMiddlewareList(): void { - $scheduler = $this->createMock(SchedulerInterface::class); - $command = new DebugMiddlewareCommand(new SchedulerMiddlewareStack(new MiddlewareRegistry([])), new WorkerMiddlewareStack(new MiddlewareRegistry([ new TaskCallbackMiddleware(), new TaskLockBagMiddleware(new LockFactory(new InMemoryStore())), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware(new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ]))), new PostExecutionMiddleware(), new PreExecutionMiddleware(), ]))); diff --git a/tests/DependencyInjection/SchedulerBundleExtensionTest.php b/tests/DependencyInjection/SchedulerBundleExtensionTest.php index 1f94d2c6..6e6aec3b 100644 --- a/tests/DependencyInjection/SchedulerBundleExtensionTest.php +++ b/tests/DependencyInjection/SchedulerBundleExtensionTest.php @@ -1754,7 +1754,7 @@ public function testMiddlewareStackAreConfigured(): void self::assertFalse($container->getDefinition(SingleRunTaskMiddleware::class)->isPublic()); self::assertCount(2, $container->getDefinition(SingleRunTaskMiddleware::class)->getArguments()); self::assertInstanceOf(Reference::class, $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(0)); - self::assertSame(SchedulerInterface::class, (string) $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(0)); + self::assertSame(TransportInterface::class, (string) $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(0)); self::assertSame(ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE, $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(0)->getInvalidBehavior()); self::assertInstanceOf(Reference::class, $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(1)); self::assertSame(LoggerInterface::class, (string) $container->getDefinition(SingleRunTaskMiddleware::class)->getArgument(1)); @@ -1769,7 +1769,7 @@ public function testMiddlewareStackAreConfigured(): void self::assertFalse($container->getDefinition(TaskUpdateMiddleware::class)->isPublic()); self::assertCount(1, $container->getDefinition(TaskUpdateMiddleware::class)->getArguments()); self::assertInstanceOf(Reference::class, $container->getDefinition(TaskUpdateMiddleware::class)->getArgument(0)); - self::assertSame(SchedulerInterface::class, (string) $container->getDefinition(TaskUpdateMiddleware::class)->getArgument(0)); + self::assertSame(TransportInterface::class, (string) $container->getDefinition(TaskUpdateMiddleware::class)->getArgument(0)); self::assertCount(3, $container->getDefinition(TaskUpdateMiddleware::class)->getTags()); self::assertTrue($container->getDefinition(TaskUpdateMiddleware::class)->hasTag('scheduler.middleware')); self::assertTrue($container->getDefinition(TaskUpdateMiddleware::class)->hasTag('scheduler.worker_middleware')); diff --git a/tests/FiberSchedulerTest.php b/tests/FiberSchedulerTest.php index 809d2157..5f4faae5 100644 --- a/tests/FiberSchedulerTest.php +++ b/tests/FiberSchedulerTest.php @@ -1672,9 +1672,11 @@ public function testSchedulerCanPreemptTasks(): void $eventDispatcher = new EventDispatcher(); - $scheduler = new FiberScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), $eventDispatcher)); + ])); + + $scheduler = new FiberScheduler(new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), $eventDispatcher)); $scheduler->schedule(new NullTask('foo')); $scheduler->schedule(new NullTask('bar')); @@ -1688,8 +1690,8 @@ public function testSchedulerCanPreemptTasks(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); diff --git a/tests/LazySchedulerTest.php b/tests/LazySchedulerTest.php index 9d0a6521..6d02e616 100644 --- a/tests/LazySchedulerTest.php +++ b/tests/LazySchedulerTest.php @@ -709,9 +709,11 @@ public function testSchedulerCanPreemptTasks(): void $eventDispatcher = new EventDispatcher(); - $scheduler = new LazyScheduler(new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(new MiddlewareRegistry([])), $eventDispatcher)); + ])); + + $scheduler = new LazyScheduler(new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(new MiddlewareRegistry([])), $eventDispatcher)); self::assertFalse($scheduler->isInitialized()); $scheduler->schedule(new NullTask('foo')); @@ -726,8 +728,8 @@ public function testSchedulerCanPreemptTasks(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack(new MiddlewareRegistry([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ])), $eventDispatcher, $lockFactory, $logger); diff --git a/tests/Middleware/SingleRunTaskMiddlewareTest.php b/tests/Middleware/SingleRunTaskMiddlewareTest.php index eb05bbbf..8afcf44d 100644 --- a/tests/Middleware/SingleRunTaskMiddlewareTest.php +++ b/tests/Middleware/SingleRunTaskMiddlewareTest.php @@ -8,9 +8,12 @@ use Psr\Log\LoggerInterface; use SchedulerBundle\Middleware\PostExecutionMiddlewareInterface; use SchedulerBundle\Middleware\SingleRunTaskMiddleware; -use SchedulerBundle\SchedulerInterface; +use SchedulerBundle\SchedulePolicy\FirstInFirstOutPolicy; +use SchedulerBundle\SchedulePolicy\SchedulePolicyOrchestrator; use SchedulerBundle\Task\NullTask; use SchedulerBundle\Task\TaskInterface; +use SchedulerBundle\Transport\Configuration\InMemoryConfiguration; +use SchedulerBundle\Transport\InMemoryTransport; use SchedulerBundle\Worker\WorkerInterface; use Throwable; @@ -21,9 +24,9 @@ final class SingleRunTaskMiddlewareTest extends TestCase { public function testMiddlewareIsConfigured(): void { - $scheduler = $this->createMock(SchedulerInterface::class); - - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); + $singleRunTaskMiddleware = new SingleRunTaskMiddleware(new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ]))); self::assertSame(15, $singleRunTaskMiddleware->getPriority()); } @@ -38,11 +41,10 @@ public function testMiddlewareCannotHandleTaskWithIncompleteExecutionState(): vo $logger = $this->createMock(LoggerInterface::class); $logger->expects(self::once())->method('warning')->with(self::equalTo('The task "foo" is marked as incomplete or to retry, the "is_single" option is not used')); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::never())->method('unschedule'); + $singleRunTaskMiddleware = new SingleRunTaskMiddleware(new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])), $logger); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler, $logger); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'execution_state' => TaskInterface::INCOMPLETE, ]), $worker); @@ -58,11 +60,10 @@ public function testMiddlewareCannotHandleTaskWithToRetryExecutionState(): void $logger = $this->createMock(LoggerInterface::class); $logger->expects(self::once())->method('warning')->with(self::equalTo('The task "foo" is marked as incomplete or to retry, the "is_single" option is not used')); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::never())->method('unschedule'); + $singleRunTaskMiddleware = new SingleRunTaskMiddleware(new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])), $logger); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler, $logger); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'execution_state' => TaskInterface::TO_RETRY, ]), $worker); @@ -75,14 +76,17 @@ public function testMiddlewareCannotHandleInvalidTask(): void { $worker = $this->createMock(WorkerInterface::class); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::never())->method('unschedule'); + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create(new NullTask('foo')); + + $singleRunTaskMiddleware = new SingleRunTaskMiddleware($transport); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'single_run' => false, ]), $worker); + self::assertCount(1, $transport->list()); } /** @@ -92,14 +96,17 @@ public function testMiddlewareCanHandleSingleRunTask(): void { $worker = $this->createMock(WorkerInterface::class); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::once())->method('pause')->with(self::equalTo('foo')); - $scheduler->expects(self::never())->method('unschedule'); + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create(new NullTask('foo')); + + $singleRunTaskMiddleware = new SingleRunTaskMiddleware($transport); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'single_run' => true, ]), $worker); + self::assertSame(TaskInterface::PAUSED, $transport->get('foo')->getState()); } /** @@ -109,15 +116,18 @@ public function testMiddlewareCanHandleSingleRunTrueAndDeleteAfterExecuteTrueTas { $worker = $this->createMock(WorkerInterface::class); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::once())->method('unschedule')->with(self::equalTo('foo')); + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create(new NullTask('foo')); + + $singleRunTaskMiddleware = new SingleRunTaskMiddleware($transport); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'single_run' => true, 'delete_after_execute' => true, ]), $worker); + self::assertCount(0, $transport->list()); } /** @@ -127,15 +137,18 @@ public function testMiddlewareCanHandleSingleRunFalseAndDeleteAfterExecuteFalseT { $worker = $this->createMock(WorkerInterface::class); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::never())->method('unschedule'); + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create(new NullTask('foo')); + + $singleRunTaskMiddleware = new SingleRunTaskMiddleware($transport); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'single_run' => false, 'delete_after_execute' => false, ]), $worker); + self::assertCount(1, $transport->list()); } /** @@ -145,14 +158,17 @@ public function testMiddlewareCanHandleSingleRunFalseAndDeleteAfterExecuteTrueTa { $worker = $this->createMock(WorkerInterface::class); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::never())->method('pause'); - $scheduler->expects(self::never())->method('unschedule'); + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create(new NullTask('foo')); + + $singleRunTaskMiddleware = new SingleRunTaskMiddleware($transport); - $singleRunTaskMiddleware = new SingleRunTaskMiddleware($scheduler); $singleRunTaskMiddleware->postExecute(new NullTask('foo', [ 'single_run' => false, 'delete_after_execute' => true, ]), $worker); + self::assertCount(1, $transport->list()); } } diff --git a/tests/Middleware/TaskUpdateMiddlewareTest.php b/tests/Middleware/TaskUpdateMiddlewareTest.php index 99565249..0e5dfc7c 100644 --- a/tests/Middleware/TaskUpdateMiddlewareTest.php +++ b/tests/Middleware/TaskUpdateMiddlewareTest.php @@ -6,9 +6,13 @@ use PHPUnit\Framework\TestCase; use SchedulerBundle\Middleware\TaskUpdateMiddleware; -use SchedulerBundle\SchedulerInterface; -use SchedulerBundle\Task\TaskInterface; +use SchedulerBundle\SchedulePolicy\FirstInFirstOutPolicy; +use SchedulerBundle\SchedulePolicy\SchedulePolicyOrchestrator; +use SchedulerBundle\Task\NullTask; +use SchedulerBundle\Transport\Configuration\InMemoryConfiguration; +use SchedulerBundle\Transport\InMemoryTransport; use SchedulerBundle\Worker\WorkerInterface; +use Throwable; /** * @author Guillaume Loulier @@ -17,26 +21,30 @@ final class TaskUpdateMiddlewareTest extends TestCase { public function testMiddlewareIsConfigured(): void { - $scheduler = $this->createMock(SchedulerInterface::class); - - $middleware = new TaskUpdateMiddleware($scheduler); + $middleware = new TaskUpdateMiddleware(new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ]))); self::assertSame(10, $middleware->getPriority()); } + /** + * @throws Throwable {@see PostExecutionMiddlewareInterface::postExecute()} + */ public function testMiddlewareCanUpdate(): void { $worker = $this->createMock(WorkerInterface::class); - $task = $this->createMock(TaskInterface::class); - $task->expects(self::once())->method('getName')->willReturn('foo'); + $task = new NullTask('foo'); - $scheduler = $this->createMock(SchedulerInterface::class); - $scheduler->expects(self::once())->method('update') - ->with(self::equalTo('foo'), self::equalTo($task)) - ; + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + new FirstInFirstOutPolicy(), + ])); + $transport->create($task); - $taskUpdateMiddleware = new TaskUpdateMiddleware($scheduler); + $taskUpdateMiddleware = new TaskUpdateMiddleware($transport); $taskUpdateMiddleware->postExecute($task, $worker); + + self::assertSame('foo', $task->getName()); } } diff --git a/tests/SchedulerTest.php b/tests/SchedulerTest.php index f1a620fc..4023e09c 100644 --- a/tests/SchedulerTest.php +++ b/tests/SchedulerTest.php @@ -1676,9 +1676,11 @@ public function testSchedulerCanPreemptTasks(): void $eventDispatcher = new EventDispatcher(); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(new MiddlewareRegistry([])), $eventDispatcher); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(new MiddlewareRegistry([])), $eventDispatcher); $scheduler->schedule(new NullTask('foo')); $scheduler->schedule(new NullTask('bar')); @@ -1692,8 +1694,8 @@ public function testSchedulerCanPreemptTasks(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack(new MiddlewareRegistry([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ])), $eventDispatcher, $lockFactory, $logger); diff --git a/tests/Worker/FiberWorkerTest.php b/tests/Worker/FiberWorkerTest.php index 480e0dd6..2a362e6b 100644 --- a/tests/Worker/FiberWorkerTest.php +++ b/tests/Worker/FiberWorkerTest.php @@ -194,9 +194,11 @@ public function testTaskCannotBeExecutedWithoutSupportingRunner(): void $watcher = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); @@ -210,7 +212,7 @@ public function testTaskCannotBeExecutedWithoutSupportingRunner(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $watcher, new WorkerMiddlewareStack([ - new TaskUpdateMiddleware($scheduler), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -242,9 +244,11 @@ public function testTaskCannotBeExecutedWhileWorkerIsStopped(): void $watcher = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -253,7 +257,7 @@ public function testTaskCannotBeExecutedWhileWorkerIsStopped(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $watcher, new WorkerMiddlewareStack([ - new TaskUpdateMiddleware($scheduler), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -292,9 +296,11 @@ public function testTaskCannotBeExecutedWhilePaused(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($secondTask)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($secondTask)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($secondTask); @@ -308,8 +314,8 @@ public function testTaskCannotBeExecutedWhilePaused(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -340,9 +346,11 @@ public function testTaskCannotBeExecutedWithAnExecutionDelay(): void $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -355,7 +363,7 @@ public function testTaskCannotBeExecutedWithAnExecutionDelay(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -382,9 +390,11 @@ public function testTaskCannotBeExecutedWithErroredBeforeExecutionCallback(): vo $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -397,7 +407,7 @@ public function testTaskCannotBeExecutedWithErroredBeforeExecutionCallback(): vo ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -430,9 +440,11 @@ public function testTaskCanBeExecutedWithErroredBeforeExecutionCallback(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($validTask)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($validTask)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($validTask); @@ -446,7 +458,7 @@ public function testTaskCanBeExecutedWithErroredBeforeExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -477,9 +489,11 @@ public function testTaskCanBeExecutedWithBeforeExecutionCallback(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -492,7 +506,7 @@ public function testTaskCanBeExecutedWithBeforeExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -525,9 +539,11 @@ public function testTaskCanBeExecutedWithErroredAfterExecutionCallback(): void $tracker->expects(self::exactly(2))->method('startTracking')->withConsecutive([$task], [$validTask]); $tracker->expects(self::exactly(2))->method('endTracking')->withConsecutive([$task], [$validTask]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($validTask); @@ -541,7 +557,7 @@ public function testTaskCanBeExecutedWithErroredAfterExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -572,9 +588,11 @@ public function testTaskCanBeExecutedWithAfterExecutionCallback(): void $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -587,7 +605,7 @@ public function testTaskCanBeExecutedWithAfterExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -611,9 +629,11 @@ public function testTaskCanBeExecutedWithRunner(): void $eventDispatcher = new EventDispatcher(); $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(1)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([ + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([ new TaskLockBagMiddleware(new LockFactory(new InMemoryStore())), ]), $eventDispatcher); $scheduler->schedule(new NullTask('foo')); @@ -627,8 +647,8 @@ public function testTaskCanBeExecutedWithRunner(): void ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ new TaskCallbackMiddleware(), new NotifierMiddleware(), - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -661,9 +681,11 @@ public function testEmptyTaskCannotBeExecuted(): void $eventDispatcher = new EventDispatcher(); $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(1)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([ + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([ new TaskLockBagMiddleware(new LockFactory(new InMemoryStore())), ]), $eventDispatcher); @@ -676,8 +698,8 @@ public function testEmptyTaskCannotBeExecuted(): void ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ new TaskCallbackMiddleware(), new NotifierMiddleware(), - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -705,9 +727,11 @@ public function testTaskCanBeExecutedAndTheWorkerCanReturnTheLastExecutedTask(): $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -720,7 +744,7 @@ public function testTaskCanBeExecutedAndTheWorkerCanReturnTheLastExecutedTask(): ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -751,9 +775,11 @@ public function testTaskCannotBeExecutedTwiceAsSingleRunTask(): void $secondRunner->expects(self::once())->method('support')->willReturn(false); $secondRunner->expects(self::never())->method('run')->willReturn(new Output($shellTask, null)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($shellTask); $eventDispatcher = new EventDispatcher(); @@ -767,7 +793,7 @@ public function testTaskCannotBeExecutedTwiceAsSingleRunTask(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -845,9 +871,11 @@ public function testTaskCanBeExecutedWithoutBeforeExecutionNotificationAndNotifi $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -860,7 +888,7 @@ public function testTaskCanBeExecutedWithoutBeforeExecutionNotificationAndNotifi ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -896,9 +924,11 @@ public function testTaskCanBeExecutedWithBeforeExecutionNotificationAndNotifier( $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -911,7 +941,7 @@ public function testTaskCanBeExecutedWithBeforeExecutionNotificationAndNotifier( ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -942,9 +972,11 @@ public function testTaskCanBeExecutedWithoutAfterExecutionNotificationAndNotifie $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -957,7 +989,7 @@ public function testTaskCanBeExecutedWithoutAfterExecutionNotificationAndNotifie ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -993,9 +1025,11 @@ public function testTaskCanBeExecutedWithAfterExecutionNotificationAndNotifier() $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1008,7 +1042,7 @@ public function testTaskCanBeExecutedWithAfterExecutionNotificationAndNotifier() ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory, $logger), @@ -1036,9 +1070,11 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutRateLimiter(): v $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1052,7 +1088,7 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutRateLimiter(): v new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ new MaxExecutionMiddleware(), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1079,9 +1115,11 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutMaxExecutionLimi $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1102,7 +1140,7 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutMaxExecutionLimi 'interval' => '5 seconds', ], ], new InMemoryStorage())), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1129,9 +1167,11 @@ public function testWorkerCanReserveMaxExecutionTokensAndLimitTaskExecutionThenS $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1152,7 +1192,7 @@ public function testWorkerCanReserveMaxExecutionTokensAndLimitTaskExecutionThenS 'interval' => '5 seconds', ], ], new InMemoryStorage())), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1183,9 +1223,11 @@ public function testWorkerCanStopWhenTaskAreConsumedAndWithoutDaemonEnabled(): v $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); $eventDispatcher->expects(self::exactly(7))->method('dispatch'); @@ -1197,7 +1239,7 @@ public function testWorkerCanStopWhenTaskAreConsumedAndWithoutDaemonEnabled(): v ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1227,9 +1269,11 @@ public function testWorkerCanStopWhenTaskAreConsumedWithError(): void $runner->expects(self::once())->method('support')->with($task)->willReturn(true); $runner->expects(self::once())->method('run')->with($task)->willThrowException(new RuntimeException('An error occurred')); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); $eventDispatcher->expects(self::exactly(7))->method('dispatch'); @@ -1241,7 +1285,7 @@ public function testWorkerCanStopWhenTaskAreConsumedWithError(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1277,9 +1321,11 @@ public function testPausedTaskIsNotExecutedIfListContainsASingleTask(): void ], ]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('bar', [ 'access_lock_bag' => new AccessLockBag(new Key('bar')), @@ -1300,8 +1346,8 @@ public function testPausedTaskIsNotExecutedIfListContainsASingleTask(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1326,9 +1372,11 @@ public function testWorkerCanExecuteChainedTasks(): void $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($chainedTask); $scheduler->schedule($shellTask); @@ -1342,8 +1390,8 @@ public function testWorkerCanExecuteChainedTasks(): void new DefaultPolicy(), new FiberPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1387,9 +1435,11 @@ public function testWorkerCanRetrieveTasksLazily(): void $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($chainedTask); $scheduler->schedule($shellTask); @@ -1403,8 +1453,8 @@ public function testWorkerCanRetrieveTasksLazily(): void new DefaultPolicy(), new FiberPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1454,9 +1504,11 @@ public function testWorkerCanExecuteLongRunningTask(): void $task->setScheduledAt(new DateTimeImmutable('- 1 month')); $task->setSingleRun(true); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1466,8 +1518,8 @@ public function testWorkerCanExecuteLongRunningTask(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1496,9 +1548,11 @@ public function testWorkerCanExecuteTaskWithExecutionDelay(): void 'access_lock_bag' => new AccessLockBag(new Key('foo')), ]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1508,8 +1562,8 @@ public function testWorkerCanExecuteTaskWithExecutionDelay(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), new TaskExecutionMiddleware(), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1532,9 +1586,11 @@ public function testWorkerCanStopWithEmptyTaskList(): void $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1543,8 +1599,8 @@ public function testWorkerCanStopWithEmptyTaskList(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount()); @@ -1568,9 +1624,11 @@ public function testWorkerCanStopWithoutExecutedTasks(): void $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1580,8 +1638,8 @@ public function testWorkerCanStopWithoutExecutedTasks(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1603,9 +1661,11 @@ public function testWorkerCanStopWhenTasksAreExecutedAndWithoutSleepOptionTwice( $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1615,8 +1675,8 @@ public function testWorkerCanStopWhenTasksAreExecutedAndWithoutSleepOptionTwice( ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1648,9 +1708,11 @@ public function testWorkerCanBePaused(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = new EventDispatcher(); @@ -1678,8 +1740,8 @@ public function testWorkerCanBePaused(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1702,9 +1764,11 @@ public function testWorkerCanBeRestarted(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = new EventDispatcher(); @@ -1726,8 +1790,8 @@ public function testWorkerCanBeRestarted(): void ]), new ExecutionPolicyRegistry([ new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1751,9 +1815,11 @@ public function testWorkerCanPreempt(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1763,8 +1829,8 @@ public function testWorkerCanPreempt(): void new DefaultPolicy(), new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); $worker->getConfiguration()->setExecutionPolicy('fiber'); @@ -1798,9 +1864,11 @@ public function testWorkerCannotPreemptEmptyList(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1810,8 +1878,8 @@ public function testWorkerCannotPreemptEmptyList(): void new DefaultPolicy(), new FiberPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); $worker->getConfiguration()->setExecutionPolicy('fiber'); diff --git a/tests/Worker/WorkerTest.php b/tests/Worker/WorkerTest.php index 9dd034c2..67f9b891 100644 --- a/tests/Worker/WorkerTest.php +++ b/tests/Worker/WorkerTest.php @@ -186,9 +186,11 @@ public function testTaskCannotBeExecutedWithoutSupportingRunner(): void $watcher = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); @@ -202,7 +204,7 @@ public function testTaskCannotBeExecutedWithoutSupportingRunner(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $watcher, new WorkerMiddlewareStack([ - new TaskUpdateMiddleware($scheduler), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -230,9 +232,11 @@ public function testTaskCannotBeExecutedWhileWorkerIsStopped(): void $watcher = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -241,7 +245,7 @@ public function testTaskCannotBeExecutedWhileWorkerIsStopped(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $watcher, new WorkerMiddlewareStack([ - new TaskUpdateMiddleware($scheduler), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -279,9 +283,11 @@ public function testTaskCannotBeExecutedWhilePaused(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($secondTask)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($secondTask)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($secondTask); @@ -295,8 +301,8 @@ public function testTaskCannotBeExecutedWhilePaused(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -324,9 +330,11 @@ public function testTaskCannotBeExecutedWithAnExecutionDelay(): void $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -339,7 +347,7 @@ public function testTaskCannotBeExecutedWithAnExecutionDelay(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -363,9 +371,11 @@ public function testTaskCannotBeExecutedWithErroredBeforeExecutionCallback(): vo $tracker->expects(self::never())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::never())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -378,7 +388,7 @@ public function testTaskCannotBeExecutedWithErroredBeforeExecutionCallback(): vo ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -408,9 +418,11 @@ public function testTaskCanBeExecutedWithErroredBeforeExecutionCallback(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($validTask)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($validTask)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($validTask); @@ -424,7 +436,7 @@ public function testTaskCanBeExecutedWithErroredBeforeExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -452,9 +464,11 @@ public function testTaskCanBeExecutedWithBeforeExecutionCallback(): void $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -467,7 +481,7 @@ public function testTaskCanBeExecutedWithBeforeExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -497,9 +511,11 @@ public function testTaskCanBeExecutedWithErroredAfterExecutionCallback(): void $tracker->expects(self::exactly(2))->method('startTracking')->withConsecutive([$task], [$validTask]); $tracker->expects(self::exactly(2))->method('endTracking')->withConsecutive([$task], [$validTask]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $scheduler->schedule($validTask); @@ -513,7 +529,7 @@ public function testTaskCanBeExecutedWithErroredAfterExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -541,9 +557,11 @@ public function testTaskCanBeExecutedWithAfterExecutionCallback(): void $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -556,7 +574,7 @@ public function testTaskCanBeExecutedWithAfterExecutionCallback(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -577,9 +595,11 @@ public function testTaskCanBeExecutedWithRunner(): void $eventDispatcher = new EventDispatcher(); $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(1)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([ + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([ new TaskLockBagMiddleware(new LockFactory(new InMemoryStore())), ]), $eventDispatcher); $scheduler->schedule(new NullTask('foo')); @@ -593,8 +613,8 @@ public function testTaskCanBeExecutedWithRunner(): void ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ new TaskCallbackMiddleware(), new NotifierMiddleware(), - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -624,9 +644,11 @@ public function testEmptyTaskCannotBeExecuted(): void $eventDispatcher = new EventDispatcher(); $eventDispatcher->addSubscriber(new StopWorkerOnTaskLimitSubscriber(1)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([ + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([ new TaskLockBagMiddleware(new LockFactory(new InMemoryStore())), ]), $eventDispatcher); @@ -639,8 +661,8 @@ public function testEmptyTaskCannotBeExecuted(): void ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ new TaskCallbackMiddleware(), new NotifierMiddleware(), - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -665,9 +687,11 @@ public function testTaskCanBeExecutedAndTheWorkerCanReturnTheLastExecutedTask(): $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -680,7 +704,7 @@ public function testTaskCanBeExecutedAndTheWorkerCanReturnTheLastExecutedTask(): ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -708,9 +732,11 @@ public function testTaskCannotBeExecutedTwiceAsSingleRunTask(): void $secondRunner->expects(self::once())->method('support')->willReturn(false); $secondRunner->expects(self::never())->method('run')->willReturn(new Output($shellTask, null)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($shellTask); $eventDispatcher = new EventDispatcher(); @@ -724,7 +750,7 @@ public function testTaskCannotBeExecutedTwiceAsSingleRunTask(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -796,9 +822,11 @@ public function testTaskCanBeExecutedWithoutBeforeExecutionNotificationAndNotifi $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -811,7 +839,7 @@ public function testTaskCanBeExecutedWithoutBeforeExecutionNotificationAndNotifi ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -844,9 +872,11 @@ public function testTaskCanBeExecutedWithBeforeExecutionNotificationAndNotifier( $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -859,7 +889,7 @@ public function testTaskCanBeExecutedWithBeforeExecutionNotificationAndNotifier( ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -887,9 +917,11 @@ public function testTaskCanBeExecutedWithoutAfterExecutionNotificationAndNotifie $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -902,7 +934,7 @@ public function testTaskCanBeExecutedWithoutAfterExecutionNotificationAndNotifie ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory), @@ -935,9 +967,11 @@ public function testTaskCanBeExecutedWithAfterExecutionNotificationAndNotifier() $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -950,7 +984,7 @@ public function testTaskCanBeExecutedWithAfterExecutionNotificationAndNotifier() ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskCallbackMiddleware(), new NotifierMiddleware($notifier), new TaskLockBagMiddleware($lockFactory, $logger), @@ -975,9 +1009,11 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutRateLimiter(): v $tracker->expects(self::once())->method('startTracking'); $tracker->expects(self::once())->method('endTracking'); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -991,7 +1027,7 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutRateLimiter(): v new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ new MaxExecutionMiddleware(), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -1015,9 +1051,11 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutMaxExecutionLimi $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1038,7 +1076,7 @@ public function testWorkerCannotReserveMaxExecutionTokensWithoutMaxExecutionLimi 'interval' => '5 seconds', ], ], new InMemoryStorage())), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -1062,9 +1100,11 @@ public function testWorkerCanReserveMaxExecutionTokensAndLimitTaskExecutionThenS $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $eventDispatcher = new EventDispatcher(); @@ -1085,7 +1125,7 @@ public function testWorkerCanReserveMaxExecutionTokensAndLimitTaskExecutionThenS 'interval' => '5 seconds', ], ], new InMemoryStorage())), - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1114,9 +1154,11 @@ public function testWorkerCanStopWhenTaskAreConsumedAndWithoutDaemonEnabled(): v $tracker->expects(self::once())->method('startTracking')->with(self::equalTo($task)); $tracker->expects(self::once())->method('endTracking')->with(self::equalTo($task)); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack([]), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack([]), new EventDispatcher()); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); $eventDispatcher->expects(self::exactly(7))->method('dispatch'); @@ -1128,7 +1170,7 @@ public function testWorkerCanStopWhenTaskAreConsumedAndWithoutDaemonEnabled(): v ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create(), $task); @@ -1155,9 +1197,11 @@ public function testWorkerCanStopWhenTaskAreConsumedWithError(): void $runner->expects(self::once())->method('support')->with($task)->willReturn(true); $runner->expects(self::once())->method('run')->with($task)->willThrowException(new RuntimeException('An error occurred')); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $eventDispatcher = $this->createMock(EventDispatcherInterface::class); $eventDispatcher->expects(self::exactly(7))->method('dispatch'); @@ -1169,7 +1213,7 @@ public function testWorkerCanStopWhenTaskAreConsumedWithError(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1203,9 +1247,11 @@ public function testPausedTaskIsNotExecutedIfListContainsASingleTask(): void ], ]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('bar', [ 'access_lock_bag' => new AccessLockBag(new Key('bar')), @@ -1226,8 +1272,8 @@ public function testPausedTaskIsNotExecutedIfListContainsASingleTask(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -1249,9 +1295,11 @@ public function testWorkerCanExecuteChainedTasks(): void $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($chainedTask); $scheduler->schedule($shellTask); @@ -1264,8 +1312,8 @@ public function testWorkerCanExecuteChainedTasks(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); $worker->execute(WorkerConfiguration::create()); @@ -1306,9 +1354,11 @@ public function testWorkerCanRetrieveTasksLazily(): void $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($chainedTask); $scheduler->schedule($shellTask); @@ -1321,8 +1371,8 @@ public function testWorkerCanRetrieveTasksLazily(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), new TaskExecutionTracker(new Stopwatch()), new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1372,9 +1422,11 @@ public function testWorkerCanExecuteLongRunningTask(): void $task->setScheduledAt(new DateTimeImmutable('- 1 month')); $task->setSingleRun(true); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1384,8 +1436,8 @@ public function testWorkerCanExecuteLongRunningTask(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1412,9 +1464,11 @@ public function testWorkerCanExecuteTaskWithExecutionDelay(): void 'access_lock_bag' => new AccessLockBag(new Key('foo')), ]); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule($task); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1424,8 +1478,8 @@ public function testWorkerCanExecuteTaskWithExecutionDelay(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), new TaskExecutionMiddleware(), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1446,9 +1500,11 @@ public function testWorkerCanStopWithEmptyTaskList(): void $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1457,8 +1513,8 @@ public function testWorkerCanStopWithEmptyTaskList(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); self::assertSame(0, $worker->getConfiguration()->getExecutedTasksCount()); @@ -1480,9 +1536,11 @@ public function testWorkerCanStopWithoutExecutedTasks(): void $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1492,8 +1550,8 @@ public function testWorkerCanStopWithoutExecutedTasks(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1513,9 +1571,11 @@ public function testWorkerCanStopWhenTasksAreExecutedAndWithoutSleepOptionTwice( $tracker = $this->createMock(TaskExecutionTrackerInterface::class); $logger = $this->createMock(LoggerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1525,8 +1585,8 @@ public function testWorkerCanStopWhenTasksAreExecutedAndWithoutSleepOptionTwice( ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1554,9 +1614,11 @@ public function testWorkerCanBePaused(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = new EventDispatcher(); @@ -1584,8 +1646,8 @@ public function testWorkerCanBePaused(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1605,9 +1667,11 @@ public function testWorkerCanBeRestarted(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $scheduler->schedule(new NullTask('foo')); $eventDispatcher = new EventDispatcher(); @@ -1629,8 +1693,8 @@ public function testWorkerCanBeRestarted(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), $eventDispatcher, $lockFactory, $logger); @@ -1652,9 +1716,11 @@ public function testWorkerCanPreempt(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1663,8 +1729,8 @@ public function testWorkerCanPreempt(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger); @@ -1697,9 +1763,11 @@ public function testWorkerCannotPreemptEmptyList(): void $logger = $this->createMock(LoggerInterface::class); $tracker = $this->createMock(TaskExecutionTrackerInterface::class); - $scheduler = new Scheduler('UTC', new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ + $transport = new InMemoryTransport(new InMemoryConfiguration(), new SchedulePolicyOrchestrator([ new FirstInFirstOutPolicy(), - ])), new SchedulerMiddlewareStack(), new EventDispatcher()); + ])); + + $scheduler = new Scheduler('UTC', $transport, new SchedulerMiddlewareStack(), new EventDispatcher()); $lockFactory = new LockFactory(new InMemoryStore()); @@ -1708,8 +1776,8 @@ public function testWorkerCannotPreemptEmptyList(): void ]), new ExecutionPolicyRegistry([ new DefaultPolicy(), ]), $tracker, new WorkerMiddlewareStack([ - new SingleRunTaskMiddleware($scheduler), - new TaskUpdateMiddleware($scheduler), + new SingleRunTaskMiddleware($transport), + new TaskUpdateMiddleware($transport), new TaskLockBagMiddleware($lockFactory), ]), new EventDispatcher(), $lockFactory, $logger);