112
112
import static org .apache .kafka .streams .errors .StreamsUncaughtExceptionHandler .StreamThreadExceptionResponse .SHUTDOWN_CLIENT ;
113
113
import static org .apache .kafka .streams .internals .ApiUtils .prepareMillisCheckFailMsgPrefix ;
114
114
import static org .apache .kafka .streams .internals .ApiUtils .validateMillisecondDuration ;
115
- import static org .apache .kafka .streams .internals .StreamsConfigUtils .getTotalCacheSize ;
115
+ import static org .apache .kafka .streams .internals .StreamsConfigUtils .totalCacheSize ;
116
116
import static org .apache .kafka .streams .processor .internals .ClientUtils .fetchEndOffsets ;
117
117
import static org .apache .kafka .streams .processor .internals .TopologyMetadata .UNNAMED_TOPOLOGY ;
118
118
@@ -546,7 +546,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
546
546
closeToError ();
547
547
break ;
548
548
case SHUTDOWN_APPLICATION :
549
- if (getNumLiveStreamThreads () == 1 ) {
549
+ if (numLiveStreamThreads () == 1 ) {
550
550
log .warn ("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread" );
551
551
addStreamThread ();
552
552
}
@@ -555,7 +555,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
555
555
"but the uncaught exception was an Error, which means this runtime is no " +
556
556
"longer in a well-defined state. Attempting to send the shutdown command anyway." , throwable );
557
557
}
558
- if (Thread .currentThread ().equals (globalStreamThread ) && getNumLiveStreamThreads () == 0 ) {
558
+ if (Thread .currentThread ().equals (globalStreamThread ) && numLiveStreamThreads () == 0 ) {
559
559
log .error ("Exception in global thread caused the application to attempt to shutdown." +
560
560
" This action will succeed only if there is at least one StreamThread running on this client." +
561
561
" Currently there are no running threads so will now close the client." );
@@ -991,12 +991,12 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
991
991
992
992
// use client id instead of thread client id since this admin client may be shared among threads
993
993
this .clientSupplier = clientSupplier ;
994
- adminClient = clientSupplier .getAdmin (applicationConfigs .getAdminConfigs (ClientUtils .getSharedAdminClientId (clientId )));
994
+ adminClient = clientSupplier .getAdmin (applicationConfigs .getAdminConfigs (ClientUtils .adminClientId (clientId )));
995
995
996
996
log .info ("Kafka Streams version: {}" , ClientMetrics .version ());
997
997
log .info ("Kafka Streams commit ID: {}" , ClientMetrics .commitId ());
998
998
999
- metrics = getMetrics (applicationConfigs , time , clientId );
999
+ metrics = createMetrics (applicationConfigs , time , clientId );
1000
1000
streamsMetrics = new StreamsMetricsImpl (
1001
1001
metrics ,
1002
1002
clientId ,
@@ -1010,7 +1010,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
1010
1010
ClientMetrics .addTopologyDescriptionMetric (streamsMetrics , (metricsConfig , now ) -> this .topologyMetadata .topologyDescriptionString ());
1011
1011
ClientMetrics .addStateMetric (streamsMetrics , (metricsConfig , now ) -> state );
1012
1012
threads = Collections .synchronizedList (new LinkedList <>());
1013
- ClientMetrics .addNumAliveStreamThreadMetric (streamsMetrics , (metricsConfig , now ) -> getNumLiveStreamThreads ());
1013
+ ClientMetrics .addNumAliveStreamThreadMetric (streamsMetrics , (metricsConfig , now ) -> numLiveStreamThreads ());
1014
1014
1015
1015
streamsMetadataState = new StreamsMetadataState (
1016
1016
this .topologyMetadata ,
@@ -1023,9 +1023,9 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
1023
1023
delegatingStateRestoreListener = new DelegatingStateRestoreListener ();
1024
1024
delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener ();
1025
1025
1026
- totalCacheSize = getTotalCacheSize (applicationConfigs );
1027
- final int numStreamThreads = topologyMetadata .getNumStreamThreads (applicationConfigs );
1028
- final long cacheSizePerThread = getCacheSizePerThread (numStreamThreads );
1026
+ totalCacheSize = totalCacheSize (applicationConfigs );
1027
+ final int numStreamThreads = topologyMetadata .numStreamThreads (applicationConfigs );
1028
+ final long cacheSizePerThread = cacheSizePerThread (numStreamThreads );
1029
1029
1030
1030
GlobalStreamThread .State globalThreadState = null ;
1031
1031
if (hasGlobalTopology ) {
@@ -1088,7 +1088,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
1088
1088
return streamThread ;
1089
1089
}
1090
1090
1091
- static Metrics getMetrics (final StreamsConfig config , final Time time , final String clientId ) {
1091
+ private static Metrics createMetrics (final StreamsConfig config , final Time time , final String clientId ) {
1092
1092
final MetricConfig metricConfig = new MetricConfig ()
1093
1093
.samples (config .getInt (StreamsConfig .METRICS_NUM_SAMPLES_CONFIG ))
1094
1094
.recordLevel (Sensor .RecordingLevel .forName (config .getString (StreamsConfig .METRICS_RECORDING_LEVEL_CONFIG )))
@@ -1117,9 +1117,9 @@ public Optional<String> addStreamThread() {
1117
1117
if (isRunningOrRebalancing ()) {
1118
1118
final StreamThread streamThread ;
1119
1119
synchronized (changeThreadCount ) {
1120
- final int threadIdx = getNextThreadIndex ();
1121
- final int numLiveThreads = getNumLiveStreamThreads ();
1122
- final long cacheSizePerThread = getCacheSizePerThread (numLiveThreads + 1 );
1120
+ final int threadIdx = nextThreadIndex ();
1121
+ final int numLiveThreads = numLiveStreamThreads ();
1122
+ final long cacheSizePerThread = cacheSizePerThread (numLiveThreads + 1 );
1123
1123
log .info ("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}" ,
1124
1124
threadIdx , numLiveThreads + 1 , cacheSizePerThread );
1125
1125
resizeThreadCache (cacheSizePerThread );
@@ -1136,7 +1136,7 @@ public Optional<String> addStreamThread() {
1136
1136
log .warn ("Terminating the new thread because the Kafka Streams client is in state {}" , state );
1137
1137
streamThread .shutdown ();
1138
1138
threads .remove (streamThread );
1139
- final long cacheSizePerThread = getCacheSizePerThread ( getNumLiveStreamThreads ());
1139
+ final long cacheSizePerThread = cacheSizePerThread ( numLiveStreamThreads ());
1140
1140
log .info ("Resizing thread cache due to terminating added thread, new cache size per thread is {}" , cacheSizePerThread );
1141
1141
resizeThreadCache (cacheSizePerThread );
1142
1142
return Optional .empty ();
@@ -1194,7 +1194,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
1194
1194
// make a copy of threads to avoid holding lock
1195
1195
for (final StreamThread streamThread : new ArrayList <>(threads )) {
1196
1196
final boolean callingThreadIsNotCurrentStreamThread = !streamThread .getName ().equals (Thread .currentThread ().getName ());
1197
- if (streamThread .isThreadAlive () && (callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads () == 1 )) {
1197
+ if (streamThread .isThreadAlive () && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads () == 1 )) {
1198
1198
log .info ("Removing StreamThread " + streamThread .getName ());
1199
1199
final Optional <String > groupInstanceID = streamThread .getGroupInstanceID ();
1200
1200
streamThread .requestLeaveGroupDuringShutdown ();
@@ -1216,7 +1216,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
1216
1216
+ "for it to complete shutdown as this will result in deadlock." , streamThread .getName ());
1217
1217
}
1218
1218
1219
- final long cacheSizePerThread = getCacheSizePerThread ( getNumLiveStreamThreads ());
1219
+ final long cacheSizePerThread = cacheSizePerThread ( numLiveStreamThreads ());
1220
1220
log .info ("Resizing thread cache due to thread removal, new cache size per thread is {}" , cacheSizePerThread );
1221
1221
resizeThreadCache (cacheSizePerThread );
1222
1222
if (groupInstanceID .isPresent () && callingThreadIsNotCurrentStreamThread ) {
@@ -1282,7 +1282,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
1282
1282
* threads lock when looping threads.
1283
1283
* @return number of alive stream threads
1284
1284
*/
1285
- private int getNumLiveStreamThreads () {
1285
+ private int numLiveStreamThreads () {
1286
1286
final AtomicInteger numLiveThreads = new AtomicInteger (0 );
1287
1287
1288
1288
synchronized (threads ) {
@@ -1301,7 +1301,7 @@ private int getNumLiveStreamThreads() {
1301
1301
}
1302
1302
}
1303
1303
1304
- private int getNextThreadIndex () {
1304
+ private int nextThreadIndex () {
1305
1305
final HashSet <String > allLiveThreadNames = new HashSet <>();
1306
1306
final AtomicInteger maxThreadId = new AtomicInteger (1 );
1307
1307
synchronized (threads ) {
@@ -1333,7 +1333,7 @@ private int getNextThreadIndex() {
1333
1333
}
1334
1334
}
1335
1335
1336
- private long getCacheSizePerThread (final int numStreamThreads ) {
1336
+ private long cacheSizePerThread (final int numStreamThreads ) {
1337
1337
if (numStreamThreads == 0 ) {
1338
1338
return totalCacheSize ;
1339
1339
}
@@ -1831,7 +1831,7 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
1831
1831
*/
1832
1832
public void pause () {
1833
1833
if (topologyMetadata .hasNamedTopologies ()) {
1834
- for (final NamedTopology namedTopology : topologyMetadata .getAllNamedTopologies ()) {
1834
+ for (final NamedTopology namedTopology : topologyMetadata .allNamedTopologies ()) {
1835
1835
topologyMetadata .pauseTopology (namedTopology .name ());
1836
1836
}
1837
1837
} else {
@@ -1844,7 +1844,7 @@ public void pause() {
1844
1844
*/
1845
1845
public boolean isPaused () {
1846
1846
if (topologyMetadata .hasNamedTopologies ()) {
1847
- return topologyMetadata .getAllNamedTopologies ().stream ()
1847
+ return topologyMetadata .allNamedTopologies ().stream ()
1848
1848
.map (NamedTopology ::name )
1849
1849
.allMatch (topologyMetadata ::isPaused );
1850
1850
} else {
@@ -1857,7 +1857,7 @@ public boolean isPaused() {
1857
1857
*/
1858
1858
public void resume () {
1859
1859
if (topologyMetadata .hasNamedTopologies ()) {
1860
- for (final NamedTopology namedTopology : topologyMetadata .getAllNamedTopologies ()) {
1860
+ for (final NamedTopology namedTopology : topologyMetadata .allNamedTopologies ()) {
1861
1861
topologyMetadata .resumeTopology (namedTopology .name ());
1862
1862
}
1863
1863
} else {
0 commit comments