Skip to content

Commit aa0b31e

Browse files
authored
Merge pull request #4174 from llin2/GQLGW-5297-optimise-incremental-part-execution-for-defer-requests
GQLGW-5297-optimise-incremental-part-execution-for-defer-requests
2 parents ed0791c + 18207de commit aa0b31e

File tree

5 files changed

+198
-0
lines changed

5 files changed

+198
-0
lines changed

src/main/java/graphql/GraphQLUnusualConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package graphql;
22

33
import graphql.execution.ResponseMapFactory;
4+
import graphql.execution.incremental.IncrementalExecutionContextKeys;
45
import graphql.introspection.GoodFaithIntrospection;
56
import graphql.parser.ParserOptions;
67
import graphql.schema.PropertyDataFetcherHelper;
@@ -337,6 +338,15 @@ public IncrementalSupportConfig enableIncrementalSupport(boolean enable) {
337338
contextConfig.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, enable);
338339
return this;
339340
}
341+
342+
/**
343+
* This controls whether @defer field execution starts as early as possible.
344+
*/
345+
@ExperimentalApi
346+
public IncrementalSupportConfig enableEarlyIncrementalFieldExecution(boolean enable) {
347+
contextConfig.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, enable);
348+
return this;
349+
}
340350
}
341351

342352
public static class DataloaderConfig extends BaseContextConfig {

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import graphql.execution.directives.QueryDirectives;
1616
import graphql.execution.directives.QueryDirectivesImpl;
1717
import graphql.execution.incremental.DeferredExecutionSupport;
18+
import graphql.execution.incremental.IncrementalExecutionContextKeys;
1819
import graphql.execution.instrumentation.ExecuteObjectInstrumentationContext;
1920
import graphql.execution.instrumentation.FieldFetchingInstrumentationContext;
2021
import graphql.execution.instrumentation.Instrumentation;
@@ -325,7 +326,16 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
325326
Object fieldValueInfo = resolveFieldWithInfo(executionContext, newParameters);
326327
futures.addObject(fieldValueInfo);
327328
}
329+
328330
}
331+
332+
if (executionContext.hasIncrementalSupport()
333+
&& deferredExecutionSupport.deferredFieldsCount() > 0
334+
&& executionContext.getGraphQLContext().getBoolean(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, false)) {
335+
336+
executionContext.getIncrementalCallState().startDrainingNow();
337+
}
338+
329339
return futures;
330340
}
331341

src/main/java/graphql/execution/incremental/IncrementalCallState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,8 @@ public Publisher<DelayedIncrementalPartialResult> startDeferredCalls() {
104104
return publisher.get();
105105
}
106106

107+
public void startDrainingNow() {
108+
drainIncrementalCalls();
109+
}
110+
107111
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package graphql.execution.incremental;
2+
3+
4+
import graphql.Internal;
5+
import org.jspecify.annotations.NullMarked;
6+
7+
/**
8+
* GraphQLContext keys for controlling incremental execution behavior.
9+
*/
10+
@Internal
11+
@NullMarked
12+
public final class IncrementalExecutionContextKeys {
13+
private IncrementalExecutionContextKeys() {
14+
}
15+
16+
/**
17+
* Enables eager start of @defer processing so defered work runs before the initial result is computed.
18+
* Defaults to false.
19+
* <p>
20+
* Expects a boolean value.
21+
*/
22+
public static final String ENABLE_EAGER_DEFER_START = "__GJ_enable_eager_defer_start";
23+
24+
}
25+
26+

src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import graphql.ExecutionResult
77
import graphql.ExperimentalApi
88
import graphql.GraphQL
99
import graphql.GraphqlErrorBuilder
10+
import graphql.GraphQLContext
1011
import graphql.TestUtil
1112
import graphql.execution.DataFetcherResult
1213
import graphql.execution.pubsub.CapturingSubscriber
@@ -27,6 +28,8 @@ import spock.lang.Specification
2728
import spock.lang.Unroll
2829

2930
import java.util.concurrent.CompletableFuture
31+
import java.util.concurrent.CountDownLatch
32+
import java.util.concurrent.TimeUnit
3033
import java.util.concurrent.atomic.AtomicInteger
3134

3235
import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring
@@ -1726,6 +1729,151 @@ class DeferExecutionSupportIntegrationTest extends Specification {
17261729

17271730
}
17281731

1732+
def "eager defer starts before initial result completes when ENABLE_EAGER_DEFER_START"() {
1733+
given:
1734+
def deferStarted = new CountDownLatch(1)
1735+
def allowDeferredComplete = new CountDownLatch(1)
1736+
1737+
def runtimeWiring = RuntimeWiring.newRuntimeWiring()
1738+
.type(newTypeWiring("Query")
1739+
.dataFetcher("post", resolve([id: "1001"]))
1740+
)
1741+
.type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 4000)))
1742+
.type(newTypeWiring("Post").dataFetcher("summary", { env ->
1743+
deferStarted.countDown()
1744+
allowDeferredComplete.await(2, TimeUnit.SECONDS)
1745+
CompletableFuture.completedFuture("A summary")
1746+
} as DataFetcher))
1747+
.type(newTypeWiring("Item").typeResolver(itemTypeResolver()))
1748+
.build()
1749+
1750+
def schema = TestUtil.schema(schemaSpec, runtimeWiring)
1751+
.transform({ b -> b.additionalDirective(Directives.DeferDirective) })
1752+
def testGraphQL = GraphQL.newGraphQL(schema).build()
1753+
1754+
def ctx = GraphQLContext.newContext().build()
1755+
ctx.put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, true)
1756+
ctx.put(IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START, true)
1757+
1758+
def query = '''
1759+
query {
1760+
hello
1761+
... @defer { post { summary } }
1762+
}
1763+
'''
1764+
1765+
when:
1766+
def executionInput = ExecutionInput.newExecutionInput()
1767+
.graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true, (IncrementalExecutionContextKeys.ENABLE_EAGER_DEFER_START): true])
1768+
.query(query)
1769+
.build()
1770+
def execFuture = CompletableFuture.supplyAsync {
1771+
testGraphQL.execute(executionInput)
1772+
}
1773+
1774+
then:
1775+
// Deferred fetcher starts while initial result is still computing
1776+
assert deferStarted.await(2000, TimeUnit.MILLISECONDS)
1777+
assert !execFuture.isDone()
1778+
1779+
when:
1780+
allowDeferredComplete.countDown()
1781+
def initialResult = execFuture.join() as IncrementalExecutionResult
1782+
1783+
then:
1784+
assert initialResult.toSpecification() == [
1785+
data : [hello: "world"],
1786+
hasNext: true
1787+
]
1788+
1789+
when:
1790+
def incrementalResults = getIncrementalResults(initialResult)
1791+
1792+
then:
1793+
incrementalResults == [
1794+
[
1795+
hasNext : false,
1796+
incremental: [
1797+
[
1798+
path: [],
1799+
data: [post: [summary: "A summary"]]
1800+
]
1801+
]
1802+
]
1803+
]
1804+
}
1805+
1806+
1807+
def "incremental starts only after initial result when eager start disabled"() {
1808+
given:
1809+
def deferStarted = new CountDownLatch(1)
1810+
def allowDeferredComplete = new CountDownLatch(1)
1811+
1812+
def runtimeWiring = RuntimeWiring.newRuntimeWiring()
1813+
.type(newTypeWiring("Query")
1814+
.dataFetcher("post", resolve([id: "1001"]))
1815+
)
1816+
.type(newTypeWiring("Query").dataFetcher("hello", resolve("world", 300)))
1817+
.type(newTypeWiring("Post").dataFetcher("summary", { env ->
1818+
deferStarted.countDown()
1819+
allowDeferredComplete.await(2, TimeUnit.SECONDS)
1820+
CompletableFuture.completedFuture("A summary")
1821+
} as DataFetcher))
1822+
.type(newTypeWiring("Item").typeResolver(itemTypeResolver()))
1823+
.build()
1824+
1825+
def schema = TestUtil.schema(schemaSpec, runtimeWiring)
1826+
.transform({ b -> b.additionalDirective(Directives.DeferDirective) })
1827+
def testGraphQL = GraphQL.newGraphQL(schema).build()
1828+
1829+
def query = '''
1830+
query {
1831+
hello
1832+
... @defer { post { summary } }
1833+
}
1834+
'''
1835+
1836+
when:
1837+
def executionInput = ExecutionInput.newExecutionInput()
1838+
.graphQLContext([(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT): true]) // no eager flag
1839+
.query(query)
1840+
.build()
1841+
def execFuture = CompletableFuture.supplyAsync {
1842+
testGraphQL.execute(executionInput)
1843+
}
1844+
1845+
then:
1846+
assert !deferStarted.await(100, TimeUnit.MILLISECONDS)
1847+
assert !execFuture.isDone()
1848+
1849+
when:
1850+
def initialResult = execFuture.join() as IncrementalExecutionResult
1851+
1852+
then:
1853+
assert initialResult.toSpecification() == [
1854+
data : [hello: "world"],
1855+
hasNext: true
1856+
]
1857+
assert deferStarted.count == 1 // still not started, no subscriber yet
1858+
1859+
when:
1860+
allowDeferredComplete.countDown()
1861+
def incrementalResults = getIncrementalResults(initialResult)
1862+
1863+
then:
1864+
incrementalResults == [
1865+
[
1866+
hasNext : false,
1867+
incremental: [
1868+
[
1869+
path: [],
1870+
data: [post: [summary: "A summary"]]
1871+
]
1872+
]
1873+
]
1874+
]
1875+
}
1876+
17291877

17301878
private ExecutionResult executeQuery(String query) {
17311879
return this.executeQuery(query, true, [:])

0 commit comments

Comments
 (0)