-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add possibility to define routing key when sending message throught amqp #28772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
8000Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt\Stamp; | ||
|
||
use PHPUnit\Framework\TestCase; | ||
use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp; | ||
|
||
/** | ||
* @author Vincent Touzet <vincent.touzet@gmail.com> | ||
*/ | ||
class RoutingKeyStampTest extends TestCase | ||
{ | ||
public function testSerializable() | ||
{ | ||
$stamp = new RoutingKeyStamp('dummy_routing'); | ||
|
||
$this->assertEquals($stamp, unserialize(serialize($stamp))); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,13 +94,13 @@ public static function fromDsn(string $dsn, array $options = array(), bool $debu | |
/** | ||
* @throws \AMQPException | ||
*/ | ||
public function publish(string $body, array $headers = array()): void | ||
public function publish(string $body, array $headers = array(), string $routingKey = null): void | ||
{ | ||
if ($this->debug && $this->shouldSetup()) { | ||
$this->setup(); | ||
} | ||
|
||
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers)); | ||
$this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest enhancing the DSN that creates the connection object so that one could express/decide to ignore the routing key at "send time" and force routing without any routing key. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure to understand. You prefer to leave the Your idea is to have multiple transports configured ? For example if we want to handle log messages : framework:
messenger:
transports:
log.info:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.info'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.info"}
log.error:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%?routing_key=log.error'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.error", routing_key: "log.error"}
routing:
# Route your messages to the transports
'App\Message\LogMessage': [log.info, log.error] A message would be sent to multiple transport and then each transport will decide if he need to handle the message or not ? RabbitMQ already do this for us by routing the received message using the routing_key. We can then have only one transport for sending and one transport per queue for receiving. framework:
messenger:
transports:
# to send any log message
log:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.*"}
# Only for receiving using the messenger:consume-messages command
log.info:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.info", routing_key: "log.info"}
log.error:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange: {name: "log", type: "topic"}
queue: {name: "log.error", routing_key: "log.error"}
routing:
# Route your messages to the transports
'App\Message\LogMessage': [log] And the developer just need to create the message with its routing_key // The routing key is computed regarding the severity of the log message
$bus->dispatch(new Envelop(new \App\Message\LogMessage(...), new RoutingKeyStamp('log.error'))); I understand that it is thightly coupled with AMQP and therefor this is not agnostic 🤔 |
||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Stamp; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be moved to the root Stamp namespace: we should make this stamp transport agnostic (and it should be documented as such, doc PR welcome btw :) ) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now, it's Amqp only so let's keep it here. Routing key makes little sense with Redis for example. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't agree with adding a transport-specific stamp. That'd break the purpose of the component and the underlying patterns. |
||
|
||
use Symfony\Component\Messenger\Stamp\StampInterface; | ||
|
||
/** | ||
* @author Vincent Touzet <vincent.touzet@gmail.com> | ||
*/ | ||
final class RoutingKeyStamp implements StampInterface | ||
{ | ||
private $routingKey; | ||
|
||
public function __construct(string $routingKey) | ||
{ | ||
$this->routingKey = $routingKey; | ||
} | ||
|
||
public function getRoutingKey(): string | ||
{ | ||
return $this->routingKey; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$routingKeyStamp