Extract span tags from triggering event (#101) · DataDog/datadog-lambda-python@65aa35c · GitHub
[go: up one dir, main page]

Skip to content

Commit 65aa35c

Browse files
Extract span tags from triggering event (#101)
add function trigger span tags
1 parent 539b63a commit 65aa35c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1363
-194
lines changed

datadog_lambda/constants.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class TraceHeader(object):
2222
# X-Ray subsegment to save Datadog trace metadata
2323
class XraySubsegment(object):
2424
NAME = "datadog-metadata"
25-
KEY = "trace"
25+
TRACE_KEY = "trace"
26+
LAMBDA_FUNCTION_TAGS_KEY = "lambda_function_tags"
2627
NAMESPACE = "datadog"
2728

2829

datadog_lambda/tracing.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def _get_xray_trace_context():
6262
"trace-id": _convert_xray_trace_id(xray_trace_entity.trace_id),
6363
"parent-id": _convert_xray_entity_id(xray_trace_entity.id),
6464
"sampling-priority": _convert_xray_sampling(xray_trace_entity.sampled),
65-
"source": TraceContextSource.XRAY,
6665
}
6766

6867

@@ -90,6 +89,22 @@ def _context_obj_to_headers(obj):
9089
}
9190

9291

92+
def create_dd_dummy_metadata_subsegment(
    subsegment_metadata_value, subsegment_metadata_key
):
    """
    Create a Datadog subsegment to pass the Datadog trace context or Lambda function
    tags into its metadata field, so the X-Ray trace can be converted to a Datadog
    trace in the Datadog backend with the correct context.

    The value is stored under `subsegment_metadata_key` in the
    `XraySubsegment.NAMESPACE` namespace of a subsegment named
    `XraySubsegment.NAME`. The subsegment is opened and closed within this call.
    """
    xray_recorder.begin_subsegment(XraySubsegment.NAME)
    try:
        subsegment = xray_recorder.current_subsegment()
        subsegment.put_metadata(
            subsegment_metadata_key, subsegment_metadata_value, XraySubsegment.NAMESPACE
        )
    finally:
        # Always close the subsegment we opened, even if attaching the metadata
        # fails, so a dangling subsegment does not become the parent of
        # whatever is recorded next.
        xray_recorder.end_subsegment()
106+
107+
93108
def extract_context_from_lambda_context(lambda_context):
94109
"""
95110
Extract Datadog trace context from the `client_context` attr
@@ -156,12 +171,9 @@ def extract_dd_trace_context(event, lambda_context):
156171
157172
Write the context to a global `dd_trace_context`, so the trace
158173
can be continued on the outgoing requests with the context injected.
159-
160-
Save the context to an X-Ray subsegment's metadata field, so the X-Ray
161-
trace can be converted to a Datadog trace in the Datadog backend with
162-
the correct context.
163174
"""
164175
global dd_trace_context
176+
trace_context_source = None
165177

166178
if "headers" in event:
167179
(
@@ -187,19 +199,16 @@ def extract_dd_trace_context(event, lambda_context):
187199
"parent-id": parent_id,
188200
"sampling-priority": sampling_priority,
189201
}
190-
xray_recorder.begin_subsegment(XraySubsegment.NAME)
191-
subsegment = xray_recorder.current_subsegment()
192-
193-
subsegment.put_metadata(XraySubsegment.KEY, metadata, XraySubsegment.NAMESPACE)
194202
dd_trace_context = metadata.copy()
195-
dd_trace_context["source"] = TraceContextSource.EVENT
196-
xray_recorder.end_subsegment()
203+
trace_context_source = TraceContextSource.EVENT
197204
else:
198205
# AWS Lambda runtime caches global variables between invocations,
199206
# reset to avoid using the context from the last invocation.
200207
dd_trace_context = _get_xray_trace_context()
208+
if dd_trace_context:
209+
trace_context_source = TraceContextSource.XRAY
201210
logger.debug("extracted dd trace context %s", dd_trace_context)
202-
return dd_trace_context
211+
return dd_trace_context, trace_context_source
203212

204213

205214
def get_dd_trace_context():
@@ -300,15 +309,20 @@ def is_lambda_context():
300309
return type(xray_recorder.context) == LambdaContext
301310

302311

303-
def set_dd_trace_py_root(trace_context, merge_xray_traces):
304-
if trace_context["source"] == TraceContextSource.EVENT or merge_xray_traces:
312+
def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
313+
if trace_context_source == TraceContextSource.EVENT or merge_xray_traces:
305314
headers = get_dd_trace_context()
306315
span_context = propagator.extract(headers)
307316
tracer.context_provider.activate(span_context)
308317

309318

310319
def create_function_execution_span(
311-
context, function_name, is_cold_start, trace_context, merge_xray_traces
320+
context,
321+
function_name,
322+
is_cold_start,
323+
trace_context_source,
324+
merge_xray_traces,
325+
trigger_tags,
312326
):
313327
tags = {}
314328
if context:
@@ -325,10 +339,9 @@ def create_function_execution_span(
325339
"datadog_lambda": datadog_lambda_version,
326340
"dd_trace": ddtrace_version,
327341
}
328-
source = trace_context["source"]
329-
if source == TraceContextSource.XRAY and merge_xray_traces:
330-
tags["_dd.parent_source"] = source
331-
342+
if trace_context_source == TraceContextSource.XRAY and merge_xray_traces:
343+
tags["_dd.parent_source"] = trace_context_source
344+
tags.update(trigger_tags)
332345
args = {
333346
"service": "aws.lambda",
334347
"resource": function_name,

datadog_lambda/trigger.py

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2019 Datadog, Inc.
5+
6+
import base64
7+
import gzip
8+
import json
9+
from io import BytesIO, BufferedReader
10+
11+
12+
# Record-level `eventSource` values whose "aws:" prefix is stripped when the
# trigger source is reported (see parse_event_source).
EVENT_SOURCES = ["aws:dynamodb", "aws:kinesis", "aws:s3", "aws:sns", "aws:sqs"]
19+
20+
21+
def get_aws_partition_by_region(region):
    """
    Return the ARN partition for a region name: "aws-us-gov" for regions
    starting with "us-gov-", "aws-cn" for regions starting with "cn-", and
    "aws" for everything else.
    """
    prefix_to_partition = (
        ("us-gov-", "aws-us-gov"),
        ("cn-", "aws-cn"),
    )
    for prefix, partition in prefix_to_partition:
        if region.startswith(prefix):
            return partition
    return "aws"
27+
28+
29+
def get_first_record(event):
    """
    Return the first entry of the event's "Records" list, or None when the
    key is missing or the list is empty.
    """
    records = event.get("Records")
    return records[0] if records else None
33+
34+
35+
def parse_event_source(event):
    """Determines the source of the trigger event

    Checks are applied in order and a later match overrides an earlier one;
    when the event carries records, the first record has the final say.

    Possible Returns:
        api-gateway | application-load-balancer | cloudwatch-logs |
        cloudwatch-events | cloudfront | dynamodb | kinesis | s3 | sns | sqs
    """
    source = event.get("eventSource") or event.get("EventSource")

    req_ctx = event.get("requestContext")
    if req_ctx:
        if req_ctx.get("stage"):
            source = "api-gateway"
        if req_ctx.get("elb"):
            source = "application-load-balancer"

    if event.get("awslogs"):
        source = "cloudwatch-logs"

    detail = event.get("detail")
    has_event_categories = detail and detail.get("EventCategories")
    if event.get("source") == "aws.events" or has_event_categories:
        source = "cloudwatch-events"

    first_record = get_first_record(event)
    if first_record:
        if first_record.get("cf"):
            source = "cloudfront"
        else:
            # The record replaces whatever was detected above, even when it
            # carries no event source of its own.
            source = first_record.get("eventSource") or first_record.get(
                "EventSource"
            )

    # Report "aws:sqs" style sources without the "aws:" prefix.
    if source in EVENT_SOURCES:
        return source.replace("aws:", "")
    return source
70+
71+
72+
def parse_event_source_arn(source, event, context):
    """
    Parses the trigger event for an available ARN. If an ARN field is not provided
    in the event we stitch it together from the region and account id found in
    `context.invoked_function_arn`.

    Returns None when `source` is not one of the handled trigger types.
    """
    arn_parts = context.invoked_function_arn.split(":")
    region, account_id = arn_parts[3], arn_parts[4]
    partition = get_aws_partition_by_region(region)

    record = get_first_record(event)

    # e.g. arn:aws:s3:::lambda-xyz123-abc890
    if source == "s3":
        return record.get("s3")["bucket"]["arn"]

    # e.g. arn:aws:sns:us-east-1:123456789012:sns-lambda
    if source == "sns":
        return record.get("Sns")["TopicArn"]

    # e.g. arn:aws:cloudfront::123456789012:distribution/ABC123XYZ
    if source == "cloudfront":
        return "arn:{}:cloudfront::{}:distribution/{}".format(
            partition, account_id, record.get("cf")["config"]["distributionId"]
        )

    # e.g. arn:aws:apigateway:us-east-1::/restapis/xyz123/stages/default
    if source == "api-gateway":
        req_ctx = event.get("requestContext")
        return "arn:{}:apigateway:{}::/restapis/{}/stages/{}".format(
            partition, region, req_ctx["apiId"], req_ctx["stage"]
        )

    # e.g. arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/lambda-xyz/123
    if source == "application-load-balancer":
        return event.get("requestContext").get("elb")["targetGroupArn"]

    # e.g. arn:aws:logs:us-west-1:123456789012:log-group:/my-log-group-xyz
    if source == "cloudwatch-logs":
        # The log payload is base64-encoded, gzip-compressed JSON.
        compressed = BytesIO(base64.b64decode(event["awslogs"]["data"]))
        with gzip.GzipFile(fileobj=compressed) as gz:
            payload = gz.read()
        log_group = json.loads(payload).get("logGroup", "cloudwatch")
        return "arn:{}:logs:{}:{}:log-group:{}".format(
            partition, region, account_id, log_group
        )

    # e.g. arn:aws:events:us-east-1:123456789012:rule/my-schedule
    if source == "cloudwatch-events":
        resources = event.get("resources")
        if resources:
            return resources[0]

    return None
125+
126+
127+
def get_event_source_arn(source, event, context):
    """
    Return the ARN of the resource that triggered the function.

    The ARN is read from the first record when the event has records,
    otherwise from the event itself ("eventSourceARN" or "eventSourceArn").
    When neither key is present the ARN is derived by parse_event_source_arn.
    """
    record = get_first_record(event)
    # A non-empty first record takes precedence over the top-level event.
    carrier = record if record else event
    arn = carrier.get("eventSourceARN") or carrier.get("eventSourceArn")

    if arn is None:
        arn = parse_event_source_arn(source, event, context)
    return arn
140+
141+
142+
def extract_http_tags(event):
    """
    Extracts HTTP facet tags from the triggering event

    Returns a dict that may contain:
        http.url              - requestContext["domainName"] (API Gateway only)
        http.url_details.path - request path
        http.method           - HTTP method
        http.referer          - value of the Referer request header

    Keys whose value is missing or empty in the event are omitted.
    """
    http_tags = {}
    request_context = event.get("requestContext")
    # Defaults come from the top-level event; these are the values used when
    # requestContext has no "stage".
    path = event.get("path")
    method = event.get("httpMethod")
    if request_context and request_context.get("stage"):
        if request_context.get("domainName"):
            http_tags["http.url"] = request_context["domainName"]

        path = request_context.get("path")
        method = request_context.get("httpMethod")
        # Version 2.0 HTTP API Gateway
        apigateway_v2_http = request_context.get("http")
        if event.get("version") == "2.0" and apigateway_v2_http:
            path = apigateway_v2_http.get("path")
            method = apigateway_v2_http.get("method")

    if path:
        http_tags["http.url_details.path"] = path
    if method:
        http_tags["http.method"] = method

    headers = event.get("headers")
    if headers:
        referer = headers.get("Referer")
        if not referer:
            # Header names are case-insensitive, and version 2.0 payloads
            # deliver them lowercased, so fall back to a case-insensitive
            # search when the canonical spelling is absent.
            referer = next(
                (
                    value
                    for name, value in headers.items()
                    if value and str(name).lower() == "referer"
                ),
                None,
            )
        if referer:
            http_tags["http.referer"] = referer

    return http_tags
172+
173+
174+
def extract_trigger_tags(event, context):
    """
    Parses the trigger event object to get tags to be added to the span metadata

    Produces "function_trigger.event_source" and, when one can be found,
    "function_trigger.event_source_arn". HTTP facet tags are merged in for
    API Gateway and Application Load Balancer triggers.
    """
    tags = {}
    source = parse_event_source(event)
    if not source:
        # Without a recognised source there is nothing to tag.
        return tags

    tags["function_trigger.event_source"] = source

    source_arn = get_event_source_arn(source, event, context)
    if source_arn:
        tags["function_trigger.event_source_arn"] = source_arn

    if source in ["api-gateway", "application-load-balancer"]:
        tags.update(extract_http_tags(event))

    return tags
191+
192+
193+
def extract_http_status_code_tag(trigger_tags, response):
    """
    If the Lambda was triggered by API Gateway or ALB add the returned status code
    as a tag to the function execution span.

    Returns None when `trigger_tags` is empty or its
    "function_trigger.event_source" is not an HTTP trigger. Otherwise returns
    the status code as a string:
        "502"  when the handler returned no response,
        the response's "statusCode" when it is set,
        "200"  in every other case (including responses that are not
               dict-like, e.g. a plain string body).
    """
    http_sources = ("api-gateway", "application-load-balancer")
    if (
        not trigger_tags
        or trigger_tags.get("function_trigger.event_source") not in http_sources
    ):
        return None

    if response is None:
        # Return a 502 status if no response is found
        return "502"

    # Handlers may return something other than a mapping; only look up
    # "statusCode" when the response supports `.get`, instead of raising.
    status_code = response.get("statusCode") if hasattr(response, "get") else None
    if not status_code:
        return "200"

    # Normalise to a string so the result has the same type as the defaults.
    return str(status_code)

0 commit comments

Comments
 (0)
0