8000 fixes · DataDog/datadog-lambda-python@bbe50f5 · GitHub
[go: up one dir, main page]

Skip to content

Commit bbe50f5

Browse files
fixes
1 parent 583865a commit bbe50f5

File tree

2 files changed

+57
-11
lines changed

2 files changed

+57
-11
lines changed

datadog_lambda/dsm.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,29 +85,36 @@ def _dsm_set_sns_context(event):
8585
def _dsm_set_kinesis_context(event):
8686
from ddtrace.internal.datastreams.botocore import calculate_kinesis_payload_size
8787

88-
def kinesis_payload_calculator(record, context_json):
89-
return calculate_kinesis_payload_size(record)
90-
91-
def kinesis_arn_extractor(record):
92-
arn = record.get("eventSourceARN")
93-
if arn is None:
94-
return ""
95-
return arn
88+
records = event.get("Records")
89+
if records is None:
90+
return
9691

97-
_dsm_set_context_helper(
98-
event, "kinesis", kinesis_arn_extractor, kinesis_payload_calculator
99-
)
92+
for record in records:
93+
arn = record.get("eventSourceARN", "")
94+
payload_size = calculate_kinesis_payload_size(record)
95+
_dsm_set_context_helper(record, "kinesis", arn, payload_size)
10096

10197

10298
def _get_dsm_context_from_lambda(message):
10399
"""
104100
Lambda-specific message formats:
105101
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
106102
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
103+
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
107104
"""
108105
context_json = None
109106
message_body = message
110107

108+
if "kinesis" in message:
109+
try:
110+
kinesis_data = json.loads(
111+
base64.b64decode(message["kinesis"]["data"]).decode()
112+
)
113+
return kinesis_data.get("_datadog")
114+
except (ValueError, TypeError, KeyError):
115+
log.debug("Unable to parse kinesis data for lambda message")
116+
return None
117+
111118
if "Sns" in message:
112119
message_body = message["Sns"]
113120

tests/test_dsm.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,45 @@ def test_sns_to_lambda_format(self):
386386
assert result["x-datadog-parent-id"] == "222222222"
387387
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
388388

389+
def test_kinesis_to_lambda_format(self):
390+
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
391+
trace_context = {
392+
"x-datadog-trace-id": "555444333",
393+
"x-datadog-parent-id": "888777666",
394+
"dd-pathway-ctx": "test-pathway-ctx",
395+
}
396+
397+
# Create the kinesis data payload
398+
kinesis_payload = {
399+
"_datadog": trace_context,
400+
"actualData": "some business data",
401+
}
402+
encoded_kinesis_data = base64.b64encode(
403+
json.dumps(kinesis_payload).encode("utf-8")
404+
).decode("utf-8")
405+
406+
kinesis_lambda_record = {
407+
"eventSource": "aws:kinesis",
408+
"eventSourceARN": (
409+
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
410+
),
411+
"kinesis": {
412+
"data": encoded_kinesis_data,
413+
"partitionKey": "partition-key-1",
414+
"sequenceNumber": (
415+
"49590338271490256608559692538361571095921575989136588898"
416+
),
417+
},
418+
}
419+
420+
result = _get_dsm_context_from_lambda(kinesis_lambda_record)
421+
422+
assert result is not None
423+
assert result == trace_context
424+
assert result["x-datadog-trace-id"] == "555444333"
425+
assert result["x-datadog-parent-id"] == "888777666"
426+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
427+
389428
def test_no_message_attributes(self):
390429
"""Test message without MessageAttributes returns None."""
391430
message = {

0 commit comments

Comments
 (0)
0