8000 working tunnel to LS · localstack/localstack@0df4b9b · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0df4b9b

Browse files
committed
working tunnel to LS
1 parent c34fafd commit 0df4b9b

File tree

4 files changed

+71
-10
lines changed

4 files changed

+71
-10
lines changed

localstack/services/awslambda/invocation/executor_endpoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def __init__(self, invocation_id: str, payload: Optional[bytes]):
2525

2626
class ExecutorEndpoint(Server):
2727
service_endpoint: ServiceEndpoint
28-
port: Optional[str]
28+
port: Optional[int]
2929

3030
def __init__(
3131
self,

localstack/services/awslambda/invocation/runtime_environment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
if TYPE_CHECKING:
1616
from localstack.services.awslambda.invocation.version_manager import QueuedInvocation
1717

18-
STARTUP_TIMEOUT_SEC = 10.0
18+
STARTUP_TIMEOUT_SEC = 30.0
1919
HEX_CHARS = [str(num) for num in range(10)] + ["a", "b", "c", "d", "e", "f"]
2020

2121
LOG = logging.getLogger(__name__)

localstack/services/awslambda/invocation/runtime_executor_kubernetes.py

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import time
77
from pathlib import Path
88
from tempfile import NamedTemporaryFile
9-
from typing import Dict, Literal, Optional, Tuple
9+
from typing import Dict, List, Literal, Optional, Tuple
1010

1111
from kubernetes import client as kubernetes_client
1212
from kubernetes import config as kubernetes_config
@@ -18,11 +18,12 @@
1818
ServiceEndpoint,
1919
)
2020
from localstack.services.awslambda.invocation.lambda_models import FunctionVersion
21-
from localstack.services.awslambda.lambda_utils import get_main_endpoint_from_container
2221
from localstack.services.install import LAMBDA_RUNTIME_INIT_PATH
2322
from localstack.utils.archives import unzip
2423
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
2524
from localstack.utils.net import get_free_tcp_port
25+
from localstack.utils.run import ShellCommandThread
26+
from localstack.utils.sync import poll_condition
2627

2728
LOG = logging.getLogger(__name__)
2829

@@ -137,12 +138,58 @@ def __init__(self, message: str):
137138
super().__init__(message)
138139

139140

141+
class ExposeLSUtil:
142+
cmd_threads: List[ShellCommandThread]
143+
144+
def __init__(self):
145+
self.cmd_threads = []
146+
147+
def expose_local_port(self, port: int) -> str:
148+
"""Expose the given port on the web using a tunnel
149+
Returns the url the port is exposed
150+
"""
151+
url_regex = r"https://([^.]*.lhrtunnel.link)"
152+
url = None
153+
154+
def log_listener(msg: str, *args, **kwargs):
155+
nonlocal url
156+
match = re.search(url_regex, msg)
157+
if match:
158+
url = match.group(1)
159+
160+
thread = ShellCommandThread(
161+
cmd=["ssh", "-R", f"80:localhost:{port}", "nokey@localhost.run"],
162+
log_listener=log_listener,
163+
)
164+
self.cmd_threads.append(thread)
165+
thread.start()
166+
167+
def check_url_set():
168+
return url is not None
169+
170+
result = poll_condition(check_url_set, timeout=15, interval=1)
171+
if not result:
172+
raise LambdaRuntimeException("Could not setup port forwarding using ssh")
173+
174+
return url
175+
176+
def shutdown(self):
177+
for cmd_thread in self.cmd_threads:
178+
try:
179+
cmd_thread.stop()
180+
except Exception:
181+
LOG.exception("Error shutting down ssh process")
182+
183+
140184
class KubernetesRuntimeExecutor:
141185
id: str
142186
function_version: FunctionVersion
143187
# address the container is available at (for LocalStack)
144188
address: Optional[str]
145189
executor_endpoint: Optional[ExecutorEndpoint]
190+
expose_util: Optional[ExposeLSUtil]
191+
executor_url: Optional[str]
192+
edge_url: Optional[str]
146193

147194
def __init__(
148195
self, id: str, function_version: FunctionVersion, service_endpoint: ServiceEndpoint
@@ -151,6 +198,8 @@ def __init__(
151198
self.function_version = function_version
152199
self.address = None
153200
self.executor_endpoint = self._build_executor_endpoint(service_endpoint)
201+
self.expose_util = None
202+
self.setup_proxies()
154203

155204
@staticmethod
156205
def prepare_version(function_version: FunctionVersion) -> None:
@@ -208,6 +257,13 @@ def get_kubernetes_client(self):
208257
)
209258
return kubernetes_client.ApiClient()
210259

260+
def setup_proxies(self):
261+
self.expose_util = ExposeLSUtil()
262+
self.edge_url = self.expose_util.expose_local_port(config.EDGE_PORT)
263+
self.executor_url = self.expose_util.expose_local_port(self.executor_endpoint.port)
264+
LOG.debug("Edge url: %s", self.edge_url)
265+
LOG.debug("Executor url: %s", self.executor_url)
266+
211267
def start(self, env_vars: Dict[str, str]) -> None:
212268
self.executor_endpoint.start()
213269
# deep copy is not really necessary, but let's keep it to be safe
@@ -235,9 +291,13 @@ def start(self, env_vars: Dict[str, str]) -> None:
235291
# TODO proxy through kube https://github.com/kubernetes-client/python/blob/master/examples/pod_portforward.py
236292

237293
def stop(self) -> None:
294+
self.expose_util.shutdown()
295+
LOG.debug("Expose shutdown complete")
238296
api_client = self.get_kubernetes_client()
239297
core_v1_client = kubernetes_client.CoreV1Api(api_client)
298+
LOG.debug("Deleting pod")
240299
core_v1_client.delete_namespaced_pod(name=self.id, namespace="default")
300+
LOG.debug("Pod deleted")
241301
try:
242302
self.executor_endpoint.shutdown()
243303
except Exception as e:
@@ -253,15 +313,15 @@ def get_address(self) -> str:
253313
return self.address
254314

255315
def get_executor_endpoint_from_executor(self) -> str:
256-
return f"{get_main_endpoint_from_container()}:{self.executor_endpoint.port}"
316+
if not self.executor_url:
317+
raise LambdaRuntimeException("Address of forwarding is not known!")
318+
return f"{self.executor_url}"
257319

258320
def get_localstack_endpoint_from_executor(self) -> str:
259-
return get_main_endpoint_from_container()
321+
if not self.edge_url:
322+
raise LambdaRuntimeException("Edge address of forwarding is not known!")
323+
return self.edge_url
260324

261325
def invoke(self, payload: Dict[str, str]):
262326
LOG.debug("Sending invoke-payload '%s' to executor '%s'", payload, self.id)
263327
self.executor_endpoint.invoke(payload)
264-
265-
266-
class ExposeLSUtil:
267-
pass

localstack/services/awslambda/invocation/version_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ def invoke(self, *, invocation: Invocation) -> "Future[InvocationResult]":
308308
return invocation_storage.result_future
309309

310310
def set_environment_ready(self, executor_id: str) -> None:
311+
LOG.debug("Environment %s ready!", executor_id)
311312
environment = self.all_environments.get(executor_id)
312313
if not environment:
313314
raise Exception(

0 commit comments

Comments
 (0)
0