8000 [Messenger] Add possibility to define routing key when sending messag… · symfony/symfony@0b85e87 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0b85e87

Browse files
committed
[Messenger] Add possibility to define routing key when sending message through amqp
1 parent 316e95c commit 0b85e87

File tree

5 files changed

+86
-3
lines changed

5 files changed

+86
-3
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp\AMQP;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
/**
17+
* @author Vincent Touzet <vincent.touzet@gmail.com>
18+
*/
19+
final class RoutingKeyStamp implements StampInterface
20+
{
21+
private $routingKey;
22+
23+
public function __construct(string $routingKey)
24+
{
25+
$this->routingKey = $routingKey;
26+
}
27+
28+
public function getRoutingKey(): string
29+
{
30+
return $this->routingKey;
31+
}
32+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Stamp\AMQP;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Stamp\AMQP\RoutingKeyStamp;
16+
17+
/**
18+
* @author Vincent Touzet <vincent.touzet@gmail.com>
19+
*/
20+
class RoutingKeyStampTest extends TestCase
21+
{
22+
public function testSerializable()
23+
{
24+
$stamp = new RoutingKeyStamp('dummy_routing');
25+
26+
$this->assertEquals($stamp, unserialize(serialize($stamp)));
27+
}
28+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Stamp\AMQP\RoutingKeyStamp;
1617
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1718
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
1819
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
@@ -37,4 +38,20 @@ public function testItSendsTheEncodedMessage()
3738
$sender = new AmqpSende 6D40 r($connection, $serializer);
3839
$sender->send($envelope);
3940
}
41+
42+
public function testItSendsWithRoutingKey()
43+
{
44+
$envelope = Envelope::wrap(new DummyMessage('Oy'));
45+
$envelope = $envelope->with(new RoutingKeyStamp('dummy_routing'));
46+
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
47+
48+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
49+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
50+
51+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
52+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 'dummy_routing');
53+
54+
$sender = new AmqpSender($connection, $serializer);
55+
$sender->send($envelope);
56+
}
4057
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\AMQP\RoutingKeyStamp;
1516
use Symfony\Component\Messenger\Transport\SenderInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -37,8 +38,13 @@ public function __construct(Connection $connection, SerializerInterface $seriali
3738
*/
3839
public function send(Envelope $envelope)
3940
{
41+
$routingKey = null;
42+
/** @var RoutingKeyStamp|null $routingKeyConfig */
43+
if ($routingKeyConfig = $envelope->get(RoutingKeyStamp::class)) {
44+
$routingKey = $routingKeyConfig->getRoutingKey();
45+
}
4046
$encodedMessage = $this->serializer->encode($envelope);
4147

42-
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
48+
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'], $routingKey);
4349
}
4450
}

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ public static function fromDsn(string $dsn, array $options = array(), bool $debu
9494
/**
9595
* @throws \AMQPException
9696
*/
97-
public function publish(string $body, array $headers = array()): void
97+
public function publish(string $body, array $headers = array(), string $routingKey = null): void
9898
{
9999
if ($this->debug && $this->shouldSetup()) {
100100
$this->setup();
101101
}
102102

103-
$this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers));
103+
$this->exchange()->publish($body, $routingKey, AMQP_NOPARAM, array('headers' => $headers));
104104
}
105105

106106
/**

0 commit comments

Comments
 (0)
0