8000 feature #36094 [AmazonSqsMessenger] Use AsyncAws to handle SQS commun… · symfony/symfony@017420b · GitHub
[go: up one dir, main page]

Skip to content

Commit 017420b

Browse files
committed
feature #36094 [AmazonSqsMessenger] Use AsyncAws to handle SQS communication (jderusse)
This PR was squashed before being merged into the 5.1-dev branch. Discussion ---------- [AmazonSqsMessenger] Use AsyncAws to handle SQS communication | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | / | License | MIT | Doc PR | / Similar to #35992 this PR use AsyncAws to handle Sqs messages sent/receive It move complexity of authentication/streaming outside Symfony while keeping HttpClient integration. Commits ------- 7c4888e [AmazonSqsMessenger] Use AsyncAws to handle SQS communication
2 parents 87a5701 + 7c4888e commit 017420b

File tree

8 files changed

+174
-317
lines changed

8 files changed

+174
-317
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"amphp/http-client": "^4.2",
106106
"amphp/http-tunnel": "^1.0",
107107
"async-aws/ses": "^1.0",
108+
"async-aws/sqs": "^1.0",
108109
"cache/integration-tests": "dev-master",
109110
"doctrine/annotations": "~1.0",
110111
"doctrine/cache": "~1.6",

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Sqs\SqsClient;
1415
use PHPUnit\Framework\TestCase;
1516
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@@ -39,7 +40,7 @@ private function execute(string $dsn): void
3940
{
4041
$connection = Connection::fromDsn($dsn, []);
4142
$connection->setup();
42-
$this->clearSqs($connection);
43+
$this->clearSqs($dsn);
4344

4445
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
4546
$this->assertSame(1, $connection->getMessageCount());
@@ -53,15 +54,12 @@ private function execute(string $dsn): void
5354
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
5455
}
5556

56-
private function clearSqs(Connection $connection): void
57+
private function clearSqs(string $dsn): void
5758
{
58-
$wait = 0;
59-
while ($wait++ < 50) {
60-
if (null === $message = $connection->get()) {
61-
usleep(5000);
62-
continue;
63-
}
64-
$connection->delete($message['id']);
65-
}
59+
$url = parse_url($dsn);
60+
$client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
61+
$client->purgeQueue([
62+
'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
63+
]);
6664
}
6765
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Lines changed: 54 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
15+
use AsyncAws\Core\Test\ResultMockFactory;
16+
use AsyncAws\Sqs\Result\GetQueueUrlResult;
17+
use AsyncAws\Sqs\Result\ReceiveMessageResult;
18+
use AsyncAws\Sqs\SqsClient;
19+
use AsyncAws\Sqs\ValueObject\Message;
1420
use PHPUnit\Framework\TestCase;
1521
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
16-
use Symfony\Component\Messenger\Exception\TransportException;
1722
use Symfony\Contracts\HttpClient\HttpClientInterface;
18-
use Symfony\Contracts\HttpClient\ResponseInterface;
1923

2024
class ConnectionTest extends TestCase
2125
{
@@ -31,7 +35,7 @@ public function testFromDsn()
3135
{
3236
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
3337
$this->assertEquals(
34-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
38+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
3539
Connection::fromDsn('sqs://default/queue', [], $httpClient)
3640
);
3741
}
@@ -40,16 +44,16 @@ public function testFromDsnWithRegion()
4044
{
4145
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
4246
$this->assertEquals(
43-
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
44-
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
47+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
48+
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
4549
);
4650
}
4751

4852
public function testFromDsnWithCustomEndpoint()
4953
{
5054
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
5155
$this->assertEquals(
52-
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
56+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
5357
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
5458
);
5559
}
@@ -58,7 +62,7 @@ public function testFromDsnWithCustomEndpointAndPort()
5862
{
5963
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6064
$this->assertEquals(
61-
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
65+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
6266
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
6367
);
6468
}
@@ -67,7 +71,7 @@ public function testFromDsnWithOptions()
6771
{
6872
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6973
$this->assertEquals(
70-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
74+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
7175
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
7276
);
7377
}
@@ -76,153 +80,63 @@ public function testFromDsnWithQueryOptions()
7680
{
7781
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
7882
$this->assertEquals(
79-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
83+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
8084
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
8185
);
8286
}
8387

84-
private function handleGetQueueUrl(int $index, $mock): string
85-
{
86-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
87-
88-
$mock->expects($this->at($index))->method('request')
89-
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
90-
->willReturn($response);
91-
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
92-
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
93-
<GetQueueUrlResult>
94-
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
95-
</GetQueueUrlResult>
96-
<ResponseMetadata>
97-
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
98-
</ResponseMetadata>
99-
</GetQueueUrlResponse>');
100-
101-
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
102-
}
103-
10488
public function testKeepGettingPendingMessages()
10589
{
106-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
107-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
108-
109-
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
110-
111-
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
113-
->willReturn($response);
114-
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115-
<ReceiveMessageResult>
116-
<Message>
117-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
118-
<ReceiptHandle>
119-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
120-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
121-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
122-
</ReceiptHandle>
123-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
124-
<Body>{"body":"this is a test","headers":{}}</Body>
125-
<Attribute>
126-
<Name>SenderId</Name>
127-
<Value>195004372649</Value>
128-
</Attribute>
129-
<Attribute>
130-
<Name>SentTimestamp</Name>
131-
<Value>1238099229000</Value>
132-
</Attribute>
133-
<Attribute>
134-
<Name>ApproximateReceiveCount</Name>
135-
<Value>5</Value>
136-
</Attribute>
137-
<Attribute>
138-
<Name>ApproximateFirstReceiveTimestamp</Name>
139-
<Value>1250700979248</Value>
140-
</Attribute>
141-
</Message>
142-
<Message>
143-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
144-
<ReceiptHandle>
145-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
146-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
147-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
148-
</ReceiptHandle>
149-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
150-
<Body>{"body":"this is a test","headers":{}}</Body>
151-
<Attribute>
152-
<Name>SenderId</Name>
153-
<Value>195004372649</Value>
154-
</Attribute>
155-
<Attribute>
156-
<Name>SentTimestamp</Name>
157-
<Value>1238099229000</Value>
158-
</Attribute>
159-
<Attribute>
160-
<Name>ApproximateReceiveCount</Name>
161-
<Value>5</Value>
162-
</Attribute>
163-
<Attribute>
164-
<Name>ApproximateFirstReceiveTimestamp</Name>
165-
<Value>1250700979248</Value>
166-
</Attribute>
167-
</Message>
168-
<Message>
169-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
170-
<ReceiptHandle>
171-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
172-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
173-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
174-
</ReceiptHandle>
175-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
176-
<Body>{"body":"this is a test","headers":{}}</Body>
177-
<Attribute>
178-
<Name>SenderId</Name>
179-
<Value>195004372649</Value>
180-
</Attribute>
181-
<Attribute>
182-
<Name>SentTimestamp</Name>
183-
<Value>1238099229000</Value>
184-
</Attribute>
185-
<Attribute>
186-
<Name>ApproximateReceiveCount</Name>
187-
<Value>5</Value>
188-
</Attribute>
189-
<Attribute>
190-
<Name>ApproximateFirstReceiveTimestamp</Name>
191-
<Value>1250700979248</Value>
192-
</Attribute>
193-
</Message>
194-
</ReceiveMessageResult>
195-
<ResponseMetadata>
196-
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
197-
</ResponseMetadata>
198-
</ReceiveMessageResponse>');
199-
200-
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
90+
$client = $this->createMock(SqsClient::class);
91+
$client->expects($this->any())
92+
->method('getQueueUrl')
93+
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
94+
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
95+
$client->expects($this->at(1))
96+
->method('receiveMessage')
97+
->with([
98+
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
99+
'MaxNumberOfMessages' => 9,
100+
'WaitTimeSeconds' => 20,
101+
'MessageAttributeNames' => ['All'],
102+
'VisibilityTimeout' => null,
103+
])
104+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
105+
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
106+
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
107+
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
108+
]]));
109+
$client->expects($this->at(2))
110+
->method('receiveMessage')
111+
->with([
112+
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
113+
'MaxNumberOfMessages' => 9,
114+
'WaitTimeSeconds' => 20,
115+
'MessageAttributeNames' => ['All'],
116+
'VisibilityTimeout' => null,
117+
])
118+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
119+
]]));
120+
121+
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
201122
$this->assertNotNull($connection->get());
202123
$this->assertNotNull($connection->get());
203124
$this->assertNotNull($connection->get());
125+
$this->assertNull($connection->get());
204126
}
205127

206128
public function testUnexpectedSqsError()
207129
{
208-
$this->expectException(TransportException::class);
130+
$this->expectException(HttpException::class);
209131
$this->expectExceptionMessage('SQS error happens');
210132

211-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
212-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
213-
214-
$httpClient->expects($this->once())->method('request')->willReturn($response);
215-
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
216-
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
217-
<Error>
218-
<Type>Sender</Type>
219-
<Code>boom</Code>
220-
<Message>SQS error happens</Message>
221-
<Detail/>
222-
</Error>
223-
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
224-
</ErrorResponse>');
225-
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
133+
$client = $this->createMock(SqsClient::class);
134+
$client->expects($this->any())
135+
->method('getQueueUrl')
136+
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
137+
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));
138+
139+
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
226140
$connection->get();
227141
}
228142
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -19,7 +20,6 @@
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
22-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -42,7 +42,7 @@ public function get(): iterable
4242
{
4343
try {
4444
$sqsEnvelope = $this->connection->get();
45-
} catch (HttpExceptionInterface $e) {
45+
} catch (HttpException $e) {
4646
throw new TransportException($e->getMessage(), 0, $e);
4747
}
4848
if (null === $sqsEnvelope) {
@@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
7070
{
7171
try {
7272
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
73-
} catch (HttpExceptionInterface $e) {
73+
} catch (HttpException $e) {
7474
throw new TransportException($e->getMessage(), 0, $e);
7575
}
7676
}
@@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
8282
{
8383
try {
8484
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
85-
} catch (HttpExceptionInterface $e) {
85+
} catch (HttpException $e) {
8686
throw new TransportException($e->getMessage(), 0, $e);
8787
}
8888
}
@@ -94,7 +94,7 @@ public function getMessageCount(): int
9494
{
9595
try {
9696
return $this->connection->getMessageCount();
97-
} catch (HttpExceptionInterface $e) {
97+
} catch (HttpException $e) {
9898
throw new TransportException($e->getMessage(), 0, $e);
9999
}
100100
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\TransportException;
1617
use Symfony\Component\Messenger\Stamp\DelayStamp;
1718
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
19-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2020

2121
/**
2222
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -61,7 +61,7 @@ public function send(Envelope $envelope): Envelope
6161
$messageGroupId,
6262
$messageDeduplicationId
6363
);
64-
} catch (HttpExceptionInterface $e) {
64+
} catch (HttpException $e) {
6565
throw new TransportException($e->getMessage(), 0, $e);
6666
}
6767

0 commit comments

Comments
 (0)
0