8000 KAFKA-19164: Keep track of groups when deleting transactional offsets · coderabbit-test/kafka@d670f68 · GitHub
[go: up one dir, main page]

Skip to content

Commit d670f68

Browse files
KAFKA-19164: Keep track of groups when deleting transactional offsets
When deleting pending transactional offsets, we must preserve the list of groups associated with the producer ID, otherwise we cannot clean up the list of pending transactions for the group once the transaction is committed or aborted.
1 parent 6a4207f commit d670f68

File tree

2 files changed

+72
-4
lines changed

2 files changed

+72
-4
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,19 @@ public OffsetMetadataManager build() {
198198
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
199199

200200
private class Offsets {
201+
/**
202+
* Whether to preserve empty entries for groups when removing offsets.
203+
* We use this to keep track of the groups associated with pending transactions.
204+
*/
205+
private final boolean preserveGroups;
206+
201207
/**
202208
* The offsets keyed by group id, topic name and partition id.
203209
*/
204210
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
205211

206-
private Offsets() {
212+
private Offsets(boolean preserveGroups) {
213+
this.preserveGroups = preserveGroups;
207214
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
208215
}
209216

@@ -256,7 +263,7 @@ private OffsetAndMetadata remove(
256263
if (partitionOffsets.isEmpty())
257264
topicOffsets.remove(topic);
258265

259-
if (topicOffsets.isEmpty())
266+
if (!preserveGroups && topicOffsets.isEmpty())
260267
offsetsByGroup.remove(groupId);
261268

262269
return removedValue;
@@ -278,7 +285,7 @@ private OffsetAndMetadata remove(
278285
this.groupMetadataManager = groupMetadataManager;
279286
this.config = config;
280287
this.metrics = metrics;
281-
this.offsets = new Offsets();
288+
this.offsets = new Offsets(false);
282289
this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
283290
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
284291
}
@@ -995,7 +1002,7 @@ public void replay(
9951002
// offsets store. Pending offsets there are moved to the main store when
9961003
// the transaction is committed; or removed when the transaction is aborted.
9971004
pendingTransactionalOffsets
998-
.computeIfAbsent(producerId, __ -> new Offsets())
1005+
.computeIfAbsent(producerId, __ -> new Offsets(true))
9991006
.put(
10001007
groupId,
10011008
topic,

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2593,6 +2593,67 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
25932593
assertEquals(List.of(), records);
25942594
}
25952595

2596+
@Test
2597+
public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
2598+
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
2599+
Group group = mock(Group.class);
2600+
2601+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
2602+
.withGroupMetadataManager(groupMetadataManager)
2603+
.withOffsetsRetentionMinutes(1)
2604+
.build();
2605+
2606+
long commitTimestamp = context.time.milliseconds();
2607+
2608+
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
2609+
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);
2610+
2611+
when(groupMetadataManager.group("group-id")).thenReturn(group);
2612+
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
2613+
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
2614+
when(group.isSubscribedToTopic("foo")).thenReturn(false);
2615+
2616+
// Delete the pending transactional offset.
2617+
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
2618+
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of(
2619+
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
2620+
.setName("foo")
2621+
.setPartitions(List.of(
2622+
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)
2623+
))
2624+
).iterator());
2625+
CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> result = context.deleteOffsets(
2626+
new OffsetDeleteRequestData()
2627+
.setGroupId("group-id")
2628+
.setTopics(requestTopicCollection)
2629+
);
2630+
List<CoordinatorRecord> expectedRecords = List.of(
2631+
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1)
2632+
);
2633+
assertEquals(expectedRecords, result.records());
2634+
2635+
context.time.sleep(Duration.ofMinutes(1).toMillis());
2636+
2637+
// The group should not be deleted because it has a pending transaction.
2638+
expectedRecords = List.of(
2639+
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
2640+
);
2641+
List<CoordinatorRecord> records = new ArrayList<>();
2642+
assertFalse(context.cleanupExpiredOffsets("group-id", records));
2643+
assertEquals(expectedRecords, records);
2644+
2645+
// Commit the ongoing transaction.
2646+
context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);
2647+
2648+
// The group should be deletable now.
2649+
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
2650+
context.time.sleep(Duration.ofMinutes(1).toMillis());
2651+
2652+
records = new ArrayList<>();
2653+
assertTrue(context.cleanupExpiredOffsets("group-id", records));
2654+
assertEquals(expectedRecords, records);
2655+
}
2656+
25962657
private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
25972658
int partition,
25982659
long offset,

0 commit comments

Comments
 (0)
0