diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 398f0092922c7..88234f04d50b9 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; +use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** @@ -37,4 +38,20 @@ public function testItSendsTheEncodedMessage() $sender = new AmqpSender($connection, $serializer); $sender->send($envelope); } + + public function testItSendsWithRoutingKey() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $envelope = $envelope->with(new RoutingKeyStamp('dummy_routing')); + $encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class)); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 'dummy_routing'); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Stamp/RoutingKeyStampTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Stamp/RoutingKeyStampTest.php new file mode 100644 index 0000000000000..7a3a08f69264e --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Stamp/RoutingKeyStampTest.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt\Stamp; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp; + +/** + * @author Vincent Touzet + */ +class RoutingKeyStampTest extends TestCase +{ + public function testSerializable() + { + $stamp = new RoutingKeyStamp('dummy_routing'); + + $this->assertEquals($stamp, unserialize(serialize($stamp))); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index bb084a8f03ae7..7f8cc41204637 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\AmqpExt\Stamp\RoutingKeyStamp; use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -37,8 +38,13 @@ public function __construct(Connection $connection, SerializerInterface $seriali */ public function send(Envelope $envelope) { + $routingKey = null; + /** @var RoutingKeyStamp|null $routingKeyConfig */ + if ($routingKeyConfig = $envelope->get(RoutingKeyStamp::class)) { + $routingKey = $routingKeyConfig->getRoutingKey(); + } $encodedMessage = $this->serializer->encode($envelope); - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'], $routingKey); } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 5fef71cd39470..9909eb6c835fb 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -94,13 +94,13 @@ public static function fromDsn(string $dsn, array $options = array(), bool $debu /** * @throws \AMQPException */ - public function publish(string $body, array $headers = array()): void + public function publish(string $body, array $headers = array(), string $routingKey = null): void { if ($this->debug && $this->shouldSetup()) { $this->setup(); } - $this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers)); + $this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers)); } /** diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Stamp/RoutingKeyStamp.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Stamp/RoutingKeyStamp.php new file mode 100644 index 0000000000000..63b59cf5095f2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Stamp/RoutingKeyStamp.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt\Stamp; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +/** + * @author Vincent Touzet + */ +final class RoutingKeyStamp implements StampInterface +{ + private $routingKey; + + public function __construct(string $routingKey) + { + $this->routingKey = $routingKey; + } + + public function getRoutingKey(): string + { + return $this->routingKey; + } +}