Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Fibers support #235

Merged
merged 20 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/static-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ jobs:
composer-options: "--prefer-stable"

- name: Run PHPStan
run: php vendor/bin/phpstan analyze --xdebug
run: php vendor/bin/phpstan analyze --xdebug --configuration phpstan.neon.${{ matrix.php-versions }}.dist
5 changes: 5 additions & 0 deletions .php-cs-fixer.dist.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
'date_time_immutable' => true,
'declare_strict_types' => true,
'fully_qualified_strict_types' => true,
'function_declaration' => true,
'function_typehint_space' => true,
'global_namespace_import' => [
'import_classes' => true,
'import_constants' => true,
'import_functions' => true,
],
'implode_call' => true,
'modernize_strpos' => true,
'no_alias_functions' => true,
'no_alias_language_construct_call' => true,
'no_multiline_whitespace_around_double_arrow' => true,
'no_trailing_comma_in_singleline_array' => true,
'no_unused_imports' => true,
Expand Down
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ php-cs-fixer-dry: ## Run PHP-CS-FIXER in --dry-run mode
php-cs-fixer-dry: .php-cs-fixer.dist.php
$(PHP) vendor/bin/php-cs-fixer fix --allow-risky=yes --dry-run

phpstan: ## Run PHPStan (the configuration must be defined in phpstan.neon.dist)
phpstan: phpstan.neon.dist
$(PHP) vendor/bin/phpstan analyse --memory-limit 2G --xdebug
phpstan_80: ## Run PHPStan (the configuration must be defined in phpstan.neon.dist)
phpstan_80: phpstan.neon.8.0.dist
$(PHP) vendor/bin/phpstan analyse --memory-limit 2G --xdebug --configuration phpstan.neon.8.0.dist

phpstan_81: ## Run PHPStan (the configuration must be defined in phpstan.neon.dist)
phpstan_81: phpstan.neon.8.1.dist
$(PHP) vendor/bin/phpstan analyse --memory-limit 2G --xdebug --configuration phpstan.neon.8.1.dist

rector: rector.php
$(PHP) vendor/bin/rector
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- Background worker
- [Symfony/Messenger](https://symfony.com/doc/current/messenger.html) integration
- [Mercure](https://www.mercure.rocks) integration
- [Fibers](https://www.php.net/manual/en/language.fibers.php) support

## Installation

Expand Down
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"SchedulerBundle\\EventListener\\": "src/EventListener/",
"SchedulerBundle\\Exception\\": "src/Exception/",
"SchedulerBundle\\Expression\\": "src/Expression/",
"SchedulerBundle\\Fiber\\": "src/Fiber/",
"SchedulerBundle\\Messenger\\": "src/Messenger/",
"SchedulerBundle\\Middleware\\": "src/Middleware/",
"SchedulerBundle\\Pool\\": "src/Pool/",
Expand Down Expand Up @@ -65,6 +66,7 @@
"Tests\\SchedulerBundle\\Command\\": "tests/Command/",
"Tests\\SchedulerBundle\\DataCollector\\": "tests/DataCollector/",
"Tests\\SchedulerBundle\\DependencyInjection\\": "tests/DependencyInjection/",
"Tests\\SchedulerBundle\\DependencyInjection\\Assets\\": "tests/DependencyInjection/Assets/",
"Tests\\SchedulerBundle\\Event\\": "tests/Event/",
"Tests\\SchedulerBundle\\EventListener\\": "tests/EventListener/",
"Tests\\SchedulerBundle\\Expression\\": "tests/Expression/",
Expand Down
10 changes: 10 additions & 0 deletions doc/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The [Scheduler](../src/Scheduler.php) is the main entrypoint for every action re
- [API](#api)
- [Asynchronous API](#asynchronous-api)
- [Lazy scheduler](#lazy-scheduler)
- [Fiber scheduler](#fiber-scheduler)
- [SchedulerAware](#scheduleraware)

## API
Expand Down Expand Up @@ -161,6 +162,15 @@ the default `Scheduler`, when enabled via the configuration, each action perform

The scheduler still available to injection via [SchedulerInterface](../src/SchedulerInterface.php).

## Fiber scheduler

_Introduced in `0.9`_

The [FiberScheduler](../src/FiberScheduler.php) act as a wrapper around
the default `Scheduler`, when enabled via the configuration, each action is performed using a dedicated fiber.

The scheduler still available to injection via [SchedulerInterface](../src/SchedulerInterface.php).

## SchedulerAware

_Introduced in `0.9`_
Expand Down
17 changes: 16 additions & 1 deletion doc/transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This bundle defines a set of transports, each transport has its own configuratio
- [FailOver](#failover)
- [RoundRobin](#roundrobin)
- [Lazy](#lazy)
- [Fiber](#fiber)
- [Redis](#redis)
- [Doctrine](#doctrine)

Expand Down Expand Up @@ -193,7 +194,7 @@ scheduler_bundle:

## Lazy

The [Lazy](../src/Transport/LazyTransport.php) allows to perform actions using an asynchronous approach.
The [LazyTransport](../src/Transport/LazyTransport.php) allows to perform actions using an asynchronous approach.

### Usage

Expand All @@ -203,6 +204,20 @@ scheduler_bundle:
dsn: 'lazy://(memory://batch)'
```

## Fiber

_Introduced in `0.9`_

The [FiberTransport](../src/Transport/FiberTransport.php) allows to perform actions using native fibers.

### Usage

```yaml
scheduler_bundle:
transport:
dsn: 'fiber://(memory://batch)'
```

## Redis

The [RedisTransport](../src/Bridge/Redis/Transport/RedisTransport.php) allows to use Redis
Expand Down
21 changes: 21 additions & 0 deletions doc/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and executed thanks to [runners](runners.md).
- [Daemon](#daemon)
- [Loop](#loop)
- [Forking a worker](#forking-a-worker)
- [Using fibers](#using-fibers)
- [Extending](#extending)

## API
Expand Down Expand Up @@ -77,6 +78,26 @@ a [WorkerForkedEvent](../src/Event/WorkerForkedEvent.php) is dispatched.

**PS II: You can determine if the current worker is a fork via the option `isForked`.**

## Using fibers

_Introduced in `0.9`_

Since PHP `8.1`, you can use fibers to "fork" the current process and create an isolated process
Copy link

@GromNaN GromNaN Apr 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, Fibers are not a new syntax to fork processes. The fibers are similar to Generators and executed in the same process.

See RFC https://wiki.php.net/rfc/fibers

Since fibers exist within a single process thread, switching between fibers is significantly more performant than switching between processes or threads.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @GromNaN 👋🏻

Nice catch, I opened #238 if you want to gave more informations / feedbacks on this one 🙂

that can execute some logic.

Thanks to the [FiberWorker](../src/Worker/FiberWorker.php), you can use fibers to execute tasks,
you can enable it via the configuration:

```yaml
scheduler_bundle:
# ...

worker:
mode: 'fiber'
```

**PS: Keep in mind that a fiber is created for each task to execute.**

## Extending

The worker can easily be extended thanks to [WorkerInterface](../src/Worker/WorkerInterface.php) but
Expand Down
18 changes: 18 additions & 0 deletions phpstan.neon.8.0.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
includes:
- vendor/phpstan/phpstan-deprecation-rules/rules.neon
- vendor/phpstan/phpstan-doctrine/extension.neon
- vendor/phpstan/phpstan-phpunit/extension.neon
- vendor/phpstan/phpstan-strict-rules/rules.neon
- vendor/phpstan/phpstan-symfony/extension.neon

parameters:
level: 5
paths:
- src
- tests
excludePaths:
- vendor
ignoreErrors:
- '#Instantiated class Fiber not found#'
- '#Call to method start\(\) on an unknown class Fiber#'
- '#Call to static method suspend\(\) on an unknown class Fiber#'
File renamed without changes.
15 changes: 14 additions & 1 deletion rector.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@
__DIR__ . '/tests',
]);

$parameters->set(Option::SKIP, [
'8.1' !== PHP_VERSION ? $parameters->set(Option::SKIP, [
__DIR__ . '/vendor',
__DIR__ . '/src/DependencyInjection/SchedulerBundleExtension.php',
__DIR__ . '/src/Fiber/AbstractFiberHandler.php',
__DIR__ . '/src/Transport/Configuration/FiberConfiguration.php',
__DIR__ . '/src/Transport/FiberTransport.php',
__DIR__ . '/src/Worker/FiberWorker.php',
__DIR__ . '/src/FiberScheduler.php',
__DIR__ . '/tests/Serializer/TaskNormalizerTest.php',
__DIR__ . '/tests/FiberSchedulerTest.php',
__DIR__ . '/tests/Transport/Configuration/FiberConfigurationTest.php',
__DIR__ . '/tests/Transport/FiberTransportTest.php',
__DIR__ . '/tests/Worker/FiberWorkerTest.php',
]) : $parameters->set(Option::SKIP, [
__DIR__ . '/vendor',
__DIR__ . '/src/DependencyInjection/SchedulerBundleExtension.php',
__DIR__ . '/tests/Serializer/TaskNormalizerTest.php',
Expand Down
3 changes: 1 addition & 2 deletions src/Bridge/Redis/Transport/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use SchedulerBundle\Transport\Dsn;
use SchedulerBundle\Transport\TransportFactoryInterface;
use Symfony\Component\Serializer\SerializerInterface;
use function array_merge;
use function class_exists;
use function phpversion;
use function version_compare;
Expand Down Expand Up @@ -46,7 +45,7 @@ public function createTransport(Dsn $dsn, array $options, SerializerInterface $s
'list' => $dsn->getOption('list', '_symfony_scheduler_tasks'),
];

return new RedisTransport(array_merge($connectionOptions, $options), $serializer, $schedulePolicyOrchestrator);
return new RedisTransport($connectionOptions + $options, $serializer, $schedulePolicyOrchestrator);
}

/**
Expand Down
14 changes: 12 additions & 2 deletions src/DependencyInjection/SchedulerBundleConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public function getConfigTreeBuilder(): TreeBuilder
->arrayNode('scheduler')
->children()
->enumNode('mode')
->info('Define the scheduler mode (lazy or default)')
->values(['default', 'lazy'])
->info('Define the scheduler mode (default, lazy or fiber)')
->values(['default', 'lazy', 'fiber'])
->defaultValue('default')
->end()
->end()
Expand Down Expand Up @@ -225,6 +225,16 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
->end()
->end()
->arrayNode('worker')
->addDefaultsIfNotSet()
->children()
->enumNode('mode')
->info('The current worker mode (fiber or default)')
->values(['default', 'fiber'])
->defaultValue('default')
->end()
->end()
->end()
->end()
->end()
;
Expand Down
67 changes: 65 additions & 2 deletions src/DependencyInjection/SchedulerBundleExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use SchedulerBundle\Expression\ExpressionBuilder;
use SchedulerBundle\Expression\ExpressionBuilderInterface;
use SchedulerBundle\Expression\FluentExpressionBuilder;
use SchedulerBundle\FiberScheduler;
use SchedulerBundle\LazyScheduler;
use SchedulerBundle\Messenger\TaskToExecuteMessageHandler;
use SchedulerBundle\Messenger\TaskToPauseMessageHandler;
Expand Down Expand Up @@ -104,10 +105,12 @@
use SchedulerBundle\Transport\Configuration\ConfigurationFactory;
use SchedulerBundle\Transport\Configuration\ConfigurationFactoryInterface;
use SchedulerBundle\Transport\Configuration\ConfigurationInterface as TransportConfigurationInterface;
use SchedulerBundle\Transport\Configuration\FiberConfigurationFactory;
use SchedulerBundle\Transport\Configuration\InMemoryConfigurationFactory;
use SchedulerBundle\Transport\Configuration\LazyConfigurationFactory;
use SchedulerBundle\Transport\Dsn;
use SchedulerBundle\Transport\FailOverTransportFactory;
use SchedulerBundle\Transport\FiberTransportFactory;
use SchedulerBundle\Transport\FilesystemTransportFactory;
use SchedulerBundle\Transport\InMemoryTransportFactory;
use SchedulerBundle\Transport\LazyTransportFactory;
Expand All @@ -116,6 +119,7 @@
use SchedulerBundle\Transport\TransportFactory;
use SchedulerBundle\Transport\TransportFactoryInterface;
use SchedulerBundle\Transport\TransportInterface;
use SchedulerBundle\Worker\FiberWorker;
use SchedulerBundle\Worker\Worker;
use SchedulerBundle\Worker\WorkerInterface;
use Symfony\Bundle\FrameworkBundle\Console\Application;
Expand Down Expand Up @@ -206,6 +210,7 @@ private function registerParameters(ContainerBuilder $container, array $configur
$container->setParameter('scheduler.probe_enabled', $configuration['probe']['enabled'] ?? false);
$container->setParameter('scheduler.mercure_support', $configuration['mercure']['enabled']);
$container->setParameter('scheduler.pool_support', $configuration['pool']['enabled']);
$container->setParameter('scheduler.worker_mode', $configuration['worker']['mode']);
}

private function registerAutoConfigure(ContainerBuilder $container): void
Expand Down Expand Up @@ -259,6 +264,17 @@ private function registerConfigurationFactories(ContainerBuilder $container): vo
'class' => LazyConfigurationFactory::class,
])
;

$container->register(FiberConfigurationFactory::class, FiberConfigurationFactory::class)
->setArguments([
new TaggedIteratorArgument(self::TRANSPORT_CONFIGURATION_FACTORY_TAG),
])
->setPublic(false)
->addTag(self::TRANSPORT_CONFIGURATION_FACTORY_TAG)
->addTag('container.preload', [
'class' => FiberConfigurationFactory::class,
])
;
}

/**
Expand Down Expand Up @@ -355,6 +371,17 @@ private function registerTransportFactories(ContainerBuilder $container, array $
])
;

$container->register(FiberTransportFactory::class, FiberTransportFactory::class)
->setArguments([
new TaggedIteratorArgument(self::SCHEDULER_TRANSPORT_FACTORY_TAG),
])
->setPublic(false)
->addTag(self::SCHEDULER_TRANSPORT_FACTORY_TAG)
->addTag('container.preload', [
'class' => FiberTransportFactory::class,
])
;

if (!str_starts_with($configuration['transport']['dsn'], 'cache://')) {
return;
}
Expand Down Expand Up @@ -460,6 +487,20 @@ private function registerScheduler(ContainerBuilder $container): void
])
;
}

if ('fiber' === $container->getParameter('scheduler.scheduler_mode')) {
$container->register(FiberScheduler::class, FiberScheduler::class)
->setDecoratedService(Scheduler::class, 'scheduler.scheduler')
->setArguments([
new Reference('scheduler.scheduler', ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(LoggerInterface::class, ContainerInterface::NULL_ON_INVALID_REFERENCE),
])
->setPublic(false)
->addTag('container.preload', [
'class' => FiberScheduler::class,
])
;
}
}

private function registerCommands(ContainerBuilder $container): void
Expand Down Expand Up @@ -1006,7 +1047,7 @@ private function registerTracker(ContainerBuilder $container): void

private function registerWorker(ContainerBuilder $container): void
{
$container->register(Worker::class, Worker::class)
$container->register('scheduler.worker', Worker::class)
->setArguments([
new Reference(SchedulerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(RunnerRegistryInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
Expand All @@ -1025,7 +1066,29 @@ private function registerWorker(ContainerBuilder $container): void
'class' => Worker::class,
])
;
$container->setAlias(WorkerInterface::class, Worker::class);
$container->setAlias(WorkerInterface::class, 'scheduler.worker');

if ('fiber' === $container->getParameter('scheduler.worker_mode')) {
$container->register('scheduler.worker', FiberWorker::class)
->setArguments([
new Reference(SchedulerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(RunnerRegistryInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(TaskExecutionTrackerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(WorkerMiddlewareStack::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(EventDispatcherInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference('scheduler.lock_store.factory', ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
new Reference(LoggerInterface::class, ContainerInterface::NULL_ON_INVALID_REFERENCE),
])
->addTag('scheduler.worker')
->addTag('monolog.logger', [
'channel' => 'scheduler',
])
->addTag('container.hot_path')
->addTag('container.preload', [
'class' => FiberWorker::class,
])
;
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/DependencyInjection/SchedulerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use SchedulerBundle\SchedulerInterface;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Reference;
use function array_keys;

Expand Down Expand Up @@ -47,7 +48,7 @@ private function registerSchedulerEntrypoint(ContainerBuilder $container): void
{
foreach (array_keys($container->findTaggedServiceIds($this->schedulerEntryPointTag)) as $service) {
$container->getDefinition($service)->addMethodCall('schedule', [
new Reference(SchedulerInterface::class),
new Reference(SchedulerInterface::class, ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE),
]);
}
}
Expand Down
Loading