7
7
set_dsm_context ,
8
8
_dsm_set_sqs_context ,
9
9
_dsm_set_sns_context ,
10
+ _dsm_set_kinesis_context ,
10
11
_get_dsm_context_from_lambda ,
11
12
)
12
13
from datadog_lambda .trigger import EventTypes , _EventSource
@@ -22,6 +23,10 @@ def setUp(self):
22
23
self .mock_dsm_set_sns_context = patcher .start ()
23
24
self .addCleanup (patcher .stop )
24
25
26
+ patcher = patch ("datadog_lambda.dsm._dsm_set_kinesis_context" )
27
+ self .mock_dsm_set_kinesis_context = patcher .start ()
28
+ self .addCleanup (patcher .stop )
29
+
25
30
patcher = patch ("ddtrace.internal.datastreams.data_streams_processor" )
26
31
self .mock_data_streams_processor = patcher .start ()
27
32
self .addCleanup (patcher .stop )
@@ -45,6 +50,13 @@ def setUp(self):
45
50
self .mock_calculate_sns_payload_size .return_value = 150
46
51
self .addCleanup (patcher .stop )
47
52
53
+ patcher = patch (
54
+ "ddtrace.internal.datastreams.botocore.calculate_kinesis_payload_size"
55
+ )
56
+ self .mock_calculate_kinesis_payload_size = patcher .start ()
57
+ self .mock_calculate_kinesis_payload_size .return_value = 200
58
+ self .addCleanup (patcher .stop )
59
+
48
60
patcher = patch ("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode" )
49
61
self .mock_dsm_pathway_codec_decode = patcher .start ()
50
62
self .addCleanup (patcher .stop )
@@ -207,6 +219,88 @@ def test_sns_multiple_records_process_each_record(self):
207
219
self .assertIn ("type:sns" , tags )
208
220
self .assertEqual (kwargs ["payload_size" ], 150 )
209
221
222
+ def test_kinesis_event_with_no_records_does_nothing (self ):
223
+ """Test that events where Records is None don't trigger DSM processing"""
224
+ events_with_no_records = [
225
+ {},
226
+ {"Records" : None },
227
+ {"someOtherField" : "value" },
228
+ ]
229
+
230
+ for event in events_with_no_records :
231
+ _dsm_set_kinesis_context (event )
232
+ self .mock_data_streams_processor .assert_not_called ()
233
+
234
+ def test_kinesis_event_triggers_dsm_kinesis_context (self ):
235
+ """Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
236
+ kinesis_event = {
237
+ "Records" : [
238
+ {
239
+ "eventSource" : "aws:kinesis" ,
240
+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" ,
241
+ "kinesis" : {
242
+ "data" : "SGVsbG8gZnJvbSBLaW5lc2lzIQ==" ,
243
+ "partitionKey" : "partition-key" ,
244
+ },
245
+ }
246
+ ]
247
+ }
248
+
249
+ event_source = _EventSource (EventTypes .KINESIS )
250
+ set_dsm_context (kinesis_event , event_source )
251
+
252
+ self .mock_dsm_set_kinesis_context .assert_called_once_with (kinesis_event )
253
+
254
+ def test_kinesis_multiple_records_process_each_record (self ):
255
+ """Test that each record in a Kinesis event gets processed individually"""
256
+ multi_record_event = {
257
+ "Records" : [
258
+ {
259
+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
260
+ "kinesis" : {
261
+ "data" : "TWVzc2FnZSAx" ,
262
+ "partitionKey" : "partition-1" ,
263
+ },
264
+ },
265
+ {
266
+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
267
+ "kinesis" : {
268
+ "data" : "TWVzc2FnZSAy" ,
269
+ "partitionKey" : "partition-2" ,
270
+ },
271
+ },
272
+ {
273
+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
274
+ "kinesis" : {
275
+ "data" : "TWVzc2FnZSAz" ,
276
+ "partitionKey" : "partition-3" ,
277
+ },
278
+ },
279
+ ]
280
+ }
281
+
282
+ mock_context = MagicMock ()
283
+ self .mock_dsm_pathway_codec_decode .return_value = mock_context
284
+
285
+ _dsm_set_kinesis_context (multi_record_event )
286
+
287
+ self .assertEqual (mock_context .set_checkpoint .call_count , 3 )
288
+
289
+ calls = mock_context .set_checkpoint .call_args_list
290
+ expected_arns = [
291
+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
292
+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
293
+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
294
+ ]
295
+
296
+ for i , call in enumerate (calls ):
297
+ args , kwargs = call
298
+ tags = args [0 ]
299
+ self .assertIn ("direction:in" , tags )
300
+ self .assertIn (f"topic:{ expected_arns [i ]} " , tags )
301
+ self .assertIn ("type:kinesis" , tags )
302
+ self .assertEqual (kwargs ["payload_size" ], 200 )
303
+
210
304
211
305
class TestGetDSMContext (unittest .TestCase ):
212
306
def test_sqs_to_lambda_string_value_format (self ):
0 commit comments