-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] expire delay queue and fix auto_setup logic #32631
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar | |
{ | ||
$this->connectionOptions = array_replace_recursive([ | ||
'delay' => [ | ||
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', | ||
'exchange_name' => 'delay', | ||
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%', | ||
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%', | ||
], | ||
], $connectionOptions); | ||
$this->exchangeOptions = $exchangeOptions; | ||
|
@@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar | |
* * flags: Exchange flags (Default: AMQP_DURABLE) | ||
* * arguments: Extra arguments | ||
* * delay: | ||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%") | ||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%") | ||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay") | ||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%") | ||
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay") | ||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) | ||
* * prefetch_count: set channel prefetch count | ||
*/ | ||
|
@@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array | |
} | ||
|
||
/** | ||
* @param int $delay The delay in milliseconds | ||
* | ||
* @throws \AMQPException | ||
*/ | ||
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void | ||
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void | ||
{ | ||
if (0 !== $delay) { | ||
$this->publishWithDelay($body, $headers, $delay, $amqpStamp); | ||
$this->clearWhenDisconnected(); | ||
|
||
if (0 !== $delayInMs) { | ||
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp); | ||
|
||
return; | ||
} | ||
|
||
if ($this->shouldSetup()) { | ||
$this->setup(); | ||
$this->setupExchangeAndQueues(); | ||
} | ||
|
||
$this->publishOnExchange( | ||
|
@@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp | |
{ | ||
$routingKey = $this->getRoutingKeyForMessage($amqpStamp); | ||
|
||
if ($this->shouldSetup()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delay must ignore the auto_setup option as explained below |
||
$this->setupDelay($delay, $routingKey); | ||
} | ||
$this->setupDelay($delay, $routingKey); | ||
|
||
$this->publishOnExchange( | ||
$this->getDelayExchange(), | ||
|
@@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string | |
|
||
private function setupDelay(int $delay, ?string $routingKey) | ||
{ | ||
if (!$this->channel()->isConnected()) { | ||
$this->clear(); | ||
if ($this->shouldSetup()) { | ||
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to | ||
} | ||
|
||
$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to | ||
$this->getDelayExchange()->declareExchange(); | ||
|
||
$queue = $this->createDelayQueue($delay, $routingKey); | ||
$queue->declareQueue(); | ||
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance | ||
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey)); | ||
} | ||
|
||
|
@@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey) | |
)); | ||
$queue->setArguments([ | ||
'x-message-ttl' => $delay, | ||
// delete the delay queue 10 seconds after the message expires | ||
// publishing another message redeclares the queue which renews the lease | ||
'x-expires' => $delay + 10000, | ||
'x-dead-letter-exchange' => $this->exchangeOptions['name'], | ||
// after being released from to DLX, make sure the original routing key will be used | ||
// we must use an empty string instead of null for the argument to be picked up | ||
|
@@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st | |
return str_replace( | ||
['%delay%', '%exchange_name%', '%routing_key%'], | ||
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''], | ||
$this->connectionOptions['delay']['routing_key_pattern'] | ||
$this->connectionOptions['delay']['queue_name_pattern'] | ||
); | ||
} | ||
|
||
|
@@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st | |
*/ | ||
public function get(string $queueName): ?\AMQPEnvelope | ||
{ | ||
$this->clearWhenDisconnected(); | ||
|
||
if ($this->shouldSetup()) { | ||
$this->setup(); | ||
$this->setupExchangeAndQueues(); | ||
} | ||
|
||
try { | ||
|
@@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope | |
} catch (\AMQPQueueException $e) { | ||
if (404 === $e->getCode() && $this->shouldSetup()) { | ||
// If we get a 404 for the queue, it means we need to setup the exchange & queue. | ||
$this->setup(); | ||
$this->setupExchangeAndQueues(); | ||
|
||
return $this->get(); | ||
} | ||
|
@@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ | |
|
||
public function setup(): void | ||
{ | ||
if (!$this->channel()->isConnected()) { | ||
$this->clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should also happen when auto_setup is not enabled to reconnect when necessary. so I moved it outside setup() |
||
} | ||
$this->setupExchangeAndQueues(); | ||
$this->getDelayExchange()->declareExchange(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. setup is called from messenger:setup-transports which should also create the delay exchange. this way, auto_setup can actually be disabled successfully even when using delays/retries |
||
} | ||
|
||
private function setupExchangeAndQueues(): void | ||
{ | ||
$this->exchange()->declareExchange(); | ||
|
||
foreach ($this->queuesOptions as $queueName => $queueConfig) { | ||
|
@@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange | |
return $this->amqpExchange; | ||
} | ||
|
||
private function clear(): void | ||
private function clearWhenDisconnected(): void | ||
{ | ||
$this->amqpChannel = null; | ||
$this->amqpQueues = []; | ||
$this->amqpExchange = null; | ||
$this->amqpDelayExchange = null; | ||
if (!$this->channel()->isConnected()) { | ||
$this->amqpChannel = null; | ||
$this->amqpQueues = []; | ||
$this->amqpExchange = null; | ||
$this->amqpDelayExchange = null; | ||
} | ||
} | ||
|
||
private function shouldSetup(): bool | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to change the name because of the new x-expire argument which would otherwise cause a precondition failed error on redeclaring. but the name is an implementation detail and should not matter.