1616import java .util .Optional ;
1717import java .util .Random ;
1818import java .util .concurrent .CompletableFuture ;
19+ import java .util .concurrent .CountDownLatch ;
20+ import java .util .concurrent .ExecutorService ;
21+ import java .util .concurrent .Executors ;
1922import java .util .concurrent .atomic .AtomicBoolean ;
2023import java .util .concurrent .atomic .AtomicLong ;
2124import java .util .function .Supplier ;
@@ -32,10 +35,21 @@ public class BatchCompareDataFetchers {
3235
3336 AtomicBoolean useAsyncBatchLoading = new AtomicBoolean (false );
3437
38+ private volatile CountDownLatch rootFetcherRendezvous ;
39+ private volatile CountDownLatch completionOverlapLatch ;
40+ private final AtomicBoolean shopsOverlapSignaled = new AtomicBoolean (false );
41+ private final AtomicBoolean exShopsOverlapSignaled = new AtomicBoolean (false );
42+ private final ExecutorService executor = Executors .newFixedThreadPool (4 );
43+
3544 public void useAsyncBatchLoading (boolean flag ) {
3645 useAsyncBatchLoading .set (flag );
3746 }
3847
48+ public void useSynchronizedFetching (int numberOfRootFetchers ) {
49+ rootFetcherRendezvous = new CountDownLatch (numberOfRootFetchers );
50+ completionOverlapLatch = new CountDownLatch (numberOfRootFetchers );
51+ }
52+
3953
4054 private static final Map <String , Shop > shops = new LinkedHashMap <>();
4155 private static final Map <String , Shop > expensiveShops = new LinkedHashMap <>();
@@ -52,10 +66,10 @@ public void useAsyncBatchLoading(boolean flag) {
5266
5367
5468 public DataFetcher <CompletableFuture <List <Shop >>> shopsDataFetcher =
55- environment -> supplyAsyncWithSleep (() -> new ArrayList <>(shops .values ()));
69+ environment -> supplyAsyncWithRendezvous (() -> new ArrayList <>(shops .values ()));
5670
5771 public DataFetcher <CompletableFuture <List <Shop >>> expensiveShopsDataFetcher = environment ->
58- supplyAsyncWithSleep (() -> new ArrayList <>(expensiveShops .values ()));
72+ supplyAsyncWithRendezvous (() -> new ArrayList <>(expensiveShops .values ()));
5973
6074 // Departments
6175 private static Map <String , Department > departments = new LinkedHashMap <>();
@@ -101,6 +115,21 @@ private static List<List<Department>> getDepartmentsForShops(List<Shop> shops) {
101115
102116 public DataFetcher <CompletableFuture <List <Department >>> departmentsForShopDataLoaderDataFetcher = environment -> {
103117 Shop shop = environment .getSource ();
118+ // When synchronized fetching is enabled, ensure both root fields (shops and expensiveShops)
119+ // are inside their startComplete/stopComplete window before either proceeds.
120+ // This guarantees objectRunningCount never drops to 0 prematurely.
121+ CountDownLatch overlapLatch = completionOverlapLatch ;
122+ if (overlapLatch != null ) {
123+ AtomicBoolean flag = shop .getId ().startsWith ("ex" ) ? exShopsOverlapSignaled : shopsOverlapSignaled ;
124+ if (flag .compareAndSet (false , true )) {
125+ overlapLatch .countDown ();
126+ try {
127+ overlapLatch .await ();
128+ } catch (InterruptedException e ) {
129+ throw new RuntimeException (e );
130+ }
131+ }
132+ }
104133 return (CompletableFuture ) environment .getDataLoader ("departments" ).load (shop .getId ());
105134 };
106135
@@ -149,6 +178,22 @@ private <T> CompletableFuture<T> maybeAsyncWithSleep(Supplier<CompletableFuture<
149178 }
150179 }
151180
181+ private <T > CompletableFuture <T > supplyAsyncWithRendezvous (Supplier <T > supplier ) {
182+ CountDownLatch latch = rootFetcherRendezvous ;
183+ if (latch != null ) {
184+ return CompletableFuture .supplyAsync (() -> {
185+ try {
186+ latch .countDown ();
187+ latch .await ();
188+ } catch (InterruptedException e ) {
189+ throw new RuntimeException (e );
190+ }
191+ return supplier .get ();
192+ }, executor );
193+ }
194+ return supplyAsyncWithSleep (supplier );
195+ }
196+
152197 private static <T > CompletableFuture <T > supplyAsyncWithSleep (Supplier <T > supplier ) {
153198 Supplier <T > sleepSome = sleepSome (supplier );
154199 return CompletableFuture .supplyAsync (sleepSome );
0 commit comments