diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index b5406b59ea3f6..24c281c364d39 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -29,6 +29,7 @@ use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; +use Symfony\Contracts\Service\ResetInterface; /** * @group time-sensitive @@ -85,6 +86,19 @@ public function testHandlingErrorCausesReject() $this->assertSame(0, $receiver->getAcknowledgeCount()); } + public function testWorkerResetsConnectionIfReceiverIsResettable() + { + $resettableReceiver = new ResettableDummyReceiver([]); + + $bus = $this->createMock(MessageBusInterface::class); + $dispatcher = new EventDispatcher(); + + $worker = new Worker([$resettableReceiver], $bus, $dispatcher); + $worker->stop(); + $worker->run(); + $this->assertTrue($resettableReceiver->hasBeenReset()); + } + public function testWorkerDoesNotSendNullMessagesToTheBus() { $receiver = new DummyReceiver([ @@ -283,3 +297,18 @@ public function getRejectCount(): int return $this->rejectCount; } } + +class ResettableDummyReceiver extends DummyReceiver implements ResetInterface +{ + private $hasBeenReset = false; + + public function reset() + { + $this->hasBeenReset = true; + } + + public function hasBeenReset(): bool + { + return $this->hasBeenReset; + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 1a41009fefc6a..98156c887bda9 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; +use Symfony\Contracts\Service\ResetInterface; /** * @author Samuel Roze @@ -102,6 +103,7 @@ public function run(array $options = []): void } $this->dispatchEvent(new WorkerStoppedEvent($this)); + $this->resetReceiverConnections(); } private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void @@ -155,6 +157,15 @@ public function stop(): void $this->shouldStop = true; } + private function resetReceiverConnections(): void + { + foreach ($this->receivers as $transportName => $receiver) { + if ($receiver instanceof ResetInterface) { + $receiver->reset(); + } + } + } + private function dispatchEvent($event) { if (null === $this->eventDispatcher) {