diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index a44e9a25bb540..1056cdb9a0b0b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -11,6 +11,7 @@ namespace Symfony\Bundle\FrameworkBundle\DependencyInjection; +use Amp\Parallel\Worker\Task; use Composer\InstalledVersions; use Http\Client\HttpAsyncClient; use Http\Client\HttpClient; @@ -113,6 +114,7 @@ use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\RouterContextMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -2090,6 +2092,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $loader->load('messenger.php'); + if (!class_exists(ParallelMessageBus::class)) { + $container->removeDefinition('parallel_bus'); + } + if (!interface_exists(DenormalizerInterface::class)) { $container->removeDefinition('serializer.normalizer.flatten_exception'); } @@ -2161,7 +2167,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder if ($busId === $config['default_bus']) { $container->setAlias('messenger.default_bus', $busId)->setPublic(true); - $container->setAlias(MessageBusInterface::class, $busId); + + $messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId); + + if (class_exists(Task::class)) { + $messageBusAlias->setPublic(true); + } } else { $container->registerAliasForArgument($busId, MessageBusInterface::class); } diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index df247609653f3..782566dae4de1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -33,6 +33,7 @@ use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\TraceableMiddleware; use Symfony\Component\Messenger\Middleware\ValidationMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; @@ -54,6 +55,7 @@ abstract_arg('per message senders map'), abstract_arg('senders service locator'), ]) + ->set('messenger.middleware.send_message', SendMessageMiddleware::class) ->abstract() ->args([ @@ -134,6 +136,15 @@ ]) ->tag('messenger.transport_factory') + ->set('parallel_bus', ParallelMessageBus::class) + ->args([ + [], + param('kernel.environment'), + param('kernel.debug'), + param('kernel.project_dir'), + ]) + ->tag('messenger.bus') + ->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class) ->args([ service('clock')->nullOnInvalid(), diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a974859f6b23c..4cdae7fcbf32c 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -75,6 +75,7 @@ protected function configure(): void new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'), + new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -240,6 +241,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $options['queues'] = $queues; } + $options['parallel-limit'] = $input->getOption('parallel-limit'); + try { $this->worker->run($options); } finally { diff --git a/src/Symfony/Component/Messenger/DispatchTask.php b/src/Symfony/Component/Messenger/DispatchTask.php new file mode 100644 index 0000000000000..ab1506ef2ac1c --- /dev/null +++ b/src/Symfony/Component/Messenger/DispatchTask.php @@ -0,0 +1,88 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Amp\Cache\LocalCache; +use Amp\Cancellation; +use Amp\Parallel\Worker\Task; +use Amp\Sync\Channel; +use App\Kernel; +use Psr\Container\ContainerInterface; +use Symfony\Component\Dotenv\Dotenv; +use Symfony\Component\Messenger\Stamp\AckStamp; + +class DispatchTask implements Task +{ + private static ?LocalCache $cache = null; + + public function __construct( + private Envelope $envelope, + private array $stamps, + private readonly string $env, + private readonly bool $isDebug, + private readonly string $projectDir, + ) { + if (!class_exists(LocalCache::class)) { + throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class)); + } + } + + public function run(Channel $channel, Cancellation $cancellation): mixed + { + $container = $this->getContainer(); + $envelope = $this->dispatch($container, $channel); + + return $envelope->withoutStampsOfType(AckStamp::class); + } + + private function dispatch(ContainerInterface $container, $channel) + { + $messageBus = $container->get(MessageBusInterface::class); + + return $messageBus->dispatch($this->envelope, $this->stamps); + } + + private function getContainer() + { + $cache = self::$cache ??= new LocalCache(); + $container = $cache->get('cache-container'); + + // if not in cache, create container + if (!$container) { + if (!method_exists(Dotenv::class, 'bootEnv')) { + throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class)); + } + + (new Dotenv())->bootEnv($this->projectDir.'/.env'); + + if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) { + throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class); + } elseif (isset($_ENV['KERNEL_CLASS'])) { + $kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug); + } else { + $kernel = new Kernel($this->env, $this->isDebug); + } + + $kernel->boot(); + + $container = $kernel->getContainer(); + $cache->set('cache-container', $container); + } + + return $container; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php index aa5ddab26c470..55d5bd1fd53f4 100644 --- a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php @@ -20,6 +20,7 @@ use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocatorInterface; use Symfony\Component\Messenger\Stamp\AckStamp; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; use Symfony\Component\Messenger\Stamp\HandledStamp; use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp; @@ -64,6 +65,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope /** @var AckStamp $ackStamp */ if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) { + if ($envelope->last(BusNameStamp::class) && 'parallel_bus' === $envelope->last(BusNameStamp::class)->getBusName()) { + throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]); + } + $ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) { if (null !== $e) { $e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]); @@ -75,7 +80,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope }); $result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class)); - if (!\is_int($result) || 0 > $result) { throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler))); } diff --git a/src/Symfony/Component/Messenger/ParallelMessageBus.php b/src/Symfony/Component/Messenger/ParallelMessageBus.php new file mode 100644 index 0000000000000..649ae1b7bb5e2 --- /dev/null +++ b/src/Symfony/Component/Messenger/ParallelMessageBus.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Amp\Parallel\Worker\ContextWorkerPool; +use Symfony\Component\Messenger\Stamp\FutureStamp; + +use function Amp\async; +use function Amp\Parallel\Worker\workerPool; + +/** + * Using this bus will enable concurrent message processing without the need for multiple workers + * using multiple processes or threads. + * It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes. + */ +final class ParallelMessageBus implements MessageBusInterface +{ + public static ?ContextWorkerPool $worker = null; + + public function __construct( + private array $something, + private readonly string $env, + private readonly string $debug, + private readonly string $projectdir, + ) { + if (!class_exists(ContextWorkerPool::class)) { + throw new \LogicException(\sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class)); + } + } + + public function dispatch(object $message, array $stamps = []): Envelope + { + $worker = (self::$worker ??= workerPool()); + + $envelope = Envelope::wrap($message, $stamps); + $task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir); + + $future = async(function () use ($worker, $task) { + return $worker->submit($task); + }); + + return $envelope->with(new FutureStamp($future)); + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/FutureStamp.php b/src/Symfony/Component/Messenger/Stamp/FutureStamp.php new file mode 100644 index 0000000000000..04ff3d98d7394 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/FutureStamp.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Amp\Future; + +/** + * This stamp allows passing the future representing the potential result of the handler, + * which is treated as an asynchronous operation, + * and will be retrieved later by the worker to ack or nack based on the obtained result. + */ +final readonly class FutureStamp implements StampInterface +{ + public function __construct(private Future $future) + { + } + + public function getFuture(): Future + { + return $this->future; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env b/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env new file mode 100644 index 0000000000000..a1054d1e745eb --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env @@ -0,0 +1 @@ +KERNEL_CLASS=Symfony\Component\Messenger\Tests\Fixtures\App\Kernel diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php b/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php new file mode 100644 index 0000000000000..6c99777274fbf --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php @@ -0,0 +1,46 @@ +load(function (ContainerBuilder $container) use ($loader) { + $container->loadFromExtension('framework', [ + 'router' => [ + 'resource' => 'kernel::loadRoutes', + 'type' => 'service', + ], + ]); + $container + ->register('message.bus', MessageBus::class); + $container->setAlias(MessageBusInterface::class, 'message.bus')->setPublic(true); + }); + } + + public function getCacheDir(): string + { + return sys_get_temp_dir().'/'. \Symfony\Component\HttpKernel\Kernel::VERSION.'/EmptyAppKernel/cache/'.$this->environment; + } + + public function getLogDir(): string + { + return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs'; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php b/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php new file mode 100644 index 0000000000000..dd55e25a12ae2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use Amp\Parallel\Worker\Worker; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Stamp\FutureStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class ParallelMessageBusTest extends TestCase +{ + private string $env = 'dev'; + private bool $debug = false; + private string $projectDir = 'path/to/project'; + + public function testItHasTheRightInterface() + { + if (!class_exists(Worker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); + + $this->assertInstanceOf(MessageBusInterface::class, $bus); + } + + public function testItReturnsWithFutureStamp() + { + if (!class_exists(Worker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $message = new DummyMessage('Hello'); + + $bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); + + $envelope = $bus->dispatch(new Envelope($message)); + + $this->assertInstanceOf(FutureStamp::class, $envelope->last(FutureStamp::class)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 12deb98f25358..5716495a975f7 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -11,12 +11,18 @@ namespace Symfony\Component\Messenger\Tests; +use Amp\Future; +use Amp\Parallel\Worker\Execution; +use Amp\Parallel\Worker\Worker as ParallelWorker; use PHPUnit\Framework\TestCase; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Clock\MockClock; use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\Filesystem\Filesystem; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; +use Symfony\Component\HttpKernel\Kernel; +use Symfony\Component\Messenger\DispatchTask; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; @@ -36,6 +42,8 @@ use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -49,11 +57,18 @@ use Symfony\Component\RateLimiter\RateLimiterFactory; use Symfony\Component\RateLimiter\Storage\InMemoryStorage; +use function Amp\async; +use function Amp\Parallel\Worker\workerPool; + /** * @group time-sensitive */ class WorkerTest extends TestCase { + private string $env = 'dev'; + private bool $debug = false; + private string $projectdir = __DIR__.'/Fixtures/App'; + public function testWorkerDispatchTheReceivedMessage() { $apiMessage = new DummyMessage('API'); @@ -102,6 +117,118 @@ public function dispatch(object $event): object $this->assertSame(2, $receiver->getAcknowledgeCount()); } + public function testFlushAllFuturesFromParallelMessageBus() + { + if (!class_exists(ParallelWorker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new DummyReceiver([ + [(new Envelope($apiMessage))->with(new BusNameStamp('parallel_bus')), (new Envelope($ipaMessage))->with(new BusNameStamp('parallel_bus'))], + ]); + + $bus = new ParallelMessageBus([$receiver], $this->env, $this->debug, $this->projectdir); + + $dispatcher = new class implements EventDispatcherInterface { + private StopWorkerOnMessageLimitListener $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock()); + + $worker->run(); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testFuturesReceived() + { + if (!class_exists(ParallelWorker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new DummyReceiver([ + [(new Envelope($apiMessage))->with(new BusNameStamp('parallel_bus')), (new Envelope($ipaMessage))->with(new BusNameStamp('parallel_bus'))], + ]); + + $bus = $this->createMock(ParallelMessageBus::class); + $futures = []; + + $workerPool = workerPool(); + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$futures, $workerPool) { + return $futures[] = async(function () use ($workerPool, $envelope) { + return $workerPool->submit( + new DispatchTask($envelope, [], $this->env, $this->debug, $this->projectdir) + ); + }); + }); + + $dispatcher = new class implements EventDispatcherInterface { + private StopWorkerOnMessageLimitListener $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock()); + + $worker->run(); + + $this->assertSame($futures[0]::class, Future::class); + $execution = $futures[0]->await(); + $executionOther = $futures[1]->await(); + $this->assertSame($execution::class, Execution::class); + $this->assertSame($executionOther::class, Execution::class); + $envelope = $execution->await(); + $envelopeOther = $executionOther->await(); + + $this->assertSame($apiMessage->getMessage(), $envelope->getMessage()->getMessage()); + $this->assertSame($ipaMessage->getMessage(), $envelopeOther->getMessage()->getMessage()); + $this->assertCount(1, $envelope->all(ReceivedStamp::class)); + $this->assertCount(1, $envelope->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelope->last(ReceivedStamp::class)->getTransportName()); + + if (!file_exists($dir = sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel')) { + return; + } + + $fs = new Filesystem(); + $fs->remove($dir); + } + public function testHandlingErrorCausesReject() { $receiver = new DummyReceiver([ diff --git a/src/Symfony/Component/Messenger/TraceableMessageBus.php b/src/Symfony/Component/Messenger/TraceableMessageBus.php index 7f0ac09219f18..c0ba090227885 100644 --- a/src/Symfony/Component/Messenger/TraceableMessageBus.php +++ b/src/Symfony/Component/Messenger/TraceableMessageBus.php @@ -101,4 +101,9 @@ private function getCaller(): array 'line' => $line, ]; } + + public function getMessageBus(): MessageBusInterface + { + return $this->decoratedBus; + } } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index c4c1a2462dc1c..4ed256779b884 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger; +use Amp\Future; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Clock\Clock; @@ -26,8 +27,10 @@ use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Stamp\AckStamp; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; +use Symfony\Component\Messenger\Stamp\FutureStamp; use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; @@ -48,6 +51,8 @@ class Worker private array $acks = []; private \SplObjectStorage $unacks; + private static $futures = []; + /** * @param ReceiverInterface[] $receivers Where the key is the transport name */ @@ -102,11 +107,16 @@ public function run(array $options = []): void $envelopes = $receiver->get(); } + if (!$envelopes) { + // flush + $this->handleFutures($transportName, 0); + } + foreach ($envelopes as $envelope) { $envelopeHandled = true; $this->rateLimit($transportName); - $this->handleMessage($envelope, $transportName); + $this->handleMessage($envelope, $transportName, $options['parallel-limit'] ?? 0); $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); if ($this->shouldStop) { @@ -141,7 +151,7 @@ public function run(array $options = []): void $this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this)); } - private function handleMessage(Envelope $envelope, string $transportName): void + private function handleMessage(Envelope $envelope, string $transportName, int $parallelProcessesLimit): void { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->eventDispatcher?->dispatch($event); @@ -152,17 +162,49 @@ private function handleMessage(Envelope $envelope, string $transportName): void } $acked = false; - $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) { - $acked = true; - $this->acks[] = [$transportName, $envelope, $e]; - }; + $busNameStamp = $envelope->last(BusNameStamp::class); try { $e = null; - $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack))); + + $envelope = $envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()); + + // "non concurrent" behaviour + if (!$busNameStamp || 'parallel_bus' !== $busNameStamp->getBusName()) { + $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) { + $acked = true; + $this->acks[] = [$transportName, $envelope, $e]; + }; + + $envelope = $envelope->with(new AckStamp($ack)); + } + + $envelope = $this->bus->dispatch($envelope); + + // "non concurrent" behaviour + if (null == $futureStamp = $envelope->last(FutureStamp::class)) { + $this->preAck($envelope, $transportName, $acked, $e); + + return; + } + + $envelope = $envelope->withoutStampsOfType(FutureStamp::class); + self::$futures[] = [$futureStamp->getFuture(), $envelope]; } catch (\Throwable $e) { + $this->preAck($envelope, $transportName, $acked, $e); + + return; } + if (\count(self::$futures) < $parallelProcessesLimit) { + return; + } + + $this->handleFutures($transportName, $parallelProcessesLimit); + } + + private function preAck(Envelope $envelope, string $transportName, bool $acked, $e): void + { $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class); if (!$acked && !$noAutoAckStamp) { @@ -281,4 +323,40 @@ public function getMetadata(): WorkerMetadata { return $this->metadata; } + + public function handleFutures(string $transportName, $parallelProcessLimit): void + { + $toHandle = self::$futures; + self::$futures = []; + + if (!$toHandle) { + return; + } + + $futuresReceived = []; + $envelopesAssociated = []; + + foreach ($toHandle as $combo) { + $futuresReceived[] = $combo[0]; + $envelopesAssociated[] = $combo[1]; + } + + foreach (Future::iterate($futuresReceived) as $idx => $future) { + try { + $execution = $future->await(); + $execution->getFuture()->await(); + $envelope = $execution->getFuture()->await(); + + unset($futuresReceived[$idx]); + + $this->preAck($envelope, $transportName, false, null); + } catch (\Throwable $e) { + $envelope = $envelopesAssociated[$idx]; + + unset($futuresReceived[$idx]); + + $this->preAck($envelope, $transportName, false, $e); + } + } + } } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 3fdfe4a55ee26..88b7c55f911bb 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -22,10 +22,14 @@ "symfony/deprecation-contracts": "^2.5|^3" }, "require-dev": { + "amphp/amp": "^3.0", + "amphp/parallel": "^2.0", "psr/cache": "^1.0|^2.0|^3.0", "symfony/console": "^6.4|^7.0", + "symfony/dotenv": "^6.4|^7.0", "symfony/dependency-injection": "^6.4|^7.0", "symfony/event-dispatcher": "^6.4|^7.0", + "symfony/framework-bundle": "^6.4|^7.0", "symfony/http-kernel": "^6.4|^7.0", "symfony/process": "^6.4|^7.0", "symfony/property-access": "^6.4|^7.0",