78
78
)
79
79
80
80
# list of destination types for bucket notifications
81
- NOTIFICATION_DESTINATION_TYPES = ("Queue" , "Topic" , "CloudFunction" , "LambdaFunction" )
81
+ NOTIFICATION_DESTINATION_TYPES = (
82
+ "Queue" ,
83
+ "Topic" ,
84
+ "CloudFunction" ,
85
+ "LambdaFunction" ,
86
+ "EventBridge" ,
87
+ )
82
88
83
89
# prefix for object metadata keys in headers and query params
84
90
OBJECT_METADATA_KEY_PREFIX = "x-amz-meta-"
@@ -231,6 +237,7 @@ def get_event_message(
231
237
version_id = None ,
232
238
file_size = 0 ,
233
239
config_id = "testConfigRule" ,
240
+ source_ip = "127.0.0.1" ,
234
241
):
235
242
# Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
236
243
bucket_name = normalize_bucket_name (bucket_name )
@@ -243,9 +250,7 @@ def get_event_message(
243
250
"eventTime" : timestamp_millis (),
244
251
"eventName" : event_name ,
245
252
"userIdentity" : {"principalId" : "AIDAJDPLRKLG7UEXAMPLE" },
246
- "requestParameters" : {
247
- "sourceIPAddress" : "127.0.0.1"
248
- }, # TODO determine real source IP
253
+ "requestParameters" : {"sourceIPAddress" : source_ip },
249
254
"responseElements" : {
250
255
"x-amz-request-id" : short_uid (),
251
256
"x-amz-id-2" : "eftixk72aD6Ap51TnqcoF8eFidJG9Z/2" , # Amazon S3 host that processed the request
@@ -308,12 +313,19 @@ def send_notifications(method, bucket_name, object_path, version_id, headers, me
308
313
309
314
310
315
def send_notification_for_subscriber (
311
- notif , bucket_name , object_path , version_id , api_method , action , event_name , headers
316
+ notification : Dict ,
317
+ bucket_name : str ,
318
+ object_path : str ,
319
+ version_id : str ,
320
+ api_method : str ,
321
+ action : str ,
322
+ event_name : str ,
323
+ headers ,
312
324
):
313
325
bucket_name = normalize_bucket_name (bucket_name )
314
326
315
- if not event_type_matches (notif ["Event" ], action , api_method ) or not filter_rules_match (
316
- notif .get ("Filter" ), object_path
327
+ if not event_type_matches (notification ["Event" ], action , api_method ) or not filter_rules_match (
328
+ notification .get ("Filter" ), object_path
317
329
):
318
330
return
319
331
@@ -326,6 +338,8 @@ def send_notification_for_subscriber(
326
338
except botocore .exceptions .ClientError :
327
339
pass
328
340
341
+ source_ip = headers .get ("X-Forwarded-For" , "127.0.0.1" ).split ("," )[0 ]
342
+
329
343
# build event message
330
344
message = get_event_message (
331
345
event_name = event_name ,
@@ -334,45 +348,40 @@ def send_notification_for_subscriber(
334
348
etag = object_data .get ("ETag" , "" ),
335
349
file_size = object_data .get ("ContentLength" , 0 ),
336
350
version_id = version_id ,
337
- config_id = notif ["Id" ],
351
+ config_id = notification ["Id" ],
352
+ source_ip = source_ip ,
338
353
)
339
354
message = json .dumps (message )
340
355
341
- if notif .get ("Queue" ):
342
- region = aws_stack .extract_region_from_arn (notif ["Queue" ])
356
+ if notification .get ("Queue" ):
357
+ region = aws_stack .extract_region_from_arn (notification ["Queue" ])
343
358
sqs_client = aws_stack .connect_to_service ("sqs" , region_name = region )
344
359
try :
345
- queue_url = aws_stack .sqs_queue_url_for_arn (notif ["Queue" ])
360
+ queue_url = aws_stack .sqs_queue_url_for_arn (notification ["Queue" ])
346
361
sqs_client .send_message (
347
362
QueueUrl = queue_url ,
348
363
MessageBody = message ,
349
364
MessageSystemAttributes = create_sqs_system_attributes (headers ),
350
365
)
351
366
except Exception as e :
352
367
LOGGER .warning (
353
- 'Unable to send notification for S3 bucket "%s" to SQS queue "%s": %s' ,
354
- bucket_name ,
355
- notif ["Queue" ],
356
- e ,
368
+ f"Unable to send notification for S3 bucket \" { bucket_name } \" to SQS queue \" { notification ['Queue' ]} \" : { e } " ,
357
369
)
358
- if notif .get ("Topic" ):
359
- region = aws_stack .extract_region_from_arn (notif ["Topic" ])
370
+ if notification .get ("Topic" ):
371
+ region = aws_stack .extract_region_from_arn (notification ["Topic" ])
360
372
sns_client = aws_stack .connect_to_service ("sns" , region_name = region )
361
373
try :
362
374
sns_client .publish (
363
- TopicArn = notif ["Topic" ],
375
+ TopicArn = notification ["Topic" ],
364
376
Message = message ,
365
377
Subject = "Amazon S3 Notification" ,
366
378
)
367
379
except Exception as e :
368
380
LOGGER .warning (
369
- 'Unable to send notification for S3 bucket "%s" to SNS topic "%s": %s' ,
370
- bucket_name ,
371
- notif ["Topic" ],
372
- e ,
381
+ f"Unable to send notification for S3 bucket \" { bucket_name } \" to SNS topic \" { notification ['Topic' ]} \" : { e } "
373
382
)
374
383
# CloudFunction and LambdaFunction are semantically identical
375
- lambda_function_config = notif .get ("CloudFunction" ) or notif .get ("LambdaFunction" )
384
+ lambda_function_config = notification .get ("CloudFunction" ) or notification .get ("LambdaFunction" )
376
385
if lambda_function_config :
377
386
# make sure we don't run into a socket timeout
378
387
region = aws_stack .extract_region_from_arn (lambda_function_config )
@@ -388,12 +397,61 @@ def send_notification_for_subscriber(
388
397
)
389
398
except Exception :
390
399
LOGGER .warning (
391
- 'Unable to send notification for S3 bucket "%s" to Lambda function "%s".' ,
392
- bucket_name ,
393
- lambda_function_config ,
400
+ f'Unable to send notification for S3 bucket "{ bucket_name } " to Lambda function "{ lambda_function_config } ".'
401
+ )
402
+
403
+ if "EventBridge" in notification :
404
+ s3api_client = aws_stack .connect_to_service ("s3" )
405
+ region = (
406
+ s3api_client .get_bucket_location (Bucket = bucket_name )["LocationConstraint" ]
407
+ or config .DEFAULT_REGION
408
+ )
409
+ events_client = aws_stack .connect_to_service ("events" , region_name = region )
410
+
411
+ entry = {
412
+ "Source" : "aws.s3" ,
413
+ "Resources" : [f"arn:aws:s3:::{ bucket_name } " ],
414
+ "Detail" : {
415
+ "version" : version_id or "0" ,
416
+ "bucket" : {"name" : bucket_name },
417
+ "object" : {
418
+ "key" : key ,
419
+ "size" : object_data .get ("ContentLength" ),
420
+ "etag" : object_data .get ("ETag" , "" ),
421
+ "sequencer" : "0062E99A88DC407460" ,
422
+ },
423
+ "request-id" : "RKREYG1RN2X92YX6" ,
424
+ "requester" : "074255357339" ,
425
+ "source-ip-address" : source_ip ,
426
+ },
427
+ }
428
+
429
+ if action == "ObjectCreated" :
430
+ entry ["DetailType" ] = "Object Created"
431
+ entry ["Detail" ]["reason" ] = f"{ api_method } Object"
432
+
433
+ if action == "ObjectRemoved" :
434
+ entry ["DetailType" ] = "Object Deleted"
435
+ entry ["Detail" ]["reason" ] = f"{ api_method } Object"
436
+ entry ["Detail" ]["deletion-type" ] = "Permanently Deleted"
437
+ entry ["Detail" ]["object" ].pop ("etag" )
438
+ entry ["Detail" ]["object" ].pop ("size" )
439
+
440
+ if action == "ObjectTagging" :
441
+ entry ["DetailType" ] = (
442
+ "Object Tags Added" if api_method == "Put" else "Object Tags Deleted"
443
+ )
444
+
445
+ entry ["Detail" ] = json .dumps (entry ["Detail" ])
446
+
447
+ try :
448
+ events_client .put_events (Entries = [entry ])
449
+ except Exception as e :
450
+ LOGGER .exception (
451
+ f'Unable to send notification for S3 bucket "{ bucket_name } " to EventBridge' , e
394
452
)
395
453
396
- if not filter (lambda x : notif .get (x ), NOTIFICATION_DESTINATION_TYPES ):
454
+ if not filter (lambda x : notification .get (x ), NOTIFICATION_DESTINATION_TYPES ):
397
455
LOGGER .warning (
398
456
"Neither of %s defined for S3 notification." , "/" .join (NOTIFICATION_DESTINATION_TYPES )
399
457
)
@@ -1227,11 +1285,16 @@ def handle_put_bucket_notification(bucket, data):
1227
1285
parsed = strip_xmlns (xmltodict .parse (data ))
1228
1286
notif_config = parsed .get ("NotificationConfiguration" )
1229
1287
1288
+ if "EventBridgeConfiguration" in notif_config :
1289
+ notif_config .update (
1290
+ {"EventBridgeConfiguration" : {"Event" : "s3:*" , "EventBridgeEnabled" : True }}
1291
+ )
1292
+
1230
1293
notifications = BackendState .notification_configs (bucket )
1231
1294
notifications .clear ()
1232
1295
1233
1296
for dest in NOTIFICATION_DESTINATION_TYPES :
1234
- config = notif_config .get ("%sConfiguration" % dest )
1297
+ config = notif_config .get (f" { dest } Configuration" )
1235
1298
configs = config if isinstance (config , list ) else [config ] if config else []
1236
1299
for config in configs :
1237
1300
events = config .get ("Event" )
0 commit comments