diff --git a/localstack-core/localstack/services/kms/provider.py b/localstack-core/localstack/services/kms/provider.py index 02d8eb20f3261..a243e1d7fcea6 100644 --- a/localstack-core/localstack/services/kms/provider.py +++ b/localstack-core/localstack/services/kms/provider.py @@ -959,7 +959,39 @@ def re_encrypt( **kwargs, ) -> ReEncryptResponse: # TODO: when implementing, ensure cross-account support for source_key_id and destination_key_id - raise NotImplementedError + # Parse and fetch source Key + account_id, region_name, source_key_id = self._parse_key_id(source_key_id, context) + source_key = self._get_kms_key(account_id, region_name, source_key_id) + # Decrypt using source key + decrypt_response = self.decrypt( + context=context, + ciphertext_blob=ciphertext_blob, + encryption_context=source_encryption_context, + encryption_algorithm=source_encryption_algorithm, + key_id=source_key_id, + grant_tokens=grant_tokens, + ) + # Parse and fetch destination key + account_id, region_name, destination_key_id = self._parse_key_id( + destination_key_id, context + ) + destination_key = self._get_kms_key(account_id, region_name, destination_key_id) + # Encrypt using destination key + encrypt_response = self.encrypt( + context=context, + encryption_context=destination_encryption_context, + key_id=destination_key_id, + plaintext=decrypt_response["Plaintext"], + grant_tokens=grant_tokens, + dry_run=dry_run, + ) + return ReEncryptResponse( + CiphertextBlob=encrypt_response["CiphertextBlob"], + SourceKeyId=source_key.metadata.get("Arn"), + KeyId=destination_key.metadata.get("Arn"), + SourceEncryptionAlgorithm=source_encryption_algorithm, + DestinationEncryptionAlgorithm=destination_encryption_algorithm, + ) def encrypt( self, diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index 4b68dd9c38dce..92fcf1f085139 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -871,6 +871,91 @@ def test_encrypt_decrypt(self, kms_create_key, key_spec, algo, aws_client): )["Plaintext"] assert base64.b64decode(plaintext) == message + @pytest.mark.parametrize( + "key_spec,algo", + [ + ("SYMMETRIC_DEFAULT", "SYMMETRIC_DEFAULT"), + ("RSA_2048", "RSAES_OAEP_SHA_256"), + ], + ) + @markers.aws.validated + def test_re_encrypt(self, kms_create_key, key_spec, algo, aws_client, snapshot): + message = b"test message 123 !%$@ 1234567890" + source_key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec=key_spec)["KeyId"] + destination_key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec=key_spec)["KeyId"] + # Encrypt the message using the source key + ciphertext = aws_client.kms.encrypt( + KeyId=source_key_id, Plaintext=base64.b64encode(message), EncryptionAlgorithm=algo + )["CiphertextBlob"] + # Re-encrypt the previously encryted message using the destination key + result = aws_client.kms.re_encrypt( + SourceKeyId=source_key_id, + DestinationKeyId=destination_key_id, + CiphertextBlob=ciphertext, + SourceEncryptionAlgorithm=algo, + DestinationEncryptionAlgorithm=algo, + ) + snapshot.match("test_re_encrypt", result) + # Decrypt using the source key + source_key_plaintext = aws_client.kms.decrypt( + KeyId=source_key_id, CiphertextBlob=ciphertext, EncryptionAlgorithm=algo + )["Plaintext"] + # Decrypt using the destination key + destination_key_plaintext = aws_client.kms.decrypt( + KeyId=destination_key_id, + CiphertextBlob=result["CiphertextBlob"], + EncryptionAlgorithm=algo, + )["Plaintext"] + # Both source and destination plain texts should match the original + assert base64.b64decode(source_key_plaintext) == message + assert base64.b64decode(destination_key_plaintext) == message + + @markers.aws.validated + def test_re_encrypt_incorrect_source_key(self, kms_create_key, aws_client, snapshot): + algo = "SYMMETRIC_DEFAULT" + message = b"test message 123 !%$@ 1234567890" + source_key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec=algo)["KeyId"] + ciphertext = aws_client.kms.encrypt( + KeyId=source_key_id, Plaintext=base64.b64encode(message), EncryptionAlgorithm=algo + )["CiphertextBlob"] + invalid_key_id = kms_create_key( + Description="test hmac key", + KeySpec="HMAC_224", + KeyUsage="GENERATE_VERIFY_MAC", + )["KeyId"] + + with pytest.raises(ClientError) as exc: + aws_client.kms.re_encrypt( + SourceKeyId=invalid_key_id, + DestinationKeyId=invalid_key_id, + CiphertextBlob=ciphertext, + SourceEncryptionAlgorithm=algo, + DestinationEncryptionAlgorithm=algo, + ) + snapshot.match("incorrect-source-key", exc.value.response) + + @markers.aws.validated + def test_re_encrypt_invalid_destination_key(self, kms_create_key, aws_client): + algo = "SYMMETRIC_DEFAULT" + message = b"test message 123 !%$@ 1234567890" + source_key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec=algo)["KeyId"] + ciphertext = aws_client.kms.encrypt( + KeyId=source_key_id, Plaintext=base64.b64encode(message), EncryptionAlgorithm=algo + )["CiphertextBlob"] + invalid_key_id = kms_create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P256")["KeyId"] + with pytest.raises(ClientError) as exc: + aws_client.kms.re_encrypt( + SourceKeyId=source_key_id, + DestinationKeyId=invalid_key_id, + CiphertextBlob=ciphertext, + SourceEncryptionAlgorithm=algo, + DestinationEncryptionAlgorithm=algo, + ) + # TODO: Determine where 'context.operation.name' is being set to 'ReEncryptTo' as the expected AWS operation name is 'ReEncrypt' + # Then enable the snapshot check + # snapshot.match("invalid-destination-key-usage", exc.value.response) + assert exc.match("InvalidKeyUsageException") + @pytest.mark.parametrize( "key_spec,algo", [ @@ -1881,7 +1966,7 @@ def test_cross_accounts_access( # - GenerateDataKeyPairWithoutPlaintext # - GenerateDataKeyWithoutPlaintext # - GenerateMac - # - ReEncrypt (NOT IMPLEMENTED IN LOCALSTACK) + # - ReEncrypt # - Sign # - Verify # - VerifyMac diff --git a/tests/aws/services/kms/test_kms.snapshot.json b/tests/aws/services/kms/test_kms.snapshot.json index 17ebf79f26bb7..0d4f5ff03be92 100644 --- a/tests/aws/services/kms/test_kms.snapshot.json +++ b/tests/aws/services/kms/test_kms.snapshot.json @@ -2238,5 +2238,53 @@ } } } + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt[SYMMETRIC_DEFAULT-SYMMETRIC_DEFAULT]": { + "recorded-date": "09-06-2025, 12:52:58", + "recorded-content": { + "test_re_encrypt": { + "CiphertextBlob": "ciphertext-blob", + "DestinationEncryptionAlgorithm": "SYMMETRIC_DEFAULT", + "KeyId": "", + "SourceEncryptionAlgorithm": "SYMMETRIC_DEFAULT", + "SourceKeyId": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt[RSA_2048-RSAES_OAEP_SHA_256]": { + "recorded-date": "09-06-2025, 12:53:00", + "recorded-content": { + "test_re_encrypt": { + "CiphertextBlob": "ciphertext-blob", + "DestinationEncryptionAlgorithm": "RSAES_OAEP_SHA_256", + "KeyId": "", + "SourceEncryptionAlgorithm": "RSAES_OAEP_SHA_256", + "SourceKeyId": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt_incorrect_source_key": { + "recorded-date": "09-06-2025, 12:53:24", + "recorded-content": { + "incorrect-source-key": { + "Error": { + "Code": "IncorrectKeyException", + "Message": "The key ID in the request does not identify a CMK that can perform this operation." + }, + "message": "The key ID in the request does not identify a CMK that can perform this operation.", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 400 + } + } + } } } diff --git a/tests/aws/services/kms/test_kms.validation.json b/tests/aws/services/kms/test_kms.validation.json index fb082e9a3265d..df19dfe77dbba 100644 --- a/tests/aws/services/kms/test_kms.validation.json +++ b/tests/aws/services/kms/test_kms.validation.json @@ -200,6 +200,15 @@ "tests/aws/services/kms/test_kms.py::TestKMS::test_plaintext_size_for_encrypt": { "last_validated_date": "2024-04-11T15:54:20+00:00" }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt[RSA_2048-RSAES_OAEP_SHA_256]": { + "last_validated_date": "2025-06-09T12:53:00+00:00" + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt[SYMMETRIC_DEFAULT-SYMMETRIC_DEFAULT]": { + "last_validated_date": "2025-06-09T12:52:58+00:00" + }, + "tests/aws/services/kms/test_kms.py::TestKMS::test_re_encrypt_incorrect_source_key": { + "last_validated_date": "2025-06-09T12:53:24+00:00" + }, "tests/aws/services/kms/test_kms.py::TestKMS::test_replicate_key": { "last_validated_date": "2024-04-11T15:53:44+00:00" }, @@ -323,16 +332,16 @@ "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_pair": { "last_validated_date": "2024-04-11T15:54:29+00:00" }, - "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_pair_without_plaintext": { - "last_validated_date": "2024-04-11T15:54:28+00:00" - }, - "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_without_plaintext": { - "last_validated_date": "2024-04-11T15:54:31+00:00" - }, "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_pair_dry_run": { "last_validated_date": "2025-04-06T11:54:20+00:00" }, + "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_pair_without_plaintext": { + "last_validated_date": "2024-04-11T15:54:28+00:00" + }, "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_pair_without_plaintext_dry_run": { "last_validated_date": "2025-04-13T15:44:57+00:00" + }, + "tests/aws/services/kms/test_kms.py::TestKMSGenerateKeys::test_generate_data_key_without_plaintext": { + "last_validated_date": "2024-04-11T15:54:31+00:00" } }