diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
index 8cb3fdd242cd..3fd5ff90286b 100644
--- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
@@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
+ ['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 5ef47e751efd..f50b613dc785 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -48,6 +48,8 @@
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
index c1824ecafead..5f2e007ddbcd 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
@@ -739,6 +739,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
+ ['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
@@ -748,6 +749,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
+ ['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json
index 8854a78d1419..57c344f68f22 100644
--- a/src/Symfony/Bundle/FrameworkBundle/composer.json
+++ b/src/Symfony/Bundle/FrameworkBundle/composer.json
@@ -42,7 +42,7 @@
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
"symfony/mailer": "^4.3",
- "symfony/messenger": "^4.3",
+ "symfony/messenger": "^4.3.6",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
@@ -73,7 +73,7 @@
"symfony/dotenv": "<4.2",
"symfony/dom-crawler": "<4.3",
"symfony/form": "<4.3",
- "symfony/messenger": "<4.3",
+ "symfony/messenger": "<4.3.6",
"symfony/property-info": "<3.4",
"symfony/serializer": "<4.2",
"symfony/stopwatch": "<3.4",
diff --git a/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php
new file mode 100644
index 000000000000..79b94fa2656f
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php
@@ -0,0 +1,21 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Exception;
+
+/**
+ * @author Tobias Schultze
+ *
+ * @experimental in 4.3
+ */
+class RejectRedeliveredMessageException extends RuntimeException
+{
+}
diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php
new file mode 100644
index 000000000000..2c6e6b6ff718
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php
@@ -0,0 +1,50 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Middleware;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
+use Symfony\Component\Messenger\Stamp\ReceivedStamp;
+use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
+
+/**
+ * Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
+ *
+ * The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
+ * The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
+ *
+ * AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
+ * or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
+ * redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
+ * infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
+ * limit and potential delay.
+ *
+ * @experimental in 4.3
+ *
+ * @author Tobias Schultze
+ */
+class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
+{
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ // ignore the dispatched messages for retry
+ if (null !== $envelope->last(ReceivedStamp::class)) {
+ $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
+
+ if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
+ throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
+ }
+ }
+
+ return $stack->next()->handle($envelope, $stack);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
index ad7477253e86..9a09c0a04a33 100644
--- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
@@ -118,8 +118,8 @@ public function testDispatchCausesRetry()
}
});
- // old message acknowledged
- $this->assertSame(1, $receiver->getAcknowledgeCount());
+ // old message rejected
+ $this->assertSame(1, $receiver->getRejectCount());
}
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php
index 398124e99491..6205baefd43b 100644
--- a/src/Symfony/Component/Messenger/Worker.php
+++ b/src/Symfony/Component/Messenger/Worker.php
@@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
+use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
@@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
+ $rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
+ if ($rejectFirst) {
+ // redelivered messages are rejected first so that continuous failures in an event listener or while
+ // publishing for retry does not cause infinite redelivery loops
+ $receiver->reject($envelope);
+ }
+
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
@@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);
- // re-send the message
+ // re-send the message for retry
$this->bus->dispatch($retryEnvelope);
- // acknowledge the previous message has received
- $receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
+ }
+ if (!$rejectFirst) {
$receiver->reject($envelope);
}