8000 KAFKA-19019: Add support for remote storage fetch for share groups by arvi18 · Pull Request #12 · coderabbit-test/kafka · GitHub
[go: up one dir, main page]

Skip to content

KAFKA-19019: Add support for remote storage fetch for share groups #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Addressed Jun's round 4 review comments
  • Loading branch information
adixitconfluent committed Apr 19, 2025
commit 25788b32f2c983184db08aa82dc8a9c450d04dcb
27 changes: 9 additions & 18 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
91CF
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -304,7 +303,7 @@ public boolean tryComplete() {
localPartitionsAlreadyFetched = replicaManagerReadResponse;
boolean completedByMe = forceComplete();
8000 // If invocation of forceComplete is not successful, then that means the request is already completed
// hence release the acquired locks.
// hence the acquired locks are already released.
if (!completedByMe) {
releasePartitionLocks(partitionsAcquired.keySet());
}
Expand Down Expand Up @@ -335,7 +334,7 @@ public boolean tryComplete() {
} else {
boolean completedByMe = forceComplete();
// If invocation of forceComplete is not successful, then that means the request is already completed
// hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
// hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
// completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
if (!completedByMe) {
releasePartitionLocks(partitionsAcquired.keySet());
Expand Down Expand Up @@ -626,7 +625,7 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchIn
private boolean maybeProcessRemoteFetch(
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
) throws Exception {
) {
Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
topicPartitionData.keySet().forEach(topicIdPartition -> {
// topic partitions for which fetch would not be happening in this share fetch request.
Expand All @@ -637,19 +636,16 @@ private boolean maybeProcessRemoteFetch(
// Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
// them to the delayed actions queue.
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
if (exceptionOpt.isPresent()) {
throw exceptionOpt.get();
}
processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
// Check if remote fetch can be completed.
return maybeCompletePendingRemoteFetch();
}

/**
* Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
* @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
* Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt.
* @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
*/
private Optional<Exception> processRemoteFetchOrException(
private void processRemoteFetchOrException(
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
) {
TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
Expand All @@ -665,17 +661,12 @@ private Optional<Exception> processRemoteFetchOrException(
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
}
);
} catch (RejectedExecutionException e) {
// Return the error if any in scheduling the remote fetch task.
log.warn("Unable to fetch data from remote storage", e);
remoteStorageFetchException = Optional.of(e);
return Optional.of(e);
} catch (Exception e) {
// Throw the error if any in scheduling the remote fetch task.
remoteStorageFetchException = Optional.of(e);
return Optional.of(e);
throw e;
}
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
return Optional.empty();
}

/**
Expand Down
0