8000 KAFKA-16736 Remove `offsets.commit.required.acks` in 4.0 (#16938) · Python-Repository-Hub/kafka@944c135 · GitHub
[go: up one dir, main page]

8000
Skip to content

Commit 944c135

Browse files
authored
KAFKA-16736 Remove offsets.commit.required.acks in 4.0 (apache#16938)
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 7b6d4fb commit 944c135

File tree

8 files changed

+5
-50
lines changed

8 files changed

+5
-50
lines changed

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
3636
import org.apache.kafka.server.record.BrokerCompressionType
3737
import org.apache.kafka.storage.internals.log.VerificationGuard
3838

39-
import scala.annotation.nowarn
4039
import scala.collection.{Map, Seq, Set, immutable, mutable}
4140
import scala.math.max
4241

@@ -1768,7 +1767,6 @@ object GroupCoordinator {
17681767
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
17691768
}
17701769

1771-
@nowarn("cat=deprecation")
17721770
private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
17731771
config.groupCoordinatorConfig.offsetMetadataMaxSize,
17741772
config.groupCoordinatorConfig.offsetsLoadBufferSize,
@@ -1778,8 +1776,7 @@ object GroupCoordinator {
17781776
config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
17791777
config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
17801778
config.groupCoordinatorConfig.offsetTopicCompressionType,
1781-
config.groupCoordinatorConfig.offsetCommitTimeoutMs,
1782-
config.groupCoordinatorConfig.offsetCommitRequiredAcks
1779+
config.groupCoordinatorConfig.offsetCommitTimeoutMs
17831780
)
17841781

17851782
private[group] def apply(

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ class GroupMetadataManager(brokerId: Int,
330330< 8000 code class="diff-text syntax-highlighted-line">
// call replica manager to append the group message
331331
replicaManager.appendRecords(
332332
timeout = config.offsetCommitTimeoutMs.toLong,
333-
requiredAcks = config.offsetCommitRequiredAcks,
333+
requiredAcks = -1,
334334
internalTopicsAllowed = true,
335335
origin = AppendOrigin.COORDINATOR,
336336
entriesPerPartition = records,

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,11 +1067,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
10671067
require(principalBuilderClass != null, s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must be non-null")
10681068
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
10691069
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
1070-
1071-
1072-
if (originals.containsKey(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG)) {
1073-
warn(s"${GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG} is deprecated and it will be removed in Apache Kafka 4.0.")
1074-
}
10751070
}
10761071

10771072
/**

core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
5656
import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyShort}
5757
import org.mockito.Mockito.{mock, reset, times, verify, when}
5858

59-
import scala.annotation.nowarn
6059
import scala.jdk.CollectionConverters._
6160
import scala.collection.{immutable, _}
6261

@@ -80,7 +79,6 @@ class GroupMetadataManagerTest {
8079
val defaultRequireStable = false
8180
val numOffsetsPartitions = 2
8281

83-
@nowarn("cat=deprecation")
8482
private val offsetConfig = {
8583
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
8684
new OffsetConfig(config.groupCoordinatorConfig.offsetMetadataMaxSize,
@@ -91,8 +89,7 @@ class GroupMetadataManagerTest {
9189
config.groupCoordinatorConfig.offsetsTopicSegmentBytes,
9290
config.groupCoordinatorConfig.offsetsTopicReplicationFactor,
9391
config.groupCoordinatorConfig.offsetTopicCompressionType,
94-
config.groupCoordinatorConfig.offsetCommitTimeoutMs,
95-
config.groupCoordinatorConfig.offsetCommitRequiredAcks)
92+
config.groupCoordinatorConfig.offsetCommitTimeoutMs)
9693
}
9794

9895
@BeforeEach

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,6 @@ class KafkaConfigTest {
982982
case GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
983983
case GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
984984
case GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
985-
case GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
986985
case TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
987986
case TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
988987
case TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,6 @@ public class GroupCoordinatorConfig {
204204
public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
205205
"or this timeout is reached. This is similar to the producer request timeout.";
206206

207-
@Deprecated
208-
public static final String OFFSET_COMMIT_REQUIRED_ACKS_CONFIG = "offsets.commit.required.acks";
209-
public static final short OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT = -1;
210-
public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "DEPRECATED: The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.";
211-
212207
public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new ConfigDef()
213208
.define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
214209
.define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@@ -232,8 +227,7 @@ public class GroupCoordinatorConfig {
232227
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
233228
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
234229
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
235-
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
236-
.define(OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, SHORT, OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT, HIGH, OFFSET_COMMIT_REQUIRED_ACKS_DOC);
230+
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC);
237231
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
238232
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
239233
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@@ -278,7 +272,6 @@ public class GroupCoordinatorConfig {
278272
private final int offsetsLoadBufferSize;
279273
private final int offsetsTopicPartitions;
280274
private final short offsetsTopicReplicationFactor;
281-
private final short offsetCommitRequiredAcks;
282275
private final int consumerGroupMinSessionTimeoutMs;
283276
private final int consumerGroupMaxSessionTimeoutMs;
284277
private final int consumerGroupMinHeartbeatIntervalMs;
@@ -317,7 +310,6 @@ public GroupCoordinatorConfig(AbstractConfig config) {
317310
this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
318311
this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
319312
this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
320-
this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
321313
this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
322314
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
323315
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -331,9 +323,6 @@ public GroupCoordinatorConfig(AbstractConfig config) {
331323
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
332324
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
333325

334-
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
335-
String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG));
336-
337326
// New group coordinator configs validation.
338327
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
339328
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
@@ -543,15 +532,6 @@ public short offsetsTopicReplicationFactor() {
543532
return offsetsTopicReplicationFactor;
544533
}
545534

546-
/**
547-
* DEPRECATED: The required acks before the commit can be accepted.
548-
* In general, the default (-1) should not be overridden.
549-
*/
550-
@Deprecated // since 3.8
551-
public short offsetCommitRequiredAcks() {
552-
return offsetCommitRequiredAcks;
553-
}
554-
555535
/**
556536
* The minimum allowed session timeout for registered consumers.
557537
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public class OffsetConfig {
2828
public final short offsetsTopicReplicationFactor;
2929
public final CompressionType offsetsTopicCompressionType;
3030
public final int offsetCommitTimeoutMs;
31-
public final short offsetCommitRequiredAcks;
3231

3332
/**
3433
* Configuration settings for in-built offset management
@@ -50,8 +49,6 @@ public class OffsetConfig {
5049
* order to achieve "atomic" commits.
5150
* @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
5251
* commit or this timeout is reached. (Similar to the producer request timeout.)
53-
* @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
54-
* should not be overridden.
5552
*/
5653
public OffsetConfig(int maxMetadataSize,
5754
int loadBufferSize,
@@ -61,8 +58,7 @@ public OffsetConfig(int maxMetadataSize,
6158
int offsetsTopicSegmentBytes,
6259
short offsetsTopicReplicationFactor,
6360
CompressionType offsetsTopicCompressionType,
64-
int offsetCommitTimeoutMs,
65-
short offsetCommitRequiredAcks
61+
int offsetCommitTimeoutMs
6662
) {
6763
this.maxMetadataSize = maxMetadataSize;
6864
this.loadBufferSize = loadBufferSize;
@@ -73,6 +69,5 @@ public OffsetConfig(int maxMetadataSize,
7369
this.offsetsTopicReplicationFactor = offsetsTopicReplicationFactor;
7470
this.offsetsTopicCompressionType = offsetsTopicCompressionType;
7571
this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
76-
this.offsetCommitRequiredAcks = offsetCommitRequiredAcks;
7772
}
7873
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public void testConfigs() {
6868
configs.put(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, 555);
6969
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 111);
7070
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 11);
71-
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) 0);
7271
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 333);
7372
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 666);
7473
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
@@ -97,7 +96,6 @@ public void testConfigs() {
9796
assertEquals(555, config.offsetsLoadBufferSize());
9897
assertEquals(111, config.offsetsTopicPartitions());
9998
assertEquals(11, config.offsetsTopicReplicationFactor());
100-
assertEquals(0, config.offsetCommitRequiredAcks());
10199
assertEquals(333, config.consumerGroupMinSessionTimeoutMs());
102100
assertEquals(666, config.consumerGroupMaxSessionTimeoutMs());
103101
assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs());
@@ -107,12 +105,6 @@ public void testConfigs() {
107105
@Test
108106
public void testInvalidConfigs() {
109107
Map<String, Object> configs = new HashMap<>();
110-
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) -2);
111-
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 3);
112-
assertEquals("offsets.commit.required.acks must be greater or equal to -1 and less or equal to offsets.topic.replication.factor",
113-
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
114-
115-
configs.clear();
116108
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
117109
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
118110
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 20);

0 commit comments

Comments
 (0)
0