diff --git a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java index 8a8b84fed..1dd0a08cb 100644 --- a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java +++ b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest.java @@ -12,12 +12,16 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; /** * This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec - * when it's got CFs that complete at different times + * when it's got CFs that complete at different times. + *

+ * Uses a shared single-thread executor per publisher so CFs complete sequentially — see + * CompletionStageMappingOrderedPublisherTckVerificationTest for details on why. */ @Test public class CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest extends PublisherVerification { @@ -50,6 +54,7 @@ public boolean skipStochasticTests() { @NonNull private static Function> mapperFunc() { + ExecutorService executor = Executors.newSingleThreadExecutor(); return i -> CompletableFuture.supplyAsync(() -> { int ms = rand(0, 5); try { @@ -58,7 +63,7 @@ private static Function> mapperFunc() { throw new RuntimeException(e); } return i + "!"; - }, Executors.newSingleThreadExecutor()); + }, executor); } static Random rn = new Random(); diff --git a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java index b1b868919..bdd45ae08 100644 --- a/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java +++ b/src/test/groovy/graphql/execution/reactive/tck/CompletionStageMappingOrderedPublisherTckVerificationTest.java @@ -10,12 +10,18 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; /** * This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec - * when it's got CFs that complete off thread + * when it's got CFs that complete off thread. + *

+ * Uses a shared single-thread executor per publisher so CFs complete sequentially. + * The ordered subscriber drains completed CFs in a while loop — with concurrent executors, + * multiple CFs can complete before the drain starts, causing multiple onNext calls on the + * same thread which the TCK flags as a spec303 (unbounded recursion) violation. */ @Test public class CompletionStageMappingOrderedPublisherTckVerificationTest extends PublisherVerification { @@ -32,14 +38,16 @@ public long maxElementsFromPublisher() { @Override public Publisher createPublisher(long elements) { Publisher publisher = Flowable.range(0, (int) elements); - Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor); return new CompletionStageMappingOrderedPublisher<>(publisher, mapper); } @Override public Publisher createFailedPublisher() { Publisher publisher = Flowable.error(() -> new RuntimeException("Bang")); - Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Function> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor); return new CompletionStageMappingOrderedPublisher<>(publisher, mapper); }