@@ -203,8 +203,9 @@ def _initialize_client(self) -> BaseClient:
203
203
return client
204
204
205
205
def _validate_input_transformer (self , input_transformer : InputTransformer ):
206
- if "InputTemplate" not in input_transformer :
207
- raise ValueError ("InputTemplate is required for InputTransformer" )
206
+ # TODO: cover via test
207
+ # if "InputTemplate" not in input_transformer:
208
+ # raise ValueError("InputTemplate is required for InputTransformer")
208
209
input_template = input_transformer ["InputTemplate" ]
209
210
input_paths_map = input_transformer .get ("InputPathsMap" , {})
210
211
placeholders = TRANSFORMER_PLACEHOLDER_PATTERN .findall (input_template )
@@ -338,8 +339,9 @@ def send_event(self, event):
338
339
339
340
def _validate_input (self , target : Target ):
340
341
super ()._validate_input (target )
341
- if not collections .get_safe (target , "$.RoleArn" ):
342
- raise ValueError ("RoleArn is required for ApiGateway target" )
342
+ # TODO: cover via test
343
+ # if not collections.get_safe(target, "$.RoleArn"):
344
+ # raise ValueError("RoleArn is required for ApiGateway target")
343
345
344
346
def _get_predefined_template_replacements (self , event : Dict [str , Any ]) -> Dict [str , Any ]:
345
347
"""Extracts predefined values from the event."""
@@ -365,10 +367,12 @@ def send_event(self, event):
365
367
raise NotImplementedError ("Batch target is not yet implemented" )
366
368
367
369
def _validate_input (self , target : Target ):
368
- if not collections .get_safe (target , "$.BatchParameters.JobDefinition" ):
369
- raise ValueError ("BatchParameters.JobDefinition is required for Batch target" )
370
- if not collections .get_safe (target , "$.BatchParameters.JobName" ):
371
- raise ValueError ("BatchParameters.JobName is required for Batch target" )
370
+ # TODO: cover via test and fix (only required if we have BatchParameters)
371
+ # if not collections.get_safe(target, "$.BatchParameters.JobDefinition"):
372
+ # raise ValueError("BatchParameters.JobDefinition is required for Batch target")
373
+ # if not collections.get_safe(target, "$.BatchParameters.JobName"):
374
+ # raise ValueError("BatchParameters.JobName is required for Batch target")
375
+ pass
372
376
373
377
374
378
class ContainerTargetSender (TargetSender ):
@@ -377,8 +381,9 @@ def send_event(self, event):
377
381
378
382
def _validate_input (self , target : Target ):
379
383
super ()._validate_input (target )
380
- if not collections .get_safe (target , "$.EcsParameters.TaskDefinitionArn" ):
381
- raise ValueError ("EcsParameters.TaskDefinitionArn is required for ECS target" )
384
+ # TODO: cover via test
385
+ # if not collections.get_safe(target, "$.EcsParameters.TaskDefinitionArn"):
386
+ # raise ValueError("EcsParameters.TaskDefinitionArn is required for ECS target")
382
387
383
388
384
389
class EventsTargetSender (TargetSender ):
@@ -434,9 +439,13 @@ def send_event(self, event):
434
439
435
440
class KinesisTargetSender (TargetSender ):
436
441
def send_event (self , event ):
437
- partition_key_path = self .target ["KinesisParameters" ]["PartitionKeyPath" ]
442
+ partition_key_path = collections .get_safe (
443
+ self .target ,
444
+ "$.KinesisParameters.PartitionKeyPath" ,
445
+ default_value = "$.id" ,
446
+ )
438
447
stream_name = self .target ["Arn" ].split ("/" )[- 1 ]
439
- partition_key = event . get ( partition_key_path , event ["id" ])
448
+ partition_key = collections . get_safe ( event , partition_key_path , event ["id" ])
440
449
self .client .put_record (
441
450
StreamName = stream_name ,
442
451
Data = to_bytes (to_json_str (event )),
@@ -445,19 +454,19 @@ def send_event(self, event):
445
454
446
455
def _validate_input (self , target : Target ):
447
456
super ()._validate_input (target )
448
- if not collections .get_safe (target , "$.RoleArn" ):
449
- raise ValueError ("RoleArn is required for Kinesis target" )
450
- if not collections .get_safe (target , "$.KinesisParameters.PartitionKeyPath" ):
451
- raise ValueError ("KinesisParameters.PartitionKeyPath is required for Kinesis target" )
457
+ # TODO: cover via tests
458
+ # if not collections.get_safe(target, "$.RoleArn"):
459
+ # raise ValueError("RoleArn is required for Kinesis target")
460
+ # if not collections.get_safe(target, "$.KinesisParameters.PartitionKeyPath"):
461
+ # raise ValueError("KinesisParameters.PartitionKeyPath is required for Kinesis target")
452
462
453
463
454
464
class LambdaTargetSender (TargetSender ):
455
465
def send_event (self , event ):
456
- asynchronous = True # TODO clarify default behavior of AWS
457
466
self .client .invoke (
458
467
FunctionName = self .target ["Arn" ],
459
468
Payload = to_bytes (to_json_str (event )),
460
- InvocationType = "Event" if asynchronous else "RequestResponse" ,
469
+ InvocationType = "Event" ,
461
470
)
462
471
463
472
@@ -484,8 +493,9 @@ def send_event(self, event):
484
493
485
494
def _validate_input (self , target : Target ):
486
495
super ()._validate_input (target )
487
- if not collections .get_safe (target , "$.RedshiftDataParameters.Database" ):
488
- raise ValueError ("RedshiftDataParameters.Database is required for Redshift target" )
496
+ # TODO: cover via test
497
+ # if not collections.get_safe(target, "$.RedshiftDataParameters.Database"):
498
+ # raise ValueError("RedshiftDataParameters.Database is required for Redshift target")
489
499
490
500
491
501
class SagemakerTargetSender (TargetSender ):
@@ -521,8 +531,9 @@ def send_event(self, event):
521
531
522
532
def _validate_input (self , target : Target ):
523
533
super ()._validate_input (target )
524
- if not collections .get_safe (target , "$.RoleArn" ):
525
- raise ValueError ("RoleArn is required for StepFunctions target" )
534
+ # TODO: cover via test
535
+ # if not collections.get_safe(target, "$.RoleArn"):
536
+ # raise ValueError("RoleArn is required for StepFunctions target")
526
537
527
538
528
539
class SystemsManagerSender (TargetSender ):
@@ -533,14 +544,15 @@ def send_event(self, event):
533
544
534
545
def _validate_input (self , target : Target ):
535
546
super ()._validate_input (target )
536
- if not collections .get_safe (target , "$.RoleArn" ):
537
- raise ValueError (
538
- "RoleArn is required for SystemManager target to invoke a EC2 run command"
539
- )
540
- if not collections .get_safe (target , "$.RunCommandParameters.RunCommandTargets" ):
541
- raise ValueError (
542
- "RunCommandParameters.RunCommandTargets is required for Systems Manager target"
543
- )
547
+ # TODO: cover via test
548
+ # if not collections.get_safe(target, "$.RoleArn"):
549
+ # raise ValueError(
550
+ # "RoleArn is required for SystemManager target to invoke a EC2 run command"
551
+ # )
552
+ # if not collections.get_safe(target, "$.RunCommandParameters.RunCommandTargets"):
553
+ # raise ValueError(
554
+ # "RunCommandParameters.RunCommandTargets is required for Systems Manager target"
555
+ # )
544
556
545
557
546
558
class TargetSenderFactory :
0 commit comments