8000 [Messenger] Add possibility to define routing key when sending message throught amqp by vincenttouzet · Pull Request #28772 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add possibility to define routing key when sending message throught amqp #28772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
8000
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
Expand All @@ -37,4 +38,20 @@ public function testItSendsTheEncodedMessage()
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}

public function testItSendsWithRoutingKey()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$envelope = $envelope->with(new RoutingKeyStamp('dummy_routing'));
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));

$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 'dummy_routing');

$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt\Stamp;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class RoutingKeyStampTest extends TestCase
{
public function testSerializable()
{
$stamp = new RoutingKeyStamp('dummy_routing');

$this->assertEquals($stamp, unserialize(serialize($stamp)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -37,8 +38,13 @@ public function __construct(Connection $connection, SerializerInterface $seriali
*/
public function send(Envelope $envelope)
{
$routingKey = null;
/** @var RoutingKeyStamp|null $routingKeyConfig */
if ($routingKeyConfig = $envelope->get(RoutingKeyStamp::class)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$routingKeyStamp

$routingKey = $routingKeyConfig->getRoutingKey();
}
$encodedMessage = $this->serializer->encode($envelope);

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'], $routingKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ public static function fromDsn(string $dsn, array $options = array(), bool $debu
/**
* @throws \AMQPException
*/
public function publish(string $body, array $headers = array()): void
public function publish(string $body, array $headers = array(), string $routingKey = null): void
{
if ($this->debug && $this->shouldSetup()) {
$this->setup();
}

$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
$this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers));
Copy link
Member
@nicolas-grekas nicolas-grekas Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest enhancing the DSN that creates the connection object so that one could express/decide to ignore the routing key at "send time" and force routing without any routing key.
The purpose would be to force messages to go to a worker without taking care of the routing key - and then the worker would use the stamp to decide for the next hop destination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure to understand. You prefer to leave the routing_key parameter on the publish method to null ?

Your idea is to have multiple transports configured ? For example if we want to handle log messages :

framework:
    messenger:
        transports:
            log.info: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.info'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.info"}
            log.error: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.error'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.error", routing_key: "log.error"}

        routing:
            # Route your messages to the transports
            'App\Message\LogMessage': [log.info, log.error]

A message would be sent to multiple transport and then each transport will decide if he need to handle the message or not ?

RabbitMQ already do this for us by routing the received message using the routing_key. We can then have only one transport for sending and one transport per queue for receiving.

framework:
    messenger:
        transports:
            # to send any log message
            log:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.*"}
            # Only for receiving using the messenger:consume-messages command
            log.info: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.info"}
            log.error: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.error", routing_key: "log.error"}

        routing:
            # Route your messages to the transports
            'App\Message\LogMessage': [log]

And the developer just need to create the message with its routing_key

// The routing key is computed regarding the severity of the log message
$bus->dispatch(new Envelop(new \App\Message\LogMessage(...), new RoutingKeyStamp('log.error')));

I understand that it is thightly coupled with AMQP and therefor this is not agnostic 🤔

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt\Stamp;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be moved to the root Stamp namespace: we should make this stamp transport agnostic (and it should be documented as such, doc PR welcome btw :) )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, it's Amqp only so let's keep it here. Routing key makes little sense with Redis for example.

Copy link
Member
@nicolas-grekas nicolas-grekas Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with adding a transport-specific stamp. That'd break the purpose of the component and the underlying patterns.
To me a routing key can be used eg to target a specific user. That's totally not amqp specific with this pov.


use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
final class RoutingKeyStamp implements StampInterface
{
private $routingKey;

public function __construct(string $routingKey)
{
$this->routingKey = $routingKey;
}

public function getRoutingKey(): string
{
return $this->routingKey;
}
}
0