8000 Feature/Eventstudio/Events: use thread local for trace_context propag… · localstack/localstack@1786ed9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1786ed9

Browse files
authored
Feature/Eventstudio/Events: use thread local for trace_context propagation (#11533)
1 parent df051a8 commit 1786ed9

File tree

2 files changed

+80
-65
lines changed

2 files changed

+80
-65
lines changed

localstack-core/localstack/services/events/provider.py

Lines changed: 78 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ def put_targets(
545545

546546
if rule_service.schedule_cron:
547547
schedule_job_function = self._get_scheduled_rule_job_function(
548-
account_id, region, rule_service.rule, context
548+
account_id, region, rule_service.rule
549549
)
550550
rule_service.create_schedule_job(schedule_job_function)
551551
response = PutTargetsResponse(
@@ -1109,9 +1109,7 @@ def _check_resource_exists(
11091109
rule_name = resource_arn.split("/")[-1]
11101110
self.get_rule(rule_name, event_bus)
11111111

1112-
def _get_scheduled_rule_job_function(
1113-
self, account_id, region, rule: Rule, context: RequestContext
1114-
) -> Callable:
1112+
def _get_scheduled_rule_job_function(self, account_id, region, rule: Rule) -> Callable:
11151113
def func(*args, **kwargs):
11161114
"""Create custom scheduled event and send it to all targets specified by associated rule using respective TargetSender"""
11171115
for target in rule.targets.values():
@@ -1132,7 +1130,7 @@ def func(*args, **kwargs):
11321130

11331131
target_sender = self._target_sender_store[target["Arn"]]
11341132
try:
1135-
target_sender.process_event(event, context)
1133+
target_sender.process_event(event.copy())
11361134
except Exception as e:
11371135
LOG.info(
11381136
"Unable to send event notification %s to target %s: %s",
@@ -1327,52 +1325,79 @@ def _process_entries(
13271325
For matching rules the event is either sent to the respective target,
13281326
via the target sender put to the defined archived."""
13291327
processed_entries = []
1330-
failed_entry_count = 0
1328+
failed_entry_count = {"count": 0}
13311329
for event in entries:
1332-
event_bus_name_or_arn = event.get("EventBusName", "default")
1333-
event_bus_name = extract_event_bus_name(event_bus_name_or_arn)
1334-
if event_failed_validation := validate_event(event):
1335-
processed_entries.append(event_failed_validation)
1336-
failed_entry_count += 1
1337-
continue
1338-
region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context)
1339-
if encoded_trace_header := get_trace_header_encoded_region_account(
1340-
event, context.region, context.account_id, region, account_id
1341-
):
1342-
event["TraceHeader"] = encoded_trace_header
1343-
event_formatted = format_event(event, region, account_id, event_bus_name)
1344-
store = self.get_store(region, account_id)
1345-
try:
1346-
event_bus = self.get_event_bus(event_bus_name, store)
1347-
except ResourceNotFoundException:
1348-
# ignore events for non-existing event buses but add processed event
1349-
processed_entries.append({"EventId": event_formatted["id"]})
1350-
continue
1351-
matching_rules = [rule for rule in event_bus.rules.values()]
1352-
for rule in matching_rules:
1353-
event_pattern = rule.event_pattern
1354-
event_str = to_json_str(event_formatted)
1355-
if matches_rule(event_str, event_pattern):
1356-
for target in rule.targets.values():
1357-
target_arn = target["Arn"]
1358-
if is_archive_arn(target_arn):
1359< 8000 /td>-
self._put_to_archive(
1360-
region,
1361-
account_id,
1362-
archive_target_id=target["Id"],
1363-
event=event_formatted,
1364-
)
1365-
else:
1366-
target_sender = self._target_sender_store[target_arn]
1367-
try:
1368-
target_sender.process_event(event_formatted, context)
1369-
processed_entries.append({"EventId": event_formatted["id"]})
1370-
except Exception as error:
1371-
processed_entries.append(
1372-
{
1373-
"ErrorCode": "InternalException",
1374-
"ErrorMessage": str(error),
1375-
}
1376-
)
1377-
failed_entry_count += 1
1378-
return processed_entries, failed_entry_count
1330+
self._process_entry(event, processed_entries, failed_entry_count, context)
1331+
return processed_entries, failed_entry_count["count"]
1332+
1333+
def _process_entry(
1334+
self,
1335+
entry: PutEventsRequestEntry,
1336+
processed_entries: PutEventsResultEntryList,
1337+
failed_entry_count: dict[str, int],
1338+
context: RequestContext,
1339+
) -> None:
1340+
event_bus_name_or_arn = entry.get("EventBusName", "default")
1341+
event_bus_name = extract_event_bus_name(event_bus_name_or_arn)
1342+
if event_failed_validation := validate_event(entry):
1343+
processed_entries.append(event_failed_validation)
1344+
failed_entry_count["count"] += 1
1345+
return
1346+
region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context)
1347+
if encoded_trace_header := get_trace_header_encoded_region_account(
1348+
entry, context.region, context.account_id, region, account_id
1349+
):
1350+
entry["TraceHeader"] = encoded_trace_header
1351+
event_formatted = format_event(entry, region, account_id, event_bus_name)
1352+
store = self.get_store(region, account_id)
1353+
try:
1354+
event_bus = self.get_event_bus(event_bus_name, store)
1355+
except ResourceNotFoundException:
1356+
# ignore events for non-existing event buses but add processed event
1357+
processed_entries.append({"EventId": event_formatted["id"]})
1358+
return
1359+
self._proxy_capture_input_event(event_formatted)
1360+
matching_rules = [rule for rule in event_bus.rules.values()]
1361+
for rule in matching_rules:
1362+
self._process_matched_rules(
1363+
rule, region, account_id, event_formatted, processed_entries, failed_entry_count
1364+
)
1365+
1366+
def _proxy_capture_input_event(self, event: FormattedEvent) -> None:
1367+
# only required for eventstudio to capture input event if no rule is configured
1368+
pass
1369+
1370+
def _process_matched_rules(
1371+
self,
1372+
rule: Rule,
1373+
region: str,
1374+
account_id: str,
1375+
event_formatted: FormattedEvent,
1376+
processed_entries: PutEventsResultEntryList,
1377+
failed_entry_count: dict[str, int],
1378+
) -> None:
1379+
event_pattern = rule.event_pattern
1380+
event_str = to_json_str(event_formatted)
1381+
if matches_rule(event_str, event_pattern):
1382+
for target in rule.targets.values():
1383+
target_arn = target["Arn"]
1384+
if is_archive_arn(target_arn):
1385+
self._put_to_archive(
1386+
region,
1387+
account_id,
1388+
archive_target_id=target["Id"],
1389+
event=event_formatted,
1390+
)
1391+
else:
1392+
target_sender = self._target_sender_store[target_arn]
1393+
try:
1394+
target_sender.process_event(event_formatted.copy())
1395+
processed_entries.append({"EventId": event_formatted["id"]})
1396+
except Exception as error:
1397+
processed_entries.append(
1398+
{
1399+
"ErrorCode": "InternalException",
1400+
"ErrorMessage": str(error),
1401+
}
1402+
)
1403+
failed_entry_count["count"] += 1

localstack-core/localstack/services/events/target.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
from botocore.client import BaseClient
1010

11-
from localstack.aws.api import RequestContext
1211
from localstack.aws.api.events import Arn, InputTransformer, RuleName, Target, TargetInputPath
1312
from localstack.aws.connect import connect_to
1413
from localstack.services.events.models import FormattedEvent, TransformedEvent, ValidationException
@@ -139,24 +138,15 @@ def client(self):
139138
def send_event(self, event: FormattedEvent | TransformedEvent):
140139
pass
141140

142-
def proxy_send_event(
143-
self, event: FormattedEvent | TransformedEvent, context: RequestContext
144-
): # context required by eventstudio
145-
"""Proxy method to process the event and send it to the target,
146-
in addition it removes the field event-bus-name from the event,
147-
required for EventStudio extension"""
148-
self.send_event(event)
149-
150-
def process_event(self, event: FormattedEvent, context: RequestContext):
151-
# context required by eventstudio
141+
def process_event(self, event: FormattedEvent):
152142
"""Processes the event and send it to the target."""
153143
if isinstance(event, dict):
154144
event.pop("event-bus-name", None)
155145
if input_path := self.target.get("InputPath"):
156146
event = transform_event_with_target_input_path(input_path, event)
157147
if input_transformer := self.target.get("InputTransformer"):
158148
event = self.transform_event_with_target_input_transformer(input_transformer, event)
159-
self.proxy_send_event(event, context)
149+
self.send_event(event)
160150

161151
def transform_event_with_target_input_transformer(
162152
self, input_transformer: InputTransformer, event: FormattedEvent

0 commit comments

Comments
 (0)
0