diff --git a/localstack/config.py b/localstack/config.py index 1d5e9fe6e8a84..b319252cda15e 100644 --- a/localstack/config.py +++ b/localstack/config.py @@ -12,6 +12,17 @@ DEFAULT_SERVICE_PORTS, LOCALHOST, DEFAULT_PORT_WEB_UI, TRUE_STRINGS, FALSE_STRINGS, DEFAULT_LAMBDA_CONTAINER_REGISTRY, DEFAULT_PORT_EDGE) + +def is_env_true(env_var_name): + """ Whether the given environment variable has a truthy value. """ + return os.environ.get(env_var_name, '').lower().strip() in TRUE_STRINGS + + +def is_env_not_false(env_var_name): + """ Whether the given environment variable is empty or has a truthy value. """ + return os.environ.get(env_var_name, '').lower().strip() not in FALSE_STRINGS + + # java options to Lambda LAMBDA_JAVA_OPTS = os.environ.get('LAMBDA_JAVA_OPTS', '').strip() @@ -26,6 +37,13 @@ os.environ['DEFAULT_REGION'] = 'us-east-1' DEFAULT_REGION = os.environ['DEFAULT_REGION'] +# Whether or not to handle lambda event sources as synchronous invocations +SYNCHRONOUS_SNS_EVENTS = is_env_true('SYNCHRONOUS_SNS_EVENTS') +SYNCHRONOUS_SQS_EVENTS = is_env_true('SYNCHRONOUS_SQS_EVENTS') +SYNCHRONOUS_API_GATEWAY_EVENTS = is_env_not_false('SYNCHRONOUS_API_GATEWAY_EVENTS') +SYNCHRONOUS_KINESIS_EVENTS = is_env_not_false('SYNCHRONOUS_KINESIS_EVENTS') +SYNCHRONOUS_DYNAMODB_EVENTS = is_env_not_false('SYNCHRONOUS_DYNAMODB_EVENTS') + # randomly inject faults to Kinesis KINESIS_ERROR_PROBABILITY = float(os.environ.get('KINESIS_ERROR_PROBABILITY', '').strip() or 0.0) @@ -45,7 +63,7 @@ LOCALSTACK_HOSTNAME = os.environ.get('LOCALSTACK_HOSTNAME', '').strip() or HOSTNAME # whether to remotely copy the lambda or locally mount a volume -LAMBDA_REMOTE_DOCKER = os.environ.get('LAMBDA_REMOTE_DOCKER', '').lower().strip() in TRUE_STRINGS +LAMBDA_REMOTE_DOCKER = is_env_true('LAMBDA_REMOTE_DOCKER') # network that the docker lambda container will be joining LAMBDA_DOCKER_NETWORK = os.environ.get('LAMBDA_DOCKER_NETWORK', '').strip() @@ -83,7 +101,7 @@ DEBUG = os.environ.get('DEBUG', '').lower() in TRUE_STRINGS # whether to use SSL encryption for the services -USE_SSL = os.environ.get('USE_SSL', '').strip() in TRUE_STRINGS +USE_SSL = is_env_true('USE_SSL') # default encoding used to convert strings to byte arrays (mainly for Python 3 compatibility) DEFAULT_ENCODING = 'utf-8' @@ -117,7 +135,7 @@ EXTRA_CORS_EXPOSE_HEADERS = os.environ.get('EXTRA_CORS_EXPOSE_HEADERS', '').strip() # whether to disable publishing events to the API -DISABLE_EVENTS = os.environ.get('DISABLE_EVENTS') in TRUE_STRINGS +DISABLE_EVENTS = is_env_true('DISABLE_EVENTS') # Whether to skip downloading additional infrastructure components (e.g., custom Elasticsearch versions) SKIP_INFRA_DOWNLOADS = os.environ.get('SKIP_INFRA_DOWNLOADS', '').strip() @@ -172,7 +190,9 @@ def is_linux(): 'START_WEB', 'DOCKER_BRIDGE_IP', 'DEFAULT_REGION', 'LAMBDA_JAVA_OPTS', 'LOCALSTACK_API_KEY', 'LAMBDA_CONTAINER_REGISTRY', 'TEST_AWS_ACCOUNT_ID', 'DISABLE_EVENTS', 'EDGE_PORT', 'EDGE_PORT_HTTP', 'SKIP_INFRA_DOWNLOADS', 'STEPFUNCTIONS_LAMBDA_ENDPOINT', - 'WINDOWS_DOCKER_MOUNT_PREFIX', 'USE_HTTP2_SERVER'] + 'WINDOWS_DOCKER_MOUNT_PREFIX', 'USE_HTTP2_SERVER', + 'SYNCHRONOUS_API_GATEWAY_EVENTS', 'SYNCHRONOUS_KINESIS_EVENTS', + 'SYNCHRONOUS_SNS_EVENTS', 'SYNCHRONOUS_SQS_EVENTS', 'SYNCHRONOUS_DYNAMODB_EVENTS'] for key, value in six.iteritems(DEFAULT_SERVICE_PORTS): clean_key = key.upper().replace('-', '_') @@ -317,7 +337,7 @@ def external_service_url(service_key, host=None): # initialize config values populate_configs() -# set log level +# set log levels if DEBUG: logging.getLogger('').setLevel(logging.DEBUG) logging.getLogger('localstack').setLevel(logging.DEBUG) @@ -326,4 +346,4 @@ def external_service_url(service_key, host=None): BUNDLE_API_PROCESSES = True # whether to use a CPU/memory profiler when running the integration tests -USE_PROFILER = os.environ.get('USE_PROFILER', '').lower() in TRUE_STRINGS +USE_PROFILER = is_env_true('USE_PROFILER') diff --git a/localstack/services/awslambda/lambda_api.py b/localstack/services/awslambda/lambda_api.py index c0e45b5d34512..521ec51b351ed 100644 --- a/localstack/services/awslambda/lambda_api.py +++ b/localstack/services/awslambda/lambda_api.py @@ -45,7 +45,6 @@ from localstack.utils.analytics import event_publisher from localstack.utils.http_utils import parse_chunked_data from localstack.utils.aws.aws_models import LambdaFunction -from localstack.utils.cloudwatch.cloudwatch_util import cloudwatched APP_NAME = 'lambda_api' PATH_ROOT = '/2015-03-31' @@ -259,7 +258,8 @@ def process_apigateway_invocation(func_arn, path, payload, headers={}, 'requestContext': request_context, 'stageVariables': {} # TODO } - return run_lambda(event=event, context={}, func_arn=func_arn) + return run_lambda(event=event, context={}, func_arn=func_arn, + asynchronous=not config.SYNCHRONOUS_API_GATEWAY_EVENTS) except Exception as e: LOG.warning('Unable to run Lambda function on API Gateway message: %s %s' % (e, traceback.format_exc())) @@ -288,7 +288,7 @@ def process_sns_notification(func_arn, topic_arn, subscription_arn, message, } }] } - return run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=True) + return run_lambda(event=event, context={}, func_arn=func_arn, asynchronous=not config.SYNCHRONOUS_SNS_EVENTS) def process_kinesis_records(records, stream_name): @@ -314,9 +314,7 @@ def chunks(lst, n): for rec in chunk ] } - - run_lambda(event=event, context={}, func_arn=arn) - + run_lambda(event=event, context={}, func_arn=arn, asynchronous=not config.SYNCHRONOUS_KINESIS_EVENTS) except Exception as e: LOG.warning('Unable to run Lambda function on Kinesis records: %s %s' % (e, traceback.format_exc())) @@ -481,7 +479,6 @@ def do_update_alias(arn, alias, version, description=None): return new_alias -@cloudwatched('lambda') def run_lambda(event, context, func_arn, version=None, suppress_output=False, asynchronous=False, callback=None): if suppress_output: stdout_ = sys.stdout diff --git a/localstack/services/awslambda/lambda_executors.py b/localstack/services/awslambda/lambda_executors.py index 7d4f97c2afccd..373bf79f1e676 100644 --- a/localstack/services/awslambda/lambda_executors.py +++ b/localstack/services/awslambda/lambda_executors.py @@ -18,7 +18,7 @@ to_str, run, cp_r, json_safe, get_free_tcp_port) from localstack.services.install import INSTALL_PATH_LOCALSTACK_FAT_JAR from localstack.utils.aws.dead_letter_queue import lambda_error_to_dead_letter_queue, sqs_error_to_dead_letter_queue -from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs +from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs, cloudwatched # constants LAMBDA_EXECUTOR_JAR = INSTALL_PATH_LOCALSTACK_FAT_JAR @@ -98,31 +98,36 @@ def __init__(self): def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False, callback=None): def do_execute(*args): - # set the invocation time in milliseconds - invocation_time = int(time.time() * 1000) - # start the execution - raised_error = None - result = None - dlq_sent = None - try: - result = self._execute(func_arn, func_details, event, context, version) - except Exception as e: - raised_error = e - if asynchronous: - if get_from_event(event, 'eventSource') == EVENT_SOURCE_SQS: - sqs_queue_arn = get_from_event(event, 'eventSourceARN') - if sqs_queue_arn: - # event source is SQS, send event back to dead letter queue - dlq_sent = sqs_error_to_dead_letter_queue(sqs_queue_arn, event, e) - else: - # event source is not SQS, send back to lambda dead letter queue - lambda_error_to_dead_letter_queue(func_details, event, e) - raise e - finally: - self.function_invoke_times[func_arn] = invocation_time - callback and callback(result, func_arn, event, error=raised_error, dlq_sent=dlq_sent) - # return final result - return result + + @cloudwatched('lambda') + def _run(func_arn=None): + # set the invocation time in milliseconds + invocation_time = int(time.time() * 1000) + # start the execution + raised_error = None + result = None + dlq_sent = None + try: + result = self._execute(func_arn, func_details, event, context, version) + except Exception as e: + raised_error = e + if asynchronous: + if get_from_event(event, 'eventSource') == EVENT_SOURCE_SQS: + sqs_queue_arn = get_from_event(event, 'eventSourceARN') + if sqs_queue_arn: + # event source is SQS, send event back to dead letter queue + dlq_sent = sqs_error_to_dead_letter_queue(sqs_queue_arn, event, e) + else: + # event source is not SQS, send back to lambda dead letter queue + lambda_error_to_dead_letter_queue(func_details, event, e) + raise e + finally: + self.function_invoke_times[func_arn] = invocation_time + callback and callback(result, func_arn, event, error=raised_error, dlq_sent=dlq_sent) + # return final result + return result + + return _run(func_arn=func_arn) # Inform users about asynchronous mode of the lambda execution. if asynchronous: diff --git a/localstack/services/dynamodb/dynamodb_listener.py b/localstack/services/dynamodb/dynamodb_listener.py index 034dfa373038b..89d00a621cf7e 100644 --- a/localstack/services/dynamodb/dynamodb_listener.py +++ b/localstack/services/dynamodb/dynamodb_listener.py @@ -491,7 +491,8 @@ def forward_to_lambda(records): 'Records': [record] } for src in sources: - lambda_api.run_lambda(event=event, context={}, func_arn=src['FunctionArn']) + lambda_api.run_lambda(event=event, context={}, func_arn=src['FunctionArn'], + asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS) def forward_to_ddb_stream(records): diff --git a/tests/integration/test_events.py b/tests/integration/test_events.py index 7619ffd93a43f..5c009d1c05a72 100644 --- a/tests/integration/test_events.py +++ b/tests/integration/test_events.py @@ -1,11 +1,9 @@ # -*- coding: utf-8 -*- import os import json -import unittest import uuid - +import unittest from localstack import config - from localstack.services.awslambda.lambda_api import LAMBDA_RUNTIME_PYTHON36 from localstack.services.events.events_listener import EVENTS_TMP_DIR from localstack.services.generic_proxy import ProxyListener diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index bfd12aeda516c..5bfc731f45137 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -260,7 +260,7 @@ def process_records(records, shard_id): # put 1 item to stream that will trigger an error in the Lambda kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG, - PartitionKey='testIderror', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME) + PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME) # create SNS topic, connect it to the Lambda, publish test messages num_events_sns = 3 @@ -302,16 +302,20 @@ def check_events(): retry(check_events, retries=9, sleep=3) # check cloudwatch notifications - num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM) - # TODO: It seems that CloudWatch is currently reporting an incorrect number of - # invocations, namely the sum over *all* lambdas, not the single one we're asking for. - # Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single - # Lambda invocation may happen with a set of Kinesis records, hence we cannot simply - # add num_events_ddb to num_events_lambda above! - # self.assertEqual(num_invocations, 2 + num_events_lambda) - self.assertGreater(num_invocations, num_events_sns + num_events_sqs) - num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors') - self.assertEqual(num_error_invocations, 1) + def check_cw_invocations(): + num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM) + # TODO: It seems that CloudWatch is currently reporting an incorrect number of + # invocations, namely the sum over *all* lambdas, not the single one we're asking for. + # Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single + # Lambda invocation may happen with a set of Kinesis records, hence we cannot simply + # add num_events_ddb to num_events_lambda above! + # self.assertEqual(num_invocations, 2 + num_events_lambda) + self.assertGreater(num_invocations, num_events_sns + num_events_sqs) + num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors') + self.assertEqual(num_error_invocations, 1) + + # Lambda invocations are running asynchronously, hence sleep some time here to wait for results + retry(check_cw_invocations, retries=5, sleep=2) # clean up testutil.delete_lambda_function(TEST_LAMBDA_NAME_STREAM) @@ -441,27 +445,49 @@ def check_events(): self.assertEqual(len(modifies), num_modify) self.assertEqual(len(removes), num_delete) + # assert that all inserts were received + for i, event in enumerate(inserts): self.assertNotIn('old_image', event) - self.assertEqual(inserts[i]['new_image'], {'id': 'testId%d' % i, 'data': 'foobar123'}) - - self.assertEqual(modifies[0]['old_image'], {'id': 'testId6', 'data': 'foobar123'}) - self.assertEqual(modifies[0]['new_image'], {'id': 'testId6', 'data': 'foobar123_updated1'}) - self.assertEqual(modifies[1]['old_image'], {'id': 'testId7', 'data': 'foobar123'}) - self.assertEqual(modifies[1]['new_image'], {'id': 'testId7', 'data': 'foobar123_updated1'}) - self.assertEqual(modifies[2]['old_image'], {'id': 'testId6', 'data': 'foobar123_updated1'}) - self.assertEqual(modifies[2]['new_image'], {'id': 'testId6', 'data': 'foobar123_updated2'}) - self.assertEqual(modifies[3]['old_image'], {'id': 'testId7', 'data': 'foobar123_updated1'}) - self.assertEqual(modifies[3]['new_image'], {'id': 'testId7', 'data': 'foobar123_updated2'}) - self.assertEqual(modifies[4]['old_image'], {'id': 'testId8', 'data': 'foobar123'}) - self.assertEqual(modifies[4]['new_image'], {'id': 'testId8', 'data': 'foobar123_updated2'}) + item_id = 'testId%d' % i + matching = [i for i in inserts if i['new_image']['id'] == item_id][0] + self.assertEqual(matching['new_image'], {'id': item_id, 'data': 'foobar123'}) + + # assert that all updates were received + + def assert_updates(expected_updates, modifies): + def found(update): + for modif in modifies: + if modif['old_image']['id'] == update['id']: + self.assertEqual(modif['old_image'], {'id': update['id'], 'data': update['old']}) + self.assertEqual(modif['new_image'], {'id': update['id'], 'data': update['new']}) + return True + for update in expected_updates: + self.assertTrue(found(update)) + + updates1 = [ + {'id': 'testId6', 'old': 'foobar123', 'new': 'foobar123_updated1'}, + {'id': 'testId7', 'old': 'foobar123', 'new': 'foobar123_updated1'} + ] + updates2 = [ + {'id': 'testId6', 'old': 'foobar123_updated1', 'new': 'foobar123_updated2'}, + {'id': 'testId7', 'old': 'foobar123_updated1', 'new': 'foobar123_updated2'}, + {'id': 'testId8', 'old': 'foobar123', 'new': 'foobar123_updated2'} + ] + + assert_updates(updates1, modifies[:2]) + assert_updates(updates2, modifies[2:]) + + # assert that all removes were received for i, event in enumerate(removes): - self.assertEqual(event['old_image'], {'id': 'testId%d' % i, 'data': 'foobar123'}) self.assertNotIn('new_image', event) + item_id = 'testId%d' % i + matching = [i for i in removes if i['old_image']['id'] == item_id][0] + self.assertEqual(matching['old_image'], {'id': item_id, 'data': 'foobar123'}) # this can take a long time in CI, make sure we give it enough time/retries - retry(check_events, retries=9, sleep=3) + retry(check_events, retries=9, sleep=4) # clean up testutil.delete_lambda_function(TEST_LAMBDA_NAME_DDB) @@ -488,10 +514,12 @@ def test_kinesis_lambda_forward_chain(self): data[lambda_integration.MSG_BODY_MESSAGE_TARGET] = 'kinesis:%s' % TEST_CHAIN_STREAM2_NAME kinesis.put_record(Data=to_bytes(json.dumps(data)), PartitionKey='testId', StreamName=TEST_CHAIN_STREAM1_NAME) + def check_results(): + all_objects = testutil.list_all_s3_objects() + testutil.assert_objects(test_data, all_objects) + # check results - time.sleep(5) - all_objects = testutil.list_all_s3_objects() - testutil.assert_objects(test_data, all_objects) + retry(check_results, retries=5, sleep=3) # clean up kinesis.delete_stream(StreamName=TEST_CHAIN_STREAM1_NAME)