diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 16732e2af480d..87f7c50ca82d0 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -70,10 +70,11 @@ - + null + diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json index 76d949b9670dc..8badd284a4c28 100644 --- a/src/Symfony/Bundle/FrameworkBundle/composer.json +++ b/src/Symfony/Bundle/FrameworkBundle/composer.json @@ -41,7 +41,7 @@ "symfony/security": "~3.4|~4.0", "symfony/form": "^4.1", "symfony/expression-language": "~3.4|~4.0", - "symfony/messenger": "^4.1", + "symfony/messenger": "^4.2", "symfony/process": "~3.4|~4.0", "symfony/security-core": "~3.4|~4.0", "symfony/security-csrf": "~3.4|~4.0", @@ -67,6 +67,7 @@ "symfony/asset": "<3.4", "symfony/console": "<3.4", "symfony/form": "<4.1", + "symfony/messenger": "<4.2", "symfony/property-info": "<3.4", "symfony/serializer": "<4.1", "symfony/stopwatch": "<3.4", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 5ec8b11e86081..f053eaaf148d5 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -6,3 +6,5 @@ CHANGELOG * `ValidationMiddleware::handle()` and `SendMessageMiddleware::handle()` now require an `Envelope` object * `EnvelopeItemInterface` doesn't extend `Serializable` anymore + * [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface` + as first constructor argument diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 06b48a50c5ac3..9d93f86bc5bd4 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -20,7 +20,6 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; -use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver; @@ -35,17 +34,19 @@ class ConsumeMessagesCommand extends Command { protected static $defaultName = 'messenger:consume-messages'; - private $bus; + private $busLocator; private $receiverLocator; private $logger; private $receiverNames; + private $busNames; - public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array()) + public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array(), array $busNames = array()) { - $this->bus = $bus; + $this->busLocator = $busLocator; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; + $this->busNames = $busNames; parent::__construct(); } @@ -56,6 +57,7 @@ public function __construct(MessageBusInterface $bus, ContainerInterface $receiv protected function configure(): void { $defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null; + $defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null; $this ->setDefinition(array( @@ -63,6 +65,7 @@ protected function configure(): void new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), + new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName), )) ->setDescription('Consumes messages') ->setHelp(<<<'EOF' @@ -91,18 +94,35 @@ protected function configure(): void */ protected function interact(InputInterface $input, OutputInterface $output) { - if (!$this->receiverNames || $this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { - return; + $style = new SymfonyStyle($input, $output); + + if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { + if (null === $receiverName) { + $style->block('Missing receiver argument.', null, 'error', ' ', true); + $input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames)); + } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { + $style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); + if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { + $input->setArgument('receiver', $alternatives[0]); + } + } } - $style = new SymfonyStyle($input, $output); - if (null === $receiverName) { - $style->block('Missing receiver argument.', null, 'error', ' ', true); - $input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames)); - } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { - $style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); - if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { - $input->setArgument('receiver', $alternatives[0]); + $busName = $input->getOption('bus'); + if ($this->busNames && !$this->busLocator->has($busName)) { + if (null === $busName) { + $style->block('Missing bus argument.', null, 'error', ' ', true); + $input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames)); + } elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) { + $style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true); + + if (1 === \count($alternatives)) { + if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) { + $input->setOption('bus', $alternatives[0]); + } + } else { + $input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0])); + } } } } @@ -116,7 +136,12 @@ protected function execute(InputInterface $input, OutputInterface $output): void throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); } + if (!$this->busLocator->has($busName = $input->getOption('bus'))) { + throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName)); + } + $receiver = $this->receiverLocator->get($receiverName); + $bus = $this->busLocator->get($busName); if ($limit = $input->getOption('limit')) { $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); @@ -130,7 +155,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); } - $worker = new Worker($receiver, $this->bus); + $worker = new Worker($receiver, $bus); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 2de3289a51695..d31b8df11442d 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -184,6 +184,17 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) } $container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping); } + + if ($container->hasDefinition('console.command.messenger_consume_messages')) { + $buses = array(); + foreach ($busIds as $busId) { + $buses[$busId] = new Reference($busId); + } + $container + ->getDefinition('console.command.messenger_consume_messages') + ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)) + ->replaceArgument(4, $busIds); + } } private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index de489b545f0e4..e5a1940d8c45d 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -14,13 +14,12 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; -use Symfony\Component\Messenger\MessageBus; class ConsumeMessagesCommandTest extends TestCase { public function testConfigurationWithDefaultReceiver() { - $command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp')); + $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp')); $inputArgument = $command->getDefinition()->getArgument('receiver'); $this->assertFalse($inputArgument->isRequired()); $this->assertSame('amqp', $inputArgument->getDefault()); @@ -28,7 +27,7 @@ public function testConfigurationWithDefaultReceiver() public function testConfigurationWithoutDefaultReceiver() { - $command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy')); + $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy')); $inputArgument = $command->getDefinition()->getArgument('receiver'); $this->assertTrue($inputArgument->isRequired()); $this->assertNull($inputArgument->getDefault()); diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index f3fc76baa386d..8306fe59f93ba 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -241,10 +241,11 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm { $container = $this->getContainerBuilder(); $container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments(array( - new Reference('message_bus'), + null, new Reference('messenger.receiver_locator'), null, null, + null, )); $container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp')); @@ -253,6 +254,7 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm (new MessengerPass())->process($container); $this->assertSame(array('amqp', 'dummy'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3)); + $this->assertSame(array('message_bus'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4)); } public function testItRegistersSenders()