add support for S3 notifications to EventBridge (#6555) · localstack/localstack@bb05ce6 · GitHub
[go: up one dir, main page]

Skip to content

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

8000
Appearance settings

Commit bb05ce6

Browse files
authored
add support for S3 notifications to EventBridge (#6555)
1 parent 0670595 commit bb05ce6

File tree

4 files changed

+296
-28
lines changed

4 files changed

+296
-28
lines changed

localstack/services/s3/s3_listener.py

Lines changed: 91 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,13 @@
7878
)
7979

8080
# list of destination types for bucket notifications
81-
NOTIFICATION_DESTINATION_TYPES = ("Queue", "Topic", "CloudFunction", "LambdaFunction")
81+
NOTIFICATION_DESTINATION_TYPES = (
82+
"Queue",
83+
"Topic",
84+
"CloudFunction",
85+
"LambdaFunction",
86+
"EventBridge",
87+
)
8288

8389
# prefix for object metadata keys in headers and query params
8490
OBJECT_METADATA_KEY_PREFIX = "x-amz-meta-"
@@ -231,6 +237,7 @@ def get_event_message(
231237
version_id=None,
232238
file_size=0,
233239
config_id="testConfigRule",
240+
source_ip="127.0.0.1",
234241
):
235242
# Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
236243
bucket_name = normalize_bucket_name(bucket_name)
@@ -243,9 +250,7 @@ def get_event_message(
243250
"eventTime": timestamp_millis(),
244251
"eventName": event_name,
245252
"userIdentity": {"principalId": "AIDAJDPLRKLG7UEXAMPLE"},
246-
"requestParameters": {
247-
"sourceIPAddress": "127.0.0.1"
248-
}, # TODO determine real source IP
253+
"requestParameters": {"sourceIPAddress": source_ip},
249254
"responseElements": {
250255
"x-amz-request-id": short_uid(),
251256
"x-amz-id-2": "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2", # Amazon S3 host that processed the request
@@ -308,12 +313,19 @@ def send_notifications(method, bucket_name, object_path, version_id, headers, me
308313

309314

310315
def send_notification_for_subscriber(
311-
notif, bucket_name, object_path, version_id, api_method, action, event_name, headers
316+
notification: Dict,
317+
bucket_name: str,
318+
object_path: str,
319+
version_id: str,
320+
api_method: str,
321+
action: str,
322+
event_name: str,
323+
headers,
312324
):
313325
bucket_name = normalize_bucket_name(bucket_name)
314326

315-
if not event_type_matches(notif["Event"], action, api_method) or not filter_rules_match(
316-
notif.get("Filter"), object_path
327+
if not event_type_matches(notification["Event"], action, api_method) or not filter_rules_match(
328+
notification.get("Filter"), object_path
317329
):
318330
return
319331

@@ -326,6 +338,8 @@ def send_notification_for_subscriber(
326338
except botocore.exceptions.ClientError:
327339
pass
328340

341+
source_ip = headers.get("X-Forwarded-For", "127.0.0.1").split(",")[0]
342+
329343
# build event message
330344
message = get_event_message(
331345
event_name=event_name,
@@ -334,45 +348,40 @@ def send_notification_for_subscriber(
334348
etag=object_data.get("ETag", ""),
335349
file_size=object_data.get("ContentLength", 0),
336350
version_id=version_id,
337-
config_id=notif["Id"],
351+
config_id=notification["Id"],
352+
source_ip=source_ip,
338353
)
339354
message = json.dumps(message)
340355

341-
if notif.get("Queue"):
342-
region = aws_stack.extract_region_from_arn(notif["Queue"])
356+
if notification.get("Queue"):
357+
region = aws_stack.extract_region_from_arn(notification["Queue"])
343358
sqs_client = aws_stack.connect_to_service("sqs", region_name=region)
344359
try:
345-
queue_url = aws_stack.sqs_queue_url_for_arn(notif["Queue"])
360+
queue_url = aws_stack.sqs_queue_url_for_arn(notification["Queue"])
346361
sqs_client.send_message(
347362
QueueUrl=queue_url,
348363
MessageBody=message,
349364
MessageSystemAttributes=create_sqs_system_attributes(headers),
350365
)
351366
except Exception as e:
352367
LOGGER.warning(
353-
'Unable to send notification for S3 bucket "%s" to SQS queue "%s": %s',
354-
bucket_name,
355-
notif["Queue"],
356-
e,
368+
f"Unable to send notification for S3 bucket \"{bucket_name}\" to SQS queue \"{notification['Queue']}\": {e}",
357369
)
358-
if notif.get("Topic"):
359-
region = aws_stack.extract_region_from_arn(notif["Topic"])
370+
if notification.get("Topic"):
371+
region = aws_stack.extract_region_from_arn(notification["Topic"])
360372
sns_client = aws_stack.connect_to_service("sns", region_name=region)
361373
try:
362374
sns_client.publish(
363-
TopicArn=notif["Topic"],
375+
TopicArn=notification["Topic"],
364376
Message=message,
365377
Subject="Amazon S3 Notification",
366378
)
367379
except Exception as e:
368380
LOGGER.warning(
369-
'Unable to send notification for S3 bucket "%s" to SNS topic "%s": %s',
370-
bucket_name,
371-
notif["Topic"],
372-
e,
381+
f"Unable to send notification for S3 bucket \"{bucket_name}\" to SNS topic \"{notification['Topic']}\": {e}"
373382
)
374383
# CloudFunction and LambdaFunction are semantically identical
375-
lambda_function_config = notif.get("CloudFunction") or notif.get("LambdaFunction")
384+
lambda_function_config = notification.get("CloudFunction") or notification.get("LambdaFunction")
376385
if lambda_function_config:
377386
# make sure we don't run into a socket timeout
378387
region = aws_stack.extract_region_from_arn(lambda_function_config)
@@ -388,12 +397,61 @@ def send_notification_for_subscriber(
388397
)
389398
except Exception:
390399
LOGGER.warning(
391-
'Unable to send notification for S3 bucket "%s" to Lambda function "%s".',
392-
bucket_name,
393-
lambda_function_config,
400+
f'Unable to send notification for S3 bucket "{bucket_name}" to Lambda function "{lambda_function_config}".'
401+
)
402+
403+
if "EventBridge" in notification:
404+
s3api_client = aws_stack.connect_to_service("s3")
405+
region = (
406+
s3api_client.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]
407+
or config.DEFAULT_REGION
408+
)
409+
events_client = aws_stack.connect_to_service("events", region_name=region)
410+
411+
entry = {
412+
"Source": "aws.s3",
413+
"Resources": [f"arn:aws:s3:::{bucket_name}"],
414+
"Detail": {
415+
"version": version_id or "0",
416+
"bucket": {"name": bucket_name},
417+
"object": {
418+
"key": key,
419+
"size": object_data.get("ContentLength"),
420+
"etag": object_data.get("ETag", ""),
421+
"sequencer": "0062E99A88DC407460",
422+
},
423+
"request-id": "RKREYG1RN2X92YX6",
424+
"requester": "074255357339",
425+
"source-ip-address": source_ip,
426+
},
427+
}
428+
429+
if action == "ObjectCreated":
430+
entry["DetailType"] = "Object Created"
431+
entry["Detail"]["reason"] = f"{api_method}Object"
432+
433+
if action == "ObjectRemoved":
434+
entry["DetailType"] = "Object Deleted"
435+
entry["Detail"]["reason"] = f"{api_method}Object"
436+
entry["Detail"]["deletion-type"] = "Permanently Deleted"
437+
entry["Detail"]["object"].pop("etag")
438+
entry["Detail"]["object"].pop("size")
439+
440+
if action == "ObjectTagging":
441+
entry["DetailType"] = (
442+
"Object Tags Added" if api_method == "Put" else "Object Tags Deleted"
443+
)
444+
445+
entry["Detail"] = json.dumps(entry["Detail"])
446+
447+
try:
448+
events_client.put_events(Entries=[entry])
449+
except Exception as e:
450+
LOGGER.exception(
451+
f'Unable to send notification for S3 bucket "{bucket_name}" to EventBridge', e
394452
)
395453

396-
if not filter(lambda x: notif.get(x), NOTIFICATION_DESTINATION_TYPES):
454+
if not filter(lambda x: notification.get(x), NOTIFICATION_DESTINATION_TYPES):
397455
LOGGER.warning(
398456
"Neither of %s defined for S3 notification.", "/".join(NOTIFICATION_DESTINATION_TYPES)
399457
)
@@ -1227,11 +1285,16 @@ def handle_put_bucket_notification(bucket, data):
12271285
parsed = strip_xmlns(xmltodict.parse(data))
12281286
notif_config = parsed.get("NotificationConfiguration")
12291287

1288+
if "EventBridgeConfiguration" in notif_config:
1289+
notif_config.update(
1290+
{"EventBridgeConfiguration": {"Event": "s3:*", "EventBridgeEnabled": True}}
1291+
)
1292+
12301293
notifications = BackendState.notification_configs(bucket)
12311294
notifications.clear()
12321295

12331296
for dest in NOTIFICATION_DESTINATION_TYPES:
1234-
config = notif_config.get("%sConfiguration" % dest)
1297+
config = notif_config.get(f"{dest}Configuration")
12351298
configs = config if isinstance(config, list) else [config] if config else []
12361299
for config in configs:
12371300
events = config.get("Event")

localstack/testing/pytest/fixtures.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,38 @@ def _create_delivery_stream(**kwargs):
14961496
firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
14971497

14981498

1499+
@pytest.fixture
1500+
def events_create_rule(events_client):
1501+
rules = []
1502+
1503+
def _create_rule(**kwargs):
1504+
rule_name = kwargs["Name"]
1505+
bus_name = kwargs.get("EventBusName", "")
1506+
pattern = kwargs.get("EventPattern", {})
1507+
schedule = kwargs.get("ScheduleExpression", "")
1508+
rule_arn = events_client.put_rule(
1509+
Name=rule_name,
1510+
EventBusName=bus_name,
1511+
EventPattern=json.dumps(pattern),
1512+
ScheduleExpression=schedule,
1513+
)["RuleArn"]
1514+
rules.append({"name": rule_name, "bus": bus_name})
1515+
return rule_arn
1516+
1517+
yield _create_rule
1518+
1519+
for rule in rules:
1520+
targets = events_client.list_targets_by_rule(Rule=rule["name"], EventBusName=rule["bus"])[
1521+
"Targets"
1522+
]
1523+
1524+
targetIds = [target["Id"] for target in targets]
1525+
if len(targetIds) > 0:
1526+
events_client.remove_targets(Rule=rule["name"], EventBusName=rule["bus"], Ids=targetIds)
1527+
1528+
events_client.delete_rule(Name=rule["name"], EventBusName=rule["bus"])
1529+
1530+
14991531
@pytest.fixture
15001532
def cleanups(ec2_client):
15011533
cleanup_fns = []
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import json
2+
3+
import pytest
4+
5+
from localstack.testing.aws.util import is_aws_cloud
6+
from localstack.utils.strings import short_uid
7+
from localstack.utils.sync import retry
8+
9+
10+
class TestS3NotificationsToEventBridge:
11+
@pytest.mark.aws_validated
12+
def test_object_created_put(
13+
self,
14+
s3_client,
15+
s3_create_bucket,
16+
sqs_client,
17+
sqs_create_queue,
18+
sqs_queue_arn,
19+
events_client,
20+
events_create_rule,
21+
snapshot,
22+
):
23+
24+
bus_name = "default"
25+
queue_name = f"test-queue-{short_uid()}"
26+
bucket_name = f"test-bucket-{short_uid()}"
27+
rule_name = f"test-rule-{short_uid()}"
28+
target_id = f"test-target-{short_uid()}"
29+
30+
s3_create_bucket(Bucket=bucket_name)
31+
s3_client.put_bucket_notification_configuration(
32+
Bucket=bucket_name, NotificationConfiguration={"EventBridgeConfiguration": {}}
33+
)
34+
35+
pattern = {
36+
"source": ["aws.s3"],
37+
"detail-type": [
38+
"Object Created",
39+
"Object Deleted",
40+
"Object Restore Initiated",
41+
"Object Restore Completed",
42+
"Object Restore Expired",
43+
"Object Tags Added",
44+
"Object Tags Deleted",
45+
"Object ACL Updated",
46+
"Object Storage Class Changed",
47+
"Object Access Tier Changed",
48+
],
49+
"detail": {"bucket": {"name": [bucket_name]}},
50+
}
51+
rule_arn = events_create_rule(Name=rule_name, EventBusName=bus_name, EventPattern=pattern)
52+
53+
queue_url = sqs_create_queue(QueueName=queue_name)
54+
queue_arn = sqs_queue_arn(queue_url)
55+
queue_policy = {
56+
"Statement": [
57+
{
58+
"Sid": "EventsToMyQueue",
59+
"Effect": "Allow",
60+
"Principal": {"Service": "events.amazonaws.com"},
61+
"Action": "sqs:SendMessage",
62+
"Resource": queue_arn,
63+
"Condition": {"ArnEquals": {"aws:SourceArn": rule_arn}},
64+
}
65+
]
66+
}
67+
sqs_client.set_queue_attributes(
68+
QueueUrl=queue_url,
69+
Attributes={"Policy": json.dumps(queue_policy), "ReceiveMessageWaitTimeSeconds": "5"},
70+
)
71+
events_client.put_targets(Rule=rule_name, Targets=[{"Id": target_id, "Arn": queue_arn}])
72+
73+
test_key = "test-key"
74+
s3_client.put_object(Bucket=bucket_name, Key=test_key, Body=b"data")
75+
s3_client.delete_object(Bucket=bucket_name, Key=test_key)
76+
77+
messages = {}
78+
79+
snapshot.add_transformer(snapshot.transform.s3_api())
80+
snapshot.add_transformers_list(
81+
[
82+
snapshot.transform.jsonpath(
83+
"$..account", "111111111111", reference_replacement=False
84+
),
85+
snapshot.transform.jsonpath("$..detail.bucket.name", "bucket-name"),
86+
snapshot.transform.jsonpath("$..detail.object.key", "key-name"),
87+
snapshot.transform.jsonpath("$..detail.object.etag", "object-etag"),
88+
snapshot.transform.jsonpath(
89+
"$..detail.object.sequencer", "object-sequencer", reference_replacement=False
90+
),
91+
snapshot.transform.jsonpath(
92+
"$..detail.request-id", "request-id", reference_replacement=False
93+
),
94+
snapshot.transform.jsonpath("$..detail.requester", "111111111111"),
95+
snapshot.transform.jsonpath("$..detail.source-ip-address", "ip-address"),
96+
]
97+
)
98+
99+
def _receive_messages():
100+
received = sqs_client.receive_message(QueueUrl=queue_url).get("Messages", [])
101+
for msg in received:
102+
event_message = json.loads(msg["Body"])
103+
messages.update({event_message["detail-type"]: event_message})
104+
105+
assert len(messages) == 2
106+
107+
retries = 10 if is_aws_cloud() else 5
108+
retry(_receive_messages, retries=retries)
109+
110+
snapshot.match("object_deleted", messages.get("Object Deleted"))
111+
snapshot.match("object_created", messages.get("Object Created"))

0 commit comments

Comments
 (0)
0