8000 simplify delayed dataloader dispatching by not using a batch window · graphql-java/graphql-java@1dbd50e · GitHub
[go: up one dir, main page]

Skip to content

Commit 1dbd50e

Browse files
committed
simplify delayed dataloader dispatching by not using a batch window
1 parent 677e436 commit 1dbd50e

File tree

7 files changed

+130
-143
lines changed

7 files changed

+130
-143
lines changed

src/main/java/graphql/Profiler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ default void oldStrategyDispatchingAll(int level) {
4646

4747
default void batchLoadedOldStrategy(String name, int level, int count) {
4848

49-
5049
}
5150

52-
default void batchLoadedNewStrategy(String dataLoaderName, @Nullable Integer level, int count) {
51+
52+
default void batchLoadedNewStrategy(String dataLoaderName, Integer level, int count, boolean delayed, boolean chained) {
5353

5454
}
5555

src/main/java/graphql/ProfilerImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,22 @@ public void oldStrategyDispatchingAll(int level) {
147147

148148
@Override
149149
public void batchLoadedOldStrategy(String name, int level, int count) {
150-
profilerResult.addDispatchEvent(name, level, count, ProfilerResult.DispatchEventType.STRATEGY_DISPATCH);
150+
profilerResult.addDispatchEvent(name, level, count, ProfilerResult.DispatchEventType.LEVEL_STRATEGY_DISPATCH);
151151
}
152152

153153
@Override
154-
public void batchLoadedNewStrategy(String dataLoaderName, @Nullable Integer level, int count) {
155-
profilerResult.addDispatchEvent(dataLoaderName, level, count, ProfilerResult.DispatchEventType.STRATEGY_DISPATCH);
154+
public void batchLoadedNewStrategy(String dataLoaderName, Integer level, int count, boolean delayed, boolean chained) {
155+
ProfilerResult.DispatchEventType dispatchEventType = null;
156+
if (delayed && !chained) {
157+
dispatchEventType = ProfilerResult.DispatchEventType.DELAYED_DISPATCH;
158+
} else if (delayed) {
159+
dispatchEventType = ProfilerResult.DispatchEventType.CHAINED_DELAYED_DISPATCH;
160+
} else if (!chained) {
161+
dispatchEventType = ProfilerResult.DispatchEventType.LEVEL_STRATEGY_DISPATCH;
162+
} else {
163+
dispatchEventType = ProfilerResult.DispatchEventType.CHAINED_STRATEGY_DISPATCH;
164+
}
165+
profilerResult.addDispatchEvent(dataLoaderName, level, count, dispatchEventType);
156166
}
157167

158168
@Override

src/main/java/graphql/ProfilerResult.java

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,20 @@ public void setInstrumentationClasses(List<String> instrumentationClasses) {
6464

6565

6666
public enum DispatchEventType {
67-
STRATEGY_DISPATCH,
67+
LEVEL_STRATEGY_DISPATCH,
68+
CHAINED_STRATEGY_DISPATCH,
69+
DELAYED_DISPATCH,
70+
CHAINED_DELAYED_DISPATCH,
6871
MANUAL_DISPATCH,
6972
}
7073

7174
public static class DispatchEvent {
7275
final String dataLoaderName;
73-
final @Nullable Integer level; // is nul 5276 l for delayed dispatching
76+
final Integer level; // is null for delayed dispatching
7477
final int keyCount; // how many
7578
final DispatchEventType type;
7679

77-
public DispatchEvent(String dataLoaderName, @Nullable Integer level, int keyCount, DispatchEventType type) {
80+
public DispatchEvent(String dataLoaderName, Integer level, int keyCount, DispatchEventType type) {
7881
this.dataLoaderName = dataLoaderName;
7982
this.level = level;
8083
this.keyCount = keyCount;
@@ -85,7 +88,7 @@ public String getDataLoaderName() {
8588
return dataLoaderName;
8689
}
8790

88-
public @Nullable Integer getLevel() {
91+
public Integer getLevel() {
8992
return level;
9093
}
9194

@@ -175,7 +178,7 @@ void oldStrategyDispatchingAll(int level) {
175178
}
176179

177180

178-
void addDispatchEvent(String dataLoaderName, @Nullable Integer level, int count, DispatchEventType type) {
181+
void addDispatchEvent(String dataLoaderName, Integer level, int count, DispatchEventType type) {
179182
dispatchEvents.add(new DispatchEvent(dataLoaderName, level, count, type));
180183
}
181184

@@ -316,34 +319,13 @@ public Map<String, Object> shortSummaryMap() {
316319
}
317320

318321

319-
private String printDispatchEvents() {
320-
if (dispatchEvents.isEmpty()) {
321-
return "[]";
322-
}
323-
StringBuilder sb = new StringBuilder();
324-
sb.append("[");
325-
int i = 0;
326-
for (DispatchEvent event : dispatchEvents) {
327-
sb.append("dataLoader=")
328-
.append(event.getDataLoaderName())
329-
.append(", level=")
330-
.append(event.getLevel())
331-
.append(", count=").append(event.getKeyCount());
332-
if (i++ < dispatchEvents.size() - 1) {
333-
sb.append("; ");
334-
}
335-
}
336-
sb.append("]");
337-
return sb.toString();
338-
}
339-
340322
public List<Map<String, Object>> getDispatchEventsAsMap() {
341323
List<Map<String, Object>> result = new ArrayList<>();
342324
for (DispatchEvent event : dispatchEvents) {
343325
Map<String, Object> eventMap = new LinkedHashMap<>();
344326
eventMap.put("type", event.getType().name());
345327
eventMap.put("dataLoader", event.getDataLoaderName());
346-
eventMap.put("level", event.getLevel() != null ? event.getLevel() : "delayed");
328+
eventMap.put("level", event.getLevel());
347329
eventMap.put("keyCount", event.getKeyCount());
348330
result.add(eventMap);
349331
}

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java

Lines changed: 64 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
import java.util.Set;
2626
import java.util.concurrent.CompletableFuture;
2727
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.atomic.AtomicReference;
2928
import java.util.function.Supplier;
30-
import java.util.stream.Collectors;
3129

3230
@Internal
3331
@NullMarked
@@ -38,7 +36,6 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr
3836
private final boolean enableDataLoaderChaining;
3937

4038

41-
static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L;
4239
private final Profiler profiler;
4340

4441
private final Map<AlternativeCallContext, CallStack> deferredCallStackMap = new ConcurrentHashMap<>();
@@ -94,15 +91,15 @@ private static class CallStack {
9491
// all levels that are ready to be dispatched
9592
private int highestReadyLevel;
9693

97-
private final List<ResultPathWithDataLoader> allResultPathWithDataLoader = Collections.synchronizedList(new ArrayList<>());
98-
private final Map<Integer, Set<ResultPathWithDataLoader>> levelToResultPathWithDataLoader = new ConcurrentHashMap<>();
99-
94+
/**
95+
* Data for chained dispatching.
96+
* A result path is used to identify a DataFetcher.
97+
*/
98+
private final List<DataLoaderInvocation> allDataLoaderInvocations = Collections.synchronizedList(new ArrayList<>());
99+
private final Map<Integer, Set<DataLoaderInvocation>> levelToDataLoaderInvocation = new ConcurrentHashMap<>();
100100
private final Set<Integer> dispatchingStartedPerLevel = ConcurrentHashMap.newKeySet();
101101
private final Set<Integer> dispatchingFinishedPerLevel = ConcurrentHashMap.newKeySet();
102-
// Set of ResultPath
103-
private final Set<String> batchWindowOfDelayedDataLoaderToDispatch = ConcurrentHashMap.newKeySet();
104-
105-
private boolean batchWindowOpen;
102+
private final Set<Integer> currentlyDelayedDispatchingLevels = ConcurrentHashMap.newKeySet();
106103

107104

108105
private final List<FieldValueInfo> deferredFragmentRootFieldsFetched = new ArrayList<>();
@@ -113,8 +110,8 @@ public CallStack() {
113110
expectedExecuteObjectCallsPerLevel.set(0, 1);
114111
}
115112

116-
public void addResultPathWithDataLoader(int level, ResultPathWithDataLoader resultPathWithDataLoader) {
117-
levelToResultPathWithDataLoader.computeIfAbsent(level, k -> new LinkedHashSet<>()).add(resultPathWithDataLoader);
113+
public void addDataLoaderInvocationForLevel(int level, DataLoaderInvocation dataLoaderInvocation) {
114+
levelToDataLoaderInvocation.computeIfAbsent(level, k -> new LinkedHashSet<>()).add(dataLoaderInvocation);
118115
}
119116

120117

@@ -328,12 +325,9 @@ private void resetCallStack(CallStack callStack) {
328325
callStack.clearHappenedExecuteObjectCalls();
329326
callStack.clearHappenedOnFieldValueCalls();
330327
callStack.expectedExecuteObjectCallsPerLevel.set(0, 1);
331-
callStack.dispatchingFinishedPerLevel.clear();
332-
callStack.dispatchingStartedPerLevel.clear();
333-
callStack.allResultPathWithDataLoader.clear();
334-
callStack.batchWindowOfDelayedDataLoaderToDispatch.clear();
335-
callStack.batchWindowOpen = false;
336-
callStack.levelToResultPathWithDataLoader.clear();
328+
callStack.currentlyDelayedDispatchingLevels.clear();
329+
callStack.allDataLoaderInvocations.clear();
330+
callStack.levelToDataLoaderInvocation.clear();
337331
callStack.highestReadyLevel = 0;
338332
});
339333
}
@@ -475,16 +469,12 @@ void dispatch(int level, CallStack callStack) {
475469
dispatchAll(dataLoaderRegistry, level);
476470
return;
477471
}
478-
Set<ResultPathWithDataLoader> resultPathWithDataLoaders = callStack.levelToResultPathWithDataLoader.get(level);
479-
if (resultPathWithDataLoaders != null) {
480-
Set<String> resultPathToDispatch = callStack.lock.callLocked(() -> {
472+
Set<DataLoaderInvocation> dataLoaderInvocations = callStack.levelToDataLoaderInvocation.get(level);
473+
if (dataLoaderInvocations != null) {
474+
callStack.lock.runLocked(() -> {
481475
callStack.dispatchingStartedPerLevel.add(level);
482-
return resultPathWithDataLoaders
483-
.stream()
484-
.map(resultPathWithDataLoader -> resultPathWithDataLoader.resultPath)
485-
.collect(Collectors.toSet());
486476
});
487-
dispatchDLCFImpl(resultPathToDispatch, level, callStack);
477+
dispatchDLCFImpl(level, callStack, false, false);
488478
} else {
489479
callStack.lock.runLocked(() -> {
490480
callStack.dispatchingStartedPerLevel.add(level);
@@ -504,110 +494,93 @@ private void dispatchAll(DataLoaderRegistry dataLoaderRegistry, int level) {
504494
}
505495
}
506496

507-
private void dispatchDLCFImpl(Set<String> resultPathsToDispatch, @Nullable Integer level, CallStack callStack) {
497+
private void dispatchDLCFImpl(Integer level, CallStack callStack, boolean delayed, boolean chained) {
508498

509-
// filter out all DataLoaderCFS that are matching the fields we want to dispatch
510-
List<ResultPathWithDataLoader> relevantResultPathWithDataLoader = new ArrayList<>();
511-
// we need to copy the list because the callStack.allResultPathWithDataLoader can be modified concurrently
512-
// while iterating over it
513-
ArrayList<ResultPathWithDataLoader> resultPathWithDataLoaders = new ArrayList<>(callStack.allResultPathWithDataLoader);
514-
for (ResultPathWithDataLoader resultPathWithDataLoader : resultPathWithDataLoaders) {
515-
if (resultPathsToDispatch.contains(resultPathWithDataLoader.resultPath)) {
516-
relevantResultPathWithDataLoader.add(resultPathWithDataLoader);
499+
List<DataLoaderInvocation> relevantDataLoaderInvocations = callStack.lock.callLocked(() -> {
500+
List<DataLoaderInvocation> result = new ArrayList<>();
501+
for (DataLoaderInvocation dataLoaderInvocation : callStack.allDataLoaderInvocations) {
502+
if (dataLoaderInvocation.level == level) {
503+
result.add(dataLoaderInvocation);
504+
}
517505
}
518-
}
519-
// we are cleaning up the list of all DataLoadersCFs
520-
callStack.allResultPathWithDataLoader.removeAll(relevantResultPathWithDataLoader);
521-
522-
// means we are all done dispatching the fields
523-
if (relevantResultPathWithDataLoader.size() == 0) {
524-
if (level != null) {
506+
callStack.allDataLoaderInvocations.removeAll(result);
507+
if (result.size() > 0) {
508+
return result;
509+
}
510+
if (delayed) {
511+
callStack.currentlyDelayedDispatchingLevels.remove(level);
512+
} else {
525513
callStack.dispatchingFinishedPerLevel.add(level);
526514
}
515+
return result;
516+
});
517+
if (relevantDataLoaderInvocations.size() == 0) {
527518
return;
528519
}
529520
List<CompletableFuture> allDispatchedCFs = new ArrayList<>();
530-
for (ResultPathWithDataLoader resultPathWithDataLoader : relevantResultPathWithDataLoader) {
531-
CompletableFuture<List> dispatch = resultPathWithDataLoader.dataLoader.dispatch();
521+
for (DataLoaderInvocation dataLoaderInvocation : relevantDataLoaderInvocations) {
522+
CompletableFuture<List> dispatch = dataLoaderInvocation.dataLoader.dispatch();
532523
allDispatchedCFs.add(dispatch);
533524
dispatch.whenComplete((objects, throwable) -> {
534525
if (objects != null && objects.size() > 0) {
535-
profiler.batchLoadedNewStrategy(resultPathWithDataLoader.name, level, objects.size());
526+
profiler.batchLoadedNewStrategy(dataLoaderInvocation.name, level, objects.size(), delayed, chained);
536527
}
537528
});
538529
}
539530
CompletableFuture.allOf(allDispatchedCFs.toArray(new CompletableFuture[0]))
540531
.whenComplete((unused, throwable) -> {
541-
dispatchDLCFImpl(resultPathsToDispatch, level, callStack);
532+
dispatchDLCFImpl(level, callStack, delayed, true);
542533
}
543534
);
544535

545536
}
546537

547538

548-
public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String
549-
dataLoaderName, Object key, @Nullable AlternativeCallContext alternativeCallContext) {
539+
public void newDataLoaderInvocation(String resultPath,
540+
int level,
541+
DataLoader dataLoader,
542+
String dataLoaderName,
543+
Object key,
544+
@Nullable AlternativeCallContext alternativeCallContext) {
550545
if (!enableDataLoaderChaining) {
551546
return;
552547
}
553-
ResultPathWithDataLoader resultPathWithDataLoader = new ResultPathWithDataLoader(resultPath, level, dataLoader, dataLoaderName, key);
548+
DataLoaderInvocation dataLoaderInvocation = new DataLoaderInvocation(resultPath, level, dataLoader, dataLoaderName, key);
554549
CallStack callStack = getCallStack(alternativeCallContext);
555-
boolean levelFinished = callStack.lock.callLocked(() -> {
550+
boolean startNewDelayedDispatching = callStack.lock.callLocked(() -> {
551+
callStack.allDataLoaderInvocations.add(dataLoaderInvocation);
552+
553+
boolean started = callStack.dispatchingStartedPerLevel.contains(level);
554+
if (!started) {
555+
callStack.addDataLoaderInvocationForLevel(level, dataLoaderInvocation);
556+
}
556557
boolean finished = callStack.dispatchingFinishedPerLevel.contains(level);
557-
callStack.allResultPathWithDataLoader.add(resultPathWithDataLoader);
558-
// only add to the list of DataLoader for this level if we are not already dispatching
559-
if (!callStack.dispatchingStartedPerLevel.contains(level)) {
560-
callStack.addResultPathWithDataLoader(level, resultPathWithDataLoader);
558+
// we need to start a new delayed dispatching if
559+
// the normal dispatching is finished and there is no currently delayed dispatching for this level
560+
boolean newDelayedInvocation = finished && !callStack.currentlyDelayedDispatchingLevels.contains(level);
561+
if (newDelayedInvocation) {
562+
callStack.currentlyDelayedDispatchingLevels.add(level);
561563
}
562-
return finished;
564+
return newDelayedInvocation;
563565
});
564-
if (levelFinished) {
565-
newDelayedDataLoader(resultPathWithDataLoader, callStack);
566+
if (startNewDelayedDispatching) {
567+
dispatchDLCFImpl(level, callStack, true, false);
566568
}
567569

568570

569571
}
570572

571-
class DispatchDelayedDataloader implements Runnable {
572-
573-
private final CallStack callStack;
574-
575-
public DispatchDelayedDataloader(CallStack callStack) {
576-
this.callStack = callStack;
577-
}
578-
579-
@Override
580-
public void run() {
581-
AtomicReference<Set<String>> resultPathToDispatch = new AtomicReference<>();
582-
callStack.lock.runLocked(() -> {
583-
resultPathToDispatch.set(new LinkedHashSet<>(callStack.batchWindowOfDelayedDataLoaderToDispatch));
584-
callStack.batchWindowOfDelayedDataLoaderToDispatch.clear();
585-
callStack.batchWindowOpen = false;
586-
});
587-
dispatchDLCFImpl(Assert.assertNotNull(resultPathToDispatch.get()), null, callStack);
588-
}
589-
}
590-
591-
private void newDelayedDataLoader(ResultPathWithDataLoader resultPathWithDataLoader, CallStack callStack) {
592-
dispatchDLCFImpl(Set.of(resultPathWithDataLoader.resultPath), null, callStack);
593-
// callStack.lock.runLocked(() -> {
594-
// callStack.batchWindowOfDelayedDataLoaderToDispatch.add(resultPathWithDataLoader.resultPath);
595-
// if (!callStack.batchWindowOpen) {
596-
// callStack.batchWindowOpen = true;
597-
// delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(callStack), this.batchWindowNs, TimeUnit.NANOSECONDS);
598-
// }
599-
//
600-
// });
601-
}
602-
603-
private static class ResultPathWithDataLoader {
573+
/**
574+
* A single data loader invocation.
575+
*/
576+
private static class DataLoaderInvocation {
604577
final String resultPath;
605578
final int level;
606579
final DataLoader dataLoader;
607580
final String name;
608581
final Object key;
609582

610-
public ResultPathWithDataLoader(String resultPath, int level, DataLoader dataLoader, String name, Object key) {
583+
public DataLoaderInvocation(String resultPath, int level, DataLoader dataLoader, String name, Object key) {
611584
this.resultPath = resultPath;
612585
this.level = level;
613586
this.dataLoader = dataLoader;

src/main/java/graphql/schema/DataLoaderWithContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public CompletableFuture<V> load(@NonNull K key, @Nullable Object keyContext) {
3636
AlternativeCallContext alternativeCallContext = dfeInternalState.getDeferredCallContext();
3737
int level = dfe.getExecutionStepInfo().getPath().getLevel();
3838
String path = dfe.getExecutionStepInfo().getPath().toString();
39-
((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderLoadCall(path, level, delegate, dataLoaderName, key, alternativeCallContext);
39+
((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderInvocation(path, level, delegate, dataLoaderName, key, alternativeCallContext);
4040
}
4141
return result;
4242
}

src/test/groovy/graphql/ChainedDataLoaderTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ class ChainedDataLoaderTest extends Specification {
264264
}
265265

266266

267-
def "chained data loaders with an isolated data loader"() {
267+
def "chained data loaders with an delayed data loader"() {
268268
given:
269269
def sdl = '''
270270

0 commit comments

Comments
 (0)
0