diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 8798d5f2e5e3e..e02cd1ca34c0d 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory; use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\ResetServicesListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; @@ -218,6 +219,9 @@ service('services_resetter'), ]) + ->set('messenger.listener.reset_memory_usage', ResetMemoryUsageListener::class) + ->tag('kernel.event_subscriber') + ->set('messenger.routable_message_bus', RoutableMessageBus::class) ->args([ abstract_arg('message bus locator'), diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index a48e4c254ca25..c4eae318d3518 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -9,6 +9,7 @@ CHANGELOG * Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp` * Add `--class-filter` option to the `messenger:failed:remove` command * Add `$stamps` parameter to `HandleTrait::handle` + * Add `Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener` to reset PHP's peak memory usage for each processed message 7.2 --- diff --git a/src/Symfony/Component/Messenger/EventListener/ResetMemoryUsageListener.php b/src/Symfony/Component/Messenger/EventListener/ResetMemoryUsageListener.php new file mode 100644 index 0000000000000..7a06501c508c8 --- /dev/null +++ b/src/Symfony/Component/Messenger/EventListener/ResetMemoryUsageListener.php @@ -0,0 +1,48 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\EventListener; + +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerRunningEvent; + +/** + * @author Tim Düsterhus + */ +final class ResetMemoryUsageListener implements EventSubscriberInterface +{ + private bool $collect = false; + + public function resetBefore(WorkerMessageReceivedEvent $event): void + { + // Reset the peak memory usage for accurate measurement of the + // memory usage on a per-message basis. + memory_reset_peak_usage(); + $this->collect = true; + } + + public function collectAfter(WorkerRunningEvent $event): void + { + if ($event->isWorkerIdle() && $this->collect) { + gc_collect_cycles(); + $this->collect = false; + } + } + + public static function getSubscribedEvents(): array + { + return [ + WorkerMessageReceivedEvent::class => ['resetBefore', -1024], + WorkerRunningEvent::class => ['collectAfter', -1024], + ]; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 553368a193c09..037edf83d4862 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -26,6 +26,7 @@ use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\ResetServicesListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\Exception\RuntimeException; @@ -586,7 +587,7 @@ public function testFlushBatchOnStop() $this->assertSame($expectedMessages, $handler->processedMessages); } - public function testGcCollectCyclesIsCalledOnMessageHandle() + public function testGcCollectCyclesIsCalledOnIdleWorker() { $apiMessage = new DummyMessage('API'); @@ -595,14 +596,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle() $bus = $this->createMock(MessageBusInterface::class); $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new ResetMemoryUsageListener()); + $before = 0; + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use (&$before) { + static $i = 0; + + $after = gc_status()['runs']; + if (0 === $i) { + $this->assertFalse($event->isWorkerIdle()); + $this->assertSame(0, $after - $before); + } else if (1 === $i) { + $this->assertTrue($event->isWorkerIdle()); + $this->assertSame(1, $after - $before); + } else if (3 === $i) { + // Wait a few idle phases before stopping. + $this->assertSame(1, $after - $before); + $event->getWorker()->stop(); + } + + $i++; + }, PHP_INT_MIN); + + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + + gc_collect_cycles(); + $before = gc_status()['runs']; + + $worker->run([ + 'sleep' => 0, + ]); + } + + public function testMemoryUsageIsResetOnMessageHandle() + { + $apiMessage = new DummyMessage('API'); + + $receiver = new DummyReceiver([[new Envelope($apiMessage)]]); + + $bus = $this->createMock(MessageBusInterface::class); + + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new ResetMemoryUsageListener()); $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + // Allocate and deallocate 4 MB. The use of random_int() is to + // prevent compile-time optimization. + $memory = str_repeat(random_int(0, 1), 4 * 1024 * 1024); + unset($memory); + + $before = memory_get_peak_usage(); + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); $worker->run(); - $gcStatus = gc_status(); + // This should be roughly 4 MB smaller than $before. + $after = memory_get_peak_usage(); - $this->assertGreaterThan(0, $gcStatus['runs']); + $this->assertTrue($after < $before); } /** diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 14b30ba5645bf..f2500e3e779e8 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -128,8 +128,6 @@ public function run(array $options = []): void // this should prevent multiple lower priority receivers from // blocking too long before the higher priority are checked if ($envelopeHandled) { - gc_collect_cycles(); - break; } }