8000 bug #33127 [Messenger] make delay exchange and queues durable like th… · symfony/symfony@431ead2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 431ead2

Browse files
committed
bug #33127 [Messenger] make delay exchange and queues durable like the normal ones by default (Tobion)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] make delay exchange and queues durable like the normal ones by default | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | #32891 | License | MIT | Doc PR | This also imrproves BC of #32631 by only adding the new expiry argument in case the delay queue name was not overwritten using the options. I will remove the checks in 4.4 again. Please merge this PR before releasing the new 4.3 version so that 32631 and this PR are part of one release. Commits ------- e5ecda6 [Messenger] make delay exchange and queues durable like the normal ones by default
2 parents a553173 + e5ecda6 commit 431ead2

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
337337
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
338338
$amqpQueue->expects($this->once())->method('declareQueue');
339339

340-
$delayExchange->expects($this->once())->method('setName')->with('delay');
340+
$delayExchange->expects($this->once())->method('setName')->with('delays');
341341
$delayExchange->expects($this->once())->method('declareExchange');
342342
$delayExchange->expects($this->once())->method('publish');
343343

@@ -371,7 +371,7 @@ public function testItDelaysTheMessage()
371371
]);
372372

373373
$delayQueue->expects($this->once())->method('declareQueue');
374-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');
374+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
375375

376376
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
377377

@@ -413,7 +413,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
413413
]);
414414

415415
$delayQueue->expects($this->once())->method('declareQueue');
416-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');
416+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
417417

418418
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
419419
$connection->publish('{}', [], 120000);
@@ -517,7 +517,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
517517
]);
518518

519519
$delayQueue->expects($this->once())->method('declareQueue');
520-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');
520+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
521521

522522
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
523523
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
6262
{
6363
$this->connectionOptions = array_replace_recursive([
6464
'delay' => [
65-
'exchange_name' => 'delay',
65+
'exchange_name' => 'delays',
6666
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
6767
],
6868
], $connectionOptions);
@@ -93,7 +93,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
9393
* * arguments: Extra arguments
9494
* * delay:
9595
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96-
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
96+
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
9797
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
9898
* * prefetch_count: set channel prefetch count
9999
*/
@@ -252,6 +252,11 @@ private function getDelayExchange(): \AMQPExchange
252252
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
253253
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
254254
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
255+
if ('delays' === $this->connectionOptions['delay']['exchange_name']) {
256+
// only add the new flag when the name was not provided explicitly so we're using the new default name to prevent a redeclaration error
257+
// the condition will be removed in 4.4
258+
$this->amqpDelayExchange->setFlags(AMQP_DURABLE);
259+
}
255260
}
256261

257262
return $this->amqp 8000 DelayExchange;
@@ -274,16 +279,24 @@ private function createDelayQueue(int $delay, ?string $routingKey)
274279
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
275280
$this->connectionOptions['delay']['queue_name_pattern']
276281
));
282+
if ('delay_%exchange_name%_%routing_key%_%delay%' === $this->connectionOptions['delay']['queue_name_pattern']) {
283+
// the condition will be removed in 4.4
284+
$queue->setFlags(AMQP_DURABLE);
285+
$extraArguments = [
286+
// delete the delay queue 10 seconds after the message expires
287+
// publishing another message redeclares the queue which renews the lease
288+
'x-expires' => $delay + 10000,
289+
];
290+
} else {
291+
$extraArguments = [];
292+
}
277293
$queue->setArguments([
278294
'x-message-ttl' => $delay,
279-
// delete the delay queue 10 seconds after the message expires
280-
// publishing another message redeclares the queue which renews the lease
281-
'x-expires' => $delay + 10000,
282295
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
283296
// after being released from to DLX, make sure the 5E2D original routing key will be used
284297
// we must use an empty string instead of null for the argument to be picked up
285298
'x-dead-letter-routing-key' => $routingKey ?? '',
286-
]);
299+
] + $extraArguments);
287300

288301
return $queue;
289302
}

0 commit comments

Comments
 (0)
0