8000 Improvement of message consumption for messenger with AMQP by f2r · Pull Request #30454 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

Improvement of message consumption for messenger with AMQP #30454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ CHANGELOG

* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
no longer thrown in favor of `TransportException`.

* Added `prefetch_count` AMQP option which set the channel prefetch count

* Added `consume_fatal` (default `true`) and `consume_requeue` (default `false`) AMQP options which allow consumer to
continue processing messages or nack it with requeue.

* Added `UnrecoverableMessageExceptionInterface` and `RecoverableMessageExceptionInterface` into AMQP transport
exception for nack with or without requeue, and then continue to consume the other messages.


4.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
Expand Down Expand Up @@ -69,7 +70,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);

$connection->expects($this->once())->method('nack')->with($envelope);
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM);

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
Expand All @@ -78,9 +79,9 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
}

/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\WillNeverWorkException
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
Expand All @@ -94,18 +95,19 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('reject')->with($envelope);

$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
Expand All @@ -119,19 +121,18 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);

$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
$connection->method('nack')->with($envelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->stop();
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
Expand All @@ -145,18 +146,102 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());

$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
throw new InterruptException('Well...');
});
}

public function testItNackAndRequeueTheMessageIfTheExceptionIsARecoverableMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->exactly(3))->method('nack')->with($envelope, AMQP_REQUEUE);

$receiver = new AmqpReceiver($connection, $serializer);
$count = 1;
$receiver->receive(function () use (&$count, $receiver) {
if ($count++ >= 3) {
$receiver->stop();
}
throw new RecoverableMessageException('Temporary...');
});
}

public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAnUnrecoverableMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM);
$connection->expects($this->once())->method('ack')->with($envelope);

$receiver = new AmqpReceiver($connection, $serializer);
$count = 0;
$receiver->receive(function () use (&$count, $receiver) {
++$count;
if (1 === $count) {
throw new UnrecoverableMessageException('Temporary...');
}
$receiver->stop();
});
}

public function testItNackWithoutRequeueTheMessageIfTheExceptionIsAThrowableExceptionAndContinue()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('getConnectionCredentials')->willReturn(['consume_fatal' => false]);
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_NOPARAM);
$connection->expects($this->once())->method('ack')->with($envelope);

$receiver = new AmqpReceiver($connection, $serializer);
$count = 0;
$receiver->receive(function () use (&$count, $receiver) {
++$count;
if (1 === $count) {
throw new InterruptException('Temporary...');
}
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
*/
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
public function testItNackAndRequeueTheMessageIfTheExceptionIsAThrowableExceptionAndGenerateFatal()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
Expand All @@ -169,9 +254,9 @@ public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('getConnectionCredentials')->willReturn(['consume_requeue' => true]);
$connection->method('get')->willReturn($envelope);

$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE);

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
Expand All @@ -184,6 +269,10 @@ class InterruptException extends \Exception
{
}

class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
class RecoverableMessageException extends \Exception implements RecoverableMessageExceptionInterface
{
}

class UnrecoverableMessageException extends \Exception implements UnrecoverableMessageExceptionInterface
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,22 @@ public function testPublishWithQueueOptions()
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory);
$connection->publish('body', $headers);
}

public function testSetChannelPrefetchWhenSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], true, $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], true, $factory);
$connection->setup();
}
}

class TestAmqpFactory extends AmqpFactory
Expand Down
27 changes: 19 additions & 8 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand Down Expand Up @@ -61,24 +62,34 @@ public function receive(callable $handler): void
]));

$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
} catch (RecoverableMessageExceptionInterface $e) {
try {
$this->connection->reject($AMQPEnvelope);
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
} catch (UnrecoverableMessageExceptionInterface $e) {
try {
$this->connection->nack($AMQPEnvelope, AMQP_NOPARAM);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wouldn't we reject here? 🤔

} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

throw $e;
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} catch (\Throwable $e) {
$connectionCredentials = $this->connection->getConnectionCredentials() + [
'consume_fatal' => true,
'consume_requeue' => false,
];
$flag = $connectionCredentials['consume_requeue'] ? AMQP_REQUEUE : AMQP_NOPARAM;
try {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
$this->connection->nack($AMQPEnvelope, $flag);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

throw $e;
if ($connectionCredentials['consume_fatal']) {
throw $e;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very interested in this part, and these two new options. I'd like to make sure we're thinking about a solution that could be generically applied to other transports and support a "retry" ability. Here's the behavior I'd like to see, which is built off of your ideas:

  1. If RecoverableMessageExceptionInterface is thrown, we re-queue
  2. If UnrecoverableMessageExceptionInterface is thrown, we discard
  3. If there is an AMQPException at any point, probably do nothing - because the message was never ack'ed, so it should still be on the queue? But, it should not kill the worker process (but that's another PR)
  4. If any other Throwable occurs, I think we should requeue always and never throw the exception. However, a few caveats:
  • it should only requeue the message a certain number of times before discarding it (that's the retry stuff)
  • once it hits the retry max, the exception is still not thrown, but perhaps an event is dispatched to handle this.

Morever, I think one of your goals was to standardize a few things (even though we don't have multiple transports yet), which I love! Which is why I'm wondering if we should move some of this logic to the Worker class - #30557 is related. I'm proposing that this method should not handle any exceptions or requeue - it should just handle the message and try to ack it on success. We would move the exception-handling logic into Worker (so we can standardize behavior) and add a few new methods to ReceiverInterface, like requeu() or discard(). Worker would call this under the appropriate situations.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, could be a goog idea to move this into the worker ! +1 :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For your point 4, I wanted to keep the compatibility. As you wish :)

} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool

public function setup(): void
{
if (!$this->channel()->isConnected()) {
if (null === $this->amqpChannel || false === $this->amqpChannel->isConnected()) {
$this->clear();
}

Expand All @@ -206,6 +206,9 @@ public function channel(): \AMQPChannel
}

$this->amqpChannel = $this->amqpFactory->createChannel($connection);
if (isset($this->connectionCredentials['prefetch_count'])) {
$this->amqpChannel->setPrefetchCount($this->connectionCredentials['prefetch_count']);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an extra feature unrelated to the main purpose of this PR, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's related to AMQP rejection process. By default, RabbitMQ uses a prefetch of 3. If you requeue a message, it comes behind the prefetched messages but not in the queue itself. If consumption speed is not important, you can choose a prefetch of 1, then the messages go back to the queue.

}

return $this->amqpChannel;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception;

/**
* If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown
* by the bus while dispatching the message implements this interface, the message will be nack and re-queued.
*
* Bus continue handling messages.
*
* @author Frederic Bouchery <frederic@bouchery.fr>
*
* @experimental in 4.3
*/
interface RecoverableMessageExceptionInterface extends \Throwable
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
*
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @deprecated use RecoverableMessageExceptionInterface or UnrecoverableMessageExceptionInterface instead. Now, it is
* handle as a `\Throwable`: `nack` instead of `reject`
*
*
* @experimental in 4.2
*/
interface RejectMessageExceptionInterface extends \Throwable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception;

/**
* If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown
* by the bus while dispatching the message implements this interface, the message will be nack and not re-queued.
*
* Bus continue handling messages.
*
* @author Frederic Bouchery <frederic@bouchery.fr>
*
* @experimental in 4.3
*/
interface UnrecoverableMessageExceptionInterface extends \Throwable
{
}
0