8000 [AmazonSqsMessenger] Use AsyncAws to handle SQS communication by jderusse · Pull Request #36094 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[AmazonSqsMessenger] Use AsyncAws to handle SQS communication #36094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"amphp/http-client": "^4.2",
"amphp/http-tunnel": "^1.0",
"async-aws/ses": "^1.0",
"async-aws/sqs": "^1.0",
"cache/integration-tests": "dev-master",
"doctrine/annotations": "~1.0",
"doctrine/cache": "~1.6",
Expand Down
8000
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use AsyncAws\Sqs\SqsClient;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
Expand Down Expand Up @@ -39,7 +40,7 @@ private function execute(string $dsn): void
{
$connection = Connection::fromDsn($dsn, []);
$connection->setup();
$this->clearSqs($connection);
$this->clearSqs($dsn);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $connection->getMessageCount());
Expand All @@ -53,15 +54,12 @@ private function execute(string $dsn): void
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

private function clearSqs(Connection $connection): void
private function clearSqs(string $dsn): void
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $connection->get()) {
usleep(5000);
continue;
}
$connection->delete($message['id']);
}
$url = parse_url($dsn);
$client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
$client->purgeQueue([
'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
]);
}
}
8000
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use AsyncAws\Core\Exception\Http\HttpException;
use AsyncAws\Core\Test\ResultMockFactory;
use AsyncAws\Sqs\Result\GetQueueUrlResult;
use AsyncAws\Sqs\Result\ReceiveMessageResult;
use AsyncAws\Sqs\SqsClient;
use AsyncAws\Sqs\ValueObject\Message;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

class ConnectionTest extends TestCase
{
Expand All @@ -31,7 +35,7 @@ public function testFromDsn()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue', [], $httpClient)
);
}
Expand All @@ -40,16 +44,16 @@ public function testFromDsnWithRegion()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
);
}

public function testFromDsnWithCustomEndpoint()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
);
}
Expand All @@ -58,7 +62,7 @@ public function testFromDsnWithCustomEndpointAndPort()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
);
}
Expand All @@ -67,7 +71,7 @@ public function testFromDsnWithOptions()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
);
}
Expand All @@ -76,153 +80,63 @@ public function testFromDsnWithQueryOptions()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$this->assertEquals(
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
);
}

private function handleGetQueueUrl(int $index, $mock): string
{
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();

$mock->expects($this->at($index))->method('request')
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
<GetQueueUrlResult>
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
</GetQueueUrlResult>
<ResponseMetadata>
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
</ResponseMetadata>
</GetQueueUrlResponse>');

return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
}

public function testKeepGettingPendingMessages()
{
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();

$queueUrl = $this->handleGetQueueUrl(0, $httpClient);

$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
<Message>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
<ReceiptHandle>
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
</ReceiptHandle>
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
<Body>{"body":"this is a test","headers":{}}</Body>
<Attribute>
<Name>SenderId</Name>
<Value>195004372649</Value>
</Attribute>
<Attribute>
<Name>SentTimestamp</Name>
<Value>1238099229000</Value>
</Attribute>
<Attribute>
<Name>ApproximateReceiveCount</Name>
<Value>5</Value>
</Attribute>
<Attribute>
<Name>ApproximateFirstReceiveTimestamp</Name>
<Value>1250700979248</Value>
</Attribute>
</Message>
</ReceiveMessageResult>
<ResponseMetadata>
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
</ResponseMetadata>
</ReceiveMessageResponse>');

$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
$client = $this->createMock(SqsClient::class);
$client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
$client->expects($this->at(1))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
]]));
$client->expects($this->at(2))
->method('receiveMessage')
->with([
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
'MaxNumberOfMessages' => 9,
'WaitTimeSeconds' => 20,
'MessageAttributeNames' => ['All'],
'VisibilityTimeout' => null,
])
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
]]));

$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNotNull($connection->get());
$this->assertNull($connection->get());
}

public function testUnexpectedSqsError()
{
$this->expectException(TransportException::class);
$this->expectException(HttpException::class);
$this->expectExceptionMessage('SQS error happens');

$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();

$httpClient->expects($this->once())->method('request')->willReturn($response);
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<Error>
<Type>Sender</Type>
<Code>boom</Code>
<Message>SQS error happens</Message>
<Detail/>
</Error>
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
</ErrorResponse>');
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
$client = $this->createMock(SqsClient::class);
$client->expects($this->any())
->method('getQueueUrl')
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));

$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
$connection->get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;

use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
Expand All @@ -19,7 +20,6 @@
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;

/**
* @author Jérémy Derussé <jeremy@derusse.com>
Expand All @@ -42,7 +42,7 @@ public function get(): iterable
{
try {
$sqsEnvelope = $this->connection->get();
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
if (null === $sqsEnvelope) {
Expand Down Expand Up @@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
Expand All @@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
{
try {
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
Expand All @@ -94,7 +94,7 @@ public function getMessageCount(): int
{
try {
return $this->connection->getMessageCount();
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;

use AsyncAws\Core\Exception\Http\HttpException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;

/**
* @author Jérémy Derussé <jeremy@derusse.com>
Expand Down Expand Up @@ -61,7 +61,7 @@ public function send(Envelope $envelope): Envelope
$messageGroupId,
$messageDeduplicationId
);
} catch (HttpExceptionInterface $e) {
} catch (HttpException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

Expand Down
Loading
0