8000 Feature/DynamoDBStreams: decompose process records (#11821) · localstack/localstack@59d4794 · GitHub
[go: up one dir, main page]

Skip to content

Commit 59d4794

Browse files
authored
Feature/DynamoDBStreams: decompose process records (#11821)
1 parent 6ffc483 commit 59d4794

File tree

2 files changed

+66
-55
lines changed

2 files changed

+66
-55
lines changed

localstack-core/localstack/services/dynamodb/provider.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
def forward_to_targets(
    self, account_id: str, region_name: str, records_map: RecordsMap, background: bool = True
) -> None:
    """Forward a batch of table records to downstream targets.

    :param account_id: AWS account the records belong to
    :param region_name: region of the source table(s)
    :param records_map: mapping of table records to forward
    :param background: when True (default), dispatch on the executor;
        when False, forward synchronously on the caller's thread
    """
    if not background:
        # Synchronous path: forward inline, no executor involved.
        self._forward(account_id, region_name, records_map)
        return
    # Asynchronous path: go through the patchable submit hook.
    self._submit_records(
        account_id=account_id,
        region_name=region_name,
        records_map=records_map,
    )

219+
def _submit_records(self, account_id: str, region_name: str, records_map: RecordsMap):
220+
"""Required for patching submit with local thread context for EventStudio"""
221+
self.executor.submit(
222+
self._forward,
223+
account_id,
224+
region_name,
225+
records_map,
226+
)
227+
220228
def _forward(self, account_id: str, region_name: str, records_map: RecordsMap) -> None:
221229
try:
222230
self.forward_to_kinesis_stream(account_id, region_name, records_map)

localstack-core/localstack/services/dynamodbstreams/dynamodbstreams_api.py

Lines changed: 56 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -87,63 +87,66 @@ def get_stream_for_table(account_id: str, region_name: str, table_arn: str) -> d
8787
return store.ddb_streams.get(table_name)
8888

8989

90+
def _process_forwarded_records(
    account_id: str, region_name: str, table_name: TableName, table_records: dict, kinesis
) -> None:
    """Publish one table's buffered records to its backing Kinesis stream.

    :param account_id: AWS account id (kept for interface symmetry with callers)
    :param region_name: region name (kept for interface symmetry with callers)
    :param table_name: name of the source DynamoDB table
    :param table_records: dict with "records" (list of stream records) and
        "table_stream_type" (stream configuration for the table)
    :param kinesis: Kinesis client used to put the records
    """
    records = table_records["records"]
    stream_type = table_records["table_stream_type"]
    # A table without DynamoDB Streams enabled publishes nothing.
    if not stream_type.stream_view_type:
        return

    entries = []
    needs_image_filtering = (
        stream_type.is_kinesis
        and stream_type.stream_view_type != StreamViewType.NEW_AND_OLD_IMAGES
    )
    if needs_image_filtering:
        # Kinesis forces both OldImage and NewImage onto the record, while the
        # configured StreamViewType may include fewer images — drop the extras.
        # The StreamViewType determines what is written to the stream when an
        # item in the table is inserted, updated or deleted.
        excluded_images = {
            StreamViewType.KEYS_ONLY: {"OldImage", "NewImage"},
            StreamViewType.OLD_IMAGE: {"NewImage"},
            StreamViewType.NEW_IMAGE: {"OldImage"},
        }.get(stream_type.stream_view_type, set())

        for record in records:
            payload = record["dynamodb"]
            record["dynamodb"] = {
                key: value for key, value in payload.items() if key not in excluded_images
            }

            if "SequenceNumber" not in record["dynamodb"]:
                record["dynamodb"]["SequenceNumber"] = str(
                    get_and_increment_sequence_number_counter()
                )

            entries.append({"Data": dumps(record), "PartitionKey": "TODO"})
    else:
        for record in records:
            if "SequenceNumber" not in record["dynamodb"]:
                # Mutating the record is safe here: the Kinesis forwarding
                # takes care of filtering the SequenceNumber back out.
                record["dynamodb"]["SequenceNumber"] = str(
                    get_and_increment_sequence_number_counter()
                )

            # Records already have the right format — pass them along as-is.
            entries.append({"Data": dumps(record), "PartitionKey": "TODO"})

    kinesis.put_records(
        StreamName=get_kinesis_stream_name(table_name),
        Records=entries,
    )
90145
def forward_events(account_id: str, region_name: str, records_map: dict[TableName, dict]) -> None:
    """Forward buffered records for every table in *records_map* to Kinesis.

    :param account_id: AWS account the tables belong to
    :param region_name: region of the tables
    :param records_map: per-table record batches keyed by table name
    """
    # One Kinesis client is shared across all tables in this batch.
    client = get_kinesis_client(account_id, region_name)
    for name, batch in records_map.items():
        _process_forwarded_records(account_id, region_name, name, batch, client)
147150

148151

149152
def delete_streams(account_id: str, region_name: str, table_arn: str) -> None:

0 commit comments

Comments
 (0)
0