From 3ac93dbe30137f10cd18fa7e671984ddea979409 Mon Sep 17 00:00:00 2001 From: fbouchery Date: Tue, 5 Mar 2019 18:11:45 +0100 Subject: [PATCH 1/3] * Added AMQP options `consume_fatal` and `consume_requeue` to allow consumer to continue processing other messages without stopping * Added `UnrecoverableMessageExceptionInterface` and `RecoverableMessageExceptionInterface` into AMQP transport into exception to allow handler to nack messages with or without requeue * Standardized AMQP rejection with `nack` (`reject` is a `nack` without multiple rejections abilities which is not used in Messenger context). --- src/Symfony/Component/Messenger/CHANGELOG.md | 9 ++ .../Transport/AmqpExt/AmqpReceiverTest.php | 129 +++++++++++++++--- .../Transport/AmqpExt/ConnectionTest.php | 16 +++ .../Transport/AmqpExt/AmqpReceiver.php | 27 ++-- .../Transport/AmqpExt/Connection.php | 5 +- .../RecoverableMessageExceptionInterface.php | 27 ++++ .../RejectMessageExceptionInterface.php | 4 + ...UnrecoverableMessageExceptionInterface.php | 27 ++++ 8 files changed, 215 insertions(+), 29 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php create mode 100644 src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index b7f604f41c00c..e48f8471bf053 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -15,6 +15,15 @@ CHANGELOG * [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is no longer thrown in favor of `TransportException`. + + * Added `prefetch_count` AMQP option which set the channel prefetch count + + * Added `consume_fatal` (default `true`) and `consume_requeue` (default `false`) AMQP options which allow consumer to + continue processing messages or nack it with requeue. + + * Added `UnrecoverableMessageExceptionInterface` and `RecoverableMessageExceptionInterface` into AMQP transport + exception for nack with or without requeue, and then continue to consume the other messages. + 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 8e224e0653df7..7f87f8bd48f1c 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -16,7 +16,8 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; -use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -69,7 +70,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($envelope); - $connection->expects($this->once())->method('nack')->with($envelope); + $connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM); $receiver = new AmqpReceiver($connection, $serializer); $receiver->receive(function () { @@ -78,9 +79,9 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() } /** - * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\WillNeverWorkException + * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ - public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface() + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -94,18 +95,19 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($envelope); - $connection->expects($this->once())->method('reject')->with($envelope); + + $connection->method('ack')->with($envelope)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function () { - throw new WillNeverWorkException('Well...'); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $receiver->stop(); }); } /** * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ - public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() + public function testItThrowsATransportExceptionIfItCannotRejectMessage() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -119,19 +121,18 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($envelope); - - $connection->method('ack')->with($envelope)->willThrowException(new \AMQPException()); + $connection->method('nack')->with($envelope, AMQP_NOPARAM)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $receiver->stop(); + $receiver->receive(function () { + throw new InterruptException('Well...'); }); } /** * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ - public function testItThrowsATransportExceptionIfItCannotRejectMessage() + public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -145,18 +146,102 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage() $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($envelope); - $connection->method('reject')->with($envelope)->willThrowException(new \AMQPException()); + + $connection->method('nack')->with($envelope)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); $receiver->receive(function () { - throw new WillNeverWorkException('Well...'); + throw new InterruptException('Well...'); + }); + } + + public function testItNackAndRequeueTheMessageIfTheExceptionIsARecoverableMessageExceptionInterface() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->expects($this->exactly(3))->method('nack')->with($envelope, AMQP_REQUEUE); + + $receiver = new AmqpReceiver($connection, $serializer); + $count = 1; + $receiver->receive(function () use (&$count, $receiver) { + if ($count++ >= 3) { + $receiver->stop(); + } + throw new RecoverableMessageException('Temporary...'); + }); + } + + public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAnUnrecoverableMessageExceptionInterface() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM); + $connection->expects($this->once())->method('ack')->with($envelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $count = 0; + $receiver->receive(function () use (&$count, $receiver) { + $count++; + if ($count === 1) { + throw new UnrecoverableMessageException('Temporary...'); + } + $receiver->stop(); + }); + } + + public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAThrowableExceptionAndContinue() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('getConnectionCredentials')->willReturn(['consume_fatal' => false]); + $connection->method('get')->willReturn($envelope); + $connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM); + $connection->expects($this->once())->method('ack')->with($envelope); + + $receiver = new AmqpReceiver($connection, $serializer); + $count = 0; + $receiver->receive(function () use (&$count, $receiver) { + $count++; + if ($count === 1) { + throw new InterruptException('Temporary...'); + } + $receiver->stop(); }); } /** - * @expectedException \Symfony\Component\Messenger\Exception\TransportException + * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException */ - public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() + public function testItNackAndRequeueTheMessageIfTheExceptionIsAThrowableExceptionAndGenerateFatal() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -169,9 +254,9 @@ public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() ]); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('getConnectionCredentials')->willReturn(['consume_requeue' => true]); $connection->method('get')->willReturn($envelope); - - $connection->method('nack')->with($envelope)->willThrowException(new \AMQPException()); + $connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE); $receiver = new AmqpReceiver($connection, $serializer); $receiver->receive(function () { @@ -184,6 +269,10 @@ class InterruptException extends \Exception { } -class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface +class RecoverableMessageException extends \Exception implements RecoverableMessageExceptionInterface +{ +} + +class UnrecoverableMessageException extends \Exception implements UnrecoverableMessageExceptionInterface { } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index b8809368e5b3d..421f6294d9456 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -251,6 +251,22 @@ public function testPublishWithQueueOptions() $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory); $connection->publish('body', $headers); } + + public function testSetChannelPrefetchWhenSetup() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->createMock(\AMQPConnection::class), + $amqpChannel = $this->createMock(\AMQPChannel::class), + $amqpQueue = $this->createMock(\AMQPQueue::class), + $amqpExchange = $this->createMock(\AMQPExchange::class) + ); + + $amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], true, $factory); + $connection->setup(); + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], true, $factory); + $connection->setup(); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index cb7a4db013fa9..8ca4a7dc5dfcb 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -12,7 +12,8 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Exception\TransportException; -use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -61,24 +62,34 @@ public function receive(callable $handler): void ])); $this->connection->ack($AMQPEnvelope); - } catch (RejectMessageExceptionInterface $e) { + } catch (RecoverableMessageExceptionInterface $e) { try { - $this->connection->reject($AMQPEnvelope); + $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } catch (UnrecoverableMessageExceptionInterface $e) { + try { + $this->connection->nack($AMQPEnvelope, AMQP_NOPARAM); } catch (\AMQPException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } - - throw $e; } catch (\AMQPException $e) { throw new TransportException($e->getMessage(), 0, $e); } catch (\Throwable $e) { + $connectionCredentials = $this->connection->getConnectionCredentials() + [ + 'consume_fatal' => true, + 'consume_requeue' => false + ]; + $flag = $connectionCredentials['consume_requeue'] ? AMQP_REQUEUE : AMQP_NOPARAM; try { - $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + $this->connection->nack($AMQPEnvelope, $flag); } catch (\AMQPException $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } - - throw $e; + if ($connectionCredentials['consume_fatal']) { + throw $e; + } } finally { if (\function_exists('pcntl_signal_dispatch')) { pcntl_signal_dispatch(); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index e576916b8d041..a0d771e6198dd 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -185,7 +185,7 @@ public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool public function setup(): void { - if (!$this->channel()->isConnected()) { + if (null === $this->amqpChannel || false === $this->amqpChannel->isConnected()) { $this->clear(); } @@ -206,6 +206,9 @@ public function channel(): \AMQPChannel } $this->amqpChannel = $this->amqpFactory->createChannel($connection); + if (isset($this->connectionCredentials['prefetch_count'])) { + $this->amqpChannel->setPrefetchCount($this->connectionCredentials['prefetch_count']); + } } return $this->amqpChannel; diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php new file mode 100644 index 0000000000000..8a62a0a854d3d --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; + +/** + * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting + * or re-queuing the message. + * + * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will + * be rejected. Otherwise, it will be re-queued. + * + * @author Samuel Roze + * + * @experimental in 4.2 + */ +interface RecoverableMessageExceptionInterface extends \Throwable +{ +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php index 9b820a7d8fbf8..e3099360d9546 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php @@ -20,6 +20,10 @@ * * @author Samuel Roze * + * @deprecated use RecoverableMessageExceptionInterface or UnrecoverableMessageExceptionInterface instead. Now, it is + * handle as a `\Throwable`: `nack` instead of `reject` + * + * * @experimental in 4.2 */ interface RejectMessageExceptionInterface extends \Throwable diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php new file mode 100644 index 0000000000000..dd07341e7fcf1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; + +/** + * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting + * or re-queuing the message. + * + * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will + * be rejected. Otherwise, it will be re-queued. + * + * @author Samuel Roze + * + * @experimental in 4.2 + */ +interface UnrecoverableMessageExceptionInterface extends \Throwable +{ +} From 72bd4a568849a353f1102a05960d4f1f866e6326 Mon Sep 17 00:00:00 2001 From: fbouchery Date: Tue, 5 Mar 2019 21:40:11 +0100 Subject: [PATCH 2/3] Apply CS --- .../Tests/Transport/AmqpExt/AmqpReceiverTest.php | 8 ++++---- .../Messenger/Transport/AmqpExt/AmqpReceiver.php | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 7f87f8bd48f1c..e6e84dfbbd3e2 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -201,8 +201,8 @@ public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAnUnrecoverabl $receiver = new AmqpReceiver($connection, $serializer); $count = 0; $receiver->receive(function () use (&$count, $receiver) { - $count++; - if ($count === 1) { + ++$count; + if (1 === $count) { throw new UnrecoverableMessageException('Temporary...'); } $receiver->stop(); @@ -230,8 +230,8 @@ public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAThrowableExce $receiver = new AmqpReceiver($connection, $serializer); $count = 0; $receiver->receive(function () use (&$count, $receiver) { - $count++; - if ($count === 1) { + ++$count; + if (1 === $count) { throw new InterruptException('Temporary...'); } $receiver->stop(); diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 8ca4a7dc5dfcb..68eff9d97534a 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -79,7 +79,7 @@ public function receive(callable $handler): void } catch (\Throwable $e) { $connectionCredentials = $this->connection->getConnectionCredentials() + [ 'consume_fatal' => true, - 'consume_requeue' => false + 'consume_requeue' => false, ]; $flag = $connectionCredentials['consume_requeue'] ? AMQP_REQUEUE : AMQP_NOPARAM; try { From 0337546f12257f5cd2ebb217351cd6a99f87bbec Mon Sep 17 00:00:00 2001 From: fbouchery Date: Thu, 7 Mar 2019 09:50:44 +0100 Subject: [PATCH 3/3] Update comment for AMQP recoverable exception --- .../RecoverableMessageExceptionInterface.php | 11 +++++------ .../UnrecoverableMessageExceptionInterface.php | 11 +++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php index 8a62a0a854d3d..a3ed4c2410001 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RecoverableMessageExceptionInterface.php @@ -12,15 +12,14 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; /** - * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting - * or re-queuing the message. + * If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown + * by the bus while dispatching the message implements this interface, the message will be nack and re-queued. * - * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will - * be rejected. Otherwise, it will be re-queued. + * Bus continue handling messages. * - * @author Samuel Roze + * @author Frederic Bouchery * - * @experimental in 4.2 + * @experimental in 4.3 */ interface RecoverableMessageExceptionInterface extends \Throwable { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php index dd07341e7fcf1..9d1695c27acd6 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/UnrecoverableMessageExceptionInterface.php @@ -12,15 +12,14 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; /** - * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting - * or re-queuing the message. + * If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown + * by the bus while dispatching the message implements this interface, the message will be nack and not re-queued. * - * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will - * be rejected. Otherwise, it will be re-queued. + * Bus continue handling messages. * - * @author Samuel Roze + * @author Frederic Bouchery * - * @experimental in 4.2 + * @experimental in 4.3 */ interface UnrecoverableMessageExceptionInterface extends \Throwable {