@@ -2654,6 +2654,42 @@ public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
2654
2654
assertEquals (expectedRecords , records );
2655
2655
}
2656
2656
2657
+ @ Test
2658
+ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly () {
2659
+ GroupMetadataManager groupMetadataManager = mock (GroupMetadataManager .class );
2660
+ Group group = mock (Group .class );
2661
+
2662
+ OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext .Builder ()
2663
+ .withGroupMetadataManager (groupMetadataManager )
2664
+ .withOffsetsRetentionMinutes (1 )
2665
+ .build ();
2666
+
2667
+ long commitTimestamp = context .time .milliseconds ();
2668
+
2669
+ context .commitOffset ("group-id" , "foo" , 0 , 100L , 0 , commitTimestamp );
2670
+ context .commitOffset (10L , "group-id" , "foo" , 1 , 101L , 0 , commitTimestamp + 500 );
2671
+
2672
+ context .time .sleep (Duration .ofMinutes (1 ).toMillis ());
2673
+
2674
+ when (groupMetadataManager .group ("group-id" )).thenReturn (group );
2675
+ when (group .offsetExpirationCondition ()).thenReturn (Optional .of (
2676
+ new OffsetExpirationConditionImpl (offsetAndMetadata -> offsetAndMetadata .commitTimestampMs )));
2677
+ when (group .isSubscribedToTopic ("foo" )).thenReturn (false );
2678
+
2679
+ // foo-0 is expired, but the group is not deleted beacuse it has pending transactional offset commits.
2680
+ List <CoordinatorRecord > expectedRecords = List .of (
2681
+ GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord ("group-id" , "foo" , 0 )
2682
+ );
2683
+ List <CoordinatorRecord > records = new ArrayList <>();
2684
+ assertFalse (context .cleanupExpiredOffsets ("group-id" , records ));
2685
+ assertEquals (expectedRecords , records );
2686
+
2687
+ // No offsets are expired, and the group is still not deleted because it has pending transactional offset commits.
2688
+ records = new ArrayList <>();
2689
+ assertFalse (context .cleanupExpiredOffsets ("group-id" , records ));
2690
+ assertEquals (List .of (), records );
2691
+ }
2692
+
2657
2693
private static OffsetFetchResponseData .OffsetFetchResponsePartitions mkOffsetPartitionResponse (
2658
2694
int partition ,
2659
2695
long offset ,
0 commit comments