diff --git a/src/main/java/graphql/GraphQLUnusualConfiguration.java b/src/main/java/graphql/GraphQLUnusualConfiguration.java index 3e3443c07d..e96b0b7380 100644 --- a/src/main/java/graphql/GraphQLUnusualConfiguration.java +++ b/src/main/java/graphql/GraphQLUnusualConfiguration.java @@ -1,6 +1,7 @@ package graphql; import graphql.execution.ResponseMapFactory; +import graphql.execution.incremental.IncrementalExecutionContextKeys; import graphql.introspection.GoodFaithIntrospection; import graphql.parser.ParserOptions; import graphql.schema.PropertyDataFetcherHelper; @@ -337,6 +338,15 @@ public IncrementalSupportConfig enableIncrementalSupport(boolean enable) { contextConfig.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, enable); return this; } + + /** + * This controls whether @defer field execution starts as early as possible. + */ + @ExperimentalApi + public IncrementalSupportConfig enableEarlyIncrementalFieldExecution(boolean enable) { + contextConfig.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, enable); + return this; + } } public static class DataloaderConfig extends BaseContextConfig { diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index 39d4b08701..06d3b644b1 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -15,6 +15,7 @@ import graphql.execution.directives.QueryDirectives; import graphql.execution.directives.QueryDirectivesImpl; import graphql.execution.incremental.DeferredExecutionSupport; +import graphql.execution.incremental.IncrementalExecutionContextKeys; import graphql.execution.instrumentation.ExecuteObjectInstrumentationContext; import graphql.execution.instrumentation.FieldFetchingInstrumentationContext; import graphql.execution.instrumentation.Instrumentation; @@ -325,7 +326,16 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi Object fieldValueInfo = resolveFieldWithInfo(executionContext, newParameters); futures.addObject(fieldValueInfo); } + } + + if (executionContext.hasIncrementalSupport() + && deferredExecutionSupport.deferredFieldsCount() > 0 + && executionContext.getGraphQLContext().getBoolean(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, false)) { + + executionContext.getIncrementalCallState().startDrainingNow(); + } + return futures; } diff --git a/src/main/java/graphql/execution/incremental/IncrementalCallState.java b/src/main/java/graphql/execution/incremental/IncrementalCallState.java index f2c0b9dbc7..fc1f352ca3 100644 --- a/src/main/java/graphql/execution/incremental/IncrementalCallState.java +++ b/src/main/java/graphql/execution/incremental/IncrementalCallState.java @@ -104,4 +104,8 @@ public Publisher startDeferredCalls() { return publisher.get(); } + public void startDrainingNow() { + drainIncrementalCalls(); + } + } diff --git a/src/main/java/graphql/execution/incremental/IncrementalExecutionContextKeys.java b/src/main/java/graphql/execution/incremental/IncrementalExecutionContextKeys.java new file mode 100644 index 0000000000..293a4ca4fb --- /dev/null +++ b/src/main/java/graphql/execution/incremental/IncrementalExecutionContextKeys.java @@ -0,0 +1,26 @@ +package graphql.execution.incremental; + + +import graphql.Internal; +import org.jspecify.annotations.NullMarked; + +/** + * GraphQLContext keys for controlling incremental execution behavior. + */ +@Internal +@NullMarked +public final class IncrementalExecutionContextKeys { + private IncrementalExecutionContextKeys() { + } + + /** + * Enables eager start of @defer processing so defered work runs before the initial result is computed. + * Defaults to false. + *

+ * Expects a boolean value. + */ + public static final String ENABLE_EAGER_DEFER_START = "__GJ_enable_eager_defer_start"; + +} + + diff --git a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy index b3b522d90b..487caee669 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy @@ -7,6 +7,7 @@ import graphql.ExecutionResult import graphql.ExperimentalApi import graphql.GraphQL import graphql.GraphqlErrorBuilder +import graphql.GraphQLContext import graphql.TestUtil import graphql.execution.DataFetcherResult import graphql.execution.pubsub.CapturingSubscriber @@ -27,6 +28,8 @@ import spock.lang.Specification import spock.lang.Unroll import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring @@ -1726,6 +1729,151 @@ class DeferExecutionSupportIntegrationTest extends Specification { } + def "eager defer starts before initial result completes when ENABLE_EAGER_DEFER_START"() { + given: + def deferStarted = new CountDownLatch(1) + def allowDeferredComplete = new CountDownLatch(1) + + def runtimeWiring = RuntimeWiring.newRuntimeWiring() + .type(newTypeWiring("Query") + .dataFetcher("post", resolve([id: "1001"])) + ) + .type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 4000))) + .type(newTypeWiring("Post").dataFetcher("summary", { env -> + deferStarted.countDown() + allowDeferredComplete.await(2, TimeUnit.SECONDS) + CompletableFuture.completedFuture("A summary") + } as DataFetcher)) + .type(newTypeWiring("Item").typeResolver(itemTypeResolver())) + .build() + + def schema = TestUtil.schema(schemaSpec, runtimeWiring) + .transform({ b -> b.additionalDirective(Directives.DeferDirective) }) + def testGraphQL = GraphQL.newGraphQL(schema).build() + + def ctx = GraphQLContext.newContext().build() + ctx.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, true) + ctx.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, true) + + def query = ''' + query { + hello + ... @defer { post { summary } } + } + ''' + + when: + def executionInput = ExecutionInput.newExecutionInput() + .graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true, (IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START): true]) + .query(query) + .build() + def execFuture = CompletableFuture.supplyAsync { + testGraphQL.execute(executionInput) + } + + then: + // Deferred fetcher starts while initial result is still computing + assert deferStarted.await(2000, TimeUnit.MILLISECONDS) + assert !execFuture.isDone() + + when: + allowDeferredComplete.countDown() + def initialResult = execFuture.join() as IncrementalExecutionResult + + then: + assert initialResult.toSpecification() == [ + data : [hello: "world"], + hasNext: true + ] + + when: + def incrementalResults = getIncrementalResults(initialResult) + + then: + incrementalResults == [ + [ + hasNext : false, + incremental: [ + [ + path: [], + data: [post: [summary: "A summary"]] + ] + ] + ] + ] + } + + + def "incremental starts only after initial result when eager start disabled"() { + given: + def deferStarted = new CountDownLatch(1) + def allowDeferredComplete = new CountDownLatch(1) + + def runtimeWiring = RuntimeWiring.newRuntimeWiring() + .type(newTypeWiring("Query") + .dataFetcher("post", resolve([id: "1001"])) + ) + .type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 300))) + .type(newTypeWiring("Post").dataFetcher("summary", { env -> + deferStarted.countDown() + allowDeferredComplete.await(2, TimeUnit.SECONDS) + CompletableFuture.completedFuture("A summary") + } as DataFetcher)) + .type(newTypeWiring("Item").typeResolver(itemTypeResolver())) + .build() + + def schema = TestUtil.schema(schemaSpec, runtimeWiring) + .transform({ b -> b.additionalDirective(Directives.DeferDirective) }) + def testGraphQL = GraphQL.newGraphQL(schema).build() + + def query = ''' + query { + hello + ... @defer { post { summary } } + } + ''' + + when: + def executionInput = ExecutionInput.newExecutionInput() + .graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true]) // no eager flag + .query(query) + .build() + def execFuture = CompletableFuture.supplyAsync { + testGraphQL.execute(executionInput) + } + + then: + assert !deferStarted.await(100, TimeUnit.MILLISECONDS) + assert !execFuture.isDone() + + when: + def initialResult = execFuture.join() as IncrementalExecutionResult + + then: + assert initialResult.toSpecification() == [ + data : [hello: "world"], + hasNext: true + ] + assert deferStarted.count == 1 // still not started, no subscriber yet + + when: + allowDeferredComplete.countDown() + def incrementalResults = getIncrementalResults(initialResult) + + then: + incrementalResults == [ + [ + hasNext : false, + incremental: [ + [ + path: [], + data: [post: [summary: "A summary"]] + ] + ] + ] + ] + } + private ExecutionResult executeQuery(String query) { return this.executeQuery(query, true, [:])