From d223632579e433181e6b42b62d291e20fa5450a7 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sat, 7 Mar 2026 23:27:55 +1000 Subject: [PATCH] Fix flaky DataLoaderPerformanceTest with deterministic synchronization The "expensive query" tests with multiple root fields (shops + expensiveShops) were flaky because random-sleep async data fetchers could complete at different times, causing ExhaustedDataLoaderDispatchStrategy to dispatch prematurely and split batches. Uses a two-latch synchronization approach: 1. Root fetcher rendezvous: ensures both root data fetchers complete simultaneously 2. Completion overlap latch: ensures both root fields are inside their startComplete/stopComplete window before either proceeds, eliminating the race between thenApply callbacks entirely Also removes @Ignore from the equivalent test in DataLoaderPerformanceWithChainedInstrumentationTest and tightens assertions from <= 2 to == 1 since synchronized fetching guarantees optimal batching. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/BatchCompareDataFetchers.java | 49 ++++++++++++++++++- .../DataLoaderPerformanceTest.groovy | 11 +++-- ...manceWithChainedInstrumentationTest.groovy | 9 ++-- 3 files changed, 59 insertions(+), 10 deletions(-) diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java index 08edd13248..36829b0130 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/BatchCompareDataFetchers.java @@ -16,6 +16,9 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -32,10 +35,21 @@ public class BatchCompareDataFetchers { AtomicBoolean useAsyncBatchLoading = new AtomicBoolean(false); + private volatile CountDownLatch rootFetcherRendezvous; + private volatile CountDownLatch completionOverlapLatch; + private final AtomicBoolean shopsOverlapSignaled = new AtomicBoolean(false); + private final AtomicBoolean exShopsOverlapSignaled = new AtomicBoolean(false); + private final ExecutorService executor = Executors.newFixedThreadPool(4); + public void useAsyncBatchLoading(boolean flag) { useAsyncBatchLoading.set(flag); } + public void useSynchronizedFetching(int numberOfRootFetchers) { + rootFetcherRendezvous = new CountDownLatch(numberOfRootFetchers); + completionOverlapLatch = new CountDownLatch(numberOfRootFetchers); + } + private static final Map shops = new LinkedHashMap<>(); private static final Map expensiveShops = new LinkedHashMap<>(); @@ -52,10 +66,10 @@ public void useAsyncBatchLoading(boolean flag) { public DataFetcher>> shopsDataFetcher = - environment -> supplyAsyncWithSleep(() -> new ArrayList<>(shops.values())); + environment -> supplyAsyncWithRendezvous(() -> new ArrayList<>(shops.values())); public DataFetcher>> expensiveShopsDataFetcher = environment -> - supplyAsyncWithSleep(() -> new ArrayList<>(expensiveShops.values())); + supplyAsyncWithRendezvous(() -> new ArrayList<>(expensiveShops.values())); // Departments private static Map departments = new LinkedHashMap<>(); @@ -101,6 +115,21 @@ private static List> getDepartmentsForShops(List shops) { public DataFetcher>> departmentsForShopDataLoaderDataFetcher = environment -> { Shop shop = environment.getSource(); + // When synchronized fetching is enabled, ensure both root fields (shops and expensiveShops) + // are inside their startComplete/stopComplete window before either proceeds. + // This guarantees objectRunningCount never drops to 0 prematurely. + CountDownLatch overlapLatch = completionOverlapLatch; + if (overlapLatch != null) { + AtomicBoolean flag = shop.getId().startsWith("ex") ? exShopsOverlapSignaled : shopsOverlapSignaled; + if (flag.compareAndSet(false, true)) { + overlapLatch.countDown(); + try { + overlapLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } return (CompletableFuture) environment.getDataLoader("departments").load(shop.getId()); }; @@ -149,6 +178,22 @@ private CompletableFuture maybeAsyncWithSleep(Supplier CompletableFuture supplyAsyncWithRendezvous(Supplier supplier) { + CountDownLatch latch = rootFetcherRendezvous; + if (latch != null) { + return CompletableFuture.supplyAsync(() -> { + try { + latch.countDown(); + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return supplier.get(); + }, executor); + } + return supplyAsyncWithSleep(supplier); + } + private static CompletableFuture supplyAsyncWithSleep(Supplier supplier) { Supplier sleepSome = sleepSome(supplier); return CompletableFuture.supplyAsync(sleepSome); diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy index 1ee790ded3..6a1a2e09d9 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceTest.groovy @@ -60,6 +60,8 @@ class DataLoaderPerformanceTest extends Specification { when: + batchCompareDataFetchers.useSynchronizedFetching(2) + ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(getExpensiveQuery(false)) .dataLoaderRegistry(dataLoaderRegistry) @@ -71,8 +73,8 @@ class DataLoaderPerformanceTest extends Specification { then: result.data == expectedExpensiveData - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2 + batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1 + batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 where: incrementalSupport | contextKey @@ -123,6 +125,7 @@ class DataLoaderPerformanceTest extends Specification { when: batchCompareDataFetchers.useAsyncBatchLoading(true) + batchCompareDataFetchers.useSynchronizedFetching(2) ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(getExpensiveQuery(false)) @@ -136,8 +139,8 @@ class DataLoaderPerformanceTest extends Specification { then: result.data == expectedExpensiveData - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2 + batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1 + batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 where: incrementalSupport | contextKey diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy index 5467c87220..9e779765fd 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderPerformanceWithChainedInstrumentationTest.groovy @@ -3,7 +3,6 @@ package graphql.execution.instrumentation.dataloader import graphql.ExecutionInput import graphql.GraphQL import org.dataloader.DataLoaderRegistry -import spock.lang.Ignore import spock.lang.Specification import static graphql.ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT @@ -49,11 +48,12 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification incrementalSupport << [true, false] } - @Ignore("This test flakes on Travis for some reason. Clearly this indicates some sort of problem to investigate. However it also stop releases.") def "chainedInstrumentation: 970 ensure data loader is performant for multiple field with lists"() { when: + batchCompareDataFetchers.useSynchronizedFetching(2) + ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(getExpensiveQuery(false)) .dataLoaderRegistry(dataLoaderRegistry) @@ -101,6 +101,7 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification when: batchCompareDataFetchers.useAsyncBatchLoading(true) + batchCompareDataFetchers.useSynchronizedFetching(2) ExecutionInput executionInput = ExecutionInput.newExecutionInput() .query(getExpensiveQuery(false)) @@ -112,8 +113,8 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification then: result.data == expectedExpensiveData - batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2 - batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2 + batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1 + batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 where: incrementalSupport << [true, false]