8000 feat: add sns->sqs->lambda support by michael-zhao459 · Pull Request #618 · DataDog/datadog-lambda-python · GitHub
[go: up one dir, main page]

Skip to content
8000

feat: add sns->sqs->lambda support #618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: michael.zhao/lambda-kinesis
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _set_dsm_context_for_record(record, type, arn):
return

carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
set_consume_checkpoint(type, arn, carrier_get)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")

Expand All @@ -70,6 +70,8 @@ def _get_dsm_context_from_lambda(message):
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
"""
context_json = None
message_body = message
Expand All @@ -83,9 +85,19 @@ def _get_dsm_context_from_lambda(message):
except (ValueError, TypeError, KeyError):
logger.debug("Unable to parse kinesis data for lambda message")
return None

if "Sns" in message:
elif "Sns" in message:
message_body = message["Sns"]
else:
try:
body = message.get("body")
if body:
parsed_body = json.loads(body)
if "MessageAttributes" in parsed_body:
message_body = parsed_body
except (ValueError, TypeError):
logger.debug(
"Unable to parse lambda message body as JSON, treat as non-json"
)

message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
Expand All @@ -108,6 +120,11 @@ def _get_dsm_context_from_lambda(message):
elif "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
elif "binaryValue" in datadog_attr:
# SNS -> SQS -> lambda, raw message delivery
context_json = json.loads(
base64.b64decode(datadog_attr["binaryValue"]).decode()
)
else:
logger.debug("DataStreams did not handle lambda message: %r", message)

Expand Down
68 changes: 68 additions & 0 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,74 @@ def test_kinesis_to_lambda_format(self):
assert result["x-datadog-parent-id"] == "888777666"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_binary_value_format(self):
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
trace_context = {
"x-datadog-trace-id": "777666555",
"x-datadog-parent-id": "444333222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8")

lambda_record = {
"messageId": "test-message-id",
"receiptHandle": "test-receipt-handle",
"body": "Test message body",
"messageAttributes": {
"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
},
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "777666555"
assert result["x-datadog-parent-id"] == "444333222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_body_format(self):
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "123987456",
"x-datadog-parent-id": "654321987",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx": "test-pathway-ctx",
}

message_body = {
"Type": "Notification",
"MessageId": "test-message-id",
"Message": "Test message from SNS",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8"),
}
},
}

lambda_record = {
"messageId": "lambda-message-id",
"body": json.dumps(message_body),
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "123987456"
assert result["x-datadog-parent-id"] == "654321987"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {
Expand Down
0