8000 MINOR: Cleanup 0.10.x legacy references in ClusterResourceListener an… · coderabbit-test/kafka@c6496e0 · GitHub
[go: up one dir, main page]

Skip to content

Commit c6496e0

Browse files
authored
MINOR: Cleanup 0.10.x legacy references in ClusterResourceListener and TopicConfig (clients module) (apache#19388)
This PR is a minor follow-up to [PR apache#19320](apache#19320), which cleaned up 0.10.x legacy information from the clients module. It addresses remaining reviewer suggestions that were not included in the original PR: - `ClusterResourceListener`: Removed "Note the minimum supported broker version is 2.1." per review suggestion to avoid repeating version-specific details across multiple classes. - `TopicConfig`: Simplified `MAX_MESSAGE_BYTES_DOC` by removing obsolete notes about behavior in versions prior to 0.10.2. These changes help reduce outdated version information in client documentation and improve clarity. Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 8d66481 commit c6496e0

File tree

9 files changed

+11
-40
lines changed
  • config
  • test/java/org/apache/kafka/clients/consumer/internals
  • 9 files changed

    +11
    -40
    lines changed

    clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

    Lines changed: 1 addition & 2 deletions
    Original file line numberDiff line numberDiff line change
    @@ -371,8 +371,7 @@ public class ConsumerConfig extends AbstractConfig {
    371371
    public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
    372372
    private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" +
    373373
    " subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" +
    374-
    " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
    375-
    " be set to `true` when using brokers older than 0.11.0";
    374+
    " broker allows for it using `auto.create.topics.enable` broker configuration.";
    376375
    public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
    377376

    378377
    /**

    clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

    Lines changed: 1 addition & 2 deletions
    Original file line numberDiff line numberDiff line change
    @@ -218,8 +218,7 @@ protected void handleFetchSuccess(final Node fetchTarget,
    218218
    partition,
    219219
    partitionData,
    220220
    metricAggregator,
    221-
    fetchOffset,
    222-
    requestVersion);
    221+
    fetchOffset);
    223222
    fetchBuffer.add(completedFetch);
    224223
    }
    225224

    clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java

    Lines changed: 1 addition & 4 deletions
    Original file line numberDiff line numberDiff line change
    @@ -60,7 +60,6 @@ public class CompletedFetch {
    6060

    6161
    final TopicPartition partition;
    6262
    final FetchResponseData.PartitionData partitionData;
    63-
    final short requestVersion;
    6463

    6564
    private final Logger log;
    6665
    private final SubscriptionState subscriptions;
    @@ -88,8 +87,7 @@ public class CompletedFetch {
    8887
    TopicPartition partition,
    8988
    FetchResponseData.PartitionData partitionData,
    9089
    FetchMetricsAggregator metricAggregator,
    91-
    Long fetchOffset,
    92-
    short requestVersion) {
    90+
    Long fetchOffset) {
    9391
    this.log = log;
    9492
    this.subscriptions = subscriptions;
    9593
    this.decompressionBufferSupplier = decompressionBufferSupplier;
    @@ -98,7 +96,6 @@ public class CompletedFetch {
    9896
    this.metricAggregator = metricAggregator;
    9997
    this.batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
    10098
    this.nextFetchOffset = fetchOffset;
    101-
    this.requestVersion = requestVersion;
    10299
    this.lastEpoch = Optional.empty();
    103100
    this.abortedProducerIds = new HashSet<>();
    104101
    this.abortedTransactions = abortedTransactions(partitionData);

    clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java

    Lines changed: 4 additions & 18 deletions
    Original file line numberDiff line numberDiff line change
    @@ -16,13 +16,11 @@
    1616
    */
    1717
    package org.apache.kafka.clients.consumer.internals;
    1818

    19-
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    2019
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    2120
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    2221
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    2322
    import org.apache.kafka.common.KafkaException;
    2423
    import org.apache.kafka.common.TopicPartition;
    25-
    import org.apache.kafka.common.errors.RecordTooLargeException;
    2624
    import org.apache.kafka.common.errors.TopicAuthorizationException;
    2725
    import org.apache.kafka.common.message.FetchResponseData;
    2826
    import org.apache.kafka.common.protocol.Errors;
    @@ -37,7 +35,6 @@
    3735
    import java.util.Collections;
    3836
    import java.util.Iterator;
    3937
    import java.util.List;
    40-
    import java.util.Map;
    4138
    import java.util.Optional;
    4239
    import java.util.Queue;
    4340

    @@ -263,21 +260,10 @@ private CompletedFetch handleInitializeSuccess(final CompletedFetch completedFet
    263260
    Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
    264261

    265262
    if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
    266-
    if (completedFetch.requestVersion < 3) {
    267-
    // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
    268-
    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
    269-
    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
    270-
    recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
    271-
    " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
    272-
    "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
    273-
    ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
    274-
    recordTooLargePartitions);
    275-
    } else {
    276-
    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
    277-
    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
    278-
    fetchOffset + ". Received a non-empty fetch response from the server, but no " +
    279-
    "complete records were found.");
    280-
    }
    263+
    // This should not happen with brokers that support FetchRequest/Response V4 or higher (i.e. KIP-74)
    264+
    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
    265+
    fetchOffset + ". Received a non-empty fetch response from the server, but no " +
    266+
    "complete records were found.");
    281267
    }
    282268

    283269
    if (!updatePartitionState(partition, tp)) {

    clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java

    Lines changed: 0 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -24,7 +24,6 @@
    2424
    * <p>
    2525
    * <h4>Clients</h4>
    2626
    * There will be one invocation of {@link ClusterResourceListener#onUpdate(ClusterResource)} after each metadata response.
    27-
    * Note the minimum supported broker version is 2.1.
    2827
    * <p>
    2928
    * {@link org.apache.kafka.clients.producer.ProducerInterceptor} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after {@link org.apache.kafka.clients.producer.ProducerInterceptor#onSend(org.apache.kafka.clients.producer.ProducerRecord)}
    3029
    * but before {@link org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement(org.apache.kafka.clients.producer.RecordMetadata, Exception)} .

    clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

    Lines changed: 1 addition & 4 deletions
    Original file line numberDiff line numberDiff line change
    @@ -108,10 +108,7 @@ public class TopicConfig {
    108108

    109109
    public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
    110110
    public static final String MAX_MESSAGE_BYTES_DOC =
    111-
    "The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
    112-
    "In the latest message format version, records are always grouped into batches for efficiency. " +
    113-
    "In previous message format versions, uncompressed records are not grouped into batches and this " +
    114-
    "limit only applies to a single record in that case.";
    111+
    "The largest record batch size allowed by Kafka (after compression if compression is enabled).";
    115112

    116113
    public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
    117114
    public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +

    clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java

    Lines changed: 1 addition & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -26,7 +26,6 @@
    2626
    import org.apache.kafka.common.header.internals.RecordHeaders;
    2727
    import org.apache.kafka.common.message.FetchResponseData;
    2828
    import org.apache.kafka.common.metrics.Metrics;
    29-
    import org.apache.kafka.common.protocol.ApiKeys;
    3029
    import org.apache.kafka.common.record.ControlRecordType;
    3130
    import org.apache.kafka.common.record.EndTransactionMarker;
    3231
    import org.apache.kafka.common.record.MemoryRecords;
    @@ -227,8 +226,7 @@ private CompletedFetch newCompletedFetch(long fetchOffset,
    227226
    TP,
    228227
    partitionData,
    229228
    metricAggregator,
    230-
    fetchOffset,
    231-
    ApiKeys.FETCH.latestVersion());
    229+
    fetchOffset);
    232230
    }
    233231

    234232
    private static Deserializers<UUID, UUID> newUuidDeserializers() {

    clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java

    Lines changed: 1 addition & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -20,7 +20,6 @@
    2020
    import org.apache.kafka.common.TopicPartition;
    2121
    import org.apache.kafka.common.message.FetchResponseData;
    2222
    import org.apache.kafka.common.metrics.Metrics;
    23-
    import org.apache.kafka.common.protocol.ApiKeys;
    2423
    import org.apache.kafka.common.serialization.StringSerializer;
    2524
    import org.apache.kafka.common.utils.BufferSupplier;
    2625
    import org.apache.kafka.common.utils.LogContext;
    @@ -198,8 +197,7 @@ private CompletedFetch completedFetch(TopicPartition tp) {
    198197
    tp,
    199198
    partitionData,
    200199
    metricsAggregator,
    201-
    0L,
    202-
    ApiKeys.FETCH.latestVersion());
    200+
    0L);
    203201
    }
    204202

    205203
    /**

    clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java

    Lines changed: 1 addition & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -26,7 +26,6 @@
    2626
    import org.apache.kafka.common.internals.ClusterResourceListeners;
    2727
    import org.apache.kafka.common.message.FetchResponseData;
    2828
    import org.apache.kafka.common.metrics.Metrics;
    29-
    import org.apache.kafka.common.protocol.ApiKeys;
    3029
    import org.apache.kafka.common.protocol.Errors;
    3130
    import org.apache.kafka.common.record.ControlRecordType;
    3231
    import org.apache.kafka.common.record.EndTransactionMarker;
    @@ -921,8 +920,7 @@ private CompletedFetch build() {
    921920
    topicPartition,
    922921
    partitionData,
    923922
    metricsAggregator,
    924-
    fetchOffset,
    925-
    ApiKeys.FETCH.latestVersion());
    923+
    fetchOffset);
    926924
    }
    927925
    }
    928926

    0 commit comments

    Comments
     (0)
    0