diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 664ab03e49a53..47f72d79192d7 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -8,6 +8,7 @@ CHANGELOG
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
+ * Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.
5.0.0
-----
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index ac718153d2cda..8dfcbe240fe81 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -23,6 +23,7 @@
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
+use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
@@ -64,6 +65,7 @@ protected function configure(): void
->setDefinition([
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
+ new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
@@ -82,6 +84,10 @@ protected function configure(): void
Use the --limit option to limit the number of messages received:
php %command.full_name% --limit=10
+
+Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
+
+ php %command.full_name% --failure-limit=2
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
@@ -152,6 +158,11 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}
+ if ($failureLimit = $input->getOption('failure-limit')) {
+ $stopsWhen[] = "reached {$failureLimit} failed messages";
+ $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
+ }
+
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
diff --git a/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php
new file mode 100644
index 0000000000000..29dc6aaaf2c3b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/EventListener/StopWorkerOnFailureLimitListener.php
@@ -0,0 +1,63 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\EventListener;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\EventDispatcher\EventSubscriberInterface;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\Event\WorkerRunningEvent;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+
+/**
+ * @author Michel Hunziker
+ */
+class StopWorkerOnFailureLimitListener implements EventSubscriberInterface
+{
+ private $maximumNumberOfFailures;
+ private $logger;
+ private $failedMessages = 0;
+
+ public function __construct(int $maximumNumberOfFailures, LoggerInterface $logger = null)
+ {
+ $this->maximumNumberOfFailures = $maximumNumberOfFailures;
+ $this->logger = $logger;
+
+ if ($maximumNumberOfFailures <= 0) {
+ throw new InvalidArgumentException('Failure limit must be greater than zero.');
+ }
+ }
+
+ public function onMessageFailed(WorkerMessageFailedEvent $event): void
+ {
+ ++$this->failedMessages;
+ }
+
+ public function onWorkerRunning(WorkerRunningEvent $event): void
+ {
+ if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) {
+ $this->failedMessages = 0;
+ $event->getWorker()->stop();
+
+ if (null !== $this->logger) {
+ $this->logger->info('Worker stopped due to limit of {count} failed message(s) is reached', ['count' => $this->maximumNumberOfFailures]);
+ }
+ }
+ }
+
+ public static function getSubscribedEvents(): array
+ {
+ return [
+ WorkerMessageFailedEvent::class => 'onMessageFailed',
+ WorkerRunningEvent::class => 'onWorkerRunning',
+ ];
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php
new file mode 100644
index 0000000000000..9f12b0b258a75
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnFailureLimitListenerTest.php
@@ -0,0 +1,79 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\EventListener;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
+use Symfony\Component\Messenger\Event\WorkerRunningEvent;
+use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Worker;
+use Throwable;
+
+class StopWorkerOnFailureLimitListenerTest extends TestCase
+{
+ /**
+ * @dataProvider countProvider
+ */
+ public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldStop): void
+ {
+ $worker = $this->createMock(Worker::class);
+ $worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');
+
+ $failedEvent = $this->createFailedEvent();
+ $runningEvent = new WorkerRunningEvent($worker, false);
+
+ $failureLimitListener = new StopWorkerOnFailureLimitListener($max);
+ // simulate three messages (of which 2 failed)
+ $failureLimitListener->onMessageFailed($failedEvent);
+ $failureLimitListener->onWorkerRunning($runningEvent);
+
+ $failureLimitListener->onWorkerRunning($runningEvent);
+
+ $failureLimitListener->onMessageFailed($failedEvent);
+ $failureLimitListener->onWorkerRunning($runningEvent);
+ }
+
+ public function countProvider(): iterable
+ {
+ yield [1, true];
+ yield [2, true];
+ yield [3, false];
+ yield [4, false];
+ }
+
+ public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven(): void
+ {
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('info')
+ ->with(
+ $this->equalTo('Worker stopped due to limit of {count} failed message(s) is reached'),
+ $this->equalTo(['count' => 1])
+ );
+
+ $worker = $this->createMock(Worker::class);
+ $event = new WorkerRunningEvent($worker, false);
+
+ $failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger);
+ $failureLimitListener->onMessageFailed($this->createFailedEvent());
+ $failureLimitListener->onWorkerRunning($event);
+ }
+
+ private function createFailedEvent(): WorkerMessageFailedEvent
+ {
+ $envelope = new Envelope(new DummyMessage('hello'));
+
+ return new WorkerMessageFailedEvent($envelope, 'default', $this->createMock(Throwable::class));
+ }
+}