Make sync/async Lambda event sources configurable by whummer · Pull Request #2521 · localstack/localstack
Make sync/async Lambda event sources configurable #2521

Merged · 17 commits · Jun 10, 2020
32 changes: 26 additions & 6 deletions localstack/config.py
@@ -12,6 +12,17 @@
DEFAULT_SERVICE_PORTS, LOCALHOST, DEFAULT_PORT_WEB_UI, TRUE_STRINGS, FALSE_STRINGS,
DEFAULT_LAMBDA_CONTAINER_REGISTRY, DEFAULT_PORT_EDGE)


def is_env_true(env_var_name):
""" Whether the given environment variable has a truthy value. """
return os.environ.get(env_var_name, '').lower().strip() in TRUE_STRINGS


def is_env_not_false(env_var_name):
""" Whether the given environment variable is empty or has a truthy value. """
return os.environ.get(env_var_name, '').lower().strip() not in FALSE_STRINGS
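
For context, a minimal sketch of how these two helpers behave (illustration only, not part of the diff; it assumes TRUE_STRINGS contains tokens such as 'true' and '1', and FALSE_STRINGS tokens such as 'false' and '0', as defined in LocalStack's constants):

import os
os.environ['SYNCHRONOUS_SQS_EVENTS'] = 'TRUE'
from localstack.config import is_env_true, is_env_not_false  # assumes localstack is installed
print(is_env_true('SYNCHRONOUS_SQS_EVENTS'))     # True: values are lower-cased and stripped first
print(is_env_true('SOME_UNSET_VARIABLE'))        # False: '' is not in TRUE_STRINGS
print(is_env_not_false('SOME_UNSET_VARIABLE'))   # True: '' is not in FALSE_STRINGS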


# java options to Lambda
LAMBDA_JAVA_OPTS = os.environ.get('LAMBDA_JAVA_OPTS', '').strip()

@@ -26,6 +37,13 @@
os.environ['DEFAULT_REGION'] = 'us-east-1'
DEFAULT_REGION = os.environ['DEFAULT_REGION']

# whether to handle Lambda event sources as synchronous invocations
SYNCHRONOUS_SNS_EVENTS = is_env_true('SYNCHRONOUS_SNS_EVENTS')
SYNCHRONOUS_SQS_EVENTS = is_env_true('SYNCHRONOUS_SQS_EVENTS')
SYNCHRONOUS_API_GATEWAY_EVENTS = is_env_not_false('SYNCHRONOUS_API_GATEWAY_EVENTS')
SYNCHRONOUS_KINESIS_EVENTS = is_env_not_false('SYNCHRONOUS_KINESIS_EVENTS')
SYNCHRONOUS_DYNAMODB_EVENTS = is_env_not_false('SYNCHRONOUS_DYNAMODB_EVENTS')
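
Note the resulting defaults: SNS and SQS events stay asynchronous unless the flag is explicitly enabled (is_env_true), while API Gateway, Kinesis, and DynamoDB events stay synchronous unless the flag is explicitly disabled (is_env_not_false). A hypothetical way to flip one of the opt-out flags before LocalStack reads its config:

import os
# illustration only: make Kinesis-triggered Lambdas run asynchronously
# (assumes '0' is one of the FALSE_STRINGS tokens)
os.environ['SYNCHRONOUS_KINESIS_EVENTS'] = '0'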

# randomly inject faults to Kinesis
KINESIS_ERROR_PROBABILITY = float(os.environ.get('KINESIS_ERROR_PROBABILITY', '').strip() or 0.0)

@@ -45,7 +63,7 @@
LOCALSTACK_HOSTNAME = os.environ.get('LOCALSTACK_HOSTNAME', '').strip() or HOSTNAME

# whether to remotely copy the lambda or locally mount a volume
LAMBDA_REMOTE_DOCKER = os.environ.get('LAMBDA_REMOTE_DOCKER', '').lower().strip() in TRUE_STRINGS
LAMBDA_REMOTE_DOCKER = is_env_true('LAMBDA_REMOTE_DOCKER')

# network that the docker lambda container will be joining
LAMBDA_DOCKER_NETWORK = os.environ.get('LAMBDA_DOCKER_NETWORK', '').strip()
@@ -83,7 +101,7 @@
DEBUG = os.environ.get('DEBUG', '').lower() in TRUE_STRINGS

# whether to use SSL encryption for the services
USE_SSL = os.environ.get('USE_SSL', '').strip() in TRUE_STRINGS
USE_SSL = is_env_true('USE_SSL')

# default encoding used to convert strings to byte arrays (mainly for Python 3 compatibility)
DEFAULT_ENCODING = 'utf-8'
@@ -117,7 +135,7 @@
EXTRA_CORS_EXPOSE_HEADERS = os.environ.get('EXTRA_CORS_EXPOSE_HEADERS', '').strip()

# whether to disable publishing events to the API
DISABLE_EVENTS = os.environ.get('DISABLE_EVENTS') in TRUE_STRINGS
DISABLE_EVENTS = is_env_true('DISABLE_EVENTS')

# Whether to skip downloading additional infrastructure components (e.g., custom Elasticsearch versions)
SKIP_INFRA_DOWNLOADS = os.environ.get('SKIP_INFRA_DOWNLOADS', '').strip()
@@ -172,7 +190,9 @@ def is_linux():
'START_WEB', 'DOCKER_BRIDGE_IP', 'DEFAULT_REGION', 'LAMBDA_JAVA_OPTS', 'LOCALSTACK_API_KEY',
'LAMBDA_CONTAINER_REGISTRY', 'TEST_AWS_ACCOUNT_ID', 'DISABLE_EVENTS', 'EDGE_PORT',
'EDGE_PORT_HTTP', 'SKIP_INFRA_DOWNLOADS', 'STEPFUNCTIONS_LAMBDA_ENDPOINT',
'WINDOWS_DOCKER_MOUNT_PREFIX', 'USE_HTTP2_SERVER']
'WINDOWS_DOCKER_MOUNT_PREFIX', 'USE_HTTP2_SERVER',
'SYNCHRONOUS_API_GATEWAY_EVENTS', 'SYNCHRONOUS_KINESIS_EVENTS',
'SYNCHRONOUS_SNS_EVENTS', 'SYNCHRONOUS_SQS_EVENTS', 'SYNCHRONOUS_DYNAMODB_EVENTS']

for key, value in six.iteritems(DEFAULT_SERVICE_PORTS):
clean_key = key.upper().replace('-', '_')
@@ -317,7 +337,7 @@ def external_service_url(service_key, host=None):
# initialize config values
populate_configs()

# set log level
# set log levels
if DEBUG:
logging.getLogger('').setLevel(logging.DEBUG)
logging.getLogger('localstack').setLevel(logging.DEBUG)
@@ -326,4 +346,4 @@ def external_service_url(service_key, host=None):
BUNDLE_API_PROCESSES = True

# whether to use a CPU/memory profiler when running the integration tests
USE_PROFILER = os.environ.get('USE_PROFILER', '').lower() in TRUE_STRINGS
USE_PROFILER = is_env_true('USE_PROFILER')
11 changes: 4 additions & 7 deletions localstack/services/awslambda/lambda_api.py
@@ -45,7 +45,6 @@
from localstack.utils.analytics import event_publisher
from localstack.utils.http_utils import parse_chunked_data
from localstack.utils.aws.aws_models import LambdaFunction
from localstack.utils.cloudwatch.cloudwatch_util import cloudwatched

APP_NAME = 'lambda_api'
PATH_ROOT = '/2015-03-31'
@@ -259,7 +258,8 @@ def process_apigateway_invocation(func_arn, path, payload, headers={},
'requestContext': request_context,
'stageVariables': {} # TODO
}
return run_lambda(event=event, context={}, func_arn=func_arn)
return run_lambda(event=event, context={}, func_arn=func_arn,
asynchronous=not config.SYNCHRONOUS_API_GATEWAY_EVENTS)
except Exception as e:
LOG.warning('Unable to run Lambda function on API Gateway message: %s %s' % (e, traceback.format_exc()))

@@ -288,7 +288,7 @@ def process_sns_notification(func_arn, topic_arn, subscription_arn, message,
}
}]
}
return run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=True)
return run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=not config.SYNCHRONOUS_SNS_EVENTS)


def process_kinesis_records(records, stream_name):
@@ -314,9 +314,7 @@ def chunks(lst, n):
for rec in chunk
]
}

run_lambda(event=event, context={}, func_arn=arn)

run_lambda(event=event, context={}, func_arn=arn, asynchronous=not config.SYNCHRONOUS_KINESIS_EVENTS)
except Exception as e:
LOG.warning('Unable to run Lambda function on Kinesis records: %s %s' % (e, traceback.format_exc()))
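
All three handlers above now follow the same pattern: the asynchronous keyword of run_lambda is simply the negation of the matching config flag. A hedged sketch of what this means for a caller, reusing the names from the handlers above and assuming run_lambda returns the invocation result when called synchronously and returns immediately otherwise:

# synchronous: the result of the Lambda execution is available to the caller
result = run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=False)
# asynchronous: the call returns right away; errors are routed to dead letter queues instead
run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=True)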

@@ -481,7 +479,6 @@ def do_update_alias(arn, alias, version, description=None):
return new_alias


@cloudwatched('lambda')
def run_lambda(event, context, func_arn, version=None, suppress_output=False, asynchronous=False, callback=None):
if suppress_output:
stdout_ = sys.stdout
57 changes: 31 additions & 26 deletions localstack/services/awslambda/lambda_executors.py
@@ -18,7 +18,7 @@
to_str, run, cp_r, json_safe, get_free_tcp_port)
from localstack.services.install import INSTALL_PATH_LOCALSTACK_FAT_JAR
from localstack.utils.aws.dead_letter_queue import lambda_error_to_dead_letter_queue, sqs_error_to_dead_letter_queue
from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs
from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs, cloudwatched

# constants
LAMBDA_EXECUTOR_JAR = INSTALL_PATH_LOCALSTACK_FAT_JAR
@@ -98,31 +98,36 @@ def __init__(self):
def execute(self, func_arn, func_details, event, context=None, version=None,
asynchronous=False, callback=None):
def do_execute(*args):
# set the invocation time in milliseconds
invocation_time = int(time.time() * 1000)
# start the execution
raised_error = None
result = None
dlq_sent = None
try:
result = self._execute(func_arn, func_details, event, context, version)
except Exception as e:
raised_error = e
if asynchronous:
if get_from_event(event, 'eventSource') == EVENT_SOURCE_SQS:
sqs_queue_arn = get_from_event(event, 'eventSourceARN')
if sqs_queue_arn:
# event source is SQS, send event back to dead letter queue
dlq_sent = sqs_error_to_dead_letter_queue(sqs_queue_arn, event, e)
else:
# event source is not SQS, send back to lambda dead letter queue
lambda_error_to_dead_letter_queue(func_details, event, e)
raise e
finally:
self.function_invoke_times[func_arn] = invocation_time
callback and callback(result, func_arn, event, error=raised_error, dlq_sent=dlq_sent)
# return final result
return result

@cloudwatched('lambda')
def _run(func_arn=None):
# set the invocation time in milliseconds
invocation_time = int(time.time() * 1000)
# start the execution
raised_error = None
result = None
dlq_sent = None
try:
result = self._execute(func_arn, func_details, event, context, version)
except Exception as e:
raised_error = e
if asynchronous:
if get_from_event(event, 'eventSource') == EVENT_SOURCE_SQS:
sqs_queue_arn = get_from_event(event, 'eventSourceARN')
if sqs_queue_arn:
# event source is SQS, send event back to dead letter queue
dlq_sent = sqs_error_to_dead_letter_queue(sqs_queue_arn, event, e)
else:
# event source is not SQS, send back to lambda dead letter queue
lambda_error_to_dead_letter_queue(func_details, event, e)
raise e
finally:
self.function_invoke_times[func_arn] = invocation_time
callback and callback(result, func_arn, event, error=raised_error, dlq_sent=dlq_sent)
# return final result
return result

return _run(func_arn=func_arn)
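
Moving @cloudwatched('lambda') from run_lambda in lambda_api.py onto the inner _run function means the metrics hook now wraps the actual execution inside the executor, so asynchronous invocations are captured as well. A simplified, hypothetical sketch of a decorator in this style (the real cloudwatched lives in localstack.utils.cloudwatch.cloudwatch_util and publishes CloudWatch metrics rather than printing):

import functools
import time

def cloudwatched_sketch(namespace):
    def wrapper(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            start = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                # the real decorator records invocation/error metrics here
                duration_ms = (time.time() - start) * 1000
                print('[%s] call finished after %.0f ms' % (namespace, duration_ms))
        return wrapped
    return wrapper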

# Inform users about asynchronous mode of the lambda execution.
if asynchronous:
3 changes: 2 additions & 1 deletion localstack/services/dynamodb/dynamodb_listener.py
@@ -491,7 +491,8 @@ def forward_to_lambda(records):
'Records': [record]
}
for src in sources:
lambda_api.run_lambda(event=event, context={}, func_arn=src['FunctionArn'])
lambda_api.run_lambda(event=event, context={}, func_arn=src['FunctionArn'],
asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS)
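
Since SYNCHRONOUS_DYNAMODB_EVENTS is true by default, this call keeps its previous blocking behavior; the run_lambda call only becomes asynchronous when the user explicitly disables the flag.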


def forward_to_ddb_stream(records):
4 changes: 1 addition & 3 deletions tests/integration/test_events.py
@@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
import os
import json
import unittest
import uuid

import unittest
from localstack import config

from localstack.services.awslambda.lambda_api import LAMBDA_RUNTIME_PYTHON36
from localstack.services.events.events_listener import EVENTS_TMP_DIR
from localstack.services.generic_proxy import ProxyListener
84 changes: 56 additions & 28 deletions tests/integration/test_integration.py
@@ -260,7 +260,7 @@ def process_records(records, shard_id):

# put 1 item to stream that will trigger an error in the Lambda
kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG,
PartitionKey='testIderror', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)
PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)

# create SNS topic, connect it to the Lambda, publish test messages
num_events_sns = 3
@@ -302,16 +302,20 @@ def check_events():
retry(check_events, retries=9, sleep=3)

# check cloudwatch notifications
num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM)
# TODO: It seems that CloudWatch is currently reporting an incorrect number of
# invocations, namely the sum over *all* lambdas, not the single one we're asking for.
# Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single
# Lambda invocation may happen with a set of Kinesis records, hence we cannot simply
# add num_events_ddb to num_events_lambda above!
# self.assertEqual(num_invocations, 2 + num_events_lambda)
self.assertGreater(num_invocations, num_events_sns + num_events_sqs)
num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors')
self.assertEqual(num_error_invocations, 1)
def check_cw_invocations():
num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM)
# TODO: It seems that CloudWatch is currently reporting an incorrect number of
# invocations, namely the sum over *all* lambdas, not the single one we're asking for.
# Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single
# Lambda invocation may happen with a set of Kinesis records, hence we cannot simply
# add num_events_ddb to num_events_lambda above!
# self.assertEqual(num_invocations, 2 + num_events_lambda)
self.assertGreater(num_invocations, num_events_sns + num_events_sqs)
num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors')
self.assertEqual(num_error_invocations, 1)

# Lambda invocations run asynchronously, hence retry for a while to wait for the results
retry(check_cw_invocations, retries=5, sleep=2)
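
The retry helper (from localstack.utils.common) re-runs an assertion function until it stops raising or the attempts are exhausted. A simplified sketch of the idea, under the assumption that the real helper behaves roughly like this:

import time

def retry_sketch(function, retries=3, sleep=1.0, **kwargs):
    for attempt in range(retries):
        try:
            return function(**kwargs)  # succeed as soon as the assertions pass
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last failure
            time.sleep(sleep)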

# clean up
testutil.delete_lambda_function(TEST_LAMBDA_NAME_STREAM)
@@ -441,27 +445,49 @@ def check_events():
self.assertEqual(len(modifies), num_modify)
self.assertEqual(len(removes), num_delete)

# assert that all inserts were received

for i, event in enumerate(inserts):
self.assertNotIn('old_image', event)
self.assertEqual(inserts[i]['new_image'], {'id': 'testId%d' % i, 'data': 'foobar123'})

self.assertEqual(modifies[0]['old_image'], {'id': 'testId6', 'data': 'foobar123'})
self.assertEqual(modifies[0]['new_image'], {'id': 'testId6', 'data': 'foobar123_updated1'})
self.assertEqual(modifies[1]['old_image'], {'id': 'testId7', 'data': 'foobar123'})
self.assertEqual(modifies[1]['new_image'], {'id': 'testId7', 'data': 'foobar123_updated1'})
self.assertEqual(modifies[2]['old_image'], {'id': 'testId6', 'data': 'foobar123_updated1'})
self.assertEqual(modifies[2]['new_image'], {'id': 'testId6', 'data': 'foobar123_updated2'})
self.assertEqual(modifies[3]['old_image'], {'id': 'testId7', 'data': 'foobar123_updated1'})
self.assertEqual(modifies[3]['new_image'], {'id': 'testId7', 'data': 'foobar123_updated2'})
self.assertEqual(modifies[4]['old_image'], {'id': 'testId8', 'data': 'foobar123'})
self.assertEqual(modifies[4]['new_image'], {'id': 'testId8', 'data': 'foobar123_updated2'})
item_id = 'testId%d' % i
matching = [i for i in inserts if i['new_image']['id'] == item_id][0]
self.assertEqual(matching['new_image'], {'id': item_id, 'data': 'foobar123'})

# assert that all updates were received

def assert_updates(expected_updates, modifies):
def found(update):
for modif in modifies:
if modif['old_image']['id'] == update['id']:
self.assertEqual(modif['old_image'], {'id': update['id'], 'data': update['old']})
self.assertEqual(modif['new_image'], {'id': update['id'], 'data': update['new']})
return True
for update in expected_updates:
self.assertTrue(found(update))

updates1 = [
{'id': 'testId6', 'old': 'foobar123', 'new': 'foobar123_updated1'},
{'id': 'testId7', 'old': 'foobar123', 'new': 'foobar123_updated1'}
]
updates2 = [
{'id': 'testId6', 'old': 'foobar123_updated1', 'new': 'foobar123_updated2'},
{'id': 'testId7', 'old': 'foobar123_updated1', 'new': 'foobar123_updated2'},
{'id': 'testId8', 'old': 'foobar123', 'new': 'foobar123_updated2'}
]

assert_updates(updates1, modifies[:2])
assert_updates(updates2, modifies[2:])

# assert that all removes were received

for i, event in enumerate(removes):
self.assertEqual(event['old_image'], {'id': 'testId%d' % i, 'data': 'foobar123'})
self.assertNotIn('new_image', event)
item_id = 'testId%d' % i
matching = [i for i in removes if i['old_image']['id'] == item_id][0]
self.assertEqual(matching['old_image'], {'id': item_id, 'data': 'foobar123'})
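
Matching events by item id rather than by list position makes these assertions order-independent, which matters now that event-source invocations may run asynchronously and records can arrive in any order.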

# this can take a long time in CI, make sure we give it enough time/retries
retry(check_events, retries=9, sleep=3)
retry(check_events, retries=9, sleep=4)

# clean up
testutil.delete_lambda_function(TEST_LAMBDA_NAME_DDB)
@@ -488,10 +514,12 @@ def test_kinesis_lambda_forward_chain(self):
data[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME
kinesis.put_record(Data=to_bytes(json.dumps(data)), PartitionKey='testId', StreamName=TEST_CHAIN_STREAM1_NAME)

def check_results():
all_objects = testutil.list_all_s3_objects()
testutil.assert_objects(test_data, all_objects)

# check results
time.sleep(5)
all_objects = testutil.list_all_s3_objects()
testutil.assert_objects(test_data, all_objects)
retry(check_results, retries=5, sleep=3)

# clean up
kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM1_NAME)