8000 fix S3 pre-signed logic for IGNORED_SIGV4_HEADERS by bentsku · Pull Request #9609 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions localstack/services/s3/presigned_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,7 @@
# headers to blacklist from request_dict.signed_headers
BLACKLISTED_HEADERS = ["X-Amz-Security-Token"]

# query params overrides for multipart upload and node sdk
# TODO: this will depends on query/post v2/v4. Manage independently
ALLOWED_QUERY_PARAMS = [
"x-id",
"x-amz-user-agent",
"x-amz-content-sha256",
"versionid",
"uploadid",
"partnumber",
]
Comment on lines -69 to -76
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was not used anywhere


IGNORED_SIGV4_HEADERS = [
"x-id",
"x-amz-user-agent",
"x-amz-content-sha256",
]

Expand Down Expand Up @@ -578,11 +565,10 @@ def _get_signed_headers_and_filtered_query_string(self) -> Tuple[Dict[str, str],
not_signed_headers = []
for header, value in headers.items():
header_low = header.lower()
if header_low.startswith("x-amz-"):
if header_low.startswith("x-amz-") and header_low not in signed_headers.lower():
if header_low in IGNORED_SIGV4_HEADERS:
continue
if header_low not in signed_headers.lower():
not_signed_headers.append(header_low)
not_signed_headers.append(header_low)
if header_low in signed_headers:
signature_headers[header_low] = value

Expand Down
83 changes: 82 additions & 1 deletion tests/aws/services/s3/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import shutil
import tempfile
import time
from importlib.util import find_spec
from io import BytesIO
from operator import itemgetter
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -6546,7 +6547,7 @@ def test_pre_signed_url_forward_slash_bucket(
"signature_version",
["s3", "s3v4"],
)
@markers.aws.unknown
@markers.aws.validated
def test_s3_presign_url_encoding(
self, aws_client, s3_bucket, signature_version, patch_s3_skip_signature_validation_false
):
Expand All @@ -6568,6 +6569,86 @@ def test_s3_presign_url_encoding(
assert req.ok
assert req.content == b"123"

@markers.aws.validated
def test_s3_ignored_special_headers(
self,
s3_bucket,
patch_s3_skip_signature_validation_false,
monkeypatch,
):
# 7440 if the crt.auth is not available, not need to patch as it will use it by default
if find_spec("botocore.crt.auth"):
# the CRT client does not allow us to pass a protected header, it will trigger an exception, so we need
# to patch the Signer selection to the Python implementation which does not have this check
from botocore.auth import AUTH_TYPE_MAPS, S3SigV4QueryAuth

monkeypatch.setitem(AUTH_TYPE_MAPS, "s3v4-query", S3SigV4QueryAuth)

key = "my-key"
presigned_client = _s3_client_custom_config(
Config(signature_version="s3v4", s3={"payload_signing_enabled": True}),
endpoint_url=_endpoint_url(),
)

def add_content_sha_header(request, **kwargs):
request.headers["x-amz-content-sha256"] = "UNSIGNED-PAYLOAD"

presigned_client.meta.events.register(
"before-sign.s3.PutObject",
handler=add_content_sha_header,
)
try:
url = presigned_client.generate_presigned_url(
"put_object", Params={"Bucket": s3_bucket, "Key": key}
)
assert "x-amz-content-sha256" in url
# somehow, it's possible to add "x-amz-content-sha256" to signed headers, the AWS Go SDK does it
resp = requests.put(
url,
data="something",
verify=False,
headers={"x-amz-content-sha256": "UNSIGNED-PAYLOAD"},
)
assert resp.ok

# if signed but not provided, AWS will raise an exception
resp = requests.put(url, data="something", verify=False)
assert resp.status_code == 403

finally:
presigned_client.meta.events.unregister(
"before-sign.s3.PutObject",
add_content_sha_header,
)

# recreate the request, without the signed header
url = presigned_client.generate_presigned_url(
"put_object", Params={"Bucket": s3_bucket, "Key": key}
)
assert "x-amz-content-sha256" not in url

# assert that if provided and not signed, AWS will ignore it even if it starts with `x-amz`
resp = requests.put(
url,
data="something",
verify=False,
headers={"x-amz-content-sha256": "UNSIGNED-PAYLOAD"},
)
assert resp.ok

# assert that x-amz-user-agent is not ignored, it must be set in SignedHeaders
resp = requests.put(
url, data="something", verify=False, headers={"x-amz-user-agent": "test"}
)
assert resp.status_code == 403

# X-Amz-Signature needs to be the last query string parameter: insert x-id before like the Go SDK
index = url.find("&X-Amz-Signature")
rewritten_url = url[:index] + "&x-id=PutObject" + url[index:]
# however, the x-id query string parameter is not ignored
resp = requests.put(rewritten_url, data="something", verify=False)
assert resp.status_code == 403


class TestS3DeepArchive:
"""
Expand Down
0