@@ -338,11 +338,14 @@ def modify_context_region(context: RequestContext, region: str):
338
338
flags = re .IGNORECASE ,
339
339
)
340
340
341
- yield context
342
-
343
- # revert the original context
344
- context .region = original_region
345
- context .request .headers ["Authorization" ] = original_authorization
341
+ try :
342
+ yield context
343
+ except Exception :
344
+ raise
345
+ finally :
346
+ # revert the original context
347
+ context .region = original_region
348
+ context .request .headers ["Authorization" ] = original_authorization
346
349
347
350
348
351
class DynamoDBProvider (DynamodbApi , ServiceLifecycleHook ):
@@ -486,6 +489,7 @@ def create_table(
486
489
487
490
backend = get_store (context .account_id , context .region )
488
491
backend .table_definitions [table_name ] = table_definitions = dict (create_table_input )
492
+ backend .TABLE_REGION [table_name ] = context .region
489
493
490
494
if "TableId" not in table_definitions :
491
495
table_definitions ["TableId" ] = long_uid ()
@@ -533,7 +537,10 @@ def delete_table(self, context: RequestContext, table_name: TableName) -> Delete
533
537
table_arn = result .get ("TableDescription" , {}).get ("TableArn" )
534
538
table_arn = self .fix_table_arn (table_arn )
535
539
dynamodbstreams_api .delete_streams (table_arn )
536
- get_store (context .account_id , context .region ).TABLE_TAGS .pop (table_arn , None )
540
+
541
+ store = get_store (context .account_id , context .region )
542
+ store .TABLE_TAGS .pop (table_arn , None )
543
+ store .REPLICAS .pop (table_name , None )
537
544
538
545
return result
539
546
@@ -552,21 +559,22 @@ def describe_table(self, context: RequestContext, table_name: TableName) -> Desc
552
559
store = get_store (context .account_id , context .region )
553
560
554
561
# Update replication details
555
- replicas : dict [ str , set [ str ]] = store .REPLICA_UPDATES .get (table_name , {})
562
+ replicas : Dict [ RegionName , ReplicaDescription ] = store .REPLICAS .get (table_name , {})
556
563
557
564
replica_description_list = []
558
- for source_region , replicated_regions in replicas .items ():
559
- # Contrary to AWS, we show all regions including the current context region where a replica exists
560
- # This is due to the limitation of internal request forwarding mechanism for global tables
565
+
566
+ if global_table_region != context .region :
561
567
replica_description_list .append (
562
- ReplicaDescription (RegionName = source_region , ReplicaStatus = ReplicaStatus .ACTIVE )
563
- )
564
- for replicated_region in replicated_regions :
F438
td>565
- replica_description_list .append (
566
- ReplicaDescription (
567
- RegionName = replicated_region , ReplicaStatus = ReplicaStatus .ACTIVE
568
- )
568
+ ReplicaDescription (
569
+ RegionName = global_table_region , ReplicaStatus = ReplicaStatus .ACTIVE
569
570
)
571
+ )
572
+
573
+ for replica_region , replica_description in replicas .items ():
574
+ # The replica in the region being queried must not be returned
575
+ if replica_region != context .region :
576
+ replica_description_list .append (replica_description )
577
+
570
578
table_description .update ({"Replicas" : replica_description_list })
571
579
572
580
# update only TableId and SSEDescription if present
@@ -608,8 +616,7 @@ def update_table(
608
616
store = get_store (context .account_id , global_table_region )
609
617
610
618
# Dict with source region to set of replicated regions
611
- replicas : dict [str , set (str )] = store .REPLICA_UPDATES .get (table_name , {})
612
- replicas .setdefault (global_table_region , set ())
619
+ replicas : Dict [RegionName , ReplicaDescription ] = store .REPLICAS .get (table_name , {})
613
620
614
621
for replica_update in replica_updates :
615
622
for key , details in replica_update .items ():
@@ -622,23 +629,28 @@ def update_table(
622
629
623
630
match key :
624
631
case "Create" :
625
- if target_region in replicas [ global_table_region ] :
632
+ if target_region in replicas . keys () :
626
633
raise ValidationException (
627
634
f"Failed to create a the new replica of table with name: '{ table_name } ' because one or more replicas already existed as tables."
628
635
)
629
- replicas [global_table_region ].add (target_region )
636
+ replicas [target_region ] = ReplicaDescription (
637
+ RegionName = target_region ,
638
+ KMSMasterKeyId = details .get ("KMSMasterKeyId" ),
639
+ ProvisionedThroughputOverride = details .get (
640
+ "ProvisionedThroughputOverride"
641
+ ),
642
+ GlobalSecondaryIndexes = details .get ("GlobalSecondaryIndexes" ),
643
+ ReplicaStatus = ReplicaStatus .ACTIVE ,
644
+ )
630
645
case "Delete" :
631
646
try :
632
- replicas [global_table_region ].remove (target_region )
633
- if len (replicas [global_table_region ]) == 0 :
634
- # Removing the set indicates that replication is disabled
635
- replicas .pop (global_table_region )
647
+ replicas .pop (target_region )
636
648
except KeyError :
637
649
raise ValidationException (
638
650
"Update global table operation failed because one or more replicas were not part of the global table."
639
651
)
640
652
641
- store .REPLICA_UPDATES [table_name ] = replicas
653
+ store .REPLICAS [table_name ] = replicas
642
654
643
655
# update response content
644
656
schema = SchemaExtractor .get_table_schema (
@@ -663,10 +675,10 @@ def list_tables(
663
675
response = self .forward_request (context )
664
676
665
677
# Add replicated tables
666
- replicas = get_store (context .account_id , context .region ).REPLICA_UPDATES
678
+ replicas = get_store (context .account_id , context .region ).REPLICAS
667
679
for replicated_table , replications in replicas .items ():
668
- for original_region , replicated_regions in replications .items ():
669
- if context .region in replicated_regions :
680
+ for replica_region , replica_description in replications .items ():
681
+ if context .region == replica_region :
670
682
response ["TableNames" ].append (replicated_table )
671
683
672
684
return response
@@ -1343,13 +1355,14 @@ def get_global_table_region(context: RequestContext, table_name: str) -> str:
1343
1355
:param table_name: table name
1344
1356
:return: region
1345
1357
"""
1346
- replicas = get_store (context .account_id , context .region ).REPLICA_UPDATES .get (table_name )
1347
- if replicas :
1348
- global_table_region = list (replicas .keys ())[0 ]
1349
- replicated_at = replicas [global_table_region ]
1350
- # Ensure that a replica exists in the current context region, and that the table exists in DDB Local
1351
- if context .region == global_table_region or context .region in replicated_at :
1352
- return global_table_region
1358
+ store = get_store (context .account_id , context .region )
1359
+
1360
+ table_region = store .TABLE_REGION .get (table_name )
1361
+ replicated_at = store .REPLICAS .get (table_name , {}).keys ()
1362
+
1363
+ if context .region == table_region or context .region in replicated_at :
1364
+ return table_region
1365
+
1353
1366
return context .region
1354
1367
1355
1368
@staticmethod
0 commit comments