-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Improvement of message consumption for messenger with AMQP #30454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,8 @@ | |
namespace Symfony\Component\Messenger\Transport\AmqpExt; | ||
|
||
use Symfony\Component\Messenger\Exception\TransportException; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RecoverableMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\UnrecoverableMessageExceptionInterface; | ||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; | ||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | ||
|
@@ -61,24 +62,34 @@ public function receive(callable $handler): void | |
])); | ||
|
||
$this->connection->ack($AMQPEnvelope); | ||
} catch (RejectMessageExceptionInterface $e) { | ||
} catch (RecoverableMessageExceptionInterface $e) { | ||
try { | ||
$this->connection->reject($AMQPEnvelope); | ||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
} catch (UnrecoverableMessageExceptionInterface $e) { | ||
try { | ||
$this->connection->nack($AMQPEnvelope, AMQP_NOPARAM); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
|
||
throw $e; | ||
} catch (\AMQPException $e) { | ||
throw new TransportException($e->getMessage(), 0, $e); | ||
} catch (\Throwable $e) { | ||
$connectionCredentials = $this->connection->getConnectionCredentials() + [ | ||
'consume_fatal' => true, | ||
'consume_requeue' => false, | ||
]; | ||
$flag = $connectionCredentials['consume_requeue'] ? AMQP_REQUEUE : AMQP_NOPARAM; | ||
try { | ||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); | ||
$this->connection->nack($AMQPEnvelope, $flag); | ||
} catch (\AMQPException $exception) { | ||
throw new TransportException($exception->getMessage(), 0, $exception); | ||
} | ||
|
||
throw $e; | ||
if ($connectionCredentials['consume_fatal']) { | ||
throw $e; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm very interested in this part, and these two new options. I'd like to make sure we're thinking about a solution that could be generically applied to other transports and support a "retry" ability. Here's the behavior I'd like to see, which is built off of your ideas:
Morever, I think one of your goals was to standardize a few things (even though we don't have multiple transports yet), which I love! Which is why I'm wondering if we should move some of this logic to the WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, could be a goog idea to move this into the worker ! +1 :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For your point 4, I wanted to keep the compatibility. As you wish :) |
||
} finally { | ||
if (\function_exists('pcntl_signal_dispatch')) { | ||
pcntl_signal_dispatch(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,7 +185,7 @@ public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool | |
|
||
public function setup(): void | ||
{ | ||
if (!$this->channel()->isConnected()) { | ||
if (null === $this->amqpChannel || false === $this->amqpChannel->isConnected()) { | ||
$this->clear(); | ||
} | ||
|
||
|
@@ -206,6 +206,9 @@ public function channel(): \AMQPChannel | |
} | ||
|
||
$this->amqpChannel = $this->amqpFactory->createChannel($connection); | ||
if (isset($this->connectionCredentials['prefetch_count'])) { | ||
$this->amqpChannel->setPrefetchCount($this->connectionCredentials['prefetch_count']); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an extra feature unrelated to the main purpose of this PR, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's related to AMQP rejection process. By default, RabbitMQ uses a prefetch of 3. If you requeue a message, it comes behind the prefetched messages but not in the queue itself. If consumption speed is not important, you can choose a prefetch of 1, then the messages go back to the queue. |
||
} | ||
|
||
return $this->amqpChannel; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; | ||
|
||
/** | ||
* If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown | ||
* by the bus while dispatching the message implements this interface, the message will be nack and re-queued. | ||
* | ||
* Bus continue handling messages. | ||
* | ||
* @author Frederic Bouchery <frederic@bouchery.fr> | ||
* | ||
* @experimental in 4.3 | ||
*/ | ||
interface RecoverableMessageExceptionInterface extends \Throwable | ||
{ | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; | ||
|
||
/** | ||
* If something goes wrong while consuming and handling a message from the AMQP broker, if the exception that is thrown | ||
* by the bus while dispatching the message implements this interface, the message will be nack and not re-queued. | ||
* | ||
* Bus continue handling messages. | ||
* | ||
* @author Frederic Bouchery <frederic@bouchery.fr> | ||
* | ||
* @experimental in 4.3 | ||
*/ | ||
interface UnrecoverableMessageExceptionInterface extends \Throwable | ||
{ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why wouldn't we
reject
here? 🤔