8000 KAFKA-19163: Avoid deleting groups with pending transactional offsets · coderabbit-test/kafka@3c0cbb0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3c0cbb0

Browse files
KAFKA-19163: Avoid deleting groups with pending transactional offsets
When a group has pending transactional offsets but no committed offsets, we can accidentally delete it while cleaning up expired offsets. Add a check to avoid this case.
1 parent d670f68 commit 3c0cbb0

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
858858
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
859859
offsets.offsetsByGroup.get(groupId);
860860
if (offsetsByTopic == null) {
861-
return true;
861+
return !openTransactionsByGroup.containsKey(groupId);
862862
}
863863

864864
// We expect the group to exist.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2654,6 +2654,42 @@ public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
26542654
assertEquals(expectedRecords, records);
26552655
}
26562656

2657+
@Test
2658+
public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() {
2659+
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
2660+
Group group = mock(Group.class);
2661+
2662+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
2663+
.withGroupMetadataManager(groupMetadataManager)
2664+
.withOffsetsRetentionMinutes(1)
2665+
.build();
2666+
2667+
long commitTimestamp = context.time.milliseconds();
2668+
2669+
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
2670+
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);
2671+
2672+
context.time.sleep(Duration.ofMinutes(1).toMillis());
2673+
2674+
when(groupMetadataManager.group("group-id")).thenReturn(group);
2675+
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
2676+
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
2677+
when(group.isSubscribedToTopic("foo")).thenReturn(false);
2678+
2679+
// foo-0 is expired, but the group is not deleted beacuse it has pending transactional offset commits.
2680+
List<CoordinatorRecord> expectedRecords = List.of(
2681+
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
2682+
);
2683+
List<CoordinatorRecord> records = new ArrayList<>();
2684+
assertFalse(context.cleanupExpiredOffsets("group-id", records));
2685+
assertEquals(expectedRecords, records);
2686+
2687+
// No offsets are expired, and the group is still not deleted because it has pending transactional offset commits.
2688+
records = new ArrayList<>();
2689+
assertFalse(context.cleanupExpiredOffsets("group-id", records));
2690+
assertEquals(List.of(), records);
2691+
}
2692+
26572693
private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
26582694
int partition,
26592695
long offset,

0 commit comments

Comments
 (0)
0