8000 [messenger] Adds a stamp to provide a routing key on message publishing by G15N · Pull Request #30008 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[messenger] Adds a stamp to provide a routing key on message publishing #30008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 6, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[messenger] AMQP configurable routing key & multiple queues
  • Loading branch information
Guillaume Gammelin authored and sroze committed Apr 6, 2019
commit 3151b54b7a2289a56aa0bcf85331a72ce67a050e
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ CHANGELOG
changed: a required 3rd `SerializerInterface` argument was added.
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count.
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
Expand Down Expand Up @@ -71,6 +74,8 @@ CHANGELOG
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.

4.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function testItSendsAndReceivesMessages()

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();

$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
Expand Down Expand Up @@ -79,7 +79,7 @@ public function testRetryAndDelay()

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();

$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
Expand Down Expand Up @@ -126,7 +126,7 @@ public function testItReceivesSignals()

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();

$sender = new AmqpSender($connection, $serializer);
$sender->send(new Envelope(new DummyMessage('Hello')));
Expand Down Expand Up @@ -173,7 +173,7 @@ public function testItCountsMessagesInQueue()

$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$connection->purgeQueues();

$sender = new AmqpSender($connection, $serializer);

Expand All @@ -182,7 +182,7 @@ public function testItCountsMessagesInQueue()
$sender->send(new Envelope(new DummyMessage('Third')));

sleep(1); // give amqp a moment to have the messages ready
$this->assertSame(3, $connection->countMessagesInQueue());
$this->assertSame(3, $connection->countMessagesInQueues());
}

private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;

/**
* @requires extension amqp
*/
class AmqpReceivedStampTest extends TestCase
{
public function testStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);

$stamp = new AmqpReceivedStamp($amqpEnvelope, 'queueName');

$this->assertSame($amqpEnvelope, $stamp->getAmqpEnvelope());
$this->assertSame('queueName', $stamp->getQueueName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public function testItReturnsTheDecodedMessageToTheHandler()

$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);

$receiver = new AmqpReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
Expand All @@ -52,11 +53,12 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope, 'queueName')->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->ack(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}

/**
Expand All @@ -67,11 +69,12 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
$serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, 'queueName', AMQP_NOPARAM)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope)]));
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
}

private function createAMQPEnvelope()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;

class AmqpRoutingKeyStampTest extends TestCase
{
public function testStamp()
{
$stamp = new AmqpRoutingKeyStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage()
$sender->send($envelope);
}

public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');

$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}

public function testItSendsTheEncodedMessageWithoutHeaders()
{
$envelope = new Envelope(new DummyMessage('Oy'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public function testReceivesMessages()
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);

$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);

$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
Expand Down
Loading
0