diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml
index a3a5c812ef5b1..4fd8b70fac7aa 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml
@@ -36,6 +36,10 @@
+
+
+
+
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
index 6ae1cb8cc306d..f753b214192da 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
@@ -83,6 +83,9 @@
+
+
+
@@ -101,6 +104,11 @@
+
+
+
+
+
diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index a2f94de63d406..b2bc787558b57 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----
+ * Added new `messenger:stop-workers` command that sends a signal
+ to stop all `messenger:consume` workers.
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index bd3a7a177c6c3..dea3a204daf2b 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Command;
+use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
@@ -25,6 +26,7 @@
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
+use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
private $receiverNames;
private $retryStrategyLocator;
private $eventDispatcher;
+ /** @var CacheItemPoolInterface|null */
+ private $restartSignalCachePool;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
@@ -62,6 +66,11 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
parent::__construct();
}
+ public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
+ {
+ $this->restartSignalCachePool = $restartSignalCachePool;
+ }
+
/**
* {@inheritdoc}
*/
@@ -190,6 +199,11 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
+ if (null !== $this->restartSignalCachePool) {
+ $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
+ $worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
+ }
+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
diff --git a/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php b/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php
new file mode 100644
index 0000000000000..afb2ce0dd654b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Command/StopWorkersCommand.php
@@ -0,0 +1,73 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Command;
+
+use Psr\Cache\CacheItemPoolInterface;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class StopWorkersCommand extends Command
+{
+ protected static $defaultName = 'messenger:stop-workers';
+
+ private $restartSignalCachePool;
+
+ public function __construct(CacheItemPoolInterface $restartSignalCachePool)
+ {
+ $this->restartSignalCachePool = $restartSignalCachePool;
+
+ parent::__construct();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure(): void
+ {
+ $this
+ ->setDefinition([])
+ ->setDescription('Stops workers after their current message')
+ ->setHelp(<<<'EOF'
+The %command.name% command sends a signal to stop any messenger:consume processes that are running.
+
+ php %command.full_name%
+
+Each worker command will finish the message they are currently processing
+and then exit. Worker commands are *not* automatically restarted: that
+should be handled by something like supervisord.
+EOF
+ )
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output): void
+ {
+ $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
+
+ $cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
+ $cacheItem->set(time());
+ $this->restartSignalCachePool->save($cacheItem);
+
+ $io->success('Signal successfully sent to stop any running workers.');
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/StopWorkersCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/StopWorkersCommandTest.php
new file mode 100644
index 0000000000000..fd5ddae244b70
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Command/StopWorkersCommandTest.php
@@ -0,0 +1,35 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Command;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Cache\CacheItemInterface;
+use Psr\Cache\CacheItemPoolInterface;
+use Symfony\Component\Console\Tester\CommandTester;
+use Symfony\Component\Messenger\Command\StopWorkersCommand;
+
+class StopWorkersCommandTest extends TestCase
+{
+ public function testItSetsCacheItem()
+ {
+ $cachePool = $this->createMock(CacheItemPoolInterface::class);
+ $cacheItem = $this->createMock(CacheItemInterface::class);
+ $cacheItem->expects($this->once())->method('set');
+ $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
+ $cachePool->expects($this->once())->method('save')->with($cacheItem);
+
+ $command = new StopWorkersCommand($cachePool);
+
+ $tester = new CommandTester($command);
+ $tester->execute([]);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php
new file mode 100644
index 0000000000000..a5a4937fd0351
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenRestartSignalIsReceivedTest.php
@@ -0,0 +1,71 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Worker;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Cache\CacheItemInterface;
+use Psr\Cache\CacheItemPoolInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
+use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
+
+/**
+ * @group time-sensitive
+ */
+class StopWhenRestartSignalIsReceivedTest extends TestCase
+{
+ /**
+ * @dataProvider restartTimeProvider
+ */
+ public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
+ {
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ ]);
+
+ $cachePool = $this->createMock(CacheItemPoolInterface::class);
+ $cacheItem = $this->createMock(CacheItemInterface::class);
+ $cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
+ $cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
+ $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
+
+ $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
+ $stopOnSignalWorker->run();
+
+ $this->assertSame($shouldStop, $decoratedWorker->isStopped());
+ }
+
+ public function restartTimeProvider()
+ {
+ yield [null, false]; // no cached restart time, do not restart
+ yield [+10, true]; // 10 seconds after starting, a restart was requested
+ yield [-10, false]; // a restart was requested, but 10 seconds before we started
+ }
+
+ public function testWorkerDoesNotStopIfRestartNotInCache()
+ {
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ ]);
+
+ $cachePool = $this->createMock(CacheItemPoolInterface::class);
+ $cacheItem = $this->createMock(CacheItemInterface::class);
+ $cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
+ $cacheItem->expects($this->never())->method('get');
+ $cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
+
+ $stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
+ $stopOnSignalWorker->run();
+
+ $this->assertFalse($decoratedWorker->isStopped());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php b/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php
new file mode 100644
index 0000000000000..63f6ea04d67bb
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Worker/StopWhenRestartSignalIsReceived.php
@@ -0,0 +1,72 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Worker;
+
+use Psr\Cache\CacheItemPoolInterface;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\WorkerInterface;
+
+/**
+ * @author Ryan Weaver
+ *
+ * @experimental in 4.3
+ */
+class StopWhenRestartSignalIsReceived implements WorkerInterface
+{
+ public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers.restart_requested_timestamp';
+
+ private $decoratedWorker;
+ private $cachePool;
+ private $logger;
+
+ public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
+ {
+ $this->decoratedWorker = $decoratedWorker;
+ $this->cachePool = $cachePool;
+ $this->logger = $logger;
+ }
+
+ public function run(array $options = [], callable $onHandledCallback = null): void
+ {
+ $workerStartedTimestamp = time();
+
+ $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
+ if (null !== $onHandledCallback) {
+ $onHandledCallback($envelope);
+ }
+
+ if ($this->shouldRestart($workerStartedTimestamp)) {
+ $this->stop();
+ if (null !== $this->logger) {
+ $this->logger->info('Worker stopped because a restart was requested.');
+ }
+ }
+ });
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedWorker->stop();
+ }
+
+ private function shouldRestart(int $workerStartedAt)
+ {
+ $cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
+ if (!$cacheItem->isHit()) {
+ // no restart has ever been scheduled
+ return false;
+ }
+
+ return $workerStartedAt < $cacheItem->get();
+ }
+}