From 522a31684aa3d2323402f4117496aa1238619e3d Mon Sep 17 00:00:00 2001 From: Allison Guilhem Date: Wed, 31 Jan 2024 18:31:53 +1100 Subject: [PATCH 1/2] test: non blocking --- .../FrameworkExtension.php | 13 +- .../Resources/config/messenger.php | 11 ++ .../Command/ConsumeMessagesCommand.php | 3 + .../Component/Messenger/DispatchTask.php | 88 ++++++++++++ .../Middleware/HandleMessageMiddleware.php | 6 +- .../Messenger/ParallelMessageBus.php | 53 ++++++++ .../Component/Messenger/Stamp/FutureStamp.php | 31 +++++ .../Messenger/Tests/Fixtures/App/.env | 1 + .../Messenger/Tests/Fixtures/App/Kernel.php | 46 +++++++ .../Tests/ParallelMessageBusTest.php | 53 ++++++++ .../Component/Messenger/Tests/WorkerTest.php | 127 ++++++++++++++++++ .../Messenger/TraceableMessageBus.php | 5 + src/Symfony/Component/Messenger/Worker.php | 92 ++++++++++++- src/Symfony/Component/Messenger/composer.json | 4 + 14 files changed, 524 insertions(+), 9 deletions(-) create mode 100644 src/Symfony/Component/Messenger/DispatchTask.php create mode 100644 src/Symfony/Component/Messenger/ParallelMessageBus.php create mode 100644 src/Symfony/Component/Messenger/Stamp/FutureStamp.php create mode 100644 src/Symfony/Component/Messenger/Tests/Fixtures/App/.env create mode 100644 src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php create mode 100644 src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index a44e9a25bb540..1056cdb9a0b0b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -11,6 +11,7 @@ namespace Symfony\Bundle\FrameworkBundle\DependencyInjection; +use Amp\Parallel\Worker\Task; use Composer\InstalledVersions; use Http\Client\HttpAsyncClient; use Http\Client\HttpClient; @@ -113,6 +114,7 @@ use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\RouterContextMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -2090,6 +2092,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $loader->load('messenger.php'); + if (!class_exists(ParallelMessageBus::class)) { + $container->removeDefinition('parallel_bus'); + } + if (!interface_exists(DenormalizerInterface::class)) { $container->removeDefinition('serializer.normalizer.flatten_exception'); } @@ -2161,7 +2167,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder if ($busId === $config['default_bus']) { $container->setAlias('messenger.default_bus', $busId)->setPublic(true); - $container->setAlias(MessageBusInterface::class, $busId); + + $messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId); + + if (class_exists(Task::class)) { + $messageBusAlias->setPublic(true); + } } else { $container->registerAliasForArgument($busId, MessageBusInterface::class); } diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index df247609653f3..782566dae4de1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -33,6 +33,7 @@ use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\TraceableMiddleware; use Symfony\Component\Messenger\Middleware\ValidationMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; @@ -54,6 +55,7 @@ abstract_arg('per message senders map'), abstract_arg('senders service locator'), ]) + ->set('messenger.middleware.send_message', SendMessageMiddleware::class) ->abstract() ->args([ @@ -134,6 +136,15 @@ ]) ->tag('messenger.transport_factory') + ->set('parallel_bus', ParallelMessageBus::class) + ->args([ + [], + param('kernel.environment'), + param('kernel.debug'), + param('kernel.project_dir'), + ]) + ->tag('messenger.bus') + ->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class) ->args([ service('clock')->nullOnInvalid(), diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a974859f6b23c..4cdae7fcbf32c 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -75,6 +75,7 @@ protected function configure(): void new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'), + new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -240,6 +241,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $options['queues'] = $queues; } + $options['parallel-limit'] = $input->getOption('parallel-limit'); + try { $this->worker->run($options); } finally { diff --git a/src/Symfony/Component/Messenger/DispatchTask.php b/src/Symfony/Component/Messenger/DispatchTask.php new file mode 100644 index 0000000000000..ab1506ef2ac1c --- /dev/null +++ b/src/Symfony/Component/Messenger/DispatchTask.php @@ -0,0 +1,88 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Amp\Cache\LocalCache; +use Amp\Cancellation; +use Amp\Parallel\Worker\Task; +use Amp\Sync\Channel; +use App\Kernel; +use Psr\Container\ContainerInterface; +use Symfony\Component\Dotenv\Dotenv; +use Symfony\Component\Messenger\Stamp\AckStamp; + +class DispatchTask implements Task +{ + private static ?LocalCache $cache = null; + + public function __construct( + private Envelope $envelope, + private array $stamps, + private readonly string $env, + private readonly bool $isDebug, + private readonly string $projectDir, + ) { + if (!class_exists(LocalCache::class)) { + throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class)); + } + } + + public function run(Channel $channel, Cancellation $cancellation): mixed + { + $container = $this->getContainer(); + $envelope = $this->dispatch($container, $channel); + + return $envelope->withoutStampsOfType(AckStamp::class); + } + + private function dispatch(ContainerInterface $container, $channel) + { + $messageBus = $container->get(MessageBusInterface::class); + + return $messageBus->dispatch($this->envelope, $this->stamps); + } + + private function getContainer() + { + $cache = self::$cache ??= new LocalCache(); + $container = $cache->get('cache-container'); + + // if not in cache, create container + if (!$container) { + if (!method_exists(Dotenv::class, 'bootEnv')) { + throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class)); + } + + (new Dotenv())->bootEnv($this->projectDir.'/.env'); + + if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) { + throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class); + } elseif (isset($_ENV['KERNEL_CLASS'])) { + $kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug); + } else { + $kernel = new Kernel($this->env, $this->isDebug); + } + + $kernel->boot(); + + $container = $kernel->getContainer(); + $cache->set('cache-container', $container); + } + + return $container; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php index aa5ddab26c470..55d5bd1fd53f4 100644 --- a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php @@ -20,6 +20,7 @@ use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocatorInterface; use Symfony\Component\Messenger\Stamp\AckStamp; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; use Symfony\Component\Messenger\Stamp\HandledStamp; use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp; @@ -64,6 +65,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope /** @var AckStamp $ackStamp */ if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) { + if ($envelope->last(BusNameStamp::class) && 'parallel_bus' === $envelope->last(BusNameStamp::class)->getBusName()) { + throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]); + } + $ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) { if (null !== $e) { $e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]); @@ -75,7 +80,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope }); $result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class)); - if (!\is_int($result) || 0 > $result) { throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler))); } diff --git a/src/Symfony/Component/Messenger/ParallelMessageBus.php b/src/Symfony/Component/Messenger/ParallelMessageBus.php new file mode 100644 index 0000000000000..649ae1b7bb5e2 --- /dev/null +++ b/src/Symfony/Component/Messenger/ParallelMessageBus.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Amp\Parallel\Worker\ContextWorkerPool; +use Symfony\Component\Messenger\Stamp\FutureStamp; + +use function Amp\async; +use function Amp\Parallel\Worker\workerPool; + +/** + * Using this bus will enable concurrent message processing without the need for multiple workers + * using multiple processes or threads. + * It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes. + */ +final class ParallelMessageBus implements MessageBusInterface +{ + public static ?ContextWorkerPool $worker = null; + + public function __construct( + private array $something, + private readonly string $env, + private readonly string $debug, + private readonly string $projectdir, + ) { + if (!class_exists(ContextWorkerPool::class)) { + throw new \LogicException(\sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class)); + } + } + + public function dispatch(object $message, array $stamps = []): Envelope + { + $worker = (self::$worker ??= workerPool()); + + $envelope = Envelope::wrap($message, $stamps); + $task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir); + + $future = async(function () use ($worker, $task) { + return $worker->submit($task); + }); + + return $envelope->with(new FutureStamp($future)); + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/FutureStamp.php b/src/Symfony/Component/Messenger/Stamp/FutureStamp.php new file mode 100644 index 0000000000000..04ff3d98d7394 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/FutureStamp.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Amp\Future; + +/** + * This stamp allows passing the future representing the potential result of the handler, + * which is treated as an asynchronous operation, + * and will be retrieved later by the worker to ack or nack based on the obtained result. + */ +final readonly class FutureStamp implements StampInterface +{ + public function __construct(private Future $future) + { + } + + public function getFuture(): Future + { + return $this->future; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env b/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env new file mode 100644 index 0000000000000..a1054d1e745eb --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/App/.env @@ -0,0 +1 @@ +KERNEL_CLASS=Symfony\Component\Messenger\Tests\Fixtures\App\Kernel diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php b/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php new file mode 100644 index 0000000000000..6c99777274fbf --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/App/Kernel.php @@ -0,0 +1,46 @@ +load(function (ContainerBuilder $container) use ($loader) { + $container->loadFromExtension('framework', [ + 'router' => [ + 'resource' => 'kernel::loadRoutes', + 'type' => 'service', + ], + ]); + $container + ->register('message.bus', MessageBus::class); + $container->setAlias(MessageBusInterface::class, 'message.bus')->setPublic(true); + }); + } + + public function getCacheDir(): string + { + return sys_get_temp_dir().'/'. \Symfony\Component\HttpKernel\Kernel::VERSION.'/EmptyAppKernel/cache/'.$this->environment; + } + + public function getLogDir(): string + { + return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs'; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php b/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php new file mode 100644 index 0000000000000..dd55e25a12ae2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use Amp\Parallel\Worker\Worker; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Stamp\FutureStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class ParallelMessageBusTest extends TestCase +{ + private string $env = 'dev'; + private bool $debug = false; + private string $projectDir = 'path/to/project'; + + public function testItHasTheRightInterface() + { + if (!class_exists(Worker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); + + $this->assertInstanceOf(MessageBusInterface::class, $bus); + } + + public function testItReturnsWithFutureStamp() + { + if (!class_exists(Worker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $message = new DummyMessage('Hello'); + + $bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); + + $envelope = $bus->dispatch(new Envelope($message)); + + $this->assertInstanceOf(FutureStamp::class, $envelope->last(FutureStamp::class)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 12deb98f25358..5716495a975f7 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -11,12 +11,18 @@ namespace Symfony\Component\Messenger\Tests; +use Amp\Future; +use Amp\Parallel\Worker\Execution; +use Amp\Parallel\Worker\Worker as ParallelWorker; use PHPUnit\Framework\TestCase; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Clock\MockClock; use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\Filesystem\Filesystem; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; +use Symfony\Component\HttpKernel\Kernel; +use Symfony\Component\Messenger\DispatchTask; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; @@ -36,6 +42,8 @@ use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -49,11 +57,18 @@ use Symfony\Component\RateLimiter\RateLimiterFactory; use Symfony\Component\RateLimiter\Storage\InMemoryStorage; +use function Amp\async; +use function Amp\Parallel\Worker\workerPool; + /** * @group time-sensitive */ class WorkerTest extends TestCase { + private string $env = 'dev'; + private bool $debug = false; + private string $projectdir = __DIR__.'/Fixtures/App'; + public function testWorkerDispatchTheReceivedMessage() { $apiMessage = new DummyMessage('API'); @@ -102,6 +117,118 @@ public function dispatch(object $event): object $this->assertSame(2, $receiver->getAcknowledgeCount()); } + public function testFlushAllFuturesFromParallelMessageBus() + { + if (!class_exists(ParallelWorker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new DummyReceiver([ + [(new Envelope($apiMessage))->with(new BusNameStamp('parallel_bus')), (new Envelope($ipaMessage))->with(new BusNameStamp('parallel_bus'))], + ]); + + $bus = new ParallelMessageBus([$receiver], $this->env, $this->debug, $this->projectdir); + + $dispatcher = new class implements EventDispatcherInterface { + private StopWorkerOnMessageLimitListener $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock()); + + $worker->run(); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testFuturesReceived() + { + if (!class_exists(ParallelWorker::class)) { + $this->markTestSkipped(\sprintf('%s not available.', Worker::class)); + } + + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new DummyReceiver([ + [(new Envelope($apiMessage))->with(new BusNameStamp('parallel_bus')), (new Envelope($ipaMessage))->with(new BusNameStamp('parallel_bus'))], + ]); + + $bus = $this->createMock(ParallelMessageBus::class); + $futures = []; + + $workerPool = workerPool(); + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$futures, $workerPool) { + return $futures[] = async(function () use ($workerPool, $envelope) { + return $workerPool->submit( + new DispatchTask($envelope, [], $this->env, $this->debug, $this->projectdir) + ); + }); + }); + + $dispatcher = new class implements EventDispatcherInterface { + private StopWorkerOnMessageLimitListener $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher, clock: new MockClock()); + + $worker->run(); + + $this->assertSame($futures[0]::class, Future::class); + $execution = $futures[0]->await(); + $executionOther = $futures[1]->await(); + $this->assertSame($execution::class, Execution::class); + $this->assertSame($executionOther::class, Execution::class); + $envelope = $execution->await(); + $envelopeOther = $executionOther->await(); + + $this->assertSame($apiMessage->getMessage(), $envelope->getMessage()->getMessage()); + $this->assertSame($ipaMessage->getMessage(), $envelopeOther->getMessage()->getMessage()); + $this->assertCount(1, $envelope->all(ReceivedStamp::class)); + $this->assertCount(1, $envelope->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelope->last(ReceivedStamp::class)->getTransportName()); + + if (!file_exists($dir = sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel')) { + return; + } + + $fs = new Filesystem(); + $fs->remove($dir); + } + public function testHandlingErrorCausesReject() { $receiver = new DummyReceiver([ diff --git a/src/Symfony/Component/Messenger/TraceableMessageBus.php b/src/Symfony/Component/Messenger/TraceableMessageBus.php index 7f0ac09219f18..c0ba090227885 100644 --- a/src/Symfony/Component/Messenger/TraceableMessageBus.php +++ b/src/Symfony/Component/Messenger/TraceableMessageBus.php @@ -101,4 +101,9 @@ private function getCaller(): array 'line' => $line, ]; } + + public function getMessageBus(): MessageBusInterface + { + return $this->decoratedBus; + } } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index c4c1a2462dc1c..4ed256779b884 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger; +use Amp\Future; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Clock\Clock; @@ -26,8 +27,10 @@ use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Stamp\AckStamp; +use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; +use Symfony\Component\Messenger\Stamp\FutureStamp; use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; @@ -48,6 +51,8 @@ class Worker private array $acks = []; private \SplObjectStorage $unacks; + private static $futures = []; + /** * @param ReceiverInterface[] $receivers Where the key is the transport name */ @@ -102,11 +107,16 @@ public function run(array $options = []): void $envelopes = $receiver->get(); } + if (!$envelopes) { + // flush + $this->handleFutures($transportName, 0); + } + foreach ($envelopes as $envelope) { $envelopeHandled = true; $this->rateLimit($transportName); - $this->handleMessage($envelope, $transportName); + $this->handleMessage($envelope, $transportName, $options['parallel-limit'] ?? 0); $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); if ($this->shouldStop) { @@ -141,7 +151,7 @@ public function run(array $options = []): void $this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this)); } - private function handleMessage(Envelope $envelope, string $transportName): void + private function handleMessage(Envelope $envelope, string $transportName, int $parallelProcessesLimit): void { $event = new WorkerMessageReceivedEvent($envelope, $transportName); $this->eventDispatcher?->dispatch($event); @@ -152,17 +162,49 @@ private function handleMessage(Envelope $envelope, string $transportName): void } $acked = false; - $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) { - $acked = true; - $this->acks[] = [$transportName, $envelope, $e]; - }; + $busNameStamp = $envelope->last(BusNameStamp::class); try { $e = null; - $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack))); + + $envelope = $envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()); + + // "non concurrent" behaviour + if (!$busNameStamp || 'parallel_bus' !== $busNameStamp->getBusName()) { + $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) { + $acked = true; + $this->acks[] = [$transportName, $envelope, $e]; + }; + + $envelope = $envelope->with(new AckStamp($ack)); + } + + $envelope = $this->bus->dispatch($envelope); + + // "non concurrent" behaviour + if (null == $futureStamp = $envelope->last(FutureStamp::class)) { + $this->preAck($envelope, $transportName, $acked, $e); + + return; + } + + $envelope = $envelope->withoutStampsOfType(FutureStamp::class); + self::$futures[] = [$futureStamp->getFuture(), $envelope]; } catch (\Throwable $e) { + $this->preAck($envelope, $transportName, $acked, $e); + + return; } + if (\count(self::$futures) < $parallelProcessesLimit) { + return; + } + + $this->handleFutures($transportName, $parallelProcessesLimit); + } + + private function preAck(Envelope $envelope, string $transportName, bool $acked, $e): void + { $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class); if (!$acked && !$noAutoAckStamp) { @@ -281,4 +323,40 @@ public function getMetadata(): WorkerMetadata { return $this->metadata; } + + public function handleFutures(string $transportName, $parallelProcessLimit): void + { + $toHandle = self::$futures; + self::$futures = []; + + if (!$toHandle) { + return; + } + + $futuresReceived = []; + $envelopesAssociated = []; + + foreach ($toHandle as $combo) { + $futuresReceived[] = $combo[0]; + $envelopesAssociated[] = $combo[1]; + } + + foreach (Future::iterate($futuresReceived) as $idx => $future) { + try { + $execution = $future->await(); + $execution->getFuture()->await(); + $envelope = $execution->getFuture()->await(); + + unset($futuresReceived[$idx]); + + $this->preAck($envelope, $transportName, false, null); + } catch (\Throwable $e) { + $envelope = $envelopesAssociated[$idx]; + + unset($futuresReceived[$idx]); + + $this->preAck($envelope, $transportName, false, $e); + } + } + } } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 3fdfe4a55ee26..88b7c55f911bb 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -22,10 +22,14 @@ "symfony/deprecation-contracts": "^2.5|^3" }, "require-dev": { + "amphp/amp": "^3.0", + "amphp/parallel": "^2.0", "psr/cache": "^1.0|^2.0|^3.0", "symfony/console": "^6.4|^7.0", + "symfony/dotenv": "^6.4|^7.0", "symfony/dependency-injection": "^6.4|^7.0", "symfony/event-dispatcher": "^6.4|^7.0", + "symfony/framework-bundle": "^6.4|^7.0", "symfony/http-kernel": "^6.4|^7.0", "symfony/process": "^6.4|^7.0", "symfony/property-access": "^6.4|^7.0", From 29b4e97ffa99b9ca46da24e018a8a1ab435f6294 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Fri, 28 Mar 2025 19:18:26 +0100 Subject: [PATCH 2/2] Implement BatchAsyncHandlerTrait for parallel message processing --- src/Symfony/Component/Messenger/CHANGELOG.md | 6 + .../Handler/BatchAsyncHandlerTrait.php | 156 ++++++++++++ .../Handler/BatchAsyncHandlerTraitTest.php | 235 ++++++++++++++++++ 3 files changed, 397 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Handler/BatchAsyncHandlerTrait.php create mode 100644 src/Symfony/Component/Messenger/Tests/Handler/BatchAsyncHandlerTraitTest.php diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index eb35afe06c0c9..0945b8d87123b 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -1,6 +1,12 @@ CHANGELOG ========= +7.3 +--- + + * Add `Symfony\Component\Messenger\Handler\BatchAsyncHandlerTrait` designed for parallel execution using ParallelMessageBus + + 7.2 --- diff --git a/src/Symfony/Component/Messenger/Handler/BatchAsyncHandlerTrait.php b/src/Symfony/Component/Messenger/Handler/BatchAsyncHandlerTrait.php new file mode 100644 index 0000000000000..eab2a7d801cfe --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchAsyncHandlerTrait.php @@ -0,0 +1,156 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +use Amp\Future; +use Symfony\Component\Messenger\Stamp\FutureStamp; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Envelope; + +/** + * A batch handler trait designed for parallel execution using ParallelMessageBus. + * + * This trait collects jobs in worker-specific batches and processes them + * in parallel by dispatching each job individually through ParallelMessageBus. + */ +trait BatchAsyncHandlerTrait +{ + /** @var array Map of worker IDs to their job batches */ + private array $workerJobs = []; + + /** @var ParallelMessageBus|null */ + private ?ParallelMessageBus $parallelBus = null; + + /** + * Set the parallel message bus to use for dispatching jobs. + */ + public function setParallelMessageBus(ParallelMessageBus $bus): void + { + $this->parallelBus = $bus; + } + + public function flush(bool $force): void + { + $workerId = $this->getCurrentWorkerId(); + + if (isset($this->workerJobs[$workerId]) && $jobs = $this->workerJobs[$workerId]) { + $this->workerJobs[$workerId] = []; + + if ($this->parallelBus) { + // Process each job in parallel using ParallelMessageBus + $futures = []; + + foreach ($jobs as [$message, $ack]) { + // Dispatch each message individually + $envelope = $this->parallelBus->dispatch($message); + + $futureStamp = $envelope->last(FutureStamp::class); + if ($futureStamp) { + /** @var Future $future */ + $future = $futureStamp->getFuture(); + $futures[] = [$future, $ack]; + } + } + + // If force is true, wait for all results + if ($force && $futures) { + foreach ($futures as [$future, $ack]) { + try { + $result = $future->await(); + $ack->ack($result); + } catch (\Throwable $e) { + $ack->nack($e); + } + } + } + } else { + // Fallback to synchronous processing + $this->process($jobs); + } + } + } + + /** + * @param Acknowledger|null $ack The function to call to ack/nack the $message. + * + * @return mixed The number of pending messages in the batch if $ack is not null, + * the result from handling the message otherwise + */ + private function handle(object $message, ?Acknowledger $ack): mixed + { + $workerId = $this->getCurrentWorkerId(); + + if (!isset($this->workerJobs[$workerId])) { + $this->workerJobs[$workerId] = []; + } + + if (null === $ack) { + $ack = new Acknowledger(get_debug_type($this)); + $this->workerJobs[$workerId][] = [$message, $ack]; + $this->flush(true); + + return $ack->getResult(); + } + + $this->workerJobs[$workerId][] = [$message, $ack]; + if (!$this->shouldFlush()) { + return \count($this->workerJobs[$workerId]); + } + + $this->flush(true); + + return 0; + } + + private function shouldFlush(): bool + { + $workerId = $this->getCurrentWorkerId(); + return $this->getBatchSize() <= \count($this->workerJobs[$workerId] ?? []); + } + + /** + * Generates a unique identifier for the current worker context. + */ + private function getCurrentWorkerId(): string + { + // In a worker pool, each worker has a unique ID + return getmypid() ?: 'default-worker'; + } + + /** + * Cleans up worker-specific resources when a worker completes its job. + */ + public function cleanupWorker(): void + { + $workerId = $this->getCurrentWorkerId(); + + // Flush any remaining jobs before cleaning up + if (isset($this->workerJobs[$workerId]) && !empty($this->workerJobs[$workerId])) { + $this->flush(true); + } + + unset($this->workerJobs[$workerId]); + } + + /** + * Completes the jobs in the list. + * This is used as a fallback when ParallelMessageBus is not available. + * + * @param list $jobs A list of pairs of messages and their corresponding acknowledgers + */ + abstract private function process(array $jobs): void; + + private function getBatchSize(): int + { + return 10; + } +} \ No newline at end of file diff --git a/src/Symfony/Component/Messenger/Tests/Handler/BatchAsyncHandlerTraitTest.php b/src/Symfony/Component/Messenger/Tests/Handler/BatchAsyncHandlerTraitTest.php new file mode 100644 index 0000000000000..6f8ddc0ccf56f --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Handler/BatchAsyncHandlerTraitTest.php @@ -0,0 +1,235 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Handler; + +use Amp\Future; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Handler\Acknowledger; +use Symfony\Component\Messenger\Handler\BatchAsyncHandlerTrait; +use Symfony\Component\Messenger\ParallelMessageBus; +use Symfony\Component\Messenger\Stamp\FutureStamp; + +class BatchAsyncHandlerTraitTest extends TestCase +{ + public function testHandleWillProcessMessagesAfterReachingBatchSize() + { + $handler = new TestBatchAsyncHandler(); + $handler->setBatchSize(2); + + $message1 = new \stdClass(); + $message2 = new \stdClass(); + $message3 = new \stdClass(); + + $result1 = $handler->handle($message1, new Acknowledger('TestBatchAsyncHandler')); + $this->assertSame(1, $result1); + $this->assertEmpty($handler->getProcessedJobs()); + + $result2 = $handler->handle($message2, new Acknowledger('TestBatchAsyncHandler')); + $this->assertSame(0, $result2); + $this->assertCount(2, $handler->getProcessedJobs()[0]); + + $result3 = $handler->handle($message3, new Acknowledger('TestBatchAsyncHandler')); + $this->assertSame(1, $result3); + $this->assertCount(2, $handler->getProcessedJobs()[0]); + } + + public function testHandleWithNullAcknowledgerProcessesImmediately() + { + $handler = new TestBatchAsyncHandler(); + $handler->setBatchSize(5); + + $message = new \stdClass(); + $message->payload = 'test'; + + $result = $handler->handle($message, null); + $this->assertSame('processed:test', $result); + $this->assertCount(1, $handler->getProcessedJobs()[0]); + } + + public function testFlushWithForceTrueProcessesRegardlessOfBatchSize() + { + $handler = new TestBatchAsyncHandler(); + $handler->setBatchSize(5); + + $message1 = new \stdClass(); + $message2 = new \stdClass(); + + $handler->handle($message1, new Acknowledger('TestBatchAsyncHandler')); + $handler->handle($message2, new Acknowledger('TestBatchAsyncHandler')); + + $this->assertCount(0, $handler->getProcessedJobs()); + + $handler->flush(true); + $this->assertCount(2, $handler->getProcessedJobs()[0]); + } + + public function testParallelDispatch() + { + $message1 = new \stdClass(); + $message1->payload = 'test1'; + + $message2 = new \stdClass(); + $message2->payload = 'test2'; + + $future1 = $this->createMock(Future::class); + $future1->expects($this->once()) + ->method('await') + ->willReturn('future_result1'); + + $future2 = $this->createMock(Future::class); + $future2->expects($this->once()) + ->method('await') + ->willReturn('future_result2'); + + $futureStamp1 = new FutureStamp($future1); + $futureStamp2 = new FutureStamp($future2); + + $envelope1 = new Envelope($message1, [$futureStamp1]); + $envelope2 = new Envelope($message2, [$futureStamp2]); + + $parallelBus = $this->createMock(ParallelMessageBus::class); + $parallelBus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnOnConsecutiveCalls($envelope1, $envelope2); + + $handler = new TestBatchAsyncHandler(); + $handler->setBatchSize(2); + $handler->setParallelMessageBus($parallelBus); + + $ack1 = $this->createMock(Acknowledger::class); + $ack1->expects($this->once()) + ->method('ack') + ->with('future_result1'); + + $ack2 = $this->createMock(Acknowledger::class); + $ack2->expects($this->once()) + ->method('ack') + ->with('future_result2'); + + $handler->handle($message1, $ack1); + $handler->handle($message2, $ack2); + + // Jobs should have been dispatched via the parallel bus + $this->assertEmpty($handler->getProcessedJobs()); + } + + public function testFallbackToSyncProcessingWhenNoBusAvailable() + { + $handler = new TestBatchAsyncHandler(); + $handler->setBatchSize(2); + + $message1 = new \stdClass(); + $message2 = new \stdClass(); + + $handler->handle($message1, new Acknowledger('TestBatchAsyncHandler')); + $handler->handle($message2, new Acknowledger('TestBatchAsyncHandler')); + + $this->assertCount(2, $handler->getProcessedJobs()[0]); + } + + public function testParallelDispatchWithException() + { + $message = new \stdClass(); + $message->payload = 'test_exception'; + + $future = $this->createMock(Future::class); + $future->expects($this->once()) + ->method('await') + ->willThrowException(new \RuntimeException('Test exception')); + + $futureStamp = new FutureStamp($future); + $envelope = new Envelope($message, [$futureStamp]); + + $parallelBus = $this->createMock(ParallelMessageBus::class); + $parallelBus->expects($this->once()) + ->method('dispatch') + ->willReturn($envelope); + + $handler = new TestBatchAsyncHandler(); + $handler->setParallelMessageBus($parallelBus); + + $ack = $this->createMock(Acknowledger::class); + $ack->expects($this->once()) + ->method('nack') + ->with($this->isInstanceOf(\RuntimeException::class)); + + $handler->handle($message, $ack); + $handler->flush(true); + } + + public function testCleanupWorker() + { + $handler = new TestBatchAsyncHandler(); + + $message = new \stdClass(); + $handler->handle($message, new Acknowledger('TestBatchAsyncHandler')); + + $this->assertEmpty($handler->getProcessedJobs()); + + $handler->cleanupWorker(); + + $this->assertCount(1, $handler->getProcessedJobs()[0]); + } +} + +/** + * Test implementation of BatchAsyncHandlerTrait. + */ +class TestBatchAsyncHandler +{ + use BatchAsyncHandlerTrait { + BatchAsyncHandlerTrait::getCurrentWorkerId as private traitGetCurrentWorkerId; + } + + private int $batchSize = 10; + private array $processedJobs = []; + + public function handle(object $message, ?Acknowledger $ack): mixed + { + return $this->{'handle'}($message, $ack); + } + + private function process(array $jobs): void + { + $this->processedJobs[] = $jobs; + + foreach ($jobs as [$message, $ack]) { + if (property_exists($message, 'payload')) { + $result = 'processed:' . $message->payload; + $ack->ack($result); + } else { + $ack->ack(true); + } + } + } + + public function setBatchSize(int $size): void + { + $this->batchSize = $size; + } + + public function getProcessedJobs(): array + { + return $this->processedJobs; + } + + private function getBatchSize(): int + { + return $this->batchSize; + } + + private function getCurrentWorkerId(): string + { + return 'test-worker-id'; + } +}