diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index e9924c21..ed327ecd 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -59,7 +59,7 @@ def _set_dsm_context_for_record(record, type, arn): return carrier_get = _create_carrier_get(context_json) - set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) + set_consume_checkpoint(type, arn, carrier_get) except Exception as e: logger.error(f"Unable to set dsm context: {e}") @@ -70,6 +70,8 @@ def _get_dsm_context_from_lambda(message): - message.messageAttributes._datadog.stringValue (SQS -> lambda) - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) - message.kinesis.data.decode()._datadog (Kinesis -> lambda) + - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw) + - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda) """ context_json = None message_body = message @@ -83,9 +85,19 @@ def _get_dsm_context_from_lambda(message): except (ValueError, TypeError, KeyError): logger.debug("Unable to parse kinesis data for lambda message") return None - - if "Sns" in message: + elif "Sns" in message: message_body = message["Sns"] + else: + try: + body = message.get("body") + if body: + parsed_body = json.loads(body) + if "MessageAttributes" in parsed_body: + message_body = parsed_body + except (ValueError, TypeError): + logger.debug( + "Unable to parse lambda message body as JSON, treat as non-json" + ) message_attributes = message_body.get("MessageAttributes") or message_body.get( "messageAttributes" @@ -108,6 +120,11 @@ def _get_dsm_context_from_lambda(message): elif "stringValue" in datadog_attr: # SQS -> lambda context_json = json.loads(datadog_attr["stringValue"]) + elif "binaryValue" in datadog_attr: + # SNS -> SQS -> lambda, raw message delivery + context_json = json.loads( + base64.b64decode(datadog_attr["binaryValue"]).decode() + ) else: logger.debug("DataStreams did not handle lambda message: %r", message) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 4099fa81..e41621d4 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -493,6 +493,74 @@ def test_kinesis_to_lambda_format(self): assert result["x-datadog-parent-id"] == "888777666" assert result["dd-pathway-ctx"] == "test-pathway-ctx" + def test_sns_to_sqs_to_lambda_binary_value_format(self): + """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)""" + trace_context = { + "x-datadog-trace-id": "777666555", + "x-datadog-parent-id": "444333222", + "dd-pathway-ctx": "test-pathway-ctx", + } + binary_data = base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8") + + lambda_record = { + "messageId": "test-message-id", + "receiptHandle": "test-receipt-handle", + "body": "Test message body", + "messageAttributes": { + "_datadog": {"binaryValue": binary_data, "dataType": "Binary"} + }, + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "777666555" + assert result["x-datadog-parent-id"] == "444333222" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_sqs_to_lambda_body_format(self): + """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "123987456", + "x-datadog-parent-id": "654321987", + "x-datadog-sampling-priority": "1", + "dd-pathway-ctx": "test-pathway-ctx", + } + + message_body = { + "Type": "Notification", + "MessageId": "test-message-id", + "Message": "Test message from SNS", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8"), + } + }, + } + + lambda_record = { + "messageId": "lambda-message-id", + "body": json.dumps(message_body), + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "123987456" + assert result["x-datadog-parent-id"] == "654321987" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + def test_no_message_attributes(self): """Test message without MessageAttributes returns None.""" message = {