From 52dc6849e241c6285bed698ac3e327f8a2d19bbc Mon Sep 17 00:00:00 2001 From: Peter Trebaticky Date: Fri, 21 Feb 2025 01:04:45 +0100 Subject: [PATCH 1/2] [Messenger] Add options to specify SQS queue attributes and tags --- CHANGELOG.md | 5 ++++ Tests/Transport/ConnectionTest.php | 39 ++++++++++++++++++++++++++++++ Transport/Connection.php | 16 +++++++++--- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e29f4f..0661c85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +7.3 +--- + + * Add new `queue_attributes` and `queue_tags` options for SQS queue creation + 7.2 --- diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 3352bfd..159c674 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -13,7 +13,9 @@ use AsyncAws\Core\Exception\Http\HttpException; use AsyncAws\Core\Test\ResultMockFactory; +use AsyncAws\Sqs\Enum\QueueAttributeName; use AsyncAws\Sqs\Result\GetQueueUrlResult; +use AsyncAws\Sqs\Result\QueueExistsWaiter; use AsyncAws\Sqs\Result\ReceiveMessageResult; use AsyncAws\Sqs\SqsClient; use AsyncAws\Sqs\ValueObject\Message; @@ -385,6 +387,43 @@ public function testKeepaliveWithTooSmallTtl() $connection->keepalive('123', 2); } + public function testQueueAttributesAndTags() + { + $queueName = 'queueName.fifo'; + $queueAttributes = [ + QueueAttributeName::MESSAGE_RETENTION_PERIOD => '900', + QueueAttributeName::MAXIMUM_MESSAGE_SIZE => '1024', + ]; + $queueTags = ['tag1' => 'value1', 'tag2' => 'value2']; + + $queueExistsWaiter = ResultMockFactory::waiter(QueueExistsWaiter::class, QueueExistsWaiter::STATE_FAILURE); + $client = $this->createMock(SqsClient::class); + $client->method('queueExists')->willReturn($queueExistsWaiter); + $client->expects($this->once())->method('createQueue')->with(['QueueName' => $queueName, 'Attributes' => array_merge($queueAttributes, [QueueAttributeName::FIFO_QUEUE => 'true']), 'tags' => $queueTags]); + + $connection = new Connection(['queue_name' => $queueName, 'queue_attributes' => $queueAttributes, 'queue_tags' => $queueTags], $client); + + $this->expectException(TransportException::class); + $connection->setup(); + } + + public function testQueueAttributesAndTagsFromDsn() + { + $httpClient = $this->createMock(HttpClientInterface::class); + + $queueName = 'queueName'; + $queueAttributes = [ + QueueAttributeName::MESSAGE_RETENTION_PERIOD => '900', + QueueAttributeName::MAXIMUM_MESSAGE_SIZE => '1024', + ]; + $queueTags = ['tag1' => 'value1', 'tag2' => 'value2']; + + $this->assertEquals( + new Connection(['queue_name' => $queueName, 'queue_attributes' => $queueAttributes, 'queue_tags' => $queueTags], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)), + Connection::fromDsn('sqs://default/'.$queueName, ['queue_attributes' => $queueAttributes, 'queue_tags' => $queueTags], $httpClient) + ); + } + private function getMockedQueueUrlResponse(): MockResponse { if ($this->isAsyncAwsSqsVersion2Installed()) { diff --git a/Transport/Connection.php b/Transport/Connection.php index 40a6e06..3661451 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -46,6 +46,8 @@ class Connection 'endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'region' => 'eu-west-1', 'queue_name' => 'messages', + 'queue_attributes' => null, + 'queue_tags' => null, 'account' => null, 'sslmode' => null, 'debug' => null, @@ -89,6 +91,8 @@ public function __destruct() * * endpoint: absolute URL to the SQS service (Default: https://sqs.eu-west-1.amazonaws.com) * * region: name of the AWS region (Default: eu-west-1) * * queue_name: name of the queue (Default: messages) + * * queue_attributes: attributes of the queue, array + * * queue_tags: tags of the queue, array * * account: identifier of the AWS account * * access_key: AWS access key * * secret_key: AWS secret key @@ -132,6 +136,8 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option 'visibility_timeout' => null !== $options['visibility_timeout'] ? (int) $options['visibility_timeout'] : null, 'auto_setup' => filter_var($options['auto_setup'], \FILTER_VALIDATE_BOOL), 'queue_name' => (string) $options['queue_name'], + 'queue_attributes' => $options['queue_attributes'], + 'queue_tags' => $options['queue_tags'], ]; $clientConfiguration = [ @@ -278,12 +284,14 @@ public function setup(): void throw new InvalidArgumentException(\sprintf('The Amazon SQS queue "%s" does not exist (or you don\'t have permissions on it), and can\'t be created when an account is provided.', $this->configuration['queue_name'])); } - $parameters = ['QueueName' => $this->configuration['queue_name']]; + $parameters = [ + 'QueueName' => $this->configuration['queue_name'], + 'Attributes' => $this->configuration['queue_attributes'], + 'tags' => $this->configuration['queue_tags'], + ]; if (self::isFifoQueue($this->configuration['queue_name'])) { - $parameters['Attributes'] = [ - 'FifoQueue' => 'true', - ]; + $parameters['Attributes'][QueueAttributeName::FIFO_QUEUE] = 'true'; } $this->client->createQueue($parameters); From 5405e07be3331a4181ddc84cbc7a423c304a7607 Mon Sep 17 00:00:00 2001 From: Andrii Date: Tue, 25 Feb 2025 22:48:28 +0100 Subject: [PATCH 2/2] [Messenger] Allow to close the transport connection --- CHANGELOG.md | 1 + Transport/AmazonSqsTransport.php | 8 +++++++- composer.json | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0661c85..38117ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ CHANGELOG 7.3 --- + * Implement the `CloseableTransportInterface` to allow closing the transport * Add new `queue_attributes` and `queue_tags` options for SQS queue creation 7.2 diff --git a/Transport/AmazonSqsTransport.php b/Transport/AmazonSqsTransport.php index d98efef..2c26100 100644 --- a/Transport/AmazonSqsTransport.php +++ b/Transport/AmazonSqsTransport.php @@ -14,6 +14,7 @@ use AsyncAws\Core\Exception\Http\HttpException; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\CloseableTransportInterface; use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; @@ -27,7 +28,7 @@ /** * @author Jérémy Derussé */ -class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface +class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface, ResetInterface { private SerializerInterface $serializer; @@ -91,6 +92,11 @@ public function reset(): void } } + public function close(): void + { + $this->reset(); + } + private function getReceiver(): MessageCountAwareInterface&ReceiverInterface { return $this->receiver ??= new AmazonSqsReceiver($this->connection, $this->serializer); diff --git a/composer.json b/composer.json index 341026c..f334cd3 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ "php": ">=8.2", "async-aws/core": "^1.7", "async-aws/sqs": "^1.0|^2.0", - "symfony/messenger": "^7.2", + "symfony/messenger": "^7.3", "symfony/service-contracts": "^2.5|^3", "psr/log": "^1|^2|^3" },