@@ -87,63 +87,66 @@ def get_stream_for_table(account_id: str, region_name: str, table_arn: str) -> d
87
87
return store .ddb_streams .get (table_name )
88
88
89
89
90
+ def _process_forwarded_records (
91
+ account_id : str , region_name : str , table_name : TableName , table_records : dict , kinesis
92
+ ) -> None :
93
+ records = table_records ["records" ]
94
+ stream_type = table_records ["table_stream_type" ]
95
+ # if the table does not have a DynamoDB Streams enabled, skip publishing anything
96
+ if not stream_type .stream_view_type :
97
+ return
98
+
99
+ # in this case, Kinesis forces the record to have both OldImage and NewImage, so we need to filter it
100
+ # as the settings are different for DDB Streams and Kinesis
101
+ if stream_type .is_kinesis and stream_type .stream_view_type != StreamViewType .NEW_AND_OLD_IMAGES :
102
+ kinesis_records = []
103
+
104
+ # StreamViewType determines what information is written to the stream for the table
105
+ # When an item in the table is inserted, updated or deleted
106
+ image_filter = set ()
107
+ if stream_type .stream_view_type == StreamViewType .KEYS_ONLY :
108
+ image_filter = {"OldImage" , "NewImage" }
109
+ elif stream_type .stream_view_type == StreamViewType .OLD_IMAGE :
110
+ image_filter = {"NewImage" }
111
+ elif stream_type .stream_view_type == StreamViewType .NEW_IMAGE :
112
+ image_filter = {"OldImage" }
113
+
114
+ for record in records :
115
+ record ["dynamodb" ] = {
116
+ k : v for k , v in record ["dynamodb" ].items () if k not in image_filter
117
+ }
118
+
119
+ if "SequenceNumber" not in record ["dynamodb" ]:
120
+ record ["dynamodb" ]["SequenceNumber" ] = str (
121
+ get_and_increment_sequence_number_counter ()
122
+ )
123
+
124
+ kinesis_records .append ({"Data" : dumps (record ), "PartitionKey" : "TODO" })
125
+
126
+ else :
127
+ kinesis_records = []
128
+ for record in records :
129
+ if "SequenceNumber" not in record ["dynamodb" ]:
130
+ # we can mutate the record for SequenceNumber, the Kinesis forwarding takes care of filtering it
131
+ record ["dynamodb" ]["SequenceNumber" ] = str (
132
+ get_and_increment_sequence_number_counter ()
8000
div>
133
+ )
134
+
135
+ # simply pass along the records, they already have the right format
136
+ kinesis_records .append ({"Data" : dumps (record ), "PartitionKey" : "TODO" })
137
+
138
+ stream_name = get_kinesis_stream_name (table_name )
139
+ kinesis .put_records (
140
+ StreamName = stream_name ,
141
+ Records = kinesis_records ,
142
+ )
143
+
144
+
90
145
def forward_events (account_id : str , region_name : str , records_map : dict [TableName , dict ]) -> None :
91
146
kinesis = get_kinesis_client (account_id , region_name )
92
147
93
148
for table_name , table_records in records_map .items ():
94
- records = table_records ["records" ]
95
- stream_type = table_records ["table_stream_type" ]
96
- # if the table does not have a DynamoDB Streams enabled, skip publishing anything
97
- if not stream_type .stream_view_type :
98
- continue
99
-
100
- # in this case, Kinesis forces the record to have both OldImage and NewImage, so we need to filter it
101
- # as the settings are different for DDB Streams and Kinesis
102
- if (
103
- stream_type .is_kinesis
104
- and stream_type .stream_view_type != StreamViewType .NEW_AND_OLD_IMAGES
105
- ):
106
- kinesis_records = []
107
-
108
- # StreamViewType determines what information is written to the stream for the table
109
- # When an item in the table is inserted, updated or deleted
110
- image_filter = set ()
111
- if stream_type .stream_view_type == StreamViewType .KEYS_ONLY :
112
- image_filter = {"OldImage" , "NewImage" }
113
- elif stream_type .stream_view_type == StreamViewType .OLD_IMAGE :
114
- image_filter = {"NewImage" }
115
- elif stream_type .stream_view_type == StreamViewType .NEW_IMAGE :
116
- image_filter = {"OldImage" }
117
<
A3E2
code class="diff-text syntax-highlighted-line deletion">-
118
- for record in records :
119
- record ["dynamodb" ] = {
120
- k : v for k , v in record ["dynamodb" ].items () if k not in image_filter
121
- }
122
-
123
- if "SequenceNumber" not in record ["dynamodb" ]:
124
- record ["dynamodb" ]["SequenceNumber" ] = str (
125
- get_and_increment_sequence_number_counter ()
126
- )
127
-
128
- kinesis_records .append ({"Data" : dumps (record ), "PartitionKey" : "TODO" })
129
-
130
- else :
131
- kinesis_records = []
132
- for record in records :
133
- if "SequenceNumber" not in record ["dynamodb" ]:
134
- # we can mutate the record for SequenceNumber, the Kinesis forwarding takes care of filtering it
135
- record ["dynamodb" ]["SequenceNumber" ] = str (
136
- get_and_increment_sequence_number_counter ()
137
- )
138
-
139
- # simply pass along the records, they already have the right format
140
- kinesis_records .append ({"Data" : dumps (record ), "PartitionKey" : "TODO" })
141
-
142
- stream_name = get_kinesis_stream_name (table_name )
143
- kinesis .put_records (
144
- StreamName = stream_name ,
145
- Records = kinesis_records ,
146
- )
149
+ _process_forwarded_records (account_id , region_name , table_name , table_records , kinesis )
147
150
148
151
149
152
def delete_streams (account_id : str , region_name : str , table_arn : str ) -> None :
0 commit comments