8000 FIFO support for SQS · symfony/symfony@3975263 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3975263

Browse files
committed
FIFO support for SQS
1 parent d1fd0b1 commit 3975263

File tree

3 files changed

+49
-18
lines changed

3 files changed

+49
-18
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ CHANGELOG
55
-----
66

77
* Introduced the Amazon SQS bridge.
8-
8+
* FIFO support

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,42 +17,51 @@
1717

1818
class AmazonSqsIntegrationTest extends TestCase
1919
{
20-
private $connection;
20+
public function testConnectionSendToFifoQueueAndGet(): void
21+
{
22+
if (!getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')) {
23+
$this->markTestSkipped('The "MESSENGER_SQS_FIFO_QUEUE_DSN" environment variable is required.');
24+
}
2125

22-
protected function setUp(): void
26+
$this->execute(getenv('MESSENGER_SQS_FIFO_QUEUE_DSN'));
27+
}
28+
29+
public function testConnectionSendAndGet(): void
2330
{
2431
if (!getenv('MESSENGER_SQS_DSN')) {
2532
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
2633
}
2734

28-
$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
29-
$this->connection->setup();
30-
$this->clearSqs();
35+
$this->execute(getenv('MESSENGER_SQS_DSN'));
3136
}
3237

33-
public function testConnectionSendAndGet()
38+
private function execute(string $dsn): void
3439
{
35-
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
36-
$this->assertSame(1, $this->connection->getMessageCount());
40+
$connection = Connection::fromDsn($dsn, []);
41+
$connection->setup();
42+
$this->clearSqs($connection);
43+
44+
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
45+
$this->assertSame(1, $connection->getMessageCount());
3746

3847
$wait = 0;
39-
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
48+
while ((null === $encoded = $connection->get()) && $wait++ < 200) {
4049
usleep(5000);
4150
}
4251

4352
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
4453
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
4554
}
4655

47-
private function clearSqs()
56+
private function clearSqs(Connection $connection): void
4857
{
4958
$wait = 0;
5059
while ($wait++ < 50) {
51-
if (null === $message = $this->connection->get()) {
60+
if (null === $message = $connection->get()) {
5261
usleep(5000);
5362
continue;
5463
}
55-
$this->connection->delete($message['id']);
64+
$connection->delete($message['id']);
5665
}
5766
}
5867
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
*/
2828
class Connection
2929
{
30+
private const AWS_SQS_FIFO_SUFFIX = '.fifo';
31+
3032
private const DEFAULT_OPTIONS = [
3133
'buffer_size' => 9,
3234
'wait_time' => 20,
@@ -196,10 +198,16 @@ private function getNewMessages(): \Generator
196198

197199
public function setup(): void
198200
{
199-
$this->call($this->configuration['endpoint'], [
201+
$parameters = [
200202
'Action' => 'CreateQueue',
201203
'QueueName' => $this->configuration['queue_name'],
202-
]);
204+
];
205+
206+
if ($this->isFifoQueue($this->configuration['queue_name'])) {
207+
$parameters['FifoQueue'] = true;
208+
}
209+
210+
$this->call($this->configuration['endpoint'], $parameters);
203211
$this->queueUrl = null;
204212

205213
$this->configuration['auto_setup'] = false;
@@ -238,11 +246,20 @@ public function send(string $body, array $headers, int $delay = 0): void
238246
$this->setup();
239247
}
240248

241-
$this->call($this->getQueueUrl(), [
249+
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
250+
251+
$parameters = [
242252
'Action' => 'SendMessage',
243-
'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
253+
'MessageBody' => $messageBody,
244254
'DelaySeconds' => $delay,
245-
]);
255+
];
256+
257+
if ($this->isFifoQueue($this->configuration['queue_name'])) {
258+
$parameters['MessageGroupId'] = __METHOD__;
259+
$parameters['MessageDeduplicationId'] = md5($messageBody);
260+
}
261+
262+
$this->call($this->getQueueUrl(), $parameters);
246263
}
247264

248265
public function reset(): void
@@ -362,4 +379,9 @@ private function checkResponse(ResponseInterface $response): void
362379
throw new TransportException($error->Error->Message);
363380
}
364381
}
382+
383+
private function isFifoQueue(string $queueName): bool
384+
{
385+
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -strlen(self::AWS_SQS_FIFO_SUFFIX));
386+
}
365387
}

0 commit comments

Comments
 (0)
0