2525import java .util .Set ;
2626import java .util .concurrent .CompletableFuture ;
2727import java .util .concurrent .ConcurrentHashMap ;
28- import java .util .concurrent .atomic .AtomicReference ;
2928import java .util .function .Supplier ;
30- import java .util .stream .Collectors ;
3129
3230@ Internal
3331@ NullMarked
@@ -38,7 +36,6 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr
3836 private final boolean enableDataLoaderChaining ;
3937
4038
41- static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L ;
4239 private final Profiler profiler ;
4340
4441 private final Map <AlternativeCallContext , CallStack > deferredCallStackMap = new ConcurrentHashMap <>();
@@ -94,15 +91,15 @@ private static class CallStack {
9491 // all levels that are ready to be dispatched
9592 private int highestReadyLevel ;
9693
97- private final List <ResultPathWithDataLoader > allResultPathWithDataLoader = Collections .synchronizedList (new ArrayList <>());
98- private final Map <Integer , Set <ResultPathWithDataLoader >> levelToResultPathWithDataLoader = new ConcurrentHashMap <>();
99-
94+ /**
95+ * Data for chained dispatching.
96+ * A result path is used to identify a DataFetcher.
97+ */
98+ private final List <DataLoaderInvocation > allDataLoaderInvocations = Collections .synchronizedList (new ArrayList <>());
99+ private final Map <Integer , Set <DataLoaderInvocation >> levelToDataLoaderInvocation = new ConcurrentHashMap <>();
100100 private final Set <Integer > dispatchingStartedPerLevel = ConcurrentHashMap .newKeySet ();
101101 private final Set <Integer > dispatchingFinishedPerLevel = ConcurrentHashMap .newKeySet ();
102- // Set of ResultPath
103- private final Set <String > batchWindowOfDelayedDataLoaderToDispatch = ConcurrentHashMap .newKeySet ();
104-
105- private boolean batchWindowOpen ;
102+ private final Set <Integer > currentlyDelayedDispatchingLevels = ConcurrentHashMap .newKeySet ();
106103
107104
108105 private final List <FieldValueInfo > deferredFragmentRootFieldsFetched = new ArrayList <>();
@@ -113,8 +110,8 @@ public CallStack() {
113110 expectedExecuteObjectCallsPerLevel .set (0 , 1 );
114111 }
115112
116- public void addResultPathWithDataLoader (int level , ResultPathWithDataLoader resultPathWithDataLoader ) {
117- levelToResultPathWithDataLoader .computeIfAbsent (level , k -> new LinkedHashSet <>()).add (resultPathWithDataLoader );
113+ public void addDataLoaderInvocationForLevel (int level , DataLoaderInvocation dataLoaderInvocation ) {
114+ levelToDataLoaderInvocation .computeIfAbsent (level , k -> new LinkedHashSet <>()).add (dataLoaderInvocation );
118115 }
119116
120117
@@ -328,12 +325,9 @@ private void resetCallStack(CallStack callStack) {
328325 callStack .clearHappenedExecuteObjectCalls ();
329326 callStack .clearHappenedOnFieldValueCalls ();
330327 callStack .expectedExecuteObjectCallsPerLevel .set (0 , 1 );
331- callStack .dispatchingFinishedPerLevel .clear ();
332- callStack .dispatchingStartedPerLevel .clear ();
333- callStack .allResultPathWithDataLoader .clear ();
334- callStack .batchWindowOfDelayedDataLoaderToDispatch .clear ();
335- callStack .batchWindowOpen = false ;
336- callStack .levelToResultPathWithDataLoader .clear ();
328+ callStack .currentlyDelayedDispatchingLevels .clear ();
329+ callStack .allDataLoaderInvocations .clear ();
330+ callStack .levelToDataLoaderInvocation .clear ();
337331 callStack .highestReadyLevel = 0 ;
338332 });
339333 }
@@ -475,16 +469,12 @@ void dispatch(int level, CallStack callStack) {
475469 dispatchAll (dataLoaderRegistry , level );
476470 return ;
477471 }
478- Set <ResultPathWithDataLoader > resultPathWithDataLoaders = callStack .levelToResultPathWithDataLoader .get (level );
479- if (resultPathWithDataLoaders != null ) {
480- Set < String > resultPathToDispatch = callStack .lock .callLocked (() -> {
472+ Set <DataLoaderInvocation > dataLoaderInvocations = callStack .levelToDataLoaderInvocation .get (level );
473+ if (dataLoaderInvocations != null ) {
474+ callStack .lock .runLocked (() -> {
481475 callStack .dispatchingStartedPerLevel .add (level );
482- return resultPathWithDataLoaders
483- .stream ()
484- .map (resultPathWithDataLoader -> resultPathWithDataLoader .resultPath )
485- .collect (Collectors .toSet ());
486476 });
487- dispatchDLCFImpl (resultPathToDispatch , level , callStack );
477+ dispatchDLCFImpl (level , callStack , false , false );
488478 } else {
489479 callStack .lock .runLocked (() -> {
490480 callStack .dispatchingStartedPerLevel .add (level );
@@ -504,110 +494,93 @@ private void dispatchAll(DataLoaderRegistry dataLoaderRegistry, int level) {
504494 }
505495 }
506496
507- private void dispatchDLCFImpl (Set < String > resultPathsToDispatch , @ Nullable Integer level , CallStack callStack ) {
497+ private void dispatchDLCFImpl (Integer level , CallStack callStack , boolean delayed , boolean chained ) {
508498
509- // filter out all DataLoaderCFS that are matching the fields we want to dispatch
510- List <ResultPathWithDataLoader > relevantResultPathWithDataLoader = new ArrayList <>();
511- // we need to copy the list because the callStack.allResultPathWithDataLoader can be modified concurrently
512- // while iterating over it
513- ArrayList <ResultPathWithDataLoader > resultPathWithDataLoaders = new ArrayList <>(callStack .allResultPathWithDataLoader );
514- for (ResultPathWithDataLoader resultPathWithDataLoader : resultPathWithDataLoaders ) {
515- if (resultPathsToDispatch .contains (resultPathWithDataLoader .resultPath )) {
516- relevantResultPathWithDataLoader .add (resultPathWithDataLoader );
499+ List <DataLoaderInvocation > relevantDataLoaderInvocations = callStack .lock .callLocked (() -> {
500+ List <DataLoaderInvocation > result = new ArrayList <>();
501+ for (DataLoaderInvocation dataLoaderInvocation : callStack .allDataLoaderInvocations ) {
502+ if (dataLoaderInvocation .level == level ) {
503+ result .add (dataLoaderInvocation );
504+ }
517505 }
518- }
519- // we are cleaning up the list of all DataLoadersCFs
520- callStack . allResultPathWithDataLoader . removeAll ( relevantResultPathWithDataLoader ) ;
521-
522- // means we are all done dispatching the fields
523- if ( relevantResultPathWithDataLoader . size () == 0 ) {
524- if ( level != null ) {
506+ callStack . allDataLoaderInvocations . removeAll ( result );
507+ if ( result . size () > 0 ) {
508+ return result ;
509+ }
510+ if ( delayed ) {
511+ callStack . currentlyDelayedDispatchingLevels . remove ( level );
512+ } else {
525513 callStack .dispatchingFinishedPerLevel .add (level );
526514 }
515+ return result ;
516+ });
517+ if (relevantDataLoaderInvocations .size () == 0 ) {
527518 return ;
528519 }
529520 List <CompletableFuture > allDispatchedCFs = new ArrayList <>();
530- for (ResultPathWithDataLoader resultPathWithDataLoader : relevantResultPathWithDataLoader ) {
531- CompletableFuture <List > dispatch = resultPathWithDataLoader .dataLoader .dispatch ();
521+ for (DataLoaderInvocation dataLoaderInvocation : relevantDataLoaderInvocations ) {
522+ CompletableFuture <List > dispatch = dataLoaderInvocation .dataLoader .dispatch ();
532523 allDispatchedCFs .add (dispatch );
533524 dispatch .whenComplete ((objects , throwable ) -> {
534525 if (objects != null && objects .size () > 0 ) {
535- profiler .batchLoadedNewStrategy (resultPathWithDataLoader .name , level , objects .size ());
526+ profiler .batchLoadedNewStrategy (dataLoaderInvocation .name , level , objects .size (), delayed , chained );
536527 }
537528 });
538529 }
539530 CompletableFuture .allOf (allDispatchedCFs .toArray (new CompletableFuture [0 ]))
540531 .whenComplete ((unused , throwable ) -> {
541- dispatchDLCFImpl (resultPathsToDispatch , level , callStack );
532+ dispatchDLCFImpl (level , callStack , delayed , true );
542533 }
543534 );
544535
545536 }
546537
547538
548- public void newDataLoaderLoadCall (String resultPath , int level , DataLoader dataLoader , String
549- dataLoaderName , Object key , @ Nullable AlternativeCallContext alternativeCallContext ) {
539+ public void newDataLoaderInvocation (String resultPath ,
540+ int level ,
541+ DataLoader dataLoader ,
542+ String dataLoaderName ,
543+ Object key ,
544+ @ Nullable AlternativeCallContext alternativeCallContext ) {
550545 if (!enableDataLoaderChaining ) {
551546 return ;
552547 }
553- ResultPathWithDataLoader resultPathWithDataLoader = new ResultPathWithDataLoader (resultPath , level , dataLoader , dataLoaderName , key );
548+ DataLoaderInvocation dataLoaderInvocation = new DataLoaderInvocation (resultPath , level , dataLoader , dataLoaderName , key );
554549 CallStack callStack = getCallStack (alternativeCallContext );
555- boolean levelFinished = callStack .lock .callLocked (() -> {
550+ boolean startNewDelayedDispatching = callStack .lock .callLocked (() -> {
551+ callStack .allDataLoaderInvocations .add (dataLoaderInvocation );
552+
553+ boolean started = callStack .dispatchingStartedPerLevel .contains (level );
554+ if (!started ) {
555+ callStack .addDataLoaderInvocationForLevel (level , dataLoaderInvocation );
556+ }
556557 boolean finished = callStack .dispatchingFinishedPerLevel .contains (level );
557- callStack .allResultPathWithDataLoader .add (resultPathWithDataLoader );
558- // only add to the list of DataLoader for this level if we are not already dispatching
559- if (!callStack .dispatchingStartedPerLevel .contains (level )) {
560- callStack .addResultPathWithDataLoader (level , resultPathWithDataLoader );
558+ // we need to start a new delayed dispatching if
559+ // the normal dispatching is finished and there is no currently delayed dispatching for this level
560+ boolean newDelayedInvocation = finished && !callStack .currentlyDelayedDispatchingLevels .contains (level );
561+ if (newDelayedInvocation ) {
562+ callStack .currentlyDelayedDispatchingLevels .add (level );
561563 }
562- return finished ;
564+ return newDelayedInvocation ;
563565 });
564- if (levelFinished ) {
565- newDelayedDataLoader ( resultPathWithDataLoader , callStack );
566+ if (startNewDelayedDispatching ) {
567+ dispatchDLCFImpl ( level , callStack , true , false );
566568 }
567569
568570
569571 }
570572
571- class DispatchDelayedDataloader implements Runnable {
572-
573- private final CallStack callStack ;
574-
575- public DispatchDelayedDataloader (CallStack callStack ) {
576- this .callStack = callStack ;
577- }
578-
579- @ Override
580- public void run () {
581- AtomicReference <Set <String >> resultPathToDispatch = new AtomicReference <>();
582- callStack .lock .runLocked (() -> {
583- resultPathToDispatch .set (new LinkedHashSet <>(callStack .batchWindowOfDelayedDataLoaderToDispatch ));
584- callStack .batchWindowOfDelayedDataLoaderToDispatch .clear ();
585- callStack .batchWindowOpen = false ;
586- });
587- dispatchDLCFImpl (Assert .assertNotNull (resultPathToDispatch .get ()), null , callStack );
588- }
589- }
590-
591- private void newDelayedDataLoader (ResultPathWithDataLoader resultPathWithDataLoader , CallStack callStack ) {
592- dispatchDLCFImpl (Set .of (resultPathWithDataLoader .resultPath ), null , callStack );
593- // callStack.lock.runLocked(() -> {
594- // callStack.batchWindowOfDelayedDataLoaderToDispatch.add(resultPathWithDataLoader.resultPath);
595- // if (!callStack.batchWindowOpen) {
596- // callStack.batchWindowOpen = true;
597- // delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(callStack), this.batchWindowNs, TimeUnit.NANOSECONDS);
598- // }
599- //
600- // });
601- }
602-
603- private static class ResultPathWithDataLoader {
573+ /**
574+ * A single data loader invocation.
575+ */
576+ private static class DataLoaderInvocation {
604577 final String resultPath ;
605578 final int level ;
606579 final DataLoader dataLoader ;
607580 final String name ;
608581 final Object key ;
609582
610- public ResultPathWithDataLoader (String resultPath , int level , DataLoader dataLoader , String name , Object key ) {
583+ public DataLoaderInvocation (String resultPath , int level , DataLoader dataLoader , String name , Object key ) {
611584 this .resultPath = resultPath ;
612585 this .level = level ;
613586 this .dataLoader = dataLoader ;
0 commit comments