From 16cf4fc4d0617df32cdeb334d2f791f711539a09 Mon Sep 17 00:00:00 2001 From: Thomas Talbot Date: Sat, 11 Jun 2022 15:17:48 +0200 Subject: [PATCH] [Messenger] move resetting services at worker stopped into ResetServicesListener --- .../EventListener/ResetServicesListener.php | 8 ++++ .../Command/ConsumeMessagesCommandTest.php | 19 ++++----- .../ResetServicesListenerTest.php | 12 ++++++ .../Component/Messenger/Tests/WorkerTest.php | 39 ++++++++++++++++++- src/Symfony/Component/Messenger/Worker.php | 10 ----- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php b/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php index b57ee728981b6..dd170e0e433a1 100644 --- a/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php +++ b/src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php @@ -14,6 +14,8 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStoppedEvent; +use Symfony\Contracts\Service\ResetInterface; /** * @author Grégoire Pineau @@ -34,10 +36,16 @@ public function resetServices(WorkerRunningEvent $event): void } } + public function resetServicesAtStop(WorkerStoppedEvent $event): void + { + $this->servicesResetter->reset(); + } + public static function getSubscribedEvents(): array { return [ WorkerRunningEvent::class => ['resetServices', -1024], + WorkerStoppedEvent::class => ['resetServicesAtStop', -1024], ]; } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index 8a6f5a7d608cf..1bcfeb04a987a 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -26,6 +26,7 @@ use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Stamp\BusNameStamp; +use Symfony\Component\Messenger\Tests\ResettableDummyReceiver; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class ConsumeMessagesCommandTest extends TestCase @@ -116,15 +117,11 @@ public function testRunWithResetServicesOption(bool $shouldReset) { $envelope = new Envelope(new \stdClass()); - $receiver = $this->createMock(ReceiverInterface::class); - $receiver - ->expects($this->exactly(3)) - ->method('get') - ->willReturnOnConsecutiveCalls( - [$envelope], - [/* idle */], - [$envelope, $envelope] - ); + $receiver = new ResettableDummyReceiver([ + [$envelope], + [/* idle */], + [$envelope, $envelope], + ]); $msgCount = 3; $receiverLocator = $this->createMock(ContainerInterface::class); @@ -134,8 +131,7 @@ public function testRunWithResetServicesOption(bool $shouldReset) $bus = $this->createMock(RoutableMessageBus::class); $bus->expects($this->exactly($msgCount))->method('dispatch'); - $servicesResetter = $this->createMock(ServicesResetter::class); - $servicesResetter->expects($this->exactly($shouldReset ? $msgCount : 0))->method('reset'); + $servicesResetter = new ServicesResetter(new \ArrayIterator([$receiver]), ['reset']); $command = new ConsumeMessagesCommand($bus, $receiverLocator, new EventDispatcher(), null, [], new ResetServicesListener($servicesResetter)); @@ -148,6 +144,7 @@ public function testRunWithResetServicesOption(bool $shouldReset) '--limit' => $msgCount, ], $shouldReset ? [] : ['--no-reset' => null])); + $this->assertEquals($shouldReset, $receiver->hasBeenReset(), '$receiver->reset() should have been called'); $tester->assertCommandIsSuccessful(); $this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay()); } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php index ce8f771a0952f..12f86ec6c83cb 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; use Symfony\Component\Messenger\Event\WorkerRunningEvent; +use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\EventListener\ResetServicesListener; use Symfony\Component\Messenger\Worker; @@ -38,4 +39,15 @@ public function testResetServices(bool $shouldReset) $resetListener = new ResetServicesListener($servicesResetter); $resetListener->resetServices($event); } + + public function testResetServicesAtStop() + { + $servicesResetter = $this->createMock(ServicesResetter::class); + $servicesResetter->expects($this->once())->method('reset'); + + $event = new WorkerStoppedEvent($this->createMock(Worker::class)); + + $resetListener = new ResetServicesListener($servicesResetter); + $resetListener->resetServicesAtStop($event); + } } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index f09d2648798bf..4d0f79b10e41a 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; @@ -21,6 +22,7 @@ use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; +use Symfony\Component\Messenger\EventListener\ResetServicesListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Handler\Acknowledger; @@ -103,15 +105,50 @@ public function testWorkerResetsConnectionIfReceiverIsResettable() { $resettableReceiver = new ResettableDummyReceiver([]); - $bus = $this->createMock(MessageBusInterface::class); $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset']))); + $bus = $this->createMock(MessageBusInterface::class); $worker = new Worker([$resettableReceiver], $bus, $dispatcher); $worker->stop(); $worker->run(); $this->assertTrue($resettableReceiver->hasBeenReset()); } + public function testWorkerResetsTransportsIfResetServicesListenerIsCalled() + { + $envelope = new Envelope(new DummyMessage('Hello')); + $resettableReceiver = new ResettableDummyReceiver([[$envelope]]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset']))); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) { + $event->getWorker()->stop(); + }); + + $bus = $this->createMock(MessageBusInterface::class); + $worker = new Worker([$resettableReceiver], $bus, $dispatcher); + $worker->run(); + $this->assertTrue($resettableReceiver->hasBeenReset()); + } + + public function testWorkerDoesNotResetTransportsIfResetServicesListenerIsNotCalled() + { + $envelope = new Envelope(new DummyMessage('Hello')); + $resettableReceiver = new ResettableDummyReceiver([[$envelope]]); + + $bus = $this->createMock(MessageBusInterface::class); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) { + $event->getWorker()->stop(); + }); + + $worker = new Worker([$resettableReceiver], $bus, $dispatcher); + $worker->run(); + $this->assertFalse($resettableReceiver->hasBeenReset()); + } + public function testWorkerDoesNotSendNullMessagesToTheBus() { $receiver = new DummyReceiver([ diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index f670b564f8fcc..34473bac32267 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -136,7 +136,6 @@ public function run(array $options = []): void $this->flush(true); $this->dispatchEvent(new WorkerStoppedEvent($this)); - $this->resetReceiverConnections(); } private function handleMessage(Envelope $envelope, string $transportName): void @@ -260,15 +259,6 @@ public function getMetadata(): WorkerMetadata return $this->metadata; } - private function resetReceiverConnections(): void - { - foreach ($this->receivers as $receiver) { - if ($receiver instanceof ResetInterface) { - $receiver->reset(); - } - } - } - private function dispatchEvent(object $event): void { if (null === $this->eventDispatcher) {