1
1
import logging
2
2
import os
3
3
import threading
4
+ from abc import abstractmethod
4
5
from pathlib import Path
5
6
from typing import Dict , List , Optional , Tuple
6
7
7
8
from localstack import config
8
- from localstack .services .kinesis .packages import kinesismock_package
9
+ from localstack .services .kinesis .packages import KinesisMockEngine , kinesismock_package
9
10
from localstack .utils .common import TMP_THREADS , ShellCommandThread , get_free_tcp_port , mkdir
10
11
from localstack .utils .run import FuncThread
11
12
from localstack .utils .serving import Server
@@ -21,7 +22,7 @@ class KinesisMockServer(Server):
21
22
def __init__ (
22
23
self ,
23
24
port : int ,
24
- js_path : Path ,
25
+ exe_path : Path ,
25
26
latency : str ,
26
27
account_id : str ,
27
28
host : str = "localhost" ,
@@ -32,7 +33,7 @@ def __init__(
32
33
self ._latency = latency
33
34
self ._data_dir = data_dir
34
35
self ._data_filename = f"{ self ._account_id } .json"
35
- self ._js_path = js_path
36
+ self ._exe_path = exe_path
36
37
self ._log_level = log_level
37
38
super ().__init__ (port , host )
38
39
@@ -51,15 +52,9 @@ def do_start_thread(self) -> FuncThread:
51
52
t .start ()
52
53
return t
53
54
54
- def _create_shell_command (self ) -> Tuple [List , Dict ]:
55
- """
56
- Helper method for creating kinesis mock invocation command
57
- :return: returns a tuple containing the command list and a dictionary with the environment variables
58
- """
59
-
55
+ @property
56
+ def _environment_variables (self ) -> Dict :
60
57
env_vars = {
61
- # Use the `server.json` packaged next to the main.js
62
- "KINESIS_MOCK_CERT_PATH" : str ((self ._js_path .parent / "server.json" ).absolute ()),
63
58
"KINESIS_MOCK_PLAIN_PORT" : self .port ,
64
59
# Each kinesis-mock instance listens to two ports - secure and insecure.
65
60
# LocalStack uses only one - the insecure one. Block the secure port to avoid conflicts.
@@ -91,13 +86,64 @@ def _create_shell_command(self) -> Tuple[List, Dict]:
91
86
env_vars ["PERSIST_INTERVAL" ] = config .KINESIS_MOCK_PERSIST_INTERVAL
92
87
93
88
env_vars ["LOG_LEVEL" ] = self ._log_level
94
- cmd = ["node" , self ._js_path ]
95
- return cmd , env_vars
89
+
90
+ return env_vars
91
+
92
+ @abstractmethod
93
+ def _create_shell_command (self ) -> Tuple [List , Dict ]:
94
+ """
95
+ Helper method for creating kinesis mock invocation command
96
+ :return: returns a tuple containing the command list and a dictionary with the environment variables
97
+ """
98
+ pass
96
99
97
100
def _log_listener (self , line , ** _kwargs ):
98
101
LOG .info (line .rstrip ())
99
102
100
103
104
+ class KinesisMockScalaServer (
F438
KinesisMockServer ):
105
+ def _create_shell_command (self ) -> Tuple [List , Dict ]:
106
+ cmd = ["java" , "-jar" , * self ._get_java_vm_options (), str (self ._exe_path )]
107
+ return cmd , self ._environment_variables
108
+
109
+ @property
110
+ def _environment_variables (self ) -> Dict :
111
+ default_env_vars = super ()._environment_variables
112
+ kinesis_mock_installer = kinesismock_package .get_installer ()
113
+ return {
114
+ ** default_env_vars ,
115
+ ** kinesis_mock_installer .get_java_env_vars (),
116
+ }
117
+
118
+ def _get_java_vm_options (self ) -> list [str ]:
119
+ return [
120
+ f"-Xms{ config .KINESIS_MOCK_INITIAL_HEAP_SIZE } " ,
121
+ f"-Xmx{ config .KINESIS_MOCK_MAXIMUM_HEAP_SIZE } " ,
122
+ "-XX:+UseG1GC" ,
123
+ "-XX:MaxGCPauseMillis=500" ,
124
+ "-XX:+UseGCOverheadLimit" ,
125
+ "-XX:+ExplicitGCInvokesConcurrent" ,
126
+ "-XX:+HeapDumpOnOutOfMemoryError" ,
127
+ "-XX:+ExitOnOutOfMemoryError" ,
128
+ ]
129
+
130
+
131
+ class KinesisMockNodeServer (KinesisMockServer ):
132
+ @property
133
+ def _environment_variables (self ) -> Dict :
134
+ node_env_vars = {
135
+ # Use the `server.json` packaged next to the main.js
136
+ "KINESIS_MOCK_CERT_PATH" : str ((self ._exe_path .parent / "server.json" ).absolute ()),
137
+ }
138
+
139
+ default_env_vars = super ()._environment_variables
140
+ return {** node_env_vars , ** default_env_vars }
141
+
142
+ def _create_shell_command (self ) -> Tuple [List , Dict ]:
143
+ cmd = ["node" , self ._exe_path ]
144
+ return cmd , self ._environment_variables
145
+
146
+
101
147
class KinesisServerManager :
102
148
default_startup_timeout = 60
103
149
@@ -137,7 +183,7 @@ def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
137
183
"""
138
184
port = get_free_tcp_port ()
139
185
kinesismock_package .install ()
140
- kinesis_mock_js_path = Path (kinesismock_package .get_installer ().get_executable_path ())
186
+ kinesis_mock_path = Path (kinesismock_package .get_installer ().get_executable_path ())
141
187
142
188
# kinesis-mock stores state in json files <account_id>.json, so we can dump everything into `kinesis/`
143
189
persist_path = os .path .join (config .dirs .data , "kinesis" )
@@ -159,12 +205,21 @@ def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
159
205
log_level = "INFO"
160
206
latency = config .KINESIS_LATENCY + "ms"
161
207
162
- server = KinesisMockServer (
208
+ if kinesismock_package .engine == KinesisMockEngine .SCALA :
209
+ return KinesisMockScalaServer (
210
+ port = port ,
211
+ exe_path = kinesis_mock_path ,
212
+ log_level = log_level ,
213
+ latency = latency ,
214
+ data_dir = persist_path ,
215
+ account_id = account_id ,
216
+ )
217
+
218
+ return KinesisMockNodeServer (
163
219
port = port ,
164
- js_path = kinesis_mock_js_path ,
220
+ exe_path = kinesis_mock_path ,
165
221
log_level = log_level ,
166
222
latency = latency ,
167
223
data_dir = persist_path ,
168
224
account_id = account_id ,
169
225
)
170
- return server
0 commit comments