8000 [Messenger] expire delay queue and fix auto_setup logic · symfony/symfony@75f1f0e · GitHub
[go: up one dir, main page]

Skip to content

Commit 75f1f0e

Browse files
committed
[Messenger] expire delay queue and fix auto_setup logic
1 parent 950306a commit 75f1f0e

File tree

3 files changed

+47
-41
lines changed

3 files changed

+47
-41
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public function testSetChannelPrefetchWhenSetup()
308308
);
309309

310310
// makes sure the channel looks connected, so it's not re-created
311-
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
311+
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);
312312

313313
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
314314
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
@@ -321,26 +321,31 @@ public function testItDelaysTheMessage()
321321
{
322322
$amqpConnection = $this->createMock(\AMQPConnection::class);
323323
$amqpChannel = $this->createMock(\AMQPChannel::class);
324-
$delayQueue = $this->createMock(\AMQPQueue::class);
325324

326325
$factory = $this->createMock(AmqpFactory::class);
327326
$factory->method('createConnection')->willReturn($amqpConnection);
328327
$factory->method('createChannel')->willReturn($amqpChannel);
329-
$factory->method('createQueue')->willReturn($delayQueue);
328+
$factory->method('createQueue')->will($this->onConsecutiveCalls(
329+
$amqpQueue = $this->createMock(\AMQPQueue::class),
330+
$delayQueue = $this->createMock(\AMQPQueue::class)
331+
));
330332
$factory->method('createExchange')->will($this->onConsecutiveCalls(
331333
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
332334
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
333335
));
334336

335337
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
336338
$amqpExchange->expects($this->once())->method('declareExchange');
339+
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
340+
$amqpQueue->expects($this->once())->method('declareQueue');
337341

338342
$delayExchange->expects($this->once())->method('setName')->with('delay');
339343
$delayExchange->expects($this->once())->method('declareExchange');
340344

341-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
345+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
342346
$delayQueue->expects($this->once())->method('setArguments')->with([
343347
'x-message-ttl' => 5000,
348+
'x-expires' => 5000 + 10000,
344349
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
345350
'x-dead-letter-routing-key' => '',
346351
]);
@@ -383,9 +388,10 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
383388

384389
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
385390

386-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
391+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
387392
$delayQueue->expects($this->once())->method('setArguments')->with([
388393
'x-message-ttl' => 120000,
394+
'x-expires' => 120000 + 10000,
389395
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
390396
'x-dead-letter-routing-key' => '',
391397
]);
@@ -492,9 +498,10 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
492498

493499
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
494500

495-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
501+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
496502
$delayQueue->expects($this->once())->method('setArguments')->with([
497503
'x-message-ttl' => 120000,
504+
'x-expires' => 120000 + 10000,
498505
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
499506
'x-dead-letter-routing-key' => 'routing_key',
500507
]);

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ public function send(Envelope $envelope): Envelope
4545

4646
/** @var DelayStamp|null $delayStamp */
4747
$delayStamp = $envelope->last(DelayStamp::class);
48-
$delay = 0;
49-
if (null !== $delayStamp) {
50-
$delay = $delayStamp->getDelay();
51-
}
48+
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
5249

5350
$amqpStamp = $envelope->last(AmqpStamp::class);
5451
if (isset($encodedMessage['headers']['Content-Type'])) {

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
6262
{
6363
$this->connectionOptions = array_replace_recursive([
6464
'delay' => [
65-
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
6665
'exchange_name' => 'delay',
67-
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
66+
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
6867
],
6968
], $connectionOptions);
7069
$this->exchangeOptions = $exchangeOptions;
@@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
9392
* * flags: Exchange flags (Default: AMQP_DURABLE)
9493
* * arguments: Extra arguments
9594
* * delay:
96-
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
97-
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
98-
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
95+
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
96+
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
9997
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
10098
* * prefetch_count: set channel prefetch count
10199
*/
@@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
171169
}
172170

173171
/**
174-
* @param int $delay The delay in milliseconds
175-
*
176172
* @throws \AMQPException
177173
*/
178-
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
174+
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
179175
{
180-
if (0 !== $delay) {
181-
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
176+
$this->clearWhenDisconnected();
177+
178+
if (0 !== $delayInMs) {
179+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
182180

183181
return;
184182
}
185183

186184
if ($this->shouldSetup()) {
187-
$this->setup();
185+
$this->setupExchangeAndQueues();
188186
}
189187

190188
$this->publishOnExchange(
@@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
213211
{
214212
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
215213

216-
if ($this->shouldSetup()) {
217-
$this->setupDelay($delay, $routingKey);
218-
}
214+
$this->setupDelay($delay, $routingKey);
219215

220216
$this->publishOnExchange(
221217
$this->getDelayExchange(),
@@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
241237

242238
private function setupDelay(int $delay, ?string $routingKey)
243239
{
244-
if (!$this->channel()->isConnected()) {
245-
$this->clear();
240+
if ($this->shouldSetup()) {
241+
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
246242
}
247243

248-
$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
249-
$this->getDelayExchange()->declareExchange();
250-
251244
$queue = $this->createDelayQueue($delay, $routingKey);
252-
$queue->declareQueue();
245+
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
253246
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
254247
}
255248

@@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
283276
));
284277
$queue->setArguments([
285278
'x-message-ttl' => $delay,
279+
// delete the delay queue 10 seconds after the message expires
280+
// publishing another message redeclares the queue which renews the lease
281+
'x-expires' => $delay + 10000,
286282
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
287283
// after being released from to DLX, make sure the original routing key will be used
288284
// we must use an empty string instead of null for the argument to be picked up
@@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
297293
return str_replace(
298294
['%delay%', '%exchange_name%', '%routing_key%'],
299295
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
300-
$this->connectionOptions['delay']['routing_key_pattern']
296+
$this->connectionOptions['delay']['queue_name_pattern']
301297
);
302298
}
303299

@@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
308304
*/
309305
public function get(string $queueName): ?\AMQPEnvelope
310306
{
307+
$this->clearWhenDisconnected();
308+
311309
if ($this->shouldSetup()) {
312-
$this->setup();
310+
$this->setupExchangeAndQueues();
313311
}
314312

315313
try {
@@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
319317
} catch (\AMQPQueueException $e) {
320318
if (404 === $e->getCode() && $this->shouldSetup()) {
321319
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
322-
$this->setup();
320+
$this->setupExchangeAndQueues();
323321

324322
return $this->get();
325323
}
@@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ
342340

343341
public function setup(): void
344342
{
345-
if (!$this->channel()->isConnected()) {
346-
$this->clear();
347-
}
343+
$this->setupExchangeAndQueues();
344+
$this->getDelayExchange()->declareExchange();
345+
}
348346

347+
private function setupExchangeAndQueues(): void
348+
{
349349
$this->exchange()->declareExchange();
350350

351351
foreach ($this->queuesOptions as $queueName => $queueConfig) {
@@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
424424
return $this->amqpExchange;
425425
}
426426

427-
private function clear(): void
427+
private function clearWhenDisconnected(): void
428428
{
429-
$this->amqpChannel = null;
430-
$this->amqpQueues = [];
431-
$this->amqpExchange = null;
432-
$this->amqpDelayExchange = null;
429+
if (!$this->channel()->isConnected()) {
430+
$this->amqpChannel = null;
431+
$this->amqpQueues = [];
432+
$this->amqpExchange = null;
433+
$this->amqpDelayExchange = null;
434+
}
433435
}
434436

435437
private function shouldSetup(): bool

0 commit comments

Comments
 (0)
0