8000 add kinesis -> lambda support · DataDog/datadog-lambda-python@43cf0d9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 43cf0d9

Browse files
add kinesis -> lambda support
1 parent ecae035 commit 43cf0d9

File tree

2 files changed

+113
-0
lines changed

2 files changed

+113
-0
lines changed

datadog_lambda/dsm.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ def set_dsm_context(event, event_source):
1313
_dsm_set_sqs_context(event)
1414
elif event_source.equals(EventTypes.SNS):
1515
_dsm_set_sns_context(event)
16+
elif event_source.equals(EventTypes.KINESIS):
17+
_dsm_set_kinesis_context(event)
1618

1719

1820
def _dsm_set_context_helper(
@@ -78,6 +80,23 @@ def sns_arn_extractor(record):
7880
_dsm_set_context_helper(event, "sns", sns_arn_extractor, sns_payload_calculator)
7981

8082

83+
def _dsm_set_kinesis_context(event):
84+
from ddtrace.internal.datastreams.botocore import calculate_kinesis_payload_size
85+
86+
def< 8000 /span> kinesis_payload_calculator(record, context_json):
87+
return calculate_kinesis_payload_size(record)
88+
89+
def kinesis_arn_extractor(record):
90+
arn = record.get("eventSourceARN")
91+
if arn is None:
92+
return ""
93+
return arn
94+
95+
_dsm_set_context_helper(
96+
event, "kinesis", kinesis_arn_extractor, kinesis_payload_calculator
97+
)
98+
99+
81100
def _get_dsm_context_from_lambda(message):
82101
"""
83102
Lambda-specific message formats:

tests/test_dsm.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
set_dsm_context,
88
_dsm_set_sqs_context,
99
_dsm_set_sns_context,
10+
_dsm_set_kinesis_context,
1011
_get_dsm_context_from_lambda,
1112
)
1213
from datadog_lambda.trigger import EventTypes, _EventSource
@@ -22,6 +23,10 @@ def setUp(self):
2223
self.mock_dsm_set_sns_context = patcher.start()
2324
self.addCleanup(patcher.stop)
2425

26+
patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context")
27+
self.mock_dsm_set_kinesis_context = patcher.start()
28+
self.addCleanup(patcher.stop)
29+
2530
patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
2631
self.mock_data_streams_processor = patcher.start()
2732
self.addCleanup(patcher.stop)
@@ -45,6 +50,13 @@ def setUp(self):
4550
self.mock_calculate_sns_payload_size.return_value = 150
4651
self.addCleanup(patcher.stop)
4752

53+
patcher = patch(
54+
"ddtrace.internal.datastreams.botocore.calculate_kinesis_payload_size"
55+
)
56+
self.mock_calculate_kinesis_payload_size = patcher.start()
57+
self.mock_calculate_kinesis_payload_size.return_value = 200
58+
self.addCleanup(patcher.stop)
59+
4860
patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
4961
self.mock_dsm_pathway_codec_decode = patcher.start()
5062
self.addCleanup(patcher.stop)
@@ -207,6 +219,88 @@ def test_sns_multiple_records_process_each_record(self):
207219
self.assertIn("type:sns", tags)
208220
self.assertEqual(kwargs["payload_size"], 150)
209221

222+
def test_kinesis_event_with_no_records_does_nothing(self):
223+
"""Test that events where Records is None don't trigger DSM processing"""
224+
events_with_no_records = [
225+
{},
226+
{"Records": None},
227+
{"someOtherField": "value"},
228+
]
229+
230+
for event in events_with_no_records:
231+
_dsm_set_kinesis_context(event)
232+
self.mock_data_streams_processor.assert_not_called()
233+
234+
def test_kinesis_event_triggers_dsm_kinesis_context(self):
235+
"""Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
236+
kinesis_event = {
237+
"Records": [
238+
{
239+
"eventSource": "aws:kinesis",
240+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
241+
"kinesis": {
242+
"data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==",
243+
"partitionKey": "partition-key",
244+
},
245+
}
246+
]
247+
}
248+
249+
event_source = _EventSource(EventTypes.KINESIS)
250+
set_dsm_context(kinesis_event, event_source)
251+
252+
self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event)
253+
254+
def test_kinesis_multiple_records_process_each_record(self):
255+
"""Test that each record in a Kinesis event gets processed individually"""
256+
multi_record_event = {
257+
"Records": [
258+
{
259+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
260+
"kinesis": {
261+
"data": "TWVzc2FnZSAx",
262+
"partitionKey": "partition-1",
263+
},
264+
},
265+
{
266+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
267+
"kinesis": {
268+
"data": "TWVzc2FnZSAy",
269+
"partitionKey": "partition-2",
270+
},
271+
},
272+
{
273+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
274+
"kinesis": {
275+
"data": "TWVzc2FnZSAz",
276+
"partitionKey": "partition-3",
277+
},
278+
},
279+
]
280+
}
281+
282+
mock_context = MagicMock()
283+
self.mock_dsm_pathway_codec_decode.return_value = mock_context
284+
285+
_dsm_set_kinesis_context(multi_record_event)
286+
287+
self.assertEqual(mock_context.set_checkpoint.call_count, 3)
288+
289+
calls = mock_context.set_checkpoint.call_args_list
290+
expected_arns = [
291+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
292+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
293+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
294+
]
295+
296+
for i, call in enumerate(calls):
297+
args, kwargs = call
298+
tags = args[0]
299+
self.assertIn("direction:in", tags)
300+
self.assertIn(f"topic:{expected_arns[i]}", tags)
301+
self.assertIn("type:kinesis", tags)
302+
self.assertEqual(kwargs["payload_size"], 200)
303+
210304

211305
class TestGetDSMContext(unittest.TestCase):
212306
def test_sqs_to_lambda_string_value_format(self):

0 commit comments

Comments
 (0)
0