diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
index 0f7ad4c11f312..b9f2e20b2b22a 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
@@ -1200,6 +1200,10 @@ function ($a) {
->end()
->end()
->end()
+ ->scalarNode('failure_transport')
+ ->defaultNull()
+ ->info('Transport name to send failed messages to (after all retries have failed).')
+ ->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])
diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index fda2aa5e38f22..94f1b2e5b33d3 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -40,9 +40,9 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
-use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
+use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Definition;
@@ -284,6 +284,9 @@ public function load(array $configs, ContainerBuilder $container)
$container->removeDefinition('console.command.messenger_debug');
$container->removeDefinition('console.command.messenger_stop_workers');
$container->removeDefinition('console.command.messenger_setup_transports');
+ $container->removeDefinition('console.command.messenger_failed_messages_retry');
+ $container->removeDefinition('console.command.messenger_failed_messages_show');
+ $container->removeDefinition('console.command.messenger_failed_messages_remove');
}
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
@@ -1743,22 +1746,48 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
- $senders = [];
+
+ // make sure senderAliases contains all senders
foreach ($messageConfiguration['senders'] as $sender) {
- $senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
+ if (!isset($senderAliases[$sender])) {
+ $senderAliases[$sender] = $sender;
+ }
}
- $messageToSendersMapping[$message] = new IteratorArgument($senders);
+ $messageToSendersMapping[$message] = $messageConfiguration['senders'];
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}
+ $senderReferences = [];
+ foreach ($senderAliases as $alias => $serviceId) {
+ $senderReferences[$alias] = new Reference($serviceId);
+ }
+
$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
- ->replaceArgument(1, $messagesToSendAndHandle)
+ ->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
+ ->replaceArgument(2, $messagesToSendAndHandle)
;
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
+
+ $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
+ ->replaceArgument(1, $config['failure_transport']);
+
+ if ($config['failure_transport']) {
+ $container->getDefinition('console.command.messenger_failed_messages_retry')
+ ->replaceArgument(0, $config['failure_transport'])
+ ->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
+ $container->getDefinition('console.command.messenger_failed_messages_show')
+ ->replaceArgument(0, $config['failure_transport']);
+ $container->getDefinition('console.command.messenger_failed_messages_remove')
+ ->replaceArgument(0, $config['failure_transport']);
+ } else {
+ $container->removeDefinition('console.command.messenger_failed_messages_retry');
+ $container->removeDefinition('console.command.messenger_failed_messages_show');
+ $container->removeDefinition('console.command.messenger_failed_messages_remove');
+ }
}
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
index 1c74220bf8417..46c103cee4675 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
@@ -114,6 +114,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index b0bcf2fd5ccbb..1671810d395a4 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -9,7 +9,8 @@
-
+
+
@@ -91,5 +92,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php
index bd3ba1c18d8c0..3906a3dc7a347 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php
@@ -328,6 +328,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
'routing' => [],
'transports' => [],
+ 'failure_transport' => null,
'serializer' => [
'default_serializer' => 'messenger.transport.native_php_serializer',
'symfony_serializer' => [
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index 683264821acbb..f1dade56c6083 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -721,12 +721,16 @@ public function testMessengerRouting()
'*' => false,
];
- $this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
+ $this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
$sendersMapping = $senderLocatorDefinition->getArgument(0);
$this->assertEquals([
- 'amqp' => new Reference('messenger.transport.amqp'),
- 'audit' => new Reference('audit'),
- ], $sendersMapping[DummyMessage::class]->getValues());
+ 'amqp',
+ 'audit',
+ ], $sendersMapping[DummyMessage::class]);
+ $sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
+ $this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
+ $this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
+ $this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
}
public function testMessengerTransportConfiguration()
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 969b01fcc3acf..37c055d9c31d2 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -4,6 +4,13 @@ CHANGELOG
4.3.0
-----
+ * [BC BREAK] `SendersLocatorInterface` has an additional method:
+ `getSenderByAlias()`.
+ * A new `ListableReceiverInterface` was added, which a receiver
+ can implement (when applicable) to enable listing and fetching
+ individual messages by id (used in the new "Failed Messages" commands).
+ * Both `SenderInterface::send()` and `ReceiverInterface::get()`
+ should now (when applicable) add a `TransportMessageIdStamp`.
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.
diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
new file mode 100644
index 0000000000000..854daba0e6288
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
@@ -0,0 +1,112 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Command;
+
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Helper\Dumper;
+use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @internal
+ * @experimental in 4.3
+ */
+abstract class AbstractFailedMessagesCommand extends Command
+{
+ private $receiverName;
+ private $receiver;
+
+ public function __construct(string $receiverName, ReceiverInterface $receiver)
+ {
+ $this->receiverName = $receiverName;
+ $this->receiver = $receiver;
+
+ parent::__construct();
+ }
+
+ protected function getReceiverName()
+ {
+ return $this->receiverName;
+ }
+
+ /**
+ * @return mixed|null
+ */
+ protected function getMessageId(Envelope $envelope)
+ {
+ /** @var TransportMessageIdStamp $stamp */
+ $stamp = $envelope->last(TransportMessageIdStamp::class);
+
+ return null !== $stamp ? $stamp->getId() : null;
+ }
+
+ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
+ {
+ $io->title('Failed Message Details');
+
+ /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
+ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+
+ $rows = [
+ ['Class', \get_class($envelope->getMessage())],
+ ];
+
+ if (null !== $id = $this->getMessageId($envelope)) {
+ $rows[] = ['Message Id', $id];
+ }
+
+ if (null === $sentToFailureTransportStamp) {
+ $io->warning('Message does not appear to have been sent to this transport after failing');
+ } else {
+ $rows = array_merge($rows, [
+ ['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
+ ['Error', $sentToFailureTransportStamp->getExceptionMessage()],
+ ['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
+ ['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
+ ]);
+ }
+
+ $io->table([], $rows);
+
+ if ($io->isVeryVerbose()) {
+ $io->title('Message:');
+ $dump = new Dumper($io);
+ $io->writeln($dump($envelope->getMessage()));
+ $io->title('Exception:');
+ $io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
+ } else {
+ $io->writeln(' Re-run command with -vv to see more message & error details.');
+ }
+ }
+
+ protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
+ {
+ if ($receiver instanceof MessageCountAwareInterface) {
+ if (1 === $receiver->getMessageCount()) {
+ $io->writeln('There is 1 message pending in the failure transport.');
+ } else {
+ $io->writeln(sprintf('There are %d messages pending in the failure transport.', $receiver->getMessageCount()));
+ }
+ }
+ }
+
+ protected function getReceiver(): ReceiverInterface
+ {
+ return $this->receiver;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php
new file mode 100644
index 0000000000000..f42e917b833b9
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php
@@ -0,0 +1,88 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Command;
+
+use Symfony\Component\Console\Exception\RuntimeException;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\ConsoleOutputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
+{
+ protected static $defaultName = 'messenger:failed:remove';
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure(): void
+ {
+ $this
+ ->setDefinition([
+ new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
+ new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
+ ])
+ ->setDescription('Remove a message from the failure transport.')
+ ->setHelp(<<<'EOF'
+The %command.name% removes a message that is pending in the failure transport.
+
+ php %command.full_name% {id}
+
+The specific id can be found via the messenger:failed:show command.
+EOF
+ )
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
+
+ $receiver = $this->getReceiver();
+
+ $shouldForce = $input->getOption('force');
+ $this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
+ }
+
+ private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
+ {
+ if (!$receiver instanceof ListableReceiverInterface) {
+ throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
+ }
+
+ $envelope = $receiver->find($id);
+ if (null === $envelope) {
+ throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
+ }
+ $this->displaySingleMessage($envelope, $io);
+
+ if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
+ $receiver->reject($envelope);
+
+ $io->success('Message removed.');
+ } else {
+ $io->note('Message not removed.');
+ }
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
new file mode 100644
index 0000000000000..a2ebc290c179b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
@@ -0,0 +1,221 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Command;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Console\Exception\RuntimeException;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\ConsoleOutputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\EventDispatcher\EventDispatcherInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
+use Symfony\Component\Messenger\Exception\LogicException;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
+use Symfony\Component\Messenger\Worker;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
+{
+ protected static $defaultName = 'messenger:failed:retry';
+
+ private $eventDispatcher;
+ private $messageBus;
+ private $retryStrategy;
+ private $logger;
+
+ public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
+ {
+ $this->eventDispatcher = $eventDispatcher;
+ $this->messageBus = $messageBus;
+ $this->retryStrategy = $retryStrategy;
+ $this->logger = $logger;
+
+ parent::__construct($receiverName, $receiver);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure(): void
+ {
+ $this
+ ->setDefinition([
+ new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
+ new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
+ ])
+ ->setDescription('Retries one or more messages from the failure transport.')
+ ->setHelp(<<<'EOF'
+The %command.name% retries message in the failure transport.
+
+ php %command.full_name%
+
+The command will interactively ask if each message should be retried
+or discarded.
+
+Some transports support retrying a specific message id, which comes
+from the messenger:failed:show command.
+
+ php %command.full_name% {id}
+
+Or pass multiple ids at once to process multiple messages:
+
+php %command.full_name% {id1} {id2} {id3}
+
+EOF
+ )
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
+ $io->comment('Quit this command with CONTROL-C.');
+ if (!$output->isVeryVerbose()) {
+ $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
+ }
+
+ $receiver = $this->getReceiver();
+ $this->printPendingMessagesMessage($receiver, $io);
+
+ $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $this->getReceiverName()));
+
+ $shouldForce = $input->getOption('force');
+ $ids = $input->getArgument('id');
+ if (0 === \count($ids)) {
+ if (!$input->isInteractive()) {
+ throw new RuntimeException('Message id must be passed when in non-interactive mode.');
+ }
+
+ $this->runInteractive($io, $shouldForce);
+
+ return;
+ }
+
+ $this->retrySpecificIds($ids, $io, $shouldForce);
+ $io->success('All done!');
+ }
+
+ private function runInteractive(SymfonyStyle $io, bool $shouldForce)
+ {
+ $receiver = $this->getReceiver();
+ $count = 0;
+ if ($receiver instanceof ListableReceiverInterface) {
+ // for listable receivers, find the messages one-by-one
+ // this avoids using get(), which for some less-robust
+ // transports (like Doctrine), will cause the message
+ // to be temporarily "acked", even if the user aborts
+ // handling the message
+ while (true) {
+ $ids = [];
+ foreach ($receiver->all(1) as $envelope) {
+ ++$count;
+
+ $id = $this->getMessageId($envelope);
+ if (null === $id) {
+ throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName()));
+ }
+ $ids[] = $id;
+ }
+
+ // break the loop if all messages are consumed
+ if (0 === \count($ids)) {
+ break;
+ }
+
+ $this->retrySpecificIds($ids, $io, $shouldForce);
+ }
+ } else {
+ // get() and ask messages one-by-one
+ $count = $this->runWorker($this->getReceiver(), $io, $shouldForce);
+ }
+
+ // avoid success message if nothing was processed
+ if (1 < $count) {
+ $io->success('All failed messages have been handled or removed!');
+ }
+ }
+
+ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
+ {
+ $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
+ $envelope = $messageReceivedEvent->getEnvelope();
+
+ $this->displaySingleMessage($envelope, $io);
+
+ $shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
+
+ if ($shouldHandle) {
+ return;
+ }
+
+ $messageReceivedEvent->shouldHandle(false);
+ $receiver->reject($envelope);
+ };
+ $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
+
+ $worker = new Worker(
+ [$this->getReceiverName() => $receiver],
+ $this->messageBus,
+ [$this->getReceiverName() => $this->retryStrategy],
+ $this->eventDispatcher,
+ $this->logger
+ );
+
+ $count = 0;
+ try {
+ $worker->run([], function (?Envelope $envelope) use ($worker, $io, &$count) {
+ ++$count;
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
+ } finally {
+ $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
+ }
+
+ return $count;
+ }
+
+ private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
+ {
+ $receiver = $this->getReceiver();
+
+ if (!$receiver instanceof ListableReceiverInterface) {
+ throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName()));
+ }
+
+ foreach ($ids as $id) {
+ $envelope = $receiver->find($id);
+ if (null === $envelope) {
+ throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
+ }
+
+ $singleReceiver = new SingleMessageReceiver($receiver, $envelope);
+ $this->runWorker($singleReceiver, $io, $shouldForce);
+ }
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php
new file mode 100644
index 0000000000000..da49a4b7fe2c5
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php
@@ -0,0 +1,129 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Command;
+
+use Symfony\Component\Console\Exception\RuntimeException;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
+use Symfony\Component\Console\Output\ConsoleOutputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class FailedMessagesShowCommand extends AbstractFailedMessagesCommand
+{
+ protected static $defaultName = 'messenger:failed:show';
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure(): void
+ {
+ $this
+ ->setDefinition([
+ new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'),
+ new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50),
+ ])
+ ->setDescription('Shows one or more messages from the failure transport.')
+ ->setHelp(<<<'EOF'
+The %command.name% shows message that are pending in the failure transport.
+
+ php %command.full_name%
+
+Or look at a specific message by its id:
+
+ php %command.full_name% {id}
+EOF
+ )
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
+
+ $receiver = $this->getReceiver();
+ $this->printPendingMessagesMessage($receiver, $io);
+
+ if (!$receiver instanceof ListableReceiverInterface) {
+ throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName()));
+ }
+
+ if (null === $id = $input->getArgument('id')) {
+ $this->listMessages($io, $input->getOption('max'));
+ } else {
+ $this->showMessage($id, $io);
+ }
+ }
+
+ private function listMessages(SymfonyStyle $io, int $max)
+ {
+ /** @var ListableReceiverInterface $receiver */
+ $receiver = $this->getReceiver();
+ $envelopes = $receiver->all($max);
+
+ $rows = [];
+ foreach ($envelopes as $envelope) {
+ /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
+ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+
+ $rows[] = [
+ $this->getMessageId($envelope),
+ \get_class($envelope->getMessage()),
+ null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
+ null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
+ ];
+ }
+
+ if (0 === \count($rows)) {
+ $io->success('No failed messages were found.');
+
+ return;
+ }
+
+ $io->table(['Id', 'Class', 'Failed at', 'Error'], $rows);
+
+ if (\count($rows) === $max) {
+ $io->comment(sprintf('Showing first %d messages.', $max));
+ }
+
+ $io->comment('Run messenger:failed:show {id} -vv to see message details.');
+ }
+
+ private function showMessage($id, SymfonyStyle $io)
+ {
+ /** @var ListableReceiverInterface $receiver */
+ $receiver = $this->getReceiver();
+ $envelope = $receiver->find($id);
+ if (null === $envelope) {
+ throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
+ }
+
+ $this->displaySingleMessage($envelope, $io);
+
+ $io->writeln([
+ '',
+ sprintf(' Run messenger:failed:retry %s to retry this message.', $id),
+ sprintf(' Run messenger:failed:remove %s to delete it.', $id),
+ ]);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
index 46550f5121497..201688944d643 100644
--- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
+++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
@@ -247,12 +247,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
foreach ($receiverMapping as $name => $reference) {
$receiverNames[(string) $reference] = $name;
}
- if ($container->hasDefinition('console.command.messenger_consume_messages')) {
- $buses = [];
- foreach ($busIds as $busId) {
- $buses[$busId] = new Reference($busId);
- }
+ $buses = [];
+ foreach ($busIds as $busId) {
+ $buses[$busId] = new Reference($busId);
+ }
+
+ if ($container->hasDefinition('messenger.routable_message_bus')) {
+ $container->getDefinition('messenger.routable_message_bus')
+ ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses));
+ }
+
+ if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames));
@@ -264,6 +270,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
}
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
+
+ $failedCommandIds = [
+ 'console.command.messenger_failed_messages_retry',
+ 'console.command.messenger_failed_messages_show',
+ 'console.command.messenger_failed_messages_remove',
+ ];
+ foreach ($failedCommandIds as $failedCommandId) {
+ if ($container->hasDefinition($failedCommandId)) {
+ $definition = $container->getDefinition($failedCommandId);
+ $definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]);
+ }
+ }
}
private function registerBusToCollector(ContainerBuilder $container, string $busId)
diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php
index 3df75b8723477..4443853363244 100644
--- a/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php
+++ b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php
@@ -20,4 +20,14 @@
*/
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
{
+ private $shouldHandle = true;
+
+ public function shouldHandle(bool $shouldHandle = null): bool
+ {
+ if (null !== $shouldHandle) {
+ $this->shouldHandle = $shouldHandle;
+ }
+
+ return $this->shouldHandle;
+ }
}
diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
new file mode 100644
index 0000000000000..d62a8ed799444
--- /dev/null
+++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php
@@ -0,0 +1,89 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\EventListener;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Debug\Exception\FlattenException;
+use Symfony\Component\EventDispatcher\EventSubscriberInterface;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\Exception\HandlerFailedException;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use Symfony\Component\Messenger\Stamp\SentStamp;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+
+/**
+ * Sends a rejected message to a "failure transport".
+ *
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
+{
+ private $messageBus;
+ private $failureSenderAlias;
+ private $logger;
+
+ public function __construct(MessageBusInterface $messageBus, string $failureSenderAlias, LoggerInterface $logger = null)
+ {
+ $this->messageBus = $messageBus;
+ $this->failureSenderAlias = $failureSenderAlias;
+ $this->logger = $logger;
+ }
+
+ public function onMessageFailed(WorkerMessageFailedEvent $event)
+ {
+ if ($event->willRetry()) {
+ return;
+ }
+
+ $envelope = $event->getEnvelope();
+
+ // avoid re-sending to the failed sender
+ foreach ($envelope->all(SentStamp::class) as $sentStamp) {
+ /** @var SentStamp $sentStamp */
+ if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
+ return;
+ }
+ }
+
+ // remove the received stamp so it's redelivered
+ $throwable = $event->getThrowable();
+ if ($throwable instanceof HandlerFailedException) {
+ $throwable = $throwable->getNestedExceptions()[0];
+ }
+
+ $flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
+ $envelope = $envelope->withoutAll(ReceivedStamp::class)
+ ->withoutAll(TransportMessageIdStamp::class)
+ ->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
+ ->with(new RedeliveryStamp(0, $this->failureSenderAlias));
+
+ if (null !== $this->logger) {
+ $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
+ 'class' => \get_class($envelope->getMessage()),
+ 'transport' => $this->failureSenderAlias,
+ ]);
+ }
+
+ $this->messageBus->dispatch($envelope);
+ }
+
+ public static function getSubscribedEvents()
+ {
+ return [
+ WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
+ ];
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php
new file mode 100644
index 0000000000000..72fccfa566f90
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Exception/UnknownSenderException.php
@@ -0,0 +1,21 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Exception;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class UnknownSenderException extends \InvalidArgumentException implements ExceptionInterface
+{
+}
diff --git a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php
index 94392e3c98c58..042d5842f84ea 100644
--- a/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php
+++ b/src/Symfony/Component/Messenger/Middleware/AddBusNameStampMiddleware.php
@@ -32,7 +32,9 @@ public function __construct(string $busName)
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
- $envelope = $envelope->with(new BusNameStamp($this->busName));
+ if (null === $envelope->last(BusNameStamp::class)) {
+ $envelope = $envelope->with(new BusNameStamp($this->busName));
+ }
return $stack->next()->handle($envelope, $stack);
}
diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
index d2ebaf8cfa011..bc22da6b47f10 100644
--- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
+++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php
@@ -19,6 +19,7 @@
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -65,12 +66,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
// dispatch event unless this is a redelivery
$shouldDispatchEvent = null === $redeliveryStamp;
- foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
- // on redelivery, only deliver to the given sender
- if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
- continue;
- }
-
+ foreach ($this->getSenders($envelope, $handle, $redeliveryStamp) as $alias => $sender) {
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
$event = new SendMessageToTransportsEvent($envelope);
$this->eventDispatcher->dispatch($event);
@@ -107,4 +103,18 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
// message should only be sent and not be handled by the next middleware
return $envelope;
}
+
+ /**
+ * * @return iterable|SenderInterface[]
+ */
+ private function getSenders(Envelope $envelope, &$handle, ?RedeliveryStamp $redeliveryStamp): iterable
+ {
+ if (null !== $redeliveryStamp) {
+ return [
+ $redeliveryStamp->getSenderClassOrAlias() => $this->sendersLocator->getSenderByAlias($redeliveryStamp->getSenderClassOrAlias()),
+ ];
+ }
+
+ return $this->sendersLocator->getSenders($envelope, $handle);
+ }
}
diff --git a/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php
index 394be2509d2c5..e883b3ffd9ac5 100644
--- a/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php
+++ b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php
@@ -39,12 +39,12 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
private $maxDelayMilliseconds;
/**
- * @param int $maxRetries The maximum number of time to retry (0 means indefinitely)
+ * @param int $maxRetries The maximum number of time to retry (null means indefinitely)
* @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used)
* @param float $multiplier Multiplier to apply to the delay each time a retry occurs
* @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum)
*/
- public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
+ public function __construct(?int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
{
$this->maxRetries = $maxRetries;
@@ -66,7 +66,7 @@ public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000,
public function isRetryable(Envelope $message): bool
{
- if (0 === $this->maxRetries) {
+ if (null === $this->maxRetries) {
return true;
}
diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
index fd7c88bcbc5bb..a0dd9b89d869e 100644
--- a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
+++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
@@ -44,17 +44,4 @@ public function getSenderClassOrAlias(): string
{
return $this->senderClassOrAlias;
}
-
- public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool
- {
- if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) {
- return true;
- }
-
- if ($senderClass === $this->senderClassOrAlias) {
- return true;
- }
-
- return false;
- }
}
diff --git a/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php
new file mode 100644
index 0000000000000..05a5dfcacf77b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Stamp/SentToFailureTransportStamp.php
@@ -0,0 +1,57 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Stamp;
+
+use Symfony\Component\Debug\Exception\FlattenException;
+
+/**
+ * Stamp applied when a message is sent to the failure transport.
+ *
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class SentToFailureTransportStamp implements StampInterface
+{
+ private $exceptionMessage;
+ private $originalReceiverName;
+ private $flattenException;
+ private $sentAt;
+
+ public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
+ {
+ $this->exceptionMessage = $exceptionMessage;
+ $this->originalReceiverName = $originalReceiverName;
+ $this->flattenException = $flattenException;
+ $this->sentAt = new \DateTime();
+ }
+
+ public function getExceptionMessage(): string
+ {
+ return $this->exceptionMessage;
+ }
+
+ public function getOriginalReceiverName(): string
+ {
+ return $this->originalReceiverName;
+ }
+
+ public function getFlattenException(): ?FlattenException
+ {
+ return $this->flattenException;
+ }
+
+ public function getSentAt(): \DateTime
+ {
+ return $this->sentAt;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php b/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php
new file mode 100644
index 0000000000000..b0b93ae1885c3
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Stamp/TransportMessageIdStamp.php
@@ -0,0 +1,37 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Stamp;
+
+/**
+ * Added by a sender or receiver to indicate the id of this message in that transport.
+ *
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class TransportMessageIdStamp implements StampInterface
+{
+ private $id;
+
+ /**
+ * @param mixed $id some "identifier" of the message in a transport
+ */
+ public function __construct($id)
+ {
+ $this->id = $id;
+ }
+
+ public function getId()
+ {
+ return $this->id;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php
new file mode 100644
index 0000000000000..74cab385adf9b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php
@@ -0,0 +1,37 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Command;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Console\Tester\CommandTester;
+use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+
+class FailedMessagesRemoveCommandTest extends TestCase
+{
+ public function testBasicRun()
+ {
+ $receiver = $this->createMock(ListableReceiverInterface::class);
+ $receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass()));
+
+ $command = new FailedMessagesRemoveCommand(
+ 'failure_receiver',
+ $receiver
+ );
+
+ $tester = new CommandTester($command);
+ $tester->execute(['id' => 20, '--force' => true]);
+
+ $this->assertContains('Message removed.', $tester->getDisplay());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php
new file mode 100644
index 0000000000000..49f9dbfd2cf9b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php
@@ -0,0 +1,49 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Command;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Console\Tester\CommandTester;
+use Symfony\Component\EventDispatcher\EventDispatcherInterface;
+use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+use Symfony\Component\Messenger\Worker;
+
+class FailedMessagesRetryCommandTest extends TestCase
+{
+ public function testBasicRun()
+ {
+ $receiver = $this->createMock(ListableReceiverInterface::class);
+ $receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass()));
+ // message will eventually be ack'ed in Worker
+ $receiver->expects($this->exactly(2))->method('ack');
+
+ $dispatcher = $this->createMock(EventDispatcherInterface::class);
+ $bus = $this->createMock(MessageBusInterface::class);
+ // the bus should be called in the worker
+ $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
+
+ $command = new FailedMessagesRetryCommand(
+ 'failure_receiver',
+ $receiver,
+ $bus,
+ $dispatcher
+ );
+
+ $tester = new CommandTester($command);
+ $tester->execute(['id' => [10, 12]]);
+
+ $this->assertContains('[OK]', $tester->getDisplay());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
new file mode 100644
index 0000000000000..ad56462dbd655
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php
@@ -0,0 +1,57 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Command;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Console\Tester\CommandTester;
+use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+
+/**
+ * @group time-sensitive
+ */
+class FailedMessagesShowCommandTest extends TestCase
+{
+ public function testBasicRun()
+ {
+ $sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
+ $envelope = new Envelope(new \stdClass(), [
+ new TransportMessageIdStamp(15),
+ $sentToFailureStamp,
+ ]);
+ $receiver = $this->createMock(ListableReceiverInterface::class);
+ $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
+
+ $command = new FailedMessagesShowCommand(
+ 'failure_receiver',
+ $receiver
+ );
+
+ $tester = new CommandTester($command);
+ $tester->execute(['id' => 15]);
+
+ $this->assertContains(sprintf(<<getSentAt()->format('Y-m-d H:i:s')),
+ $tester->getDisplay(true));
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
new file mode 100644
index 0000000000000..9f23e462c05f5
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php
@@ -0,0 +1,121 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Handler;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
+use Symfony\Component\Messenger\Exception\HandlerFailedException;
+use Symfony\Component\Messenger\MessageBusInterface;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use Symfony\Component\Messenger\Stamp\SentStamp;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+
+class SendFailedMessageToFailureTransportListenerTest extends TestCase
+{
+ public function testItDispatchesToTheFailureTransport()
+ {
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
+ /* @var Envelope $envelope */
+ $this->assertInstanceOf(Envelope::class, $envelope);
+
+ /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
+ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+ $this->assertNotNull($sentToFailureTransportStamp);
+ $this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
+ $this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
+ $this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());
+
+ /** @var RedeliveryStamp $redeliveryStamp */
+ $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
+ $this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
+
+ $this->assertNull($envelope->last(ReceivedStamp::class));
+ $this->assertNull($envelope->last(TransportMessageIdStamp::class));
+
+ return true;
+ }))->willReturn(new Envelope(new \stdClass()));
+ $listener = new SendFailedMessageToFailureTransportListener(
+ $bus,
+ 'failure_sender'
+ );
+
+ $exception = new \Exception('no!');
+ $envelope = new Envelope(new \stdClass());
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
+
+ $listener->onMessageFailed($event);
+ }
+
+ public function testItGetsNestedHandlerFailedException()
+ {
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
+ /** @var Envelope $envelope */
+ /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
+ $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
+ $this->assertNotNull($sentToFailureTransportStamp);
+ $this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
+ $this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
+
+ return true;
+ }))->willReturn(new Envelope(new \stdClass()));
+
+ $listener = new SendFailedMessageToFailureTransportListener(
+ $bus,
+ 'failure_sender'
+ );
+
+ $envelope = new Envelope(new \stdClass());
+ $exception = new \Exception('I am inside!');
+ $exception = new HandlerFailedException($envelope, [$exception]);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception, false);
+
+ $listener->onMessageFailed($event);
+ }
+
+ public function testDoNothingOnRetry()
+ {
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->never())->method('dispatch');
+ $listener = new SendFailedMessageToFailureTransportListener(
+ $bus,
+ 'failure_sender'
+ );
+
+ $envelope = new Envelope(new \stdClass());
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), true);
+
+ $listener->onMessageFailed($event);
+ }
+
+ public function testDoNotRedeliverToFailed()
+ {
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->never())->method('dispatch');
+ $listener = new SendFailedMessageToFailureTransportListener(
+ $bus,
+ 'failure_sender'
+ );
+
+ $envelope = new Envelope(new \stdClass(), [
+ new SentStamp('MySender', 'failure_sender'),
+ ]);
+ $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
+
+ $listener->onMessageFailed($event);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php
index 71fe15b25e90c..564800f744d4d 100644
--- a/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/AddBusNameStampMiddlewareTest.php
@@ -29,5 +29,9 @@ public function testItSendsTheMessageToAssignedSender()
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
$this->assertNotNull($busNameStamp);
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
+
+ // the stamp should not be added over and over again
+ $finalEnvelope = $middleware->handle($finalEnvelope, $this->getStackMock());
+ $this->assertCount(1, $finalEnvelope->all(BusNameStamp::class));
}
}
diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
index a88f93b6a3d25..2df828ddccd31 100644
--- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;
+use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
@@ -34,15 +35,16 @@ public function testItSendsTheMessageToAssignedSender()
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['my_sender']], ['my_sender' => $sender]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0));
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'my_sender')))->will($this->returnArgument(0));
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
/* @var SentStamp $stamp */
$this->assertInstanceOf(SentStamp::class, $stamp = $envelope->last(SentStamp::class), 'it adds a sent stamp');
- $this->assertNull($stamp->getSenderAlias());
+ $this->assertSame('my_sender', $stamp->getSenderAlias());
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
}
@@ -52,9 +54,8 @@ public function testItSendsTheMessageToMultipleSenders()
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator([
- DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
- ]));
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender, 'bar' => $sender2]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->once())
->method('send')
@@ -92,12 +93,16 @@ public function testItSendsToOnlyOneSenderOnRedelivery()
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator([
- DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
- ], [
- // normally, this class sends and handles (but not on retry)
- DummyMessage::class => true,
- ]));
+ $sendersLocator = $this->createSendersLocator(
+ [DummyMessage::class => ['foo', 'bar']],
+ ['foo' => $sender, 'bar' => $sender2],
+ [
+ // normally, this class sends and handles (but not on retry)
+ DummyMessage::class => true,
+ ]
+ );
+
+ $middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->never())
->method('send')
@@ -116,9 +121,10 @@ public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
$envelope = new Envelope(new ChildDummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo_sender']], ['foo_sender' => $sender]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock(false));
}
@@ -129,11 +135,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageClass()
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
+ $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessage::class => true,
- ]));
+ ]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@@ -144,11 +151,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageParentClass()
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
+ $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessage::class => true,
- ]));
+ ]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@@ -159,11 +167,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnTheMessageInterface()
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
+ $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
DummyMessageInterface::class => true,
- ]));
+ ]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@@ -174,11 +183,12 @@ public function testItAlsoCallsTheNextMiddlewareBasedOnWildcard()
$envelope = new Envelope($message);
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]], [
+ $sendersLocator = $this->createSendersLocator(['*' => ['foo_sender']], ['foo_sender' => $sender], [
'*' => true,
- ]));
+ ]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
- $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
+ $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender), 'foo_sender')))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@@ -188,7 +198,7 @@ public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage()
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
- $middleware = new SendMessageMiddleware(new SendersLocator([]));
+ $middleware = new SendMessageMiddleware($this->createSendersLocator([], []));
$middleware->handle($envelope, $this->getStackMock());
}
@@ -199,7 +209,8 @@ public function testItSkipsReceivedMessages()
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator(['*' => [$sender]]));
+ $sendersLocator = $this->createSendersLocator(['*' => ['foo']], ['foo' => $sender]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
$sender->expects($this->never())->method('send');
@@ -220,7 +231,8 @@ public function testItDispatchesTheEventOneTime()
$sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
- $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender1, $sender2]]), $dispatcher);
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo', 'bar']], ['foo' => $sender1, 'bar' => $sender2]);
+ $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
$sender1->expects($this->once())->method('send')->willReturn($envelope);
$sender2->expects($this->once())->method('send')->willReturn($envelope);
@@ -235,7 +247,7 @@ public function testItDoesNotDispatchWithNoSenders()
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$dispatcher->expects($this->never())->method('dispatch');
- $middleware = new SendMessageMiddleware(new SendersLocator([]), $dispatcher);
+ $middleware = new SendMessageMiddleware($this->createSendersLocator([], []), $dispatcher);
$middleware->handle($envelope, $this->getStackMock());
}
@@ -249,8 +261,11 @@ public function testItDoesNotDispatchOnRedeliver()
$dispatcher->expects($this->never())->method('dispatch');
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sender2->expects($this->once())->method('send')->willReturn(new Envelope(new \stdClass()));
- $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]), $dispatcher);
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender, 'foo_sender' => $sender2]);
+ $middleware = new SendMessageMiddleware($sendersLocator, $dispatcher);
$middleware->handle($envelope, $this->getStackMock(false));
}
@@ -263,9 +278,27 @@ public function testItHandlesWithForceCallHandlersStamp()
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender->expects($this->once())->method('send')->willReturn($envelope);
- $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
+ $sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
+ $middleware = new SendMessageMiddleware($sendersLocator);
// next handler *should* be called
$middleware->handle($envelope, $this->getStackMock(true));
}
+
+ private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
+ {
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->any())
+ ->method('has')
+ ->willReturnCallback(function ($id) use ($senders) {
+ return isset($senders[$id]);
+ });
+ $container->expects($this->any())
+ ->method('get')
+ ->willReturnCallback(function ($id) use ($senders) {
+ return $senders[$id];
+ });
+
+ return new SendersLocator($sendersMap, $container, $sendAndHandle);
+ }
}
diff --git a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
index a6c8c8352404b..d1c201955d4fa 100644
--- a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php
@@ -26,6 +26,16 @@ public function testIsRetryable()
$this->assertTrue($strategy->isRetryable($envelope));
}
+ public function testIsRetryableWithNullMax()
+ {
+ $strategy = new MultiplierRetryStrategy(null);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
+ $this->assertTrue($strategy->isRetryable($envelope));
+
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(1, 'sender_alias')]);
+ $this->assertTrue($strategy->isRetryable($envelope));
+ }
+
public function testIsNotRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
@@ -34,6 +44,13 @@ public function testIsNotRetryable()
$this->assertFalse($strategy->isRetryable($envelope));
}
+ public function testIsNotRetryableWithZeroMax()
+ {
+ $strategy = new MultiplierRetryStrategy(0);
+ $envelope = new Envelope(new \stdClass(), [new RedeliveryStamp(0, 'sender_alias')]);
+ $this->assertFalse($strategy->isRetryable($envelope));
+ }
+
public function testIsRetryableWithNoStamp()
{
$strategy = new MultiplierRetryStrategy(3);
diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
index 74eae64afa493..71b9efcf50ea0 100644
--- a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
+use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
@@ -19,10 +20,10 @@
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
-use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;
@@ -30,20 +31,15 @@ class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
- $apiMessage = new DummyMessage('API');
+ $senderAndReceiver = new DummySenderAndReceiver();
- $receiver = $this->createMock(ReceiverInterface::class);
- $receiver->method('get')
- ->willReturn([
- new Envelope($apiMessage, [
- new SentStamp('Some\Sender', 'sender_alias'),
- ]),
- ]);
+ $senderLocator = $this->createMock(ContainerInterface::class);
+ $senderLocator->method('has')->with('sender_alias')->willReturn(true);
+ $senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
+ $senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
- $senderLocator = new SendersLocator([], ['*' => true]);
-
- $handler = new DummyMessageHandlerFailingFirstTimes();
- $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
+ $handler = new DummyMessageHandlerFailingFirstTimes(0, 'A');
+ $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1, 'B');
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
new HandlerDescriptor($handler, ['alias' => 'first']),
@@ -51,14 +47,54 @@ public function testRetryMechanism()
],
]);
+ // dispatch the message, which will get "sent" and then received by DummySenderAndReceiver
$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);
+ $envelope = new Envelope(new DummyMessage('API'));
+ $bus->dispatch($envelope);
- $worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
- $worker->run([], function () use ($worker) {
- $worker->stop();
+ $worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ if (null === $envelope) {
+ $worker->stop();
+ }
});
$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
+
+class DummySenderAndReceiver implements ReceiverInterface, SenderInterface
+{
+ private $messagesWaiting = [];
+
+ private $messagesReceived = [];
+
+ public function get(): iterable
+ {
+ $message = array_shift($this->messagesWaiting);
+
+ if (null === $message) {
+ return [];
+ }
+
+ $this->messagesReceived[] = $message;
+
+ return [$message];
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ }
+
+ public function send(Envelope $envelope): Envelope
+ {
+ $this->messagesWaiting[] = $envelope;
+
+ return $envelope;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
index 43fe0c0b08112..6617abc8e5b31 100644
--- a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php
@@ -16,22 +16,6 @@
class RedeliveryStampTest extends TestCase
{
- public function testShouldRedeliverToSenderWithAlias()
- {
- $stamp = new RedeliveryStamp(5, 'foo_alias');
-
- $this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias'));
- $this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias'));
- }
-
- public function testShouldRedeliverToSenderWithoutAlias()
- {
- $stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1');
-
- $this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null));
- $this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null));
- }
-
public function testGetters()
{
$stamp = new RedeliveryStamp(10, 'sender_alias');
diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php
new file mode 100644
index 0000000000000..7639060daa178
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Stamp/SentToFailureTransportStampTest.php
@@ -0,0 +1,33 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Stamp;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Debug\Exception\FlattenException;
+use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
+
+class SentToFailureTransportStampTest extends TestCase
+{
+ public function testGetters()
+ {
+ $flattenException = new FlattenException();
+ $stamp = new SentToFailureTransportStamp(
+ 'exception message',
+ 'original_receiver',
+ $flattenException
+ );
+ $this->assertSame('exception message', $stamp->getExceptionMessage());
+ $this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
+ $this->assertSame($flattenException, $stamp->getFlattenException());
+ $this->assertInstanceOf(\DateTime::class, $stamp->getSentAt());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php
index 0507a0ccfa91e..79c99d5be3410 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineReceiverTest.php
@@ -12,9 +12,12 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
+use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceivedStamp;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -28,14 +31,26 @@ public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();
- $doctrineEnvelop = $this->createDoctrineEnvelope();
+ $doctrineEnvelope = $this->createDoctrineEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
- $connection->method('get')->willReturn($doctrineEnvelop);
+ $connection->method('get')->willReturn($doctrineEnvelope);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
+ /** @var Envelope $actualEnvelope */
+ $actualEnvelope = $actualEnvelopes[0];
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
+
+ /** @var DoctrineReceivedStamp $doctrineReceivedStamp */
+ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class);
+ $this->assertNotNull($doctrineReceivedStamp);
+ $this->assertSame('1', $doctrineReceivedStamp->getId());
+
+ /** @var TransportMessageIdStamp $transportMessageIdStamp */
+ $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
+ $this->assertNotNull($transportMessageIdStamp);
+ $this->assertSame(1, $transportMessageIdStamp->getId());
}
/**
@@ -55,6 +70,34 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
iterator_to_array($receiver->get());
}
+ public function testAll()
+ {
+ $serializer = $this->createSerializer();
+
+ $doctrineEnvelope1 = $this->createDoctrineEnvelope();
+ $doctrineEnvelope2 = $this->createDoctrineEnvelope();
+ $connection = $this->createMock(Connection::class);
+ $connection->method('findAll')->with(50)->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]);
+
+ $receiver = new DoctrineReceiver($connection, $serializer);
+ $actualEnvelopes = \iterator_to_array($receiver->all(50));
+ $this->assertCount(2, $actualEnvelopes);
+ $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
+ }
+
+ public function testFind()
+ {
+ $serializer = $this->createSerializer();
+
+ $doctrineEnvelope = $this->createDoctrineEnvelope();
+ $connection = $this->createMock(Connection::class);
+ $connection->method('find')->with(10)->willReturn($doctrineEnvelope);
+
+ $receiver = new DoctrineReceiver($connection, $serializer);
+ $actualEnvelope = $receiver->find(10);
+ $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
+ }
+
private function createDoctrineEnvelope()
{
return [
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php
index 26badf93340f5..dcc90c67854b0 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineSenderTest.php
@@ -14,6 +14,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
@@ -29,13 +30,18 @@ public function testSend()
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
- $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
+ $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'])->willReturn(15);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new DoctrineSender($connection, $serializer);
- $sender->send($envelope);
+ $actualEnvelope = $sender->send($envelope);
+
+ /** @var TransportMessageIdStamp $transportMessageIdStamp */
+ $transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
+ $this->assertNotNull($transportMessageIdStamp);
+ $this->assertSame('15', $transportMessageIdStamp->getId());
}
public function testSendWithDelay()
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php
new file mode 100644
index 0000000000000..a900e7ba0ff55
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/SingleMessageReceiverTest.php
@@ -0,0 +1,50 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\Sender;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
+
+class SingleMessageReceiverTest extends TestCase
+{
+ public function testItReceivesOnlyOneMessage()
+ {
+ $innerReceiver = $this->createMock(ReceiverInterface::class);
+ $envelope = new Envelope(new \stdClass());
+
+ $receiver = new SingleMessageReceiver($innerReceiver, $envelope);
+ $received = \iterator_to_array($receiver->get());
+ $this->assertCount(1, $received);
+ $this->assertSame($received[0], $envelope);
+
+ $this->assertEmpty(\iterator_to_array($receiver->get()));
+ }
+
+ public function testCallsAreForwarded()
+ {
+ $envelope = new Envelope(new \stdClass());
+
+ $innerReceiver = $this->createMock(ReceiverInterface::class);
+ $innerReceiver->expects($this->once())->method('ack')->with($envelope);
+ $innerReceiver->expects($this->once())->method('reject')->with($envelope);
+
+ $receiver = new SingleMessageReceiver($innerReceiver, $envelope);
+ $receiver->ack($envelope);
+ $receiver->reject($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
index 1de28332b388d..e085d721ef4cd 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Sender/SendersLocatorTest.php
@@ -12,7 +12,9 @@
namespace Symfony\Component\Messenger\Tests\Transport\Sender;
use PHPUnit\Framework\TestCase;
+use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -23,21 +25,85 @@ class SendersLocatorTest extends TestCase
public function testItReturnsTheSenderBasedOnTheMessageClass()
{
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sendersLocator = $this->createContainer([
+ 'my_sender' => $sender,
+ ]);
$locator = new SendersLocator([
- DummyMessage::class => [$sender],
+ DummyMessage::class => ['my_sender'],
+ ], $sendersLocator);
+
+ $this->assertSame(['my_sender' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
+ $this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
+ }
+
+ public function testGetSenderByAlias()
+ {
+ $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sendersLocator = $this->createContainer([
+ 'sender1' => $sender1,
+ 'sender2' => $sender2,
+ ]);
+
+ $locator = new SendersLocator([], $sendersLocator);
+
+ $this->assertSame($sender1, $locator->getSenderByAlias('sender1'));
+ $this->assertSame($sender2, $locator->getSenderByAlias('sender2'));
+ }
+
+ public function testGetSenderByAliasThrowsException()
+ {
+ $this->expectException(UnknownSenderException::class);
+ $this->expectExceptionMessage('Unknown sender alias');
+
+ $sender1 = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $sendersLocator = $this->createContainer([
+ 'sender1' => $sender1,
]);
+ $locator = new SendersLocator([], $sendersLocator);
+ $locator->getSenderByAlias('sender2');
+ }
+
+ /**
+ * @group legacy
+ */
+ public function testItReturnsTheSenderBasedOnTheMessageClassLegacy()
+ {
+ $sender = $this->getMockBuilder(SenderInterface::class)->getMock();
+ $locator = new SendersLocator([
+ DummyMessage::class => [$sender],
+ ]);
$this->assertSame([$sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
$this->assertSame([], iterator_to_array($locator->getSenders(new Envelope(new SecondMessage()))));
}
- public function testItYieldsProvidedSenderAliasAsKey()
+ /**
+ * @group legacy
+ */
+ public function testItYieldsProvidedSenderAliasAsKeyLegacy()
{
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$locator = new SendersLocator([
DummyMessage::class => ['dummy' => $sender],
]);
-
$this->assertSame(['dummy' => $sender], iterator_to_array($locator->getSenders(new Envelope(new DummyMessage('a')))));
}
+
+ private function createContainer(array $senders)
+ {
+ $container = $this->createMock(ContainerInterface::class);
+ $container->expects($this->any())
+ ->method('has')
+ ->willReturnCallback(function($id) use ($senders) {
+ return isset($senders[$id]);
+ });
+ $container->expects($this->any())
+ ->method('get')
+ ->willReturnCallback(function($id) use ($senders) {
+ return $senders[$id];
+ });
+
+ return $container;
+ }
}
diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
index e899f1cee8b75..46e8149f6c327 100644
--- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
@@ -97,7 +97,7 @@ public function testDispatchCausesRetry()
$this->assertNotNull($redeliveryStamp);
// retry count now at 1
$this->assertSame(1, $redeliveryStamp->getRetryCount());
- $this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias'));
+ $this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias());
// received stamp is removed
$this->assertNull($envelope->last(ReceivedStamp::class));
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
index 5bc0324489e64..d2d03c859cdeb 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
@@ -110,7 +110,11 @@ public function reject(Envelope $envelope): void
*/
public function getMessageCount(): int
{
- return $this->connection->countMessagesInQueues();
+ try {
+ return $this->connection->countMessagesInQueues();
+ } catch (\AMQPException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, string $queueName): void
diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
index 602ee04715c1e..3663ca1726126 100644
--- a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+++ b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
@@ -102,9 +102,10 @@ public static function buildConfiguration($dsn, array $options = [])
/**
* @param int $delay The delay in milliseconds
*
+ * @return string The inserted id
* @throws \Doctrine\DBAL\DBALException
*/
- public function send(string $body, array $headers, int $delay = 0): void
+ public function send(string $body, array $headers, int $delay = 0): string
{
$now = new \DateTime();
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
@@ -126,6 +127,8 @@ public function send(string $body, array $headers, int $delay = 0): void
':created_at' => self::formatDateTime($now),
':available_at' => self::formatDateTime($availableAt),
]);
+
+ return $this->driverConnection->lastInsertId();
}
public function get(): ?array
@@ -205,14 +208,42 @@ public function getMessageCount(): int
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
}
+ public function findAll(int $limit = null): array
+ {
+ if ($this->configuration['auto_setup']) {
+ $this->setup();
+ }
+
+ $queryBuilder = $this->createAvailableMessagesQueryBuilder();
+ if (null !== $limit) {
+ $queryBuilder->setMaxResults($limit);
+ }
+
+ return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll();
+ }
+
+ public function find($id): ?array
+ {
+ if ($this->configuration['auto_setup']) {
+ $this->setup();
+ }
+
+ $queryBuilder = $this->createQueryBuilder()
+ ->where('m.id = :id');
+
+ $data = $this->executeQuery($queryBuilder->getSQL(), [
+ 'id' => $id,
+ ])->fetch();
+
+ return false === $data ? null : $data;
+ }
+
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTime();
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
- return $this->driverConnection->createQueryBuilder()
- ->select('m.*')
- ->from($this->configuration['table_name'], 'm')
+ return $this->createQueryBuilder()
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
@@ -223,6 +254,13 @@ private function createAvailableMessagesQueryBuilder(): QueryBuilder
]);
}
+ private function createQueryBuilder(): QueryBuilder
+ {
+ return $this->driverConnection->createQueryBuilder()
+ ->select('m.*')
+ ->from($this->configuration['table_name'], 'm');
+ }
+
private function executeQuery(string $sql, array $parameters = [])
{
$stmt = null;
diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php
index 65f2eacdc89b4..aad8cf236d5aa 100644
--- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php
+++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineReceiver.php
@@ -16,6 +16,8 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -26,7 +28,7 @@
*
* @experimental in 4.3
*/
-class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
+class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private $connection;
private $serializer;
@@ -52,18 +54,7 @@ public function get(): iterable
return [];
}
- try {
- $envelope = $this->serializer->decode([
- 'body' => $doctrineEnvelope['body'],
- 'headers' => $doctrineEnvelope['headers'],
- ]);
- } catch (MessageDecodingFailedException $exception) {
- $this->connection->reject($doctrineEnvelope['id']);
-
- throw $exception;
- }
-
- yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id']));
+ yield $this->createEnvelopeFromData($doctrineEnvelope);
}
/**
@@ -71,7 +62,11 @@ public function get(): iterable
*/
public function ack(Envelope $envelope): void
{
- $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
+ try {
+ $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
+ } catch (DBALException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
}
/**
@@ -79,7 +74,11 @@ public function ack(Envelope $envelope): void
*/
public function reject(Envelope $envelope): void
{
- $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
+ try {
+ $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
+ } catch (DBALException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
}
/**
@@ -87,7 +86,45 @@ public function reject(Envelope $envelope): void
*/
public function getMessageCount(): int
{
- return $this->connection->getMessageCount();
+ try {
+ return $this->connection->getMessageCount();
+ } catch (DBALException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function all(int $limit = null): iterable
+ {
+ try {
+ $doctrineEnvelopes = $this->connection->findAll($limit);
+ } catch (DBALException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+
+ foreach ($doctrineEnvelopes as $doctrineEnvelope) {
+ yield $this->createEnvelopeFromData($doctrineEnvelope);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function find($id): ?Envelope
+ {
+ try {
+ $doctrineEnvelope = $this->connection->find($id);
+ } catch (DBALException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+
+ if (null === $doctrineEnvelope) {
+ return null;
+ }
+
+ return $this->createEnvelopeFromData($doctrineEnvelope);
}
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
@@ -101,4 +138,23 @@ private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceived
return $doctrineReceivedStamp;
}
+
+ private function createEnvelopeFromData(array $data): Envelope
+ {
+ try {
+ $envelope = $this->serializer->decode([
+ 'body' => $data['body'],
+ 'headers' => $data['headers'],
+ ]);
+ } catch (MessageDecodingFailedException $exception) {
+ $this->connection->reject($data['id']);
+
+ throw $exception;
+ }
+
+ return $envelope->with(
+ new DoctrineReceivedStamp($data['id']),
+ new TransportMessageIdStamp($data['id'])
+ );
+ }
}
diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php
index 8329d2a53f1bd..e90a7f7e1d112 100644
--- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php
+++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineSender.php
@@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -47,11 +48,11 @@ public function send(Envelope $envelope): Envelope
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
try {
- $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
+ $id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
- return $envelope;
+ return $envelope->with(new TransportMessageIdStamp($id));
}
}
diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
index 09d9d5cadfa95..1e0db72e76a8d 100644
--- a/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
+++ b/src/Symfony/Component/Messenger/Transport/Doctrine/DoctrineTransport.php
@@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
+use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -21,7 +23,7 @@
*
* @experimental in 4.3
*/
-class DoctrineTransport implements TransportInterface, SetupableTransportInterface
+class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private $connection;
private $serializer;
@@ -58,6 +60,31 @@ public function reject(Envelope $envelope): void
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageCount(): int
+ {
+ return ($this->receiver ?? $this->getReceiver())->getMessageCount();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function all(int $limit = null): iterable
+ {
+ return ($this->receiver ?? $this->getReceiver())->all($limit);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function find($id): ?Envelope
+ {
+ return ($this->receiver ?? $this->getReceiver())->find($id);
+ }
+
+
/**
* {@inheritdoc}
*/
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php
new file mode 100644
index 0000000000000..b4c19295a9318
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Receiver/ListableReceiverInterface.php
@@ -0,0 +1,33 @@
+
+ *
+ * @experimental in 4.3
+ */
+interface ListableReceiverInterface extends ReceiverInterface
+{
+ /**
+ * Returns all the messages (up to the limit) in this receiver.
+ *
+ * Messages should be given the same stamps as when using ReceiverInterface::get().
+ *
+ * @return Envelope[]|iterable
+ */
+ public function all(int $limit = null): iterable;
+
+ /**
+ * Returns the Envelope by id or none.
+ *
+ * Message should be given the same stamps as when using ReceiverInterface::get().
+ */
+ public function find($id): ?Envelope;
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
index 38f9b9ff35a64..6393c0d8c16a0 100644
--- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
+++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
@@ -35,6 +35,8 @@ interface ReceiverInterface
* call to get() takes a long time, blocking other receivers from
* being called.
*
+ * If applicable, the Envelope should contain a TransportMessageIdStamp.
+ *
* If a received message cannot be decoded, the message should not
* be retried again (e.g. if there's a queue, it should be removed)
* and a MessageDecodingFailedException should be thrown.
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php
new file mode 100644
index 0000000000000..9aec49d6c0af5
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Receiver/SingleMessageReceiver.php
@@ -0,0 +1,47 @@
+
+ *
+ * @internal
+ * @experimental in 4.3
+ */
+class SingleMessageReceiver implements ReceiverInterface
+{
+ private $receiver;
+ private $envelope;
+ private $hasReceived = false;
+
+ public function __construct(ReceiverInterface $receiver, Envelope $envelope)
+ {
+ $this->receiver = $receiver;
+ $this->envelope = $envelope;
+ }
+
+ public function get(): iterable
+ {
+ if ($this->hasReceived) {
+ return [];
+ }
+
+ $this->hasReceived = true;
+
+ yield $this->envelope;
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ $this->receiver->ack($envelope);
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ $this->receiver->reject($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php
index f526a413e5ba2..719442188ffbb 100644
--- a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php
+++ b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php
@@ -25,6 +25,8 @@ interface SenderInterface
*
* The sender can read different stamps for transport configuration,
* like delivery delay.
+ *
+ * If applicable, the returned Envelope should contain a TransportMessageIdStamp.
*/
public function send(Envelope $envelope): Envelope;
}
diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
index 60dde8fb53446..80a01c291a99b 100644
--- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
+++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php
@@ -11,7 +11,11 @@
namespace Symfony\Component\Messenger\Transport\Sender;
+use Psr\Container\ContainerInterface;
+use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\RuntimeException;
+use Symfony\Component\Messenger\Exception\UnknownSenderException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
/**
@@ -23,17 +27,30 @@
*/
class SendersLocator implements SendersLocatorInterface
{
- private $senders;
+ private $sendersMap;
+ private $sendersLocator;
+ private $useLegacyLookup = false;
private $sendAndHandle;
/**
- * @param SenderInterface[][] $senders
- * @param bool[] $sendAndHandle
+ * @param string[][] $sendersMap An array, keyed by "type", set to an array of sender aliases
+ * @param ContainerInterface $sendersLocator Locator of senders, keyed by sender alias
+ * @param bool[] $sendAndHandle
*/
- public function __construct(array $senders, array $sendAndHandle = [])
+ public function __construct(array $sendersMap, /*ContainerInterface*/ $sendersLocator = null, array $sendAndHandle = [])
{
- $this->senders = $senders;
- $this->sendAndHandle = $sendAndHandle;
+ $this->sendersMap = $sendersMap;
+
+ if (is_array($sendersLocator) || null === $sendersLocator) {
+ @trigger_error(sprintf('"%s::__construct()" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.', __CLASS__, ContainerInterface::class), E_USER_DEPRECATED);
+ // "%s" requires a "%s" as 2nd argument. Not doing so is deprecated since Symfony 4.3 and will be required in 5.0.'
+ $this->sendersLocator = new ServiceLocator([]);
+ $this->sendAndHandle = $sendersLocator;
+ $this->useLegacyLookup = true;
+ } else {
+ $this->sendersLocator = $sendersLocator;
+ $this->sendAndHandle = $sendAndHandle;
+ }
}
/**
@@ -46,14 +63,43 @@ public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable
$seen = [];
foreach (HandlersLocator::listTypes($envelope) as $type) {
- foreach ($this->senders[$type] ?? [] as $alias => $sender) {
- if (!\in_array($sender, $seen, true)) {
- yield $alias => $seen[] = $sender;
+ // the old way of looking up senders
+ if ($this->useLegacyLookup) {
+ foreach ($this->sendersMap[$type] ?? [] as $alias => $sender) {
+ if (!\in_array($sender, $seen, true)) {
+ yield $alias => $seen[] = $sender;
+ }
+ }
+
+ $handle = $handle ?: $this->sendAndHandle[$type] ?? false;
+
+ continue;
+ }
+
+ foreach ($this->sendersMap[$type] ?? [] as $senderAlias) {
+ if (!\in_array($senderAlias, $seen, true)) {
+ if (!$this->sendersLocator->has($senderAlias)) {
+ throw new RuntimeException(sprintf('Invalid senders configuration: sender "%s" is not in the senders locator.', $senderAlias));
+ }
+
+ $seen[] = $senderAlias;
+ $sender = $this->sendersLocator->get($senderAlias);
+ yield $senderAlias => $sender;
}
}
+
$handle = $handle ?: $this->sendAndHandle[$type] ?? false;
}
$handle = $handle || null === $sender;
}
+
+ public function getSenderByAlias(string $alias): SenderInterface
+ {
+ if ($this->sendersLocator->has($alias)) {
+ return $this->sendersLocator->get($alias);
+ }
+
+ throw new UnknownSenderException(sprintf('Unknown sender alias "%s"', $alias));
+ }
}
diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
index e802bebff4deb..1bcb8715f55d1 100644
--- a/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
+++ b/src/Symfony/Component/Messenger/Transport/Sender/SendersLocatorInterface.php
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Sender;
use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\UnknownSenderException;
/**
* Maps a message to a list of senders.
@@ -32,4 +33,13 @@ interface SendersLocatorInterface
* @return iterable|SenderInterface[] Indexed by sender alias if available
*/
public function getSenders(Envelope $envelope, ?bool &$handle = false): iterable;
+
+ /**
+ * Returns a specific sender by its alias.
+ *
+ * @param string $alias The alias given to the sender in getSenders()
+ *
+ * @throws UnknownSenderException If the sender is not found
+ */
+ public function getSenderByAlias(string $alias): SenderInterface;
}
diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php
index cf3a5f68951c4..44ff30a7297c4 100644
--- a/src/Symfony/Component/Messenger/Worker.php
+++ b/src/Symfony/Component/Messenger/Worker.php
@@ -47,7 +47,7 @@ class Worker implements WorkerInterface
* @param ReceiverInterface[] $receivers Where the key is the transport name
* @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
*/
- public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
+ public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
$this->receivers = $receivers;
$this->bus = $bus;
@@ -114,9 +114,14 @@ public function run(array $options = [], callable $onHandledCallback = null): vo
$this->dispatchEvent(new WorkerStoppedEvent());
}
- private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy)
+ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
{
- $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $transportName));
+ $event = new WorkerMessageReceivedEvent($envelope, $transportName);
+ $this->dispatchEvent($event);
+
+ if (!$event->shouldHandle()) {
+ return;
+ }
$message = $envelope->getMessage();
$context = [
@@ -148,7 +153,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
- ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
+ ->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
->withoutAll(ReceivedStamp::class);
// re-send the message
@@ -199,6 +204,15 @@ private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyIn
return false;
}
+ $sentStamp = $envelope->last(SentStamp::class);
+ if (null === $sentStamp) {
+ if (null !== $this->logger) {
+ $this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.');
+ }
+
+ return false;
+ }
+
return $retryStrategy->isRetryable($envelope);
}
@@ -210,11 +224,16 @@ private function getRetryCount(Envelope $envelope): int
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
}
- private function getSenderAlias(Envelope $envelope): ?string
+ private function getSenderClassOrAlias(Envelope $envelope): string
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
- return $sentStamp ? $sentStamp->getSenderAlias() : null;
+ if (null === $sentStamp) {
+ // should not happen, because of the check in shouldRetry()
+ throw new LogicException('Could not find SentStamp.');
+ }
+
+ return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
}
}
diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json
index 02d25efd5fc4f..4b552c96437ba 100644
--- a/src/Symfony/Component/Messenger/composer.json
+++ b/src/Symfony/Component/Messenger/composer.json
@@ -23,6 +23,7 @@
"doctrine/dbal": "^2.5",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
+ "symfony/debug": "~4.1",
"symfony/dependency-injection": "~3.4.19|^4.1.8",
"symfony/doctrine-bridge": "~3.4|~4.0",
"symfony/event-dispatcher": "~4.3",
@@ -35,7 +36,8 @@
"symfony/var-dumper": "~3.4|~4.0"
},
"conflict": {
- "symfony/event-dispatcher": "<4.3"
+ "symfony/event-dispatcher": "<4.3",
+ "symfony/debug": "<4.1"
},
"suggest": {
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."