8000 feature #60018 [Messenger] Reset peak memory usage for each message (… · symfony/symfony@99881e6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 99881e6

Browse files
feature #60018 [Messenger] Reset peak memory usage for each message (TimWolla)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] Reset peak memory usage for each message | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT PHP’s peak memory usage is a bit of global state that is not useful to keep in a long-running process handling individual self-contained messages, since a single high-memory message (handler) running early in a worker’s lifecycle will skew the numbers for all remaining messages processed by that worker. By resetting the peak memory usage for each message it becomes possible to measure a given message type’s maximum memory usage more accurately, allowing to optimize hardware resources, for example by placing individual messages with handlers requiring a high memory-usage into their own transport that is executed on a larger worker instance. As part of this change the cycle collection is also moved out of the Worker into an event-listener, since the cycle collection is not a core task of the Worker, since cycle collection would happen implicitly as well. Commits ------- 896ab90 [Messenger] Reset peak memory usage for each message
2 parents a8d6a36 + 896ab90 commit 99881e6

File tree

5 files changed

+107
-5
lines changed

5 files changed

+107
-5
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2122
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2324
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
@@ -218,6 +219,9 @@
218219
service('services_resetter'),
219220
])
220221

222+
->set('messenger.listener.reset_memory_usage', ResetMemoryUsageListener::class)
223+
->tag('kernel.event_subscriber')
224+
221225
->set('messenger.routable_message_bus', RoutableMessageBus::class)
222226
->args([
223227
abstract_arg('message bus locator'),

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
1010
* Add `--class-filter` option to the `messenger:failed:remove` command
1111
* Add `$stamps` parameter to `HandleTrait::handle`
12+
* Add `Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener` to reset PHP's peak memory usage for each processed message
1213

1314
7.2
1415
---
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\EventListener;
13+
14+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
16+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
17+
18+
/**
19+
* @author Tim Düsterhus <tim@tideways-gmbh.com>
20+
*/
21+
final class ResetMemoryUsageListener implements EventSubscriberInterface
22+
{
23+
private bool $collect = false;
24+
25+
public function resetBefore(WorkerMessageReceivedEvent $event): void
26+
{
27+
// Reset the peak memory usage for accurate measurement of the
28+
// memory usage on a per-message basis.
29+
memory_reset_peak_usage();
30+
$this->collect = true;
31+
}
32+
33+
public function collectAfter(WorkerRunningEvent $event): void
34+
{
35+
if ($event->isWorkerIdle() && $this->collect) {
36+
gc_collect_cycles();
37+
$this->collect = false;
38+
}
39+
}
40+
41+
public static function getSubscribedEvents(): array
42+
{
43+
return [
44+
WorkerMessageReceivedEvent::class => ['resetBefore', -1024],
45+
WorkerRunningEvent::class => ['collectAfter', -1024],
46+
];
47+
}
48+
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
2727
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
2828
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
29+
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2930
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
3031
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3132
use Symfony\Component\Messenger\Exception\RuntimeException;
@@ -586,7 +587,7 @@ public function testFlushBatchOnStop()
586587
$this->assertSame($expectedMessages, $handler->processedMessages);
587588
}
588589

589-
public function testGcCollectCyclesIsCalledOnMessageHandle()
590+
public function testGcCollectCyclesIsCalledOnIdleWorker()
590591
{
591592
$apiMessage = new DummyMessage('API');
592593

@@ -595,14 +596,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
595596
$bus = $this->createMock(MessageBusInterface::class);
596597

597598
$dispatcher = new EventDispatcher();
599+
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
600+
$before = 0;
601+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use (&$before) {
602+
static $i = 0;
603+
604+
$after = gc_status()['runs'];
605+
if (0 === $i) {
606+
$this->assertFalse($event->isWorkerIdle());
607+
$this->assertSame(0, $after - $before);
608+
} else if (1 === $i) {
609+
$this->assertTrue($event->isWorkerIdle());
610+
$this->assertSame(1, $after - $before);
611+
} else if (3 === $i) {
612+
// Wait a few idle phases before stopping.
613+
$this->assertSame(1, $after - $before);
614+
$event->getWorker()->stop();
615+
}
616+
617+
$i++;
618+
}, PHP_INT_MIN);
619+
620+
621+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
622+
623+
gc_collect_cycles();
624+
$before = gc_status()['runs'];
625+
626+
$worker->run([
627+
'sleep' => 0,
628+
]);
629+
}
630+
631+
public function testMemoryUsageIsResetOnMessageHandle()
632+
{
633+
$apiMessage = new DummyMessage('API');
634+
635+
$receiver = new DummyReceiver([[new Envelope($apiMessage)]]);
636+
637+
$bus = $this->createMock(MessageBusInterface::class);
638+
639+
$dispatcher = new EventDispatcher();
640+
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
598641
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
599642

643+
// Allocate and deallocate 4 MB. The use of random_int() is to
644+
// prevent compile-time optimization.
645+
$memory = str_repeat(random_int(0, 1), 4 * 1024 * 1024);
646+
unset($memory);
647+
648+
$before = memory_get_peak_usage();
649+
600650
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
601651
$worker->run();
602652

603-
$gcStatus = gc_status();
653+
// This should be roughly 4 MB smaller than $before.
654+
$after = memory_get_peak_usage();
604655

605-
$this->assertGreaterThan(0, $gcStatus['runs']);
656+
$this->assertTrue($after < $before);
606657
}
607658

608659
/**

src/Symfony/Component/Messenger/Worker.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ public function run(array $options = []): void
128128
// this should prevent multiple lower priority receivers from
129129
// blocking too long before the higher priority are checked
130130
if ($envelopeHandled) {
131-
gc_collect_cycles();
132-
133131
break;
134132
}
135133
}

0 commit comments

Comments
 (0)
0