8000 bug #32052 [Messenger] fix AMQP delay queue to be per exchange (Tobion) · symfony/symfony@12b852f · GitHub
[go: up one dir, main page]

Skip to content {"props":{"docsUrl":"https://docs.github.com/get-started/accessibility/keyboard-shortcuts"}}

Commit 12b852f

Browse files
committed
bug #32052 [Messenger] fix AMQP delay queue to be per exchange (Tobion)
This PR was squashed before being merged into the 4.3 branch (closes #32052). Discussion ---------- [Messenger] fix AMQP delay queue to be per exchange | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | #32050 | License | MIT | Doc PR | this makes the delay/retry work when having several exchanges or renaming your exchange. also the delay setup did not declare the target exchange. so if you only do delayed messages for a connection, auto-setup forgot to actually create the target exchange. Commits ------- 5bc3364 [Messenger] fix AMQP delay queue to be per exchange
2 parents 99c44a3 + 5bc3364 commit 12b852f

File tree

2 files changed

+70
-75
lines changed

2 files changed

+70
-75
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 53 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
*/
2424
class ConnectionTest extends TestCase
2525
{
26+
private const DEFAULT_EXCHANGE_NAME = 'messages';
27+
2628
/**
2729
* @expectedException \InvalidArgumentException
2830
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
@@ -40,9 +42,9 @@ public function testItCanBeConstructedWithDefaults()
4042
'port' => 5672,
4143
'vhost' => '/',
4244
], [
43-
'name' => 'messages',
45+
'name' => self::DEFAULT_EXCHANGE_NAME,
4446
], [
45-
'messages' => [],
47+
self::DEFAULT_EXCHANGE_NAME => [],
4648
]),
4749
Connection::fromDsn('amqp://')
4850
);
@@ -196,7 +198,7 @@ public function testItUsesANormalConnectionByDefault()
196198
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
197199
$amqpConnection->expects($this->once())->method('connect');
198200

199-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
201+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
200202
$connection->publish('body');
201203
}
202204

@@ -213,7 +215,7 @@ public function testItAllowsToUseAPersistentConnection()
213215
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
214216
$amqpConnection->expects($this->once())->method('pconnect');
215217

216-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
218+
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory);
217219
$connection->publish('body');
218220
}
219221

@@ -226,13 +228,12 @@ public function testItSetupsTheConnectionWithDefaults()
226228
$amqpExchange = $this->createMock(\AMQPExchange::class)
227229
);
228230

229-
$amqpExchange->method('getName')->willReturn('exchange_name');
230231
$amqpExchange->expects($this->once())->method('declareExchange');
231232
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
232233
$amqpQueue->expects($this->once())->method('declareQueue');
233-
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
234+
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
234235

235-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
236+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
236237
$connection->publish('body');
237238
}
238239

@@ -250,21 +251,20 @@ public function testItSetupsTheConnection()
250251
$factory->method('createExchange')->willReturn($amqpExchange);
251252
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
252253

253-
$amqpExchange->method('getName')->willReturn('exchange_name');
254254
$amqpExchange->expects($this->once())->method('declareExchange');
255255
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
256256
$amqpQueue0->expects($this->once())->method('declareQueue');
257257
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
258-
['exchange_name', 'binding_key0'],
259-
['exchange_name', 'binding_key1']
258+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
259+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
260260
);
261261
$amqpQueue1->expects($this->once())->method('declareQueue');
262262
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
263-
['exchange_name', 'binding_key2'],
264-
['exchange_name', 'binding_key3']
263+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
264+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
265265
);
266266

267-
$dsn = 'amqp://localhost/%2f/messages?'.
267+
$dsn = 'amqp://localhost?'.
268268
'exchange[default_publish_routing_key]=routing_key&'.
269269
'queues[queue0][binding_keys][0]=binding_key0&'.
270270
'queues[queue0][binding_keys][1]=binding_key1&'.
@@ -284,18 +284,17 @@ public function testItCanDisableTheSetup()
284284
$amqpExchange = $this->createMock(\AMQPExchange::class)
285285
);
286286

287-
$amqpExchange->method('getName')->willReturn('exchange_name');
288287
$amqpExchange->expects($this->never())->method('declareExchange');
289288
$amqpQueue->expects($this->never())->method('declareQueue');
290289
$amqpQueue->expects($this->never())->method('bind');
291290

292-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
291+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory);
293292
$connection->publish('body');
294293

295-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
294+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory);
296295
$connection->publish('body');
297296

298-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
297+
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory);
299298
$connection->publish('body');
300299
}
301300

@@ -312,9 +311,9 @@ public function testSetChannelPrefetchWhenSetup()
312311
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
313312

314313
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
315-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
314+
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
316315
$connection->setup();
317-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
316+
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory);
318317
$connection->setup();
319318
}
320319

@@ -329,29 +328,29 @@ public function testItDelaysTheMessage()
329328
$factory->method('createChannel')->willReturn($amqpChannel);
330329
$factory->method('createQueue')->willReturn($delayQueue);
331330
$factory->method('createExchange')->will($this->onConsecutiveCalls(
332-
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
333-
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
331+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
332+
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
334333
));
335334

336-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
337-
$amqpExchange->method('getName')->willReturn('messages');
335+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
336+
$amqpExchange->expects($this->once())->method('declareExchange');
338337

339338
$delayExchange->expects($this->once())->method('setName')->with('delay');
340339
$delayExchange->expects($this->once())->method('declareExchange');
341-
$delayExchange->method('getName')->willReturn('delay');
342340

343-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000');
341+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
344342
$delayQueue->expects($this->once())->method('setArguments')->with([
345343
'x-message-ttl' => 5000,
346-
'x-dead-letter-exchange' => 'messages',
344+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
345+
'x-dead-letter-routing-key' => '',
347346
]);
348347

349348
$delayQueue->expects($this->once())->method('declareQueue');
350-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000');
349+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');
351350

352-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
351+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
353352

354-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
353+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
355354
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
356355
}
357356

@@ -366,41 +365,41 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
366365
$factory->method('createChannel')->willReturn($amqpChannel);
367366
$factory->method('createQueue')->willReturn($delayQueue);
368367
$factory->method('createExchange')->will($this->onConsecutiveCalls(
369-
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
370-
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
368+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
369+
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
371370
));
372371

373-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
374-
$amqpExchange->method('getName')->willReturn('messages');
372+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
373+
$amqpExchange->expects($this->once())->method('declareExchange');
375374

376375
$delayExchange->expects($this->once())->method('setName')->with('delay');
377376
$delayExchange->expects($this->once())->method('declareExchange');
378-
$delayExchange->method('getName')->willReturn('delay');
379377

380378
$connectionOptions = [
381379
'retry' => [
382380
'dead_routing_key' => 'my_dead_routing_key',
383381
],
384382
];
385383

386-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
384+
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
387385

388-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000');
386+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
389387
$delayQueue->expects($this->once())->method('setArguments')->with([
390388
'x-message-ttl' => 120000,
391-
'x-dead-letter-exchange' => 'messages',
389+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
390+
'x-dead-letter-routing-key' => '',
392391
]);
393392

394393
$delayQueue->expects($this->once())->method('declareQueue');
395-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000');
394+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');
396395

397-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]);
396+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
398397
$connection->publish('{}', [], 120000);
399398
}
400399

401400
/**
402401
* @expectedException \AMQPException
403-
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
402+
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
404403
*/
405404
public function testObfuscatePasswordInDsn()
406405
{
@@ -415,7 +414,7 @@ public function testObfuscatePasswordInDsn()
415414
new \AMQPConnectionException('Oups.')
416415
);
417416

418-
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
417+
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory);
419418
$connection->channel();
420419
}
421420

@@ -430,7 +429,7 @@ public function testItCanPublishWithTheDefaultRoutingKey()
430429

431430
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
432431

433-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
432+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory);
434433
$connection->publish('body');
435434
}
436435

@@ -445,7 +444,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
445444

446445
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
447446

448-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
447+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
449448
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
450449
}
451450

@@ -460,39 +459,35 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
460459
$factory->method('createChannel')->willReturn($amqpChannel);
461460
$factory->method('createQueue')->willReturn($delayQueue);
462461
$factory->method('createExchange')->will($this->onConsecutiveCalls(
463-
$delayExchange = $this->createMock(\AMQPExchange::class),
464-
$amqpExchange = $this->createMock(\AMQPExchange::class)
462+
$amqpExchange = $this->createMock(\AMQPExchange::class),
463+
$delayExchange = $this->createMock(\AMQPExchange::class)
465464
));
466465

467-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
468-
$amqpExchange->method('getName')->willReturn('messages');
466+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
467+
$amqpExchange->expects($this->once())->method('declareExchange');
469468

470469
$delayExchange->expects($this->once())->method('setName')->with('delay');
471470
$delayExchange->expects($this->once())->method('declareExchange');
472-
$delayExchange->method('getName')->willReturn('delay');
473471

474472
$connectionOptions = [
475473
'retry' => [
476474
'dead_routing_key' => 'my_dead_routing_key',
477475
],
478476
];
479477

480-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
478+
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
481479

482-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000');
480+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
483481
$delayQueue->expects($this->once())->method('setArguments')->with([
484482
'x-message-ttl' => 120000,
485-
'x-dead-letter-exchange' => 'messages',
483+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
484+
'x-dead-letter-routing-key' => 'routing_key',
486485
]);
487-
$delayQueue->expects($this->once())->method('setArgument')->with(
488-
'x-dead-letter-routing-key',
489-
'routing_key'
490-
);
491486

492487
$delayQueue->expects($this->once())->method('declareQueue');
493-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000');
488+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');
494489

495-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
490+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
496491
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
497492
}
498493

@@ -512,7 +507,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
512507
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
513508
);
514509

515-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
510+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
516511
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
517512
}
518513
}

0 commit comments

Comments
 (0)
0