10000 [Messenger] prevent infinite redelivery loops and blocked queues · symfony/symfony@a00b354 · GitHub
[go: up one dir, main page]

Skip to content

Commit a00b354

Browse files
committed
[Messenger] prevent infinite redelivery loops and blocked queues
by republishing the redelivered messages as retries with a retry limit and potential delay
1 parent b9f6944 commit a00b354

File tree

8 files changed

+90
-6
lines changed

8 files changed

+90
-6
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16491649
$defaultMiddleware = [
16501650
'before' => [
16511651
['id' => 'add_bus_name_stamp_middleware'],
1652+
['id' => 'reject_redelivered_message_middleware'],
16521653
['id' => 'dispatch_after_current_bus'],
16531654
['id' => 'failed_message_processing_middleware'],
16541655
],

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
<argument type="service" id="validator" />
4949
</service>
5050

51+
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
52+
5153
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
5254

5355
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,7 @@ public function testMessengerWithMultipleBuses()
739739
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
740740
$this->assertEquals([
741741
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
742+
['id' => 'reject_redelivered_message_middleware'],
742743
['id' => 'dispatch_after_current_bus'],
743744
['id' => 'failed_message_processing_middleware'],
744745
['id' => 'send_message'],
@@ -748,6 +749,7 @@ public function testMessengerWithMultipleBuses()
748749
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
749750
$this->assertEquals([
750751
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
752+
['id' => 'reject_redelivered_message_middleware'],
751753
['id' => 'dispatch_after_current_bus'],
752754
['id' => 'failed_message_processing_middleware'],
753755
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],

src/Symfony/Bundle/FrameworkBundle/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
"symfony/dotenv": "<4.2",
7474
"symfony/dom-crawler": "<4.3",
7575
"symfony/form": "<4.3",
76-
"symfony/messenger": "<4.3",
76+
"symfony/messenger": "<4.3.6",
7777
"symfony/property-info": "<3.4",
7878
"symfony/serializer": "<4.2",
7979
"symfony/stopwatch": "<3.4",
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* @author Tobias Schultze <http://tobion.de>
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class RejectRedeliveredMessageException extends RuntimeException
20+
{
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
16+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
18+
19+
/**
20+
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
21+
*
22+
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
23+
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
24+
*
25+
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
26+
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
27+
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
28+
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
29+
* limit and potential delay.
30+
*
31+
* @experimental in 4.3
32+
*
33+
* @author Tobias Schultze <http://tobion.de>
34+
*/
35+
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
36+
{
37+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
38+
{
39+
// ignore the dispatched messages for retry
40+
if (null !== $envelope->last(ReceivedStamp::class)) {
41+
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
42+
43+
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
44+
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
45+
}
46+
}
47+
48+
return $stack->next()->handle($envelope, $stack);
49+
}
50+
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ public function testDispatchCausesRetry()
118118
}
119119
});
120120

121-
// old message acknowledged
122-
$this->assertSame(1, $receiver->getAcknowledgeCount());
121+
// old message rejected
122+
$this->assertSame(1, $receiver->getRejectCount());
123123
}
124124

125125
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()

src/Symfony/Component/Messenger/Worker.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
1919
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
2020
use Symfony\Component\Messenger\Exception\HandlerFailedException;
21+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2122
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
2223
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2324
use Symfony\Component\Messenger\Stamp\DelayStamp;
@@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
135136
try {
136137
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
137138
} catch (\Throwable $throwable) {
139+
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
140+
if ($rejectFirst) {
141+
// redelivered messages are rejected first so that continuous failures in an event listener or while
142+
// publishing for retry does not cause infinite redelivery loops
143+
$receiver->reject($envelope);
144+
}
145+
138146
if ($throwable instanceof HandlerFailedException) {
139147
$envelope = $throwable->getEnvelope();
140148
}
@@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
156164
->with(new RedeliveryStamp($retryCount, $transportName))
157165
->withoutAll(ReceivedStamp::class);
158166

159-
// re-send the message
167+
// re-send the message for retry
160168
$this->bus->dispatch($retryEnvelope);
161-
// acknowledge the previous message has received
162-
$receiver->ack($envelope);
163169
} else {
164170
if (null !== $this->logger) {
165171
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
166172
}
173+
}
167174

175+
if (!$rejectFirst) {
168176
$receiver->reject($envelope);
169177
}
170178

0 commit comments

Comments
 (0)
0