diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 0f7ad4c11f312..b9f2e20b2b22a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -1200,6 +1200,10 @@ function ($a) { ->end() ->end() ->end() + ->scalarNode('failure_transport') + ->defaultNull() + ->info('Transport name to send failed messages to (after all retries have failed).') + ->end() ->scalarNode('default_bus')->defaultNull()->end() ->arrayNode('buses') ->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]]) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index fda2aa5e38f22..94f1b2e5b33d3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -40,9 +40,9 @@ use Symfony\Component\Console\Application; use Symfony\Component\Console\Command\Command; use Symfony\Component\DependencyInjection\Alias; -use Symfony\Component\DependencyInjection\Argument\IteratorArgument; use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument; use Symfony\Component\DependencyInjection\ChildDefinition; +use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\Definition; @@ -284,6 +284,9 @@ public function load(array $configs, ContainerBuilder $container) $container->removeDefinition('console.command.messenger_debug'); $container->removeDefinition('console.command.messenger_stop_workers'); $container->removeDefinition('console.command.messenger_setup_transports'); + $container->removeDefinition('console.command.messenger_failed_messages_retry'); + $container->removeDefinition('console.command.messenger_failed_messages_show'); + $container->removeDefinition('console.command.messenger_failed_messages_remove'); } $propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']); @@ -1743,22 +1746,48 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message)); } - $senders = []; + + // make sure senderAliases contains all senders foreach ($messageConfiguration['senders'] as $sender) { - $senders[$sender] = new Reference($senderAliases[$sender] ?? $sender); + if (!isset($senderAliases[$sender])) { + $senderAliases[$sender] = $sender; + } } - $messageToSendersMapping[$message] = new IteratorArgument($senders); + $messageToSendersMapping[$message] = $messageConfiguration['senders']; $messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle']; } + $senderReferences = []; + foreach ($senderAliases as $alias => $serviceId) { + $senderReferences[$alias] = new Reference($serviceId); + } + $container->getDefinition('messenger.senders_locator') ->replaceArgument(0, $messageToSendersMapping) - ->replaceArgument(1, $messagesToSendAndHandle) + ->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences)) + ->replaceArgument(2, $messagesToSendAndHandle) ; $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); + + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') + ->replaceArgument(1, $config['failure_transport']); + + if ($config['failure_transport']) { + $container->getDefinition('console.command.messenger_failed_messages_retry') + ->replaceArgument(0, $config['failure_transport']) + ->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null); + $container->getDefinition('console.command.messenger_failed_messages_show') + ->replaceArgument(0, $config['failure_transport']); + $container->getDefinition('console.command.messenger_failed_messages_remove') + ->replaceArgument(0, $config['failure_transport']); + } else { + $container->removeDefinition('console.command.messenger_failed_messages_retry'); + $container->removeDefinition('console.command.messenger_failed_messages_show'); + $container->removeDefinition('console.command.messenger_failed_messages_remove'); + } } private function registerCacheConfiguration(array $config, ContainerBuilder $container) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 1c74220bf8417..46c103cee4675 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -114,6 +114,31 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index b0bcf2fd5ccbb..1671810d395a4 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -9,7 +9,8 @@ - + + @@ -91,5 +92,19 @@ + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php index bd3ba1c18d8c0..3906a3dc7a347 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php @@ -328,6 +328,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor 'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class), 'routing' => [], 'transports' => [], + 'failure_transport' => null, 'serializer' => [ 'default_serializer' => 'messenger.transport.native_php_serializer', 'symfony_serializer' => [ diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 683264821acbb..f1dade56c6083 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -721,12 +721,16 @@ public function testMessengerRouting() '*' => false, ]; - $this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1)); + $this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2)); $sendersMapping = $senderLocatorDefinition->getArgument(0); $this->assertEquals([ - 'amqp' => new Reference('messenger.transport.amqp'), - 'audit' => new Reference('audit'), - ], $sendersMapping[DummyMessage::class]->getValues()); + 'amqp', + 'audit', + ], $sendersMapping[DummyMessage::class]); + $sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1)); + $this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0))); + $this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]); + $this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]); } public function testMessengerTransportConfiguration() diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 969b01fcc3acf..37c055d9c31d2 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,13 @@ CHANGELOG 4.3.0 ----- + * [BC BREAK] `SendersLocatorInterface` has an additional method: + `getSenderByAlias()`. + * A new `ListableReceiverInterface` was added, which a receiver + can implement (when applicable) to enable listing and fetching + individual messages by id (used in the new "Failed Messages" commands). + * Both `SenderInterface::send()` and `ReceiverInterface::get()` + should now (when applicable) add a `TransportMessageIdStamp`. * Added `WorkerStoppedEvent` dispatched when a worker is stopped. * Added optional `MessageCountAwareInterface` that receivers can implement to give information about how many messages are waiting to be processed. diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php new file mode 100644 index 0000000000000..854daba0e6288 --- /dev/null +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -0,0 +1,112 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Command; + +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Helper\Dumper; +use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; + +/** + * @author Ryan Weaver + * + * @internal + * @experimental in 4.3 + */ +abstract class AbstractFailedMessagesCommand extends Command +{ + private $receiverName; + private $receiver; + + public function __construct(string $receiverName, ReceiverInterface $receiver) + { + $this->receiverName = $receiverName; + $this->receiver = $receiver; + + parent::__construct(); + } + + protected function getReceiverName() + { + return $this->receiverName; + } + + /** + * @return mixed|null + */ + protected function getMessageId(Envelope $envelope) + { + /** @var TransportMessageIdStamp $stamp */ + $stamp = $envelope->last(TransportMessageIdStamp::class); + + return null !== $stamp ? $stamp->getId() : null; + } + + protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io) + { + $io->title('Failed Message Details'); + + /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + + $rows = [ + ['Class', \get_class($envelope->getMessage())], + ]; + + if (null !== $id = $this->getMessageId($envelope)) { + $rows[] = ['Message Id', $id]; + } + + if (null === $sentToFailureTransportStamp) { + $io->warning('Message does not appear to have been sent to this transport after failing'); + } else { + $rows = array_merge($rows, [ + ['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')], + ['Error', $sentToFailureTransportStamp->getExceptionMessage()], + ['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'], + ['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()], + ]); + } + + $io->table([], $rows); + + if ($io->isVeryVerbose()) { + $io->title('Message:'); + $dump = new Dumper($io); + $io->writeln($dump($envelope->getMessage())); + $io->title('Exception:'); + $io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString()); + } else { + $io->writeln(' Re-run command with -vv to see more message & error details.'); + } + } + + protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io) + { + if ($receiver instanceof MessageCountAwareInterface) { + if (1 === $receiver->getMessageCount()) { + $io->writeln('There is 1 message pending in the failure transport.'); + } else { + $io->writeln(sprintf('There are %d messages pending in the failure transport.', $receiver->getMessageCount())); + } + } + } + + protected function getReceiver(): ReceiverInterface + { + return $this->receiver; + } +} diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php new file mode 100644 index 0000000000000..f42e917b833b9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php @@ -0,0 +1,88 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Command; + +use Symfony\Component\Console\Exception\RuntimeException; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\ConsoleOutputInterface; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand +{ + protected static $defaultName = 'messenger:failed:remove'; + + /** + * {@inheritdoc} + */ + protected function configure(): void + { + $this + ->setDefinition([ + new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'), + new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'), + ]) + ->setDescription('Remove a message from the failure transport.') + ->setHelp(<<<'EOF' +The %command.name% removes a message that is pending in the failure transport. + + php %command.full_name% {id} + +The specific id can be found via the messenger:failed:show command. +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); + + $receiver = $this->getReceiver(); + + $shouldForce = $input->getOption('force'); + $this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce); + } + + private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce) + { + if (!$receiver instanceof ListableReceiverInterface) { + throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName())); + } + + $envelope = $receiver->find($id); + if (null === $envelope) { + throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id)); + } + $this->displaySingleMessage($envelope, $io); + + if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) { + $receiver->reject($envelope); + + $io->success('Message removed.'); + } else { + $io->note('Message not removed.'); + } + } +} diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php new file mode 100644 index 0000000000000..a2ebc290c179b --- /dev/null +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -0,0 +1,221 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Command; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Exception\RuntimeException; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\ConsoleOutputInterface; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver; +use Symfony\Component\Messenger\Worker; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand +{ + protected static $defaultName = 'messenger:failed:retry'; + + private $eventDispatcher; + private $messageBus; + private $retryStrategy; + private $logger; + + public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null) + { + $this->eventDispatcher = $eventDispatcher; + $this->messageBus = $messageBus; + $this->retryStrategy = $retryStrategy; + $this->logger = $logger; + + parent::__construct($receiverName, $receiver); + } + + /** + * {@inheritdoc} + */ + protected function configure(): void + { + $this + ->setDefinition([ + new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'), + new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'), + ]) + ->setDescription('Retries one or more messages from the failure transport.') + ->setHelp(<<<'EOF' +The %command.name% retries message in the failure transport. + + php %command.full_name% + +The command will interactively ask if each message should be retried +or discarded. + +Some transports support retrying a specific message id, which comes +from the messenger:failed:show command. + + php %command.full_name% {id} + +Or pass multiple ids at once to process multiple messages: + +php %command.full_name% {id1} {id2} {id3} + +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); + $io->comment('Quit this command with CONTROL-C.'); + if (!$output->isVeryVerbose()) { + $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); + } + + $receiver = $this->getReceiver(); + $this->printPendingMessagesMessage($receiver, $io); + + $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $this->getReceiverName())); + + $shouldForce = $input->getOption('force'); + $ids = $input->getArgument('id'); + if (0 === \count($ids)) { + if (!$input->isInteractive()) { + throw new RuntimeException('Message id must be passed when in non-interactive mode.'); + } + + $this->runInteractive($io, $shouldForce); + + return; + } + + $this->retrySpecificIds($ids, $io, $shouldForce); + $io->success('All done!'); + } + + private function runInteractive(SymfonyStyle $io, bool $shouldForce) + { + $receiver = $this->getReceiver(); + $count = 0; + if ($receiver instanceof ListableReceiverInterface) { + // for listable receivers, find the messages one-by-one + // this avoids using get(), which for some less-robust + // transports (like Doctrine), will cause the message + // to be temporarily "acked", even if the user aborts + // handling the message + while (true) { + $ids = []; + foreach ($receiver->all(1) as $envelope) { + ++$count; + + $id = $this->getMessageId($envelope); + if (null === $id) { + throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName())); + } + $ids[] = $id; + } + + // break the loop if all messages are consumed + if (0 === \count($ids)) { + break; + } + + $this->retrySpecificIds($ids, $io, $shouldForce); + } + } else { + // get() and ask messages one-by-one + $count = $this->runWorker($this->getReceiver(), $io, $shouldForce); + } + + // avoid success message if nothing was processed + if (1 < $count) { + $io->success('All failed messages have been handled or removed!'); + } + } + + private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int + { + $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) { + $envelope = $messageReceivedEvent->getEnvelope(); + + $this->displaySingleMessage($envelope, $io); + + $shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?'); + + if ($shouldHandle) { + return; + } + + $messageReceivedEvent->shouldHandle(false); + $receiver->reject($envelope); + }; + $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener); + + $worker = new Worker( + [$this->getReceiverName() => $receiver], + $this->messageBus, + [$this->getReceiverName() => $this->retryStrategy], + $this->eventDispatcher, + $this->logger + ); + + $count = 0; + try { + $worker->run([], function (?Envelope $envelope) use ($worker, $io, &$count) { + ++$count; + if (null === $envelope) { + $worker->stop(); + } + }); + } finally { + $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener); + } + + return $count; + } + + private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce) + { + $receiver = $this->getReceiver(); + + if (!$receiver instanceof ListableReceiverInterface) { + throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName())); + } + + foreach ($ids as $id) { + $envelope = $receiver->find($id); + if (null === $envelope) { + throw new RuntimeException(sprintf('The message "%s" was not found.', $id)); + } + + $singleReceiver = new SingleMessageReceiver($receiver, $envelope); + $this->runWorker($singleReceiver, $io, $shouldForce); + } + } +} diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php new file mode 100644 index 0000000000000..da49a4b7fe2c5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php @@ -0,0 +1,129 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Command; + +use Symfony\Component\Console\Exception\RuntimeException; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\ConsoleOutputInterface; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class FailedMessagesShowCommand extends AbstractFailedMessagesCommand +{ + protected static $defaultName = 'messenger:failed:show'; + + /** + * {@inheritdoc} + */ + protected function configure(): void + { + $this + ->setDefinition([ + new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'), + new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50), + ]) + ->setDescription('Shows one or more messages from the failure transport.') + ->setHelp(<<<'EOF' +The %command.name% shows message that are pending in the failure transport. + + php %command.full_name% + +Or look at a specific message by its id: + + php %command.full_name% {id} +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); + + $receiver = $this->getReceiver(); + $this->printPendingMessagesMessage($receiver, $io); + + if (!$receiver instanceof ListableReceiverInterface) { + throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName())); + } + + if (null === $id = $input->getArgument('id')) { + $this->listMessages($io, $input->getOption('max')); + } else { + $this->showMessage($id, $io); + } + } + + private function listMessages(SymfonyStyle $io, int $max) + { + /** @var ListableReceiverInterface $receiver */ + $receiver = $this->getReceiver(); + $envelopes = $receiver->all($max); + + $rows = []; + foreach ($envelopes as $envelope) { + /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + + $rows[] = [ + $this->getMessageId($envelope), + \get_class($envelope->getMessage()), + null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'), + null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(), + ]; + } + + if (0 === \count($rows)) { + $io->success('No failed messages were found.'); + + return; + } + + $io->table(['Id', 'Class', 'Failed at', 'Error'], $rows); + + if (\count($rows) === $max) { + $io->comment(sprintf('Showing first %d messages.', $max)); + } + + $io->comment('Run messenger:failed:show {id} -vv to see message details.'); + } + + private function showMessage($id, SymfonyStyle $io) + { + /** @var ListableReceiverInterface $receiver */ + $receiver = $this->getReceiver(); + $envelope = $receiver->find($id); + if (null === $envelope) { + throw new RuntimeException(sprintf('The message "%s" was not found.', $id)); + } + + $this->displaySingleMessage($envelope, $io); + + $io->writeln([ + '', + sprintf(' Run messenger:failed:retry %s to retry this message.', $id), + sprintf(' Run messenger:failed:remove %s to delete it.', $id), + ]); + } +} diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 46550f5121497..201688944d643 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -247,12 +247,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) foreach ($receiverMapping as $name => $reference) { $receiverNames[(string) $reference] = $name; } - if ($container->hasDefinition('console.command.messenger_consume_messages')) { - $buses = []; - foreach ($busIds as $busId) { - $buses[$busId] = new Reference($busId); - } + $buses = []; + foreach ($busIds as $busId) { + $buses[$busId] = new Reference($busId); + } + + if ($container->hasDefinition('messenger.routable_message_bus')) { + $container->getDefinition('messenger.routable_message_bus') + ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)); + } + + if ($container->hasDefinition('console.command.messenger_consume_messages')) { $container->getDefinition('console.command.messenger_consume_messages') ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)) ->replaceArgument(3, array_values($receiverNames)); @@ -264,6 +270,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) } $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping); + + $failedCommandIds = [ + 'console.command.messenger_failed_messages_retry', + 'console.command.messenger_failed_messages_show', + 'console.command.messenger_failed_messages_remove', + ]; + foreach ($failedCommandIds as $failedCommandId) { + if ($container->hasDefinition($failedCommandId)) { + $definition = $container->getDefinition($failedCommandId); + $definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]); + } + } } private function registerBusToCollector(ContainerBuilder $container, string $busId) diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php index 3df75b8723477..4443853363244 100644 --- a/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php @@ -20,4 +20,14 @@ */ class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent { + private $shouldHandle = true; + + public function shouldHandle(bool $shouldHandle = null): bool + { + if (null !== $shouldHandle) { + $this->shouldHandle = $shouldHandle; + } + + return $this->shouldHandle; + } } diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php new file mode 100644 index 0000000000000..d62a8ed799444 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -0,0 +1,89 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Debug\Exception\FlattenException; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; + +/** + * Sends a rejected message to a "failure transport". + * + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface +{ + private $messageBus; + private $failureSenderAlias; + private $logger; + + public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null) + { + $this->messageBus = $messageBus; + $this->failureSenderAlias = $failureSenderAlias; + $this->logger = $logger; + } + + public function onMessageFailed(WorkerMessageFailedEvent $event) + { + if ($event->willRetry()) { + return; + } + + $envelope = $event->getEnvelope(); + + // avoid re-sending to the failed sender + foreach ($envelope->all(SentStamp::class) as $sentStamp) { + /** @var SentStamp $sentStamp */ + if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) { + return; + } + } + + // remove the received stamp so it's redelivered + $throwable = $event->getThrowable(); + if ($throwable instanceof HandlerFailedException) { + $throwable = $throwable->getNestedExceptions()[0]; + } + + $flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null; + $envelope = $envelope->withoutAll(ReceivedStamp::class) + ->withoutAll(TransportMessageIdStamp::class) + ->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException)) + ->with(new RedeliveryStamp(0, $this->failureSenderAlias)); + + if (null !== $this->logger) { + $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ + 'class' => \get_class($envelope->getMessage()), + 'transport' => $this->failureSenderAlias, + ]); + } + + $this->messageBus->dispatch($envelope); + } + + public static function getSubscribedEvents() + { + return [ + WorkerMessageFailedEvent::class => ['onMessageFailed', -100], + ]; + } +} diff --git a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php new file mode 100644 index 0000000000000..72fccfa566f90 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class UnknownSenderException extends \InvalidArgumentException implements ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php index 94392e3c98c58..042d5842f84ea 100644 --- a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php @@ -32,7 +32,9 @@ public function __construct(string $busName) public function handle(Envelope $envelope, StackInterface $stack): Envelope { - $envelope = $envelope->with(new BusNameStamp($this->busName)); + if (null === $envelope->last(BusNameStamp::class)) { + $envelope = $envelope->with(new BusNameStamp($this->busName)); + } return $stack->next()->handle($envelope, $stack); } diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php index d2ebaf8cfa011..bc22da6b47f10 100644 --- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php @@ -19,6 +19,7 @@ use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -65,12 +66,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope // dispatch event unless this is a redelivery $shouldDispatchEvent = null === $redeliveryStamp; - foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) { - // on redelivery, only deliver to the given sender - if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) { - continue; - } - + foreach ($this->getSenders($envelope, $handle, $redeliveryStamp) as $alias => $sender) { if (null !== $this->eventDispatcher && $shouldDispatchEvent) { $event = new SendMessageToTransportsEvent($envelope); $this->eventDispatcher->dispatch($event); @@ -107,4 +103,18 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope // message should only be sent and not be handled by the next middleware return $envelope; } + + /** + * * @return iterable|SenderInterface[] + */ + private function getSenders(Envelope $envelope, &$handle, ?RedeliveryStamp $redeliveryStamp): iterable + { + if (null !== $redeliveryStamp) { + return [ + $redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()), + ]; + } + + return $this->sendersLocator->getSenders($envelope, $handle); + } } diff --git a/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php index 394be2509d2c5..e883b3ffd9ac5 100644 --- a/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php +++ b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php @@ -39,12 +39,12 @@ class MultiplierRetryStrategy implements RetryStrategyInterface private $maxDelayMilliseconds; /** - * @param int $maxRetries The maximum number of time to retry (0 means indefinitely) + * @param int $maxRetries The maximum number of time to retry (null means indefinitely) * @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used) * @param float $multiplier Multiplier to apply to the delay each time a retry occurs * @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum) */ - public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0) + public function __construct(?int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0) { $this->maxRetries = $maxRetries; @@ -66,7 +66,7 @@ public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, public function isRetryable(Envelope $message): bool { - if (0 === $this->maxRetries) { + if (null === $this->maxRetries) { return true; } diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php index fd7c88bcbc5bb..a0dd9b89d869e 100644 --- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php +++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php @@ -44,17 +44,4 @@ public function getSenderClassOrAlias(): string { return $this->senderClassOrAlias; } - - public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool - { - if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) { - return true; - } - - if ($senderClass === $this->senderClassOrAlias) { - return true; - } - - return false; - } } diff --git a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php new file mode 100644 index 0000000000000..05a5dfcacf77b --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Symfony\Component\Debug\Exception\FlattenException; + +/** + * Stamp applied when a message is sent to the failure transport. + * + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class SentToFailureTransportStamp implements StampInterface +{ + private $exceptionMessage; + private $originalReceiverName; + private $flattenException; + private $sentAt; + + public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null) + { + $this->exceptionMessage = $exceptionMessage; + $this->originalReceiverName = $originalReceiverName; + $this->flattenException = $flattenException; + $this->sentAt = new \DateTime(); + } + + public function getExceptionMessage(): string + { + return $this->exceptionMessage; + } + + public function getOriginalReceiverName(): string + { + return $this->originalReceiverName; + } + + public function getFlattenException(): ?FlattenException + { + return $this->flattenException; + } + + public function getSentAt(): \DateTime + { + return $this->sentAt; + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php b/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php new file mode 100644 index 0000000000000..b0b93ae1885c3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Added by a sender or receiver to indicate the id of this message in that transport. + * + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +class TransportMessageIdStamp implements StampInterface +{ + private $id; + + /** + * @param mixed $id some "identifier" of the message in a transport + */ + public function __construct($id) + { + $this->id = $id; + } + + public function getId() + { + return $this->id; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php new file mode 100644 index 0000000000000..74cab385adf9b --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Command; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; + +class FailedMessagesRemoveCommandTest extends TestCase +{ + public function testBasicRun() + { + $receiver = $this->createMock(ListableReceiverInterface::class); + $receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass())); + + $command = new FailedMessagesRemoveCommand( + 'failure_receiver', + $receiver + ); + + $tester = new CommandTester($command); + $tester->execute(['id' => 20, '--force' => true]); + + $this->assertContains('Message removed.', $tester->getDisplay()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php new file mode 100644 index 0000000000000..49f9dbfd2cf9b --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -0,0 +1,49 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Command; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; +use Symfony\Component\Messenger\Worker; + +class FailedMessagesRetryCommandTest extends TestCase +{ + public function testBasicRun() + { + $receiver = $this->createMock(ListableReceiverInterface::class); + $receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass())); + // message will eventually be ack'ed in Worker + $receiver->expects($this->exactly(2))->method('ack'); + + $dispatcher = $this->createMock(EventDispatcherInterface::class); + $bus = $this->createMock(MessageBusInterface::class); + // the bus should be called in the worker + $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); + + $command = new FailedMessagesRetryCommand( + 'failure_receiver', + $receiver, + $bus, + $dispatcher + ); + + $tester = new CommandTester($command); + $tester->execute(['id' => [10, 12]]); + + $this->assertContains('[OK]', $tester->getDisplay()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php new file mode 100644 index 0000000000000..ad56462dbd655 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -0,0 +1,57 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Command; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\Messenger\Command\FailedMessagesShowCommand; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; + +/** + * @group time-sensitive + */ +class FailedMessagesShowCommandTest extends TestCase +{ + public function testBasicRun() + { + $sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async'); + $envelope = new Envelope(new \stdClass(), [ + new TransportMessageIdStamp(15), + $sentToFailureStamp, + ]); + $receiver = $this->createMock(ListableReceiverInterface::class); + $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); + + $command = new FailedMessagesShowCommand( + 'failure_receiver', + $receiver + ); + + $tester = new CommandTester($command); + $tester->execute(['id' => 15]); + + $this->assertContains(sprintf(<<getSentAt()->format('Y-m-d H:i:s')), + $tester->getDisplay(true)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php new file mode 100644 index 0000000000000..9f23e462c05f5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -0,0 +1,121 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Handler; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; + +class SendFailedMessageToFailureTransportListenerTest extends TestCase +{ + public function testItDispatchesToTheFailureTransport() + { + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { + /* @var Envelope $envelope */ + $this->assertInstanceOf(Envelope::class, $envelope); + + /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + $this->assertNotNull($sentToFailureTransportStamp); + $this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage()); + $this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName()); + $this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage()); + + /** @var RedeliveryStamp $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias()); + + $this->assertNull($envelope->last(ReceivedStamp::class)); + $this->assertNull($envelope->last(TransportMessageIdStamp::class)); + + return true; + }))->willReturn(new Envelope(new \stdClass())); + $listener = new SendFailedMessageToFailureTransportListener( + $bus, + 'failure_sender' + ); + + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); + + $listener->onMessageFailed($event); + } + + public function testItGetsNestedHandlerFailedException() + { + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) { + /** @var Envelope $envelope */ + /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + $this->assertNotNull($sentToFailureTransportStamp); + $this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage()); + $this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass()); + + return true; + }))->willReturn(new Envelope(new \stdClass())); + + $listener = new SendFailedMessageToFailureTransportListener( + $bus, + 'failure_sender' + ); + + $envelope = new Envelope(new \stdClass()); + $exception = new \Exception('I am inside!'); + $exception = new HandlerFailedException($envelope, [$exception]); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false); + + $listener->onMessageFailed($event); + } + + public function testDoNothingOnRetry() + { + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->never())->method('dispatch'); + $listener = new SendFailedMessageToFailureTransportListener( + $bus, + 'failure_sender' + ); + + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true); + + $listener->onMessageFailed($event); + } + + public function testDoNotRedeliverToFailed() + { + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->never())->method('dispatch'); + $listener = new SendFailedMessageToFailureTransportListener( + $bus, + 'failure_sender' + ); + + $envelope = new Envelope(new \stdClass(), [ + new SentStamp('MySender', 'failure_sender'), + ]); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false); + + $listener->onMessageFailed($event); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php index 71fe15b25e90c..564800f744d4d 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php @@ -29,5 +29,9 @@ public function testItSendsTheMessageToAssignedSender() $busNameStamp = $finalEnvelope->last(BusNameStamp::class); $this->assertNotNull($busNameStamp); $this->assertSame('the_bus_name', $busNameStamp->getBusName()); + + // the stamp should not be added over and over again + $finalEnvelope = $middleware->handle($finalEnvelope, $this->getStackMock()); + $this->assertCount(1, $finalEnvelope->all(BusNameStamp::class)); } } diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php index a88f93b6a3d25..2df828ddccd31 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware; +use Psr\Container\ContainerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; @@ -34,15 +35,16 @@ public function testItSendsTheMessageToAssignedSender() $envelope = new Envelope($message); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['my_sender']], ['my_sender' => $sender]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0)); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'my_sender')))->will($this->returnArgument(0)); $envelope = $middleware->handle($envelope, $this->getStackMock(false)); /* @var SentStamp $stamp */ $this->assertInstanceOf(SentStamp::class, $stamp = $envelope->last(SentStamp::class), 'it adds a sent stamp'); - $this->assertNull($stamp->getSenderAlias()); + $this->assertSame('my_sender', $stamp->getSenderAlias()); $this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass()); } @@ -52,9 +54,8 @@ public function testItSendsTheMessageToMultipleSenders() $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator([ - DummyMessage::class => ['foo' => $sender, 'bar' => $sender2], - ])); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender, 'bar' => $sender2]); + $middleware = new SendMessageMiddleware($sendersLocator); $sender->expects($this->once()) ->method('send') @@ -92,12 +93,16 @@ public function testItSendsToOnlyOneSenderOnRedelivery() $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator([ - DummyMessage::class => ['foo' => $sender, 'bar' => $sender2], - ], [ - // normally, this class sends and handles (but not on retry) - DummyMessage::class => true, - ])); + $sendersLocator = $this->createSendersLocator( + [DummyMessage::class => ['foo', 'bar']], + ['foo' => $sender, 'bar' => $sender2], + [ + // normally, this class sends and handles (but not on retry) + DummyMessage::class => true, + ] + ); + + $middleware = new SendMessageMiddleware($sendersLocator); $sender->expects($this->never()) ->method('send') @@ -116,9 +121,10 @@ public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage() $envelope = new Envelope(new ChildDummyMessage('Hey')); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo_sender']], ['foo_sender' => $sender]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock(false)); } @@ -129,11 +135,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageClass() $envelope = new Envelope($message); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [ + $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [ DummyMessage::class => true, - ])); + ]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -144,11 +151,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageParentClass() $envelope = new Envelope($message); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [ + $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [ DummyMessage::class => true, - ])); + ]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -159,11 +167,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageInterface() $envelope = new Envelope($message); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [ + $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [ DummyMessageInterface::class => true, - ])); + ]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -174,11 +183,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnWildcard() $envelope = new Envelope($message); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [ + $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [ '*' => true, - ])); + ]); + $middleware = new SendMessageMiddleware($sendersLocator); - $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -188,7 +198,7 @@ public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage() $message = new DummyMessage('Hey'); $envelope = new Envelope($message); - $middleware = new SendMessageMiddleware(new SendersLocator([])); + $middleware = new SendMessageMiddleware($this->createSendersLocator([], [])); $middleware->handle($envelope, $this->getStackMock()); } @@ -199,7 +209,8 @@ public function testItSkipsReceivedMessages() $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]])); + $sendersLocator = $this->createSendersLocator(['*' => ['foo']], ['foo' => $sender]); + $middleware = new SendMessageMiddleware($sendersLocator); $sender->expects($this->never())->method('send'); @@ -220,7 +231,8 @@ public function testItDispatchesTheEventOneTime() $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); - $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender1, $sender2]]), $dispatcher); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender1, 'bar' => $sender2]); + $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher); $sender1->expects($this->once())->method('send')->willReturn($envelope); $sender2->expects($this->once())->method('send')->willReturn($envelope); @@ -235,7 +247,7 @@ public function testItDoesNotDispatchWithNoSenders() $dispatcher = $this->createMock(EventDispatcherInterface::class); $dispatcher->expects($this->never())->method('dispatch'); - $middleware = new SendMessageMiddleware(new SendersLocator([]), $dispatcher); + $middleware = new SendMessageMiddleware($this->createSendersLocator([], []), $dispatcher); $middleware->handle($envelope, $this->getStackMock()); } @@ -249,8 +261,11 @@ public function testItDoesNotDispatchOnRedeliver() $dispatcher->expects($this->never())->method('dispatch'); $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass())); - $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]), $dispatcher); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]); + $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher); $middleware->handle($envelope, $this->getStackMock(false)); } @@ -263,9 +278,27 @@ public function testItHandlesWithForceCallHandlersStamp() $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $sender->expects($this->once())->method('send')->willReturn($envelope); - $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); + $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]); + $middleware = new SendMessageMiddleware($sendersLocator); // next handler *should* be called $middleware->handle($envelope, $this->getStackMock(true)); } + + private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = []) + { + $container = $this->createMock(ContainerInterface::class); + $container->expects($this->any()) + ->method('has') + ->willReturnCallback(function ($id) use ($senders) { + return isset($senders[$id]); + }); + $container->expects($this->any()) + ->method('get') + ->willReturnCallback(function ($id) use ($senders) { + return $senders[$id]; + }); + + return new SendersLocator($sendersMap, $container, $sendAndHandle); + } } diff --git a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php index a6c8c8352404b..d1c201955d4fa 100644 --- a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php +++ b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php @@ -26,6 +26,16 @@ public function testIsRetryable() $this->assertTrue($strategy->isRetryable($envelope)); } + public function testIsRetryableWithNullMax() + { + $strategy = new MultiplierRetryStrategy(null); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); + $this->assertTrue($strategy->isRetryable($envelope)); + + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1, 'sender_alias')]); + $this->assertTrue($strategy->isRetryable($envelope)); + } + public function testIsNotRetryable() { $strategy = new MultiplierRetryStrategy(3); @@ -34,6 +44,13 @@ public function testIsNotRetryable() $this->assertFalse($strategy->isRetryable($envelope)); } + public function testIsNotRetryableWithZeroMax() + { + $strategy = new MultiplierRetryStrategy(0); + $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]); + $this->assertFalse($strategy->isRetryable($envelope)); + } + public function testIsRetryableWithNoStamp() { $strategy = new MultiplierRetryStrategy(3); diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php index 74eae64afa493..71b9efcf50ea0 100644 --- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; @@ -19,10 +20,10 @@ use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; -use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Sender\SendersLocator; use Symfony\Component\Messenger\Worker; @@ -30,20 +31,15 @@ class RetryIntegrationTest extends TestCase { public function testRetryMechanism() { - $apiMessage = new DummyMessage('API'); + $senderAndReceiver = new DummySenderAndReceiver(); - $receiver = $this->createMock(ReceiverInterface::class); - $receiver->method('get') - ->willReturn([ - new Envelope($apiMessage, [ - new SentStamp('Some\Sender', 'sender_alias'), - ]), - ]); + $senderLocator = $this->createMock(ContainerInterface::class); + $senderLocator->method('has')->with('sender_alias')->willReturn(true); + $senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver); + $senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator); - $senderLocator = new SendersLocator([], ['*' => true]); - - $handler = new DummyMessageHandlerFailingFirstTimes(); - $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1); + $handler = new DummyMessageHandlerFailingFirstTimes(0, 'A'); + $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B'); $handlerLocator = new HandlersLocator([ DummyMessage::class => [ new HandlerDescriptor($handler, ['alias' => 'first']), @@ -51,14 +47,54 @@ public function testRetryMechanism() ], ]); + // dispatch the message, which will get "sent" and then received by DummySenderAndReceiver $bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]); + $envelope = new Envelope(new DummyMessage('API')); + $bus->dispatch($envelope); - $worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]); - $worker->run([], function () use ($worker) { - $worker->stop(); + $worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]); + $worker->run([], function (?Envelope $envelope) use ($worker) { + if (null === $envelope) { + $worker->stop(); + } }); $this->assertSame(1, $handler->getTimesCalledWithoutThrowing()); $this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing()); } } + +class DummySenderAndReceiver implements ReceiverInterface, SenderInterface +{ + private $messagesWaiting = []; + + private $messagesReceived = []; + + public function get(): iterable + { + $message = array_shift($this->messagesWaiting); + + if (null === $message) { + return []; + } + + $this->messagesReceived[] = $message; + + return [$message]; + } + + public function ack(Envelope $envelope): void + { + } + + public function reject(Envelope $envelope): void + { + } + + public function send(Envelope $envelope): Envelope + { + $this->messagesWaiting[] = $envelope; + + return $envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php index 43fe0c0b08112..6617abc8e5b31 100644 --- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php +++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php @@ -16,22 +16,6 @@ class RedeliveryStampTest extends TestCase { - public function testShouldRedeliverToSenderWithAlias() - { - $stamp = new RedeliveryStamp(5, 'foo_alias'); - - $this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias')); - $this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias')); - } - - public function testShouldRedeliverToSenderWithoutAlias() - { - $stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1'); - - $this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null)); - $this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null)); - } - public function testGetters() { $stamp = new RedeliveryStamp(10, 'sender_alias'); diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php new file mode 100644 index 0000000000000..7639060daa178 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Stamp; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Debug\Exception\FlattenException; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; + +class SentToFailureTransportStampTest extends TestCase +{ + public function testGetters() + { + $flattenException = new FlattenException(); + $stamp = new SentToFailureTransportStamp( + 'exception message', + 'original_receiver', + $flattenException + ); + $this->assertSame('exception message', $stamp->getExceptionMessage()); + $this->assertSame('original_receiver', $stamp->getOriginalReceiverName()); + $this->assertSame($flattenException, $stamp->getFlattenException()); + $this->assertInstanceOf(\DateTime::class, $stamp->getSentAt()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php index 0507a0ccfa91e..79c99d5be3410 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php @@ -12,9 +12,12 @@ namespace Symfony\Component\Messenger\Tests\Transport\Doctrine; use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Doctrine\Connection; +use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp; use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\Serializer; @@ -28,14 +31,26 @@ public function testItReturnsTheDecodedMessageToTheHandler() { $serializer = $this->createSerializer(); - $doctrineEnvelop = $this->createDoctrineEnvelope(); + $doctrineEnvelope = $this->createDoctrineEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($doctrineEnvelop); + $connection->method('get')->willReturn($doctrineEnvelope); $receiver = new DoctrineReceiver($connection, $serializer); $actualEnvelopes = iterator_to_array($receiver->get()); $this->assertCount(1, $actualEnvelopes); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + + /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ + $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class); + $this->assertNotNull($doctrineReceivedStamp); + $this->assertSame('1', $doctrineReceivedStamp->getId()); + + /** @var TransportMessageIdStamp $transportMessageIdStamp */ + $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class); + $this->assertNotNull($transportMessageIdStamp); + $this->assertSame(1, $transportMessageIdStamp->getId()); } /** @@ -55,6 +70,34 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() iterator_to_array($receiver->get()); } + public function testAll() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('findAll')->with(50)->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = \iterator_to_array($receiver->all(50)); + $this->assertCount(2, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + + public function testFind() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('find')->with(10)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(10); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); + } + private function createDoctrineEnvelope() { return [ diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php index 26badf93340f5..dcc90c67854b0 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Doctrine\Connection; use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender; @@ -29,13 +30,18 @@ public function testSend() $connection = $this->getMockBuilder(Connection::class) ->disableOriginalConstructor() ->getMock(); - $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']); + $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn(15); $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); $sender = new DoctrineSender($connection, $serializer); - $sender->send($envelope); + $actualEnvelope = $sender->send($envelope); + + /** @var TransportMessageIdStamp $transportMessageIdStamp */ + $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class); + $this->assertNotNull($transportMessageIdStamp); + $this->assertSame('15', $transportMessageIdStamp->getId()); } public function testSendWithDelay() diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php new file mode 100644 index 0000000000000..a900e7ba0ff55 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Sender; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Sender\SendersLocator; + +class SingleMessageReceiverTest extends TestCase +{ + public function testItReceivesOnlyOneMessage() + { + $innerReceiver = $this->createMock(ReceiverInterface::class); + $envelope = new Envelope(new \stdClass()); + + $receiver = new SingleMessageReceiver($innerReceiver, $envelope); + $received = \iterator_to_array($receiver->get()); + $this->assertCount(1, $received); + $this->assertSame($received[0], $envelope); + + $this->assertEmpty(\iterator_to_array($receiver->get())); + } + + public function testCallsAreForwarded() + { + $envelope = new Envelope(new \stdClass()); + + $innerReceiver = $this->createMock(ReceiverInterface::class); + $innerReceiver->expects($this->once())->method('ack')->with($envelope); + $innerReceiver->expects($this->once())->method('reject')->with($envelope); + + $receiver = new SingleMessageReceiver($innerReceiver, $envelope); + $receiver->ack($envelope); + $receiver->reject($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php index 1de28332b388d..e085d721ef4cd 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php @@ -12,7 +12,9 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender; use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\UnknownSenderException; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; @@ -23,21 +25,85 @@ class SendersLocatorTest extends TestCase public function testItReturnsTheSenderBasedOnTheMessageClass() { $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sendersLocator = $this->createContainer([ + 'my_sender' => $sender, + ]); $locator = new SendersLocator([ - DummyMessage::class => [$sender], + DummyMessage::class => ['my_sender'], + ], $sendersLocator); + + $this->assertSame(['my_sender' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a'))))); + $this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage())))); + } + + public function testGetSenderByAlias() + { + $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sendersLocator = $this->createContainer([ + 'sender1' => $sender1, + 'sender2' => $sender2, + ]); + + $locator = new SendersLocator([], $sendersLocator); + + $this->assertSame($sender1, $locator->getSenderByAlias('sender1')); + $this->assertSame($sender2, $locator->getSenderByAlias('sender2')); + } + + public function testGetSenderByAliasThrowsException() + { + $this->expectException(UnknownSenderException::class); + $this->expectExceptionMessage('Unknown sender alias'); + + $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sendersLocator = $this->createContainer([ + 'sender1' => $sender1, ]); + $locator = new SendersLocator([], $sendersLocator); + $locator->getSenderByAlias('sender2'); + } + + /** + * @group legacy + */ + public function testItReturnsTheSenderBasedOnTheMessageClassLegacy() + { + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $locator = new SendersLocator([ + DummyMessage::class => [$sender], + ]); $this->assertSame([$sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a'))))); $this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage())))); } - public function testItYieldsProvidedSenderAliasAsKey() + /** + * @group legacy + */ + public function testItYieldsProvidedSenderAliasAsKeyLegacy() { $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $locator = new SendersLocator([ DummyMessage::class => ['dummy' => $sender], ]); - $this->assertSame(['dummy' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a'))))); } + + private function createContainer(array $senders) + { + $container = $this->createMock(ContainerInterface::class); + $container->expects($this->any()) + ->method('has') + ->willReturnCallback(function($id) use ($senders) { + return isset($senders[$id]); + }); + $container->expects($this->any()) + ->method('get') + ->willReturnCallback(function($id) use ($senders) { + return $senders[$id]; + }); + + return $container; + } } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index e899f1cee8b75..46e8149f6c327 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -97,7 +97,7 @@ public function testDispatchCausesRetry() $this->assertNotNull($redeliveryStamp); // retry count now at 1 $this->assertSame(1, $redeliveryStamp->getRetryCount()); - $this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias')); + $this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias()); // received stamp is removed $this->assertNull($envelope->last(ReceivedStamp::class)); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 5bc0324489e64..d2d03c859cdeb 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -110,7 +110,11 @@ public function reject(Envelope $envelope): void */ public function getMessageCount(): int { - return $this->connection->countMessagesInQueues(); + try { + return $this->connection->countMessagesInQueues(); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } } private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php index 602ee04715c1e..3663ca1726126 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php @@ -102,9 +102,10 @@ public static function buildConfiguration($dsn, array $options = []) /** * @param int $delay The delay in milliseconds * + * @return string The inserted id * @throws \Doctrine\DBAL\DBALException */ - public function send(string $body, array $headers, int $delay = 0): void + public function send(string $body, array $headers, int $delay = 0): string { $now = new \DateTime(); $availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000)); @@ -126,6 +127,8 @@ public function send(string $body, array $headers, int $delay = 0): void ':created_at' => self::formatDateTime($now), ':available_at' => self::formatDateTime($availableAt), ]); + + return $this->driverConnection->lastInsertId(); } public function get(): ?array @@ -205,14 +208,42 @@ public function getMessageCount(): int return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn(); } + public function findAll(int $limit = null): array + { + if ($this->configuration['auto_setup']) { + $this->setup(); + } + + $queryBuilder = $this->createAvailableMessagesQueryBuilder(); + if (null !== $limit) { + $queryBuilder->setMaxResults($limit); + } + + return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll(); + } + + public function find($id): ?array + { + if ($this->configuration['auto_setup']) { + $this->setup(); + } + + $queryBuilder = $this->createQueryBuilder() + ->where('m.id = :id'); + + $data = $this->executeQuery($queryBuilder->getSQL(), [ + 'id' => $id, + ])->fetch(); + + return false === $data ? null : $data; + } + private function createAvailableMessagesQueryBuilder(): QueryBuilder { $now = new \DateTime(); $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout'])); - return $this->driverConnection->createQueryBuilder() - ->select('m.*') - ->from($this->configuration['table_name'], 'm') + return $this->createQueryBuilder() ->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit') ->andWhere('m.available_at <= :now') ->andWhere('m.queue_name = :queue_name') @@ -223,6 +254,13 @@ private function createAvailableMessagesQueryBuilder(): QueryBuilder ]); } + private function createQueryBuilder(): QueryBuilder + { + return $this->driverConnection->createQueryBuilder() + ->select('m.*') + ->from($this->configuration['table_name'], 'm'); + } + private function executeQuery(string $sql, array $parameters = []) { $stmt = null; diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php index 65f2eacdc89b4..aad8cf236d5aa 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php @@ -16,6 +16,8 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -26,7 +28,7 @@ * * @experimental in 4.3 */ -class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface +class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface { private $connection; private $serializer; @@ -52,18 +54,7 @@ public function get(): iterable return []; } - try { - $envelope = $this->serializer->decode([ - 'body' => $doctrineEnvelope['body'], - 'headers' => $doctrineEnvelope['headers'], - ]); - } catch (MessageDecodingFailedException $exception) { - $this->connection->reject($doctrineEnvelope['id']); - - throw $exception; - } - - yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id'])); + yield $this->createEnvelopeFromData($doctrineEnvelope); } /** @@ -71,7 +62,11 @@ public function get(): iterable */ public function ack(Envelope $envelope): void { - $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); + try { + $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId()); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } } /** @@ -79,7 +74,11 @@ public function ack(Envelope $envelope): void */ public function reject(Envelope $envelope): void { - $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); + try { + $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId()); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } } /** @@ -87,7 +86,45 @@ public function reject(Envelope $envelope): void */ public function getMessageCount(): int { - return $this->connection->getMessageCount(); + try { + return $this->connection->getMessageCount(); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + /** + * {@inheritdoc} + */ + public function all(int $limit = null): iterable + { + try { + $doctrineEnvelopes = $this->connection->findAll($limit); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + foreach ($doctrineEnvelopes as $doctrineEnvelope) { + yield $this->createEnvelopeFromData($doctrineEnvelope); + } + } + + /** + * {@inheritdoc} + */ + public function find($id): ?Envelope + { + try { + $doctrineEnvelope = $this->connection->find($id); + } catch (DBALException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + if (null === $doctrineEnvelope) { + return null; + } + + return $this->createEnvelopeFromData($doctrineEnvelope); } private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp @@ -101,4 +138,23 @@ private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceived return $doctrineReceivedStamp; } + + private function createEnvelopeFromData(array $data): Envelope + { + try { + $envelope = $this->serializer->decode([ + 'body' => $data['body'], + 'headers' => $data['headers'], + ]); + } catch (MessageDecodingFailedException $exception) { + $this->connection->reject($data['id']); + + throw $exception; + } + + return $envelope->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php index 8329d2a53f1bd..e90a7f7e1d112 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -47,11 +48,11 @@ public function send(Envelope $envelope): Envelope $delay = null !== $delayStamp ? $delayStamp->getDelay() : 0; try { - $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); + $id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); } catch (DBALException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } - return $envelope; + return $envelope->with(new TransportMessageIdStamp($id)); } } diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php index 09d9d5cadfa95..1e0db72e76a8d 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php @@ -12,6 +12,8 @@ namespace Symfony\Component\Messenger\Transport\Doctrine; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\SetupableTransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -21,7 +23,7 @@ * * @experimental in 4.3 */ -class DoctrineTransport implements TransportInterface, SetupableTransportInterface +class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface { private $connection; private $serializer; @@ -58,6 +60,31 @@ public function reject(Envelope $envelope): void ($this->receiver ?? $this->getReceiver())->reject($envelope); } + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + return ($this->receiver ?? $this->getReceiver())->getMessageCount(); + } + + /** + * {@inheritdoc} + */ + public function all(int $limit = null): iterable + { + return ($this->receiver ?? $this->getReceiver())->all($limit); + } + + /** + * {@inheritdoc} + */ + public function find($id): ?Envelope + { + return ($this->receiver ?? $this->getReceiver())->find($id); + } + + /** * {@inheritdoc} */ diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php new file mode 100644 index 0000000000000..b4c19295a9318 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php @@ -0,0 +1,33 @@ + + * + * @experimental in 4.3 + */ +interface ListableReceiverInterface extends ReceiverInterface +{ + /** + * Returns all the messages (up to the limit) in this receiver. + * + * Messages should be given the same stamps as when using ReceiverInterface::get(). + * + * @return Envelope[]|iterable + */ + public function all(int $limit = null): iterable; + + /** + * Returns the Envelope by id or none. + * + * Message should be given the same stamps as when using ReceiverInterface::get(). + */ + public function find($id): ?Envelope; +} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php index 38f9b9ff35a64..6393c0d8c16a0 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php @@ -35,6 +35,8 @@ interface ReceiverInterface * call to get() takes a long time, blocking other receivers from * being called. * + * If applicable, the Envelope should contain a TransportMessageIdStamp. + * * If a received message cannot be decoded, the message should not * be retried again (e.g. if there's a queue, it should be removed) * and a MessageDecodingFailedException should be thrown. diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php new file mode 100644 index 0000000000000..9aec49d6c0af5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php @@ -0,0 +1,47 @@ + + * + * @internal + * @experimental in 4.3 + */ +class SingleMessageReceiver implements ReceiverInterface +{ + private $receiver; + private $envelope; + private $hasReceived = false; + + public function __construct(ReceiverInterface $receiver, Envelope $envelope) + { + $this->receiver = $receiver; + $this->envelope = $envelope; + } + + public function get(): iterable + { + if ($this->hasReceived) { + return []; + } + + $this->hasReceived = true; + + yield $this->envelope; + } + + public function ack(Envelope $envelope): void + { + $this->receiver->ack($envelope); + } + + public function reject(Envelope $envelope): void + { + $this->receiver->reject($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php index f526a413e5ba2..719442188ffbb 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php @@ -25,6 +25,8 @@ interface SenderInterface * * The sender can read different stamps for transport configuration, * like delivery delay. + * + * If applicable, the returned Envelope should contain a TransportMessageIdStamp. */ public function send(Envelope $envelope): Envelope; } diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php index 60dde8fb53446..80a01c291a99b 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php @@ -11,7 +11,11 @@ namespace Symfony\Component\Messenger\Transport\Sender; +use Psr\Container\ContainerInterface; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\RuntimeException; +use Symfony\Component\Messenger\Exception\UnknownSenderException; use Symfony\Component\Messenger\Handler\HandlersLocator; /** @@ -23,17 +27,30 @@ */ class SendersLocator implements SendersLocatorInterface { - private $senders; + private $sendersMap; + private $sendersLocator; + private $useLegacyLookup = false; private $sendAndHandle; /** - * @param SenderInterface[][] $senders - * @param bool[] $sendAndHandle + * @param string[][] $sendersMap An array, keyed by "type", set to an array of sender aliases + * @param ContainerInterface $sendersLocator Locator of senders, keyed by sender alias + * @param bool[] $sendAndHandle */ - public function __construct(array $senders, array $sendAndHandle = []) + public function __construct(array $sendersMap, /*ContainerInterface*/ $sendersLocator = null, array $sendAndHandle = []) { - $this->senders = $senders; - $this->sendAndHandle = $sendAndHandle; + $this->sendersMap = $sendersMap; + + if (is_array($sendersLocator) || null === $sendersLocator) { + @trigger_error(sprintf('"%s::__construct()" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.', __CLASS__, ContainerInterface::class), E_USER_DEPRECATED); + // "%s" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.' + $this->sendersLocator = new ServiceLocator([]); + $this->sendAndHandle = $sendersLocator; + $this->useLegacyLookup = true; + } else { + $this->sendersLocator = $sendersLocator; + $this->sendAndHandle = $sendAndHandle; + } } /** @@ -46,14 +63,43 @@ public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable $seen = []; foreach (HandlersLocator::listTypes($envelope) as $type) { - foreach ($this->senders[$type] ?? [] as $alias => $sender) { - if (!\in_array($sender, $seen, true)) { - yield $alias => $seen[] = $sender; + // the old way of looking up senders + if ($this->useLegacyLookup) { + foreach ($this->sendersMap[$type] ?? [] as $alias => $sender) { + if (!\in_array($sender, $seen, true)) { + yield $alias => $seen[] = $sender; + } + } + + $handle = $handle ?: $this->sendAndHandle[$type] ?? false; + + continue; + } + + foreach ($this->sendersMap[$type] ?? [] as $senderAlias) { + if (!\in_array($senderAlias, $seen, true)) { + if (!$this->sendersLocator->has($senderAlias)) { + throw new RuntimeException(sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias)); + } + + $seen[] = $senderAlias; + $sender = $this->sendersLocator->get($senderAlias); + yield $senderAlias => $sender; } } + $handle = $handle ?: $this->sendAndHandle[$type] ?? false; } $handle = $handle || null === $sender; } + + public function getSenderByAlias(string $alias): SenderInterface + { + if ($this->sendersLocator->has($alias)) { + return $this->sendersLocator->get($alias); + } + + throw new UnknownSenderException(sprintf('Unknown sender alias "%s"', $alias)); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php index e802bebff4deb..1bcb8715f55d1 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\Sender; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\UnknownSenderException; /** * Maps a message to a list of senders. @@ -32,4 +33,13 @@ interface SendersLocatorInterface * @return iterable|SenderInterface[] Indexed by sender alias if available */ public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable; + + /** + * Returns a specific sender by its alias. + * + * @param string $alias The alias given to the sender in getSenders() + * + * @throws UnknownSenderException If the sender is not found + */ + public function getSenderByAlias(string $alias): SenderInterface; } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index cf3a5f68951c4..44ff30a7297c4 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -47,7 +47,7 @@ class Worker implements WorkerInterface * @param ReceiverInterface[] $receivers Where the key is the transport name * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match) */ - public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) + public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) { $this->receivers = $receivers; $this->bus = $bus; @@ -114,9 +114,14 @@ public function run(array $options = [], callable $onHandledCallback = null): vo $this->dispatchEvent(new WorkerStoppedEvent()); } - private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy) + private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void { - $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName)); + $event = new WorkerMessageReceivedEvent($envelope, $transportName); + $this->dispatchEvent($event); + + if (!$event->shouldHandle()) { + return; + } $message = $envelope->getMessage(); $context = [ @@ -148,7 +153,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, // add the delay and retry stamp info + remove ReceivedStamp $retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope))) - ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope))) + ->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope))) ->withoutAll(ReceivedStamp::class); // re-send the message @@ -199,6 +204,15 @@ private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyIn return false; } + $sentStamp = $envelope->last(SentStamp::class); + if (null === $sentStamp) { + if (null !== $this->logger) { + $this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.'); + } + + return false; + } + return $retryStrategy->isRetryable($envelope); } @@ -210,11 +224,16 @@ private function getRetryCount(Envelope $envelope): int return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0; } - private function getSenderAlias(Envelope $envelope): ?string + private function getSenderClassOrAlias(Envelope $envelope): string { /** @var SentStamp|null $sentStamp */ $sentStamp = $envelope->last(SentStamp::class); - return $sentStamp ? $sentStamp->getSenderAlias() : null; + if (null === $sentStamp) { + // should not happen, because of the check in shouldRetry() + throw new LogicException('Could not find SentStamp.'); + } + + return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass(); } } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 02d25efd5fc4f..4b552c96437ba 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -23,6 +23,7 @@ "doctrine/dbal": "^2.5", "psr/cache": "~1.0", "symfony/console": "~3.4|~4.0", + "symfony/debug": "~4.1", "symfony/dependency-injection": "~3.4.19|^4.1.8", "symfony/doctrine-bridge": "~3.4|~4.0", "symfony/event-dispatcher": "~4.3", @@ -35,7 +36,8 @@ "symfony/var-dumper": "~3.4|~4.0" }, "conflict": { - "symfony/event-dispatcher": "<4.3" + "symfony/event-dispatcher": "<4.3", + "symfony/debug": "<4.1" }, "suggest": { "enqueue/messenger-adapter": "For using the php-enqueue library as a transport."