8000 KAFKA-19019: Add support for remote storage fetch for share groups by arvi18 · Pull Request #12 · coderabbit-test/kafka · GitHub
[go: up one dir, main page]

Skip to content

KAFKA-19019: Add support for remote storage fetch for share groups #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added unit tests
  • Loading branch information
adixitconfluent committed Apr 12, 2025
commit 818684c4adf51e61abf211d2482e41a1c975e44a
34 changes: 24 additions & 10 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,8 @@ public DelayedShareFetch(

@Override
public void onExpiration() {
// cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
// already running as it may force closing opened/cached resources as transaction index.
if (remoteFetchOpt.isPresent()) {
boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
if (!cancelled) {
log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
}
cancelRemoteFetchTask();
}
expiredRequestMeter.mark();
}
Expand Down Expand Up @@ -581,6 +575,11 @@ Lock lock() {
return lock;
}

// Visible for testing.
RemoteFetch remoteFetch() {
return remoteFetchOpt.orElse(null);
}

// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
Expand Down Expand Up @@ -674,7 +673,7 @@ private Optional<Exception> processRemoteFetchOrException(
private boolean maybeCompletePendingRemoteFetch() {
boolean canComplete = false;

for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap.entrySet()) {
for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
try {
Expand All @@ -695,7 +694,7 @@ private boolean maybeCompletePendingRemoteFetch() {
break;
}

if (canComplete || remoteFetchOpt.get().remoteFetchResult.isDone()) { // Case d
if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
boolean completedByMe = forceComplete();
// If invocation of forceComplete is not successful, then that means the request is already completed
// hence release the acquired locks.
Expand Down Expand Up @@ -734,7 +733,7 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi

/**
* This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
* It should only be called when we know that there is remote fetch in-flight/completed.
* Note - This function should only be called when we know that there is remote fetch in-flight/completed.
*/
private void completeRemoteStorageShareFetchRequest() {
LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
Expand Down Expand Up @@ -776,6 +775,8 @@ private void completeRemoteStorageShareFetchRequest() {
);
readableBytes += info.records.sizeInBytes();
}
} else {
cancelRemoteFetchTask();
}

// If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read.
Expand Down Expand Up @@ -832,6 +833,19 @@ private void completeRemoteStorageShareFetchRequest() {
}
}

/**
* Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
* already running as it may force closing opened/cached resources as transaction index.
* Note - This function should only be called when we know that there is a remote fetch in-flight/completed.
*/
private void cancelRemoteFetchTask() {
boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false);
if (!cancelled) {
log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}",
remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone());
}
}

public record RemoteFetch(
TopicIdPartition topicIdPartition,
Future<Void> remoteFetchTask,
Expand Down
Loading
0