1
1
import json
2
+
2
3
import pytest
4
+
3
5
from localstack import config
4
6
from localstack .utils .event_matcher import matches_event
5
7
6
-
7
8
EVENT_PATTERN_DICT = {
8
9
"source" : ["aws.ec2" ],
9
- "detail-type" : ["EC2 Instance State-change Notification" ]
10
+ "detail-type" : ["EC2 Instance State-change Notification" ],
10
11
}
11
12
EVENT_DICT = {
12
13
"source" : "aws.ec2" ,
13
14
"detail-type" : "EC2 Instance State-change Notification" ,
14
- "detail" : {"state" : "running" }
15
+ "detail" : {"state" : "running" },
15
16
}
16
17
EVENT_PATTERN_STR = json .dumps (EVENT_PATTERN_DICT )
17
18
EVENT_STR = json .dumps (EVENT_DICT )
@@ -30,41 +31,38 @@ def _set_engine(engine: str):
30
31
def test_matches_event_with_java_engine_strings (event_rule_engine ):
31
32
"""Test Java engine with string inputs (EventBridge case)"""
32
33
event_rule_engine ("java" )
33
- assert matches_event (EVENT_PATTERN_STR , EVENT_STR ) == True
34
+ assert matches_event (EVENT_PATTERN_STR , EVENT_STR )
34
35
35
36
36
37
def test_matches_event_with_java_engine_dicts (event_rule_engine ):
37
38
"""Test Java engine with dict inputs (ESM/Pipes case)"""
38
39
event_rule_engine ("java" )
39
- assert matches_event (EVENT_PATTERN_DICT , EVENT_DICT ) == True
40
+ assert matches_event (EVENT_PATTERN_DICT , EVENT_DICT )
40
41
41
42
42
43
def test_matches_event_with_python_engine_strings (event_rule_engine ):
43
44
"""Test Python engine with string inputs"""
44
45
event_rule_engine ("python" )
45
- assert matches_event (EVENT_PATTERN_STR , EVENT_STR ) == True
46
+ assert matches_event (EVENT_PATTERN_STR , EVENT_STR )
46
47
47
48
48
49
def test_matches_event_with_python_engine_dicts (event_rule_engine ):
49
50
"""Test Python engine with dict inputs"""
50
51
event_rule_engine ("python" )
51
- assert matches_event (EVENT_PATTERN_DICT , EVENT_DICT ) == True
52
+ assert matches_event (EVENT_PATTERN_DICT , EVENT_DICT )
52
53
53
54
54
55
def test_matches_event_mixed_inputs (event_rule_engine ):
55
56
"""Test with mixed string/dict inputs"""
56
57
event_rule_engine ("java" )
57
- assert matches_event (EVENT_PATTERN_STR , EVENT_DICT ) == True
58
- assert matches_event (EVENT_PATTERN_DICT , EVENT_STR ) == True
58
+ assert matches_event (EVENT_PATTERN_STR , EVENT_DICT )
59
+ assert matches_event (EVENT_PATTERN_DICT , EVENT_STR )
59
60
60
61
61
62
def test_matches_event_non_matching_pattern ():
62
63
"""Test with non-matching pattern"""
63
- non_matching_pattern = {
64
- "source" : ["aws.s3" ],
65
- "detail-type" : ["S3 Event" ]
66
- }
67
- assert matches_event (non_matching_pattern , EVENT_DICT ) == False
64
+ non_matching_pattern = {"source" : ["aws.s3" ], "detail-type" : ["S3 Event" ]}
65
+ assert not matches_event (non_matching_pattern , EVENT_DICT )
68
66
69
67
70
68
def test_matches_event_invalid_json ():
@@ -76,35 +74,40 @@ def test_matches_event_invalid_json():
76
74
def test_matches_event_missing_fields ():
77
75
"""Test with missing required fields"""
78
76
incomplete_event = {"source" : "aws.ec2" }
79
- assert matches_event (EVENT_PATTERN_DICT , incomplete_event ) == False
77
+ assert not matches_event (EVENT_PATTERN_DICT , incomplete_event )
80
78
81
79
82
80
def test_matches_event_pattern_matching ():
83
- """Test various pattern matching scenarios based on AWS examples"""
81
+ """Test various pattern matching scenarios based on AWS examples
82
+
83
+ Examples taken from:
84
+ - EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html
85
+ - SNS Filtering: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html
86
+ """
84
87
test_cases = [
85
88
# Exact matching
86
89
(
87
90
{"source" : ["aws.ec2" ], "detail-type" : ["EC2 Instance State-change Notification" ]},
88
91
{"source" : "aws.ec2" , "detail-type" : "EC2 Instance State-change Notification" },
89
- True
92
+ True ,
90
93
),
91
94
# Prefix matching in detail field
92
95
(
93
96
{"source" : ["aws.ec2" ], "detail" : {"state" : [{"prefix" : "run" }]}},
94
97
{"source" : "aws.ec2" , "detail" : {"state" : "running" }},
95
- True
98
+ True ,
96
99
),
97
100
# Multiple possible values
98
101
(
99
102
{"source" : ["aws.ec2" ], "detail" : {"state" : ["pending" , "running" ]}},
100
103
{"source" : "aws.ec2" , "detail" : {"state" : "running" }},
101
- True
104
+ True ,
102
105
),
103
106
# Anything-but matching
104
107
(
105
108
{"source" : ["aws.ec2" ], "detail" : {"state" : [{"anything-but" : "terminated" }]}},
106
109
{"source" : "aws.ec2" , "detail" : {"state" : "running" }},
107
- True
110
+ True ,
108
111
),
109
112
]
110
113
@@ -116,6 +119,6 @@ def test_matches_event_case_sensitivity():
116
119
"""Test case sensitivity in matching"""
117
120
case_different_event = {
118
121
"source" : "AWS.ec2" ,
119
- "detail-type" : "EC2 Instance State-Change Notification"
122
+ "detail-type" : "EC2 Instance State-Change Notification" ,
120
123
}
121
- assert matches_event (EVENT_PATTERN_DICT , case_different_event ) == False
124
+ assert not matches_event (EVENT_PATTERN_DICT , case_different_event )
0 commit comments