diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index d1e85da795d8f..4a19e59dcbe2d 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -35,6 +35,7 @@ CHANGELOG
* The `ContainerHandlerLocator`, `AbstractHandlerLocator`, `SenderLocator` and `AbstractSenderLocator` classes have been removed
* `Envelope::all()` takes a new optional `$stampFqcn` argument and returns the stamps for the specified FQCN, or all stamps by their class name
* `Envelope::get()` has been renamed `Envelope::last()`
+ * Add option `force-consumption` to force the consumption of messages even if an exception is thrown by a message handler
4.1.0
-----
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index 83b834c593035..6067419def23b 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -20,6 +20,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Messenger\Transport\Receiver\ForceConsumptionReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -66,6 +67,7 @@ protected function configure(): void
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
+ new InputOption('force-consumption', 'f', InputOption::VALUE_REQUIRED, 'Force the consumption of messages even if an exception is thrown by a message handler', false),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
@@ -84,6 +86,10 @@ protected function configure(): void
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
php %command.full_name% --time-limit=3600
+
+Use the --force-consumption option to force the consumption of messages:
+
+ php %command.full_name% --force-consumption
EOF
)
;
@@ -155,6 +161,10 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
}
+ if ($input->getOption('force-consumption')) {
+ $receiver = new ForceConsumptionReceiver($receiver, $this->logger);
+ }
+
$worker = new Worker($receiver, $bus);
$worker->run();
}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php
new file mode 100644
index 0000000000000..f8eff829c306c
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/ForceConsumptionReceiverTest.php
@@ -0,0 +1,68 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
+
+use Exception;
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\Receiver\ForceConsumptionReceiver;
+
+class ForceConsumptionReceiverTest extends TestCase
+{
+ /**
+ * @dataProvider logProvider
+ */
+ public function testReceiverDoesNotStopWhenExceptionIsThrown(bool $isLoggable)
+ {
+ $callable = function ($handler) {
+ $handler(new Envelope(new DummyMessage('API')));
+ };
+
+ $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
+ ->setConstructorArgs(array($callable))
+ ->enableProxyingToOriginalMethods()
+ ->getMock()
+ ;
+
+ $logger = null;
+ if ($isLoggable) {
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('alert')
+ ->with(
+ $this->equalTo('Receiver reached an exception: "{message}"'),
+ $this->equalTo(array('message' => 'my exception'))
+ );
+ }
+
+ $decoratedReceiver->expects($this->exactly(2))->method('receive');
+
+ $timeoutReceiver = new ForceConsumptionReceiver($decoratedReceiver, $logger);
+ $timeoutReceiver->receive(
+ function () {
+ throw new Exception('my exception');
+ }
+ );
+
+ $timeoutReceiver->receive(function () {});
+ }
+
+ public function logProvider()
+ {
+ return array(
+ 'with log' => array(true),
+ 'without log' => array(false),
+ );
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php
new file mode 100644
index 0000000000000..40b55898a87db
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Transport/Receiver/ForceConsumptionReceiver.php
@@ -0,0 +1,57 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Transport\Receiver;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+
+/**
+ * @author Mathias STRASSER
+ *
+ * @experimental in 4.2
+ */
+final class ForceConsumptionReceiver implements ReceiverInterface
+{
+ private $decoratedReceiver;
+ private $logger;
+
+ public function __construct(ReceiverInterface $decoratedReceiver, LoggerInterface $logger = null)
+ {
+ $this->decoratedReceiver = $decoratedReceiver;
+ $this->logger = $logger;
+ }
+
+ public function receive(callable $handler): void
+ {
+ $this->decoratedReceiver->receive(
+ function (?Envelope $envelope) use ($handler) {
+ try {
+ $handler($envelope);
+ } catch (\Throwable $exception) {
+ if (null === $this->logger) {
+ return;
+ }
+
+ $this->logger->alert(
+ 'Receiver reached an exception: "{message}"',
+ array('message' => $exception->getMessage())
+ );
+ }
+ }
+ );
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedReceiver->stop();
+ }
+}