8000 [Messenger] Add possibility to define routing key when sending message throught amqp by vincenttouzet · Pull Request #28772 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add possibility to define routing key when sending message throught amqp #28772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

vincenttouzet
Copy link
Contributor
@vincenttouzet vincenttouzet commented Oct 8, 2018
Q A
Branch? 4.2
Bug fix? yes
New feature? no
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets #28133
License MIT

As describe in #28133 we can't actually define a custom routing key when sending message. It is now possible with the Symfony\Component\Messenger\Stamp\AMQP\RoutingKeyStamp

$envelop = Envelop::wrap($message)->with(new RoutingKeyStamp('dummy_routing'));

@carsonbot carsonbot added < 8000 a id="label-19c7b4" href="/symfony/symfony/labels/Status%3A%20Needs%20Review" data-name="Status: Needs Review" style="--label-r:237;--label-g:237;--label-b:237;--label-h:0;--label-s:0;--label-l:92;" data-view-component="true" class="IssueLabel hx_IssueLabel d-inline-block v-align-middle"> Status: Needs Review Messenger labels Oct 8, 2018
@nicolas-grekas nicolas-grekas added this to the next milestone Oct 14, 2018
@brusin
Copy link
brusin commented Oct 15, 2018

Guys, do you have a release date please? This PR is blocking for me and would be much appreciated :)

@vincenttouzet
Copy link
Contributor Author

@brusin I see that @nicolas-grekas added this to the next milestone. So it will probably be available in Symfony 4.2 that will be release in the end of november 2018

@vincenttouzet vincenttouzet force-pushed the messenger-amqp-routing-key branch 2 times, most recently from 552f200 to 862c09a Compare October 21, 2018 11:54
@vincenttouzet
Copy link
Contributor Author

I've updated the code to be compliant with the PR #28911

@vincenttouzet vincenttouzet force-pushed the messenger-amqp-routing-key branch from 862c09a to 0b85e87 Compare October 21, 2018 13:42
@vincenttouzet
Copy link
Contributor Author

@sroze I've made the changes but I don't understand why the tests failed.

1) Symfony\Component\Messenger\Tests\Transport\AmqpExt\AmqpSenderTest::testItSendsWithRoutingKey
Error: Call to undefined method Symfony\Component\Messenger\Envelope::wrap()

This same code worked for other tests 🤔

@vincenttouzet vincenttouzet force-pushed the messenger-amqp-routing-key branch from 0b85e87 to 4980898 Compare October 21, 2018 14:10
@sroze
Copy link
Contributor
sroze commented Oct 21, 2018

The Envelope::wrap method has been removed in master 🙃

@sroze
Copy link
Contributor
sroze commented Oct 21, 2018

Actually, it should be re-introduced, it was a mistake, it's very convenient userland 🤔

@vincenttouzet vincenttouzet force-pushed the messenger-amqp-routing-key branch from 4980898 to 241ab2b Compare October 21, 2018 14:47
@vincenttouzet
Copy link
Contributor Author

meh 😢 You merged the remove just after i rebased to master xD. I've re-rebased and updated my tests. Waiting for Travis.

I think the remove may add BC Break to existing application. (If I update my application from Symfony 4.1 to 4.2 it will break without the wrap method.)

@nicolas-grekas
Copy link
Member
nicolas-grekas commented Oct 21, 2018

FYI, I think this is not the approach that we should follo: adding this routing key is tightening the message to the transport, which defeats the purpose of the component. Instead, the message should be routed with a name, and the transport should be selected by name. I'm on a PR to prepare the neede code infrastructure, i'll keep you posted.

8000

@nicolas-grekas
Copy link
Member

OK, thinking twice about this, I'm withdrawing my previous comment. The routing can also be thought as a transport agnostic thing, eg something that specifies the user that some message should be sent to. Each transport can then use or not this info to route directly to some destination.

Copy link
Member
@nicolas-grekas nicolas-grekas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense actually, here are some comments to generalize the concept

10000
@@ -37,8 +38,13 @@ public function __construct(Connection $connection, SerializerInterface $seriali
*/
public function send(Envelope $envelope)
{
$routingKey = null;
/** @var RoutingKeyStamp|null $routingKeyConfig */
if ($routingKeyConfig = $envelope->get(RoutingKeyStamp::class)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$routingKeyStamp

* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt\Stamp;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be moved to the root Stamp namespace: we should make this stamp transport agnostic (and it should be documented as such, doc PR welcome btw :) )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, it's Amqp only so let's keep it here. Routing key makes little sense with Redis for example.

Copy link
Member
@nicolas-grekas nicolas-grekas Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with adding a transport-specific stamp. That'd break the purpose of the component and the underlying patterns.
To me a routing key can be used eg to target a specific user. That's totally not amqp specific with this pov.

{
if ($this->debug && $this->shouldSetup()) {
$this->setup();
}

$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
$this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers));
Copy link
Member
@nicolas-grekas nicolas-grekas Oct 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest enhancing the DSN that creates the connection object so that one could express/decide to ignore the routing key at "send time" and force routing without any routing key.
The purpose would be to force messages to go to a worker without taking care of the routing key - and then the worker would use the stamp to decide for the next hop destination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure to understand. You prefer to leave the routing_key parameter on the publish method to null ?

Your idea is to have multiple transports configured ? For example if we want to handle log messages :

framework:
    messenger:
        transports:
            log.info: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.info'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.info"}
            log.error: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.error'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.error", routing_key: "log.error"}

        routing:
            # Route your messages to the transports
            'App\Message\LogMessage': [log.info, log.error]

A message would be sent to multiple transport and then each transport will decide if he need to handle the message or not ?

RabbitMQ already do this for us by routing the received message using the routing_key. We can then have only one transport for sending and one transport per queue for receiving.

framework:
    messenger:
        transports:
            # to send any log message
            log:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.*"}
            # Only for receiving using the messenger:consume-messages command
            log.info: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.info"}
            log.error: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.error", routing_key: "log.error"}

        routing:
            # Route your messages to the transports
            'App\Message\LogMessage': [log]

And the developer just need to create the message with its routing_key

// The routing key is computed regarding the severity of the log message
$bus->dispatch(new Envelop(new \App\Message\LogMessage(...), new RoutingKeyStamp('log.error')));

I understand that it is thightly coupled with AMQP and therefor this is not agnostic 🤔

@nicolas-grekas
Copy link
Member

Radically simpler approach. Less flexible but maybe only what you need for now: can you check if #28955 would to it for you?

@vincenttouzet
Copy link
Contributor Author

@nicolas-grekas I think this is the right way to achieve this.

With your modification we can then have :

framework:
    messenger:
        transports:
            log.info: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.info", routing_key: "log.info"}
            log.error: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange: {name: "log", type: "topic"}
                    queue: {name: "log.error", routing_key: "log.error"}

        routing:
            # Route your messages to the transports
            'App\Message\LogMessage': [log.info, log.error]

But I wonder how can we check in the handler side how to handle only if it's from a given queue. For example the handler of the App\Message\LogMessage may be different if the message comes from the log.error queue or the log.info. Maybe with a Symfony\Component\Messenger\Handler\MessageSubscriberInterface but we can only define method and priority

@nicolas-grekas
Copy link
Member

Shall we close this PR then?
#28955 fixes the issue that was spotted right?
I think my comments above go in a different direction, which could be interesting to follow IMHO, but maybe later on, WDYT?

@vincenttouzet
Copy link
Contributor Author

I think it could be a good idea to explore the way you describe 👍 maybe we can discuss about it at forum PHP today ?

@sroze
Copy link
Contributor
sroze commented Oct 26, 2018 via email
9E81

@nicolas-grekas nicolas-grekas modified the milestones: next, 4.2 Nov 1, 2018
@bentcoder
Copy link
bentcoder commented Jan 20, 2019

Hi all,

I was wondering if RoutingKeyStamp will ever be implemented at all? Other than that, it would be lovely being able to use something like below.

Thanks

framework:
    messenger:
        transports:
            amqp_image_resize:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: image_resize_ex
                        type: topic
                    queues:
                        - { name: image_resize_jpeg_qu, routing_keys: ['jpeg'] }
                        - { name: image_resize_png_gif_qu, routing_keys: ['png', 'gif'] }

        routing:
            'App\MessageBus\Message\Image\Resize': amqp_image_resize
        // Ends up in "image_resize_jpeg_qu" queue
        $this->messageBus->dispatch(
            new Envelop(new Resize('/path/to/logo.jpeg'),
            new RoutingKeyStamp('jpeg'))
        );

        // Ends up in "image_resize_png_gif_qu" queue
        $this->messageBus->dispatch(
            new Envelop(new Resize('/path/to/logo.png'),
            new RoutingKeyStamp('png'))
        );

        // Ends up in "image_resize_png_gif_qu" queue
        $this->messageBus->dispatch(
            new Envelop(new Resize('/path/to/logo.gif'),
            new RoutingKeyStamp('gif'))
        );

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants
0