1
1
from typing import Dict
2
2
3
- from localstack .aws .api .route53resolver import FirewallRuleGroup , ResourceNotFoundException
3
+ from localstack .aws .api .route53resolver import (
4
+ FirewallConfig ,
5
+ FirewallRuleGroup ,
6
+ ResourceNotFoundException ,
7
+ )
4
8
from localstack .services .generic_proxy import RegionBackend
9
+ from localstack .services .route53resolver .utils import get_firewall_config_id , validate_vpc
5
10
from localstack .utils .aws import aws_stack
6
11
7
12
@@ -15,94 +20,151 @@ def __init__(self):
15
20
self .firewall_domains = {}
16
21
self .firewall_rules = {}
17
22
self .firewall_rule_group_associations = {}
18
-
19
-
20
- ## helper functions for the backend
21
- def get_firewall_rule_group (id ):
22
- """returns firewall rule group with the given id if it exists"""
23
- region_details = Route53ResolverBackend .get ()
24
- firewall_rule_group = region_details .firewall_rule_groups .get (id )
25
- if not firewall_rule_group :
26
- raise ResourceNotFoundException (
27
- f"Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
23
+ self .resolver_query_log_configs = {}
24
+ self .resolver_query_log_config_associations = {}
25
+ self .firewall_configs = {}
26
+
27
+ ## helper functions for the backend
28
+ def get_firewall_rule_group (self , id ):
29
+ """returns firewall rule group with the given id if it exists"""
30
+
31
+ firewall_rule_group = self .firewall_rule_groups .get (id )
32
+ if not firewall_rule_group :
33
+ raise ResourceNotFoundException (
34
+ f"Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
35
+ )
36
+ return firewall_rule_group
37
+
38
+ def delete_firewall_rule_group (self , id ):
39
+ """deletes the firewall rule group with the given id"""
40
+ # if firewall_rule_groups doesn't exist it will throw an error
41
+
42
+ firewall_rule_group = self .get_firewall_rule_group (id )
43
+ self .firewall_rule_groups .pop (id )
44
+ return firewall_rule_group
45
+
46
+ def get_firewall_rule_group_association (self , id ):
47
+ """returns firewall rule group association with the given id if it exists"""
48
+
49
+ firewall_rule_group_association = self .firewall_rule_group_associations .get (id )
50
+ if not firewall_rule_group_association :
51
+ raise ResourceNotFoundException (
52
+ f"[RSLVR-02025] Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
53
+ )
54
+ return self .firewall_rule_group_associations .get (id )
55
+
56
+ def delete_firewall_rule_group_association (self , id ):
57
+ """deletes the firewall rule group association with the given id"""
58
+ # if firewall_rule_group_associations doesn't exist it will throw an error
59
+
60
+ firewall_rule_group_associations = self .get_firewall_rule_group_association (id )
61
+ self .firewall_rule_group_associations .pop (id )
62
+ return firewall_rule_group_associations
63
+
64
+ def get_firewall_domain (self , d ):
65
+ """returns firewall domain with the given id if it exists"""
66
+ # firewall_domain can return none
67
+
68
+ firewall_domain = self .firewall_domains .get (id )
69
+ return firewall_domain
70
+
71
+ def get_firewall_domain_list (self , id ):
72
+ """returns firewall domain list with the given id if it exists"""
73
+
74
+ firewall_domain_list = self .firewall_domain_lists .get (id )
75
+ if not firewall_domain_list :
76
+ raise ResourceNotFoundException (
77
+ f"Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
78
+ )
79
+ return firewall_domain_list
80
+
81
+ def delete_firewall_domain_list (self , id ):
82
+ """deletes the firewall domain list with the given id"""
83
+ # if firewall_domain_lists doesn't exist it will throw an error
84
+
85
+ firewall_domain_list = self .get_firewall_domain_list (id )
86
+ self .firewall_domain_lists .pop (id )
87
+ return firewall_domain_list
88
+
89
+ def get_firewall_rule (self , firewall_rule_group_id , firewall_domain_list_id ):
90
+ """returns firewall rule with the given id if it exists"""
91
+
92
+ firewall_rule = self .firewall_rules .get (firewall_rule_group_id , {}).get (
93
+ firewall_domain_list_id
28
94
)
29
- return firewall_rule_group
30
-
31
-
32
- def delete_firewall_rule_group (id ):
33
- """deletes the firewall rule group with the given id"""
34
- # if firewall_rule_groups doesn't exist it will throw an error
35
- region_details = Route53ResolverBackend .get ()
36
- firewall_rule_group = get_firewall_rule_group (id )
37
- region_details .firewall_rule_groups .pop (id )
38
- return firewall_rule_group
39
-
40
-
41
- def get_firewall_rule_group_association (id ):
42
- """returns firewall rule group association with the given id if it exists"""
43
- region_details = Route53ResolverBackend .get ()
44
- firewall_rule_group_association = region_details .firewall_rule_group_associations .get (id )
45
- if not firewall_rule_group_association :
46
- raise ResourceNotFoundException (
47
- f"[RSLVR-02025] Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
48
- )
49
- return region_details .firewall_rule_group_associations .get (id )
50
-
51
-
52
- def delete_firewall_rule_group_association (id ):
53
- """deletes the firewall rule group association with the given id"""
54
- # if firewall_rule_group_associations doesn't exist it will throw an error
55
- region_details = Route53ResolverBackend .get ()
56
- firewall_rule_group_associations = get_firewall_rule_group_association (id )
57
- region_details .firewall_rule_group_associations .pop (id )
58
- return firewall_rule_group_associations
59
-
60
-
61
- def get_firewall_domain (id ):
62
- """returns firewall domain with the given id if it exists"""
63
- # firewall_domain can return none
64
- region_details = Route53ResolverBackend .get ()
65
- firewall_domain = region_details .firewall_domains .get (id )
66
- return firewall_domain
67
-
68
-
69
- def get_firewall_domain_list (id ):
70
- """returns firewall domain list with the given id if it exists"""
71
- region_details = Route53ResolverBackend .get ()
72
- firewall_domain_list = region_details .firewall_domain_lists .get (id )
73
- if not firewall_domain_list :
74
- raise ResourceNotFoundException (
75
- f"Can't find the resource with ID '{ id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
76
- )
77
- return firewall_domain_list
78
-
79
-
80
- def delete_firewall_domain_list (id ):
81
- """deletes the firewall domain list with the given id"""
82
- # if firewall_domain_lists doesn't exist it will throw an error
83
- region_details = Route53ResolverBackend .get ()
84
- firewall_domain_list = get_firewall_domain_list (id )
85
- region_details .firewall_domain_lists .pop (id )
86
- return firewall_domain_list
87
-
88
-
89
- def get_firewall_rule (firewall_rule_group_id , firewall_domain_list_id ):
90
- """returns firewall rule with the given id if it exists"""
91
- region_details = Route53ResolverBackend .get ()
92
- firewall_rule = region_details .firewall_rules .get (firewall_rule_group_id , {}).get (
93
- firewall_domain_list_id
94
- )
95
- if not firewall_rule :
96
- raise ResourceNotFoundException (
97
- f"Can't find the resource with ID '{ firewall_rule_group_id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
98
- )
99
- return firewall_rule
100
-
101
-
102
- def delete_firewall_rule (firewall_rule_group_id , firewall_domain_list_id ):
103
- """deletes the firewall rule with the given id"""
104
- # if firewall_rules doesn't exist it will throw an error
105
- region_details = Route53ResolverBackend .get ()
106
- firewall_rule = get_firewall_rule (firewall_rule_group_id , firewall_domain_list_id )
107
- region_details .firewall_rules .get (firewall_rule_group_id , {}).pop (firewall_domain_list_id )
108
- return firewall_rule
95
+ if not firewall_rule :
96
+ raise ResourceNotFoundException (
97
+ f"Can't find the resource with ID '{ firewall_rule_group_id } '. Trace Id: '{ aws_stack .get_trace_id ()} '"
98
+ )
99
+ return firewall_rule
100
+
101
+ def delete_firewall_rule (self , firewall_rule_group_id , firewall_domain_list_id ):
102
+ """deletes the firewall rule with the given id"""
103
+ # if firewall_rules doesn't exist it will throw an error
104
+
105
+ firewall_rule = self .get_firewall_rule (firewall_rule_group_id , firewall_domain_list_id )
106
+ self .firewall_rules .get (firewall_rule_group_id , {}).pop (firewall_domain_list_id )
107
+ return firewall_rule
108
+
109
+ def get_resolver_query_log_config (self , id ):
110
+ """returns resolver query log config with the given id if it exists"""
111
+
112
+ resolver_query_log_config = self .resolver_query_log_configs .get (id )
113
+ if not resolver_query_log_config :
114
+ raise ResourceNotFoundException (
115
+ f"[RSLVR-01601] The specified query logging configuration doesn't exist. Trace Id: '{ aws_stack .get_trace_id ()} '"
116
+ )
117
+ return resolver_query_log_config
118
+
119
+ def delete_resolver_query_log_config (self , id ):
120
+ """deletes the resolver query log config with the given id"""
121
+
122
+ self .get_resolver_query_log_config (id )
123
+ resolver_query_log_config = self .resolver_query_log_configs .pop (id )
124
+ return resolver_query_log_config
125
+
126
+ def get_resolver_query_log_config_associations (self , id ):
127
+ """returns resolver query log config association with the given id if it exists"""
128
+
129
+ resolver_query_log_config_association = self .resolver_query_log_config_associations .get (id )
130
+ if not resolver_query_log_config_association :
131
+ raise ResourceNotFoundException (
132
+ f"[RSLVR-01601] The specified query logging configuration doesn't exist. Trace Id: '{ aws_stack .get_trace_id ()} '"
133
+ )
134
+ return resolver_query_log_config_association
135
+
136
+ def delete_resolver_query_log_config_associations (
137
+ self , resolver_query_log_config_id , resource_id
138
+ ):
139
+ """deletes the resolver query log confi
E8C9
g association with the given id and vpc id"""
140
+
141
+ association_id = None
142
+ for association in self .resolver_query_log_config_associations .values ():
143
+ if not (
144
+ association .get ("ResolverQueryLogConfigId" ) == resolver_query_log_config_id
145
+ and association .get ("ResourceId" ) == resource_id
146
+ ):
147
+ raise ResourceNotFoundException (
148
+ f"[RSLVR-01602] The specified query logging configuration association doesn't exist. Trace Id: '{ aws_stack .get_trace_id ()} '"
149
+ )
150
+ association ["Status" ] = "DELETING"
151
+ association_id = association .get ("Id" )
152
+ return self .resolver_query_log_config_associations .pop (association_id )
153
+
154
+ def get_or_create_firewall_config (self , resource_id , region , owner_id ):
155
+ """returns the firewall config with the given id if it exists or creates a new one"""
156
+
157
+ validate_vpc (resource_id , region )
158
+ firewall_config : FirewallConfig
159
+ if self .firewall_configs .get (resource_id ):
160
+ firewall_config = self .firewall_configs [resource_id ]
161
+ else :
162
+ id = get_firewall_config_id ()
163
+ firewall_config = FirewallConfig (
164
+ Id = id ,
165
+ ResourceId = resource_id ,
166
+ OwnerId = owner_id ,
167
+ FirewallFailOpen = "DISABLED" ,
168
+ )
169
+ self .firewall_configs [resource_id ] = firewall_config
170
+ return firewall_config
0 commit comments