8000 [Messenger] Add a memory limit option for `ConsumeMessagesCommand` · symfony/symfony@08f98cf · GitHub
[go: up one dir, main page]

Skip to content

Commit 08f98cf

Browse files
sdelicataAlptisAssurances
authored andcommitted
[Messenger] Add a memory limit option for ConsumeMessagesCommand
1 parent cef8d28 commit 08f98cf

File tree

9 files changed

+329
-33
lines changed

9 files changed

+329
-33
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
7373
<argument type="service" id="message_bus" />
7474
<argument type="service" id="messenger.receiver_locator" />
75+
<argument type="service" id="logger" on-invalid="null" />
7576

7677
<tag name="console.command" command="messenger:consume-messages" />
7778
</service>

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
namespace Symfony\Component\Messenger\Command;
1313

1414
use Psr\Container\ContainerInterface;
15+
use Psr\Log\LoggerInterface;
1516
use Symfony\ 3419 Component\Console\Command\Command;
1617
use Symfony\Component\Console\Input\InputArgument;
1718
use Symfony\Component\Console\Input\InputInterface;
1819
use Symfony\Component\Console\Input\InputOption;
1920
use Symfony\Component\Console\Output\OutputInterface;
2021
use Symfony\Component\Messenger\MessageBusInterface;
21-
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
22+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
23+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
2224
use Symfony\Component\Messenger\Transport\ReceiverInterface;
2325
use Symfony\Component\Messenger\Worker;
2426

@@ -33,24 +35,27 @@ class ConsumeMessagesCommand extends Command
3335

3436
private $bus;
3537
private $receiverLocator;
38+
private $logger;
3639

37-
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
40+
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null)
3841
{
3942
parent::__construct();
4043

4144
$this->bus = $bus;
4245
$this->receiverLocator = $receiverLocator;
46+
$this->logger = $logger;
4347
}
4448

4549
/**
4650
* {@inheritdoc}
4751
*/
48-
protected function configure()
52+
protected function configure(): void
4953
{
5054
$this
5155
->setDefinition(array(
5256
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
5357
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
58+
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
5459
))
5560
->setDescription('Consumes messages')
5661
->setHelp(<<<'EOF'
@@ -61,6 +66,10 @@ protected function configure()
6166
Use the --limit option to limit the number of messages received:
6267
6368
<info>php %command.full_name% <receiver-name> --limit=10</info>
69+
70+
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
71+
72+
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
6473
EOF
6574
)
6675
;
@@ -69,7 +78,7 @@ protected function configure()
6978
/**
7079
* {@inheritdoc}
7180
*/
72-
protected function execute(InputInterface $input, OutputInterface $output)
81+
protected function execute(InputInterface $input, OutputInterface $output): void
7382
{
7483
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
7584
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
@@ -80,10 +89,39 @@ protected function execute(InputInterface $input, OutputInterface $output)
8089
}
8190

8291
if ($limit = $input->getOption('limit')) {
83-
$receiver = new MaximumCountReceiver($receiver, $limit);
92+
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
93+
}
94+
95+
if ($memoryLimit = $input->getOption('memory-limit')) {
96+
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
8497
}
8598

8699
$worker = new Worker($receiver, $this->bus);
87100
$worker->run();
88101
}
102+
103+
private function convertToBytes(string $memoryLimit): int
104+
{
105+
$memoryLimit = strtolower($memoryLimit);
106+
$max = strtolower(ltrim($memoryLimit, '+'));
107+
if (0 === strpos($max, '0x')) {
108+
$max = intval($max, 16);
109+
} elseif (0 === strpos($max, '0')) {
110+
$max = intval($max, 8);
111+
} else {
112+
$max = (int) $max;
113+
}
114+
115+
switch (substr($memoryLimit, -1)) {
116+
case 't': $max *= 1024;
117+
// no break
118+
case 'g': $max *= 1024;
119+
// no break
120+
case 'm': $max *= 1024;
121+
// no break
122+
case 'k': $max *= 1024;
123+
}
124+
125+
return $max;
126+
}
89127
}

src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender;
1717
use Symfony\Component\Messenger\Adapter\AmqpExt\Connection;
1818
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
19-
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
2019
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2120
use Symfony\Component\Process\PhpProcess;
2221
use Symfony\Component\Process\Process;
@@ -58,7 +57,7 @@ public function testItSendsAndReceivesMessages()
5857
$receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) {
5958
$this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message);
6059

61-
if (2 == ++$receivedMessages) {
60+
if (2 === ++$receivedMessages) {
6261
$receiver->stop();
6362
}
6463
});
@@ -116,9 +115,15 @@ public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
116115
$connection->queue()->purge();
117116

118117
$sender = new AmqpSender($serializer, $connection);
119-
$receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2);
120-
$receiver->receive(function ($message) {
118+
$receiver = new AmqpReceiver($serializer, $connection);
119+
120+
$receivedMessages = 0;
121+
$receiver->receive(function ($message) use ($receiver, &$receivedMessages) {
121122
$this->assertNull($message);
123+
124+
if (2 === ++$receivedMessages) {
125+
$receiver->stop();
126+
}
122127
});
123128
}
124129

Lines changed: 25 additions & 0 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Tests\Fixtures;
4+
5+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
6+
7+
class CallbackReceiver implements ReceiverInterface
8+
{
9+
private $callable;
10+
11+
public function __construct(callable $callable)
12+
{
13+
$this->callable = $callable;
14+
}
15+
16+
public function receive(callable $handler): void
17+
{
18+
$callable = $this->callable;
19+
$callable($handler);
20+
}
21+
22+
public function stop(): void
23+
{
24+
}
25+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
17+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
18+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
19+
20+
class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
21+
{
22+
/**
23+
* @dataProvider memoryProvider
24+
*/
25+
public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
26+
{
27+
$callable = function ($handler) {
28+
$handler(new DummyMessage('API'));
29+
};
30+
31+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
32+
->setConstructorArgs(array($callable))
33+
->enableProxyingToOriginalMethods()
34+
->getMock();
35+
36+
$decoratedReceiver->expects($this->once())->method('receive');
37+
if (true === $shouldStop) {
38+
$decoratedReceiver->expects($this->once())->method('stop');
39+
} else {
40+
$decoratedReceiver->expects($this->never())->method('stop');
41+
}
42+
43+
$memoryResolver = function () use ($memoryUsage) {
44+
return $memoryUsage;
45+
};
46+
47+
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
48+
$memoryLimitReceiver->receive(function () {});
49+
}
50+
51+
public function memoryProvider()
52+
{
53+
yield array(2048, 1024, true);
54+
yield array(1024, 1024, false);
55+
yield array(1024, 2048, false);
56+
}
57+
58+
public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
59+
{
60+
$callable = function ($handler) {
61+
$handler(new DummyMessage('API'));
62+
};
63+
64+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
65+
->setConstructorArgs(array($callable))
66+
->enableProxyingToOriginalMethods()
67+
->getMock();
68+
69+
$decoratedReceiver->expects($this->once())->method('receive');
70+
$decoratedReceiver->expects($this->once())->method('stop');
71+
72+
$logger = $this->createMock(LoggerInterface::class);
73+
$logger->expects($this->once())->method('info')
74+
->with('Receiver stopped due to memory limit of {limit} exceeded', array('limit' => 64 * 1024 * 1024));
75+
76+
$memoryResolver = function () {
77+
return 70 * 1024 * 1024;
78+
};
79+
80+
$memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
81+
$memoryLimitReceiver->receive(function () {});
82+
}
83+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
17+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
18+
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
19+
20+
class StopWhenMessageCountIsExceededReceiverTest extends TestCase
21+
{
22+
/**
23+
* @dataProvider countProvider
24+
*/
25+
public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
26+
{
27+
$callable = function ($handler) {
28+
$handler(new DummyMessage('First message'));
29+
$handler(new DummyMessage('Second message'));
30+
$handler(new DummyMessage('Third message'));
31+
};
32+
33+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
34+
->setConstructorArgs(array($callable))
35+
->enableProxyingToOriginalMethods()
36+
->getMock();
37+
38+
$decoratedReceiver->expects($this->once())->method('receive');
39+
if (true === $shouldStop) {
40+
$decoratedReceiver->expects($this->any())->method('stop');
41+
} else {
42+
$decoratedReceiver->expects($this->never())->method('stop');
43+
}
44+
45+
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
46+
$maximumCountReceiver->receive(function () {});
47+
}
48+
49+
public function countProvider()
50+
{
51+
yield array(1, true);
52+
yield array(2, true);
53+
yield array(3, true);
54+
yield array(4, false);
55+
}
56+
57+
public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
58+
{
59+
$callable = function ($handler) {
60+
$handler(null);
61+
$handler(null);
62+
$handler(null);
63+
$handler(null);
64+
};
65+
66+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
67+
->setConstructorArgs(array($callable))
68+
->enableProxyingToOriginalMethods()
69+
->getMock();
70+
71+
$decoratedReceiver->expects($this->once())->method('receive');
72+
$decoratedReceiver->expects($this->never())->method('stop');
73+
74+
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
75+
$maximumCountReceiver->receive(function () {});
76+
}
77+
78+
public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
79+
{
80+
$callable = function ($handler) {
81+
$handler(new DummyMessage('First message'));
82+
};
83+
84+
$decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
85+
->setConstructorArgs(array($callable))
86+
->enableProxyingToOriginalMethods()
87+
->getMock();
88+
89+
$decoratedReceiver->expects($this->once())->method('receive');
90+
$decoratedReceiver->expects($this->once())->method('stop');
91+
92+
$logger = $this->createMock(LoggerInterface::class);
93+
$logger->expects($this->once())->method('info')
94+
->with(
95+
$this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
96+
$this->equalTo(array('count' => 1))
97+
);
98+
99+
$maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
100+
$maximumCountReceiver->receive(function () {});
101+
}
102+
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
1616
use Symfony\Component\Messenger\MessageBusInterface;
17+
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
1718
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
18-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
1919
use Symfony\Component\Messenger\Worker;
20< E3E9 code>20

2121
class WorkerTest extends TestCase
@@ -83,23 +83,3 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
8383
$worker->run();
8484
}
8585
}
86-
87-
class CallbackReceiver implements ReceiverInterface
88-
{
89-
private $callable;
90-
91-
public function __construct(callable $callable)
92-
{
93-
$this->callable = $callable;
94-
}
95-
96-
public function receive(callable $handler): void
97-
{
98-
$callable = $this->callable;
99-
$callable($handler);
100-
}
101-
102-
public function stop(): void
103-
{
104-
}
105-
}

0 commit comments

Comments
 (0)
0