8000 [messenger] Moves RoutingKeyStamp to Transport\AmqpExt. · symfony/symfony@28c6a74 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit 28c6a74

Browse files
author
Guillaume Gammelin
committed
[messenger] Moves RoutingKeyStamp to Transport\AmqpExt.
1 parent 5a59170 commit 28c6a74

File tree

6 files changed

+80
-27
lines changed

6 files changed

+80
-27
lines changed

src/Symfony/Component/Messenger/Tests/Stamp/AmqpExt/RoutingKeyStampTest.php renamed to src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Tests\Stamp\AmqpExt;
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Stamp\AmqpExt\RoutingKeyStamp;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
1616

17-
class RoutingKeyStampTest extends TestCase
17+
class AmqpRoutingKeyStampTest extends TestCase
1818
{
1919
public function testStamp()
2020
{
21-
$stamp = new RoutingKeyStamp('routing_key');
21+
$stamp = new AmqpRoutingKeyStamp('routing_key');
2222
$this->assertSame('routing_key', $stamp->getRoutingKey());
2323
}
2424
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
16< E864 /code>-
use Symfony\Component\Messenger\Stamp\AmqpExt\RoutingKeyStamp;
1716
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1817
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
1918
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
19+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
@@ -41,14 +41,14 @@ public function testItSendsTheEncodedMessage()
4141

4242
public function testItSendsTheEncodedMessageUsingARoutingKey()
4343
{
44-
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new RoutingKeyStamp('rk'));
44+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
4545
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4646

47-
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
47+
$serializer = $this->createMock(SerializerInterface::class);
4848
$serializer->method('encode')->with($envelope)->willReturn($encoded);
4949

5050
$connection = $this->createMock(Connection::class);
51-
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 'rk');
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
5252

5353
$sender = new AmqpSender($connection, $serializer);
5454
$sender->send($envelope);

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,16 +375,63 @@ public function testItCanPublishWithTheDefaultQueueRoutingKey()
375375
public function testItCanPublishWithASuppliedRoutingKey()
376376
{
377377
$factory = new TestAmqpFactory(
378-
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
379-
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(),
380-
$amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(),
381-
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
378+
$amqpConnection = $this->createMock(\AMQPConnection::class),
379+
$amqpChannel = $this->createMock(\AMQPChannel::class),
380+
$amqpQueue = $this->createMock(\AMQPQueue::class),
381+
$amqpExchange = $this->createMock(\AMQPExchange::class)
382382
);
383383

384384
$amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key');
385385

386386
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
387-
$connection->publish('body', [], 'supplied_key');
387+
$connection->publish('body', [], 0, 'supplied_key');
388+
}
389+
390+
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
391+
{
392+
$amqpConnection = $this->createMock(\AMQPConnection::class);
393+
$amqpChannel = $this->createMock(\AMQPChannel::class);
394+
$delayQueue = $this->createMock(\AMQPQueue::class);
395+
396+
$factory = $this->createMock(AmqpFactory::class);
397+
$factory->method('createConnection')->willReturn($amqpConnection);
398+
$factory->method('createChannel')->willReturn($amqpChannel);
399+
$factory->method('createQueue')->willReturn($delayQueue);
400+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
401+
$delayExchange = $this->createMock(\AMQPExchange::class),
402+
$amqpExchange = $this->createMock(\AMQPExchange::class)
403+
));
404+
405+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
406+
$amqpExchange->method('getName')->willReturn('messages');
407+
408+
$delayExchange->expects($this->once())->method('setName')->with('delay');
409+
$delayExchange->expects($this->once())->method('declareExchange');
410+
$delayExchange->method('getName')->willReturn('delay');
411+
412+
$connectionOptions = [
413+
'retry' => [
414+
'dead_routing_key' => 'my_dead_routing_key',
415+
],
416+
];
417+
418+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
419+
420+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
421+
$delayQueue->expects($this->once())->method('setArguments')->with([
422+
'x-message-ttl' => 120000,
423+
'x-dead-letter-exchange' => 'messages',
424+
]);
425+
$delayQueue->expects($this->once())->method('setArgument')->with(
426+
'x-dead-letter-routing-key',
427+
'routing_key'
428+
);
429+
430+
$delayQueue->expects($this->once())->method('declareQueue');
431+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
432+
433+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
434+
$connection->publish('{}', [], 120000, 'routing_key');
388435
}
389436
}
390437

src/Symfony/Component/Messenger/Stamp/AmqpExt/RoutingKeyStamp.php renamed to src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Stamp\AmqpExt;
12+
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

1414
use Symfony\Component\Messenger\Stamp\StampInterface;
1515

@@ -18,7 +18,7 @@
1818
*
1919
* @experimental in 4.3
2020
*/
21-
final class RoutingKeyStamp implements StampInterface
21+
final class AmqpRoutingKeyStamp implements StampInterface
2222
{
2323
/**
2424
* @var string

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
17-
use Symfony\Component\Messenger\Stamp\AmqpExt\RoutingKeyStamp;
17+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1818
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
20-
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2120

2221
/**
2322
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
@@ -52,7 +51,8 @@ public function send(Envelope $envelope): Envelope
5251
}
5352

5453
try {
55-
$routingKeyStamp = $envelope->last(RoutingKeyStamp::class);
54+
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
55+
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
5656
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
5757

5858
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private static function normalizeQueueArguments(array $arguments): array
168168
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
169169
{
170170
if (0 !== $delay) {
171-
$this->publishWithDelay($body, $headers, $delay);
171+
$this->publishWithDelay($body, $headers, $delay, $routingKey);
172172

173173
return;
174174
}
@@ -179,18 +179,18 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
179179

180180
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
181181
$attributes = $this->getAttributes($headers);
182-
$routingKey = $routingKey ?: ($this->queueConfiguration['routing_key'] ?? null);
182+
$routingKey = $this->getRoutingKey($routingKey);
183183

184184
$this->exchange()->publish($body, $routingKey, $flags, $attributes);
185185
}
186186

187187
/**
188188
* @throws \AMQPException
189189
*/
190-
private function publishWithDelay(string $body, array $headers = [], int $delay)
190+
private function publishWithDelay(string $body, array $headers, int $delay, ?string $routingKey)
191191
{
192192
if ($this->shouldSetup()) {
193-
$this->setupDelay($delay);
193+
$this->setupDelay($delay, $routingKey);
194194
}
195195

196196
$routingKey = $this->getRoutingKeyForDelay($delay);
@@ -200,7 +200,7 @@ private function publishWithDelay(string $body, array $headers = [], int $delay)
200200
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
201201
}
202202

203-
private function setupDelay(int $delay)
203+
private function setupDelay(int $delay, ?string $routingKey)
204204
{
205205
if (!$this->channel()->isConnected()) {
206206
$this->clear();
@@ -209,7 +209,7 @@ private function setupDelay(int $delay)
209209
$exchange = $this->getDelayExchange();
210210
$exchange->declareExchange();
211211

212-
$queue = $this->createDelayQueue($delay);
212+
$queue = $this->createDelayQueue($delay, $routingKey);
213213
$queue->declareQueue();
214214
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
215215
}
@@ -234,7 +234,7 @@ private function getDelayExchange(): \AMQPExchange
234234
* which is the original exchange, resulting on it being put back into
235235
* the original queue.
236236
*/
237-
private function createDelayQueue(int $delay)
237+
private function createDelayQueue(int $delay, ?string $routingKey)
238238
{
239239
$delayConfiguration = $this->connectionConfiguration['delay'];
240240

@@ -245,9 +245,10 @@ private function createDelayQueue(int $delay)
245245
'x-dead-letter-exchange' => $this->exchange()->getName(),
246246
]);
247247

248-
if (isset($this->queueConfiguration['routing_key'])) {
248+
$routingKey = $this->getRoutingKey($routingKey);
249+
if ($routingKey) {
249250
// after being released from to DLX, this routing key will be used
250-
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
251+
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
251252
}
252253

253254
return $queue;
@@ -389,4 +390,9 @@ private function getAttributes(array $headers): array
389390
{
390391
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
391392
}
393+
394+
private function getRoutingKey(?string $expected = ''): ?string
395+
{
396+
return $expected ?: ($this->queueConfiguration['routing_key'] ?? null);
397+
}
392398
}

0 commit comments

Comments
 (0)
0