diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index 14117ee8e40a4..2b2c086346dfd 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -56,6 +56,13 @@
+
+
+
+
+
+
+
diff --git a/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php b/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php
new file mode 100644
index 0000000000000..e1787fd9e39a6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php
@@ -0,0 +1,81 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Middleware;
+
+use Psr\Cache\CacheItemPoolInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
+use Symfony\Component\Messenger\Exception\HandlerFailedException;
+
+/**
+ * Stop all workers when an exceptions is thrown.
+ *
+ * @author Tobias Nyholm
+ */
+class StopWorkerOnExceptionMiddleware
+{
+ private $exceptions;
+ private $restartSignalCachePool;
+
+ /**
+ *
+ * @param array $exceptions of fully qualified class names of exceptions
+ */
+ public function __construct(array $exceptions)
+ {
+ $this->exceptions = array_values($exceptions);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function handle(Envelope $envelope, StackInterface $stack): Envelope
+ {
+ try {
+ return $stack->next()->handle($envelope, $stack);
+ } catch (HandlerFailedException $e) {
+ if (count($this->exceptions) === 0) {
+ throw $e;
+ }
+
+ if (count($this->exceptions) === 1 && $this->exceptions[0] === '*') {
+ $this->stopWorkers();
+ throw $e;
+ }
+
+ foreach ($e->getNestedExceptions() as $exception) {
+ if (in_array(get_class($exception), $this->exceptions)) {
+ $this->stopWorkers();
+ break;
+ }
+ }
+
+ throw $e;
+ }
+ }
+
+ private function stopWorkers(): void
+ {
+ $cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
+ $cacheItem->set(microtime(true));
+ $this->restartSignalCachePool->save($cacheItem);
+ }
+
+ public function setRestartSignalCachePool(CacheItemPoolInterface $restartSignalCachePool): void
+ {
+ if ($this->restartSignalCachePool !== null) {
+ throw new \RuntimeException('Cannot update restartSignalCachePool dependency');
+ }
+
+ $this->restartSignalCachePool = $restartSignalCachePool;
+ }
+}