-
-
Notifications
You must be signed in to change notification settings - Fork 9.7k
Improvement of message consumption for messenger with AMQP #30454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
consume_fatal
and consume_requeue
to allow c…
…onsumer to continue processing other messages without stopping * Added `UnrecoverableMessageExceptionInterface` and `RecoverableMessageExceptionInterface` into AMQP transport into exception to allow handler to nack messages with or without requeue * Standardized AMQP rejection with `nack` (`reject` is a `nack` without multiple rejections abilities which is not used in Messenger context).
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,8 @@ | |
namespace Symfony\Component\Messenger\Transport\AmqpExt; | ||
|
||
use Symfony\Component\Messenger\Exception\TransportException; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; | ||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | ||
|
@@ -61,24 +62,34 @@ public function receive(callable $handler): void | |
])); | ||
|
||
$this->connection->ack($AMQPEnvelope); | ||
} catch (RejectMessageExceptionInterface $e) { | ||
} catch (RecoverableMessageExceptionInterface $e) { | ||
try { | ||
$this->connection->reject($AMQPEnvelope); | ||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
} catch (UnrecoverableMessageExceptionInterface $e) { | ||
try { | ||
$this->connection->nack($AMQPEnvelope, AMQP_NOPARAM); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
|
||
throw $e; | ||
} catch (\AMQPException $e) { | ||
throw new TransportException($e->getMessage(), 0, $e); | ||
} catch (\Throwable $e) { | ||
$connectionCredentials = $this->connection->getConnectionCredentials() + [ | ||
'consume_fatal' => true, | ||
'consume_requeue' => false | ||
]; | ||
$flag = $connectionCredentials['consume_requeue'] ? AMQP_REQUEUE : AMQP_NOPARAM; | ||
try { | ||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); | ||
$this->connection->nack($AMQPEnvelope, $flag); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
|
||
throw $e; | ||
if ($connectionCredentials['consume_fatal']) { | ||
throw $e; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm very interested in this part, and these two new options. I'd like to make sure we're thinking about a solution that could be generically applied to other transports and support a "retry" ability. Here's the behavior I'd like to see, which is built off of your ideas:
Morever, I think one of your goals was to standardize a few things (even though we don't have multiple transports yet), which I love! Which is why I'm wondering if we should move some of this logic to the WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, could be a goog idea to move this into the worker ! +1 :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For your point 4, I wanted to keep the compatibility. As you wish :) |
||
} finally { | ||
if (\function_exists('pcntl_signal_dispatch')) { | ||
pcntl_signal_dispatch(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,7 +185,7 @@ public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool | |
|
||
public function setup(): void | ||
{ | ||
if (!$this->channel()->isConnected()) { | ||
if (null === $this->amqpChannel || false === $this->amqpChannel->isConnected()) { | ||
$this->clear(); | ||
} | ||
|
||
|
@@ -206,6 +206,9 @@ public function channel(): \AMQPChannel | |
} | ||
|
||
$this->amqpChannel = $this->amqpFactory->createChannel($connection); | ||
if (isset($this->connectionCredentials['prefetch_count'])) { | ||
$this->amqpChannel->setPrefetchCount($this->connectionCredentials['prefetch_count']); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an extra feature unrelated to the main purpose of this PR, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's related to AMQP rejection process. By default, RabbitMQ uses a prefetch of 3. If you requeue a message, it comes behind the prefetched messages but not in the queue itself. If consumption speed is not important, you can choose a prefetch of 1, then the messages go back to the queue. |
||
} | ||
|
||
return $this->amqpChannel; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; | ||
|
||
/** | ||
* If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting | ||
* or re-queuing the message. | ||
* | ||
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will | ||
* be rejected. Otherwise, it will be re-queued. | ||
* | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
* | ||
* @experimental in 4.2 | ||
*/ | ||
interface RecoverableMessageExceptionInterface extends \Throwable | ||
{ | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; | ||
|
||
/** | ||
* If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting | ||
* or re-queuing the message. | ||
* | ||
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will | ||
* be rejected. Otherwise, it will be re-queued. | ||
* | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
* | ||
* @experimental in 4.2 | ||
*/ | ||
interface UnrecoverableMessageExceptionInterface extends \Throwable | ||
{ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wouldn't we
reject
here? 🤔