From 61422ebc9669e9f6cd977d2f47670b0cdda328b2 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 13 Jun 2025 11:12:39 -0400 Subject: [PATCH 1/3] add sns->sqs support --- datadog_lambda/dsm.py | 17 +++++++++-- tests/test_dsm.py | 68 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index e9924c21..957db410 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -70,6 +70,8 @@ def _get_dsm_context_from_lambda(message): - message.messageAttributes._datadog.stringValue (SQS -> lambda) - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) - message.kinesis.data.decode()._datadog (Kinesis -> lambda) + - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw) + - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda) """ context_json = None message_body = message @@ -83,9 +85,15 @@ def _get_dsm_context_from_lambda(message): except (ValueError, TypeError, KeyError): logger.debug("Unable to parse kinesis data for lambda message") return None - - if "Sns" in message: + elif "Sns" in message: message_body = message["Sns"] + else: + try: + body = message.get("body") + if body: + message_body = json.loads(body) + except (ValueError, TypeError): + logger.debug("Unable to parse lambda message body as JSON, treat as non-json") message_attributes = message_body.get("MessageAttributes") or message_body.get( "messageAttributes" @@ -108,6 +116,11 @@ def _get_dsm_context_from_lambda(message): elif "stringValue" in datadog_attr: # SQS -> lambda context_json = json.loads(datadog_attr["stringValue"]) + elif "binaryValue" in datadog_attr: + # SNS -> SQS -> lambda, raw message delivery + context_json = json.loads( + base64.b64decode(datadog_attr["binaryValue"]).decode() + ) else: logger.debug("DataStreams did not handle lambda message: %r", message) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 4099fa81..e41621d4 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -493,6 +493,74 @@ def test_kinesis_to_lambda_format(self): assert result["x-datadog-parent-id"] == "888777666" assert result["dd-pathway-ctx"] == "test-pathway-ctx" + def test_sns_to_sqs_to_lambda_binary_value_format(self): + """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)""" + trace_context = { + "x-datadog-trace-id": "777666555", + "x-datadog-parent-id": "444333222", + "dd-pathway-ctx": "test-pathway-ctx", + } + binary_data = base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8") + + lambda_record = { + "messageId": "test-message-id", + "receiptHandle": "test-receipt-handle", + "body": "Test message body", + "messageAttributes": { + "_datadog": {"binaryValue": binary_data, "dataType": "Binary"} + }, + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "777666555" + assert result["x-datadog-parent-id"] == "444333222" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_sqs_to_lambda_body_format(self): + """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "123987456", + "x-datadog-parent-id": "654321987", + "x-datadog-sampling-priority": "1", + "dd-pathway-ctx": "test-pathway-ctx", + } + + message_body = { + "Type": "Notification", + "MessageId": "test-message-id", + "Message": "Test message from SNS", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8"), + } + }, + } + + lambda_record = { + "messageId": "lambda-message-id", + "body": json.dumps(message_body), + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "123987456" + assert result["x-datadog-parent-id"] == "654321987" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + def test_no_message_attributes(self): """Test message without MessageAttributes returns None.""" message = { From 4c1f65c77d96aeee5448d708aa49f71abea2d13c Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 13 Jun 2025 14:32:08 -0400 Subject: [PATCH 2/3] fix bug preventing sqs -> lambda --- datadog_lambda/dsm.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 957db410..7005eb5a 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -51,7 +51,6 @@ def _dsm_set_kinesis_context(event): def _set_dsm_context_for_record(record, type, arn): from ddtrace.data_streams import set_consume_checkpoint - try: context_json = _get_dsm_context_from_lambda(record) if not context_json: @@ -59,7 +58,7 @@ def _set_dsm_context_for_record(record, type, arn): return carrier_get = _create_carrier_get(context_json) - set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) + set_consume_checkpoint(type, arn, carrier_get) except Exception as e: logger.error(f"Unable to set dsm context: {e}") @@ -91,7 +90,9 @@ def _get_dsm_context_from_lambda(message): try: body = message.get("body") if body: - message_body = json.loads(body) + parsed_body = json.loads(body) + if "MessageAttributes" in parsed_body: + message_body = parsed_body except (ValueError, TypeError): logger.debug("Unable to parse lambda message body as JSON, treat as non-json") From af6fd0ee4c19a3f76fa5cbff7bc10c3da12b5161 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 13 Jun 2025 14:33:17 -0400 Subject: [PATCH 3/3] formatting --- datadog_lambda/dsm.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 7005eb5a..ed327ecd 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -51,6 +51,7 @@ def _dsm_set_kinesis_context(event): def _set_dsm_context_for_record(record, type, arn): from ddtrace.data_streams import set_consume_checkpoint + try: context_json = _get_dsm_context_from_lambda(record) if not context_json: @@ -94,7 +95,9 @@ def _get_dsm_context_from_lambda(message): if "MessageAttributes" in parsed_body: message_body = parsed_body except (ValueError, TypeError): - logger.debug("Unable to parse lambda message body as JSON, treat as non-json") + logger.debug( + "Unable to parse lambda message body as JSON, treat as non-json" + ) message_attributes = message_body.get("MessageAttributes") or message_body.get( "messageAttributes"