10000 [Messenger] prevent infinite redelivery loops and blocked queues · symfony/symfony@2269ead · GitHub
[go: up one dir, main page]

Skip to content

Commit 2269ead

Browse files
committed
[Messenger] prevent infinite redelivery loops and blocked queues
by republishing the redelivered messages as retries with a retry limit and potential delay
1 parent b9f6944 commit 2269ead

File tree

5 files changed

+85
-3
lines changed

5 files changed

+85
-3
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16491649
$defaultMiddleware = [
16501650
'before' => [
16511651
['id' => 'add_bus_name_stamp_middleware'],
1652+
['id' => 'reject_redelivered_message_middleware'],
16521653
['id' => 'dispatch_after_current_bus'],
16531654
['id' => 'failed_message_processing_middleware'],
16541655
],

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
<argument type="service" id="validator" />
4949
</service>
5050

51+
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
52+
5153
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
5254

5355
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* @author Tobias Schultze <http://tobion.de>
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class RejectRedeliveredMessageException extends RuntimeException
20+
{
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
16+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
18+
19+
/**
20+
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
21+
*
22+
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
23+
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
24+
*
25+
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
26+
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
27+
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
28+
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
29+
* limit and potential delay.
30+
*
31+
* @experimental in 4.3
32+
*
33+
* @author Tobias Schultze <http://tobion.de>
34+
*/
35+
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
36+
{
37+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
38+
{
39+
// ignore the dispatched messages for retry
40+
if (null !== $envelope->last(ReceivedStamp::class)) {
41+
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
42+
43+
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
44+
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
45+
}
46+
}
47+
48+
return $stack->next()->handle($envelope, $stack);
49+
}
50+
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
1919
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
2020
use Symfony\Component\Messenger\Exception\HandlerFailedException;
21+
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2122
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
2223
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2324
use Symfony\Component\Messenger\Stamp\DelayStamp;
@@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
135136
try {
136137
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
137138
} catch (\Throwable $throwable) {
139+
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
140+
if ($rejectFirst) {
141+
// redelivered messages are rejected first so that continous failures in an event listener or while
142+
// publishing for retry does not cause infinite redelivery loops
143+
$receiver->reject($envelope);
144+
}
145+
138146
if ($throwable instanceof HandlerFailedException) {
139147
$envelope = $throwable->getEnvelope();
140148
}
@@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
156164
->with(new RedeliveryStamp($retryCount, $transportName))
157165
->withoutAll(ReceivedStamp::class);
158166

159-
// re-send the message
167+
// re-send the message for retry
160168
$this->bus->dispatch($retryEnvelope);
161-
// acknowledge the previous message has received
162-
$receiver->ack($envelope);
163169
} else {
164170
if (null !== $this->logger) {
165171
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
166172
}
173+
}
167174

175+
if (!$rejectFirst) {
168176
$receiver->reject($envelope);
169177
}
170178

0 commit comments

Comments
 (0)
0