-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] On failure retry, make message appear received from original sender #31425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,9 +16,9 @@ | |
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; | ||
use Symfony\Component\Messenger\Exception\HandlerFailedException; | ||
use Symfony\Component\Messenger\MessageBusInterface; | ||
use Symfony\Component\Messenger\Stamp\DelayStamp; | ||
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | ||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; | ||
use Symfony\Component\Messenger\Stamp\SentStamp; | ||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; | ||
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; | ||
|
||
|
@@ -51,11 +51,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) | |
$envelope = $event->getEnvelope(); | ||
|
||
// avoid re-sending to the failed sender | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that it makes sense. Otherwise consuming the failure transport could be an endless loop of processing and failing and requeuing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, exactly. However, the failure transport acts like any normal transport. If the transport has retries configured (by default it does, and the default is 3 tries), the message will be retried 3 times on the failure transport before being discarded. In this PR, I even added some extra info on the retry command that says that this message has already been tried 1/2/3... times on the failure transport previously. |
||
foreach ($envelope->all(SentStamp::class) as $sentStamp) { | ||
/** @var SentStamp $sentStamp */ | ||
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) { | ||
return; | ||
} | ||
if (null !== $envelope->last(SentToFailureTransportStamp::class)) { | ||
return; | ||
} | ||
|
||
// remove the received stamp so it's redelivered | ||
|
@@ -67,8 +64,9 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) | |
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null; | ||
$envelope = $envelope->withoutAll(ReceivedStamp::class) | ||
->withoutAll(TransportMessageIdStamp::class) | ||
->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException)) | ||
->with(new RedeliveryStamp(0, $this->failureSenderAlias)); | ||
->with(new SentToFailureTransportStamp($event->getReceiverName())) | ||
->with(new DelayStamp(0)) | ||
->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException)); | ||
|
||
if (null !== $this->logger) { | ||
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Middleware; | ||
|
||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | ||
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; | ||
|
||
/** | ||
* @author Ryan Weaver <ryan@symfonycasts.com> | ||
* | ||
* @experimental in 4.3 | ||
*/ | ||
class FailedMessageProcessingMiddleware implements MiddlewareInterface | ||
{ | ||
public function handle(Envelope $envelope, StackInterface $stack): Envelope | ||
{ | ||
// look for "received" messages decorated with the SentToFailureTransportStamp | ||
/** @var SentToFailureTransportStamp|null $sentToFailureStamp */ | ||
$sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class); | ||
if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) { | ||
// mark the message as "received" from the original transport | ||
// this guarantees the same behavior as when originally received | ||
$envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName())); | ||
} | ||
|
||
return $stack->next()->handle($envelope, $stack); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it is only collecting one message at a time a few lines above
foreach ($receiver->all(1) as $envelope) {
? Wouldn't it be better to batch more messages to avoid querying doctrine for each message?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. We could... but I don't see a lot of benefit. This is the interactive mode, and we're asking the user if they want to retry one-by-one. In theory, something else could also be reading from the failure transport at the same time, so getting whatever the latest id is one-by-one seems sensible.