From 8b364e03b465d170cc97d910fda6fa8a04c2f75c Mon Sep 17 00:00:00 2001 From: Zain Zafar Date: Fri, 8 Nov 2024 17:19:59 +0000 Subject: [PATCH 1/5] added EVENT_RULE_ENGINE flag based opt-in --- .../localstack/services/events/provider.py | 2 +- .../localstack/services/events/v1/provider.py | 3 ++- .../event_source_mapping/pollers/poller.py | 11 ++------ .../localstack/utils/event_matcher.py | 27 +++++++++++++++++++ 4 files changed, 32 insertions(+), 11 deletions(-) create mode 100644 localstack-core/localstack/utils/event_matcher.py diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index f05691ad31035..0de6f937b6883 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -92,7 +92,7 @@ from localstack.aws.api.events import Rule as ApiTypeRule from localstack.services.events.archive import ArchiveService, ArchiveServiceDict from localstack.services.events.event_bus import EventBusService, EventBusServiceDict -from localstack.services.events.event_ruler import matches_rule +from localstack.utils.event_matcher import matches_event as matches_rule from localstack.services.events.models import ( Archive, ArchiveDict, diff --git a/localstack-core/localstack/services/events/v1/provider.py b/localstack-core/localstack/services/events/v1/provider.py index 953795299bd5e..b40097b42b178 100644 --- a/localstack-core/localstack/services/events/v1/provider.py +++ b/localstack-core/localstack/services/events/v1/provider.py @@ -40,7 +40,8 @@ from localstack.constants import APPLICATION_AMZ_JSON_1_1 from localstack.http import route from localstack.services.edge import ROUTER -from localstack.services.events.event_ruler import matches_rule +from localstack.utils.event_matcher import matches_event as matches_rule + from localstack.services.events.models import ( InvalidEventPatternException as InternalInvalidEventPatternException, ) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 590dbd663b387..4ba7293c6d7c5 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -7,10 +7,8 @@ from localstack import config from localstack.aws.api.pipes import PipeStateReason -from localstack.services.events.event_ruler import matches_rule +from localstack.utils.event_matcher import matches_event as utils_matches_event -# TODO remove when we switch to Java rule engine -from localstack.services.events.v1.utils import matches_event from localstack.services.lambda_.event_source_mapping.event_processor import EventProcessor from localstack.services.lambda_.event_source_mapping.noops_event_processor import ( NoOpsEventProcessor, @@ -112,12 +110,7 @@ def extra_metadata(self) -> dict: def _matches_event(event_pattern: dict, event: dict) -> bool: - if config.EVENT_RULE_ENGINE == "java": - event_str = json.dumps(event) - event_pattern_str = json.dumps(event_pattern) - return matches_rule(event_str, event_pattern_str) - else: - return matches_event(event_pattern, event) + return utils_matches_event(event_pattern, event) def has_batch_item_failures( diff --git a/localstack-core/localstack/utils/event_matcher.py b/localstack-core/localstack/utils/event_matcher.py new file mode 100644 index 0000000000000..5ccecc3d43d0d --- /dev/null +++ b/localstack-core/localstack/utils/event_matcher.py @@ -0,0 +1,27 @@ +# localstack/utils/event_matcher.py +import json +from typing import Dict, Union +from localstack import config +from localstack.services.events.event_ruler import matches_rule +from localstack.services.events.v1.utils import matches_event as python_matches_event + +def matches_event(event_pattern: Union[Dict, str], event: Union[Dict, str]) -> bool: + """ + Match events based on configured rule engine. + Note: Different services handle patterns/events differently: + - EventBridge uses strings + - ESM and Pipes use dicts + """ + if config.EVENT_RULE_ENGINE == "java": + # If inputs are already strings (EventBridge), use directly + if isinstance(event, str) and isinstance(event_pattern, str): + return matches_rule(event, event_pattern) + # Convert dicts (ESM/Pipes) to strings for Java engine + event_str = event if isinstance(event, str) else json.dumps(event) + pattern_str = event_pattern if isinstance(event_pattern, str) else json.dumps(event_pattern) + return matches_rule(event_str, pattern_str) + + # Python implementation needs dicts + event_dict = json.loads(event) if isinstance(event, str) else event + pattern_dict = json.loads(event_pattern) if isinstance(event_pattern, str) else event_pattern + return python_matches_event(pattern_dict, event_dict) \ No newline at end of file From 491ea2a78aaacf73646a2f5d905c676c60491947 Mon Sep 17 00:00:00 2001 From: Zain Zafar Date: Sat, 9 Nov 2024 16:14:17 +0000 Subject: [PATCH 2/5] added unit test and refactored --- localstack | 1 + .../event_source_mapping/pollers/poller.py | 8 +- .../localstack/utils/event_matcher.py | 21 ++- tests/unit/utils/test_event_matcher.py | 121 ++++++++++++++++++ 4 files changed, 143 insertions(+), 8 deletions(-) create mode 160000 localstack create mode 100644 tests/unit/utils/test_event_matcher.py diff --git a/localstack b/localstack new file mode 160000 index 0000000000000..d08dfc5ac6725 --- /dev/null +++ b/localstack @@ -0,0 +1 @@ +Subproject commit d08dfc5ac6725b13f4bbeee0a38f1a4ef3baac93 diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 4ba7293c6d7c5..4f18241e80c1c 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -7,7 +7,7 @@ from localstack import config from localstack.aws.api.pipes import PipeStateReason -from localstack.utils.event_matcher import matches_event as utils_matches_event +from localstack.utils.event_matcher import matches_event from localstack.services.lambda_.event_source_mapping.event_processor import EventProcessor from localstack.services.lambda_.event_source_mapping.noops_event_processor import ( @@ -87,7 +87,7 @@ def filter_events(self, events: list[dict]) -> list[dict]: filtered_events = [] for event in events: # TODO: add try/catch with default discard and error log for extra resilience - if any(_matches_event(pattern, event) for pattern in self.filter_patterns): + if any(matches_event(pattern, event) for pattern in self.filter_patterns): filtered_events.append(event) return filtered_events @@ -109,10 +109,6 @@ def extra_metadata(self) -> dict: return {} -def _matches_event(event_pattern: dict, event: dict) -> bool: - return utils_matches_event(event_pattern, event) - - def has_batch_item_failures( result: dict | str | None, valid_item_ids: set[str] | None = None ) -> bool: diff --git a/localstack-core/localstack/utils/event_matcher.py b/localstack-core/localstack/utils/event_matcher.py index 5ccecc3d43d0d..b2f309a9254d8 100644 --- a/localstack-core/localstack/utils/event_matcher.py +++ b/localstack-core/localstack/utils/event_matcher.py @@ -1,16 +1,33 @@ # localstack/utils/event_matcher.py import json -from typing import Dict, Union +from typing import Any, Dict, Union from localstack import config from localstack.services.events.event_ruler import matches_rule from localstack.services.events.v1.utils import matches_event as python_matches_event -def matches_event(event_pattern: Union[Dict, str], event: Union[Dict, str]) -> bool: +def matches_event(event_pattern: Union[Dict[str, Any], str], event: Union[Dict[str, Any], str]) -> bool: """ Match events based on configured rule engine. + Note: Different services handle patterns/events differently: - EventBridge uses strings - ESM and Pipes use dicts + + Args: + event_pattern: Event pattern (str for EventBridge, dict for ESM/Pipes) + event: Event to match against pattern (str for EventBridge, dict for ESM/Pipes) + + Returns: + bool: True if event matches pattern, False otherwise + + Examples: + # EventBridge (string-based): + >>> pattern = '{"source": ["aws.ec2"]}' + >>> event = '{"source": "aws.ec2"}' + + # ESM/Pipes (dict-based): + >>> pattern = {"source": ["aws.ec2"]} + >>> event = {"source": "aws.ec2"} """ if config.EVENT_RULE_ENGINE == "java": # If inputs are already strings (EventBridge), use directly diff --git a/tests/unit/utils/test_event_matcher.py b/tests/unit/utils/test_event_matcher.py new file mode 100644 index 0000000000000..26f84f16a73fb --- /dev/null +++ b/tests/unit/utils/test_event_matcher.py @@ -0,0 +1,121 @@ +import json +import pytest +from localstack import config +from localstack.utils.event_matcher import matches_event + + +EVENT_PATTERN_DICT = { + "source": ["aws.ec2"], + "detail-type": ["EC2 Instance State-change Notification"] +} +EVENT_DICT = { + "source": "aws.ec2", + "detail-type": "EC2 Instance State-change Notification", + "detail": {"state": "running"} +} +EVENT_PATTERN_STR = json.dumps(EVENT_PATTERN_DICT) +EVENT_STR = json.dumps(EVENT_DICT) + + +@pytest.fixture +def event_rule_engine(monkeypatch): + """Fixture to control EVENT_RULE_ENGINE config""" + + def _set_engine(engine: str): + monkeypatch.setattr(config, "EVENT_RULE_ENGINE", engine) + + return _set_engine + + +def test_matches_event_with_java_engine_strings(event_rule_engine): + """Test Java engine with string inputs (EventBridge case)""" + event_rule_engine("java") + assert matches_event(EVENT_PATTERN_STR, EVENT_STR) == True + + +def test_matches_event_with_java_engine_dicts(event_rule_engine): + """Test Java engine with dict inputs (ESM/Pipes case)""" + event_rule_engine("java") + assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) == True + + +def test_matches_event_with_python_engine_strings(event_rule_engine): + """Test Python engine with string inputs""" + event_rule_engine("python") + assert matches_event(EVENT_PATTERN_STR, EVENT_STR) == True + + +def test_matches_event_with_python_engine_dicts(event_rule_engine): + """Test Python engine with dict inputs""" + event_rule_engine("python") + assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) == True + + +def test_matches_event_mixed_inputs(event_rule_engine): + """Test with mixed string/dict inputs""" + event_rule_engine("java") + assert matches_event(EVENT_PATTERN_STR, EVENT_DICT) == True + assert matches_event(EVENT_PATTERN_DICT, EVENT_STR) == True + + +def test_matches_event_non_matching_pattern(): + """Test with non-matching pattern""" + non_matching_pattern = { + "source": ["aws.s3"], + "detail-type": ["S3 Event"] + } + assert matches_event(non_matching_pattern, EVENT_DICT) == False + + +def test_matches_event_invalid_json(): + """Test with invalid JSON strings""" + with pytest.raises(json.JSONDecodeError): + matches_event("{invalid-json}", EVENT_STR) + + +def test_matches_event_missing_fields(): + """Test with missing required fields""" + incomplete_event = {"source": "aws.ec2"} + assert matches_event(EVENT_PATTERN_DICT, incomplete_event) == False + + +def test_matches_event_pattern_matching(): + """Test various pattern matching scenarios based on AWS examples""" + test_cases = [ + # Exact matching + ( + {"source": ["aws.ec2"], "detail-type": ["EC2 Instance State-change Notification"]}, + {"source": "aws.ec2", "detail-type": "EC2 Instance State-change Notification"}, + True + ), + # Prefix matching in detail field + ( + {"source": ["aws.ec2"], "detail": {"state": [{"prefix": "run"}]}}, + {"source": "aws.ec2", "detail": {"state": "running"}}, + True + ), + # Multiple possible values + ( + {"source": ["aws.ec2"], "detail": {"state": ["pending", "running"]}}, + {"source": "aws.ec2", "detail": {"state": "running"}}, + True + ), + # Anything-but matching + ( + {"source": ["aws.ec2"], "detail": {"state": [{"anything-but": "terminated"}]}}, + {"source": "aws.ec2", "detail": {"state": "running"}}, + True + ), + ] + + for pattern, event, expected in test_cases: + assert matches_event(pattern, event) == expected + + +def test_matches_event_case_sensitivity(): + """Test case sensitivity in matching""" + case_different_event = { + "source": "AWS.ec2", + "detail-type": "EC2 Instance State-Change Notification" + } + assert matches_event(EVENT_PATTERN_DICT, case_different_event) == False From b9b4dc9bb7a34e74384c16f05597d9cf7b1c14d2 Mon Sep 17 00:00:00 2001 From: Zain Zafar Date: Wed, 13 Nov 2024 15:24:32 +0000 Subject: [PATCH 3/5] addressed the comments; docs refrences and refactoring --- localstack | 1 - .../localstack/services/events/provider.py | 2 +- .../localstack/services/events/v1/provider.py | 3 +- .../event_source_mapping/pollers/poller.py | 4 +- .../localstack/utils/event_matcher.py | 53 +++++++++++-------- tests/unit/utils/test_event_matcher.py | 47 ++++++++-------- 6 files changed, 59 insertions(+), 51 deletions(-) delete mode 160000 localstack diff --git a/localstack b/localstack deleted file mode 160000 index d08dfc5ac6725..0000000000000 --- a/localstack +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d08dfc5ac6725b13f4bbeee0a38f1a4ef3baac93 diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index 0de6f937b6883..7f6eb508c922c 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -92,7 +92,6 @@ from localstack.aws.api.events import Rule as ApiTypeRule from localstack.services.events.archive import ArchiveService, ArchiveServiceDict from localstack.services.events.event_bus import EventBusService, EventBusServiceDict -from localstack.utils.event_matcher import matches_event as matches_rule from localstack.services.events.models import ( Archive, ArchiveDict, @@ -132,6 +131,7 @@ ) from localstack.services.plugins import ServiceLifecycleHook from localstack.utils.common import truncate +from localstack.utils.event_matcher import matches_event as matches_rule from localstack.utils.strings import long_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp diff --git a/localstack-core/localstack/services/events/v1/provider.py b/localstack-core/localstack/services/events/v1/provider.py index b40097b42b178..21dd90e858a69 100644 --- a/localstack-core/localstack/services/events/v1/provider.py +++ b/localstack-core/localstack/services/events/v1/provider.py @@ -40,8 +40,6 @@ from localstack.constants import APPLICATION_AMZ_JSON_1_1 from localstack.http import route from localstack.services.edge import ROUTER -from localstack.utils.event_matcher import matches_event as matches_rule - from localstack.services.events.models import ( InvalidEventPatternException as InternalInvalidEventPatternException, ) @@ -55,6 +53,7 @@ from localstack.utils.aws.message_forwarding import send_event_to_target from localstack.utils.collections import pick_attributes from localstack.utils.common import TMP_FILES, mkdir, save_file, truncate +from localstack.utils.event_matcher import matches_event as matches_rule from localstack.utils.json import extract_jsonpath from localstack.utils.strings import long_uid, short_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 4f18241e80c1c..a4e068c08208f 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -5,16 +5,14 @@ from botocore.client import BaseClient -from localstack import config from localstack.aws.api.pipes import PipeStateReason -from localstack.utils.event_matcher import matches_event - from localstack.services.lambda_.event_source_mapping.event_processor import EventProcessor from localstack.services.lambda_.event_source_mapping.noops_event_processor import ( NoOpsEventProcessor, ) from localstack.services.lambda_.event_source_mapping.pipe_utils import get_internal_client from localstack.utils.aws.arns import parse_arn +from localstack.utils.event_matcher import matches_event class PipeStateReasonValues(PipeStateReason): diff --git a/localstack-core/localstack/utils/event_matcher.py b/localstack-core/localstack/utils/event_matcher.py index b2f309a9254d8..3a5e05135d9c4 100644 --- a/localstack-core/localstack/utils/event_matcher.py +++ b/localstack-core/localstack/utils/event_matcher.py @@ -1,44 +1,53 @@ -# localstack/utils/event_matcher.py import json -from typing import Any, Dict, Union +from typing import Any + from localstack import config from localstack.services.events.event_ruler import matches_rule from localstack.services.events.v1.utils import matches_event as python_matches_event -def matches_event(event_pattern: Union[Dict[str, Any], str], event: Union[Dict[str, Any], str]) -> bool: + +def matches_event(event_pattern: dict[str, Any] | str, event: dict[str, Any] | str) -> bool: """ Match events based on configured rule engine. - + Note: Different services handle patterns/events differently: - - EventBridge uses strings + - EventBridge uses strings - ESM and Pipes use dicts - + Args: event_pattern: Event pattern (str for EventBridge, dict for ESM/Pipes) event: Event to match against pattern (str for EventBridge, dict for ESM/Pipes) - + Returns: bool: True if event matches pattern, False otherwise - + Examples: # EventBridge (string-based): >>> pattern = '{"source": ["aws.ec2"]}' >>> event = '{"source": "aws.ec2"}' - + # ESM/Pipes (dict-based): >>> pattern = {"source": ["aws.ec2"]} >>> event = {"source": "aws.ec2"} + + References: + - EventBridge Patterns: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html + - EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html + - Event Source Mappings: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html """ - if config.EVENT_RULE_ENGINE == "java": - # If inputs are already strings (EventBridge), use directly - if isinstance(event, str) and isinstance(event_pattern, str): - return matches_rule(event, event_pattern) - # Convert dicts (ESM/Pipes) to strings for Java engine - event_str = event if isinstance(event, str) else json.dumps(event) - pattern_str = event_pattern if isinstance(event_pattern, str) else json.dumps(event_pattern) - return matches_rule(event_str, pattern_str) - - # Python implementation needs dicts - event_dict = json.loads(event) if isinstance(event, str) else event - pattern_dict = json.loads(event_pattern) if isinstance(event_pattern, str) else event_pattern - return python_matches_event(pattern_dict, event_dict) \ No newline at end of file + if config.EVENT_RULE_ENGINE == "python": + # Python implementation needs dicts + event_dict = json.loads(event) if isinstance(event, str) else event + pattern_dict = ( + json.loads(event_pattern) if isinstance(event_pattern, str) else event_pattern + ) + return python_matches_event(pattern_dict, event_dict) + + # Java implementation (default) + # If inputs are already strings (EventBridge), use directly + if isinstance(event, str) and isinstance(event_pattern, str): + return matches_rule(event, event_pattern) + # Convert dicts (ESM/Pipes) to strings for Java engine + event_str = event if isinstance(event, str) else json.dumps(event) + pattern_str = event_pattern if isinstance(event_pattern, str) else json.dumps(event_pattern) + return matches_rule(event_str, pattern_str) diff --git a/tests/unit/utils/test_event_matcher.py b/tests/unit/utils/test_event_matcher.py index 26f84f16a73fb..043fe644b8305 100644 --- a/tests/unit/utils/test_event_matcher.py +++ b/tests/unit/utils/test_event_matcher.py @@ -1,17 +1,18 @@ import json + import pytest + from localstack import config from localstack.utils.event_matcher import matches_event - EVENT_PATTERN_DICT = { "source": ["aws.ec2"], - "detail-type": ["EC2 Instance State-change Notification"] + "detail-type": ["EC2 Instance State-change Notification"], } EVENT_DICT = { "source": "aws.ec2", "detail-type": "EC2 Instance State-change Notification", - "detail": {"state": "running"} + "detail": {"state": "running"}, } EVENT_PATTERN_STR = json.dumps(EVENT_PATTERN_DICT) EVENT_STR = json.dumps(EVENT_DICT) @@ -30,41 +31,38 @@ def _set_engine(engine: str): def test_matches_event_with_java_engine_strings(event_rule_engine): """Test Java engine with string inputs (EventBridge case)""" event_rule_engine("java") - assert matches_event(EVENT_PATTERN_STR, EVENT_STR) == True + assert matches_event(EVENT_PATTERN_STR, EVENT_STR) def test_matches_event_with_java_engine_dicts(event_rule_engine): """Test Java engine with dict inputs (ESM/Pipes case)""" event_rule_engine("java") - assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) == True + assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) def test_matches_event_with_python_engine_strings(event_rule_engine): """Test Python engine with string inputs""" event_rule_engine("python") - assert matches_event(EVENT_PATTERN_STR, EVENT_STR) == True + assert matches_event(EVENT_PATTERN_STR, EVENT_STR) def test_matches_event_with_python_engine_dicts(event_rule_engine): """Test Python engine with dict inputs""" event_rule_engine("python") - assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) == True + assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) def test_matches_event_mixed_inputs(event_rule_engine): """Test with mixed string/dict inputs""" event_rule_engine("java") - assert matches_event(EVENT_PATTERN_STR, EVENT_DICT) == True - assert matches_event(EVENT_PATTERN_DICT, EVENT_STR) == True + assert matches_event(EVENT_PATTERN_STR, EVENT_DICT) + assert matches_event(EVENT_PATTERN_DICT, EVENT_STR) def test_matches_event_non_matching_pattern(): """Test with non-matching pattern""" - non_matching_pattern = { - "source": ["aws.s3"], - "detail-type": ["S3 Event"] - } - assert matches_event(non_matching_pattern, EVENT_DICT) == False + non_matching_pattern = {"source": ["aws.s3"], "detail-type": ["S3 Event"]} + assert not matches_event(non_matching_pattern, EVENT_DICT) def test_matches_event_invalid_json(): @@ -76,35 +74,40 @@ def test_matches_event_invalid_json(): def test_matches_event_missing_fields(): """Test with missing required fields""" incomplete_event = {"source": "aws.ec2"} - assert matches_event(EVENT_PATTERN_DICT, incomplete_event) == False + assert not matches_event(EVENT_PATTERN_DICT, incomplete_event) def test_matches_event_pattern_matching(): - """Test various pattern matching scenarios based on AWS examples""" + """Test various pattern matching scenarios based on AWS examples + + Examples taken from: + - EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html + - SNS Filtering: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html + """ test_cases = [ # Exact matching ( {"source": ["aws.ec2"], "detail-type": ["EC2 Instance State-change Notification"]}, {"source": "aws.ec2", "detail-type": "EC2 Instance State-change Notification"}, - True + True, ), # Prefix matching in detail field ( {"source": ["aws.ec2"], "detail": {"state": [{"prefix": "run"}]}}, {"source": "aws.ec2", "detail": {"state": "running"}}, - True + True, ), # Multiple possible values ( {"source": ["aws.ec2"], "detail": {"state": ["pending", "running"]}}, {"source": "aws.ec2", "detail": {"state": "running"}}, - True + True, ), # Anything-but matching ( {"source": ["aws.ec2"], "detail": {"state": [{"anything-but": "terminated"}]}}, {"source": "aws.ec2", "detail": {"state": "running"}}, - True + True, ), ] @@ -116,6 +119,6 @@ def test_matches_event_case_sensitivity(): """Test case sensitivity in matching""" case_different_event = { "source": "AWS.ec2", - "detail-type": "EC2 Instance State-Change Notification" + "detail-type": "EC2 Instance State-Change Notification", } - assert matches_event(EVENT_PATTERN_DICT, case_different_event) == False + assert not matches_event(EVENT_PATTERN_DICT, case_different_event) From dec41a211515799cdbb86a0408d1356d3b8c2245 Mon Sep 17 00:00:00 2001 From: Zain Zafar Date: Thu, 14 Nov 2024 12:09:44 +0000 Subject: [PATCH 4/5] Trigger CI From fd34e228edaa88467d2fa6152d98d4a848877ac6 Mon Sep 17 00:00:00 2001 From: Zain Zafar Date: Thu, 14 Nov 2024 18:05:58 +0000 Subject: [PATCH 5/5] fixed failing test --- .../localstack/services/events/provider.py | 6 +- .../localstack/services/events/v1/provider.py | 54 +--- .../lambda_/event_source_listeners/utils.py | 236 ++++++++++++++++++ .../localstack/utils/event_matcher.py | 35 +-- .../events/test_archive_and_replay.py | 5 + tests/unit/utils/test_event_matcher.py | 2 +- 6 files changed, 266 insertions(+), 72 deletions(-) create mode 100644 localstack-core/localstack/services/lambda_/event_source_listeners/utils.py diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index 7f6eb508c922c..0ca9e58d35420 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -131,7 +131,7 @@ ) from localstack.services.plugins import ServiceLifecycleHook from localstack.utils.common import truncate -from localstack.utils.event_matcher import matches_event as matches_rule +from localstack.utils.event_matcher import matches_event from localstack.utils.strings import long_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp @@ -489,7 +489,7 @@ def test_event_pattern( https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html """ try: - result = matches_rule(event, event_pattern) + result = matches_event(event_pattern, event) except InternalInvalidEventPatternException as e: raise InvalidEventPatternException(e.message) from e @@ -1396,7 +1396,7 @@ def _process_rules( ) -> None: event_pattern = rule.event_pattern event_str = to_json_str(event_formatted) - if matches_rule(event_str, event_pattern): + if matches_event(event_pattern, event_str): if not rule.targets: LOG.info( json.dumps( diff --git a/localstack-core/localstack/services/events/v1/provider.py b/localstack-core/localstack/services/events/v1/provider.py index 21dd90e858a69..75ce74e837c9b 100644 --- a/localstack-core/localstack/services/events/v1/provider.py +++ b/localstack-core/localstack/services/events/v1/provider.py @@ -24,7 +24,6 @@ EventBusNameOrArn, EventPattern, EventsApi, - InvalidEventPatternException, PutRuleResponse, PutTargetsResponse, RoleArn, @@ -40,12 +39,8 @@ from localstack.constants import APPLICATION_AMZ_JSON_1_1 from localstack.http import route from localstack.services.edge import ROUTER -from localstack.services.events.models import ( - InvalidEventPatternException as InternalInvalidEventPatternException, -) from localstack.services.events.scheduler import JobScheduler from localstack.services.events.v1.models import EventsStore, events_stores -from localstack.services.events.v1.utils import matches_event from localstack.services.moto import call_moto from localstack.services.plugins import ServiceLifecycleHook from localstack.utils.aws.arns import event_bus_arn, parse_arn @@ -53,7 +48,7 @@ from localstack.utils.aws.message_forwarding import send_event_to_target from localstack.utils.collections import pick_attributes from localstack.utils.common import TMP_FILES, mkdir, save_file, truncate -from localstack.utils.event_matcher import matches_event as matches_rule +from localstack.utils.event_matcher import matches_event from localstack.utils.json import extract_jsonpath from localstack.utils.strings import long_uid, short_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp @@ -115,44 +110,7 @@ def test_event_pattern( """Test event pattern uses EventBridge event pattern matching: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html """ - if config.EVENT_RULE_ENGINE == "java": - try: - result = matches_rule(event, event_pattern) - except InternalInvalidEventPatternException as e: - raise InvalidEventPatternException(e.message) from e - else: - event_pattern_dict = json.loads(event_pattern) - event_dict = json.loads(event) - result = matches_event(event_pattern_dict, event_dict) - - # TODO: unify the different implementations below: - # event_pattern_dict = json.loads(event_pattern) - # event_dict = json.loads(event) - - # EventBridge: - # result = matches_event(event_pattern_dict, event_dict) - - # Lambda EventSourceMapping: - # from localstack.services.lambda_.event_source_listeners.utils import does_match_event - # - # result = does_match_event(event_pattern_dict, event_dict) - - # moto-ext EventBridge: - # from moto.events.models import EventPattern as EventPatternMoto - # - # event_pattern = EventPatternMoto.load(event_pattern) - # result = event_pattern.matches_event(event_dict) - - # SNS: The SNS rule engine seems to differ slightly, for example not allowing the wildcard pattern. - # from localstack.services.sns.publisher import SubscriptionFilter - # subscription_filter = SubscriptionFilter() - # result = subscription_filter._evaluate_nested_filter_policy_on_dict(event_pattern_dict, event_dict) - - # moto-ext SNS: - # from moto.sns.utils import FilterPolicyMatcher - # filter_policy_matcher = FilterPolicyMatcher(event_pattern_dict, "MessageBody") - # result = filter_policy_matcher._body_based_match(event_dict) - + result = matches_event(event_pattern, event) return TestEventPatternResponse(Result=result) @staticmethod @@ -430,13 +388,7 @@ def filter_event_based_on_event_format( return False if rule_information.event_pattern._pattern: event_pattern = rule_information.event_pattern._pattern - if config.EVENT_RULE_ENGINE == "java": - event_str = json.dumps(event) - event_pattern_str = json.dumps(event_pattern) - match_result = matches_rule(event_str, event_pattern_str) - else: - match_result = matches_event(event_pattern, event) - if not match_result: + if not matches_event(event_pattern, event): return False return True diff --git a/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py b/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py new file mode 100644 index 0000000000000..e298500f0b865 --- /dev/null +++ b/localstack-core/localstack/services/lambda_/event_source_listeners/utils.py @@ -0,0 +1,236 @@ +import json +import logging +import re + +from localstack import config +from localstack.aws.api.lambda_ import FilterCriteria +from localstack.utils.event_matcher import matches_event +from localstack.utils.strings import first_char_to_lower + +LOG = logging.getLogger(__name__) + + +class InvalidEventPatternException(Exception): + reason: str + + def __init__(self, reason=None, message=None) -> None: + self.reason = reason + self.message = message or f"Event pattern is not valid. Reason: {reason}" + + +def filter_stream_records(records, filters: list[FilterCriteria]): + filtered_records = [] + for record in records: + for filter in filters: + for rule in filter["Filters"]: + if config.EVENT_RULE_ENGINE == "java": + event_str = json.dumps(record) + event_pattern_str = rule["Pattern"] + match_result = matches_event(event_pattern_str, event_str) + else: + filter_pattern: dict[str, any] = json.loads(rule["Pattern"]) + match_result = does_match_event(filter_pattern, record) + if match_result: + filtered_records.append(record) + break + return filtered_records + + +def does_match_event(event_pattern: dict[str, any], event: dict[str, any]) -> bool: + """Decides whether an event pattern matches an event or not. + Returns True if the `event_pattern` matches the given `event` and False otherwise. + + Implements "Amazon EventBridge event patterns": + https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html + Used in different places: + * EventBridge: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html + * Lambda ESM: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html + * EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html + * SNS: https://docs.aws.amazon.com/sns/latest/dg/sns-subscription-filter-policies.html + + Open source AWS rule engine: https://github.com/aws/event-ruler + """ + # TODO: test this conditional: https://coveralls.io/builds/66584026/source?filename=localstack%2Fservices%2Flambda_%2Fevent_source_listeners%2Futils.py#L25 + if not event_pattern: + return True + does_match_results = [] + for key, value in event_pattern.items(): + # check if rule exists in event + event_value = event.get(key) if isinstance(event, dict) else None + does_pattern_match = False + if event_value is not None: + # check if filter rule value is a list (leaf of rule tree) or a dict (recursively call function) + if isinstance(value, list): + if len(value) > 0: + if isinstance(value[0], (str, int)): + does_pattern_match = event_value in value + if isinstance(value[0], dict): + does_pattern_match = verify_dict_filter(event_value, value[0]) + else: + LOG.warning("Empty lambda filter: %s", key) + elif isinstance(value, dict): + does_pattern_match = does_match_event(value, event_value) + else: + # special case 'exists' + def _filter_rule_value_list(val): + if isinstance(val[0], dict): + return not val[0].get("exists", True) + elif val[0] is None: + # support null filter + return True + + def _filter_rule_value_dict(val): + for k, v in val.items(): + return ( + _filter_rule_value_list(val[k]) + if isinstance(val[k], list) + else _filter_rule_value_dict(val[k]) + ) + return True + + if isinstance(value, list) and len(value) > 0: + does_pattern_match = _filter_rule_value_list(value) + elif isinstance(value, dict): + # special case 'exists' for S type, e.g. {"S": [{"exists": false}]} + # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial2.html + does_pattern_match = _filter_rule_value_dict(value) + + does_match_results.append(does_pattern_match) + return all(does_match_results) + + +def verify_dict_filter(record_value: any, dict_filter: dict[str, any]) -> bool: + # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax + does_match_filter = False + for key, filter_value in dict_filter.items(): + if key == "anything-but": + does_match_filter = record_value not in filter_value + elif key == "numeric": + does_match_filter = handle_numeric_conditions(record_value, filter_value) + elif key == "exists": + does_match_filter = bool( + filter_value + ) # exists means that the key exists in the event record + elif key == "prefix": + if not isinstance(record_value, str): + LOG.warning("Record Value %s does not seem to be a valid string.", record_value) + does_match_filter = isinstance(record_value, str) and record_value.startswith( + str(filter_value) + ) + if does_match_filter: + return True + + return does_match_filter + + +def handle_numeric_conditions( + first_operand: int | float, conditions: list[str | int | float] +) -> bool: + """Implements numeric matching for a given list of conditions. + Example: { "numeric": [ ">", 0, "<=", 5 ] } + + Numeric matching works with values that are JSON numbers. + It is limited to values between -5.0e9 and +5.0e9 inclusive, with 15 digits of precision, + or six digits to the right of the decimal point. + https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matchinghttps://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns-content-based-filtering.html#filtering-numeric-matching + """ + # Invalid example for uneven list: { "numeric": [ ">", 0, "<" ] } + if len(conditions) % 2 > 0: + raise InvalidEventPatternException("Bad numeric range operator") + + if not isinstance(first_operand, (int, float)): + raise InvalidEventPatternException( + f"The value {first_operand} for the numeric comparison {conditions} is not a valid number" + ) + + for i in range(0, len(conditions), 2): + operator = conditions[i] + second_operand_str = conditions[i + 1] + try: + second_operand = float(second_operand_str) + except ValueError: + raise InvalidEventPatternException( + f"Could not convert filter value {second_operand_str} to a valid number" + ) from ValueError + + if operator == ">" and not (first_operand > second_operand): + return False + if operator == ">=" and not (first_operand >= second_operand): + return False + if operator == "=" and not (first_operand == second_operand): + return False + if operator == "<" and not (first_operand < second_operand): + return False + if operator == "<=" and not (first_operand <= second_operand): + return False + return True + + +def contains_list(filter: dict) -> bool: + if isinstance(filter, dict): + for key, value in filter.items(): + if isinstance(value, list) and len(value) > 0: + return True + return contains_list(value) + return False + + +def validate_filters(filter: FilterCriteria) -> bool: + # filter needs to be json serializeable + for rule in filter["Filters"]: + try: + if not (filter_pattern := json.loads(rule["Pattern"])): + return False + return contains_list(filter_pattern) + except json.JSONDecodeError: + return False + # needs to contain on what to filter (some list with citerias) + # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax + + return True + + +def message_attributes_to_lower(message_attrs): + """Convert message attribute details (first characters) to lower case (e.g., stringValue, dataType).""" + message_attrs = message_attrs or {} + for _, attr in message_attrs.items(): + if not isinstance(attr, dict): + continue + for key, value in dict(attr).items(): + attr[first_char_to_lower(key)] = attr.pop(key) + return message_attrs + + +def event_source_arn_matches(mapped: str, searched: str) -> bool: + if not mapped: + return False + if not searched or mapped == searched: + return True + # Some types of ARNs can end with a path separated by slashes, for + # example the ARN of a DynamoDB stream is tableARN/stream/ID. It's + # a little counterintuitive that a more specific mapped ARN can + # match a less specific ARN on the event, but some integration tests + # rely on it for things like subscribing to a stream and matching an + # event labeled with the table ARN. + if re.match(r"^%s$" % searched, mapped): + return True + if mapped.startswith(searched): + suffix = mapped[len(searched) :] + return suffix[0] == "/" + return False + + +def has_data_filter_criteria(filters: list[FilterCriteria]) -> bool: + for filter in filters: + for rule in filter.get("Filters", []): + parsed_pattern = json.loads(rule["Pattern"]) + if "data" in parsed_pattern: + return True + return False + + +def has_data_filter_criteria_parsed(parsed_filters: list[dict]) -> bool: + for filter in parsed_filters: + if "data" in filter: + return True + return False diff --git a/localstack-core/localstack/utils/event_matcher.py b/localstack-core/localstack/utils/event_matcher.py index 3a5e05135d9c4..69bb39cac0b77 100644 --- a/localstack-core/localstack/utils/event_matcher.py +++ b/localstack-core/localstack/utils/event_matcher.py @@ -6,7 +6,7 @@ from localstack.services.events.v1.utils import matches_event as python_matches_event -def matches_event(event_pattern: dict[str, Any] | str, event: dict[str, Any] | str) -> bool: +def matches_event(event_pattern: dict[str, Any] | str | None, event: dict[str, Any] | str) -> bool: """ Match events based on configured rule engine. @@ -35,19 +35,20 @@ def matches_event(event_pattern: dict[str, Any] | str, event: dict[str, Any] | s - EventBridge Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html - Event Source Mappings: https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html """ - if config.EVENT_RULE_ENGINE == "python": - # Python implementation needs dicts - event_dict = json.loads(event) if isinstance(event, str) else event - pattern_dict = ( - json.loads(event_pattern) if isinstance(event_pattern, str) else event_pattern - ) - return python_matches_event(pattern_dict, event_dict) - - # Java implementation (default) - # If inputs are already strings (EventBridge), use directly - if isinstance(event, str) and isinstance(event_pattern, str): - return matches_rule(event, event_pattern) - # Convert dicts (ESM/Pipes) to strings for Java engine - event_str = event if isinstance(event, str) else json.dumps(event) - pattern_str = event_pattern if isinstance(event_pattern, str) else json.dumps(event_pattern) - return matches_rule(event_str, pattern_str) + if not event_pattern: + return True + + if config.EVENT_RULE_ENGINE == "java": + # If inputs are already strings (EventBridge), use directly + if isinstance(event, str) and isinstance(event_pattern, str): + return matches_rule(event, event_pattern) + # Convert dicts (ESM/Pipes) to strings for Java engine + event_str = event if isinstance(event, str) else json.dumps(event) + pattern_str = event_pattern if isinstance(event_pattern, str) else json.dumps(event_pattern) + return matches_rule(event_str, pattern_str) + + # Python implementation (default) + # Convert strings to dicts if necessary + event_dict = json.loads(event) if isinstance(event, str) else event + pattern_dict = json.loads(event_pattern) if isinstance(event_pattern, str) else event_pattern + return python_matches_event(pattern_dict, event_dict) diff --git a/tests/aws/services/events/test_archive_and_replay.py b/tests/aws/services/events/test_archive_and_replay.py index 9526f31233528..ba07546940db8 100644 --- a/tests/aws/services/events/test_archive_and_replay.py +++ b/tests/aws/services/events/test_archive_and_replay.py @@ -3,6 +3,7 @@ import pytest +from localstack import config from localstack.testing.pytest import markers from localstack.utils.strings import short_uid from localstack.utils.sync import retry @@ -362,6 +363,10 @@ def test_delete_archive_error_unknown_archive(self, aws_client, snapshot): class TestReplay: @markers.aws.validated @pytest.mark.skipif(is_old_provider(), reason="not supported by the old provider") + @pytest.mark.skipif( + condition=config.EVENT_RULE_ENGINE == "python", + reason="Not supported with Python-based rule engine", + ) @pytest.mark.parametrize("event_bus_type", ["default", "custom"]) @pytest.mark.skip_snapshot_verify(paths=["$..State"]) def test_start_list_describe_canceled_replay( diff --git a/tests/unit/utils/test_event_matcher.py b/tests/unit/utils/test_event_matcher.py index 043fe644b8305..3d727892c8ad1 100644 --- a/tests/unit/utils/test_event_matcher.py +++ b/tests/unit/utils/test_event_matcher.py @@ -49,7 +49,7 @@ def test_matches_event_with_python_engine_strings(event_rule_engine): def test_matches_event_with_python_engine_dicts(event_rule_engine): """Test Python engine with dict inputs""" event_rule_engine("python") - assert matches_event(EVENT_PATTERN_DICT, EVENT_DICT) + assert matches_event(EVENT_PATTERN_DICT, EVENT_STR) def test_matches_event_mixed_inputs(event_rule_engine):