@@ -2593,6 +2593,67 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
2593
2593
assertEquals (List .of (), records );
2594
2594
}
2595
2595
2596
+ @ Test
2597
+ public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets () {
2598
+ GroupMetadataManager groupMetadataManager = mock (GroupMetadataManager .class );
2599
+ Group group = mock (Group .class );
2600
+
2601
+ OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext .Builder ()
2602
+ .withGroupMetadataManager (groupMetadataManager )
2603
+ .withOffsetsRetentionMinutes (1 )
2604
+ .build ();
2605
+
2606
+ long commitTimestamp = context .time .milliseconds ();
2607
+
2608
+ context .commitOffset ("group-id" , "foo" , 0 , 100L , 0 , commitTimestamp );
2609
+ context .commitOffset (10L , "group-id" , "foo" , 1 , 101L , 0 , commitTimestamp + 500 );
2610
+
2611
+ when (groupMetadataManager .group ("group-id" )).thenReturn (group );
2612
+ when (group .offsetExpirationCondition ()).thenReturn (Optional .of (
2613
+ new OffsetExpirationConditionImpl (offsetAndMetadata -> offsetAndMetadata .commitTimestampMs )));
2614
+ when (group .isSubscribedToTopic ("foo" )).thenReturn (false );
2615
+
2616
+ // Delete the pending transactional offset.
2617
+ OffsetDeleteRequestData .OffsetDeleteRequestTopicCollection requestTopicCollection =
2618
+ new OffsetDeleteRequestData .OffsetDeleteRequestTopicCollection (List .of (
2619
+ new OffsetDeleteRequestData .OffsetDeleteRequestTopic ()
2620
+ .setName ("foo" )
2621
+ .setPartitions (List .of (
2622
+ new OffsetDeleteRequestData .OffsetDeleteRequestPartition ().setPartitionIndex (1 )
2623
+ ))
2624
+ ).iterator ());
2625
+ CoordinatorResult <OffsetDeleteResponseData , CoordinatorRecord > result = context .deleteOffsets (
2626
+ new OffsetDeleteRequestData ()
2627
+ .setGroupId ("group-id" )
2628
+ .setTopics (requestTopicCollection )
2629
+ );
2630
+ List <CoordinatorRecord > expectedRecords = List .of (
2631
+ GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord ("group-id" , "foo" , 1 )
2632
+ );
2633
+ assertEquals (expectedRecords , result .records ());
2634
+
2635
+ context .time .sleep (Duration .ofMinutes (1 ).toMillis ());
2636
+
2637
+ // The group should not be deleted because it has a pending transaction.
2638
+ expectedRecords = List .of (
2639
+ GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord ("group-id" , "foo" , 0 )
2640
+ );
2641
+ List <CoordinatorRecord > records = new ArrayList <>();
2642
+ assertFalse (context .cleanupExpiredOffsets ("group-id" , records ));
2643
+ assertEquals (expectedRecords , records );
2644
+
2645
+ // Commit the ongoing transaction.
2646
+ context .replayEndTransactionMarker (10L , TransactionResult .COMMIT );
2647
+
2648
+ // The group should be deletable now.
2649
+ context .commitOffset ("group-id" , "foo" , 0 , 100L , 0 , commitTimestamp );
2650
+ context .time .sleep (Duration .ofMinutes (1 ).toMillis ());
2651
+
2652
+ records = new ArrayList <>();
2653
+ assertTrue (context .cleanupExpiredOffsets ("group-id" , records ));
2654
+ assertEquals (expectedRecords , records );
2655
+ }
2656
+
2596
2657
private static OffsetFetchResponseData .OffsetFetchResponsePartitions mkOffsetPartitionResponse (
2597
2658
int partition ,
2598
2659
long offset ,
0 commit comments