MINOR: remove get prefix for internal Kafka Streams methods (#16722) · Python-Repository-Hub/kafka@e4cc5d1 · GitHub

Commit e4cc5d1

MINOR: remove get prefix for internal Kafka Streams methods (apache#16722)
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
1 parent 9d81a67 commit e4cc5d1
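
This commit is a mechanical rename of internal (non-public-API) Kafka Streams helpers so that accessor-style methods drop their get prefix (for example getStreamsConfig becomes streamsConfig and getTotalCacheSize becomes totalCacheSize). A minimal sketch of what a call site looks like after the rename, using the WordCountDemo helper touched below; the wrapper class itself is illustrative and not part of the commit:

    package org.apache.kafka.streams.examples.wordcount;

    import java.io.IOException;
    import java.util.Properties;

    // Hypothetical caller; it lives in the same package because streamsConfig(...) is package-private.
    public class StreamsConfigRenameExample {
        public static void main(final String[] args) throws IOException {
            // Before apache#16722 this was WordCountDemo.getStreamsConfig(args).
            final Properties props = WordCountDemo.streamsConfig(args);
            System.out.println(props);
        }
    }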

File tree

45 files changed: +140 -141 lines changed


streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ public final class WordCountDemo {
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";
 
-    static Properties getStreamsConfig(final String[] args) throws IOException {
+    static Properties streamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
         if (args != null && args.length > 0) {
             try (final FileInputStream fis = new FileInputStream(args[0])) {
@@ -85,7 +85,7 @@ static void createWordCountStream(final StreamsBuilder builder) {
     }
 
     public static void main(final String[] args) throws IOException {
-        final Properties props = getStreamsConfig(args);
+        final Properties props = streamsConfig(args);
 
         final StreamsBuilder builder = new StreamsBuilder();
         createWordCountStream(builder);

streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java

Lines changed: 4 additions & 4 deletions
@@ -58,7 +58,7 @@ public void setup() throws IOException {
         final StreamsBuilder builder = new StreamsBuilder();
         //Create Actual Stream Processing pipeline
         WordCountDemo.createWordCountStream(builder);
-        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig(null));
+        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.streamsConfig(null));
         inputTopic = testDriver.createInputTopic(WordCountDemo.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
         outputTopic = testDriver.createOutputTopic(WordCountDemo.OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
     }
@@ -111,13 +111,13 @@ public void testCountListOfWords() {
     }
 
     @Test
-    public void testGetStreamsConfig() throws IOException {
+    public void testStreamsConfig() throws IOException {
         final File tmp = TestUtils.tempFile("bootstrap.servers=localhost:1234");
         try {
-            Properties config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath()});
+            Properties config = WordCountDemo.streamsConfig(new String[] {tmp.getPath()});
             assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)));
 
-            config = WordCountDemo.getStreamsConfig(new String[] {tmp.getPath(), "extra", "args"});
+            config = WordCountDemo.streamsConfig(new String[] {tmp.getPath(), "extra", "args"});
             assertThat("localhost:1234", equalTo(config.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)));
         } finally {
             Files.deleteIfExists(tmp.toPath());

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 22 additions & 22 deletions
@@ -112,7 +112,7 @@
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 
@@ -546,7 +546,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                 closeToError();
                 break;
             case SHUTDOWN_APPLICATION:
-                if (getNumLiveStreamThreads() == 1) {
+                if (numLiveStreamThreads() == 1) {
                     log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
                     addStreamThread();
                 }
@@ -555,7 +555,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
                         "but the uncaught exception was an Error, which means this runtime is no " +
                         "longer in a well-defined state. Attempting to send the shutdown command anyway.", throwable);
                 }
-                if (Thread.currentThread().equals(globalStreamThread) && getNumLiveStreamThreads() == 0) {
+                if (Thread.currentThread().equals(globalStreamThread) && numLiveStreamThreads() == 0) {
                     log.error("Exception in global thread caused the application to attempt to shutdown." +
                             " This action will succeed only if there is at least one StreamThread running on this client." +
                             " Currently there are no running threads so will now close the client.");
@@ -991,12 +991,12 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
 
         // use client id instead of thread client id since this admin client may be shared among threads
         this.clientSupplier = clientSupplier;
-        adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
+        adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
 
         log.info("Kafka Streams version: {}", ClientMetrics.version());
         log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
 
-        metrics = getMetrics(applicationConfigs, time, clientId);
+        metrics = createMetrics(applicationConfigs, time, clientId);
         streamsMetrics = new StreamsMetricsImpl(
             metrics,
             clientId,
@@ -1010,7 +1010,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString());
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
         threads = Collections.synchronizedList(new LinkedList<>());
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads());
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> numLiveStreamThreads());
 
         streamsMetadataState = new StreamsMetadataState(
             this.topologyMetadata,
@@ -1023,9 +1023,9 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener();
 
-        totalCacheSize = getTotalCacheSize(applicationConfigs);
-        final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
-        final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
+        totalCacheSize = totalCacheSize(applicationConfigs);
+        final int numStreamThreads = topologyMetadata.numStreamThreads(applicationConfigs);
+        final long cacheSizePerThread = cacheSizePerThread(numStreamThreads);
 
         GlobalStreamThread.State globalThreadState = null;
         if (hasGlobalTopology) {
@@ -1088,7 +1088,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
-    static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+    private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) {
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -1117,9 +1117,9 @@ public Optional<String> addStreamThread() {
         if (isRunningOrRebalancing()) {
             final StreamThread streamThread;
             synchronized (changeThreadCount) {
-                final int threadIdx = getNextThreadIndex();
-                final int numLiveThreads = getNumLiveStreamThreads();
-                final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                final int threadIdx = nextThreadIndex();
+                final int numLiveThreads = numLiveStreamThreads();
+                final long cacheSizePerThread = cacheSizePerThread(numLiveThreads + 1);
                 log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
                          threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
@@ -1136,7 +1136,7 @@ public Optional<String> addStreamThread() {
                     log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
                     streamThread.shutdown();
                     threads.remove(streamThread);
-                    final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                     log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
                     resizeThreadCache(cacheSizePerThread);
                     return Optional.empty();
@@ -1194,7 +1194,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
             // make a copy of threads to avoid holding lock
             for (final StreamThread streamThread : new ArrayList<>(threads)) {
                 final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
-                if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || getNumLiveStreamThreads() == 1)) {
+                if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
                     log.info("Removing StreamThread " + streamThread.getName());
                     final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                     streamThread.requestLeaveGroupDuringShutdown();
@@ -1216,7 +1216,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
                             + "for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                     }
 
-                    final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                     log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
                     resizeThreadCache(cacheSizePerThread);
                     if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
@@ -1282,7 +1282,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
      * threads lock when looping threads.
      * @return number of alive stream threads
      */
-    private int getNumLiveStreamThreads() {
+    private int numLiveStreamThreads() {
        final AtomicInteger numLiveThreads = new AtomicInteger(0);

        synchronized (threads) {
@@ -1301,7 +1301,7 @@ private int getNumLiveStreamThreads() {
         }
     }
 
-    private int getNextThreadIndex() {
+    private int nextThreadIndex() {
         final HashSet<String> allLiveThreadNames = new HashSet<>();
         final AtomicInteger maxThreadId = new AtomicInteger(1);
         synchronized (threads) {
@@ -1333,7 +1333,7 @@ private int getNextThreadIndex() {
         }
     }
 
-    private long getCacheSizePerThread(final int numStreamThreads) {
+    private long cacheSizePerThread(final int numStreamThreads) {
         if (numStreamThreads == 0) {
             return totalCacheSize;
         }
@@ -1831,7 +1831,7 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
      */
     public void pause() {
         if (topologyMetadata.hasNamedTopologies()) {
-            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) {
                 topologyMetadata.pauseTopology(namedTopology.name());
             }
         } else {
@@ -1844,7 +1844,7 @@ public void pause() {
      */
     public boolean isPaused() {
         if (topologyMetadata.hasNamedTopologies()) {
-            return topologyMetadata.getAllNamedTopologies().stream()
+            return topologyMetadata.allNamedTopologies().stream()
                 .map(NamedTopology::name)
                 .allMatch(topologyMetadata::isPaused);
         } else {
@@ -1857,7 +1857,7 @@ public boolean isPaused() {
      */
     public void resume() {
         if (topologyMetadata.hasNamedTopologies()) {
-            for (final NamedTopology namedTopology : topologyMetadata.getAllNamedTopologies()) {
+            for (final NamedTopology namedTopology : topologyMetadata.allNamedTopologies()) {
                 topologyMetadata.resumeTopology(namedTopology.name());
             }
         } else {

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 2 additions & 2 deletions
@@ -1252,14 +1252,14 @@ public static class InternalConfig {
         // Private API to enable the state updater (i.e. state updating on a dedicated thread)
         public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
 
-        public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
+        public static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
             return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
         }
 
         // Private API to enable processing threads (i.e. polling is decoupled from processing)
         public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__";
 
-        public static boolean getProcessingThreadsEnabled(final Map<String, Object> configs) {
+        public static boolean processingThreadsEnabled(final Map<String, Object> configs) {
             return InternalConfig.getBoolean(configs, InternalConfig.PROCESSING_THREADS_ENABLED, false);
         }
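
The renamed InternalConfig readers above are plain config-map lookups, so internal callers simply switch to the new names. A minimal sketch of such a caller, assuming nothing beyond what the diff shows (class and variable names here are illustrative, not from the commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.streams.StreamsConfig;

    public class InternalFlagsExample {
        public static void main(final String[] args) {
            final Map<String, Object> configs = new HashMap<>();
            configs.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);

            // Formerly getStateUpdaterEnabled(...) and getProcessingThreadsEnabled(...).
            final boolean stateUpdater = StreamsConfig.InternalConfig.stateUpdaterEnabled(configs);
            final boolean processingThreads = StreamsConfig.InternalConfig.processingThreadsEnabled(configs);
            System.out.println(stateUpdater + " " + processingThreads);
        }
    }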

streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@
 import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
 import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;
 
 /**
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
@@ -167,7 +167,7 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
         final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
 
         if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) {
-            cacheSize = getTotalCacheSize(globalAppConfigs);
+            cacheSize = totalCacheSize(globalAppConfigs);
         } else {
             if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) {
                 cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ public static boolean eosEnabled(final ProcessingMode processingMode) {
     }
 
     @SuppressWarnings("deprecation")
-    public static long getTotalCacheSize(final StreamsConfig config) {
+    public static long totalCacheSize(final StreamsConfig config) {
         // both deprecated and new config set. Warn and use the new one.
         if (config.originals().containsKey(CACHE_MAX_BYTES_BUFFERING_CONFIG) && config.originals().containsKey(STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
             if (!config.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG).equals(config.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG))) {

streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ public enum UpgradeFromValues {
         this.value = value;
     }
 
-    public static UpgradeFromValues getValueFromString(final String upgradeFrom) {
+    public static UpgradeFromValues fromString(final String upgradeFrom) {
         return UpgradeFromValues.valueOf("UPGRADE_FROM_" + upgradeFrom.replace(".", ""));
     }
     public String toString() {

streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ private static boolean isUpgrade(final Map<String, ?> configs) {
             return false;
         }
 
-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ private boolean isNotUpgrade(final Map<String, ?> configs) {
             return true;
         }
 
-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ private static boolean upgradeFromV0(final Map<String, ?> configs) {
             return false;
         }
 
-        switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
+        switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
             case UPGRADE_FROM_0100:
             case UPGRADE_FROM_0101:
             case UPGRADE_FROM_0102:

streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java

Lines changed: 5 additions & 5 deletions
@@ -47,8 +47,8 @@
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.taskProducerClientId;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.threadProducerClientId;
 
 class ActiveTaskCreator {
     private final TopologyMetadata topologyMetadata;
@@ -269,7 +269,7 @@ private StreamTask createActiveTask(final TaskId taskId,
             inputPartitions,
             topology,
             consumer,
-            topologyMetadata.getTaskConfigFor(taskId),
+            topologyMetadata.taskConfig(taskId),
             streamsMetrics,
             stateDirectory,
             cache,
@@ -319,11 +319,11 @@ Map<MetricName, Metric> producerMetrics() {
 
     Set<String> producerClientIds() {
         if (threadProducer != null) {
-            return Collections.singleton(getThreadProducerClientId(threadId));
+            return Collections.singleton(threadProducerClientId(threadId));
         } else {
            return taskProducers.keySet()
                                .stream()
-                               .map(taskId -> getTaskProducerClientId(threadId, taskId))
+                               .map(taskId -> taskProducerClientId(threadId, taskId))
                                .collect(Collectors.toSet());
        }
    }
0 commit comments
