@@ -545,7 +545,7 @@ def put_targets(
545
545
546
546
if rule_service .schedule_cron :
547
547
schedule_job_function = self ._get_scheduled_rule_job_function (
548
- account_id , region , rule_service .rule , context
548
+ account_id , region , rule_service .rule
549
549
)
550
550
rule_service .create_schedule_job (schedule_job_function )
551
551
response = PutTargetsResponse (
@@ -1109,9 +1109,7 @@ def _check_resource_exists(
1109
1109
rule_name = resource_arn .split ("/" )[- 1 ]
1110
1110
self .get_rule (rule_name , event_bus )
1111
1111
1112
- def _get_scheduled_rule_job_function (
1113
- self , account_id , region , rule : Rule , context : RequestContext
1114
- ) -> Callable :
1112
+ def _get_scheduled_rule_job_function (self , account_id , region , rule : Rule ) -> Callable :
1115
1113
def func (* args , ** kwargs ):
1116
1114
"""Create custom scheduled event and send it to all targets specified by associated rule using respective TargetSender"""
1117
1115
for target in rule .targets .values ():
@@ -1132,7 +1130,7 @@ def func(*args, **kwargs):
1132
1130
1133
1131
target_sender = self ._target_sender_store [target ["Arn" ]]
1134
1132
try :
1135
- target_sender .process_event (event , context )
1133
+ target_sender .process_event (event . copy () )
1136
1134
except Exception as e :
1137
1135
LOG .info (
1138
1136
"Unable to send event notification %s to target %s: %s" ,
@@ -1327,52 +1325,79 @@ def _process_entries(
1327
1325
For matching rules the event is either sent to the respective target,
1328
1326
via the target sender put to the defined archived."""
1329
1327
processed_entries = []
1330
- failed_entry_count = 0
1328
+ failed_entry_count = { "count" : 0 }
1331
1329
for event in entries :
1332
- event_bus_name_or_arn = event .get ("EventBusName" , "default" )
1333
- event_bus_name = extract_event_bus_name (event_bus_name_or_arn )
1334
- if event_failed_validation := validate_event (event ):
1335
- processed_entries .append (event_failed_validation )
1336
- failed_entry_count += 1
1337
- continue
1338
- region , account_id = extract_region_and_account_id (event_bus_name_or_arn , context )
1339
- if encoded_trace_header := get_trace_header_encoded_region_account (
1340
- event , context .region , context .account_id , region , account_id
1341
- ):
1342
- event ["TraceHeader" ] = encoded_trace_header
1343
- event_formatted = format_event (event , region , account_id , event_bus_name )
1344
- store = self .get_store (region , account_id )
1345
- try :
1346
- event_bus = self .get_event_bus (event_bus_name , store )
1347
- except ResourceNotFoundException :
1348
- # ignore events for non-existing event buses but add processed event
1349
- processed_entries .append ({"EventId" : event_formatted ["id" ]})
1350
- continue
1351
- matching_rules = [rule for rule in event_bus .rules .values ()]
1352
- for rule in matching_rules :
1353
- event_pattern = rule .event_pattern
1354
- event_str = to_json_str (event_formatted )
1355
- if matches_rule (event_str , event_pattern ):
1356
- for target in rule .targets .values ():
1357
- target_arn = target ["Arn" ]
1358
- if is_archive_arn (target_arn ):
1359
<
8000
/td>
- self ._put_to_archive (
1360
- region ,
1361
- account_id ,
1362
- archive_target_id = target ["Id" ],
1363
- event = event_formatted ,
1364
- )
1365
- else :
1366
- target_sender = self ._target_sender_store [target_arn ]
1367
- try :
1368
- target_sender .process_event (event_formatted , context )
1369
- processed_entries .append ({"EventId" : event_formatted ["id" ]})
1370
- except Exception as error :
1371
- processed_entries .append (
1372
- {
1373
- "ErrorCode" : "InternalException" ,
1374
- "ErrorMessage" : str (error ),
1375
- }
1376
- )
1377
- failed_entry_count += 1
1378
- return processed_entries , failed_entry_count
1330
+ self ._process_entry (event , processed_entries , failed_entry_count , context )
1331
+ return processed_entries , failed_entry_count ["count" ]
1332
+
1333
+ def _process_entry (
1334
+ self ,
1335
+ entry : PutEventsRequestEntry ,
1336
+ processed_entries : PutEventsResultEntryList ,
1337
+ failed_entry_count : dict [str , int ],
1338
+ context : RequestContext ,
1339
+ ) -> None :
1340
+ event_bus_name_or_arn = entry .get ("EventBusName" , "default" )
1341
+ event_bus_name = extract_event_bus_name (event_bus_name_or_arn )
1342
+ if event_failed_validation := validate_event (entry ):
1343
+ processed_entries .append (event_failed_validation )
1344
+ failed_entry_count ["count" ] += 1
1345
+ return
1346
+ region , account_id = extract_region_and_account_id (event_bus_name_or_arn , context )
1347
+ if encoded_trace_header := get_trace_header_encoded_region_account (
1348
+ entry , context .region , context .account_id , region , account_id
1349
+ ):
1350
+ entry ["TraceHeader" ] = encoded_trace_header
1351
+ event_formatted = format_event (entry , region , account_id , event_bus_name )
1352
+ store = self .get_store (region , account_id )
1353
+ try :
1354
+ event_bus = self .get_event_bus (event_bus_name , store )
1355
+ except ResourceNotFoundException :
1356
+ # ignore events for non-existing event buses but add processed event
1357
+ processed_entries .append ({"EventId" : event_formatted ["id" ]})
1358
+ return
1359
+ self ._proxy_capture_input_event (event_formatted )
1360
+ matching_rules = [rule for rule in event_bus .rules .values ()]
1361
+ for rule in matching_rules :
1362
+ self ._process_matched_rules (
1363
+ rule , region , account_id , event_formatted , processed_entries , failed_entry_count
1364
+ )
1365
+
1366
+ def _proxy_capture_input_event (self , event : FormattedEvent ) -> None :
1367
+ # only required for eventstudio to capture input event if no rule is configured
1368
+ pass
1369
+
1370
+ def _process_matched_rules (
1371
+ self ,
1372
+ rule : Rule ,
1373
+ region : str ,
1374
+ account_id : str ,
1375
+ event_formatted : FormattedEvent ,
1376
+ processed_entries : PutEventsResultEntryList ,
1377
+ failed_entry_count : dict [str , int ],
1378
+ ) -> None :
1379
+ event_pattern = rule .event_pattern
1380
+ event_str = to_json_str (event_formatted )
1381
+ if matches_rule (event_str , event_pattern ):
1382
+ for target in rule .targets .values ():
1383
+ target_arn = target ["Arn" ]
1384
+ if is_archive_arn (target_arn ):
1385
+ self ._put_to_archive (
1386
+ region ,
1387
+ account_id ,
1388
+ archive_target_id = target ["Id" ],
1389
+ event = event_formatted ,
1390
+ )
1391
+ else :
1392
+ target_sender = self ._target_sender_store [target_arn ]
1393
+ try :
1394
+ target_sender .process_event (event_formatted .copy ())
1395
+ processed_entries .append ({"EventId" : event_formatted ["id" ]})
1396
+ except Exception as error :
1397
+ processed_entries .append (
1398
+ {
1399
+ "ErrorCode" : "InternalException" ,
1400
+ "ErrorMessage" : str (error ),
1401
+ }
1402
+ )
1403
+ failed_entry_count ["count" ] += 1
0 commit comments