From 431a3dc9336d0cc18b6dbdd947c99b6b4228fffb Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 5 Mar 2026 21:13:03 +1000 Subject: [PATCH] Fix spec303 flaky failure in ordered publisher TCK tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ordered subscriber's drain loop (emptyInFlightQueueIfWeCan) calls onNext for every completed CF in a while loop. With per-element executors, multiple CFs can complete concurrently, so the drain loop calls onNext multiple times on the same thread — the TCK flags this as a spec303 (unbounded recursion) violation. Fix: use a shared single-thread executor per publisher instead of a new executor per element. This ensures CFs complete sequentially, so when the drain loop runs on the executor thread, no other CFs can complete (they're queued behind it), and the loop only processes one item. Co-Authored-By: Claude Opus 4.6 --- ...PublisherRandomCompleteTckVerificationTest.java | 9 +++++++-- ...MappingOrderedPublisherTckVerificationTest.java | 14 +++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java index 8a8b84fed0..1dd0a08cb1 100644 --- a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java +++ b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java @@ -12,12 +12,16 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; /** * This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec - * when it's got CFs that complete at different times + * when it's got CFs that complete at different times. + *

+ * Uses a shared single-thread executor per publisher so CFs complete sequentially — see + * CompletionStageMappingOrderedPublisherTckVerificationTest for details on why. */ @Test public class CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest extends PublisherVerification { @@ -50,6 +54,7 @@ public boolean skipStochasticTests() { @NonNull private static Function> mapperFunc() { + ExecutorService executor = Executors.newSingleThreadExecutor(); return i -> CompletableFuture.supplyAsync(() -> { int ms = rand(0, 5); try { @@ -58,7 +63,7 @@ private static Function> mapperFunc() { throw new RuntimeException(e); } return i + "!"; - }, Executors.newSingleThreadExecutor()); + }, executor); } static Random rn = new Random(); diff --git a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java index b1b8689190..bdd45ae082 100644 --- a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java +++ b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java @@ -10,12 +10,18 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; /** * This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec - * when it's got CFs that complete off thread + * when it's got CFs that complete off thread. + *

+ * Uses a shared single-thread executor per publisher so CFs complete sequentially. + * The ordered subscriber drains completed CFs in a while loop — with concurrent executors, + * multiple CFs can complete before the drain starts, causing multiple onNext calls on the + * same thread which the TCK flags as a spec303 (unbounded recursion) violation. */ @Test public class CompletionStageMappingOrderedPublisherTckVerificationTest extends PublisherVerification { @@ -32,14 +38,16 @@ public long maxElementsFromPublisher() { @Override public Publisher createPublisher(long elements) { Publisher publisher = Flowable.range(0, (int) elements); - Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor); return new CompletionStageMappingOrderedPublisher<>(publisher, mapper); } @Override public Publisher createFailedPublisher() { Publisher publisher = Flowable.error(() -> new RuntimeException("Bang")); - Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor); return new CompletionStageMappingOrderedPublisher<>(publisher, mapper); }