10000 Fix logging issue with lambda asynchronous calls (#1467) · sharp-bits/localstack@52d492a · GitHub
[go: up one dir, main page]

Skip to content

Commit 52d492a

Browse files
authored
Fix logging issue with lambda asynchronous calls (localstack#1467)
1 parent fa016c1 commit 52d492a

File tree

3 files changed

+49
-47
lines changed

3 files changed

+49
-47
lines changed

localstack/services/awslambda/lambda_executors.py

Lines changed: 47 additions & 45 deletions
10000
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
from pipes import quote as cmd_quote
1414
from localstack import config
1515
from localstack.utils.aws import aws_stack
16-
from localstack.utils.common import run, TMP_FILES, short_uid, save_file, to_str, cp_r, CaptureOutput
16+
from localstack.utils.common import (
17+
CaptureOutput, FuncThread, TMP_FILES, short_uid, save_file, to_str, run, cp_r)
1718
from localstack.services.install import INSTALL_PATH_LOCALSTACK_FAT_JAR
1819

1920
# constants
@@ -55,19 +56,30 @@ def __init__(self):
5556
self.function_invoke_times = {}
5657

5758
def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
58-
# set the invocation time in milliseconds
59-
invocation_time = int(time.time() * 1000)
60-
# start the execution
61-
try:
62-
result, log_output = self._execute(func_arn, func_details, event, context, version, asynchronous)
63-
finally:
64-
self.function_invoke_times[func_arn] = invocation_time
65-
# forward log output to cloudwatch logs
66-
self._store_logs(func_details, log_output, invocation_time)
67-
# return final result
68-
return result, log_output
6959

70-
def _execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
60+
def do_execute(*args):
61+
# set the invocation time in milliseconds
62+
invocation_time = int(time.time() * 1000)
63+
# start the execution
64+
try:
65+
result, log_output = self._execute(func_arn, func_details, event, context, version)
66+
finally:
67+
self.function_invoke_times[func_arn] = invocation_time
68+
# forward log output to cloudwatch logs
69+
self._store_logs(func_details, log_output, invocation_time)
70+
# return final result
71+
return result, log_output
72+
73+
# Inform users about asynchronous mode of the lambda execution.
74+
if asynchronous:
75+
LOG.debug('Lambda executed in Event (asynchronous) mode, no response from this '
76+
'function will be returned to caller')
77+
FuncThread(do_execute).start()
78+
return None, 'Lambda executed asynchronously.'
79+
80+
return do_execute()
81+
82+
def _execute(self, func_arn, func_details, event, context=None, version=None):
7183
""" This method must be overwritten by subclasses. """
7284
raise Exception('Not implemented.')
7385

@@ -115,26 +127,23 @@ def _store_logs(self, func_details, log_output, invocation_time):
115127
logEvents=log_events
116128
)
117129

118-
def run_lambda_executor(self, cmd, env_vars={}, asynchronous=False):
130+
def run_lambda_executor(self, cmd, env_vars={}):
119131
process = run(cmd, asynchronous=True, stderr=subprocess.PIPE, outfile=subprocess.PIPE, env_vars=env_vars)
120-
if asynchronous:
121-
result = '{"asynchronous": "%s"}' % asynchronous
122-
log_output = 'Lambda executed asynchronously'
123-
else:
124-
result, log_output = process.communicate()
125-
result = to_str(result).strip()
126-
log_output = to_str(log_output).strip()
127-
return_code = process.returncode
128-
# Note: The user's code may have been logging to stderr, in which case the logs
129-
# will be part of the "result" variable here. Hence, make sure that we extract
130-
# only the *last* line of "result" and consider anything above that as log output.
131-
if '\n' in result:
132-
additional_logs, _, result = result.rpartition('\n')
133-
log_output += '\n%s' % additional_logs
134-
135-
if return_code != 0:
136-
raise Exception('Lambda process returned error status code: %s. Output:\n%s' %
137-
(return_code, log_output))
132+
result, log_output = process.communicate()
133+
result = to_str(result).strip()
134+
log_output = to_str(log_output).strip()
135+
return_code = process.returncode
136+
# Note: The user's code may have been logging to stderr, in which case the logs
137+
# will be part of the "result" variable here. Hence, make sure that we extract
138+
# only the *last* line of "result" and consider anything above that as log output.
139+
if '\n' in result:
140+
additional_logs, _, result = result.rpartition('\n')
141+
log_output += '\n%s' % additional_logs
142+
143+
if return_code != 0:
144+
raise Exception('Lambda process returned error status code: %s. Output:\n%s' %
145+
(return_code, log_output))
146+
138147
return result, log_output
139148

140149

@@ -157,7 +166,7 @@ def _docker_cmd(self):
157166
""" Return the string to be used for running Docker commands. """
158167
return config.DOCKER_CMD
159168

160-
def _execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
169+
def _execute(self, func_arn, func_details, event, context=None, version=None):
161170

162171
lambda_cwd = func_details.cwd
163172
runtime = func_details.runtime
@@ -205,7 +214,7 @@ def _execute(self, func_arn, func_details, event, context=None, version=None, as
205214

206215
# lambci writes the Lambda result to stdout and logs to stderr, fetch it from there!
207216
LOG.debug('Running lambda cmd: %s' % cmd)
208-
result, log_output = self.run_lambda_executor(cmd, environment, asynchronous)
217+
result, log_output = self.run_lambda_executor(cmd, environment)
209218
log_formatted = log_output.strip().replace('\n', '\n> ')
210219
LOG.debug('Lambda %s result / log output:\n%s\n>%s' % (func_arn, result.strip(), log_formatted))
211220
return result, log_output
@@ -574,7 +583,7 @@ def get_host_path_for_path_in_docker(self, path):
574583

575584
class LambdaExecutorLocal(LambdaExecutor):
576585

577-
def _execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
586+
def _execute(self, func_arn, func_details, event, context=None, version=None):
578587
lambda_cwd = func_details.cwd
579588
environment = func_details.envvars.copy()
580589

@@ -610,16 +619,9 @@ def execute_java_lambda(self, event, context, handler, main_file):
610619
class_name = handler.split('::')[0]
611620
classpath = '%s:%s' % (LAMBDA_EXECUTOR_JAR, main_file)
612621
cmd = 'java -cp %s %s %s %s' % (classpath, LAMBDA_EXECUTOR_CLASS, class_name, event_file)
613-
asynchronous = False
614-
# flip asynchronous flag depending on origin
615-
if 'Records' in event:
616-
# TODO: add more event supporting asynchronous lambda execution
617-
if 'Sns' in event['Records'][0]:
618-
asynchronous = True
619-
if 'dynamodb' in event['Records'][0]:
620-
asynchronous = True
621-
result, log_output = self.run_lambda_executor(cmd, asynchronous=asynchronous)
622-
LOG.debug('Lambda result / log output:\n%s\n> %s' % (result.strip(), log_output.strip().replace('\n', '\n> ')))
622+
result, log_output = self.run_lambda_executor(cmd)
623+
LOG.debug('Lambda result / log output:\n%s\n> %s' % (
624+
result.strip(), log_output.strip().replace('\n', '\n> ')))
623625
return result, log_output
624626

625627

localstack/services/generic_proxy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ def forward(self, method):
302302
except Exception as e:
303303
trace = str(traceback.format_exc())
304304
conn_errors = ('ConnectionRefusedError', 'NewConnectionError',
305-
'Connection aborted', 'Unexpected EOF')
305+
'Connection aborted', 'Unexpected EOF', 'Connection reset by peer')
306306
conn_error = any(e in trace for e in conn_errors)
307307
error_msg = 'Error forwarding request: %s %s' % (e, trace)
308308
if 'Broken pipe' in trace:

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ forbiddenfruit==0.1.3
2727
jsonpath-rw==1.4.0
2828
localstack-ext>=0.8.6
2929
localstack-client==0.9 #basic-lib
30-
moto-ext>=1.3.7.1
30+
moto-ext==1.3.7.2
3131
nose>=1.3.7
3232
nose-timer>=0.7.5
3333
psutil==5.4.8

0 commit comments

Comments
 (0)
0