21
21
use Symfony \Component \Messenger \Retry \RetryStrategyInterface ;
22
22
use Symfony \Component \Messenger \Stamp \DelayStamp ;
23
23
use Symfony \Component \Messenger \Stamp \RedeliveryStamp ;
24
+ use Symfony \Component \Messenger \Stamp \StampInterface ;
24
25
use Symfony \Component \Messenger \Transport \Sender \SenderInterface ;
25
26
26
27
/**
@@ -31,12 +32,14 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
31
32
private $ sendersLocator ;
32
33
private $ retryStrategyLocator ;
33
34
private $ logger ;
35
+ private $ historySize ;
34
36
35
- public function __construct (ContainerInterface $ sendersLocator , ContainerInterface $ retryStrategyLocator , LoggerInterface $ logger = null )
37
+ public function __construct (ContainerInterface $ sendersLocator , ContainerInterface $ retryStrategyLocator , LoggerInterface $ logger = null , int $ historySize = 10 )
36
38
{
37
39
$ this ->sendersLocator = $ sendersLocator ;
38
40
$ this ->retryStrategyLocator = $ retryStrategyLocator ;
39
41
$ this ->logger = $ logger ;
42
+ $ this ->historySize = $ historySize ;
40
43
}
41
44
42
45
public function onMessageFailed (WorkerMessageFailedEvent $ event )
@@ -64,7 +67,7 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
64
67
}
65
68
66
69
// add the delay and retry stamp info
67
- $ retryEnvelope = $ envelope -> with ( new DelayStamp ($ delay ), new RedeliveryStamp ($ retryCount ));
70
+ $ retryEnvelope = $ this -> withLimitedHistory ( $ envelope , new DelayStamp ($ delay ), new RedeliveryStamp ($ retryCount ));
68
71
69
72
// re-send the message for retry
70
73
$ this ->getSenderForTransport ($ event ->getReceiverName ())->send ($ retryEnvelope );
@@ -75,6 +78,30 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
75
78
}
76
79
}
77
80
81
+ /**
82
+ * Adds stamps to the envelope by keeping only the First + Last N stamps
83
+ */
84
+ private function withLimitedHistory (Envelope $ envelope , StampInterface ...$ stamps ): Envelope
85
+ {
86
+ foreach ($ stamps as $ stamp ) {
87
+ $ history = $ envelope ->all (get_class ($ stamp ));
88
+ if (\count ($ history ) < $ this ->historySize ) {
89
+ $ envelope = $ envelope ->with ($ stamp );
90
+ continue ;
91
+ }
92
+
93
+ $ history = \array_merge (
94
+ [$ history [0 ]],
95
+ \array_slice ($ history , -$ this ->historySize + 2 ),
96
+ [$ stamp ]
97
+ );
98
+
99
+ $ envelope = $ envelope ->withoutAll (get_class ($ stamp ))->with (...$ history );
100
+ }
101
+
102
+ return $ envelope ;
8000
103
+ }
104
+
78
105
public static function getSubscribedEvents ()
79
106
{
80
107
return [
0 commit comments