8000 [Messenger] fix delay delivery for non-fanout exchanges · symfony/symfony@e9a587f · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e9a587f

Browse files
committed
[Messenger] fix delay delivery for non-fanout exchanges
also fix dsn parsing of plain amqp:// uri
1 parent 9865988 commit e9a587f

File tree

2 files changed

+53
-38
lines changed

2 files changed

+53
-38
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
2525
{
2626
/**
2727
* @expectedException \InvalidArgumentException
28-
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
28+
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
2929
*/
3030
public function testItCannotBeConstructedWithAWrongDsn()
3131
{
32-
Connection::fromDsn('amqp://');
32+
Connection::fromDsn('amqp://:');
3333
}
3434

35-
public function testItGetsParametersFromTheDsn()
35+
public function testItCanBeConstructedWithDefaults()
3636
{
3737
$this->assertEquals(
3838
new Connection([
@@ -44,44 +44,58 @@ public function testItGetsParametersFromTheDsn()
4444
], [
4545
'messages' => [],
4646
]),
47-
Connection::fromDsn('amqp://localhost/%2f/messages')
47+
Connection::fromDsn('amqp://')
48+
);
49+
}
50+
51+
public function testItGetsParametersFromTheDsn()
52+
{
53+
$this->assertEquals(
54+
new Connection([
55+
'host' => 'host',
56+
'port' => 5672,
57+
'vhost' => '/',
58+
], [
59+
'name' => 'custom',
60+
], [
61+
'custom' => [],
62+
]),
63+
Connection::fromDsn('amqp://host/%2f/custom')
4864
);
4965
}
5066

5167
public function testOverrideOptionsViaQueryParameters()
5268
{
5369
$this->assertEquals(
5470
new Connection([
55-
'host' => 'redis',
71+
'host' => 'localhost',
5672
'port' => 1234,
57-
'vhost' => '/',
73+
'vhost' => 'vhost',
5874
'login' => 'guest',
5975
'password' => 'password',
6076
], [
6177
'name' => 'exchangeName',
6278
], [
6379
'queueName' => [],
6480
]),
65-
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
81+
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
6682
);
6783
}
6884

6985
public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
7086
{
7187
$this->assertEquals(
7288
new Connection([
73-
'host' => 'redis',
74-
'port' => 1234,
89+
'host' => 'localhost',
90+
'port' => 5672,
7591
'vhost' => '/',
76-
'login' => 'guest',
77-
'password' => 'password',
7892
'persistent' => 'true',
7993
], [
8094
'name' => 'exchangeName',
8195
], [
8296
'queueName' => [],
8397
]),
84-
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
98+
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
8599
'persistent' => 'true',
86100
'exchange' => ['name' => 'toBeOverwritten'],
87101
])

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,21 @@ class Connection
5858
*/
5959
private $amqpDelayExchange;
6060

61+
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
62+
{
63+
$this->connectionOptions = array_replace_recursive([
64+
'delay' => [
65+
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
66+
'exchange_name' => 'delay',
67+
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
68+
],
69+
], $connectionOptions);
70+
$this->exchangeOptions = $exchangeOptions;
71+
$this->queuesOptions = $queuesOptions;
72+
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
73+
}
74+
6175
/**
62-
* Constructor.
63-
*
6476
* Available options:
6577
*
6678
* * host: Hostname of the AMQP service
@@ -81,29 +93,19 @@ class Connection
8193
* * delay:
8294
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
8395
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
84-
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
96+
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
8597
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
86-
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
8798
* * prefetch_count: set channel prefetch count
8899
*/
89-
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
90-
{
91-
$this->connectionOptions = array_replace_recursive([
92-
'delay' => [
93-
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
94-
'exchange_name' => 'delay',
95-
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
96-
],
97-
], $connectionOptions);
98-
$this->exchangeOptions = $exchangeOptions;
99-
$this->queuesOptions = $queuesOptions;
100-
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
101-
}
102-
103100
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
104101
{
105102
if (false === $parsedUrl = parse_url($dsn)) {
106-
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
103+
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
104+
if ('amqp://' !== $dsn) {
105+
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
106+
}
107+
108+
$parsedUrl = [];
107109
}
1081 10000 10

109111
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
@@ -275,18 +277,17 @@ private function createDelayQueue(int $delay, ?string $routingKey)
275277
$queue = $this->amqpFactory->createQueue($this->channel());
276278
$queue->setName(str_replace(
277279
['%delay%', '%routing_key%'],
278-
[$delay, $routingKey ?: ''],
280+
[$delay, $routingKey ?? ''],
279281
$this->connectionOptions['delay']['queue_name_pattern']
280-
));
282+
));
281283
$queue->setArguments([
282284
'x-message-ttl' => $delay,
283285
'x-dead-letter-exchange' => $this->exchange()->getName(),
284286
]);
285287

286-
if (null !== $routingKey) {
287-
// after being released from to DLX, this routing key will be used
288-
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
289-
}
288+
// after being released from to DLX, make sure the original routing key will be used
289+
// we must use an empty string instead of null for the argument to be picked up
290+
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
290291

291292
return $queue;
292293
}
@@ -295,7 +296,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
295296
{
296297
return str_replace(
297298
['%delay%', '%routing_key%'],
298-
[$delay, $finalRoutingKey ?: ''],
299+
[$delay, $finalRoutingKey ?? ''],
299300
$this->connectionOptions['delay']['routing_key_pattern']
300301
);
301302
}

0 commit comments

Comments
 (0)
0