10000 kinesis: Add scala build behind feature flag · localstack/localstack@bf87505 · GitHub
[go: up one dir, main page]

Skip to content

Commit bf87505

Browse files
committed
kinesis: Add scala build behind feature flag
1 parent 49dcd93 commit bf87505

File tree

3 files changed

+152
-31
lines changed

3 files changed

+152
-31
lines changed

localstack-core/localstack/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,20 @@ def populate_edge_configuration(
896896
# randomly inject faults to Kinesis
897897
KINESIS_ERROR_PROBABILITY = float(os.environ.get("KINESIS_ERROR_PROBABILITY", "").strip() or 0.0)
898898

899+
# SEMI-PUBLIC: "node" (default); not actively communicated
900+
# Select whether to use the node or scala build when running Kinesis Mock
901+
KINESIS_MOCK_PROVIDER_ENGINE = os.environ.get("KINESIS_MOCK_PROVIDER_ENGINE", "").strip() or "node"
902+
903+
# set the maximum Java heap size corresponding to the '-Xmx<size>' flag
904+
KINESIS_MOCK_MAXIMUM_HEAP_SIZE = (
905+
os.environ.get("KINESIS_MOCK_MAXIMUM_HEAP_SIZE", "").strip() or "512m"
906+
)
907+
908+
# set the initial Java heap size corresponding to the '-Xms<size>' flag
909+
KINESIS_MOCK_INITIAL_HEAP_SIZE = (
910+
os.environ.get("KINESIS_MOCK_INITIAL_HEAP_SIZE", "").strip() or "256m"
911+
)
912+
899913
# randomly inject faults to DynamoDB
900914
DYNAMODB_ERROR_PROBABILITY = float(os.environ.get("DYNAMODB_ERROR_PROBABILITY", "").strip() or 0.0)
901915
DYNAMODB_READ_ERROR_PROBABILITY = float(

localstack-core/localstack/services/kinesis/kinesis_mock_server.py

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import logging
22
import os
33
import threading
4+
from abc import abstractmethod
45
from pathlib import Path
56
from typing import Dict, List, Optional, Tuple
67

78
from localstack import config
8-
from localstack.services.kinesis.packages import kinesismock_package
9+
from localstack.services.kinesis.packages import KinesisMockEngine, kinesismock_package
910
from localstack.utils.common import TMP_THREADS, ShellCommandThread, get_free_tcp_port, mkdir
1011
from localstack.utils.run import FuncThread
1112
from localstack.utils.serving import Server
@@ -21,7 +22,7 @@ class KinesisMockServer(Server):
2122
def __init__(
2223
self,
2324
port: int,
24-
js_path: Path,
25+
exe_path: Path,
2526
latency: str,
2627
account_id: str,
2728
host: str = "localhost",
@@ -32,7 +33,7 @@ def __init__(
3233
self._latency = latency
3334
self._data_dir = data_dir
3435
self._data_filename = f"{self._account_id}.json"
35-
self._js_path = js_path
36+
self._exe_path = exe_path
3637
self._log_level = log_level
3738
super().__init__(port, host)
3839

@@ -51,15 +52,9 @@ def do_start_thread(self) -> FuncThread:
5152
t.start()
5253
return t
5354

54-
def _create_shell_command(self) -> Tuple[List, Dict]:
55-
"""
56-
Helper method for creating kinesis mock invocation command
57-
:return: returns a tuple containing the command list and a dictionary with the environment variables
58-
"""
59-
55+
@property
56+
def _environment_variables(self) -> Dict:
6057
env_vars = {
61-
# Use the `server.json` packaged next to the main.js
62-
"KINESIS_MOCK_CERT_PATH": str((self._js_path.parent / "server.json").absolute()),
6358
"KINESIS_MOCK_PLAIN_PORT": self.port,
6459
# Each kinesis-mock instance listens to two ports - secure and insecure.
6560
# LocalStack uses only one - the insecure one. Block the secure port to avoid conflicts.
@@ -91,13 +86,64 @@ def _create_shell_command(self) -> Tuple[List, Dict]:
9186
env_vars["PERSIST_INTERVAL"] = config.KINESIS_MOCK_PERSIST_INTERVAL
9287

9388
env_vars["LOG_LEVEL"] = self._log_level
94-
cmd = ["node", self._js_path]
95-
return cmd, env_vars
89+
90+
return env_vars
91+
92+
@abstractmethod
93+
def _create_shell_command(self) -> Tuple[List, Dict]:
94+
"""
95+
Helper method for creating kinesis mock invocation command
96+
:return: returns a tuple containing the command list and a dictionary with the environment variables
97+
"""
98+
pass
9699

97100
def _log_listener(self, line, **_kwargs):
98101
LOG.info(line.rstrip())
99102

100103

104+
class KinesisMockScalaServer( F438 KinesisMockServer):
105+
def _create_shell_command(self) -> Tuple[List, Dict]:
106+
cmd = ["java", "-jar", *self._get_java_vm_options(), str(self._exe_path)]
107+
return cmd, self._environment_variables
108+
109+
@property
110+
def _environment_variables(self) -> Dict:
111+
default_env_vars = super()._environment_variables
112+
kinesis_mock_installer = kinesismock_package.get_installer()
113+
return {
114+
**default_env_vars,
115+
**kinesis_mock_installer.get_java_env_vars(),
116+
}
117+
118+
def _get_java_vm_options(self) -> list[str]:
119+
return [
120+
f"-Xms{config.KINESIS_MOCK_INITIAL_HEAP_SIZE}",
121+
f"-Xmx{config.KINESIS_MOCK_MAXIMUM_HEAP_SIZE}",
122+
"-XX:+UseG1GC",
123+
"-XX:MaxGCPauseMillis=500",
124+
"-XX:+UseGCOverheadLimit",
125+
"-XX:+ExplicitGCInvokesConcurrent",
126+
"-XX:+HeapDumpOnOutOfMemoryError",
127+
"-XX:+ExitOnOutOfMemoryError",
128+
]
129+
130+
131+
class KinesisMockNodeServer(KinesisMockServer):
132+
@property
133+
def _environment_variables(self) -> Dict:
134+
node_env_vars = {
135+
# Use the `server.json` packaged next to the main.js
136+
"KINESIS_MOCK_CERT_PATH": str((self._exe_path.parent / "server.json").absolute()),
137+
}
138+
139+
default_env_vars = super()._environment_variables
140+
return {**node_env_vars, **default_env_vars}
141+
142+
def _create_shell_command(self) -> Tuple[List, Dict]:
143+
cmd = ["node", self._exe_path]
144+
return cmd, self._environment_variables
145+
146+
101147
class KinesisServerManager:
102148
default_startup_timeout = 60
103149

@@ -137,7 +183,7 @@ def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
137183
"""
138184
port = get_free_tcp_port()
139185
kinesismock_package.install()
140-
kinesis_mock_js_path = Path(kinesismock_package.get_installer().get_executable_path())
186+
kinesis_mock_path = Path(kinesismock_package.get_installer().get_executable_path())
141187

142188
# kinesis-mock stores state in json files <account_id>.json, so we can dump everything into `kinesis/`
143189
persist_path = os.path.join(config.dirs.data, "kinesis")
@@ -159,12 +205,21 @@ def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
159205
log_level = "INFO"
160206
latency = config.KINESIS_LATENCY + "ms"
161207

162-
server = KinesisMockServer(
208+
if kinesismock_package.engine == KinesisMockEngine.SCALA:
209+
return KinesisMockScalaServer(
210+
port=port,
211+
exe_path=kinesis_mock_path,
212+
log_level=log_level,
213+
latency=latency,
214+
data_dir=persist_path,
215+
account_id=account_id,
216+
)
217+
218+
return KinesisMockNodeServer(
163219
port=port,
164-
js_path=kinesis_mock_js_path,
220+
exe_path=kinesis_mock_path,
165221
log_level=log_level,
166222
latency=latency,
167223
data_dir=persist_path,
168224
account_id=account_id,
169225
)
170-
return server
Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,80 @@
11
import os
2-
from functools import lru_cache
3-
from typing import List
2+
from enum import StrEnum
3+
from functools import cached_property, lru_cache
4+
from typing import Any, List
45

5-
from localstack.packages import Package
6-
from localstack.packages.core import NodePackageInstaller
6+
from localstack import config
7+
from localstack.packages import InstallTarget, Package
8+
from localstack.packages.core import GitHubReleaseInstaller, NodePackageInstaller
9+
from localstack.packages.java import JavaInstallerMixin, java_package
710

8-
_KINESIS_MOCK_VERSION = os.environ.get("KINESIS_MOCK_VERSION") or "0.4.9"
11+
_KINESIS_MOCK_VERSION = os.environ.get("KINESIS_MOCK_VERSION") or "0.4.12"
912

1013

11-
class KinesisMockPackage(Package[NodePackageInstaller]):
12-
def __init__(self, default_version: str = _KINESIS_MOCK_VERSION):
14+
class KinesisMockEngine(StrEnum):
15+
NODE = "node"
16+
SCALA = "scala"
17+
18+
@classmethod
19+
def _missing_(cls, value: str | Any):
20+
# default to 'node' if invalid enum
21+
if not isinstance(value, str):
22+
return cls(cls.NODE)
23+
return cls.__members__.get(value.upper(), cls.NODE)
24+
25+
26+
class KinesisMockNodePackageInstaller(NodePackageInstaller):
27+
def __init__(self, version: str):
28+
super().__init__(package_name="kinesis-local", version=version)
29+
30+
31+
class KinesisMockScalaPackageInstaller(JavaInstallerMixin, GitHubReleaseInstaller):
32+
def __init__(self, version: str = _KINESIS_MOCK_VERSION):
33+
super().__init__(
34+
name="kinesis-local", tag=f"v{version}", github_slug="etspaceman/kinesis-mock"
35+
)
36+
37+
# Kinesis Mock requires JRE 21+
38+
self.java_version = "21"
39+
40+
def _get_github_asset_name(self) -> str:
41+
return "kinesis-mock.jar"
42+
43+
def _prepare_installation(self, target: InstallTarget) -> None:
44+
java_package.get_installer(self.java_version).install(target)
45+
46+
def get_java_env_vars(self, *args, **kwargs) -> dict[str, str]:
47+
java_home = java_package.get_installer(self.java_version).get_java_home()
48+
path = f"{java_home}/bin:{os.environ['PATH']}"
49+
50+
return {
51+
"JAVA_HOME": java_home,
52+
"PATH": path,
53+
}
54+
55+
56+
class KinesisMockPackage(Package):
57+
def __init__(
58+
self,
59+
default_version: str = _KINESIS_MOCK_VERSION,
60+
):
1361
super().__init__(name="Kinesis Mock", default_version=default_version)
1462

63+
@cached_property
64+
def engine(self) -> KinesisMockEngine:
65+
return KinesisMockEngine(config.KINESIS_MOCK_PROVIDER_ENGINE)
66+
1567
@lru_cache
16-
def _get_installer(self, version: str) -> NodePackageInstaller:
17-
return KinesisMockPackageInstaller(version)
68+
def _get_installer(
69+
self, version: str
70+
) -> KinesisMockNodePackageInstaller | KinesisMockScalaPackageInstaller:
71+
if self.engine == KinesisMockEngine.SCALA:
72+
return KinesisMockScalaPackageInstaller(version)
73+
74+
return KinesisMockNodePackageInstaller(version)
1875

1976
def get_versions(self) -> List[str]:
2077
return [_KINESIS_MOCK_VERSION]
2178

2279

23-
class KinesisMockPackageInstaller(NodePackageInstaller):
24-
def __init__(self, version: str):
25-
super().__init__(package_name="kinesis-local", version=version)
26-
27-
2880
kinesismock_package = KinesisMockPackage()

0 commit comments

Comments
 (0)
0