8000 Uses an `AmqpStamp` to provide flags and attributes · symfony/symfony@6d76faa · GitHub
[go: up one dir, main page]

Skip to content

Commit 6d76faa

Browse files
committed
Uses an AmqpStamp to provide flags and attributes
1 parent 09dee17 commit 6d76faa

File tree

8 files changed

+125
-50
lines changed

8 files changed

+125
-50
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ CHANGELOG
1818
changed: a required 3rd `SerializerInterface` argument was added.
1919
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
2020
explicitly handle messages synchronously.
21-
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
21+
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
2222
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
23-
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
23+
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
2424
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
2525
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
2626
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17-
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1818
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
1919
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -41,14 +41,14 @@ public function testItSendsTheEncodedMessage()
4141

4242
public function testItSendsTheEncodedMessageUsingARoutingKey()
4343
{
44-
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
44+
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
4545
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4646

4747
$serializer = $this->createMock(SerializerInterface::class);
4848
$serializer->method('encode')->with($envelope)->willReturn($encoded);
4949

5050
$connection = $this->createMock(Connection::class);
51-
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
5252

5353
$sender = new AmqpSender($connection, $serializer);
5454
$sender->send($envelope);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
16+
17+
class AmqpStampTest extends TestCase
18+
{
19+
public function testRoutingKeyOnly()
20+
{
21+
$stamp = new AmqpStamp('routing_key');
22+
$this->assertSame('routing_key', $stamp->getRoutingKey());
23+
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
24+
$this->assertSame([], $stamp->getAttributes());
25+
}
26+
27+
public function testFlagsAndAttributes()
28+
{
29+
$stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']);
30+
$this->assertSame(null, $stamp->getRoutingKey());
31+
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
32+
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
33+
}
34+
}

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
18+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1719
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1820

1921
/**
@@ -430,7 +432,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
430432
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
431433

432434
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
433-
$connection->publish('body', [], 0, 'routing_key');
435+
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
434436
}
435437

436438
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
@@ -477,7 +479,27 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
477479
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
478480

479481
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
480-
$connection->publish('{}', [], 120000, 'routing_key');
482+
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
483+
}
484+
485+
public function testItCanPublishWithCustomFlagsAndAttributes()
486+
{
487+
$factory = new TestAmqpFactory(
488+
$amqpConnection = $this->createMock(\AMQPConnection::class),
489+
$amqpChannel = $this->createMock(\AMQPChannel::class),
490+
$amqpQueue = $this->createMock(\AMQPQueue::class),
491+
$amqpExchange = $this->createMock(\AMQPExchange::class)
492+
);
493+
494+
$amqpExchange->expects($this->once())->method('publish')->with(
495+
'body',
496+
'routing_key',
497+
AMQP_IMMEDIATE,
498+
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
499+
);
500+
501+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
502+
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
481503
}
482504
}
483505

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ public function send(Envelope $envelope): Envelope
5151
}
5252

5353
try {
54-
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
55-
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
56-
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
57-
58-
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
54+
$this->connection->publish(
55+
$encodedMessage['body'],
56+
$encodedMessage['headers'] ?? [],
57+
$delay,
58+
$envelope->last(AmqpStamp::class)
59+
);
5960
} catch (\AMQPException $e) {
6061
throw new TransportException($e->getMessage(), 0, $e);
6162
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php renamed to src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,35 @@
1515

1616
/**
1717
* @author Guillaume Gammelin <ggammelin@gmail.com>
18+
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*
1920
* @experimental in 4.3
2021
*/
21-
final class AmqpRoutingKeyStamp implements StampInterface
22+
final class AmqpStamp implements StampInterface
2223
{
2324
private $routingKey;
25+
private $flags;
26+
private $attributes;
2427

25-
public function __construct(string $routingKey)
28+
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
2629
{
2730
$this->routingKey = $routingKey;
31+
$this->flags = $flags;
32+
$this->attributes = $attributes;
2833
}
2934

30-
public function getRoutingKey(): string
35+
public function getRoutingKey(): ?string
3136
{
3237
return $this->routingKey;
3338
}
39+
40+
public function getFlags(): int
41+
{
42+
return $this->flags;
43+
}
44+
45+
public function getAttributes(): array
46+
{
47+
return $this->attributes;
48+
}
3449
}

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ private static function normalizeQueueArguments(array $arguments): array
171171
*
172172
* @throws \AMQPException
173173
*/
174-
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
174+
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
175175
{
176176
if (0 !== $delay) {
177-
$this->publishWithDelay($body, $headers, $delay, $routingKey);
177+
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
178178

179179
return;
180180
}
@@ -183,13 +183,14 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
183183
$this->setup();
184184
}
185185

186-
$this->exchange()->publish(
186+
$this->publishOnExchange(
187+
$this->exchange(),
187188
$body,
188-
$routingKey ?? $this->getDefaultPublishRoutingKey(),
189-
AMQP_NOPARAM,
189+
$this->routingKeyFromStamp($amqpStamp) ?? $this->getDefaultPublishRoutingKey(),
190190
[
191191
'headers' => $headers,
192-
]
192+
],
193+
$amqpStamp
193194
);
194195
}
195196

@@ -206,19 +207,36 @@ public function countMessagesInQueues(): int
206207
/**
207208
* @throws \AMQPException
208209
*/
209-
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
210+
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
210211
{
211212
if ($this->shouldSetup()) {
212-
$this->setupDelay($delay, $exchangeRoutingKey);
213+
$this->setupDelay($delay, $this->routingKeyFromStamp($amqpStamp));
213214
}
214215

215-
$this->getDelayExchange()->publish(
216+
$this->publishOnExchange(
217+
$this->getDelayExchange(),
216218
$body,
217219
$this->getRoutingKeyForDelay($delay),
218-
AMQP_NOPARAM,
219220
[
220221
'headers' => $headers,
221-
]
222+
],
223+
$amqpStamp
224+
);
225+
}
226+
227+
private function publishOnExchange(
228+
\AMQPExchange $exchange,
229+
string $body,
230+
string $routingKey = null,
231+
array $attributes = [],
232+
AmqpStamp $amqpStamp = null
233+
)
234+
{
235+
$exchange->publish(
236+
$body,
237+
$routingKey,
238+
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
239+
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
222240
);
223241
}
224242

@@ -432,4 +450,13 @@ public function purgeQueues()
432450
$this->queue($queueName)->purge();
433451
}
434452
}
453+
454+
private function routingKeyFromStamp(AmqpStamp $amqpStamp = null): ?string
455+
{
456+
if (null === $amqpStamp) {
457+
return null;
458+
}
459+
460+
return $amqpStamp->getRoutingKey();
461+
}
435462
}

0 commit comments

Comments
 (0)
0