8000 bug #36810 [Messenger] Do not stack retry stamp (jderusse) · symfony/symfony@df3ab76 · GitHub
[go: up one dir, main page]

Skip to content

Commit df3ab76

Browse files
committed
bug #36810 [Messenger] Do not stack retry stamp (jderusse)
This PR was squashed before being merged into the 4.4 branch. Discussion ---------- [Messenger] Do not stack retry stamp | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | / | License | MIT | Doc PR | / With the "RecoverableException" or a very high number of retry, the message is currently stacking a lot of stamp, which increase the size of the message sent to queue and (in my case) reach the "maximum size allowed" after 60 retries + php serializer This PR removes previous stamps before adding the new Delay+RetryStamps. Commits ------- ad6f853 [Messenger] Do not stack retry stamp
2 parents 9d995bd + ad6f853 commit df3ab76

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2222
use Symfony\Component\Messenger\Stamp\DelayStamp;
2323
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
24+
use Symfony\Component\Messenger\Stamp\StampInterface;
2425
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
2526

2627
/**
@@ -31,12 +32,14 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
3132
private $sendersLocator;
3233
private $retryStrategyLocator;
3334
private $logger;
35+
private $historySize;
3436

35-
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null)
37+
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
3638
{
3739
$this->sendersLocator = $sendersLocator;
3840
$this->retryStrategyLocator = $retryStrategyLocator;
3941
$this->logger = $logger;
42+
$this->historySize = $historySize;
4043
}
4144

4245
public function onMessageFailed(WorkerMessageFailedEvent $event)
@@ -64,7 +67,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
6467
}
6568

6669
// add the delay and retry stamp info
67-
$retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));
70+
$retryEnvelope = $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
6871

6972
// re-send the message for retry
7073
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
@@ -75,6 +78,30 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
7578
}
7679
}
7780

81+
/**
82+
* Adds stamps to the envelope by keeping only the First + Last N stamps
83+
*/
84+
private function withLimitedHistory(Envelope $envelope, StampInterface ...$stamps): Envelope
85+
{
86+
foreach ($stamps as $stamp) {
87+
$history = $envelope->all(get_class($stamp));
88+
if (\count($history) < $this->historySize) {
89+
$envelope = $envelope->with($stamp);
90+
continue;
91+
}
92+
93+
$history = \array_merge(
94+
[$history[0]],
95+
\array_slice($history, -$this->historySize + 2),
96+
[$stamp]
97+
);
98+
99+
$envelope = $envelope->withoutAll(get_class($stamp))->with(...$history);
100+
}
101+
102+
return $envelope;
103+
}
104+
78105
public static function getSubscribedEvents()
79106
{
80107
return [

src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,40 @@ public function testEnvelopeIsSentToTransportOnRetry()
7676

7777
$listener->onMessageFailed($event);
7878
}
79+
80+
public function testEnvelopeKeepOnlyTheLast10Stamps()
81+
{
82+
$exception = new \Exception('no!');
83+
$stamps = \array_merge(
84+
\array_fill(0, 15, new DelayStamp(1)),
85+
\array_fill(0, 3, new RedeliveryStamp(1))
86+
);
87+
$envelope = new Envelope(new \stdClass(), $stamps);
88+
89+
$sender = $this->createMock(SenderInterface::class);
90+
$sender->expects($this->once())->method('send')->willReturnCallback(function (Envelope $envelope) {
91+
$delayStamps = $envelope->all(DelayStamp::class);
92+
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
93+
94+
$this->assertCount(10, $delayStamps);
95+
$this->assertCount(4, $redeliveryStamps);
96+
97+
return $envelope;
98+
});
99+
$senderLocator = $this->createMock(ContainerInterface::class);
100+
$senderLocator->expects($this->once())->method('has')->willReturn(true);
101+
$senderLocator->expects($this->once())->method('get')->willReturn($sender);
102+
$retryStategy = $this->createMock(RetryStrategyInterface::class);
103+
$retryStategy->expects($this->once())->method('isRetryable')->willReturn(true);
104+
$retryStategy->expects($this->once())->method('getWaitingTime')->willReturn(1000);
105+
$retryStrategyLocator = $this->createMock(ContainerInterface::class);
106+
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
107+
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);
108+
109+
$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
110+
111+
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
112+
113+
$listener->onMessageFailed($event);
114+
}
79115
}

0 commit comments

Comments
 (0)
0