6
6
import time
7
7
from pathlib import Path
8
8
from tempfile import NamedTemporaryFile
9
- from typing import Dict , Literal , Optional , Tuple
9
+ from typing import Dict , List , Literal , Optional , Tuple
10
10
11
11
from kubernetes import client as kubernetes_client
12
12
from kubernetes import config as kubernetes_config
18
18
ServiceEndpoint ,
19
19
)
20
20
from localstack .services .awslambda .invocation .lambda_models import FunctionVersion
21
- from localstack .services .awslambda .lambda_utils import get_main_endpoint_from_container
22
21
from localstack .services .install import LAMBDA_RUNTIME_INIT_PATH
23
22
from localstack .utils .archives import unzip
24
23
from localstack .utils .docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
25
24
from localstack .utils .net import get_free_tcp_port
25
+ from localstack .utils .run import ShellCommandThread
26
+ from localstack .utils .sync import poll_condition
26
27
27
28
LOG = logging .getLogger (__name__ )
28
29
@@ -137,12 +138,58 @@ def __init__(self, message: str):
137
138
super ().__init__ (message )
138
139
139
140
141
+ class ExposeLSUtil :
142
+ cmd_threads : List [ShellCommandThread ]
143
+
144
+ def __init__ (self ):
145
+ self .cmd_threads = []
146
+
147
+ def expose_local_port (self , port : int ) -> str :
148
+ """Expose the given port on the web using a tunnel
149
+ Returns the url the port is exposed
150
+ """
151
+ url_regex = r"https://([^.]*.lhrtunnel.link)"
152
+ url = None
153
+
154
+ def log_listener (msg : str , * args , ** kwargs ):
155
+ nonlocal url
156
+ match = re .search (url_regex , msg )
157
+ if match :
158
+ url = match .group (1 )
159
+
160
+ thread = ShellCommandThread (
161
+ cmd = ["ssh" , "-R" , f"80:localhost:{ port } " , "nokey@localhost.run" ],
162
+ log_listener = log_listener ,
163
+ )
164
+ self .cmd_threads .append (thread )
165
+ thread .start ()
166
+
167
+ def check_url_set ():
168
+ return url is not None
169
+
170
+ result = poll_condition (check_url_set , timeout = 15 , interval = 1 )
171
+ if not result :
172
+ raise LambdaRuntimeException ("Could not setup port forwarding using ssh" )
173
+
174
+ return url
175
+
176
+ def shutdown (self ):
177
+ for cmd_thread in self .cmd_threads :
178
+ try :
179
+ cmd_thread .stop ()
180
+ except Exception :
181
+ LOG .exception ("Error shutting down ssh process" )
182
+
183
+
140
184
class KubernetesRuntimeExecutor :
141
185
id : str
142
186
function_version : FunctionVersion
143
187
# address the container is available at (for LocalStack)
144
188
address : Optional [str ]
145
189
executor_endpoint : Optional [ExecutorEndpoint ]
190
+ expose_util : Optional [ExposeLSUtil ]
191
+ executor_url : Optional [str ]
192
+ edge_url : Optional [str ]
146
193
147
194
def __init__ (
148
195
self , id : str , function_version : FunctionVersion , service_endpoint : ServiceEndpoint
@@ -151,6 +198,8 @@ def __init__(
151
198
self .function_version = function_version
152
199
self .address = None
153
200
self .executor_endpoint = self ._build_executor_endpoint (service_endpoint )
201
+ self .expose_util = None
202
+ self .setup_proxies ()
154
203
155
204
@staticmethod
156
205
def prepare_version (function_version : FunctionVersion ) -> None :
@@ -208,6 +257,13 @@ def get_kubernetes_client(self):
208
257
)
209
258
return kubernetes_client .ApiClient ()
210
259
260
+ def setup_proxies (self ):
261
+ self .expose_util = ExposeLSUtil ()
262
+ self .edge_url = self .expose_util .expose_local_port (config .EDGE_PORT )
263
+ self .executor_url = self .expose_util .expose_local_port (self .executor_endpoint .port )
264
+ LOG .debug ("Edge url: %s" , self .edge_url )
265
+ LOG .debug ("Executor url: %s" , self .executor_url )
266
+
211
267
def start (self , env_vars : Dict [str , str ]) -> None :
212
268
self .executor_endpoint .start ()
213
269
# deep copy is not really necessary, but let's keep it to be safe
@@ -235,9 +291,13 @@ def start(self, env_vars: Dict[str, str]) -> None:
235
291
# TODO proxy through kube https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py
236
292
237
293
def stop (self ) -> None :
294
+ self .expose_util .shutdown ()
295
+ LOG .debug ("Expose shutdown complete" )
238
296
api_client = self .get_kubernetes_client ()
239
297
core_v1_client = kubernetes_client .CoreV1Api (api_client )
298
+ LOG .debug ("Deleting pod" )
240
299
core_v1_client .delete_namespaced_pod (name = self .id , namespace = "default" )
300
+ LOG .debug ("Pod deleted" )
241
301
try :
242
302
self .executor_endpoint .shutdown ()
243
303
except Exception as e :
@@ -253,15 +313,15 @@ def get_address(self) -> str:
253
313
return self .address
254
314
255
315
def get_executor_endpoint_from_executor (self ) -> str :
256
- return f"{ get_main_endpoint_from_container ()} :{ self .executor_endpoint .port } "
316
+ if not self .executor_url :
317
+ raise LambdaRuntimeException ("Address of forwarding is not known!" )
318
+ return f"{ self .executor_url } "
257
319
258
320
def get_localstack_endpoint_from_executor (self ) -> str :
259
- return get_main_endpoint_from_container ()
321
+ if not self .edge_url :
322
+ raise LambdaRuntimeException ("Edge address of forwarding is not known!" )
323
+ return self .edge_url
260
324
261
325
def invoke (self , payload : Dict [str , str ]):
262
326
LOG .debug ("Sending invoke-payload '%s' to executor '%s'" , payload , self .id )
263
327
self .executor_endpoint .invoke (payload )
264
-
265
-
266
- class ExposeLSUtil :
267
- pass
0 commit comments