8000 Use AsyncAws to handle SQS communication · symfony/symfony@49c0c68 · GitHub
[go: up one dir, main page]

Skip to content

Commit 49c0c68

Browse files
committed
Use AsyncAws to handle SQS communication
1 parent 6dc7d8b commit 49c0c68

File tree

6 files changed

+213
-288
lines changed

6 files changed

+213
-288
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
"require-dev": {
105105
"amphp/http-client": "^4.2",
106106
"amphp/http-tunnel": "^1.0",
107+
"async-aws/sqs": "^0.3",
107108
"cache/integration-tests": "dev-master",
108109
"doctrine/annotations": "~1.0",
109110
"doctrine/cache": "~1.6",

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Lines changed: 124 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
15+
use AsyncAws\Sqs\SqsClient;
1416
use PHPUnit\Framework\TestCase;
17+
use Symfony\Component\HttpClient\MockHttpClient;
18+
use Symfony\Component\HttpClient\Response\MockResponse;
1519
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
16-
use Symfony\Component\Messenger\Exception\TransportException;
1720
use Symfony\Contracts\HttpClient\HttpClientInterface;
1821
use Symfony\Contracts\HttpClient\ResponseInterface;
1922

@@ -31,7 +34,7 @@ public function testFromDsn()
3134
{
3235
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
3336
$this->assertEquals(
34-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
37+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
3538
Connection::fromDsn('sqs://default/queue', [], $httpClient)
3639
);
3740
}
@@ -40,16 +43,16 @@ public function testFromDsnWithRegion()
4043
{
4144
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
4245
$this->assertEquals(
43-
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
44-
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
46+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
47+
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
4548
);
4649
}
4750

4851
public function testFromDsnWithCustomEndpoint()
4952
{
5053
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
5154
$this->assertEquals(
52-
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
55+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
5356
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
5457
);
5558
}
@@ -58,7 +61,7 @@ public function testFromDsnWithCustomEndpointAndPort()
5861
{
5962
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6063
$this->assertEquals(
61-
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
64+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
6265
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
6366
);
6467
}
@@ -67,7 +70,7 @@ public function testFromDsnWithOptions()
6770
{
6871
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6972
$this->assertEquals(
70-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
73+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
7174
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
7275
);
7376
}
@@ -76,126 +79,113 @@ public function testFromDsnWithQueryOptions()
7679
{
7780
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
7881
$this->assertEquals(
79-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
82+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
8083
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
8184
);
8285
}
8386

84-
private function handleGetQueueUrl(int $index, $mock): string
85-
{
86-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
87-
88-
$mock->expects($this->at($index))->method('request')
89-
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
90-
->willReturn($response);
91-
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
92-
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
93-
<GetQueueUrlResult>
94-
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
95-
</GetQueueUrlResult>
96-
<ResponseMetadata>
97-
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
98-
</ResponseMetadata>
99-
</GetQueueUrlResponse>');
100-
101-
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
102-
}
103-
10487
public function testKeepGettingPendingMessages()
10588
{
106-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
107-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
108-
109-
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
110-
111-
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
113-
->willReturn($response);
114-
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115-
<ReceiveMessageResult>
116-
<Message>
117-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
118-
<ReceiptHandle>
119-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
120-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
121-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
122-
</ReceiptHandle>
123-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
124-
<Body>{"body":"this is a test","headers":{}}</Body>
125-
<Attribute>
126-
<Name>SenderId</Name>
127-
<Value>195004372649</Value>
128-
</Attribute>
129-
<Attribute>
130-
<Name>SentTimestamp</Name>
131-
<Value>1238099229000</Value>
132-
</Attribute>
133-
<Attribute>
134-
<Name>ApproximateReceiveCount</Name>
135-
<Value>5</Value>
136-
</Attribute>
137-
<Attribute>
138-
<Name>ApproximateFirstReceiveTimestamp</Name>
139-
<Value>1250700979248</Value>
140-
</Attribute>
141-
</Message>
142-
<Message>
143-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
144-
<ReceiptHandle>
145-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
146-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
147-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
148-
</ReceiptHandle>
149-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
150-
<Body>{"body":"this is a test","headers":{}}</Body>
151-
<Attribute>
152-
<Name>SenderId</Name>
153-
<Value>195004372649</Value>
154-
</Attribute>
155-
<Attribute>
156-
<Name>SentTimestamp</Name>
157-
<Value>1238099229000</Value>
158-
</Attribute>
159-
<Attribute>
160-
<Name>ApproximateReceiveCount</Name>
161-
<Value>5</Value>
162-
</Attribute>
163-
<Attribute>
164-
<Name>ApproximateFirstReceiveTimestamp</Name>
165-
<Value>1250700979248</Value>
166-
</Attribute>
167-
</Message>
168-
<Message>
169-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
170-
<ReceiptHandle>
171-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
172-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
173-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
174-
</ReceiptHandle>
175-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
176-
<Body>{"body":"this is a test","headers":{}}</Body>
177-
<Attribute>
178-
<Name>SenderId</Name>
179-
<Value>195004372649</Value>
180-
</Attribute>
181-
<Attribute>
182-
<Name>SentTimestamp</Name>
183-
<Value>1238099229000</Value>
184-
</Attribute>
185-
<Attribute>
186-
<Name>ApproximateReceiveCount</Name>
187-
<Value>5</Value>
188-
</Attribute>
189-
<Attribute>
190-
<Name>ApproximateFirstReceiveTimestamp</Name>
191-
<Value>1250700979248</Value>
192-
</Attribute>
193-
</Message>
194-
</ReceiveMessageResult>
195-
<ResponseMetadata>
196-
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
197-
</ResponseMetadata>
198-
</ReceiveMessageResponse>');
89+
$httpClient = new MockHttpClient(function(string $method, string $url, array $options): ResponseInterface {
90+
if ($options['body'] === 'Action=GetQueueUrl&Version=2012-11-05&QueueName=queue') {
91+
return new MockResponse('<GetQueueUrlResponse>
92+
<GetQueueUrlResult>
93+
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
94+
</GetQueueUrlResult>
95+
<ResponseMetadata>
96+
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
97+
</ResponseMetadata>
98+
</GetQueueUrlResponse>');
99+
}
100+
if ($options['body'] === 'Action=ReceiveMessage&Version=2012-11-05&QueueUrl=https%3A%2F%2Fsqs.us-east-2.amazonaws.com%2F123456789012%2FMyQueue&MaxNumberOfMessages=9&WaitTimeSeconds=20') {
101+
return new MockResponse('<ReceiveMessageResponse>
102+
<ReceiveMessageResult>
103+
<Message>
104+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
105+
<ReceiptHandle>
106+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
107+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
108+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
109+
</ReceiptHandle>
110+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
111+
<Body>{"body":"this is a test","headers":{}}</Body>
112+
<Attribute>
113+
<Name>SenderId</Name>
114+
<Value>195004372649</Value>
115+
</Attribute>
116+
<Attribute>
117+
<Name>SentTimestamp</Name>
118+
<Value>1238099229000</Value>
119+
</Attribute>
120+
<Attribute>
121+
<Name>ApproximateReceiveCount</Name>
122+
<Value>5</Value>
123+
</Attribute>
124+
<Attribute>
125+
<Name>ApproximateFirstReceiveTimestamp</Name>
126+
<Value>1250700979248</Value>
127+
</Attribute>
128+
</Message>
129+
<Message>
130+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
131+
<ReceiptHandle>
132+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
133+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
134+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
135+
</ReceiptHandle>
136+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
137+
<Body>{"body":"this is a test","headers":{}}</Body>
138+
<Attribute>
139+
<Name>SenderId</Name>
140+
<Value>195004372649</Value>
141+
</Attribute>
142+
<Attribute>
143+
<Name>SentTimestamp</Name>
144+
<Value>1238099229000</Value>
145+
</Attribute>
146+
<Attribute>
147+
<Name>ApproximateReceiveCount</Name>
148+
<Value>5</Value>
149+
</Attribute>
150+
<Attribute>
151+
<Name>ApproximateFirstReceiveTimestamp</Name>
152+
<Value>1250700979248</Value>
153+
</Attribute>
154+
</Message>
155+
<Message>
156+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
157+
<ReceiptHandle>
158+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
159+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
160+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
161+
</ReceiptHandle>
162+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
163+
<Body>{"body":"this is a test","headers":{}}</Body>
164+
<Attribute>
165+
<Name>SenderId</Name>
166+
<Value>195004372649</Value>
167+
</Attribute>
168+
<Attribute>
169+
<Name>SentTimestamp</Name>
170+
<Value>1238099229000</Value>
171+
</Attribute>
172+
<Attribute>
173+
<Name>ApproximateReceiveCount</Name>
174+
<Value>5</Value>
175+
</Attribute>
176+
<Attribute>
177+
<Name>ApproximateFirstReceiveTimestamp</Name>
178+
<Value>1250700979248</Value>
179+
</Attribute>
180+
</Message>
181+
</ReceiveMessageResult>
182+
<ResponseMetadata>
183+
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
184+
</ResponseMetadata>
185+
</ReceiveMessageResponse>');
186+
}
187+
$this->fail('Unexpected HTTP call');
188+
});
199189

200190
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
201191
$this->assertNotNull($connection->get());
@@ -205,23 +195,21 @@ public function testKeepGettingPendingMessages()
205195

206196
public function testUnexpectedSqsError()
207197
{
208-
$this->expectException(TransportException::class);
198+
$this->expectException(HttpException::class);
209199
$this->expectExceptionMessage('SQS error happens');
210200

211-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
212-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
213-
214-
$httpClient->expects($this->once())->method('request')->willReturn($response);
215-
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
216-
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
217-
<Error>
218-
<Type>Sender</Type>
219-
<Code>boom</Code>
220-
<Message>SQS error happens</Message>
221-
<Detail/>
222-
</Error>
223-
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
224-
</ErrorResponse>');
201+
$httpClient = new MockHttpClient(function(string $method, string $url, array $options): ResponseInterface {
202+
return new MockResponse('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
203+
<Error>
204+
<Type>Sender</Type>
205+
<Code>boom</Code>
206+
<Message>SQS error happens</Message>
207+
<Detail/>
208+
</Error>
209+
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
210+
</ErrorResponse>', ['http_code'=>400]);
211+
});
212+
225213
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
226214
$connection->get();
227215
}

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -19,7 +20,6 @@
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
22-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -42,7 +42,7 @@ public function get(): iterable
4242
{
4343
try {
4444
$sqsEnvelope = $this->connection->get();
45-
} catch (HttpExceptionInterface $e) {
45+
} catch (HttpException $e) {
4646
throw new TransportException($e->getMessage(), 0, $e);
4747
}
4848
if (null === $sqsEnvelope) {
@@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
7070
{
7171
try {
7272
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
73-
} catch (HttpExceptionInterface $e) {
73+
} catch (HttpException $e) {
7474
throw new TransportException($e->getMessage(), 0, $e);
7575
}
7676
}
@@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
8282
{
8383
try {
8484
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
85-
} catch (HttpExceptionInterface $e) {
85+
} catch (HttpException $e) {
8686
throw new TransportException($e->getMessage(), 0, $e);
8787
}
8888
}
@@ -94,7 +94,7 @@ public function getMessageCount(): int
9494
{
9595
try {
9696
return $this->connection->getMessageCount();
97-
} catch (HttpExceptionInterface $e) {
97+
} catch (HttpException $e) {
9898
throw new TransportException($e->getMessage(), 0, $e);
9999
}
100100
}

0 commit comments

Comments
 (0)
0