File tree 2 files changed +40
-0
lines changed
src/Symfony/Component/Messenger
2 files changed +40
-0
lines changed Original file line number Diff line number Diff line change 29
29
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
30
30
use Symfony \Component \Messenger \Worker ;
31
31
use Symfony \Contracts \EventDispatcher \EventDispatcherInterface ;
32
+ use Symfony \Contracts \Service \ResetInterface ;
32
33
33
34
/**
34
35
* @group time-sensitive
@@ -85,6 +86,19 @@ public function testHandlingErrorCausesReject()
85
86
$ this ->assertSame (0 , $ receiver ->getAcknowledgeCount ());
86
87
}
87
88
89
+ public function testWorkerResetsConnectionIfReceiverIsResettable ()
90
+ {
91
+ $ resettableReceiver = new ResettableDummyReceiver ([]);
92
+
93
+ $ bus = $ this ->createMock (MessageBusInterface::class);
94
+ $ dispatcher = new EventDispatcher ();
95
+
96
+ $ worker = new Worker ([$ resettableReceiver ], $ bus , $ dispatcher );
97
+ $ worker ->stop ();
98
+ $ worker ->run ();
99
+ $ this ->assertTrue ($ resettableReceiver ->hasBeenReset ());
100
+ }
101
+
88
102
public function testWorkerDoesNotSendNullMessagesToTheBus ()
89
103
{
90
104
$ receiver = new DummyReceiver ([
@@ -283,3 +297,18 @@ public function getRejectCount(): int
283
297
return $ this ->rejectCount ;
284
298
}
285
299
}
300
+
301
+ class ResettableDummyReceiver extends DummyReceiver implements ResetInterface
302
+ {
303
+ private $ hasBeenReset = false ;
304
+
305
+ public function reset ()
306
+ {
307
+ $ this ->hasBeenReset = true ;
308
+ }
309
+
310
+ public function hasBeenReset (): bool
311
+ {
312
+ return $ this ->hasBeenReset ;
313
+ }
314
+ }
Original file line number Diff line number Diff line change 25
25
use Symfony \Component \Messenger \Stamp \ReceivedStamp ;
26
26
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
27
27
use Symfony \Contracts \EventDispatcher \EventDispatcherInterface ;
28
+ use Symfony \Contracts \Service \ResetInterface ;
28
29
29
30
/**
30
31
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -102,6 +103,7 @@ public function run(array $options = []): void
102
103
}
103
104
104
105
$ this ->dispatchEvent (new WorkerStoppedEvent ($ this ));
106
+ $ this ->resetReceiverConnections ();
105
107
}
106
108
107
109
private function handleMessage (Envelope $ envelope , ReceiverInterface $ receiver , string $ transportName ): void
@@ -155,6 +157,15 @@ public function stop(): void
155
157
$ this ->shouldStop = true ;
156
158
}
157
159
160
+ private function resetReceiverConnections (): void
161
+ {
162
+ foreach ($ this ->receivers as $ transportName => $ receiver ) {
163
+ if ($ receiver instanceof ResetInterface) {
164
+ $ receiver ->reset ();
165
+ }
166
+ }
167
+ }
168
+
158
169
private function dispatchEvent ($ event )
159
170
{
160
171
if (null === $ this ->eventDispatcher ) {
You can’t perform that action at this time.
0 commit comments