diff --git a/localstack-core/localstack/config.py b/localstack-core/localstack/config.py
index b80ac9bc8f0aa..6564cb1dc69fd 100644
--- a/localstack-core/localstack/config.py
+++ b/localstack-core/localstack/config.py
@@ -961,9 +961,6 @@ def populate_edge_configuration(
 # Additional flags passed to Docker run|create commands.
 LAMBDA_DOCKER_FLAGS = os.environ.get("LAMBDA_DOCKER_FLAGS", "").strip()
 
-# PUBLIC: v2 (default), v1 (deprecated) Version of the Lambda Event Source Mapping implementation
-LAMBDA_EVENT_SOURCE_MAPPING = os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING", "v2").strip()
-
 # PUBLIC: 0 (default)
 # Enable this flag to run cross-platform compatible lambda functions natively (i.e., Docker selects architecture) and
 # ignore the AWS architectures (i.e., x86_64, arm64) configured for the lambda function.
@@ -1039,10 +1036,6 @@ def populate_edge_configuration(
     )  # the 100 comes from the init defaults
 )
 
-LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL_SEC = float(
-    os.environ.get("LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL_SEC") or 1.0
-)
-
 # DEV: 0 (default unless in host mode on macOS) For LS developers only. Only applies to Docker mode.
 # Whether to explicitly expose a free TCP port in lambda containers when invoking functions in host mode for
 # systems that cannot reach the container via its IPv4. For example, macOS cannot reach Docker containers:
diff --git a/localstack-core/localstack/deprecations.py b/localstack-core/localstack/deprecations.py
index 2757abde57eff..32a6dd643fb2a 100644
--- a/localstack-core/localstack/deprecations.py
+++ b/localstack-core/localstack/deprecations.py
@@ -281,6 +281,17 @@ def is_affected(self) -> bool:
         "This option is ignored because the LocalStack SQS dependency for event invokes has been removed since 4.0.0"
         " in favor of a lightweight Lambda-internal SQS implementation.",
     ),
+    EnvVarDeprecation(
+        "LAMBDA_EVENT_SOURCE_MAPPING",
+        "4.0.0",
+        "This option has no effect anymore. Please remove this environment variable.",
+    ),
+    EnvVarDeprecation(
+        "LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL_SEC",
+        "4.0.0",
+        "This option is not supported by the new Lambda Event Source Mapping v2 implementation."
+        " Please create a GitHub issue if you experience any performance challenges.",
+    ),
 ]
 
 
@@ -328,20 +339,6 @@ def log_deprecation_warnings(deprecations: Optional[List[EnvVarDeprecation]] = N
     affected_deprecations = collect_affected_deprecations(deprecations)
     log_env_warning(affected_deprecations)
 
-    feature_override_lambda_esm = os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING")
-    if feature_override_lambda_esm and feature_override_lambda_esm in ["v1", "legacy"]:
-        env_var_value = f"LAMBDA_EVENT_SOURCE_MAPPING={feature_override_lambda_esm}"
-        deprecation_version = "3.8.0"
-        deprecation_path = (
-            f"Remove {env_var_value} to use the new Lambda Event Source Mapping implementation."
-        )
-        LOG.warning(
-            "%s is deprecated (since %s) and will be removed in upcoming releases of LocalStack!
%s", - env_var_value, - deprecation_version, - deprecation_path, - ) - def deprecated_endpoint( endpoint: Callable, previous_path: str, deprecation_version: str, new_path: str diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/__init__.py b/localstack-core/localstack/services/lambda_/event_source_listeners/__init__.py deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/adapters.py b/localstack-core/localstack/services/lambda_/event_source_listeners/adapters.py deleted file mode 100644 index c01c5d8ddc023..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/adapters.py +++ /dev/null @@ -1,263 +0,0 @@ -import abc -import json -import logging -import threading -from abc import ABC -from functools import lru_cache -from typing import Callable, Optional - -from localstack.aws.api.lambda_ import InvocationType -from localstack.aws.connect import ServiceLevelClientFactory, connect_to -from localstack.aws.protocol.serializer import gen_amzn_requestid -from localstack.services.lambda_ import api_utils -from localstack.services.lambda_.api_utils import function_locators_from_arn, qualifier_is_version -from localstack.services.lambda_.event_source_listeners.exceptions import FunctionNotFoundError -from localstack.services.lambda_.event_source_listeners.lambda_legacy import LegacyInvocationResult -from localstack.services.lambda_.event_source_listeners.utils import event_source_arn_matches -from localstack.services.lambda_.invocation.lambda_models import InvocationResult -from localstack.services.lambda_.invocation.lambda_service import LambdaService -from localstack.services.lambda_.invocation.models import lambda_stores -from localstack.utils.aws.client_types import ServicePrincipal -from localstack.utils.json import BytesEncoder -from localstack.utils.strings import to_bytes, to_str - -LOG = logging.getLogger(__name__) - - -class EventSourceAdapter(ABC): - """ - Adapter for the communication between event source mapping and lambda service - Generally just a temporary construct to bridge the old and new provider and re-use the existing event source listeners. - - Remove this file when sunsetting the legacy provider or when replacing the event source listeners. 
- """ - - def invoke( - self, - function_arn: str, - context: dict, - payload: dict, - invocation_type: InvocationType, - callback: Optional[Callable] = None, - ) -> None: - pass - - def invoke_with_statuscode( - self, - function_arn, - context, - payload, - invocation_type, - callback=None, - *, - lock_discriminator, - parallelization_factor, - ) -> int: - pass - - def get_event_sources(self, source_arn: str): - pass - - @abc.abstractmethod - def get_client_factory(self, function_arn: str, region_name: str) -> ServiceLevelClientFactory: - pass - - -class EventSourceAsfAdapter(EventSourceAdapter): - """ - Used to bridge run_lambda instances to the new provider - """ - - lambda_service: LambdaService - - def __init__(self, lambda_service: LambdaService): - self.lambda_service = lambda_service - - def invoke(self, function_arn, context, payload, invocation_type, callback=None): - request_id = gen_amzn_requestid() - self._invoke_async(request_id, function_arn, context, payload, invocation_type, callback) - - def _invoke_async( - self, - request_id: str, - function_arn: str, - context: dict, - payload: dict, - invocation_type: InvocationType, - callback: Optional[Callable] = None, - ): - # split ARN ( a bit unnecessary since we build an ARN again in the service) - fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(function_arn).groupdict() - function_name = fn_parts["function_name"] - # TODO: think about scaling here because this spawns a new thread for every invoke without limits! - thread = threading.Thread( - target=self._invoke_sync, - args=(request_id, function_arn, context, payload, invocation_type, callback), - daemon=True, - name=f"event-source-invoker-{function_name}-{request_id}", - ) - thread.start() - - def _invoke_sync( - self, - request_id: str, - function_arn: str, - context: dict, - payload: dict, - invocation_type: InvocationType, - callback: Optional[Callable] = None, - ): - """Performs the actual lambda invocation which will be run from a thread.""" - fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(function_arn).groupdict() - function_name = fn_parts["function_name"] - - result = self.lambda_service.invoke( - # basically function ARN - function_name=function_name, - qualifier=fn_parts["qualifier"], - region=fn_parts["region_name"], - account_id=fn_parts["account_id"], - invocation_type=invocation_type, - client_context=json.dumps(context or {}), - payload=to_bytes(json.dumps(payload or {}, cls=BytesEncoder)), - request_id=request_id, - ) - - if callback: - try: - error = None - if result.is_error: - error = "?" - result_payload = to_str(json.loads(result.payload)) if result.payload else "" - callback( - result=LegacyInvocationResult( - result=result_payload, - log_output=result.logs, - ), - func_arn="doesntmatter", - event="doesntmatter", - error=error, - ) - - except Exception as e: - # TODO: map exception to old error format? 
- LOG.debug("Encountered an exception while handling callback", exc_info=True) - callback( - result=None, - func_arn="doesntmatter", - event="doesntmatter", - error=e, - ) - - def invoke_with_statuscode( - self, - function_arn, - context, - payload, - invocation_type, - callback=None, - *, - lock_discriminator, - parallelization_factor, - ) -> int: - # split ARN ( a bit unnecessary since we build an ARN again in the service) - fn_parts = api_utils.FULL_FN_ARN_PATTERN.search(function_arn).groupdict() - - try: - result = self.lambda_service.invoke( - # basically function ARN - function_name=fn_parts["function_name"], - qualifier=fn_parts["qualifier"], - region=fn_parts["region_name"], - account_id=fn_parts["account_id"], - invocation_type=invocation_type, - client_context=json.dumps(context or {}), - payload=to_bytes(json.dumps(payload or {}, cls=BytesEncoder)), - request_id=gen_amzn_requestid(), - ) - - if callback: - - def mapped_callback(result: InvocationResult) -> None: - try: - error = None - if result.is_error: - error = "?" - result_payload = ( - to_str(json.loads(result.payload)) if result.payload else "" - ) - callback( - result=LegacyInvocationResult( - result=result_payload, - log_output=result.logs, - ), - func_arn="doesntmatter", - event="doesntmatter", - error=error, - ) - - except Exception as e: - LOG.debug("Encountered an exception while handling callback", exc_info=True) - callback( - result=None, - func_arn="doesntmatter", - event="doesntmatter", - error=e, - ) - - mapped_callback(result) - - # they're always synchronous in the ASF provider - if result.is_error: - return 500 - else: - return 200 - except Exception: - LOG.debug("Encountered an exception while handling lambda invoke", exc_info=True) - return 500 - - def get_event_sources(self, source_arn: str): - # assuming the region/account from function_arn - results = [] - for account_id in lambda_stores: - for region in lambda_stores[account_id]: - state = lambda_stores[account_id][region] - for esm in state.event_source_mappings.values(): - if ( - event_source_arn_matches( - mapped=esm.get("EventSourceArn"), searched=source_arn - ) - and esm.get("State", "") == "Enabled" - ): - results.append(esm.copy()) - return results - - @lru_cache(maxsize=64) - def _cached_client_factory(self, region_name: str, role_arn: str) -> ServiceLevelClientFactory: - return connect_to.with_assumed_role( - role_arn=role_arn, region_name=region_name, service_principal=ServicePrincipal.lambda_ - ) - - def _get_role_for_function(self, function_arn: str) -> str: - function_name, qualifier, account, region = function_locators_from_arn(function_arn) - store = lambda_stores[account][region] - function = store.functions.get(function_name) - - if not function: - raise FunctionNotFoundError(f"function not found: {function_arn}") - - if qualifier and qualifier != "$LATEST": - if qualifier_is_version(qualifier): - version_number = qualifier - else: - # the role of the routing config version and the regular configured version has to be identical - version_number = function.aliases.get(qualifier).function_version - version = function.versions.get(version_number) - else: - version = function.latest() - return version.config.role - - def get_client_factory(self, function_arn: str, region_name: str) -> ServiceLevelClientFactory: - role_arn = self._get_role_for_function(function_arn) - - return self._cached_client_factory(region_name=region_name, role_arn=role_arn) diff --git 
a/localstack-core/localstack/services/lambda_/event_source_listeners/dynamodb_event_source_listener.py b/localstack-core/localstack/services/lambda_/event_source_listeners/dynamodb_event_source_listener.py deleted file mode 100644 index a9724c9056ffb..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/dynamodb_event_source_listener.py +++ /dev/null @@ -1,86 +0,0 @@ -import datetime -from typing import Dict, List, Optional - -from localstack.services.lambda_.event_source_listeners.stream_event_source_listener import ( - StreamEventSourceListener, -) -from localstack.services.lambda_.event_source_listeners.utils import filter_stream_records -from localstack.utils.aws.arns import extract_region_from_arn -from localstack.utils.threads import FuncThread - - -class DynamoDBEventSourceListener(StreamEventSourceListener): - _FAILURE_PAYLOAD_DETAILS_FIELD_NAME = "DDBStreamBatchInfo" - _COORDINATOR_THREAD: Optional[FuncThread] = ( - None # Thread for monitoring state of event source mappings - ) - _STREAM_LISTENER_THREADS: Dict[ - str, FuncThread - ] = {} # Threads for listening to stream shards and forwarding data to mapped Lambdas - - @staticmethod - def source_type() -> Optional[str]: - return "dynamodb" - - def _get_matching_event_sources(self) -> List[Dict]: - event_sources = self._invoke_adapter.get_event_sources(source_arn=r".*:dynamodb:.*") - return [source for source in event_sources if source["State"] == "Enabled"] - - def _get_stream_client(self, function_arn: str, region_name: str): - return self._invoke_adapter.get_client_factory( - function_arn=function_arn, region_name=region_name - ).dynamodbstreams.request_metadata(source_arn=function_arn) - - def _get_stream_description(self, stream_client, stream_arn): - return stream_client.describe_stream(StreamArn=stream_arn)["StreamDescription"] - - def _get_shard_iterator(self, stream_client, stream_arn, shard_id, iterator_type): - return stream_client.get_shard_iterator( - StreamArn=stream_arn, ShardId=shard_id, ShardIteratorType=iterator_type - )["ShardIterator"] - - def _filter_records( - self, records: List[Dict], event_filter_criterias: List[Dict] - ) -> List[Dict]: - if len(event_filter_criterias) == 0: - return records - - return filter_stream_records(records, event_filter_criterias) - - def _create_lambda_event_payload(self, stream_arn, records, shard_id=None): - record_payloads = [] - for record in records: - record_payloads.append( - { - "eventID": record["eventID"], - "eventVersion": "1.0", - "awsRegion": extract_region_from_arn(stream_arn), - "eventName": record["eventName"], - "eventSourceARN": stream_arn, - "eventSource": "aws:dynamodb", - "dynamodb": record["dynamodb"], - } - ) - return {"Records": record_payloads} - - def _get_starting_and_ending_sequence_numbers(self, first_record, last_record): - return first_record["dynamodb"]["SequenceNumber"], last_record["dynamodb"]["SequenceNumber"] - - def _get_first_and_last_arrival_time(self, first_record, last_record): - return ( - first_record.get("ApproximateArrivalTimestamp", datetime.datetime.utcnow()).isoformat() - + "Z", - last_record.get("ApproximateArrivalTimestamp", datetime.datetime.utcnow()).isoformat() - + "Z", - ) - - def _transform_records(self, raw_records: list[dict]) -> list[dict]: - """Convert dynamodb.ApproximateCreationDateTime datetime to float""" - records_new = [] - for record in raw_records: - record_new = record.copy() - if creation_time := record.get("dynamodb", {}).get("ApproximateCreationDateTime"): - # convert datetime 
object to float timestamp - record_new["dynamodb"]["ApproximateCreationDateTime"] = creation_time.timestamp() - records_new.append(record_new) - return records_new diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/event_source_listener.py b/localstack-core/localstack/services/lambda_/event_source_listeners/event_source_listener.py deleted file mode 100644 index e7166092da9cd..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/event_source_listener.py +++ /dev/null @@ -1,63 +0,0 @@ -import logging -from typing import Dict, Optional, Type - -from localstack.services.lambda_.event_source_listeners.adapters import ( - EventSourceAdapter, - EventSourceAsfAdapter, -) -from localstack.services.lambda_.invocation.lambda_service import LambdaService -from localstack.utils.bootstrap import is_api_enabled -from localstack.utils.objects import SubtypesInstanceManager - -LOG = logging.getLogger(__name__) - - -class EventSourceListener(SubtypesInstanceManager): - INSTANCES: Dict[str, "EventSourceListener"] = {} - - @staticmethod - def source_type() -> Optional[str]: - """Type discriminator - to be implemented by subclasses.""" - return None - - def start(self, invoke_adapter: Optional[EventSourceAdapter] = None): - """Start listener in the background (for polling mode) - to be implemented by subclasses.""" - pass - - @staticmethod - def start_listeners_for_asf(event_source_mapping: Dict, lambda_service: LambdaService): - """limited version of start_listeners for the new provider during migration""" - # force import EventSourceListener subclasses - # otherwise they will not be detected by EventSourceListener.get(service_type) - from . import ( - dynamodb_event_source_listener, # noqa: F401 - kinesis_event_source_listener, # noqa: F401 - sqs_event_source_listener, # noqa: F401 - ) - - source_arn = event_source_mapping.get("EventSourceArn") or "" - parts = source_arn.split(":") - service_type = parts[2] if len(parts) > 2 else "" - if not service_type: - self_managed_endpoints = event_source_mapping.get("SelfManagedEventSource", {}).get( - "Endpoints", {} - ) - if self_managed_endpoints.get("KAFKA_BOOTSTRAP_SERVERS"): - service_type = "kafka" - elif not is_api_enabled(service_type): - LOG.info( - "Service %s is not enabled, cannot enable event-source-mapping. 
Please check your 'SERVICES' configuration variable.", - service_type, - ) - return - instance = EventSourceListener.get(service_type, raise_if_missing=False) - if instance: - instance.start(EventSourceAsfAdapter(lambda_service)) - - @classmethod - def impl_name(cls) -> str: - return cls.source_type() - - @classmethod - def get_base_type(cls) -> Type: - return EventSourceListener diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/exceptions.py b/localstack-core/localstack/services/lambda_/event_source_listeners/exceptions.py deleted file mode 100644 index a40273500cb6c..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/exceptions.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Exceptions for lambda event source mapping machinery.""" - - -class FunctionNotFoundError(Exception): - """Indicates that a function that is part of an existing event source listener does not exist.""" diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/kinesis_event_source_listener.py b/localstack-core/localstack/services/lambda_/event_source_listeners/kinesis_event_source_listener.py deleted file mode 100644 index 2e17d555a958e..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/kinesis_event_source_listener.py +++ /dev/null @@ -1,161 +0,0 @@ -import base64 -import datetime -import json -import logging -from copy import deepcopy -from typing import Dict, List, Optional - -from localstack.services.lambda_.event_source_listeners.stream_event_source_listener import ( - StreamEventSourceListener, -) -from localstack.services.lambda_.event_source_listeners.utils import ( - filter_stream_records, - has_data_filter_criteria, -) -from localstack.utils.aws.arns import ( - extract_account_id_from_arn, - extract_region_from_arn, - get_partition, -) -from localstack.utils.common import first_char_to_lower, to_str -from localstack.utils.threads import FuncThread - -LOG = logging.getLogger(__name__) - - -class KinesisEventSourceListener(StreamEventSourceListener): - _FAILURE_PAYLOAD_DETAILS_FIELD_NAME = "KinesisBatchInfo" - _COORDINATOR_THREAD: Optional[FuncThread] = ( - None # Thread for monitoring state of event source mappings - ) - _STREAM_LISTENER_THREADS: Dict[ - str, FuncThread - ] = {} # Threads for listening to stream shards and forwarding data to mapped Lambdas - - @staticmethod - def source_type() -> Optional[str]: - return "kinesis" - - def _get_matching_event_sources(self) -> List[Dict]: - event_sources = self._invoke_adapter.get_event_sources(source_arn=r".*:kinesis:.*") - return [source for source in event_sources if source["State"] == "Enabled"] - - def _get_stream_client(self, function_arn: str, region_name: str): - return self._invoke_adapter.get_client_factory( - function_arn=function_arn, region_name=region_name - ).kinesis.request_metadata(source_arn=function_arn) - - def _get_stream_description(self, stream_client, stream_arn): - stream_name = stream_arn.split("/")[-1] - return stream_client.describe_stream(StreamName=stream_name)["StreamDescription"] - - def _get_shard_iterator(self, stream_client, stream_arn, shard_id, iterator_type): - stream_name = stream_arn.split("/")[-1] - return stream_client.get_shard_iterator( - StreamName=stream_name, ShardId=shard_id, ShardIteratorType=iterator_type - )["ShardIterator"] - - def _filter_records( - self, records: List[Dict], event_filter_criterias: List[Dict] - ) -> List[Dict]: - """ - 
https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html - - Parse data as json if any data filter pattern present. - - Drop record if unable to parse. - - When filtering, the key has to be "data" - """ - if len(records) == 0: - return [] - - if len(event_filter_criterias) == 0: - return records - - if not has_data_filter_criteria(event_filter_criterias): - # Lambda filters (on the other metadata properties only) based on your filter criteria. - return filter_stream_records(records, event_filter_criterias) - - parsed_records = [] - for record in records: - raw_data = record["data"] - try: - # filters expect dict - parsed_data = json.loads(raw_data) - - # remap "data" key for filtering - parsed_record = deepcopy(record) - parsed_record["data"] = parsed_data - - parsed_records.append(parsed_record) - except json.JSONDecodeError: - LOG.warning( - "Unable to convert record '%s' to json... Record will be dropped.", - raw_data, - exc_info=LOG.isEnabledFor(logging.DEBUG), - ) - - filtered_records = filter_stream_records(parsed_records, event_filter_criterias) - - # convert data back to bytes and remap key (why remap???) - for filtered_record in filtered_records: - parsed_data = filtered_record.pop("data") - encoded_data = json.dumps(parsed_data).encode() - filtered_record["data"] = encoded_data - - return filtered_records - - def _create_lambda_event_payload( - self, stream_arn: str, record_payloads: list[dict], shard_id: Optional[str] = None - ) -> dict: - records = [] - account_id = extract_account_id_from_arn(stream_arn) - region = extract_region_from_arn(stream_arn) - partition = get_partition(region) - for record_payload in record_payloads: - records.append( - { - "eventID": "{0}:{1}".format(shard_id, record_payload["sequenceNumber"]), - "eventSourceARN": stream_arn, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": f"arn:{partition}:iam::{account_id}:role/lambda-role", - "awsRegion": region, - "kinesis": { - **record_payload, - # boto3 automatically decodes records in get_records(), so we must re-encode - "data": to_str(base64.b64encode(record_payload["data"])), - "kinesisSchemaVersion": "1.0", - }, - } - ) - return {"Records": records} - - def _get_starting_and_ending_sequence_numbers(self, first_record, last_record): - return first_record["sequenceNumber"], last_record["sequenceNumber"] - - def _get_first_and_last_arrival_time(self, first_record, last_record): - return ( - datetime.datetime.fromtimestamp(first_record["approximateArrivalTimestamp"]).isoformat() - + "Z", - datetime.datetime.fromtimestamp(last_record["approximateArrivalTimestamp"]).isoformat() - + "Z", - ) - - def _transform_records(self, raw_records: list[dict]) -> list[dict]: - """some, e.g. kinesis have to transform the incoming records (e.g. lowercasing of keys)""" - record_payloads = [] - for record in raw_records: - record_payload = {} - for key, val in record.items(): - record_payload[first_char_to_lower(key)] = val - # convert datetime obj to timestamp - # AWS requires millisecond precision, but the timestamp has to be in seconds with the milliseconds - # represented by the fraction part of the float - record_payload["approximateArrivalTimestamp"] = record_payload[ - "approximateArrivalTimestamp" - ].timestamp() - # this record should not be present in the payload. 
Cannot be deserialized by dotnet lambdas, for example - # FIXME remove once it is clear if kinesis should not return this value in the first place - record_payload.pop("encryptionType", None) - record_payloads.append(record_payload) - return record_payloads diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/lambda_legacy.py b/localstack-core/localstack/services/lambda_/event_source_listeners/lambda_legacy.py deleted file mode 100644 index dc0f10d1c2ce0..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/lambda_legacy.py +++ /dev/null @@ -1,11 +0,0 @@ -# TODO: remove this legacy construct when re-working event source mapping. -class LegacyInvocationResult: - """Data structure for representing the result of a Lambda invocation in the old Lambda provider. - Could not be removed upon 3.0 because it was still used in the `sqs_event_source_listener.py` and `adapters.py`. - """ - - def __init__(self, result, log_output=""): - if isinstance(result, LegacyInvocationResult): - raise Exception("Unexpected invocation result type: %s" % result) - self.result = result - self.log_output = log_output or "" diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/sqs_event_source_listener.py b/localstack-core/localstack/services/lambda_/event_source_listeners/sqs_event_source_listener.py deleted file mode 100644 index 8acc78e9e1a7a..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/sqs_event_source_listener.py +++ /dev/null @@ -1,318 +0,0 @@ -import json -import logging -import time -from typing import Dict, List, Optional - -from localstack import config -from localstack.aws.api.lambda_ import InvocationType -from localstack.services.lambda_.event_source_listeners.adapters import ( - EventSourceAdapter, -) -from localstack.services.lambda_.event_source_listeners.event_source_listener import ( - EventSourceListener, -) -from localstack.services.lambda_.event_source_listeners.lambda_legacy import LegacyInvocationResult -from localstack.services.lambda_.event_source_listeners.utils import ( - filter_stream_records, - message_attributes_to_lower, -) -from localstack.utils.aws import arns -from localstack.utils.aws.arns import extract_region_from_arn -from localstack.utils.threads import FuncThread - -LOG = logging.getLogger(__name__) - - -class SQSEventSourceListener(EventSourceListener): - # SQS listener thread settings - SQS_LISTENER_THREAD: Dict = {} - SQS_POLL_INTERVAL_SEC: float = config.LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL_SEC - - _invoke_adapter: EventSourceAdapter - - @staticmethod - def source_type(): - return "sqs" - - def start(self, invoke_adapter: Optional[EventSourceAdapter] = None): - self._invoke_adapter = invoke_adapter - if self._invoke_adapter is None: - LOG.error("Invoke adapter needs to be set for new Lambda provider. Aborting.") - raise Exception("Invoke adapter not set ") - - if self.SQS_LISTENER_THREAD: - return - - LOG.debug("Starting SQS message polling thread for Lambda API") - self.SQS_LISTENER_THREAD["_thread_"] = thread = FuncThread( - self._listener_loop, name="sqs-event-source-listener" - ) - thread.start() - - def get_matching_event_sources(self) -> List[Dict]: - return self._invoke_adapter.get_event_sources(source_arn=r".*:sqs:.*") - - def _listener_loop(self, *args): - while True: - try: - sources = self.get_matching_event_sources() - if not sources: - # Temporarily disable polling if no event sources are configured - # anymore. 
The loop will get restarted next time a message - # arrives and if an event source is configured. - self.SQS_LISTENER_THREAD.pop("_thread_") - return - - for source in sources: - queue_arn = source["EventSourceArn"] - region_name = extract_region_from_arn(queue_arn) - - sqs_client = self._get_client( - function_arn=source["FunctionArn"], region_name=region_name - ) - batch_size = max(min(source.get("BatchSize", 1), 10), 1) - - try: - queue_url = arns.sqs_queue_url_for_arn(queue_arn) - result = sqs_client.receive_message( - QueueUrl=queue_url, - AttributeNames=["All"], - MessageAttributeNames=["All"], - MaxNumberOfMessages=batch_size, - ) - messages = result.get("Messages") - if not messages: - continue - - self._process_messages_for_event_source(source, messages) - - except Exception as e: - if "NonExistentQueue" not in str(e): - # TODO: remove event source if queue does no longer exist? - LOG.debug( - "Unable to poll SQS messages for queue %s: %s", - queue_arn, - e, - exc_info=True, - ) - - except Exception as e: - LOG.debug(e) - finally: - time.sleep(self.SQS_POLL_INTERVAL_SEC) - - def _process_messages_for_event_source(self, source, messages) -> None: - lambda_arn = source["FunctionArn"] - queue_arn = source["EventSourceArn"] - # https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - report_partial_failures = "ReportBatchItemFailures" in source.get( - "FunctionResponseTypes", [] - ) - region_name = extract_region_from_arn(queue_arn) - queue_url = arns.sqs_queue_url_for_arn(queue_arn) - LOG.debug("Sending event from event source %s to Lambda %s", queue_arn, lambda_arn) - self._send_event_to_lambda( - queue_arn, - queue_url, - lambda_arn, - messages, - region=region_name, - report_partial_failures=report_partial_failures, - ) - - def _get_client(self, function_arn: str, region_name: str): - return self._invoke_adapter.get_client_factory( - function_arn=function_arn, region_name=region_name - ).sqs.request_metadata(source_arn=function_arn) - - def _get_lambda_event_filters_for_arn(self, function_arn: str, queue_arn: str): - result = [] - sources = self._invoke_adapter.get_event_sources(queue_arn) - filtered_sources = [s for s in sources if s["FunctionArn"] == function_arn] - - for fs in filtered_sources: - fc = fs.get("FilterCriteria") - if fc: - result.append(fc) - return result - - def _send_event_to_lambda( - self, queue_arn, queue_url, lambda_arn, messages, region, report_partial_failures=False - ) -> None: - records = [] - - def delete_messages(result: LegacyInvocationResult, func_arn, event, error=None, **kwargs): - if error: - # Skip deleting messages from the queue in case of processing errors. We'll pick them up and retry - # next time they become visible in the queue. Redrive policies will be handled automatically by SQS - # on the next polling attempt. - # Even if ReportBatchItemFailures is set, the entire batch fails if an error is raised. - return - - region_name = extract_region_from_arn(queue_arn) - sqs_client = self._get_client(function_arn=lambda_arn, region_name=region_name) - - if report_partial_failures: - valid_message_ids = [r["messageId"] for r in records] - # collect messages to delete (= the ones that were processed successfully) - try: - if messages_to_keep := parse_batch_item_failures( - result, valid_message_ids=valid_message_ids - ): - # unless there is an exception or the parse result is empty, only delete those messages that - # are not part of the partial failure report. 
- messages_to_delete = [ - message_id - for message_id in valid_message_ids - if message_id not in messages_to_keep - ] - else: - # otherwise delete all messages - messages_to_delete = valid_message_ids - - LOG.debug( - "Lambda partial SQS batch failure report: ok=%s, failed=%s", - messages_to_delete, - messages_to_keep, - ) - except Exception as e: - LOG.error( - "Error while parsing batchItemFailures from lambda response %s: %s. " - "Treating the batch as complete failure.", - result.result, - e, - ) - return - - entries = [ - {"Id": r["messageId"], "ReceiptHandle": r["receiptHandle"]} - for r in records - if r["messageId"] in messages_to_delete - ] - - else: - entries = [ - {"Id": r["messageId"], "ReceiptHandle": r["receiptHandle"]} for r in records - ] - - try: - sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries) - except Exception as e: - LOG.info( - "Unable to delete Lambda events from SQS queue " - "(please check SQS visibility timeout settings): %s - %s", - entries, - e, - ) - - for msg in messages: - message_attrs = message_attributes_to_lower(msg.get("MessageAttributes")) - record = { - "body": msg.get("Body", "MessageBody"), - "receiptHandle": msg.get("ReceiptHandle"), - "md5OfBody": msg.get("MD5OfBody") or msg.get("MD5OfMessageBody"), - "eventSourceARN": queue_arn, - "eventSource": "aws:sqs", - "awsRegion": region, - "messageId": msg["MessageId"], - "attributes": msg.get("Attributes", {}), - "messageAttributes": message_attrs, - } - - if md5OfMessageAttributes := msg.get("MD5OfMessageAttributes"): - record["md5OfMessageAttributes"] = md5OfMessageAttributes - - records.append(record) - - event_filter_criterias = self._get_lambda_event_filters_for_arn(lambda_arn, queue_arn) - if len(event_filter_criterias) > 0: - # convert to json for filtering - for record in records: - try: - record["body"] = json.loads(record["body"]) - except json.JSONDecodeError: - LOG.warning( - "Unable to convert record '%s' to json... Record might be dropped.", - record["body"], - ) - records = filter_stream_records(records, event_filter_criterias) - # convert them back - for record in records: - record["body"] = ( - json.dumps(record["body"]) - if not isinstance(record["body"], str) - else record["body"] - ) - - # all messages were filtered out - if not len(records) > 0: - return - - event = {"Records": records} - - self._invoke_adapter.invoke( - function_arn=lambda_arn, - context={}, - payload=event, - invocation_type=InvocationType.RequestResponse, - callback=delete_messages, - ) - - -def parse_batch_item_failures( - result: LegacyInvocationResult, valid_message_ids: List[str] -) -> List[str]: - """ - Parses a lambda responses as a partial batch failure response, that looks something like this:: - - { - "batchItemFailures": [ - { - "itemIdentifier": "id2" - }, - { - "itemIdentifier": "id4" - } - ] - } - - If the response returns an empty list, then the batch should be considered as a complete success. If an exception - is raised, the batch should be considered a complete failure. 
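That report is produced by the Lambda handler itself; a hedged illustration of the other side of the contract (the handler body and process() are hypothetical, only the batchItemFailures shape comes from the docstring above):

    # Illustrative handler reporting a partial batch failure; parse_batch_item_failures
    # would return the listed itemIdentifier values as messages to keep in the queue.
    def handler(event, context):
        failed = []
        for record in event["Records"]:
            try:
                process(record)  # hypothetical per-record processing
            except Exception:
                failed.append({"itemIdentifier": record["messageId"]})
        return {"batchItemFailures": failed}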
- - See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - - :param result: the lambda invocation result - :param valid_message_ids: the list of valid message ids in the batch - :raises KeyError: if the itemIdentifier value is missing or not in the batch - :raises Exception: any other exception related to parsing (e.g., JSON parser error) - :return: a list of message IDs that are part of a failure and should not be deleted from the queue - """ - if not result or not result.result: - return [] - - if isinstance(result.result, dict): - partial_batch_failure = result.result - else: - partial_batch_failure = json.loads(result.result) - - if not partial_batch_failure: - return [] - - batch_item_failures = partial_batch_failure.get("batchItemFailures") - - if not batch_item_failures: - return [] - - messages_to_keep = [] - for item in batch_item_failures: - if "itemIdentifier" not in item: - raise KeyError(f"missing itemIdentifier in batchItemFailure record {item}") - - item_identifier = item["itemIdentifier"] - - if item_identifier not in valid_message_ids: - raise KeyError(f"itemIdentifier '{item_identifier}' not in the batch") - - messages_to_keep.append(item_identifier) - - return messages_to_keep diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py b/localstack-core/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py deleted file mode 100644 index abd18f1cd2fec..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/stream_event_source_listener.py +++ /dev/null @@ -1,431 +0,0 @@ -import logging -import math -import time -from typing import Dict, List, Optional, Tuple - -from botocore.exceptions import ClientError - -from localstack.aws.api.lambda_ import InvocationType -from localstack.services.lambda_.event_source_listeners.adapters import ( - EventSourceAdapter, -) -from localstack.services.lambda_.event_source_listeners.event_source_listener import ( - EventSourceListener, -) -from localstack.utils.aws.arns import extract_region_from_arn -from localstack.utils.aws.message_forwarding import send_event_to_target -from localstack.utils.common import long_uid, timestamp_millis -from localstack.utils.threads import FuncThread - -LOG = logging.getLogger(__name__) - -monitor_counter = 0 -counter = 0 - - -class StreamEventSourceListener(EventSourceListener): - """ - Abstract class for listening to streams associated with event source mappings, batching data from those streams, - and invoking the appropriate Lambda functions with those data batches. - Because DynamoDB Streams and Kinesis Streams have similar but different APIs, this abstract class is useful for - reducing repeated code. The various methods that must be implemented by inheriting subclasses essentially wrap - client API methods or middleware-style operations on data payloads to compensate for the minor differences between - these two services. 
- """ - - _COORDINATOR_THREAD: Optional[FuncThread] = ( - None # Thread for monitoring state of event source mappings - ) - _STREAM_LISTENER_THREADS: Dict[ - str, FuncThread - ] = {} # Threads for listening to stream shards and forwarding data to mapped Lambdas - _POLL_INTERVAL_SEC: float = 1 - _FAILURE_PAYLOAD_DETAILS_FIELD_NAME = "" # To be defined by inheriting classes - _invoke_adapter: EventSourceAdapter - - @staticmethod - def source_type() -> Optional[str]: - """ - to be implemented by subclasses - :returns: The type of event source this listener is associated with - """ - # to be implemented by inheriting classes - return None - - def _get_matching_event_sources(self) -> List[Dict]: - """ - to be implemented by subclasses - :returns: A list of active Event Source Mapping objects (as dicts) that match the listener type - """ - raise NotImplementedError - - def _get_stream_client(self, function_arn: str, region_name: str): - """ - to be implemented by subclasses - :returns: An AWS service client instance for communicating with the appropriate API - """ - raise NotImplementedError - - def _get_stream_description(self, stream_client, stream_arn): - """ - to be implemented by subclasses - :returns: The stream description object returned by the client's describe_stream method - """ - raise NotImplementedError - - def _get_shard_iterator(self, stream_client, stream_arn, shard_id, iterator_type): - """ - to be implemented by subclasses - :returns: The shard iterator object returned by the client's get_shard_iterator method - """ - raise NotImplementedError - - def _create_lambda_event_payload( - self, stream_arn: str, records: List[Dict], shard_id: Optional[str] = None - ) -> Dict: - """ - to be implemented by subclasses - Get an event payload for invoking a Lambda function using the given records and stream metadata - :param stream_arn: ARN of the event source stream - :param records: Batch of records to include in the payload, obtained from the source stream - :param shard_id: ID of the shard the records came from. This is only needed for Kinesis event payloads. - :returns: An event payload suitable for invoking a Lambda function - """ - raise NotImplementedError - - def _get_starting_and_ending_sequence_numbers( - self, first_record: Dict, last_record: Dict - ) -> Tuple[str, str]: - """ - to be implemented by subclasses - :returns: the SequenceNumber field values from the given records - """ - raise NotImplementedError - - def _get_first_and_last_arrival_time( - self, first_record: Dict, last_record: Dict - ) -> Tuple[str, str]: - """ - to be implemented by subclasses - :returns: the timestamps the given records were created/entered the source stream in iso8601 format - """ - raise NotImplementedError - - def _filter_records( - self, records: List[Dict], event_filter_criterias: List[Dict] - ) -> List[Dict]: - """ - to be implemented by subclasses - :returns: records after being filtered by event fitlter criterias - """ - raise NotImplementedError - - def start(self, invoke_adapter: Optional[EventSourceAdapter] = None): - """ - Spawn coordinator thread for listening to relevant new/removed event source mappings - """ - global counter - if self._COORDINATOR_THREAD is not None: - return - - LOG.debug("Starting %s event source listener coordinator thread", self.source_type()) - self._invoke_adapter = invoke_adapter - if self._invoke_adapter is None: - LOG.error("Invoke adapter needs to be set for new Lambda provider. 
Aborting.") - raise Exception("Invoke adapter not set ") - counter += 1 - self._COORDINATOR_THREAD = FuncThread( - self._monitor_stream_event_sources, name=f"stream-listener-{counter}" - ) - self._COORDINATOR_THREAD.start() - - # TODO: remove lock_discriminator and parallelization_factor old lambda provider is gone - def _invoke_lambda( - self, function_arn, payload, lock_discriminator, parallelization_factor - ) -> Tuple[bool, int]: - """ - invoke a given lambda function - :returns: True if the invocation was successful (False otherwise) and the status code of the invocation result - - # TODO: rework this to properly invoke a lambda through the API. Needs additional restructuring upstream of this function as well. - """ - - status_code = self._invoke_adapter.invoke_with_statuscode( - function_arn=function_arn, - payload=payload, - invocation_type=InvocationType.RequestResponse, - context={}, - lock_discriminator=lock_discriminator, - parallelization_factor=parallelization_factor, - ) - - if status_code >= 400: - return False, status_code - return True, status_code - - def _get_lambda_event_filters_for_arn(self, function_arn: str, queue_arn: str): - result = [] - sources = self._invoke_adapter.get_event_sources(queue_arn) - filtered_sources = [s for s in sources if s["FunctionArn"] == function_arn] - - for fs in filtered_sources: - fc = fs.get("FilterCriteria") - if fc: - result.append(fc) - return result - - def _listen_to_shard_and_invoke_lambda(self, params: Dict): - """ - Continuously listens to a stream's shard. Divides records read from the shard into batches and use them to - invoke a Lambda. - This function is intended to be invoked as a FuncThread. Because FuncThreads can only take a single argument, - we pack the numerous arguments needed to invoke this method into a single dictionary. - :param params: Dictionary containing the following elements needed to execute this method: - * function_arn: ARN of the Lambda function to invoke - * stream_arn: ARN of the stream associated with the shard to listen on - * batch_size: number of records to pass to the Lambda function per invocation - * parallelization_factor: parallelization factor for executing lambda funcs asynchronously - * lock_discriminator: discriminator for checking semaphore on lambda function execution. Also used for - checking if this listener loops should continue to run. 
- * shard_id: ID of the shard to listen on - * stream_client: AWS service client for communicating with the stream API - * shard_iterator: shard iterator object for iterating over records in stream - * max_num_retries: maximum number of times to attempt invoking a batch against the Lambda before giving up - and moving on - * failure_destination: Optional destination config for sending record metadata to if Lambda invocation fails - more than max_num_retries - """ - # TODO: These values will never get updated if the event source mapping configuration changes :( - try: - function_arn = params["function_arn"] - stream_arn = params["stream_arn"] - batch_size = params["batch_size"] - parallelization_factor = params["parallelization_factor"] - lock_discriminator = params["lock_discriminator"] - shard_id = params["shard_id"] - stream_client = params["stream_client"] - shard_iterator = params["shard_iterator"] - failure_destination = params["failure_destination"] - max_num_retries = params["max_num_retries"] - num_invocation_failures = 0 - - while lock_discriminator in self._STREAM_LISTENER_THREADS: - try: - records_response = stream_client.get_records( - ShardIterator=shard_iterator, Limit=batch_size - ) - except ClientError as e: - if "AccessDeniedException" in str(e): - LOG.warning( - "Insufficient permissions to get records from stream %s: %s", - stream_arn, - e, - ) - else: - raise - else: - raw_records = records_response.get("Records") - event_filter_criterias = self._get_lambda_event_filters_for_arn( - function_arn, stream_arn - ) - - # apply transformations on the raw event that the stream produced - records = self._transform_records(raw_records) - - # filter the retrieved & transformed records according to - # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html - filtered_records = self._filter_records(records, event_filter_criterias) - - should_get_next_batch = True - if filtered_records: - payload = self._create_lambda_event_payload( - stream_arn, records, shard_id=shard_id - ) - is_invocation_successful, status_code = self._invoke_lambda( - function_arn, payload, lock_discriminator, parallelization_factor - ) - if is_invocation_successful: - should_get_next_batch = True - else: - num_invocation_failures += 1 - if num_invocation_failures >= max_num_retries: - should_get_next_batch = True - if failure_destination: - first_rec = records[0] - last_rec = records[-1] - ( - first_seq_num, - last_seq_num, - ) = self._get_starting_and_ending_sequence_numbers( - first_rec, last_rec - ) - ( - first_arrival_time, - last_arrival_time, - ) = self._get_first_and_last_arrival_time(first_rec, last_rec) - self._send_to_failure_destination( - shard_id, - first_seq_num, - last_seq_num, - stream_arn, - function_arn, - num_invocation_failures, - status_code, - batch_size, - first_arrival_time, - last_arrival_time, - failure_destination, - ) - else: - should_get_next_batch = False - if should_get_next_batch: - shard_iterator = records_response["NextShardIterator"] - num_invocation_failures = 0 - time.sleep(self._POLL_INTERVAL_SEC) - except Exception as e: - LOG.error( - "Error while listening to shard / executing lambda with params %s: %s", - params, - e, - exc_info=LOG.isEnabledFor(logging.DEBUG), - ) - raise - - def _send_to_failure_destination( - self, - shard_id, - start_sequence_num, - end_sequence_num, - source_arn, - func_arn, - invoke_count, - status_code, - batch_size, - first_record_arrival_time, - last_record_arrival_time, - destination, - ): - """ - Creates a metadata 
payload relating to a failed Lambda invocation and delivers it to the given destination - """ - payload = { - "version": "1.0", - "timestamp": timestamp_millis(), - "requestContext": { - "requestId": long_uid(), - "functionArn": func_arn, - "condition": "RetryAttemptsExhausted", - "approximateInvokeCount": invoke_count, - }, - "responseContext": { - "statusCode": status_code, - "executedVersion": "$LATEST", # TODO: don't hardcode these fields - "functionError": "Unhandled", - }, - } - details = { - "shardId": shard_id, - "startSequenceNumber": start_sequence_num, - "endSequenceNumber": end_sequence_num, - "approximateArrivalOfFirstRecord": first_record_arrival_time, - "approximateArrivalOfLastRecord": last_record_arrival_time, - "batchSize": batch_size, - "streamArn": source_arn, - } - payload[self._FAILURE_PAYLOAD_DETAILS_FIELD_NAME] = details - send_event_to_target(target_arn=destination, event=payload, source_arn=source_arn) - - def _monitor_stream_event_sources(self, *args): - """ - Continuously monitors event source mappings. When a new event source for the relevant stream type is created, - spawns listener threads for each shard in the stream. When an event source is deleted, stops the associated - child threads. - """ - global monitor_counter - while True: - try: - # current set of streams + shard IDs that should be feeding Lambda functions based on event sources - mapped_shard_ids = set() - sources = self._get_matching_event_sources() - if not sources: - # Temporarily disable polling if no event sources are configured - # anymore. The loop will get restarted next time a record - # arrives and if an event source is configured. - self._COORDINATOR_THREAD = None - self._STREAM_LISTENER_THREADS = {} - return - - # make sure each event source stream has a lambda listening on each of its shards - for source in sources: - mapping_uuid = source["UUID"] - stream_arn = source["EventSourceArn"] - region_name = extract_region_from_arn(stream_arn) - stream_client = self._get_stream_client(source["FunctionArn"], region_name) - batch_size = source.get("BatchSize", 10) - failure_destination = ( - source.get("DestinationConfig", {}) - .get("OnFailure", {}) - .get("Destination", None) - ) - max_num_retries = source.get("MaximumRetryAttempts", -1) - if max_num_retries < 0: - max_num_retries = math.inf - try: - stream_description = self._get_stream_description(stream_client, stream_arn) - except Exception as e: - LOG.error( - "Cannot describe target stream %s of event source mapping %s: %s", - stream_arn, - mapping_uuid, - e, - ) - continue - if stream_description["StreamStatus"] not in {"ENABLED", "ACTIVE"}: - continue - shard_ids = [shard["ShardId"] for shard in stream_description["Shards"]] - - for shard_id in shard_ids: - lock_discriminator = f"{mapping_uuid}/{stream_arn}/{shard_id}" - mapped_shard_ids.add(lock_discriminator) - if lock_discriminator not in self._STREAM_LISTENER_THREADS: - shard_iterator = self._get_shard_iterator( - stream_client, - stream_arn, - shard_id, - source["StartingPosition"], - ) - monitor_counter += 1 - - listener_thread = FuncThread( - self._listen_to_shard_and_invoke_lambda, - { - "function_arn": source["FunctionArn"], - "stream_arn": stream_arn, - "batch_size": batch_size, - "parallelization_factor": source.get( - "ParallelizationFactor", 1 - ), - "lock_discriminator": lock_discriminator, - "shard_id": shard_id, - "stream_client": stream_client, - "shard_iterator": shard_iterator, - "failure_destination": failure_destination, - "max_num_retries": max_num_retries, - 
}, - name=f"monitor-stream-thread-{monitor_counter}", - ) - self._STREAM_LISTENER_THREADS[lock_discriminator] = listener_thread - listener_thread.start() - - # stop any threads that are listening to a previously defined event source that no longer exists - orphaned_threads = set(self._STREAM_LISTENER_THREADS.keys()) - mapped_shard_ids - for thread_id in orphaned_threads: - self._STREAM_LISTENER_THREADS.pop(thread_id) - - except Exception as e: - LOG.exception(e) - time.sleep(self._POLL_INTERVAL_SEC) - - def _transform_records(self, raw_records: list[dict]) -> list[dict]: - """some, e.g. kinesis have to transform the incoming records (e.g. lower-casing of keys)""" - return raw_records diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py b/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py deleted file mode 100644 index 587245598947f..0000000000000 --- a/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py +++ /dev/null @@ -1,236 +0,0 @@ -import json -import logging -import re - -from localstack import config -from localstack.aws.api.lambda_ import FilterCriteria -from localstack.services.events.event_ruler import matches_rule -from localstack.utils.strings import first_char_to_lower - -LOG = logging.getLogger(__name__) - - -class InvalidEventPatternException(Exception): - reason: str - - def __init__(self, reason=None, message=None) -> None: - self.reason = reason - self.message = message or f"Event pattern is not valid. Reason: {reason}" - - -def filter_stream_records(records, filters: list[FilterCriteria]): - filtered_records = [] - for record in records: - for filter in filters: - for rule in filter["Filters"]: - if config.EVENT_RULE_ENGINE == "java": - event_str = json.dumps(record) - event_pattern_str = rule["Pattern"] - match_result = matches_rule(event_str, event_pattern_str) - else: - filter_pattern: dict[str, any] = json.loads(rule["Pattern"]) - match_result = does_match_event(filter_pattern, record) - if match_result: - filtered_records.append(record) - break - return filtered_records - - -def does_match_event(event_pattern: dict[str, any], event: dict[str, any]) -> bool: - """Decides whether an event pattern matches an event or not. - Returns True if the `event_pattern` matches the given `event` and False otherwise. 
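A small illustrative pattern/event pair under these semantics (values are made up; behavior follows the implementation below, where leaf lists match by membership and nested dicts recurse):

    pattern = {"eventName": ["INSERT"], "dynamodb": {"NewImage": {"status": {"S": ["ok"]}}}}
    event = {"eventName": "INSERT", "dynamodb": {"NewImage": {"status": {"S": "ok"}}}}
    assert does_match_event(pattern, event)  # True: every pattern key matches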
- - Implements "Amazon EventBridge event patterns": - https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html - Used in different places: - * EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html - * Lambda ESM: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html - * EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html - * SNS: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html - - Open source AWS rule engine: https://github.com/aws/event-ruler - """ - # TODO: test this conditional: https://coveralls.io/builds/66584026/source?filename=localstack%2Fservices%2Flambda_%2Fevent_source_listeners%2Futils.py#L25 - if not event_pattern: - return True - does_match_results = [] - for key, value in event_pattern.items(): - # check if rule exists in event - event_value = event.get(key) if isinstance(event, dict) else None - does_pattern_match = False - if event_value is not None: - # check if filter rule value is a list (leaf of rule tree) or a dict (recursively call function) - if isinstance(value, list): - if len(value) > 0: - if isinstance(value[0], (str, int)): - does_pattern_match = event_value in value - if isinstance(value[0], dict): - does_pattern_match = verify_dict_filter(event_value, value[0]) - else: - LOG.warning("Empty lambda filter: %s", key) - elif isinstance(value, dict): - does_pattern_match = does_match_event(value, event_value) - else: - # special case 'exists' - def _filter_rule_value_list(val): - if isinstance(val[0], dict): - return not val[0].get("exists", True) - elif val[0] is None: - # support null filter - return True - - def _filter_rule_value_dict(val): - for k, v in val.items(): - return ( - _filter_rule_value_list(val[k]) - if isinstance(val[k], list) - else _filter_rule_value_dict(val[k]) - ) - return True - - if isinstance(value, list) and len(value) > 0: - does_pattern_match = _filter_rule_value_list(value) - elif isinstance(value, dict): - # special case 'exists' for S type, e.g. {"S": [{"exists": false}]} - # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial2.html - does_pattern_match = _filter_rule_value_dict(value) - - does_match_results.append(does_pattern_match) - return all(does_match_results) - - -def verify_dict_filter(record_value: any, dict_filter: dict[str, any]) -> bool: - # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax - does_match_filter = False - for key, filter_value in dict_filter.items(): - if key == "anything-but": - does_match_filter = record_value not in filter_value - elif key == "numeric": - does_match_filter = handle_numeric_conditions(record_value, filter_value) - elif key == "exists": - does_match_filter = bool( - filter_value - ) # exists means that the key exists in the event record - elif key == "prefix": - if not isinstance(record_value, str): - LOG.warning("Record Value %s does not seem to be a valid string.", record_value) - does_match_filter = isinstance(record_value, str) and record_value.startswith( - str(filter_value) - ) - if does_match_filter: - return True - - return does_match_filter - - -def handle_numeric_conditions( - first_operand: int | float, conditions: list[str | int | float] -) -> bool: - """Implements numeric matching for a given list of conditions. - Example: { "numeric": [ ">", 0, "<=", 5 ] } - - Numeric matching works with values that are JSON numbers. 
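For example, with the docstring's range (illustrative inputs, evaluated by the comparison loop below):

    handle_numeric_conditions(3, [">", 0, "<=", 5])  # True:  3 > 0 and 3 <= 5
    handle_numeric_conditions(7, [">", 0, "<=", 5])  # False: 7 <= 5 fails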
- It is limited to values between -5.0e9 and +5.0e9 inclusive, with 15 digits of precision, - or six digits to the right of the decimal point. - https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matchinghttps://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matching - """ - # Invalid example for uneven list: { "numeric": [ ">", 0, "<" ] } - if len(conditions) % 2 > 0: - raise InvalidEventPatternException("Bad numeric range operator") - - if not isinstance(first_operand, (int, float)): - raise InvalidEventPatternException( - f"The value {first_operand} for the numeric comparison {conditions} is not a valid number" - ) - - for i in range(0, len(conditions), 2): - operator = conditions[i] - second_operand_str = conditions[i + 1] - try: - second_operand = float(second_operand_str) - except ValueError: - raise InvalidEventPatternException( - f"Could not convert filter value {second_operand_str} to a valid number" - ) from ValueError - - if operator == ">" and not (first_operand > second_operand): - return False - if operator == ">=" and not (first_operand >= second_operand): - return False - if operator == "=" and not (first_operand == second_operand): - return False - if operator == "<" and not (first_operand < second_operand): - return False - if operator == "<=" and not (first_operand <= second_operand): - return False - return True - - -def contains_list(filter: dict) -> bool: - if isinstance(filter, dict): - for key, value in filter.items(): - if isinstance(value, list) and len(value) > 0: - return True - return contains_list(value) - return False - - -def validate_filters(filter: FilterCriteria) -> bool: - # filter needs to be json serializeable - for rule in filter["Filters"]: - try: - if not (filter_pattern := json.loads(rule["Pattern"])): - return False - return contains_list(filter_pattern) - except json.JSONDecodeError: - return False - # needs to contain on what to filter (some list with citerias) - # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax - - return True - - -def message_attributes_to_lower(message_attrs): - """Convert message attribute details (first characters) to lower case (e.g., stringValue, dataType).""" - message_attrs = message_attrs or {} - for _, attr in message_attrs.items(): - if not isinstance(attr, dict): - continue - for key, value in dict(attr).items(): - attr[first_char_to_lower(key)] = attr.pop(key) - return message_attrs - - -def event_source_arn_matches(mapped: str, searched: str) -> bool: - if not mapped: - return False - if not searched or mapped == searched: - return True - # Some types of ARNs can end with a path separated by slashes, for - # example the ARN of a DynamoDB stream is tableARN/stream/ID. It's - # a little counterintuitive that a more specific mapped ARN can - # match a less specific ARN on the event, but some integration tests - # rely on it for things like subscribing to a stream and matching an - # event labeled with the table ARN. 
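A concrete instance of that counterintuitive case (ARN values are illustrative only):

    table_arn = "arn:aws:dynamodb:us-east-1:000000000000:table/my-table"
    stream_arn = f"{table_arn}/stream/2024-01-01T00:00:00.000"
    event_source_arn_matches(mapped=stream_arn, searched=table_arn)  # True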
- if re.match(r"^%s$" % searched, mapped): - return True - if mapped.startswith(searched): - suffix = mapped[len(searched) :] - return suffix[0] == "/" - return False - - -def has_data_filter_criteria(filters: list[FilterCriteria]) -> bool: - for filter in filters: - for rule in filter.get("Filters", []): - parsed_pattern = json.loads(rule["Pattern"]) - if "data" in parsed_pattern: - return True - return False - - -def has_data_filter_criteria_parsed(parsed_filters: list[dict]) -> bool: - for filter in parsed_filters: - if "data" in filter: - return True - return False diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py index 128cbcf98b5ac..fb66cd8f64307 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/kinesis_poller.py @@ -10,9 +10,6 @@ from localstack.aws.api.pipes import ( KinesisStreamStartPosition, ) -from localstack.services.lambda_.event_source_listeners.utils import ( - has_data_filter_criteria_parsed, -) from localstack.services.lambda_.event_source_mapping.event_processor import ( EventProcessor, ) @@ -200,3 +197,10 @@ def parse_data(self, raw_data: str) -> dict | str: def encode_data(self, parsed_data: dict) -> str: return base64.b64encode(json.dumps(parsed_data).encode()).decode() + + +def has_data_filter_criteria_parsed(parsed_filters: list[dict]) -> bool: + for filter in parsed_filters: + if "data" in filter: + return True + return False diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py index 7b3c87bdcc00c..65953f13bd263 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py @@ -8,7 +8,6 @@ from localstack.aws.api.pipes import PipeSourceSqsQueueParameters from localstack.aws.api.sqs import MessageSystemAttributeName from localstack.config import internal_service_url -from localstack.services.lambda_.event_source_listeners.utils import message_attributes_to_lower from localstack.services.lambda_.event_source_mapping.event_processor import ( EventProcessor, PartialBatchFailureError, @@ -18,6 +17,7 @@ parse_batch_item_failures, ) from localstack.utils.aws.arns import parse_arn +from localstack.utils.strings import first_char_to_lower LOG = logging.getLogger(__name__) @@ -216,3 +216,14 @@ def get_queue_url(queue_arn: str) -> str: account_id = parsed_arn["account"] name = parsed_arn["resource"] return f"{host}/{account_id}/{name}" + + +def message_attributes_to_lower(message_attrs): + """Convert message attribute details (first characters) to lower case (e.g., stringValue, dataType).""" + message_attrs = message_attrs or {} + for _, attr in message_attrs.items(): + if not isinstance(attr, dict): + continue + for key, value in dict(attr).items(): + attr[first_char_to_lower(key)] = attr.pop(key) + return message_attrs diff --git a/localstack-core/localstack/services/lambda_/provider.py b/localstack-core/localstack/services/lambda_/provider.py index 4733c8e39b2ff..c784f296cbc81 100644 --- a/localstack-core/localstack/services/lambda_/provider.py +++ b/localstack-core/localstack/services/lambda_/provider.py @@ -150,10 +150,6 @@ 
STATEMENT_ID_REGEX, function_locators_from_arn, ) -from localstack.services.lambda_.event_source_listeners.event_source_listener import ( - EventSourceListener, -) -from localstack.services.lambda_.event_source_listeners.utils import validate_filters from localstack.services.lambda_.event_source_mapping.esm_config_factory import ( EsmConfigFactory, ) @@ -231,7 +227,7 @@ from localstack.utils.bootstrap import is_api_enabled from localstack.utils.collections import PaginatedList from localstack.utils.files import load_file -from localstack.utils.strings import get_random_hex, long_uid, short_uid, to_bytes, to_str +from localstack.utils.strings import get_random_hex, short_uid, to_bytes, to_str from localstack.utils.sync import poll_condition from localstack.utils.urls import localstack_host @@ -338,41 +334,37 @@ def on_after_state_load(self): ) for esm in state.event_source_mappings.values(): - if config.LAMBDA_EVENT_SOURCE_MAPPING == "v2": - # Restores event source workers - function_arn = esm.get("FunctionArn") - - # TODO: How do we know the event source is up? - # A basic poll to see if the mapped Lambda function is active/failed - if not poll_condition( - lambda: get_function_version_from_arn(function_arn).config.state.state - in [State.Active, State.Failed], - timeout=10, - ): - LOG.warning( - "Creating ESM for Lambda that is not in running state: %s", - function_arn, - ) + # Restores event source workers + function_arn = esm.get("FunctionArn") + + # TODO: How do we know the event source is up? + # A basic poll to see if the mapped Lambda function is active/failed + if not poll_condition( + lambda: get_function_version_from_arn(function_arn).config.state.state + in [State.Active, State.Failed], + timeout=10, + ): + LOG.warning( + "Creating ESM for Lambda that is not in running state: %s", + function_arn, + ) - function_version = get_function_version_from_arn(function_arn) - function_role = function_version.config.role + function_version = get_function_version_from_arn(function_arn) + function_role = function_version.config.role - is_esm_enabled = esm.get("State", EsmState.DISABLED) not in ( - EsmState.DISABLED, - EsmState.DISABLING, - ) - esm_worker = EsmWorkerFactory( - esm, function_role, is_esm_enabled - ).get_esm_worker() - - # Note: a worker is created in the DISABLED state if not enabled - esm_worker.create() - # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race - # condition if we get a shutdown here and have a worker thread spawned but not accounted for? - self.esm_workers[esm_worker.uuid] = esm_worker - else: - # Restore event source listeners - EventSourceListener.start_listeners_for_asf(esm, self.lambda_service) + is_esm_enabled = esm.get("State", EsmState.DISABLED) not in ( + EsmState.DISABLED, + EsmState.DISABLING, + ) + esm_worker = EsmWorkerFactory( + esm, function_role, is_esm_enabled + ).get_esm_worker() + + # Note: a worker is created in the DISABLED state if not enabled + esm_worker.create() + # TODO: assigning the esm_worker to the dict only works after .create(). Could it cause a race + # condition if we get a shutdown here and have a worker thread spawned but not accounted for? 
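# The restore path above leans on poll_condition from localstack.utils.sync,
# which re-evaluates the given callable until it returns truthy or the timeout
# elapses (returning False on timeout). A minimal, self-contained sketch of the
# same waiting pattern; the helper name and defaults here are illustrative only.
import time

def wait_for(condition, timeout: float = 10.0, interval: float = 0.5) -> bool:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return False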
+ self.esm_workers[esm_worker.uuid] = esm_worker def on_after_init(self): self.router.register_routes() @@ -1865,12 +1857,7 @@ def create_event_source_mapping( context: RequestContext, request: CreateEventSourceMappingRequest, ) -> EventSourceMappingConfiguration: - if config.LAMBDA_EVENT_SOURCE_MAPPING == "v2": - event_source_configuration = self.create_event_source_mapping_v2(context, request) - else: - event_source_configuration = self.create_event_source_mapping_v1(context, request) - - return event_source_configuration + return self.create_event_source_mapping_v2(context, request) def create_event_source_mapping_v2( self, @@ -1896,82 +1883,6 @@ def create_event_source_mapping_v2( esm_worker.create() return esm_config - def create_event_source_mapping_v1( - self, context: RequestContext, request: CreateEventSourceMappingRequest - ) -> EventSourceMappingConfiguration: - fn_arn, function_name, state = self.validate_event_source_mapping(context, request) - # create new event source mappings - new_uuid = long_uid() - # defaults etc. vary depending on type of event source - # TODO: find a better abstraction to create these - params = request.copy() - params.pop("FunctionName") - if not (service_type := self.get_source_type_from_request(request)): - raise InvalidParameterValueException("Unrecognized event source.") - - batch_size = api_utils.validate_and_set_batch_size(service_type, request.get("BatchSize")) - params["FunctionArn"] = fn_arn - params["BatchSize"] = batch_size - params["UUID"] = new_uuid - params["MaximumBatchingWindowInSeconds"] = request.get("MaximumBatchingWindowInSeconds", 0) - params["LastModified"] = api_utils.generate_lambda_date() - params["FunctionResponseTypes"] = request.get("FunctionResponseTypes", []) - params["State"] = "Enabled" - if "sqs" in service_type: - # can be "sqs" or "sqs-fifo" - params["StateTransitionReason"] = "USER_INITIATED" - if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0: - raise InvalidParameterValueException( - "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10", - Type="User", - ) - elif service_type == "kafka": - params["StartingPosition"] = request.get("StartingPosition", "TRIM_HORIZON") - params["StateTransitionReason"] = "USER_INITIATED" - params["LastProcessingResult"] = "No records processed" - consumer_group = {"ConsumerGroupId": new_uuid} - if request.get("SelfManagedEventSource"): - params["SelfManagedKafkaEventSourceConfig"] = request.get( - "SelfManagedKafkaEventSourceConfig", consumer_group - ) - else: - params["AmazonManagedKafkaEventSourceConfig"] = request.get( - "AmazonManagedKafkaEventSourceConfig", consumer_group - ) - - params["MaximumBatchingWindowInSeconds"] = request.get("MaximumBatchingWindowInSeconds") - # Not available for kafka - del params["FunctionResponseTypes"] - else: - # afaik every other one currently is a stream - params["StateTransitionReason"] = "User action" - params["MaximumRetryAttempts"] = request.get("MaximumRetryAttempts", -1) - params["ParallelizationFactor"] = request.get("ParallelizationFactor", 1) - params["BisectBatchOnFunctionError"] = request.get("BisectBatchOnFunctionError", False) - params["LastProcessingResult"] = "No records processed" - params["MaximumRecordAgeInSeconds"] = request.get("MaximumRecordAgeInSeconds", -1) - params["TumblingWindowInSeconds"] = request.get("TumblingWindowInSeconds", 0) - destination_config = request.get("DestinationConfig", {"OnFailure": {}}) - self._validate_destination_config(state, 
function_name, destination_config) - params["DestinationConfig"] = destination_config - # TODO: create domain models and map accordingly - esm_config = EventSourceMappingConfiguration(**params) - filter_criteria = esm_config.get("FilterCriteria") - if filter_criteria: - # validate for valid json - if not validate_filters(filter_criteria): - raise InvalidParameterValueException( - "Invalid filter pattern definition.", Type="User" - ) # TODO: verify - state.event_source_mappings[new_uuid] = esm_config - # TODO: evaluate after temp migration - EventSourceListener.start_listeners_for_asf(request, self.lambda_service) - event_source_configuration = { - **esm_config, - "State": "Creating", - } # TODO: should be set asynchronously - return event_source_configuration - def validate_event_source_mapping(self, context, request): # TODO: test whether stream ARNs are valid sources for Pipes or ESM or whether only DynamoDB table ARNs work is_create_esm_request = context.operation.name == self.create_event_source_mapping.operation @@ -2086,75 +1997,7 @@ def update_event_source_mapping( context: RequestContext, request: UpdateEventSourceMappingRequest, ) -> EventSourceMappingConfiguration: - if config.LAMBDA_EVENT_SOURCE_MAPPING == "v2": - return self.update_event_source_mapping_v2(context, request) - else: - return self.update_event_source_mapping_v1(context, request) - - def update_event_source_mapping_v1( - self, - context: RequestContext, - request: UpdateEventSourceMappingRequest, - ) -> EventSourceMappingConfiguration: - state = lambda_stores[context.account_id][context.region] - request_data = {**request} - uuid = request_data.pop("UUID", None) - if not uuid: - raise ResourceNotFoundException( - "The resource you requested does not exist.", Type="User" - ) - old_event_source_mapping = state.event_source_mappings.get(uuid) - if old_event_source_mapping is None: - raise ResourceNotFoundException( - "The resource you requested does not exist.", Type="User" - ) # TODO: test? - - # remove the FunctionName field - function_name_or_arn = request_data.pop("FunctionName", None) - - # normalize values to overwrite - event_source_mapping = old_event_source_mapping | request_data - - if not (service_type := self.get_source_type_from_request(event_source_mapping)): - # TODO validate this error - raise InvalidParameterValueException("Unrecognized event source.") - - if function_name_or_arn: - # if the FunctionName field was present, update the FunctionArn of the EventSourceMapping - account_id, region = api_utils.get_account_and_region(function_name_or_arn, context) - function_name, qualifier = api_utils.get_name_and_qualifier( - function_name_or_arn, None, context - ) - event_source_mapping["FunctionArn"] = api_utils.qualified_lambda_arn( - function_name, qualifier, account_id, region - ) - - temp_params = {} # values only set for the returned response, not saved internally (e.g. 
transient state) - - if request.get("Enabled") is not None: - if request["Enabled"]: - esm_state = "Enabled" - else: - esm_state = "Disabled" - temp_params["State"] = "Disabling" # TODO: make this properly async - event_source_mapping["State"] = esm_state - - if request.get("BatchSize"): - batch_size = api_utils.validate_and_set_batch_size(service_type, request["BatchSize"]) - if batch_size > 10 and request.get("MaximumBatchingWindowInSeconds", 0) == 0: - raise InvalidParameterValueException( - "Maximum batch window in seconds must be greater than 0 if maximum batch size is greater than 10", - Type="User", - ) - if request.get("DestinationConfig"): - destination_config = request["DestinationConfig"] - self._validate_destination_config( - state, event_source_mapping["FunctionName"], destination_config - ) - event_source_mapping["DestinationConfig"] = destination_config - event_source_mapping["LastProcessingResult"] = "OK" - state.event_source_mappings[uuid] = event_source_mapping - return {**event_source_mapping, **temp_params} + return self.update_event_source_mapping_v2(context, request) def update_event_source_mapping_v2( self, @@ -2237,14 +2080,10 @@ def delete_event_source_mapping( "The resource you requested does not exist.", Type="User" ) esm = state.event_source_mappings[uuid] - if config.LAMBDA_EVENT_SOURCE_MAPPING == "v2": - # TODO: add proper locking - esm_worker = self.esm_workers[uuid] - # Asynchronous delete in v2 - esm_worker.delete() - else: - # Synchronous delete in v1 (AWS parity issue) - del state.event_source_mappings[uuid] + # TODO: add proper locking + esm_worker = self.esm_workers[uuid] + # Asynchronous delete in v2 + esm_worker.delete() return {**esm, "State": EsmState.DELETING} def get_event_source_mapping( @@ -2256,10 +2095,9 @@ def get_event_source_mapping( raise ResourceNotFoundException( "The resource you requested does not exist.", Type="User" ) - if config.LAMBDA_EVENT_SOURCE_MAPPING == "v2": - esm_worker = self.esm_workers[uuid] - event_source_mapping["State"] = esm_worker.current_state - event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason + esm_worker = self.esm_workers[uuid] + event_source_mapping["State"] = esm_worker.current_state + event_source_mapping["StateTransitionReason"] = esm_worker.state_transition_reason return event_source_mapping def list_event_source_mappings( diff --git a/tests/aws/services/cloudformation/resources/test_lambda.py b/tests/aws/services/cloudformation/resources/test_lambda.py index 88c6d07029a36..b242123ad2944 100644 --- a/tests/aws/services/cloudformation/resources/test_lambda.py +++ b/tests/aws/services/cloudformation/resources/test_lambda.py @@ -17,7 +17,6 @@ from localstack.utils.strings import to_bytes, to_str from localstack.utils.sync import retry, wait_until from localstack.utils.testutil import create_lambda_archive, get_lambda_log_events -from tests.aws.services.lambda_.event_source_mapping.utils import is_v2_esm # TODO: Fix for new Lambda provider (was tested for old provider) @@ -724,10 +723,10 @@ def wait_logs(): def test_lambda_dynamodb_event_filter( self, dynamodb_wait_for_table_active, deploy_cfn_template, aws_client, monkeypatch ): - if is_v2_esm(): - # Filtering is broken with the Python rule engine for this specific case (exists:false) in ESM v2 - # -> using java engine as workaround. 
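# For context on the exists:false case mentioned above: the affected filter uses a
# leaf of the form {"S": [{"exists": false}]}, i.e. it should only match stream
# records where the attribute is missing from the new image. The attribute name
# below is illustrative, not taken from the template used by this test.
exists_false_pattern = {"dynamodb": {"NewImage": {"some_attribute": {"S": [{"exists": False}]}}}}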
- monkeypatch.setattr(config, "EVENT_RULE_ENGINE", "java") + # TODO: Filtering is broken with the Python rule engine for this specific case (exists:false) in ESM v2 + # -> using java engine as workaround for now. + monkeypatch.setattr(config, "EVENT_RULE_ENGINE", "java") + function_name = f"test-fn-{short_uid()}" table_name = f"ddb-tbl-{short_uid()}" diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py index 0a8cf65781225..7c9bd622616cf 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py @@ -22,8 +22,6 @@ from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events from tests.aws.services.lambda_.event_source_mapping.utils import ( create_lambda_with_response, - is_old_esm, - is_v2_esm, ) from tests.aws.services.lambda_.functions import FUNCTIONS_PATH from tests.aws.services.lambda_.test_lambda import ( @@ -66,12 +64,6 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30): @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - # Only match EventSourceMappingArn field if ESM v2 and above - paths=["$..EventSourceMappingArn"], -) -@markers.snapshot.skip_snapshot_verify( - condition=is_v2_esm, paths=[ # Lifecycle updates not yet implemented in ESM v2 "$..LastProcessingResult", @@ -359,15 +351,6 @@ def test_deletion_event_source_mapping_with_dynamodb( "$..TableDescription.TableId", ], ) - @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=[ - "$..Message.DDBStreamBatchInfo.approximateArrivalOfFirstRecord", # Incorrect timestamp formatting - "$..Message.DDBStreamBatchInfo.approximateArrivalOfLastRecord", - "$..Message.requestContext.approximateInvokeCount", - "$..Message.responseContext.statusCode", - ], - ) @markers.aws.validated def test_dynamodb_event_source_mapping_with_sns_on_failure_destination_config( self, @@ -483,15 +466,6 @@ def verify_failure_received(): "$..TableDescription.TableId", ], ) - @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=[ - "$..Messages..Body.DDBStreamBatchInfo.approximateArrivalOfFirstRecord", # Incorrect timestamp formatting - "$..Messages..Body.DDBStreamBatchInfo.approximateArrivalOfLastRecord", - "$..Messages..Body.requestContext.approximateInvokeCount", - "$..Messages..Body.responseContext.statusCode", - ], - ) @markers.aws.validated def test_dynamodb_event_source_mapping_with_on_failure_destination_config( self, @@ -698,7 +672,7 @@ def test_dynamodb_event_filter( Test assumption: The first item MUST always match the filter and the second item CAN match the filter. => This enables two-step testing (i.e., snapshots between inserts) but is unreliable and should be revised. 
""" - if is_v2_esm() and filter == {"eventName": ["INSERT"], "eventSource": ["aws:dynamodb"]}: + if filter == {"eventName": ["INSERT"], "eventSource": ["aws:dynamodb"]}: pytest.skip(reason="content_multiple_filters failing for ESM v2 (needs investigation)") function_name = f"lambda_func-{short_uid()}" table_name = f"test-table-{short_uid()}" @@ -782,9 +756,7 @@ def assert_events_called_multiple(): snapshot.match("lambda-multiple-log-events", events) @markers.aws.validated - @pytest.mark.skipif( - is_v2_esm(), reason="Invalid filter detection not yet implemented in ESM v2" - ) + @pytest.mark.skip(reason="Invalid filter detection not yet implemented in ESM v2") @pytest.mark.parametrize( "filter", [ @@ -837,10 +809,6 @@ def test_dynamodb_invalid_event_filter( snapshot.match("exception_event_source_creation", expected.value.response) expected.match(InvalidParameterValueException.code) - @pytest.mark.skipif( - is_old_esm(), - reason="ReportBatchItemFailures: Partial batch failure handling not implemented in ESM v1", - ) @markers.snapshot.skip_snapshot_verify( paths=[ "$..TableDescription.TableId", @@ -951,9 +919,6 @@ def verify_failure_received(): snapshot.match("dynamodb_records", {"Records": sorted_records}) - @pytest.mark.skipif( - is_old_esm(), reason="ReportBatchItemFailures: Total batch fails not implemented in ESM v1" - ) @pytest.mark.parametrize( "set_lambda_response", [ diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py index 4119c4f4cb836..a919b75532479 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py @@ -21,8 +21,6 @@ from localstack.utils.sync import ShortCircuitWaitException, retry, wait_until from tests.aws.services.lambda_.event_source_mapping.utils import ( create_lambda_with_response, - is_old_esm, - is_v2_esm, ) from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration from tests.aws.services.lambda_.test_lambda import ( @@ -74,11 +72,6 @@ def _snapshot_transformers(snapshot): "$..TumblingWindowInSeconds", ], ) -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - # Only match EventSourceMappingArn field if ESM v2 and above - paths=["$..EventSourceMappingArn"], -) class TestKinesisSource: @markers.aws.validated def test_create_kinesis_event_source_mapping( @@ -469,15 +462,6 @@ def _send_and_receive_messages(): "$..Messages..Body.KinesisBatchInfo.streamArn", ], ) - @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=[ - "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfFirstRecord", - "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfLastRecord", - "$..Messages..Body.requestContext.approximateInvokeCount", - "$..Messages..Body.responseContext.statusCode", - ], - ) @markers.aws.validated def test_kinesis_event_source_mapping_with_on_failure_destination_config( self, @@ -558,10 +542,6 @@ def verify_failure_received(): sqs_payload = retry(verify_failure_received, retries=15, sleep=sleep, sleep_before=5) snapshot.match("sqs_payload", sqs_payload) - @pytest.mark.skipif( - is_old_esm(), - reason="ReportBatchItemFailures: Partial batch failure handling not implemented in ESM v1", - ) @markers.snapshot.skip_snapshot_verify( paths=[ # FIXME Conflict between shardId and AWS account number when transforming @@ -663,9 +643,6 @@ def verify_failure_received(): 
snapshot.match("kinesis_records", {"Records": sorted_records}) @markers.aws.validated - @pytest.mark.skipif( - is_old_esm(), reason="ReportBatchItemFailures: Total batch fails not implemented in ESM v1" - ) @markers.snapshot.skip_snapshot_verify( paths=[ "$..Messages..Body.KinesisBatchInfo.shardId", @@ -775,15 +752,6 @@ def verify_failure_received(): "$..Message.KinesisBatchInfo.streamArn", ], ) - @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=[ - "$..Message.KinesisBatchInfo.approximateArrivalOfFirstRecord", - "$..Message.KinesisBatchInfo.approximateArrivalOfLastRecord", - "$..Message.requestContext.approximateInvokeCount", - "$..Message.responseContext.statusCode", - ], - ) @markers.aws.validated def test_kinesis_event_source_mapping_with_sns_on_failure_destination_config( self, @@ -1049,17 +1017,11 @@ def _verify_invoke(): # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-kinesis class TestKinesisEventFiltering: @markers.snapshot.skip_snapshot_verify( - condition=is_v2_esm, paths=[ # Lifecycle updates not yet implemented in ESM v2 "$..LastProcessingResult", ], ) - @markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - # Only match EventSourceMappingArn field if ESM v2 and above - paths=["$..EventSourceMappingArn"], - ) @markers.snapshot.skip_snapshot_verify( paths=[ "$..Messages..Body.KinesisBatchInfo.shardId", diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py index dd9b07447a171..377f21ae2e551 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py @@ -13,10 +13,6 @@ from localstack.utils.strings import short_uid from localstack.utils.sync import retry from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events -from tests.aws.services.lambda_.event_source_mapping.utils import ( - is_old_esm, - is_v2_esm, -) from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration from tests.aws.services.lambda_.test_lambda import ( TEST_LAMBDA_PYTHON, @@ -71,11 +67,6 @@ def _snapshot_transformers(snapshot): "$..StateTransitionReason", ] ) -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - # Only match EventSourceMappingArn field if ESM v2 and above - paths=["$..EventSourceMappingArn"], -) @markers.aws.validated def test_failing_lambda_retries_after_visibility_timeout( create_lambda_function, @@ -442,11 +433,6 @@ def receive_dlq(): "$..create_event_source_mapping.ResponseMetadata", ] ) -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - # Only match EventSourceMappingArn field if ESM v2 and above - paths=["$..EventSourceMappingArn"], -) @markers.aws.validated def test_report_batch_item_failures( create_lambda_function, @@ -864,17 +850,6 @@ def test_report_batch_item_failures_empty_json_batch_succeeds( assert "Messages" not in dlq_response or dlq_response["Messages"] == [] -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=[ - # hardcoded extra field in old ESM - "$..LastProcessingResult", - # async update not implemented in old ESM - "$..State", - # Only match EventSourceMappingArn field if ESM v2 and above - "$..EventSourceMappingArn", - ], -) @markers.aws.validated def test_fifo_message_group_parallelism( aws_client, @@ -963,10 +938,6 @@ def 
test_fifo_message_group_parallelism( "$..Records..md5OfMessageAttributes", ], ) -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=["$..EventSourceMappingArn"], -) class TestSQSEventSourceMapping: @markers.aws.validated def test_event_source_mapping_default_batch_size( @@ -1139,7 +1110,7 @@ def test_sqs_event_filter( aws_client, monkeypatch, ): - if is_v2_esm() and item_not_matching == "this is a test string": + if item_not_matching == "this is a test string": # String comparison is broken in the Python rule engine for this specific case in ESM v2, using java engine. monkeypatch.setattr(config, "EVENT_RULE_ENGINE", "java") function_name = f"lambda_func-{short_uid()}" @@ -1205,9 +1176,7 @@ def _check_lambda_logs(): rs = aws_client.sqs.receive_message(QueueUrl=queue_url_1) assert rs.get("Messages", []) == [] - @pytest.mark.skipif( - is_v2_esm(), reason="Invalid filter detection not yet implemented in ESM v2" - ) + @pytest.mark.skip(reason="Invalid filter detection not yet implemented in ESM v2") @markers.aws.validated @pytest.mark.parametrize( "invalid_filter", [None, "simple string", {"eventSource": "aws:sqs"}, {"eventSource": []}] diff --git a/tests/aws/services/lambda_/event_source_mapping/utils.py b/tests/aws/services/lambda_/event_source_mapping/utils.py index 2e2df4d3dd74f..c8bb04e7dd31c 100644 --- a/tests/aws/services/lambda_/event_source_mapping/utils.py +++ b/tests/aws/services/lambda_/event_source_mapping/utils.py @@ -1,6 +1,3 @@ -from localstack.config import LAMBDA_EVENT_SOURCE_MAPPING -from localstack.testing.aws.util import is_aws_cloud - _LAMBDA_WITH_RESPONSE = """ import json @@ -13,11 +10,3 @@ def handler(event, context): def create_lambda_with_response(response: str) -> str: """Creates a lambda with pre-defined response""" return _LAMBDA_WITH_RESPONSE.format(response=response) - - -def is_v2_esm(): - return LAMBDA_EVENT_SOURCE_MAPPING == "v2" and not is_aws_cloud() - - -def is_old_esm(): - return LAMBDA_EVENT_SOURCE_MAPPING == "v1" and not is_aws_cloud() diff --git a/tests/aws/services/lambda_/test_lambda_api.py b/tests/aws/services/lambda_/test_lambda_api.py index 8472cd3171f98..ff86f6b7be40e 100644 --- a/tests/aws/services/lambda_/test_lambda_api.py +++ b/tests/aws/services/lambda_/test_lambda_api.py @@ -58,7 +58,6 @@ from localstack.utils.strings import long_uid, short_uid, to_str from localstack.utils.sync import ShortCircuitWaitException, wait_until from localstack.utils.testutil import create_lambda_archive -from tests.aws.services.lambda_.event_source_mapping.utils import is_old_esm, is_v2_esm from tests.aws.services.lambda_.test_lambda import ( TEST_LAMBDA_JAVA_WITH_LIB, TEST_LAMBDA_NODEJS, @@ -5170,10 +5169,6 @@ def test_account_settings_total_code_size_config_update( ) -@markers.snapshot.skip_snapshot_verify( - condition=is_old_esm, - paths=["$..EventSourceMappingArn", "$..UUID", "$..FunctionArn"], -) class TestLambdaEventSourceMappings: @markers.aws.validated def test_event_source_mapping_exceptions(self, snapshot, aws_client): @@ -5376,7 +5371,7 @@ def test_create_event_source_validation( snapshot.match("error", response) @markers.aws.validated - @pytest.mark.skipif(is_v2_esm, reason="ESM v2 validation for Kafka poller only works with ext") + @pytest.mark.skip(reason="ESM v2 validation for Kafka poller only works with ext") def test_create_event_source_self_managed( self, create_lambda_function, diff --git a/tests/unit/services/lambda_/test_lambda_utils.py b/tests/unit/services/lambda_/test_lambda_utils.py index 59e401d4ab0c6..4276d3b180fa1 
100644 --- a/tests/unit/services/lambda_/test_lambda_utils.py +++ b/tests/unit/services/lambda_/test_lambda_utils.py @@ -1,7 +1,4 @@ -import json - from localstack.aws.api.lambda_ import Runtime -from localstack.services.lambda_.event_source_listeners.utils import filter_stream_records from localstack.services.lambda_.lambda_utils import format_name_to_path, get_handler_file_from_name @@ -38,113 +35,3 @@ def test_get_handler_file_from_name(self): assert "main" == get_handler_file_from_name("main", Runtime.go1_x) assert "../handler.py" == get_handler_file_from_name("../handler.execute") assert "bootstrap" == get_handler_file_from_name("", Runtime.provided) - - -class TestFilterStreamRecords: - """ - https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html - - Test filtering logic for supported syntax - """ - - records = [ - { - "partitionKey": "1", - "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": { - "City": "Seattle", - "State": "WA", - "Temperature": 46, - "Month": "December", - "Population": None, - "Flag": "", - }, - "approximateArrivalTimestamp": 1545084650.987, - "encryptionType": "NONE", - } - ] - - def test_match_metadata(self): - filters = [{"Filters": [{"Pattern": json.dumps({"partitionKey": ["1"]})}]}] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_data(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"State": ["WA"]}})}]}] - - assert self.records == filter_stream_records(self.records, filters) - - def test_match_multiple(self): - filters = [ - { - "Filters": [ - {"Pattern": json.dumps({"partitionKey": ["1"], "data": {"State": ["WA"]}})} - ] - } - ] - - assert self.records == filter_stream_records(self.records, filters) - - def test_match_exists(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"State": [{"exists": True}]}})}]}] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_numeric_equals(self): - filters = [ - { - "Filters": [ - {"Pattern": json.dumps({"data": {"Temperature": [{"numeric": ["=", 46]}]}})} - ] - } - ] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_numeric_range(self): - filters = [ - { - "Filters": [ - { - "Pattern": json.dumps( - {"data": {"Temperature": [{"numeric": [">", 40, "<", 50]}]}} - ) - } - ] - } - ] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_prefix(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"City": [{"prefix": "Sea"}]}})}]}] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_null(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"Population": [None]}})}]}] - assert self.records == filter_stream_records(self.records, filters) - - def test_match_empty(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"Flag": [""]}})}]}] - assert self.records == filter_stream_records(self.records, filters) - - def test_no_match_exists(self): - filters = [{"Filters": [{"Pattern": json.dumps({"data": {"Foo": [{"exists": True}]}})}]}] - assert [] == filter_stream_records(self.records, filters) - - def test_no_filters(self): - filters = [] - assert [] == filter_stream_records(self.records, filters) - - def test_no_match_partial(self): - filters = [ - { - "Filters": [ - {"Pattern": json.dumps({"partitionKey": ["2"], "data": {"City": ["Seattle"]}})} - ] - } - ] - - assert [] == filter_stream_records(self.records, filters) - - 
def test_no_match_exists_dict(self): - filters = [ - {"Filters": [{"Pattern": json.dumps({"data": {"Foo": {"S": [{"exists": True}]}}})}]} - ] - assert [] == filter_stream_records(self.records, filters)
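# A quick illustration of the message_attributes_to_lower helper relocated into
# sqs_poller.py earlier in this diff (sample attributes are made up): only the
# first character of each attribute-detail key is lowered, turning boto-style
# keys into the casing used in Lambda event payloads.
attrs = {"my_attr": {"StringValue": "v", "DataType": "String"}}
assert message_attributes_to_lower(attrs) == {
    "my_attr": {"stringValue": "v", "dataType": "String"}
}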