8000 feature #36431 [Messenger] Add FIFO support to the SQS transport (cv6… · symfony/symfony@67948a7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 67948a7

Browse files
committed
feature #36431 [Messenger] Add FIFO support to the SQS transport (cv65kr)
This PR was squashed before being merged into the 5.1-dev branch. Discussion ---------- [Messenger] Add FIFO support to the SQS transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | no | License | MIT | Doc PR | -- https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html Commits ------- 3760175 [Messenger] Add FIFO support to the SQS transport
2 parents a85545f + 3760175 commit 67948a7

File tree

7 files changed

+127
-21
lines changed

7 files changed

+127
-21
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ env:
2222
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
2323
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
2424
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
25+
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
2526
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
2627

2728
matrix:

src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ CHANGELOG
55
-----
66

77
* Introduced the Amazon SQS bridge.
8-
8+
* Added FIFO support to the SQS transport

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,42 +17,51 @@
1717

1818
class AmazonSqsIntegrationTest extends TestCase
1919
{
20-
private $connection;
20+
public function testConnectionSendToFifoQueueAndGet(): void
21+
{
22+
if (!getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')) {
23+
$this->markTestSkipped('The "MESSENGER_SQS_FIFO_QUEUE_DSN" environment variable is required.');
24+
}
2125

22-
protected function setUp(): void
26+
$this->execute(getenv('MESSENGER_SQS_FIFO_QUEUE_DSN'));
27+
}
28+
29+
public function testConnectionSendAndGet(): void
2330
{
2431
if (!getenv('MESSENGER_SQS_DSN')) {
2532
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
2633
}
2734

28-
$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
29-
$this->connection->setup();
30-
$this->clearSqs();
35+
$this->execute(getenv('MESSENGER_SQS_DSN'));
3136
}
3237

33-
public function testConnectionSendAndGet()
38+
private function execute(string $dsn): void
3439
{
35-
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
36-
$this->assertSame(1, $this->connection->getMessageCount());
40+
$connection = Connection::fromDsn($dsn, []);
41+
$connection->setup();
42+
$this->clearSqs($connection);
43+
44+
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
45+
$this->assertSame(1, $connection->getMessageCount());
3746

3847
$wait = 0;
39-
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
48+
while ((null === $encoded = $connection->get()) && $wait++ < 200) {
4049
usleep(5000);
4150
}
4251

4352
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
4453
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
4554
}
4655

47-
private function clearSqs()
56+
private function clearSqs(Connection $connection): void
4857
{
4958
$wait = 0;
5059
while ($wait++ < 50) {
51-
if (null === $message = $this->connection->get()) {
60+
if (null === $message = $connection->get()) {
5261
usleep(5000);
5362
continue;
5463
}
55-
$this->connection->delete($message['id']);
64+
$connection->delete($message['id']);
5665
}
5766
}
5867
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
1617
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
1718
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2021

2122
class AmazonSqsSenderTest extends TestCase
2223
{
23-
public function testSend()
24+
public function testSend(): void
2425
{
2526
$envelope = new Envelope(new DummyMessage('Oy'));
2627
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
@@ -36,4 +37,24 @@ public function testSend()
3637
$sender = new AmazonSqsSender($connection, $serializer);
3738
$sender->send($envelope);
3839
}
40+
41+
public function testSendWithAmazonSqsFifoStamp(): void
42+
{
43+
$envelope = (new Envelope(new DummyMessage('Oy')))
44+
->with($stamp = new AmazonSqsFifoStamp('testGroup', 'testDeduplicationId'));
45+
46+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
47+
48+
$connection = $this->getMockBuilder(Connection::class)
49+
->disableOriginalConstructor()
50+
->getMock();
51+
$connection->expects($this->once())->method('send')
52+
->with($encoded['body'], $encoded['headers'], 0, $stamp->getMessageGroupId(), $stamp->getMessageDeduplicationId());
53+
54+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
55+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
56+
57+
$sender = new AmazonSqsSender($connection, $serializer);
58+
$sender->send($envelope);
59+
}
3960
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
13+
14+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
15+
16+
final class AmazonSqsFifoStamp implements NonSendableStampInterface
17+
{
18+
private $messageGroupId;
19+
20+
private $messageDeduplicationId;
21+
22+
public function __construct(?string $messageGroupId = null, ?string $messageDeduplicationId = null)
23+
{
24+
$this->messageGroupId = $messageGroupId;
25+
$this->messageDeduplicationId = $messageDeduplicationId;
26+
}
27+
28+
public function getMessageGroupId(): ?string
29+
{
30+
return $this->messageGroupId;
31+
}
32+
33+
public function getMessageDeduplicationId(): ?string
34+
{
35+
return $this->messageDeduplicationId;
36+
}
37+
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,24 @@ public function send(Envelope $envelope): Envelope
4343
$delayStamp = $envelope->last(DelayStamp::class);
4444
$delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;
4545

46+
$messageGroupId = null;
47+
$messageDeduplicationId = null;
48+
49+
/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
50+
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
51+
if (null !== $amazonSqsFifoStamp) {
52+
$messageGroupId = $amazonSqsFifoStamp->getMessageGroupId();
53+
$messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
54+
}
55+
4656
try {
47-
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
57+
$this->connection->send(
58+
$encodedMessage['body'],
59+
$encodedMessage['headers'] ?? [],
60+
$delay,
61+
$messageGroupId,
62+
$messageDeduplicationId
63+
);
4864
} catch (HttpExceptionInterface $e) {
4965
throw new TransportException($e->getMessage(), 0, $e);
5066
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
*/
2828
class Connection
2929
{
30+
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
31+
3032
private const DEFAULT_OPTIONS = [
3133
'buffer_size' => 9,
3234
'wait_time' => 20,
@@ -196,10 +198,16 @@ private function getNewMessages(): \Generator
196198

197199
public function setup(): void
198200
{
199-
$this->call($this->configuration['endpoint'], [
201+
$parameters = [
200202
'Action' => 'CreateQueue',
201203
'QueueName' => $this->configuration['queue_name'],
202-
]);
204+
];
205+
206+
if ($this->isFifoQueue($this->configuration['queue_name'])) {
207+
$parameters['FifoQueue'] = true;
208+
}
209+
210+
$this->call($this->configuration['endpoint'], $parameters);
203211
$this->queueUrl = null;
204212

205213
$this->configuration['auto_setup'] = false;
@@ -232,17 +240,26 @@ public function getMessageCount(): int
232240
return 0;
233241
}
234242

235-
public function send(string $body, array $headers, int $delay = 0): void
243+
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
236244
{
237245
if ($this->configuration['auto_setup']) {
238246
$this->setup();
239247
}
240248

241-
$this->call($this->getQueueUrl(), [
249+
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
250+
251+
$parameters = [
242252
'Action' => 'SendMessage',
243-
'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
253+
'MessageBody' => $messageBody,
244254
'DelaySeconds' => $delay,
245-
]);
255+
];
256+
257+
if ($this->isFifoQueue($this->configuration['queue_name'])) {
258+
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
259+
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
260+
}
261+
262+
$this->call($this->getQueueUrl(), $parameters);
246263
}
247264

248265
public function reset(): void
@@ -362,4 +379,9 @@ private function checkResponse(ResponseInterface $response): void
362379
throw new TransportException($error->Error->Message);
363380
}
364381
}
382+
383+
private function isFifoQueue(string $queueName): bool
384+
{
385+
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
386+
}
365387
}

0 commit comments

Comments
 (0)
0