8000 [Messenger] fix delay delivery for non-fanout exchanges · symfony/symfony@0f15306 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0f15306

Browse files
committed
[Messenger] fix delay delivery for non-fanout exchanges
also fix dsn parsing of plain amqp:// uri
1 parent 9865988 commit 0f15306

File tree

2 files changed

+54
-37
lines changed

2 files changed

+54
-37
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
2525
{
2626
/**
2727
* @expectedException \InvalidArgumentException
28-
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
28+
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
2929
*/
3030
public function testItCannotBeConstructedWithAWrongDsn()
3131
{
32-
Connection::fromDsn('amqp://');
32+
Connection::fromDsn('amqp://:');
3333
}
3434

35-
public function testItGetsParametersFromTheDsn()
35+
public function testItCanBeConstructedWithDefaults()
3636
{
3737
$this->assertEquals(
3838
new Connection([
@@ -44,44 +44,58 @@ public function testItGetsParametersFromTheDsn()
4444
], [
4545
'messages' => [],
4646
]),
47-
Connection::fromDsn('amqp://localhost/%2f/messages')
47+
Connection::fromDsn('amqp://')
48+
);
49+
}
50+
51+
public function testItGetsParametersFromTheDsn()
52+
{
53+
$this->assertEquals(
54+
new Connection([
55+
'host' => 'host',
56+
'port' => 5672,
57+
'vhost' => '/',
58+
], [
59+
'name' => 'custom',
60+
], [
61+
'custom' => [],
62+
]),
63+
Connection::fromDsn('amqp://host/%2f/custom')
4864
);
4965
}
5066

5167
public function testOverrideOptionsViaQueryParameters()
5268
{
5369
$this->assertEquals(
5470
new Connection([
55-
'host' => 'redis',
71+
'host' => 'localhost',
5672
'port' => 1234,
57-
'vhost' => '/',
73+
'vhost' => 'vhost',
5874
'login' => 'guest',
5975
'password' => 'password',
6076
], [
6177
'name' => 'exchangeName',
6278
], [
6379
'queueName' => [],
6480
]),
65-
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
81+
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
6682
);
6783
}
6884

6985
public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
7086
{
7187
$this->assertEquals(
7288
new Connection([
73-
'host' => 'redis',
74-
'port' => 1234,
89+
'host' => 'localhost',
90+
'port' => 5672,
7591
'vhost' => '/',
76-
'login' => 'guest',
77-
'password' => 'password',
7892
'persistent' => 'true',
7993
], [
8094
'name' => 'exchangeName',
8195
], [
8296
'queueName' => [],
8397
]),
84-
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
98+
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
8599
'persistent' => 'true',
86100
'exchange' => ['name' => 'toBeOverwritten'],
87101
])

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,22 @@ class Connection
5858
*/
5959
private $amqpDelayExchange;
6060

61+
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
62+
{
63+
$this->connectionOptions = array_replace_recursive([
64+
'delay' => [
65+
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
66+
'exchange_name' => 'delay',
67+
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
68+
],
69+
], $connectionOptions);
70+
$this->exchangeOptions = $exchangeOptions;
71+
$this->queuesOptions = $queuesOptions;
72+
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
73+
}
74+
6175
/**
62-
* Constructor.
76+
* Creates a connection based on the DSN and options.
6377
*
6478
* Available options:
6579
*
@@ -81,29 +95,19 @@ class Connection
8195
* * delay:
8296
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
8397
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
84-
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
98+
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
8599
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
86-
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
87100
* * prefetch_count: set channel prefetch count
88101
*/
89-
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
90-
{
91-
$this->connectionOptions = array_replace_recursive([
92-
'delay' => [
93-
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
94-
'exchange_name' => 'delay',
95-
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
96-
],
97-
], $connectionOptions);
98-
$this->exchangeOptions = $exchangeOptions;
99-
$this->queuesOptions = $queuesOptions;
100-
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
101-
}
102-
103102
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
104103
{
105104
if (false === $parsedUrl = parse_url($dsn)) {
106-
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
105+
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
106+
if ('amqp://' !== $dsn) {
107+
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
108+
}
109+
110+
$parsedUrl = [];
107111
}
108112

109113
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
@@ -275,18 +279,17 @@ private function createDelayQueue(int $delay, ?string $routingKey)
275279
$queue = $this->amqpFactory->createQueue($this->channel());
276280
$queue->setName(str_replace(
277281
['%delay%', '%routing_key%'],
278-
[$delay, $routingKey ?: ''],
282+
[$delay, $routingKey ?? ''],
279283
$this->connectionOptions['delay']['queue_name_pattern']
280-
));
284+
));
281285
$queue->setArguments([
282286
'x-message-ttl' => $delay,
283287
'x-dead-letter-exchange' => $this->exchange()->getName(),
284288
]);
285289

286-
if (null !== $routingKey) {
287-
// after being released from to DLX, this routing key will be used
288-
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
289-
}
290+
// after being released from to DLX, make sure the original routing key will be used
291+
// we must use an empty string instead of null for the argument to be picked up
292+
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
290293

291294
return $queue;
292295
}
@@ -295,7 +298,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
295298
{
296299
return str_replace(
297300
['%delay%', '%routing_key%'],
298-
[$delay, $finalRoutingKey ?: ''],
301+
[$delay, $finalRoutingKey ?? ''],
299302
$this->connectionOptions[' 42E5 ;delay']['routing_key_pattern']
300303
);
301304
}

0 commit comments

Comments
 (0)
0