10000 Make EventBridge v2 provider default (#11834) · localstack/localstack@0fc17fb · GitHub
[go: up one dir, main page]

Skip to content

Commit 0fc17fb

Browse files
Make EventBridge v2 provider default (#11834)
Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com>
1 parent bd86cb5 commit 0fc17fb

File tree

10 files changed

+117
-212
lines changed

10 files changed

+117
-212
lines changed

.circleci/config.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ jobs:
461461
- store_test_results:
462462
path: target/reports/
463463

464-
itest-events-v2-provider:
464+
# TODO: remove legacy v1 provider in future 4.x release
465+
itest-events-v1-provider:
465466
executor: ubuntu-machine-amd64
466467
working_directory: /tmp/workspace/repo
467468
environment:
@@ -474,14 +475,14 @@ jobs:
474475
- prepare-pytest-tinybird
475476
- prepare-account-region-randomization
476477
- run:
477-
name: Test EventBridge v2 provider
478+
name: Test EventBridge v1 provider
478479
environment:
479-
PROVIDER_OVERRIDE_EVENTS: "v2"
480+
PROVIDER_OVERRIDE_EVENTS: "v1"
480481
TEST_PATH: "tests/aws/services/events/"
481482
COVERAGE_ARGS: "-p"
482483
command: |
483-
COVERAGE_FILE="target/coverage/.coverage.eventsV2.${CIRCLE_NODE_INDEX}" \
484-
PYTEST_ARGS="${TINYBIRD_PYTEST_ARGS}${TESTSELECTION_PYTEST_ARGS}--reruns 3 --junitxml=target/reports/events_v2.xml -o junit_suite_name='events_v2'" \
484+
COVERAGE_FILE="target/coverage/.coverage.eventsV1.${CIRCLE_NODE_INDEX}" \
485+
PYTEST_ARGS="${TINYBIRD_PYTEST_ARGS}${TESTSELECTION_PYTEST_ARGS}--reruns 3 --junitxml=target/reports/events_v1.xml -o junit_suite_name='events_v1'" \
485486
make test-coverage
486487
- persist_to_workspace:
487488
root:
@@ -881,7 +882,7 @@ workflows:
881882
requires:
882883
- preflight
883884
- test-selection
884-
- itest-events-v2-provider:
885+
- itest-events-v1-provider:
885886
requires:
886887
- preflight
887888
- test-selection
@@ -948,7 +949,7 @@ workflows:
948949
- report:
949950
requires:
950951
- itest-cloudwatch-v1-provider
951-
- itest-events-v2-provider
952+
- itest-events-v1-provider
952953
- itest-ddb-v2-provider
953954
- acceptance-tests-amd64
954955
- acceptance-tests-arm64
@@ -962,7 +963,7 @@ workflows:
962963
only: master
963964
requires:
964965
- itest-cloudwatch-v1-provider
965-
- itest-events-v2-provider
966+
- itest-events-v1-provider
966967
- itest-ddb-v2-provider
967968
- acceptance-tests-amd64
968969
- acceptance-tests-arm64

localstack-core/localstack/deprecations.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,18 @@ def log_deprecation_warnings(deprecations: Optional[List[EnvVarDeprecation]] = N
350350
affected_deprecations = collect_affected_deprecations(deprecations)
351351
log_env_warning(affected_deprecations)
352352

353+
provider_override_events = os.environ.get("PROVIDER_OVERRIDE_EVENTS")
354+
if provider_override_events and provider_override_events in ["v1", "legacy"]:
355+
env_var_value = f"PROVIDER_OVERRIDE_EVENTS={provider_override_events}"
356+
deprecation_version = "4.0.0"
357+
deprecation_path = f"Remove {env_var_value} to use the new EventBridge implementation."
358+
LOG.warning(
359+
"%s is deprecated (since %s) and will be removed in upcoming releases of LocalStack! %s",
360+
env_var_value,
361+
deprecation_version,
362+
deprecation_path,
363+
)
364+
353365

354366
def deprecated_endpoint(
355367
endpoint: Callable, previous_path: str, deprecation_version: str, new_path: str

localstack-core/localstack/services/events/event_bus.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ def put_permission(
5858
condition: Condition,
5959
policy: str,
6060
):
61-
if policy and any([action, principal, statement_id, condition]):
62-
raise ValueError("Combination of policy with other arguments is not allowe F438 d")
61+
# TODO: cover via test
62+
# if policy and any([action, principal, statement_id, condition]):
63+
# raise ValueError("Combination of policy with other arguments is not allowed")
6364
self.event_bus.last_modified_time = datetime.now(timezone.utc)
6465
if policy: # policy document replaces all existing permissions
6566
policy = json.loads(policy)
@@ -104,8 +105,9 @@ def _parse_statement(
104105
resource_arn: Arn,
105106
condition: Condition,
106107
) -> Statement:
107-
if condition and principal != "*":
108-
raise ValueError("Condition can only be set when principal is '*'")
108+
# TODO: cover via test
109+
# if condition and principal != "*":
110+
# raise ValueError("Condition can only be set when principal is '*'")
109111
if principal != "*":
110112
principal = {"AWS": f"arn:{get_partition(self.event_bus.region)}:iam::{principal}:root"}
111113
statement = Statement(

localstack-core/localstack/services/events/scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ def convert_schedule_to_cron(schedule):
4040
if "day" in rate_unit:
4141
return f"0 0 */{rate_value} * *"
4242

43-
raise ValueError(f"Unable to parse events schedule expression: {schedule}")
43+
# TODO: cover via test
44+
# raise ValueError(f"Unable to parse events schedule expression: {schedule}")
4445

4546
return schedule
4647

localstack-core/localstack/services/events/target.py

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,9 @@ def _initialize_client(self) -> BaseClient:
203203
return client
204204

205205
def _validate_input_transformer(self, input_transformer: InputTransformer):
206-
if "InputTemplate" not in input_transformer:
207-
raise ValueError("InputTemplate is required for InputTransformer")
206+
# TODO: cover via test
207+
# if "InputTemplate" not in input_transformer:
208+
# raise ValueError("InputTemplate is required for InputTransformer")
208209
input_template = input_transformer["InputTemplate"]
209210
input_paths_map = input_transformer.get("InputPathsMap", {})
210211
placeholders = TRANSFORMER_PLACEHOLDER_PATTERN.findall(input_template)
@@ -338,8 +339,9 @@ def send_event(self, event):
338339

339340
def _validate_input(self, target: Target):
340341
super()._validate_input(target)
341-
if not collections.get_safe(target, "$.RoleArn"):
342-
raise ValueError("RoleArn is required for ApiGateway target")
342+
# TODO: cover via test
343+
# if not collections.get_safe(target, "$.RoleArn"):
344+
# raise ValueError("RoleArn is required for ApiGateway target")
343345

344346
def _get_predefined_template_replacements(self, event: Dict[str, Any]) -> Dict[str, Any]:
345347
"""Extracts predefined values from the event."""
@@ -365,10 +367,12 @@ def send_event(self, event):
365367
raise NotImplementedError("Batch target is not yet implemented")
366368

367369
def _validate_input(self, target: Target):
368-
if not collections.get_safe(target, "$.BatchParameters.JobDefinition"):
369-
raise ValueError("BatchParameters.JobDefinition is required for Batch target")
370-
if not collections.get_safe(target, "$.BatchParameters.JobName"):
371-
raise ValueError("BatchParameters.JobName is required for Batch target")
370+
# TODO: cover via test and fix (only required if we have BatchParameters)
371+
# if not collections.get_safe(target, "$.BatchParameters.JobDefinition"):
372+
# raise ValueError("BatchParameters.JobDefinition is required for Batch target")
373+
# if not collections.get_safe(target, "$.BatchParameters.JobName"):
374+
# raise ValueError("BatchParameters.JobName is required for Batch target")
375+
pass
372376

373377

374378
class ContainerTargetSender(TargetSender):
@@ -377,8 +381,9 @@ def send_event(self, event):
377381

378382
def _validate_input(self, target: Target):
379383
super()._validate_input(target)
380-
if not collections.get_safe(target, "$.EcsParameters.TaskDefinitionArn"):
381-
raise ValueError("EcsParameters.TaskDefinitionArn is required for ECS target")
384+
# TODO: cover via test
385+
# if not collections.get_safe(target, "$.EcsParameters.TaskDefinitionArn"):
386+
# raise ValueError("EcsParameters.TaskDefinitionArn is required for ECS target")
382387

383388

384389
class EventsTargetSender(TargetSender):
@@ -434,9 +439,13 @@ def send_event(self, event):
434439

435440
class KinesisTargetSender(TargetSender):
436441
def send_event(self, event):
437-
partition_key_path = self.target["KinesisParameters"]["PartitionKeyPath"]
442+
partition_key_path = collections.get_safe(
443+
self.target,
444+
"$.KinesisParameters.PartitionKeyPath",
445+
default_value="$.id",
446+
)
438447
stream_name = self.target["Arn"].split("/")[-1]
439-
partition_key = event.get(partition_key_path, event["id"])
448+
partition_key = collections.get_safe(event, partition_key_path, event["id"])
440449
self.client.put_record(
441450
StreamName=stream_name,
442451
Data=to_bytes(to_json_str(event)),
@@ -445,19 +454,19 @@ def send_event(self, event):
445454

446455
def _validate_input(self, target: Target):
447456
super()._validate_input(target)
448-
if not collections.get_safe(target, "$.RoleArn"):
449-
raise ValueError("RoleArn is required for Kinesis target")
450-
if not collections.get_safe(target, "$.KinesisParameters.PartitionKeyPath"):
451-
raise ValueError("KinesisParameters.PartitionKeyPath is required for Kinesis target")
457+
# TODO: cover via tests
458+
# if not collections.get_safe(target, "$.RoleArn"):
459+
# raise ValueError("RoleArn is required for Kinesis target")
460+
# if not collections.get_safe(target, "$.KinesisParameters.PartitionKeyPath"):
461+
# raise ValueError("KinesisParameters.PartitionKeyPath is required for Kinesis target")
452462

453463

454464
class LambdaTargetSender(TargetSender):
455465
def send_event(self, event):
456-
asynchronous = True # TODO clarify default behavior of AWS
457466
self.client.invoke(
458467
FunctionName=self.target["Arn"],
459468
Payload=to_bytes(to_json_str(event)),
460-
InvocationType="Event" if asynchronous else "RequestResponse",
469+
InvocationType="Event",
461470
)
462471

463472

@@ -484,8 +493,9 @@ def send_event(self, event):
484493

485494
def _validate_input(self, target: Target):
486495
super()._validate_input(target)
487-
if not collections.get_safe(target, "$.RedshiftDataParameters.Database"):
488-
raise ValueError("RedshiftDataParameters.Database is required for Redshift target")
496+
# TODO: cover via test
497+
# if not collections.get_safe(target, "$.RedshiftDataParameters.Database"):
498+
# raise ValueError("RedshiftDataParameters.Database is required for Redshift target")
489499

490500

491501
class SagemakerTargetSender(TargetSender):
@@ -521,8 +531,9 @@ def send_event(self, event):
521531

522532
def _validate_input(self, target: Target):
523533
super()._validate_input(target)
524-
if not collections.get_safe(target, "$.RoleArn"):
525-
raise ValueError("RoleArn is required for StepFunctions target")
534+
# TODO: cover via test
535+
# if not collections.get_safe(target, "$.RoleArn"):
536+
# raise ValueError("RoleArn is required for StepFunctions target")
526537

527538

528539
class SystemsManagerSender(TargetSender):
@@ -533,14 +544,15 @@ def send_event(self, event):
533544

534545
def _validate_input(self, target: Target):
535546
super()._validate_input(target)
536-
if not collections.get_safe(target, "$.RoleArn"):
537-
raise ValueError(
538-
"RoleArn is required for SystemManager target to invoke a EC2 run command"
539-
)
540-
if not collections.get_safe(target, "$.RunCommandParameters.RunCommandTargets"):
541-
raise ValueError(
542-
"RunCommandParameters.RunCommandTargets is required for Systems Manager target"
543-
)
547+
# TODO: cover via test
548+
# if not collections.get_safe(target, "$.RoleArn"):
549+
# raise ValueError(
550+
# "RoleArn is required for SystemManager target to invoke a EC2 run command"
551+
# )
552+
# if not collections.get_safe(target, "$.RunCommandParameters.RunCommandTargets"):
553+
# raise ValueError(
554+
# "RunCommandParameters.RunCommandTargets is required for Systems Manager target"
555+
# )
544556

545557

546558
class TargetSenderFactory:

localstack-core/localstack/services/providers.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,18 @@ def ssm():
341341

342342
@aws_provider(api="events", name="default")
343343
def events():
344-
from localstack.services.events.v1.provider import EventsProvider
345-
from localstack.services.moto import MotoFallbackDispatcher
344+
from localstack.services.events.provider import EventsProvider
346345

347346
provider = EventsProvider()
348-
return Service.for_provider(provider, dispatch_table_factory=MotoFallbackDispatcher)
347+
return Service.for_provider(provider)
348+
349+
350+
@aws_provider(api="events", name="v2")
351+
def events_v2():
352+
from localstack.services.events.provider import EventsProvider
353+
354+
provider = EventsProvider()
355+
return Service.for_provider(provider)
349356

350357

351358
@aws_provider(api="events", name="v1")
@@ -357,12 +364,13 @@ def events_v1():
357364
return Service.for_provider(provider, dispatch_table_factory=MotoFallbackDispatcher)
358365

359366

360-
@aws_provider(api="events", name="v2")
361-
def events_v2():
362-
from localstack.services.events.provider import EventsProvider
367+
@aws_provider(api="events", name="legacy")
368+
def events_legacy():
369+
from localstack.services.events.v1.provider import EventsProvider
370+
from localstack.services.moto import MotoFallbackDispatcher
363371

364372
provider = EventsProvider()
365-
return Service.for_provider(provider)
373+
return Service.for_provider(provider, dispatch_table_factory=MotoFallbackDispatcher)
366374

367375

368376
@aws_provider()

tests/aws/services/events/helper_functions.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77

88

99
def is_v2_provider():
10-
return os.environ.get("PROVIDER_OVERRIDE_EVENTS") == "v2" and not is_aws_cloud()
10+
return (
11+
os.environ.get("PROVIDER_OVERRIDE_EVENTS", "") not in ("v1", "legacy")
12+
and not is_aws_cloud()
13+
)
1114

1215

1316
def is_old_provider():
14-
return (
15-
"PROVIDER_OVERRIDE_EVENTS" not in os.environ
16-
or os.environ.get("PROVIDER_OVERRIDE_EVENTS") != "v2"
17-
)
17+
return os.environ.get("PROVIDER_OVERRIDE_EVENTS", "") in ("v1", "legacy") and not is_aws_cloud()
1818

1919

2020
def events_time_string_to_timestamp(time_string: str) -> datetime:

0 commit comments

Comments
 (0)
0