8000 KAFKA-19019: Add support for remote storage fetch for share groups by arvi18 · Pull Request #12 · coderabbit-test/kafka · GitHub
[go: up one dir, main page]

Skip to content

KAFKA-19019: Add support for remote storage fetch for share groups #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Code refactor
  • Loading branch information
adixitconfluent committed Apr 12, 2025
commit af9e6cac7ab4b883a7e1c20762f23e17e251441a
45 changes: 33 additions & 12 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ private Optional<Exception> processRemoteFetchOrException(
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
// TODO: There is a limitation in remote storage fetch for consumer groups in that we can only perform a remote fetch
// for a single topic partition in a fetch request. Since the share fetch logic is largely based on how consumer groups work,
// we follow the same approach here. However, this limitation should be addressed as part of KAFKA-19133, which should allow us
// to fetch multiple remote storage topic partitions in a single share fetch request.
TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);

LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
topicIdPartition,
Expand All @@ -639,16 +646,12 @@ private Optional<Exception> processRemoteFetchOrException(

Future<Void> remoteFetchTask;
CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
// TODO: This is a limitation in remote storage fetch that there will be fetch only for a single topic partition.
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
TopicIdPartition topicIdPartition = firstRemoteStorageFetchInfo.getKey();
RemoteStorageFetchInfo remoteStorageFetchInfo = firstRemoteStorageFetchInfo.getValue();
try {
remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
remoteStorageFetchInfo,
result -> {
remoteFetchResult.complete(result);
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
}
);
} catch (RejectedExecutionException e) {
Expand All @@ -658,10 +661,28 @@ private Optional<Exception> processRemoteFetchOrException(
} catch (Exception e) {
return Optional.of(e);
}
remoteFetchOpt = Optional.of(new RemoteFetch(topicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
return Optional.empty();
}

/**
 * Selects the first topic partition in the remote storage fetch info map as the single partition
 * for which remote storage fetch will be performed. All other candidate partitions are dropped
 * from {@code partitionsAcquired} and have their fetch locks released, since only one partition
 * can be served by a remote fetch in a single share fetch request.
 * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
 * @return the topic partition chosen for remote storage fetch
 */
private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
    // LinkedHashMap preserves insertion order, so this is the first partition requested.
    TopicIdPartition selected = remoteStorageFetchInfoMap.entrySet().iterator().next().getKey();
    for (TopicIdPartition candidate : remoteStorageFetchInfoMap.keySet()) {
        if (candidate.equals(selected)) {
            continue;
        }
        // Not fetching this partition remotely in this request: release its claim and lock.
        partitionsAcquired.remove(candidate);
        releasePartitionLocks(Set.of(candidate));
    }
    return selected;
}

/**
* This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
* The operation can be completed if:
Expand Down Expand Up @@ -741,26 +762,26 @@ private void completeRemoteStorageShareFetchRequest() {
try {
List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
int readableBytes = 0;
if (remoteFetchOpt.get().remoteFetchResult.isDone()) {
if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
RemoteFetch remoteFetch = remoteFetchOpt.get();
if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
Throwable error = remoteFetch.remoteFetchResult().get().error.get();
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
shareFetchPartitionData.add(
new ShareFetchPartitionData(
remoteFetch.topicIdPartition,
partitionsAcquired.get(remoteFetch.topicIdPartition),
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
)
);
} else {
FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition;
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition);
shareFetchPartitionData.add(
new ShareFetchPartitionData(
topicIdPartition,
partitionsAcquired.get(remoteFetch.topicIdPartition),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
new FetchPartitionData(
logReadResult.error(),
logReadResult.highWatermark(),
Expand All @@ -785,7 +806,7 @@ private void completeRemoteStorageShareFetchRequest() {
// Get the local log read based topic partitions.
LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
if (!partitionsAcquired.containsKey(topicIdPartition)) {
if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1687,8 +1687,9 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
assertTrue(delayedShareFetch.isCompleted());
// Pending remote fetch object gets created for delayed share fetch.
assertNotNull(delayedShareFetch.remoteFetch());
// Verify the locks are released for tp0 and tp1.
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
// Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete).
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
assertTrue(shareFetch.isCompleted());
// Share fetch response only contains the first remote storage fetch topic partition - tp0.
assertEquals(Set.of(tp0), future.join().keySet());
Expand Down
0