8000 [Messenger] implementation of `messenger:consume`, which processes messages concurrently by alli83 · Pull Request #53964 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] implementation of messenger:consume, which processes messages concurrently #53964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.4
Choose a base branch
from
Open
Show file tree
10000 Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;

use Amp\Parallel\Worker\Task;
use Composer\InstalledVersions;
use Http\Client\HttpAsyncClient;
use Http\Client\HttpClient;
Expand Down Expand Up @@ -113,6 +114,7 @@
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand Down Expand Up @@ -2090,6 +2092,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$loader->load('messenger.php');

if (!class_exists(ParallelMessageBus::class)) {
$container->removeDefinition('parallel_bus');
}

if (!interface_exists(DenormalizerInterface::class)) {
$container->removeDefinition('serializer.normalizer.flatten_exception');
}
Expand Down Expand Up @@ -2161,7 +2167,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

if ($busId === $config['default_bus']) {
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
$container->setAlias(MessageBusInterface::class, $busId);

$messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId);

if (class_exists(Task::class)) {
$messageBusAlias->setPublic(true);
}
} else {
$container->registerAliasForArgument($busId, MessageBusInterface::class);
}
Expand Down
11 changes: 11 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
Expand All @@ -54,6 +55,7 @@
abstract_arg('per message senders map'),
abstract_arg('senders service locator'),
])

->set('messenger.middleware.send_message', SendMessageMiddleware::class)
->abstract()
->args([
Expand Down Expand Up @@ -134,6 +136,15 @@
])
->tag('messenger.transport_factory')

->set('parallel_bus', ParallelMessageBus::class)
->args([
[],
param('kernel.environment'),
param('kernel.debug'),
param('kernel.project_dir'),
])
->tag('messenger.bus')

->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
->args([
service('clock')->nullOnInvalid(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ protected function configure(): void
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
Expand Down Expand Up @@ -240,6 +241,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$options['queues'] = $queues;
}

$options['parallel-limit'] = $input->getOption('parallel-limit');

try {
$this->worker->run($options);
} finally {
Expand Down
88 changes: 88 additions & 0 deletions src/Symfony/Component/Messenger/DispatchTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use App\Kernel;
use Psr\Container\ContainerInterface;
use Symfony\Component\Dotenv\Dotenv;
use Symfony\Component\Messenger\Stamp\AckStamp;

class DispatchTask implements Task

Check failure on line 23 in src/Symfony/Component/Messenger/DispatchTask.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/DispatchTask.php:23:31: UndefinedClass: Class, interface or enum named Amp\Parallel\Worker\Task does not exist (see https://psalm.dev/019)

Check failure on line 23 in src/Symfony/Component/Messenger/DispatchTask.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/DispatchTask.php:23:31: UndefinedClass: Class, interface or enum named Amp\Parallel\Worker\Task does not exist (see https://psalm.dev/019)
{
private static ?LocalCache $cache = null;

public function __construct(
private Envelope $envelope,
private array $stamps,
private readonly string $env,
private readonly bool $isDebug,
private readonly string $projectDir,
) {
if (!class_exists(LocalCache::class)) {
throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class));
}
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
$container = $this->getContainer();
$envelope = $this->dispatch($container, $channel);

return $envelope->withoutStampsOfType(AckStamp::class);
}

private function dispatch(ContainerInterface $container, $channel)
{
$messageBus = $container->get(MessageBusInterface::class);

return $messageBus->dispatch($this->envelope, $this->stamps);
}

private function getContainer()
{
$cache = self::$cache ??= new LocalCache();
$container = $cache->get('cache-container');

// if not in cache, create container
if (!$container) {
if (!method_exists(Dotenv::class, 'bootEnv')) {
throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class));
}

(new Dotenv())->bootEnv($this->projectDir.'/.env');

if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
} elseif (isset($_ENV['KERNEL_CLASS'])) {
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug);
} else {
$kernel = new Kernel($this->env, $this->isDebug);
}

$kernel->boot();

$container = $kernel->getContainer();
$cache->set('cache-container', $container);
}

return $container;
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\AckStamp;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
Expand Down Expand Up @@ -64,6 +65,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

/** @var AckStamp $ackStamp */
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
if ($envelope->last(BusNameStamp::class) && 'parallel_bus' === $envelope->last(BusNameStamp::class)->getBusName()) {
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
}

$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
if (null !== $e) {
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
Expand All @@ -75,7 +80,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
});

$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));

if (!\is_int($result) || 0 > $result) {
throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
}
Expand Down
53 changes: 53 additions & 0 deletions src/Symfony/Component/Messenger/ParallelMessageBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

use Amp\Parallel\Worker\ContextWorkerPool;
use Symfony\Component\Messenger\Stamp\FutureStamp;

use function Amp\async;
use function Amp\Parallel\Worker\workerPool;

/**
* Using this bus will enable concurrent message processing without the need for multiple workers
* using multiple processes or threads.
* It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes.
*/
final class ParallelMessageBus implements MessageBusInterface
{
public static ?ContextWorkerPool $worker = null;

Check failure on line 27 in src/Symfony/Component/Messenger/ParallelMessageBus.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/ParallelMessageBus.php:27:19: UndefinedClass: Class, interface or enum named Amp\Parallel\Worker\ContextWorkerPool does not exist (see https://psalm.dev/019)

Check failure on line 27 in src/Symfony/Component/Messenger/ParallelMessageBus.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/ParallelMessageBus.php:27:19: UndefinedClass: Class, interface or enum named Amp\Parallel\Worker\ContextWorkerPool does not exist (see https://psalm.dev/019)

public function __construct(
private array $something,
private readonly string $env,
private readonly string $debug,
private readonly string $projectdir,
) {
if (!class_exists(ContextWorkerPool::class)) {
throw new \LogicException(\sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class));
}
}

public function dispatch(object $message, array $stamps = []): Envelope
{
$worker = (self::$worker ??= workerPool());

Check failure on line 42 in src/Symfony/Component/Messenger/ParallelMessageBus.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedFunction

src/Symfony/Component/Messenger/ParallelMessageBus.php:42:38: UndefinedFunction: Function Amp\Parallel\Worker\workerPool does not exist (see https://psalm.dev/021)

$envelope = Envelope::wrap($message, $stamps);
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir);

Check failure on line 45 in src/Symfony/Component/Messenger/ParallelMessageBus.php

View workflow job for this annotation

GitHub Actions / Psalm

MissingDependency

src/Symfony/Component/Messenger/ParallelMessageBus.php:45:21: MissingDependency: Symfony\Component\Messenger\DispatchTask depends on class or interface amp\parallel\worker\task that does not exist (see https://psalm.dev/157)

$future = async(function () use ($worker, $task) {

Check failure on line 47 in src/Symfony/Component/Messenger/ParallelMessageBus.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedFunction

src/Symfony/Component/Messenger/ParallelMessageBus.php:47:19: UndefinedFunction: Function Amp\async does not exist (see https://psalm.dev/021)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use static function here to avoid holding a reference to $this.

return $worker->submit($task);
});

return $envelope->with(new FutureStamp($future));
}
}
31 changes: 31 additions & 0 deletions src/Symfony/Component/Messenger/Stamp/FutureStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Stamp;

use Amp\Future;

/**
* This stamp allows passing the future representing the potential result of the handler,
* which is treated as an asynchronous operation,
* and will be retrieved later by the worker to ack or nack based on the obtained result.
*/
final readonly class FutureStamp implements StampInterface
{
public function __construct(private Future $future)

Check failure on line 23 in src/Symfony/Component/Messenger/Stamp/FutureStamp.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/Stamp/FutureStamp.php:23:33: UndefinedClass: Class, interface or enum named Amp\Future does not exist (see https://psalm.dev/019)
{
}

public function getFuture(): Future

Check failure on line 27 in src/Symfony/Component/Messenger/Stamp/FutureStamp.php

View workflow job for this annotation

GitHub Actions / Psalm

UndefinedClass

src/Symfony/Component/Messenger/Stamp/FutureStamp.php:27:34: UndefinedClass: Class, interface or enum named Amp\Future does not exist (see https://psalm.dev/019)

Choose a reason for hiding this comment

The reason will be displayed to describe this com A377 ment to others. Learn more.

This leaks the amp/future directly, which is somewhat tricky to use. Ideally, we would just allow getting the value (or waiting for it) instead of leaking the internal implementation.

{
return $this->future;
}
}
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/Tests/Fixtures/App/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
KERNEL_CLASS=Symfony\Component\Messenger\Tests\Fixtures\App\Kernel
46 changes: 46 additions & 0 deletions src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Symfony\Component\Messenger\Tests\Fixtures\App;

use Symfony\Bundle\FrameworkBundle\FrameworkBundle;
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait;
use Symfony\Component\Config\Loader\LoaderInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;

class Kernel extends SymfonyKernel
{
use MicroKernelTrait;

public function registerBundles(): iterable
{
yield new FrameworkBundle();
}

public function registerContainerConfiguration(LoaderInterface $loader): void
{
$loader->load(function (ContainerBuilder $container) use ($loader) {
$container->loadFromExtension('framework', [
'router' => [
'resource' => 'kernel::loadRoutes',
'type' => 'service',
],
]);
$container
->register('message.bus', MessageBus::class);
$container->setAlias(MessageBusInterface::class, 'message.bus')->setPublic(true);
});
}

public function getCacheDir(): string
{
return sys_get_temp_dir().'/'. \Symfony\Component\HttpKernel\Kernel::VERSION.'/EmptyAppKernel/cache/'.$this->environment;
}

public function getLogDir(): string
{
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs';

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs';
return sys_get_temp_dir().DIRECTORY_SEPARATOR.Kernel::VERSION.DIRECTORY_SEPARATOR.'EmptyAppKernel'.DIRECTORY_SEPARATOR.'logs';

and others in this file?

}
}
53 changes: 53 additions & 0 deletions src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests;

use Amp\Parallel\Worker\Worker;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\ParallelMessageBus;
use Symfony\Component\Messenger\Stamp\FutureStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;

class ParallelMessageBusTest extends TestCase
{
private string $env = 'dev';
private bool $debug = false;
private string $projectDir = 'path/to/project';

public function testItHasTheRightInterface()
{
if (!class_exists(Worker::class)) {
$this->markTestSkipped(\sprintf('%s not available.', Worker::class));
}

$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir);

$this->assertInstanceOf(MessageBusInterface::class, $bus);
}

public function testItReturnsWithFutureStamp()
{
if (!class_exists(Worker::class)) {
$this->markTestSkipped(\sprintf('%s not available.', Worker::class));
}

$message = new DummyMessage('Hello');

$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir);

$envelope = $bus->dispatch(new Envelope($message));

$this->assertInstanceOf(FutureStamp::class, $envelope->last(FutureStamp::class));
}
}
Loading
Loading
0