diff --git a/Command/ConsumeMessagesCommand.php b/Command/ConsumeMessagesCommand.php index 9345a71..f1b7d5a 100644 --- a/Command/ConsumeMessagesCommand.php +++ b/Command/ConsumeMessagesCommand.php @@ -143,6 +143,12 @@ protected function interact(InputInterface $input, OutputInterface $output): voi } if ($this->receiverNames && !$input->getArgument('receivers')) { + if (1 === \count($this->receiverNames)) { + $input->setArgument('receivers', $this->receiverNames); + + return; + } + $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true); $io->writeln('Choose which receivers you want to consume messages from in order of priority.'); diff --git a/DependencyInjection/MessengerPass.php b/DependencyInjection/MessengerPass.php index 9bb32d6..989c9dc 100644 --- a/DependencyInjection/MessengerPass.php +++ b/DependencyInjection/MessengerPass.php @@ -254,6 +254,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds): } } + $consumableReceiverNames = []; foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) { $receiverClass = $this->getServiceClass($container, $id); if (!is_subclass_of($receiverClass, ReceiverInterface::class)) { @@ -269,6 +270,9 @@ private function registerReceivers(ContainerBuilder $container, array $busIds): $failureTransportsMap[$tag['alias']] = $receiverMapping[$id]; } } + if (!isset($tag['is_consumable']) || $tag['is_consumable'] !== false) { + $consumableReceiverNames[] = $tag['alias'] ?? $id; + } } } @@ -294,7 +298,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds): $consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus')); } - $consumeCommandDefinition->replaceArgument(4, array_values($receiverNames)); + $consumeCommandDefinition->replaceArgument(4, $consumableReceiverNames); try { $consumeCommandDefinition->replaceArgument(6, $busIds); } catch (OutOfBoundsException) { diff --git a/Tests/DependencyInjection/MessengerPassTest.php b/Tests/DependencyInjection/MessengerPassTest.php index f830489..4de01a1 100644 --- a/Tests/DependencyInjection/MessengerPassTest.php +++ b/Tests/DependencyInjection/MessengerPassTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\DependencyInjection; use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Command\Command; use Symfony\Component\DependencyInjection\ChildDefinition; use Symfony\Component\DependencyInjection\Compiler\AttributeAutoconfigurationPass; use Symfony\Component\DependencyInjection\Compiler\ResolveChildDefinitionsPass; @@ -471,6 +472,34 @@ public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand() $this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1)); } + public function testOnlyConsumableTransportsAreAddedToConsumeCommand() + { + $container = new ContainerBuilder(); + + $container->register('messenger.transport.async', DummyReceiver::class) + ->addTag('messenger.receiver', ['alias' => 'async']); + $container->register('messenger.transport.sync', DummyReceiver::class) + ->addTag('messenger.receiver', ['alias' => 'sync', 'is_consumable' => false]); + $container->register('messenger.receiver_locator', ServiceLocator::class) + ->setArguments([[]]); + + $container->register('console.command.messenger_consume_messages', Command::class) + ->setArguments([ + null, + null, + null, + null, + [], + ]); + + (new MessengerPass())->process($container); + + $this->assertSame( + ['async'], + $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4) + ); + } + public function testItRegistersHandlersOnDifferentBuses() { $container = $this->getContainerBuilder($eventsBusId = 'event_bus');