8000 DynamoDB: Improve global table replicas by viren-nadkarni · Pull Request #8549 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Save additional attributes for replicated tables
  • Loading branch information
viren-nadkarni committed Jun 23, 2023
commit 66331ccf2e89d6e3817bdd1eeb54b7617f46e413
20 changes: 11 additions & 9 deletions localstack/services/dynamodb/models.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
from typing import Dict, Set
from typing import Dict

from localstack.aws.api.dynamodb import RegionName, ReplicaDescription, TableName
from localstack.services.stores import (
AccountRegionBundle,
BaseStore,
CrossRegionAttribute,
LocalAttribute,
)

TableName = str
Region = str
Replica = Dict[Region, Set[Region]]


class DynamoDBStore(BaseStore):
# maps global table names to configurations
# maps global table names to configurations (for the legacy v.2017 tables)
GLOBAL_TABLES: Dict[str, Dict] = CrossRegionAttribute(default=dict)

# Maps table name to the region they exist in on DDBLocal (for v.2019 global tables)
TABLE_REGION: Dict[TableName, RegionName] = CrossRegionAttribute(default=dict)
Copy link
Member
@giograno giograno Jun 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could use the standard collection for type hints.


# Maps the table replicas (for v.2019 global tables)
REPLICAS: Dict[TableName, Dict[RegionName, ReplicaDescription]] = CrossRegionAttribute(
default=dict
)

# cache table taggings - maps table ARN to tags dict
TABLE_TAGS: Dict[str, Dict] = CrossRegionAttribute(default=dict)

Expand All @@ -25,9 +30,6 @@ class DynamoDBStore(BaseStore):
# maps table names to additional table properties that are not stored upstream (e.g., ReplicaUpdates)
table_properties: Dict[str, Dict] = LocalAttribute(default=dict)

# maps the replicas for the v.2019 tables
REPLICA_UPDATES: Dict[TableName, Replica] = CrossRegionAttribute(default=dict)

# maps table names to TTL specifications
ttl_specifications: Dict[str, Dict] = LocalAttribute(default=dict)

Expand Down
75 changes: 42 additions & 33 deletions localstack/services/dynamodb/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ def create_table(

backend = get_store(context.account_id, context.region)
backend.table_definitions[table_name] = table_definitions = dict(create_table_input)
backend.TABLE_REGION[table_name] = context.region

if "TableId" not in table_definitions:
table_definitions["TableId"] = long_uid()
Expand Down Expand Up @@ -536,7 +537,10 @@ def delete_table(self, context: RequestContext, table_name: TableName) -> Delete
table_arn = result.get("TableDescription", {}).get("TableArn")
table_arn = self.fix_table_arn(table_arn)
dynamodbstreams_api.delete_streams(table_arn)
get_store(context.account_id, context.region).TABLE_TAGS.pop(table_arn, None)

store = get_store(context.account_id, context.region)
store.TABLE_TAGS.pop(table_arn, None)
store.REPLICAS.pop(table_name, None)

return result

Expand All @@ -555,22 +559,22 @@ def describe_table(self, context: RequestContext, table_name: TableName) -> Desc
store = get_store(context.account_id, context.region)

# Update replication details
replicas: dict[str, set[str]] = store.REPLICA_UPDATES.get(table_name, {})
replicas: Dict[TableName, RegionName] = store.REPLICAS.get(table_name, {})

replica_description_list = []
for source_region, replicated_regions in replicas.items():
# The replica in the region being queries must not be returned
if source_region != context.region:
replica_description_list.append(
ReplicaDescription(RegionName=source_region, ReplicaStatus=ReplicaStatus.ACTIVE)

if global_table_region != context.region:
replica_description_list.append(
ReplicaDescription(
RegionName=global_table_region, ReplicaStatus=ReplicaStatus.ACTIVE
)
for replicated_region in replicated_regions:
if replicated_region != context.region:
replica_description_list.append(
ReplicaDescription(
RegionName=replicated_region, ReplicaStatus=ReplicaStatus.ACTIVE
)
)
)

for replica_region, replica_description in replicas.items():
# The replica in the region being queried must not be returned
if replica_region != context.region:
replica_description_list.append(replica_description)

table_description.update({"Replicas": replica_description_list})

# update only TableId and SSEDescription if present
Expand Down Expand Up @@ -612,8 +616,7 @@ def update_table(
store = get_store(context.account_id, global_table_region)

# Dict with source region to set of replicated regions
replicas: dict[str, set(str)] = store.REPLICA_UPDATES.get(table_name, {})
replicas.setdefault(global_table_region, set())
replicas: Dict[RegionName, ReplicaDescription] = 8000 store.REPLICAS.get(table_name, {})

for replica_update in replica_updates:
for key, details in replica_update.items():
Expand All @@ -626,23 +629,28 @@ def update_table(

match key:
case "Create":
if target_region in replicas[global_table_region]:
if target_region in replicas.keys():
raise ValidationException(
f"Failed to create a the new replica of table with name: '{table_name}' because one or more replicas already existed as tables."
)
replicas[global_table_region].add(target_region)
replicas[target_region] = ReplicaDescription(
RegionName=target_region,
KMSMasterKeyId=details.get("KMSMasterKeyId"),
ProvisionedThroughputOverride=details.get(
"ProvisionedThroughputOverride"
),
GlobalSecondaryIndexes=details.get("GlobalSecondaryIndexes"),
ReplicaStatus=ReplicaStatus.ACTIVE,
)
case "Delete":
try:
replicas[global_table_region].remove(target_region)
if len(replicas[global_table_region]) == 0:
# Removing the set indicates that replication is disabled
replicas.pop(global_table_region)
replicas.pop(target_region)
except KeyError:
raise ValidationException(
"Update global table operation failed because one or more replicas were not part of the global table."
)

store.REPLICA_UPDATES[table_name] = replicas
store.REPLICAS[table_name] = replicas

# update response content
schema = SchemaExtractor.get_table_schema(
Expand All @@ -667,10 +675,10 @@ def list_tables(
response = self.forward_request(context)

# Add replicated tables
replicas = get_store(context.account_id, context.region).REPLICA_UPDATES
replicas = get_store(context.account_id, context.region).REPLICAS
for replicated_table, replications in replicas.items 936E ():
for original_region, replicated_regions in replications.items():
if context.region in replicated_regions:
for replica_region, replica_description in replications.items():
if context.region == replica_region:
response["TableNames"].append(replicated_table)

return response
Expand Down Expand Up @@ -1347,13 +1355,14 @@ def get_global_table_region(context: RequestContext, table_name: str) -> str:
:param table_name: table name
:return: region
"""
replicas = get_store(context.account_id, context.region).REPLICA_UPDATES.get(table_name)
if replicas:
global_table_region = list(replicas.keys())[0]
replicated_at = replicas[global_table_region]
# Ensure that a replica exists in the current context region, and that the table exists in DDB Local
if context.region == global_table_region or context.region in replicated_at:
return global_table_region
store = get_store(context.account_id, context.region)

table_region = store.TABLE_REGION.get(table_name)
replicated_at = store.REPLICAS.get(table_name, {}).keys()

if context.region == table_region or context.region in replicated_at:
return table_region

return context.region

@staticmethod
Expand Down
0