8000 implement new DDB streams provider using DynamoDB-Local by bentsku · Pull Request #11688 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

implement new DDB streams provider using DynamoDB-Local #11688

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Oct 18, 2024
Merged
12 changes: 12 additions & 0 deletions localstack-core/localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,18 @@ def populate_edge_configuration(
if not os.environ.get("PROVIDER_OVERRIDE_APIGATEWAYMANAGEMENTAPI"):
os.environ["PROVIDER_OVERRIDE_APIGATEWAYMANAGEMENTAPI"] = "next_gen"
8000
# Whether the DynamoDBStreams native provider is enabled
DDB_STREAMS_PROVIDER_V2 = os.environ.get("PROVIDER_OVERRIDE_DYNAMODBSTREAMS", "") == "v2"
_override_dynamodb_v2 = os.environ.get("PROVIDER_OVERRIDE_DYNAMODB", "")
if DDB_STREAMS_PROVIDER_V2:
# in order to not have conflicts between the 2 implementations, as they are tightly coupled, we need to set DDB
# to be v2 as well
if not _override_dynamodb_v2:
os.environ["PROVIDER_OVERRIDE_DYNAMODB"] = "v2"
elif _override_dynamodb_v2 == "v2":
os.environ["PROVIDER_OVERRIDE_DYNAMODBSTREAMS"] = "v2"
DDB_STREAMS_PROVIDER_V2 = True


# TODO remove fallback to LAMBDA_DOCKER_NETWORK with next minor version
MAIN_DOCKER_NETWORK = os.environ.get("MAIN_DOCKER_NETWORK", "") or LAMBDA_DOCKER_NETWORK
Expand Down
35 changes: 5 additions & 30 deletions localstack-core/localstack/services/dynamodb/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import threading
import time
import traceback
from binascii import crc32
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
Expand Down Expand Up @@ -139,6 +138,7 @@
de_dynamize_record,
extract_table_name_from_partiql_update,
get_ddb_access_key,
modify_ddblocal_arns,
)
from localstack.services.dynamodbstreams import dynamodbstreams_api
from localstack.services.dynamodbstreams.models import dynamodbstreams_stores
Expand Down Expand Up @@ -539,21 +539,20 @@ def on_before_state_reset(self):

def on_before_state_load(self):
self.server.stop_dynamodb()
self.server = self._new_dynamodb_server()

def on_after_state_reset(self):
self.server = self._new_dynamodb_server()
self.server.start_dynamodb()

def _new_dynamodb_server(self) -> DynamodbServer:
return DynamodbServer(config.DYNAMODB_LOCAL_PORT)
@staticmethod
def _new_dynamodb_server() -> DynamodbServer:
return DynamodbServer.get()

def on_after_state_load(self):
self.server.start_dynamodb()

def on_after_init(self):
# add response processor specific to ddblocal
handlers.modify_service_response.append(self.service, self._modify_ddblocal_arns)
handlers.modify_service_response.append(self.service, modify_ddblocal_arns)

# routes for the shell ui
ROUTER.add(
Expand All @@ -566,30 +565,6 @@ def on_after_init(self):
endpoint=self.handle_shell_ui_request,
)

def _modify_ddblocal_arns(self, chain, context: RequestContext, response: Response):
"""A service response handler that modifies the dynamodb backend response."""
if response_content := response.get_data(as_text=True):

def _convert_arn(matchobj):
key = matchobj.group(1)
partition = get_partition(context.region)
table_name = matchobj.group(2)
return f'{key}: "arn:{partition}:dynamodb:{context.region}:{context.account_id}:{table_name}"'

# fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)
content_replaced = re.sub(
r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:[a-z-]+:dynamodb:ddblocal:000000000000:([^"]+)"',
_convert_arn,
response_content,
)
if content_replaced != response_content:
response.data = content_replaced
# make sure the service response is parsed again later
context.service_response = None

# update x-amz-crc32 header required by some clients
response.headers["x-amz-crc32"] = crc32(response.data) & 0xFFFFFFFF

def _forward_request(
self,
context: RequestContext,
Expand Down
8 changes: 7 additions & 1 deletion localstack-core/localstack/services/dynamodb/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from localstack.utils.common import TMP_THREADS, ShellCommandThread, get_free_tcp_port, mkdir
from localstack.utils.functions import run_safe
from localstack.utils.net import wait_for_port_closed
from localstack.utils.objects import singleton_factory
from localstack.utils.run import FuncThread, run
from localstack.utils.serving import Server
from localstack.utils.sync import retry, synchronized
Expand Down Expand Up @@ -71,6 +72,11 @@ def __init__(
self.cors = os.getenv("DYNAMODB_CORS", None)
self.proxy = AwsRequestProxy(self.url)

@staticmethod
@singleton_factory
def get() -> "DynamodbServer":
return DynamodbServer(config.DYNAMODB_LOCAL_PORT)

def start_dynamodb(self) -> bool:
"""Start the DynamoDB server."""

Expand All @@ -79,7 +85,7 @@ def start_dynamodb(self) -> bool:
# - pod load with some assets already lying in the asset folder
# - ...
# The cleaning is now done via the reset endpoint

self._stopped.clear()
started = self.start()
self.wait_for_dynamodb()
return started
Expand Down
39 changes: 38 additions & 1 deletion localstack-core/localstack/services/dynamodb/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
import re
from binascii import crc32
from typing import Dict, List, Optional

from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from cachetools import TTLCache
from moto.core.exceptions import JsonRESTError

from localstack.aws.api import RequestContext
from localstack.aws.api.dynamodb import (
AttributeMap,
BatchGetRequestMap,
Expand All @@ -20,7 +22,8 @@
)
from localstack.aws.connect import connect_to
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
from localstack.utils.aws.arns import dynamodb_table_arn
from localstack.http import Response
from localstack.utils.aws.arns import dynamodb_table_arn, get_partition
from localstack.utils.json import canonical_json
from localstack.utils.testutil import list_all_resources

Expand All @@ -29,6 +32,11 @@
# cache schema definitions
SCHEMA_CACHE = TTLCache(maxsize=50, ttl=20)

_ddb_local_arn_pattern = re.compile(
r'("TableArn"|"LatestStreamArn"|"StreamArn"|"ShardIterator")\s*:\s*"arn:[a-z-]+:dynamodb:ddblocal:000000000000:([^"]+)"'
)
_ddb_local_region_pattern = re.compile(r'"awsRegion"\s*:\s*"([^"]+)"')


def get_ddb_access_key(account_id: str, region_name: str) -> str:
"""
Expand Down Expand Up @@ -305,3 +313,32 @@ def de_dynamize_record(item: dict) -> dict:
"""
deserializer = TypeDeserializer()
return {k: deserializer.deserialize(v) for k, v in item.items()}


def modify_ddblocal_arns(chain, context: RequestContext, response: Response):
"""A service response handler that modifies the dynamodb backend response."""
if response_content := response.get_data(as_text=True):

def _convert_arn(matchobj):
key = matchobj.group(1)
partition = get_partition(context.region)
table_name = matchobj.group(2)
return f'{key}: "arn:{partition}:dynamodb:{context.region}:{context.account_id}:{table_name}"'

# fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)
content_replaced = _ddb_local_arn_pattern.sub(
_convert_arn,
response_content,
)
if context.service.service_name == "dynamodbstreams":
content_replaced = _ddb_local_region_pattern.sub(
f'"awsRegion": "{context.region}"', content_replaced
)

if content_replaced != response_content:
response.data = content_replaced
# make sure the service response is parsed again later
context.service_response = None

# update x-amz-crc32 header required by some clients
response.headers["x-amz-crc32"] = crc32(response.data) & 0xFFFFFFFF
Empty file.
Loading
Loading
0