-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add possibility to define routing key when sending message throught amqp #28772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Messenger] Add possibility to define routing key when sending message throught amqp #28772
Conversation
Guys, do you have a release date please? This PR is blocking for me and would be much appreciated :) |
@brusin I see that @nicolas-grekas added this to the next milestone. So it will probably be available in Symfony 4.2 that will be release in the end of november 2018 |
552f200
to
862c09a
Compare
I've updated the code to be compliant with the PR #28911 |
862c09a
to
0b85e87
Compare
@sroze I've made the changes but I don't understand why the tests failed.
This same code worked for other tests 🤔 |
0b85e87
to
4980898
Compare
The |
Actually, it should be re-introduced, it was a mistake, it's very convenient userland 🤔 |
4980898
to
241ab2b
Compare
meh 😢 You merged the remove just after i rebased to master xD. I've re-rebased and updated my tests. Waiting for Travis. I think the remove may add BC Break to existing application. (If I update my application from Symfony 4.1 to 4.2 it will break without the wrap method.) |
|
OK, thinking twice about this, I'm withdrawing my previous comment. The routing can also be thought as a transport agnostic thing, eg something that specifies the user that some message should be sent to. Each transport can then use or not this info to route directly to some destination. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense actually, here are some comments to generalize the concept
@@ -37,8 +38,13 @@ public function __construct(Connection $connection, SerializerInterface $seriali | |||
*/ | |||
public function send(Envelope $envelope) | |||
{ | |||
$routingKey = null; | |||
/** @var RoutingKeyStamp|null $routingKeyConfig */ | 10000|||
if ($routingKeyConfig = $envelope->get(RoutingKeyStamp::class)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$routingKeyStamp
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Stamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be moved to the root Stamp namespace: we should make this stamp transport agnostic (and it should be documented as such, doc PR welcome btw :) )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, it's Amqp only so let's keep it here. Routing key makes little sense with Redis for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't agree with adding a transport-specific stamp. That'd break the purpose of the component and the underlying patterns.
To me a routing key can be used eg to target a specific user. That's totally not amqp specific with this pov.
{ | ||
if ($this->debug && $this->shouldSetup()) { | ||
$this->setup(); | ||
} | ||
|
||
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers)); | ||
$this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest enhancing the DSN that creates the connection object so that one could express/decide to ignore the routing key at "send time" and force routing without any routing key.
The purpose would be to force messages to go to a worker without taking care of the routing key - and then the worker would use the stamp to decide for the next hop destination.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure to understand. You prefer to leave the routing_key
parameter on the publish method to null ?
Your idea is to have multiple transports configured ? For example if we want to handle log messages :
framework:
messenger:
transports:
log.info:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.info'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.info"}
log.error:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.error'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.error", routing_key: "log.error"}
routing:
# Route your messages to the transports
'App\Message\LogMessage': [log.info, log.error]
A message would be sent to multiple transport and then each transport will decide if he need to handle the message or not ?
RabbitMQ already do this for us by routing the received message using the routing_key. We can then have only one transport for sending and one transport per queue for receiving.
framework:
messenger:
transports:
# to send any log message
log:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.*"}
# Only for receiving using the messenger:consume-messages command
log.info:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.info"}
log.error:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.error", routing_key: "log.error"}
routing:
# Route your messages to the transports
'App\Message\LogMessage': [log]
And the developer just need to create the message with its routing_key
// The routing key is computed regarding the severity of the log message
$bus->dispatch(new Envelop(new \App\Message\LogMessage(...), new RoutingKeyStamp('log.error')));
I understand that it is thightly coupled with AMQP and therefor this is not agnostic 🤔
Radically simpler approach. Less flexible but maybe only what you need for now: can you check if #28955 would to it for you? |
@nicolas-grekas I think this is the right way to achieve this. With your modification we can then have :
But I wonder how can we check in the handler side how to handle only if it's from a given queue. For example the handler of the |
Shall we close this PR then? |
I think it could be a good idea to explore the way you describe 👍 maybe we can discuss about it at forum PHP today ? |
Sure :)
…On Fri, 26 Oct 2018 at 07:32, Vincent Touzet ***@***.***> wrote:
I think it could be a good idea to explore the way you describe 👍 maybe
we can discuss about it at forum PHP today ?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#28772 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAxHEfJj4747RjUrAAojLG483_enVKSDks5uop5tgaJpZM4XNb8->
.
|
Hi all, I was wondering if Thanks framework:
messenger:
transports:
amqp_image_resize:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: image_resize_ex
type: topic
queues:
- { name: image_resize_jpeg_qu, routing_keys: ['jpeg'] }
- { name: image_resize_png_gif_qu, routing_keys: ['png', 'gif'] }
routing:
'App\MessageBus\Message\Image\Resize': amqp_image_resize // Ends up in "image_resize_jpeg_qu" queue
$this->messageBus->dispatch(
new Envelop(new Resize('/path/to/logo.jpeg'),
new RoutingKeyStamp('jpeg'))
);
// Ends up in "image_resize_png_gif_qu" queue
$this->messageBus->dispatch(
new Envelop(new Resize('/path/to/logo.png'),
new RoutingKeyStamp('png'))
);
// Ends up in "image_resize_png_gif_qu" queue
$this->messageBus->dispatch(
new Envelop(new Resize('/path/to/logo.gif'),
new RoutingKeyStamp('gif'))
); |
As describe in #28133 we can't actually define a custom routing key when sending message. It is now possible with the
Symfony\Component\Messenger\Stamp\AMQP\RoutingKeyStamp