13
13
from pipes import quote as cmd_quote
14
14
from localstack import config
15
15
from localstack .utils .aws import aws_stack
16
- from localstack .utils .common import run , TMP_FILES , short_uid , save_file , to_str , cp_r , CaptureOutput
16
+ from localstack .utils .common import (
17
+ CaptureOutput , FuncThread , TMP_FILES , short_uid , save_file , to_str , run , cp_r )
17
18
from localstack .services .install import INSTALL_PATH_LOCALSTACK_FAT_JAR
18
19
19
20
# constants
@@ -55,19 +56,30 @@ def __init__(self):
55
56
self .function_invoke_times = {}
56
57
57
58
def execute (self , func_arn , func_details , event , context = None , version = None , asynchronous = False ):
58
- # set the invocation time in milliseconds
59
- invocation_time = int (time .time () * 1000 )
60
- # start the execution
61
- try :
62
- result , log_output = self ._execute (func_arn , func_details , event , context , version , asynchronous )
63
- finally :
64
- self .function_invoke_times [func_arn ] = invocation_time
65
- # forward log output to cloudwatch logs
66
- self ._store_logs (func_details , log_output , invocation_time )
67
- # return final result
68
- return result , log_output
69
59
70
- def _execute (self , func_arn , func_details , event , context = None , version = None , asynchronous = False ):
60
+ def do_execute (* args ):
61
+ # set the invocation time in milliseconds
62
+ invocation_time = int (time .time () * 1000 )
63
+ # start the execution
64
+ try :
65
+ result , log_output = self ._execute (func_arn , func_details , event , context , version )
66
+ finally :
67
+ self .function_invoke_times [func_arn ] = invocation_time
68
+ # forward log output to cloudwatch logs
69
+ self ._store_logs (func_details , log_output , invocation_time )
70
+ # return final result
71
+ return result , log_output
72
+
73
+ # Inform users about asynchronous mode of the lambda execution.
74
+ if asynchronous :
75
+ LOG .debug ('Lambda executed in Event (asynchronous) mode, no response from this '
76
+ 'function will be returned to caller' )
77
+ FuncThread (do_execute ).start ()
78
10000
+ return None , 'Lambda executed asynchronously.'
79
+
80
+ return do_execute ()
81
+
82
+ def _execute (self , func_arn , func_details , event , context = None , version = None ):
71
83
""" This method must be overwritten by subclasses. """
72
84
raise Exception ('Not implemented.' )
73
85
@@ -115,26 +127,23 @@ def _store_logs(self, func_details, log_output, invocation_time):
115
127
logEvents = log_events
116
128
)
117
129
118
- def run_lambda_executor (self , cmd , env_vars = {}, asynchronous = False ):
130
+ def run_lambda_executor (self , cmd , env_vars = {}):
119
131
process = run (cmd , asynchronous = True , stderr = subprocess .PIPE , outfile = subprocess .PIPE , env_vars = env_vars )
120
- if asynchronous :
121
- result = '{"asynchronous": "%s"}' % asynchronous
122
- log_output = 'Lambda executed asynchronously'
123
- else :
124
- result , log_output = process .communicate ()
125
- result = to_str (result ).strip ()
126
- log_output = to_str (log_output ).strip ()
127
- return_code = process .returncode
128
- # Note: The user's code may have been logging to stderr, in which case the logs
129
- # will be part of the "result" variable here. Hence, make sure that we extract
130
- # only the *last* line of "result" and consider anything above that as log output.
131
- if '\n ' in result :
132
- additional_logs , _ , result = result .rpartition ('\n ' )
133
- log_output += '\n %s' % additional_logs
134
-
135
- if return_code != 0 :
136
- raise Exception ('Lambda process returned error status code: %s. Output:\n %s' %
137
- (return_code , log_output ))
132
+ result , log_output = process .communicate ()
133
+ result = to_str (result ).strip ()
134
+ log_output = to_str (log_output ).strip ()
135
+ return_code = process .returncode
136
+ # Note: The user's code may have been logging to stderr, in which case the logs
137
+ # will be part of the "result" variable here. Hence, make sure that we extract
138
+ # only the *last* line of "result" and consider anything above that as log output.
139
+ if '\n ' in result :
140
+ additional_logs , _ , result = result .rpartition ('\n ' )
141
+ log_output += '\n %s' % additional_logs
142
+
143
+ if return_code != 0 :
144
+ raise Exception ('Lambda process returned error status code: %s. Output:\n %s' %
145
+ (return_code , log_output ))
146
+
138
147
return result , log_output
139
148
140
149
@@ -157,7 +166,7 @@ def _docker_cmd(self):
157
166
""" Return the string to be used for running Docker commands. """
158
167
return config .DOCKER_CMD
159
168
160
- def _execute (self , func_arn , func_details , event , context = None , version = None , asynchronous = False ):
169
+ def _execute (self , func_arn , func_details , event , context = None , version = None ):
161
170
162
171
lambda_cwd = func_details .cwd
163
172
runtime = func_details .runtime
@@ -205,7 +214,7 @@ def _execute(self, func_arn, func_details, event, context=None, version=None, as
205
214
206
215
# lambci writes the Lambda result to stdout and logs to stderr, fetch it from there!
207
216
LOG .debug ('Running lambda cmd: %s' % cmd )
208
- result , log_output = self .run_lambda_executor (cmd , environment , asynchronous )
217
+ result , log_output = self .run_lambda_executor (cmd , environment )
209
218
log_formatted = log_output .strip ().replace ('\n ' , '\n > ' )
210
219
LOG .debug ('Lambda %s result / log output:\n %s\n >%s' % (func_arn , result .strip (), log_formatted ))
211
220
return result , log_output
@@ -574,7 +583,7 @@ def get_host_path_for_path_in_docker(self, path):
574
583
575
584
class LambdaExecutorLocal (LambdaExecutor ):
576
585
577
- def _execute (self , func_arn , func_details , event , context = None , version = None , asynchronous = False ):
586
+ def _execute (self , func_arn , func_details , event , context = None , version = None ):
578
587
lambda_cwd = func_details .cwd
579
588
environment = func_details .envvars .copy ()
580
589
@@ -610,16 +619,9 @@ def execute_java_lambda(self, event, context, handler, main_file):
610
619
class_name = handler .split ('::' )[0 ]
611
620
classpath = '%s:%s' % (LAMBDA_EXECUTOR_JAR , main_file )
612
621
cmd = 'java -cp %s %s %s %s' % (classpath , LAMBDA_EXECUTOR_CLASS , class_name , event_file )
613
- asynchronous = False
614
- # flip asynchronous flag depending on origin
615
- if 'Records' in event :
616
- # TODO: add more event supporting asynchronous lambda execution
617
- if 'Sns' in event ['Records' ][0 ]:
618
- asynchronous = True
619
- if 'dynamodb' in event ['Records' ][0 ]:
620
- asynchronous = True
621
- result , log_output = self .run_lambda_executor (cmd , asynchronous = asynchronous )
622
- LOG .debug ('Lambda result / log output:\n %s\n > %s' % (result .strip (), log_output .strip ().replace ('\n ' , '\n > ' )))
622
+ result , log_output = self .run_lambda_executor (cmd )
623
+ LOG .debug ('Lambda result / log output:\n %s\n > %s' % (
624
+ result .strip (), log_output .strip ().replace ('\n ' , '\n > ' )))
623
625
return result , log_output
624
626
625
627
0 commit comments