8000 [messenger] Adds a stamp to provide a routing key on message publishing. · symfony/symfony@8fddd02 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8fddd02

Browse files
author
Guillaume Gammelin
committed
[messenger] Adds a stamp to provide a routing key on message publishing.
1 parent c30f462 commit 8fddd02

File tree

9 files changed

+192
-17
lines changed

9 files changed

+192
-17
lines changed

UPGRADE-4.3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ Messenger
9191

9292
* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
9393
* Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead.
94+
* Deprecated routing key from queue configuration (`queue[routing_key]` in the DSN), use exchange configuration instead (`exchange[routing_key]` in the DSN).
9495

9596
Routing
9697
-------

UPGRADE-5.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ Messenger
236236
---------
237237

238238
* The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead.
239+
* Routing key from queue configuration has been removed. Use exchange configuration instead (`exchange[routing_key]` in DSN).
239240

240241
Monolog
241242
-------

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ CHANGELOG
5252
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
5353
and queues by default. Previously, this was done when in "debug" mode
5454
only. Pass the `auto_setup` connection option to control this.
55-
5655
* Added a `SetupTransportsCommand` command to setup the transports
56+
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
57+
* Deprecated publis A3E2 hing with a routing key from queue configuration, use exchange configuration instead.
5758

5859
4.2.0
5960
-----
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
16+
17+
class AmqpRoutingKeyStampTest extends TestCase
18+
{
19+
public function testStamp()
20+
{
21+
$stamp = new AmqpRoutingKeyStamp('routing_key');
22+
$this->assertSame('routing_key', $stamp->getRoutingKey());
23+
}
24+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
1718
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
1819
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage()
3839
$sender->send($envelope);
3940
}
4041

42+
public function testItSendsTheEncodedMessageUsingARoutingKey()
43+
{
44+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
45+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
46+
47+
$serializer = $this->createMock(SerializerInterface::class);
48+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
49+
50+
$connection = $this->createMock(Connection::class);
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
52+
53+
$sender = new AmqpSender($connection, $serializer);
54+
$sender->send($envelope);
55+
}
56+
4157
public function testItSendsTheEncodedMessageWithoutHeaders()
4258
{
4359
$envelope = new Envelope(new DummyMessage('Oy'));

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,11 @@ public function testItSetupsTheConnectionByDefault()
207207

208208
$amqpExchange->method('getName')->willReturn('exchange_name');
209209
$amqpExchange->expects($this->once())->method('declareExchange');
210+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'exchange_key', AMQP_NOPARAM, ['headers' => []]);
210211
$amqpQueue->expects($this->once())->method('declareQueue');
211-
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
212+
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'queue_key');
212213

213-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
214+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=exchange_key&queue[routing_key]=queue_key', [], $factory);
214215
$connection->publish('body');
215216
}
216217

@@ -228,13 +229,13 @@ public function testItCanDisableTheSetup()
228229
$amqpQueue->expects($this->never())->method('declareQueue');
229230
$amqpQueue->expects($this->never())->method('bind');
230231

231-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
232+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
232233
$connection->publish('body');
233234

234-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory);
235+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => false], $factory);
235236
$connection->publish('body');
236237

237-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory);
238+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key&auto_setup=false', [], $factory);
238239
$connection->publish('body');
239240
}
240241

@@ -375,6 +376,83 @@ public function testObfuscatePasswordInDsn()
375376
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
376377
$connection->channel();
377378
}
379+
380+
public function testItCanPublishWithTheDefaultQueueRoutingKey()
381+
{
382+
$factory = new TestAmqpFactory(
383+
$amqpConnection = $this->createMock(\AMQPConnection::class),
384+
$amqpChannel = $this->createMock(\AMQPChannel::class),
385+
$amqpQueue = $this->createMock(\AMQPQueue::class),
386+
$amqpExchange = $this->createMock(\AMQPExchange::class)
387+
);
388+
389+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'my_key');
390+
391+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
392+
$connection->publish('body');
393+
}
394+
395+
public function testItCanPublishWithASuppliedRoutingKey()
396+
{
397+
$factory = new TestAmqpFactory(
398+
$amqpConnection = $this->createMock(\AMQPConnection::class),
399+
$amqpChannel = $this->createMock(\AMQPChannel::class),
400+
$amqpQueue = $this->createMock(\AMQPQueue::class),
401+
$amqpExchange = $this->createMock(\AMQPExchange::class)
402+
);
403+
404+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key');
405+
406+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
407+
$connection->publish('body', [], 0, 'supplied_key');
408+
}
409+
410+
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
411+
{
412+
$amqpConnection = $this->createMock(\AMQPConnection::class);
413+
$amqpChannel = $this->createMock(\AMQPChannel::class);
414+
$delayQueue = $this->createMock(\AMQPQueue::class);
415+
416+
$factory = $this->createMock(AmqpFactory::class);
417+
$factory->method('createConnection')->willReturn($amqpConnection);
418+
$factory->method('createChannel')->willReturn($amqpChannel);
419+
$factory->method('createQueue')->willReturn($delayQueue);
420+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
421+
$delayExchange = $this->createMock(\AMQPExchange::class),
422+
$amqpExchange = $this->createMock(\AMQPExchange::class)
423+
));
424+
425+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
426+
$amqpExchange->method('getName')->willReturn('messages');
427+
428+
$delayExchange->expects($this->once())->method('setName')->with('delay');
429+
$delayExchange->expects($this->once())->method('declareExchange');
430+
$delayExchange->method('getName')->willReturn('delay');
431+
432+
$connectionOptions = [
433+
'retry' => [
434+
'dead_routing_key' => 'my_dead_routing_key',
435+
],
436+
];
437+
438+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
439+
440+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
441+
$delayQueue->expects($this->once())->method('setArguments')->with([
442+
'x-message-ttl' => 120000,
443+
'x-dead-letter-exchange' => 'messages',
444+
]);
445+
$delayQueue->expects($this->once())->method('setArgument')->with(
446+
'x-dead-letter-routing-key',
447+
'routing_key'
448+
);
449+
450+
$delayQueue->expects($this->once())->method('declareQueue');
451+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
452+
453+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
454+
$connection->publish('{}', [], 120000, 'routing_key');
455+
}
378456
}
379457

380458
class TestAmqpFactory extends AmqpFactory
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
/**
17+
* @author Guillaume Gammelin <ggammelin@gmail.com>
18+
*
19+
* @experimental in 4.3
20+
*/
21+
final class AmqpRoutingKeyStamp implements StampInterface
22+
{
23+
/**
24+
* @var string
25+
*/
26+
private $routingKey;
27+
28+
public function __construct(string $routingKey)
29+
{
30+
$this->routingKey = $routingKey;
31+
}
32+
33+
public function getRoutingKey(): string
34+
{
35+
return $this->routingKey;
36+
}
37+
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ public function send(Envelope $envelope): Envelope
5151
}
5252

5353
try {
54-
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
54+
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
55+
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
56+
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
57+
58+
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
5559
} catch (\AMQPException $e) {
5660
throw new TransportException($e->getMessage(), 0, $e);
5761
}

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,10 @@ private static function normalizeQueueArguments(array $arguments): array
166166
*
167167
* @throws \AMQPException
168168
*/
169-
public function publish(string $body, array $headers = [], int $delay = 0): void
169+
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
170170
{
171171
if (0 !== $delay) {
172-
$this->publishWithDelay($body, $headers, $delay);
172+
$this->publishWithDelay($body, $headers, $delay, $routingKey);
173173

174174
return;
175175
}
@@ -180,17 +180,18 @@ public function publish(string $body, array $headers = [], int $delay = 0): void
180180

181181
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
182182
$attributes = $this->getAttributes($headers);
183+
$routingKey = $routingKey ?? $this->getExchangeRoutingKey();
183184

184-
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
185+
$this->exchange()->publish($body, $routingKey, $flags, $attributes);
185186
}
186187

187188
/**
188189
* @throws \AMQPException
189190
*/
190-
private function publishWithDelay(string $body, array $headers = [], int $delay)
191+
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
191192
{
192193
if ($this->shouldSetup()) {
193-
$this->setupDelay($delay);
194+
$this->setupDelay($delay, $exchangeRoutingKey);
194195
}
195196

196197
$routingKey = $this->getRoutingKeyForDelay($delay);
@@ -200,7 +201,7 @@ private function publishWithDelay(string $body, array $headers = [], int $delay)
200201
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
201202
}
202203

203-
private function setupDelay(int $delay)
204+
private function setupDelay(int $delay, ?string $exchangeRoutingKey)
204205
{
205206
if (!$this->channel()->isConnected()) {
206207
$this->clear();
@@ -209,7 +210,7 @@ private function setupDelay(int $delay)
209210
$exchange = $this->getDelayExchange();
210211
$exchange->declareExchange();
211212

212-
$queue = $this->createDelayQueue($delay);
213+
$queue = $this->createDelayQueue($delay, $exchangeRoutingKey);
213214
$queue->declareQueue();
214215
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
215216
}
@@ -234,7 +235,7 @@ private function getDelayExchange(): \AMQPExchange
234235
* which is the original exchange, resulting on it being put back into
235236
* the original queue.
236237
*/
237-
private function createDelayQueue(int $delay)
238+
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
238239
{
239240
$delayConfiguration = $this->connectionConfiguration['delay'];
240241

@@ -245,9 +246,10 @@ private function createDelayQueue(int $delay)
245246
'x-dead-letter-exchange' => $this->exchange()->getName(),
246247
]);
247248

248-
if (isset($this->queueConfiguration['routing_key'])) {
249+
$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutingKey();
250+
if (null !== $exchangeRoutingKey) {
249251
// after being released from to DLX, this routing key will be used
250-
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
252+
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey);
251253
}
252254

253255
return $queue;
@@ -393,4 +395,15 @@ private function getAttributes(array $headers): array
393395
{
394396
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
395397
}
398+
399+
private function getExchangeRoutingKey(): ?string
400+
{
401+
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null;
402+
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) {
403+
$routingKey = $this->queueConfiguration['routing_key'];
404+
@trigger_error('Routing key from "queue" configuration is deprecated. Use "exchange" configuration instead.', E_USER_DEPRECATED);
405+
}
406+
407+
return $routingKey;
408+
}
396409
}

0 commit comments

Comments
 (0)
0