8000 [Messenger] On failure retry, make message appear as received from or… · symfony/symfony@ce9154f · GitHub
[go: up one dir, main page]

Skip to content

Commit ce9154f

Browse files
committed
[Messenger] On failure retry, make message appear as received from original transport
This fixes a bug where the incorrect handlers would be called (when using transport-based handler config) because the message was received from the failure transport, and not the original transport. This also adds a large integration test for the most complex interactions.
1 parent 72863e4 commit ce9154f

File tree

10 files changed

+334
-11
lines changed

10 files changed

+334
-11
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line 10000 change
@@ -1659,6 +1659,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16591659
'before' => [
16601660
['id' => 'add_bus_name_stamp_middleware'],
16611661
['id' => 'dispatch_after_current_bus'],
1662+
['id' => 'messenger.middleware.failed_message_processing_middleware'],
16621663
],
16631664
'after' => [
16641665
['id' => 'send_message'],

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
<argument type="service" id="validator" />
4949
</service>
5050

51+
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
52+
5153
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
5254
<argument type="service" id="debug.stopwatch" />
5355
</service>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,7 @@ public function testMessengerWithMultipleBuses()
753753
$this->assertEquals([
754754
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
755755
['id' => 'dispatch_after_current_bus'],
756+
['id' => 'messenger.middleware.failed_message_processing_middleware'],
756757
['id' => 'send_message'],
757758
['id' => 'handle_message'],
758759
], $container->getParameter('messenger.bus.commands.middleware'));
@@ -761,6 +762,7 @@ public function testMessengerWithMultipleBuses()
761762
$this->assertEquals([
762763
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
763764
['id' => 'dispatch_after_current_bus'],
765+
['id' => 'messenger.middleware.failed_message_processing_middleware'],
764766
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
765767
['id' => 'send_message'],
766768
['id' => 'handle_message'],

src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Console\Helper\Dumper;
1616
use Symfony\Component\Console\Style\SymfonyStyle;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1819
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
1920
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2021
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -83,6 +84,13 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
8384

8485
$io->table([], $rows);
8586

87+
/** @var RedeliveryStamp $redeliveryStamp */
88+
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
89+
if (null !== $redeliveryStamp) {
90+
$io->writeln(sprintf(' This message has already been tried by the failure transport %d time%s before.', $redeliveryStamp->getRetryCount(), 1 !== $redeliveryStamp->getRetryCount() ? 's' : ''));
91+
$io->newLine();
92+
}
93+
8694
if ($io->isVeryVerbose()) {
8795
$io->title('Message:');
8896
$dump = new Dumper($io);

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)
154154
}
155155

156156
// avoid success message if nothing was processed
157-
if (1 < $count) {
157+
if (1 <= $count) {
158158
$io->success('All failed messages have been handled or removed!');
159159
}
160160
}

src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Symfony\Component\Messenger\MessageBusInterface;
1919
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2020
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
21-
use Symfony\Component\Messenger\Stamp\SentStamp;
2221
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2322
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2423

@@ -51,11 +50,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
5150
$envelope = $event->getEnvelope();
5251

5352
// avoid re-sending to the failed sender
54-
foreach ($envelope->all(SentStamp::class) as $sentStamp) {
55-
/** @var SentStamp $sentStamp */
56-
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
57-
return;
58-
}
53+
if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
54+
return;
5955
}
6056

6157
// remove the received stamp so it's redelivered
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
16+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
17+
18+
/**
19+
* @author Ryan Weaver <ryan@symfonycasts.com>
20+
*
21+
* @experimental in 4.3
22+
*/
23+
class FailedMessageProcessingMiddleware implements MiddlewareInterface
24+
{
25+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
26+
{
27+
// look for "received" messages decorated with the SentToFailureTransportStamp
28+
/** @var SentToFailureTransportStamp|null $sentToFailureStamp */
29+
$sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class);
30+
if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) {
31+
// mark the message as "received" from the original transport
32+
// this guarantees the same behavior as when originally received
33+
$envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName()));
34+
}
35+
36+
return $stack->next()->handle($envelope, $stack);
37+
}
38+
}

src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
use Symfony\Component\Messenger\MessageBusInterface;
2020
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2121
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
22-
use Symfony\Component\Messenger\Stamp\SentStamp;
2322
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
2423
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2524

@@ -112,7 +111,7 @@ public function testDoNotRedeliverToFailed()
112111
);
113112

114113
$envelope = new Envelope(new \stdClass(), [
115-
new SentStamp('MySender', 'failure_sender'),
114+
new SentToFailureTransportStamp('something went wrong', 'my_receiver'),
116115
]);
117116
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);
118117

< 316C /div>

0 commit comments

Comments
 (0)
0