8000 [messenger] AMQP configurable routing key & multiple queues · symfony/symfony@92221b8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 92221b8

Browse files
author
Guillaume Gammelin
committed
[messenger] AMQP configurable routing key & multiple queues
1 parent 574097f commit 92221b8

13 files changed

+455
-167
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ CHANGELOG
1818
changed: a required 3rd `SerializerInterface` argument was added.
1919
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
2020
explicitly handle messages synchronously.
21+
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
22+
* Removed publishing with a `routing_key` option from queue configuration, for
23+
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
2124
* Added optional parameter `prefetch_count` in connection configuration,
2225
to setup channel prefetch count.
2326
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
@@ -71,6 +74,8 @@ CHANGELOG
7174
only. Pass the `auto_setup` connection option to control this.
7275
* Added a `SetupTransportsCommand` command to setup the transports
7376
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
77+
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
78+
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
7479

7580
4.2.0
7681
-----

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public function testItSendsAndReceivesMessages()
4949

5050
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
5151
$connection->setup();
52-
$connection->queue()->purge();
52+
$connection->purgeQueues();
5353

5454
$sender = new AmqpSender($connection, $serializer);
5555
$receiver = new AmqpReceiver($connection, $serializer);
@@ -79,7 +79,7 @@ public function testRetryAndDelay()
7979

8080
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
8181
$connection->setup();
82-
$connection->queue()->purge();
82+
$connection->purgeQueues();
8383

8484
$sender = new AmqpSender($connection, $serializer);
8585
$receiver = new AmqpReceiver($connection, $serializer);
@@ -126,7 +126,7 @@ public function testItReceivesSignals()
126126

127127
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
128128
$connection->setup();
129-
$connection->queue()->purge();
129+
$connection->purgeQueues();
130130

131131
$sender = new AmqpSender($connection, $serializer);
132132
$sender->send(new Envelope(new DummyMessage('Hello')));
@@ -173,7 +173,7 @@ public function testItCountsMessagesInQueue()
173173

174174
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
175175
$connection->setup();
176-
$connection->queue()->purge();
176+
$connection->purgeQueues();
177177

178178
$sender = new AmqpSender($connection, $serializer);
179179

@@ -182,7 +182,7 @@ public function testItCountsMessagesInQueue()
182182
$sender->send($second = new Envelope(new DummyMessage('Third')));
183183

184184
sleep(1); // give amqp a moment to have the messages ready
185-
$this->assertSame(3, $connection->countMessagesInQueue());
185+
$this->assertSame(3, $connection->countMessagesInQueues());
186186
}
187187

188188
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
16+
17+
/**
18+
* @requires extension amqp
19+
*/
20+
class AmqpReceivedStampTest extends TestCase
21+
{
22+
public function testStamp()
23+
{
24+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
25+
26+
$stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');
27+
28+
$this->assertSame($amqpEnvelope, $stamp->getAmqpEnvelope());
29+
$this->assertSame('queueName', $stamp->getQueueName());
30+
}
31+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public function testItReturnsTheDecodedMessageToTheHandler()
3636

3737
$amqpEnvelope = $this->createAMQPEnvelope();
3838
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
39-
$connection->method('get')->willReturn($amqpEnvelope);
39+
$connection->method('getQueueNames')->willReturn(['queueName']);
40+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
4041

4142
$receiver = new AmqpReceiver($connection, $serializer);
4243
$actualEnvelopes = iterator_to_array($receiver->get());
@@ -52,11 +53,12 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5253
$serializer = $this->createMock(SerializerInterface::class);
5354
$amqpEnvelope = $this->createAMQPEnvelope();
5455
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
55-
$connection->method('get')->willReturn($amqpEnvelope);
56-
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
56+
$connection->method('getQueueNames')->willReturn(['queueName']);
57+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
58+
$connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());
5759

5860
$receiver = new AmqpReceiver($connection, $serializer);
59-
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
61+
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
6062
}
6163

6264
/**
@@ -67,11 +69,12 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
6769
$serializer = $this->createMock(SerializerInterface::class);
6870
$amqpEnvelope = $this->createAMQPEnvelope();
6971
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
70-
$connection->method('get')->willReturn($amqpEnvelope);
71-
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
72+
$connection->method('getQueueNames')->willReturn(['queueName']);
73+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
74+
$connection->method('nack')->with($amqpEnvelope, 'queueName', AMQP_NOPARAM)->willThrowException(new \AMQPException());
7275

7376
$receiver = new AmqpReceiver($connection, $serializer);
74-
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
77+
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
7578
}
7679

7780
private function createAMQPEnvelope()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
16+
17+
class AmqpRoutingKeyStampTest extends TestCase
18+
{
19+
public function testStamp()
20+
{
21+
$stamp = new AmqpRoutingKeyStamp('routing_key');
22+
$this->assertSame('routing_key', $stamp->getRoutingKey());
23+
}
24+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
1718
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
1819
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage()
3839
$sender->send($envelope);
3940
}
4041

42+
public function testItSendsTheEncodedMessageUsingARoutingKey()
43+
{
44+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
45+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
46+
47+
$serializer = $this->createMock(SerializerInterface::class);
48+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
49+
50+
$connection = $this->createMock(Connection::class);
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
52+
53+
$sender = new AmqpSender($connection, $serializer);
54+
$sender->send($envelope);
55+
}
56+
4157
public function testItSendsTheEncodedMessageWithoutHeaders()
4258
{
4359
$envelope = new Envelope(new DummyMessage('Oy'));

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public function testReceivesMessages()
4545
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
4646

4747
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
48-
$connection->method('get')->willReturn($amqpEnvelope);
48+
$connection->method('getQueueNames')->willReturn(['queueName']);
49+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
4950

5051
$envelopes = iterator_to_array($transport->get());
5152
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());

0 commit comments

Comments
 (0)
0