8000 [Messenger] prevent infinite redelivery loops and blocked queues by Tobion · Pull Request #34107 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] prevent infinite redelivery loops and blocked queues #34107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />

<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />

<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
Expand All @@ -748,6 +749,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
Expand Down
4 changes: 2 additions & 2 deletions src/Symfony/Bundle/FrameworkBundle/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
"symfony/mailer": "^4.3",
"symfony/messenger": "^4.3",
"symfony/messenger": "^4.3.6",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
Expand Down Expand Up @@ -73,7 +73,7 @@
"symfony/dotenv": "<4.2",
"symfony/dom-crawler": "<4.3",
"symfony/form": "<4.3",
"symfony/messenger": "<4.3",
"symfony/messenger": "<4.3.6",
"symfony/property-info": "<3.4",
"symfony/serializer": "<4.2",
"symfony/stopwatch": "<3.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

/**
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.3
*/
class RejectRedeliveredMessageException extends RuntimeException
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;

/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @experimental in 4.3
*
* @author Tobias Schultze <http://tobion.de>
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// ignore the dispatched messages for retry
if (null !== $envelope->last(ReceivedStamp::class)) {
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);

if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
}

return $stack->next()->handle($envelope, $stack);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish there were a way to only run this if at least one Amqp transport were configured, but I know that’s not possible (so not a blocker or real feedback). It makes me wish each transport was actually its own package...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said in #34107 (comment), we might want to enhance the middleware later to handle other transports like doctrine.

}
4 changes: 2 additions & 2 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public function testDispatchCausesRetry()
}
});

// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
// old message rejected
$this->assertSame(1, $receiver->getRejectCount());
}

public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
Expand Down
14 changes: 11 additions & 3 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
Expand Down Expand Up @@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the BIG important thing here is: if a message is redelivered, don't actually try to handle it again (because it'll probably timeout again). Instead, skip handling and have it be redelivered.

Question then: to fix this bug, is it actually important to reject this message before sending the redelivery? If we manage to skip the AMQP-redelivered message from being handled, isn't it ok if we follow the normal messenger-redelivery logic (redeliver the message and then reject it)? Or am I missing something?

If rejecting it first is not actually important, we could simplify this patch considerably.

thx :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rejecting redelivered messages first is important. As I described in #32055 (comment) and #34082, the repeating errors can happen inside the listeners or the retry publishing (anywhere inside the catch block). In this case, the message never gets rejected (as the exception happens before) and would be redelivered forever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also comment two lines below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, ok. To make it feel like less of a one-off solution, wdyt about a RejectMessageImmediatelyExceptionInterface that we actually look for (then the class implements this)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only want to reject redelivered messages first and not others to prevent potential loss of messages. So the interface would suggest something we don't want to promote. Also the RejectRedeliveredMessageException is not specific to amqp (hence the generic name) and we could use it also for doctrine and other transports. But for doctrine it's not that important as it has a timer in

that prevents the queue-blocking (but it does not prevent the potential redelivery loop)

if ($rejectFirst) {
// redelivered messages are rejected first so that continuous failures in an event listener or while
// publishing for retry does not cause infinite redelivery loops
$receiver->reject($envelope);
}

if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
Expand All @@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);

// re-send the message
// re-send the message for retry
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$receiver->ack($envelope);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to reject and moved it outside the if else. Reject is more appropriate. Technically ack and reject are the same. There is only a semantical difference. Here we are handling a failure case and whether we retry or not does not change this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that seems reasonable.

} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}

if (!$rejectFirst) {
$receiver->reject($envelope);
}

Expand Down
0