8000 DynamoDB: Improve global table replicas (#8549) · codeperl/localstack@a5e443b · GitHub
[go: up one dir, main page]

Skip to content

Commit a5e443b

Browse files
DynamoDB: Improve global table replicas (localstack#8549)
1 parent 577f0dc commit a5e443b

File tree

3 files changed

+76
-59
lines changed

3 files changed

+76
-59
lines changed
Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,38 @@
1-
from typing import Dict, Set
2-
1+
from localstack.aws.api.dynamodb import RegionName, ReplicaDescription, TableName
32
from localstack.services.stores import (
43
AccountRegionBundle,
54
BaseStore,
65
CrossRegionAttribute,
76
LocalAttribute,
87
)
98

10-
TableName = str
11-
Region = str
12-
Replica = Dict[Region, Set[Region]]
13-
149

1510
class DynamoDBStore(BaseStore):
16-
# maps global table names to configurations
17-
GLOBAL_TABLES: Dict[str, Dict] = CrossRegionAttribute(default=dict)
11+
# maps global table names to configurations (for the legacy v.2017 tables)
12+
GLOBAL_TABLES: dict[str, dict] = CrossRegionAttribute(default=dict)
13+
14+
# Maps table name to the region they exist in on DDBLocal (for v.2019 global tables)
15+
TABLE_REGION: dict[TableName, RegionName] = CrossRegionAttribute(default=dict)
16+
17+
# Maps the table replicas (for v.2019 global tables)
18+
REPLICAS: dict[TableName, dict[RegionName, ReplicaDescription]] = CrossRegionAttribute(
19+
default=dict
20+
)
1821

1922
# cache table taggings - maps table ARN to tags dict
20-
TABLE_TAGS: Dict[str, Dict] = CrossRegionAttribute(default=dict)
23+
TABLE_TAGS: dict[str, dict] = CrossRegionAttribute(default=dict)
2124

2225
# maps table names to cached table definitions
23-
table_definitions: Dict[str, Dict] = LocalAttribute(default=dict)
26+
table_definitions: dict[str, dict] = LocalAttribute(default=dict)
2427

2528
# maps table names to additional table properties that are not stored upstream (e.g., ReplicaUpdates)
26-
table_properties: Dict[str, Dict] = LocalAttribute(default=dict)
27-
28-
# maps the replicas for the v.2019 tables
29-
REPLICA_UPDATES: Dict[TableName, Replica] = CrossRegionAttribute(default=dict)
29+
table_properties: dict[str, dict] = LocalAttribute(default=dict)
3030

3131
# maps table names to TTL specifications
32-
ttl_specifications: Dict[str, Dict] = LocalAttribute(default=dict)
32+
ttl_specifications: dict[str, dict] = LocalAttribute(default=dict)
3333

3434
# maps backups
35-
backups: Dict[str, Dict] = LocalAttribute(default=dict)
35+
backups: dict[str, dict] = LocalAttribute(default=dict)
3636

3737

3838
dynamodb_stores = AccountRegionBundle("dynamodb", DynamoDBStore)

localstack/services/dynamodb/provider.py

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -338,11 +338,14 @@ def modify_context_region(context: RequestContext, region: str):
338338
flags=re.IGNORECASE,
339339
)
340340

341-
yield context
342-
343-
# revert the original context
344-
context.region = original_region
345-
context.request.headers["Authorization"] = original_authorization
341+
try:
342+
yield context
343+
except Exception:
344+
raise
345+
finally:
346+
# revert the original context
347+
context.region = original_region
348+
context.request.headers["Authorization"] = original_authorization
346349

347350

348351
class DynamoDBProvider(DynamodbApi, ServiceLifecycleHook):
@@ -486,6 +489,7 @@ def create_table(
486489

487490
backend = get_store(context.account_id, context.region)
488491
backend.table_definitions[table_name] = table_definitions = dict(create_table_input)
492+
backend.TABLE_REGION[table_name] = context.region
489493

490494
if "TableId" not in table_definitions:
491495
table_definitions["TableId"] = long_uid()
@@ -533,7 +537,10 @@ def delete_table(self, context: RequestContext, table_name: TableName) -> Delete
533537
table_arn = result.get("TableDescription", {}).get("TableArn")
534538
table_arn = self.fix_table_arn(table_arn)
535539
dynamodbstreams_api.delete_streams(table_arn)
536-
get_store(context.account_id, context.region).TABLE_TAGS.pop(table_arn, None)
540+
541+
store = get_store(context.account_id, context.region)
542+
store.TABLE_TAGS.pop(table_arn, None)
543+
store.REPLICAS.pop(table_name, None)
537544

538545
return result
539546

@@ -552,21 +559,22 @@ def describe_table(self, context: RequestContext, table_name: TableName) -> Desc
552559
store = get_store(context.account_id, context.region)
553560

554561
# Update replication details
555-
replicas: dict[str, set[str]] = store.REPLICA_UPDATES.get(table_name, {})
562+
replicas: Dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
556563

557564
replica_description_list = []
558-
for source_region, replicated_regions in replicas.items():
559-
# Contrary to AWS, we show all regions including the current context region where a replica exists
560-
# This is due to the limitation of internal request forwarding mechanism for global tables
565+
566+
if global_table_region != context.region:
561567
replica_description_list.append(
562-
ReplicaDescription(RegionName=source_region, ReplicaStatus=ReplicaStatus.ACTIVE)
563-
)
564-
for replicated_region in replicated_regions:
565-
replica_description_list.append(
566-
ReplicaDescription(
567-
RegionName=replicated_region, ReplicaStatus=ReplicaStatus.ACTIVE
568-
)
568+
ReplicaDescription(
569+
RegionName=global_table_region, ReplicaStatus=ReplicaStatus.ACTIVE
569570
)
571+
)
572+
573+
for replica_region, replica_description in replicas.items():
574+
# The replica in the region being queried must not be returned
575+
if replica_region != context.region:
576+
replica_description_list.append(replica_description)
577+
570578
table_description.update({"Replicas": replica_description_list})
571579

572580
# update only TableId and SSEDescription if present
@@ -608,8 +616,7 @@ def update_table(
608616
store = get_store(context.account_id, global_table_region)
609617

610618
# Dict with source region to set of replicated regions
611-
replicas: dict[str, set(str)] = store.REPLICA_UPDATES.get(table_name, {})
612-
replicas.setdefault(global_table_region, set())
619+
replicas: Dict[RegionName, ReplicaDescription] = store.REPLICAS.get(table_name, {})
613620

614621
for replica_update in replica_updates:
615622
for key, details in replica_update.items():
@@ -622,23 +629,28 @@ def update_table(
622629

623630
match key:
624631
case "Create":
625-
if target_region in replicas[global_table_region]:
632+
if target_region in replicas.keys():
626633
raise ValidationException(
627634
f"Failed to create a the new replica of table with name: '{table_name}' because one or more replicas already existed as tables."
628635
)
629-
replicas[global_table_region].add(target_region)
636+
replicas[target_region] = ReplicaDescription(
637+
RegionName=target_region,
638+
KMSMasterKeyId=details.get("KMSMasterKeyId"),
639+
ProvisionedThroughputOverride=details.get(
640+
"ProvisionedThroughputOverride"
641+
),
642+
GlobalSecondaryIndexes=details.get("GlobalSecondaryIndexes"),
643+
ReplicaStatus=ReplicaStatus.ACTIVE,
644+
)
630645
case "Delete":
631646
try:
632-
replicas[global_table_region].remove(target_region)
633-
if len(replicas[global_table_region]) == 0:
634-
# Removing the set indicates that replication is disabled
635-
replicas.pop(global_table_region)
647+
replicas.pop(target_region)
636648
except KeyError:
637649
raise ValidationException(
638650
"Update global table operation failed because one or more replicas were not part of the global table."
639651
)
640652

641-
store.REPLICA_UPDATES[table_name] = replicas
653+
store.REPLICAS[table_name] = replicas
642654

643655
# update response content
644656
schema = SchemaExtractor.get_table_schema(
@@ -663,10 +675,10 @@ def list_tables(
663675
response = self.forward_request(context)
664676

665677
# Add replicated tables
666-
replicas = get_store(context.account_id, context.region).REPLICA_UPDATES
678+
replicas = get_store(context.account_id, context.region).REPLICAS
667679
for replicated_table, replications in replicas.items():
668-
for original_region, replicated_regions in replications.items():
669-
if context.region in replicated_regions:
680+
for replica_region, replica_description in replications.items():
681+
if context.region == replica_region:
670682
response["TableNames"].append(replicated_table)
671683

672684
return response
@@ -1343,13 +1355,14 @@ def get_global_table_region(context: RequestContext, table_name: str) -> str:
13431355
:param table_name: table name
13441356
:return: region
13451357
"""
1346-
replicas = get_store(context.account_id, context.region).REPLICA_UPDATES.get(table_name)
1347-
if replicas:
1348-
global_table_region = list(replicas.keys())[0]
1349-
replicated_at = replicas[global_table_region]
1350-
# Ensure that a replica exists in the current context region, and that the table exists in DDB Local
1351-
if context.region == global_table_region or context.region in replicated_at:
1352-
return global_table_region
1358+
store = get_store(context.account_id, context.region)
1359+
1360+
table_region = store.TABLE_REGION.get(table_name)
1361+
replicated_at = store.REPLICAS.get(table_name, {}).keys()
1362+
1363+
if context.region == table_region or context.region in replicated_at:
1364+
return table_region
1365+
13531366
return context.region
13541367

13551368
@staticmethod

tests/integration/test_dynamodb.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -919,19 +919,23 @@ def test_global_tables_version_2019(
919919

920920
# Replicate table in US and EU
921921
dynamodb_ap_south_1.update_table(
922-
TableName=table_name, ReplicaUpdates=[{"Create": {"RegionName": "us-east-1"}}]
922+
TableName=table_name,
923+
ReplicaUpdates=[{"Create": {"RegionName": "us-east-1", "KMSMasterKeyId": "foo"}}],
923924
)
924925
dynamodb_ap_south_1.update_table(
925-
TableName=table_name, ReplicaUpdates=[{"Create": {"RegionName": "eu-west-1"}}]
926+
TableName=table_name,
927+
ReplicaUpdates=[{"Create": {"RegionName": "eu-west-1", "KMSMasterKeyId": "bar"}}],
926928
)
927929

928930
# Ensure all replicas can be described
929931
response = dynamodb_ap_south_1.describe_table(TableName=table_name)
930-
assert len(response["Table"]["Replicas"]) == 3
932+
assert len(response["Table"]["Replicas"]) == 2
931933
response = dynamodb_us_east_1.describe_table(TableName=table_name)
932-
assert len(response["Table"]["Replicas"]) == 3
934+
assert len(response["Table"]["Replicas"]) == 2
935+
assert "bar" in [replica.get("KMSMasterKeyId") for replica in response["Table"]["Replicas"]]
933936
response = dynamodb_eu_west_1.describe_table(TableName=table_name)
934-
assert len(response["Table"]["Replicas"]) == 3
937+
assert len(response["Table"]["Replicas"]) == 2
938+
assert "foo" in [replica.get("KMSMasterKeyId") for replica in response["Table"]["Replicas"]]
935939
with pytest.raises(Exception) as exc:
936940
dynamodb_sa_east_1.describe_table(TableName=table_name)
937941
exc.match("ResourceNotFoundException")
@@ -986,9 +990,9 @@ def test_global_tables_version_2019(
986990

987991
# Ensure replica details are updated in other regions
988992
response = dynamodb_us_east_1.describe_table(TableName=table_name)
989-
assert len(response["Table"]["Replicas"]) == 2
993+
assert len(response["Table"]["Replicas"]) == 1
990994
response = dynamodb_ap_south_1.describe_table(TableName=table_name)
991-
assert len(response["Table"]["Replicas"]) == 2
995+
assert len(response["Table"]["Replicas"]) == 1
992996

993997
# Ensure removing the last replica disables global table
994998
dynamodb_us_east_1.update_table(

0 commit comments

Comments
 (0)
0