8000 [Messenger] AMQP configurable routing key & multiple queues by weaverryan · Pull Request #30770 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] AMQP configurable routing key & multiple queues #30770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ CHANGELOG
4.3.0
-----

* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
Expand Down Expand Up @@ -52,7 +55,6 @@ CHANGELOG
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.

* Added a `SetupTransportsCommand` command to setup the transports

4.2.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;

class AmqpRoutingKeyStampTest extends TestCase
{
public function testStamp()
{
$stamp = new AmqpRoutingKeyStamp('routing_key');
$this->assertSame('routing_key', $stamp->getRoutingKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -38,6 +39,21 @@ public function testItSendsTheEncodedMessage()
$sender->send($envelope);
}

public function testItSendsTheEncodedMessageUsingARoutingKey()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');

$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}

public function testItSendsTheEncodedMessageWithoutHeaders()
{
$envelope = new Envelope(new DummyMessage('Oy'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ public function testItSetupsTheConnectionByDefault()

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'exchange_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'queue_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=exchange_key&queue[routing_key]=queue_key', [], $factory);
$connection->publish('body');
}

Expand All @@ -228,13 +229,13 @@ public function testItCanDisableTheSetup()
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => false], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key&auto_setup=false', [], $factory);
$connection->publish('body');
}

Expand Down Expand Up @@ -375,6 +376,83 @@ public function testObfuscatePasswordInDsn()
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
$connection->channel();
}

public function testItCanPublishWithTheDefaultQueueRoutingKey()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', 'my_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
$connection->publish('body');
}

public function testItCanPublishWithASuppliedRoutingKey()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
$connection->publish('body', [], 0, 'supplied_key');
}

public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->createMock(\AMQPExchange::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
]);
$delayQueue->expects($this->once())->method('setArgument')->with(
'x-dead-letter-routing-key',
'routing_key'
);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, 'routing_key');
}
}

class TestAmqpFactory extends AmqpFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ public function receive(callable $handler): void
{
while (!$this->shouldStop) {
try {
$amqpEnvelope = $this->connection->get();
// TODO - update this after #30708 is merged
$amqpEnvelope = null;
foreach ($this->getAllQueueNames() as $queueName) {
$amqpEnvelope = $this->connection->get($queueName);

if (null !== $amqpEnvelope) {
break;
}
}
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @author Guillaume Gammelin <ggammelin@gmail.com>
*
* @experimental in 4.3
*/
final class AmqpRoutingKeyStamp implements StampInterface
{
private $routingKey;

public function __construct(string $routingKey)
{
$this->routingKey = $routingKey;
}

public function getRoutingKey(): string
{
return $this->routingKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ public function send(Envelope $envelope): Envelope
}

try {
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
Expand Down
Loading
0