8000 KAFKA-19019: Add support for remote storage fetch for share groups by arvi18 · Pull Request #12 · coderabbit-test/kafka · GitHub
[go: up one dir, main page]

Skip to content

KAFKA-19019: Add support for remote storage fetch for share groups #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed Jun's round 1 review comments - part 2
  • Loading branch information
adixitconfluent committed Apr 15, 2025
commit 2dc8ee0486b96647a04c710bd3859fdedce20830
121 changes: 52 additions & 69 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ public boolean tryComplete() {
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
// Map to store the remote fetch metadata corresponding to the topic partitions for which we need to perform remote fetch.
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
// Store the remote fetch info and the topic partition for which we need to perform remote fetch.
Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);

if (!remoteStorageFetchInfoMap.isEmpty()) {
return maybeProcessRemoteFetch(topicPartitionData, remoteStorageFetchInfoMap, replicaManagerReadResponse);
if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get(), replicaManagerReadResponse);
}
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
Expand Down Expand Up @@ -592,33 +592,40 @@ Meter expiredRequestMeter() {
return expiredRequestMeter;
}

private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
LogReadResult logReadResult = entry.getValue();
if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
// TODO: There is a limitation in remote storage fetch for consumer groups in that we can only perform remote fetch for
// a single topic partition in a fetch request. Since the logic of share fetch is largely based on how consumer groups work,
// we follow the same approach. However, this limitation should be addressed as part of KAFKA-19133, which should help us
// perform fetches for multiple remote fetch topic partitions in a single share fetch request.
remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
break;
}
});
}
return remoteStorageFetchMetadataMap;
}

private boolean maybeProcessRemoteFetch(
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) throws Exception {
topicPartitionData.keySet().forEach(topicIdPartition -> {
// topic partitions for which fetching would be happening from local log and not remote storage.
if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
// Topic partitions for which fetch will not be happening in this share fetch request.
if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
// Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
releasePartitionLocks(Set.of(topicIdPartition));
}
});
Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
if (exceptionOpt.isPresent()) {
remoteStorageFetchException = exceptionOpt;
throw exceptionOpt.get();
Expand All @@ -629,25 +636,15 @@ private boolean maybeProcessRemoteFetch(

/**
* Returns an Optional containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully; otherwise returns an empty Optional.
* @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
* @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
* @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
*/
private Optional<Exception> processRemoteFetchOrException(
LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
// TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
// a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
// we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
// fetch for multiple remote fetch topic partition in a single share fetch request
TopicIdPartition remoteFetchTopicIdPartition = remoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);

LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
topicIdPartition,
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
));
TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);

Future<Void> remoteFetchTask;
Expand All @@ -667,28 +664,10 @@ private Optional<Exception> processRemoteFetchOrException(
} catch (Exception e) {
return Optional.of(e);
}
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
return Optional.empty();
}

/**
* This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
* other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
* @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
* @return the first topic partition for which we need to perform remote storage fetch
*/
private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
partitionsAcquired.remove(topicIdPartition);
releasePartitionLocks(Set.of(topicIdPartition));
}
});
return remoteFetchTopicIdPartition;
}

/**
* This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
* The operation can be completed if:
Expand All @@ -701,25 +680,18 @@ private TopicIdPartition remoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartit
private boolean maybeCompletePendingRemoteFetch() {
boolean canComplete = false;

for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
try {
if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
}
} catch (KafkaStorageException e) { // Case a
log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
} catch (UnknownTopicOrPartitionException e) { // Case b
log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
} catch (NotLeaderOrFollowerException e) { // Case c
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
}
if (canComplete)
break;
TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
try {
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
} catch (KafkaStorageException e) { // Case a
log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
} catch (UnknownTopicOrPartitionException e) { // Case b
log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
} catch (NotLeaderOrFollowerException e) { // Case c
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
canComplete = true;
}

if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
Expand Down Expand Up @@ -813,7 +785,7 @@ private void completeRemoteStorageShareFetchRequest() {
// Get the local log read based topic partitions.
LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
sharePartitions.forEach((topicIdPartition, sharePartition) -> {
if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
if (!partitionsAcquired.containsKey(topicIdPartition)) {
nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
}
});
Expand Down Expand Up @@ -880,8 +852,7 @@ public record RemoteFetch(
LogReadResult logReadResult,
Future<Void> remoteFetchTask,
CompletableFuture<RemoteLogReadResult> remoteFetchResult,
RemoteStorageFetchInfo remoteFetchInfo,
LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap
RemoteStorageFetchInfo remoteFetchInfo
) {
@Override
public String toString() {
Expand All @@ -891,7 +862,19 @@ public String toString() {
", remoteFetchTask=" + remoteFetchTask +
", remoteFetchResult=" + remoteFetchResult +
", remoteFetchInfo=" + remoteFetchInfo +
", fetchOffsetMetadataMap=" + fetchOffsetMetadataMap +
")";
}
}

/**
 * Pairs the single topic partition chosen for remote storage fetch in a share
 * fetch request with the remote storage fetch details produced by the log read
 * for that partition.
 *
 * @param topicIdPartition       the topic partition whose data is fetched from remote storage
 * @param remoteStorageFetchInfo the remote storage fetch details for that partition
 */
public record TopicPartitionRemoteFetchInfo(
    TopicIdPartition topicIdPartition,
    RemoteStorageFetchInfo remoteStorageFetchInfo
) {
    @Override
    public String toString() {
        // Keep the exact textual format of the original hand-written toString.
        return String.format(
            "TopicPartitionRemoteFetchInfo(topicIdPartition=%s, remoteStorageFetchInfo=%s)",
            topicIdPartition,
            remoteStorageFetchInfo
        );
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1688,9 +1688,11 @@ public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
assertTrue(delayedShareFetch.isCompleted());
// Pending remote fetch object gets created for delayed share fetch.
assertNotNull(delayedShareFetch.remoteFetch());
// Verify the locks are released separately for tp0 (from onComplete) and tp1 (from tryComplete).
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
// Verify the lock is released for tp1 (from tryComplete).
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
// From onComplete, the locks will be released for both tp0 and tp1 - tp0 because it was acquired in
// tryComplete and had its remote fetch processed, and tp1 because it is reacquired in onComplete during the local log read check.
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
assertTrue(shareFetch.isCompleted());
// Share fetch response only contains the first remote storage fetch topic partition - tp0.
assertEquals(Set.of(tp0), future.join().keySet());
Expand Down
0