@@ -7,6 +7,7 @@ import graphql.ExecutionResult
77import graphql.ExperimentalApi
88import graphql.GraphQL
99import graphql.GraphqlErrorBuilder
10+ import graphql.GraphQLContext
1011import graphql.TestUtil
1112import graphql.execution.DataFetcherResult
1213import graphql.execution.pubsub.CapturingSubscriber
@@ -27,6 +28,8 @@ import spock.lang.Specification
2728import spock.lang.Unroll
2829
2930import java.util.concurrent.CompletableFuture
31+ import java.util.concurrent.CountDownLatch
32+ import java.util.concurrent.TimeUnit
3033import java.util.concurrent.atomic.AtomicInteger
3134
3235import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring
@@ -1726,6 +1729,151 @@ class DeferExecutionSupportIntegrationTest extends Specification {
17261729
17271730 }
17281731
1732+ def " eager defer starts before initial result completes when ENABLE_EAGER_DEFER_START" () {
1733+ given :
1734+ def deferStarted = new CountDownLatch (1 )
1735+ def allowDeferredComplete = new CountDownLatch (1 )
1736+
1737+ def runtimeWiring = RuntimeWiring . newRuntimeWiring()
1738+ .type(newTypeWiring(" Query" )
1739+ .dataFetcher(" post" , resolve([id : " 1001" ]))
1740+ )
1741+ .type(newTypeWiring(" Query" ). dataFetcher(" hello" , resolve(" world" , 4000 )))
1742+ .type(newTypeWiring(" Post" ). dataFetcher(" summary" , { env ->
1743+ deferStarted. countDown()
1744+ allowDeferredComplete. await(2 , TimeUnit . SECONDS )
1745+ CompletableFuture . completedFuture(" A summary" )
1746+ } as DataFetcher ))
1747+ .type(newTypeWiring(" Item" ). typeResolver(itemTypeResolver()))
1748+ .build()
1749+
1750+ def schema = TestUtil . schema(schemaSpec, runtimeWiring)
1751+ .transform({ b -> b. additionalDirective(Directives.DeferDirective ) })
1752+ def testGraphQL = GraphQL . newGraphQL(schema). build()
1753+
1754+ def ctx = GraphQLContext . newContext(). build()
1755+ ctx. put(ExperimentalApi . ENABLE_INCREMENTAL_SUPPORT , true )
1756+ ctx. put(IncrementalExecutionContextKeys . ENABLE_EAGER_DEFER_START , true )
1757+
1758+ def query = '''
1759+ query {
1760+ hello
1761+ ... @defer { post { summary } }
1762+ }
1763+ '''
1764+
1765+ when :
1766+ def executionInput = ExecutionInput . newExecutionInput()
1767+ .graphQLContext([(ExperimentalApi . ENABLE_INCREMENTAL_SUPPORT ): true , (IncrementalExecutionContextKeys . ENABLE_EAGER_DEFER_START ): true ])
1768+ .query(query)
1769+ .build()
1770+ def execFuture = CompletableFuture . supplyAsync {
1771+ testGraphQL. execute(executionInput)
1772+ }
1773+
1774+ then :
1775+ // Deferred fetcher starts while initial result is still computing
1776+ assert deferStarted. await(2000 , TimeUnit . MILLISECONDS )
1777+ assert ! execFuture. isDone()
1778+
1779+ when :
1780+ allowDeferredComplete. countDown()
1781+ def initialResult = execFuture. join() as IncrementalExecutionResult
1782+
1783+ then :
1784+ assert initialResult. toSpecification() == [
1785+ data : [hello : " world" ],
1786+ hasNext : true
1787+ ]
1788+
1789+ when :
1790+ def incrementalResults = getIncrementalResults(initialResult)
1791+
1792+ then :
1793+ incrementalResults == [
1794+ [
1795+ hasNext : false ,
1796+ incremental : [
1797+ [
1798+ path : [],
1799+ data : [post : [summary : " A summary" ]]
1800+ ]
1801+ ]
1802+ ]
1803+ ]
1804+ }
1805+
1806+
1807+ def " incremental starts only after initial result when eager start disabled" () {
1808+ given :
1809+ def deferStarted = new CountDownLatch (1 )
1810+ def allowDeferredComplete = new CountDownLatch (1 )
1811+
1812+ def runtimeWiring = RuntimeWiring . newRuntimeWiring()
1813+ .type(newTypeWiring(" Query" )
1814+ .dataFetcher(" post" , resolve([id : " 1001" ]))
1815+ )
1816+ .type(newTypeWiring(" Query" ). dataFetcher(" hello" , resolve(" world" , 300 )))
1817+ .type(newTypeWiring(" Post" ). dataFetcher(" summary" , { env ->
1818+ deferStarted. countDown()
1819+ allowDeferredComplete. await(2 , TimeUnit . SECONDS )
1820+ CompletableFuture . completedFuture(" A summary" )
1821+ } as DataFetcher ))
1822+ .type(newTypeWiring(" Item" ). typeResolver(itemTypeResolver()))
1823+ .build()
1824+
1825+ def schema = TestUtil . schema(schemaSpec, runtimeWiring)
1826+ .transform({ b -> b. additionalDirective(Directives.DeferDirective ) })
1827+ def testGraphQL = GraphQL . newGraphQL(schema). build()
1828+
1829+ def query = '''
1830+ query {
1831+ hello
1832+ ... @defer { post { summary } }
1833+ }
1834+ '''
1835+
1836+ when :
1837+ def executionInput = ExecutionInput . newExecutionInput()
1838+ .graphQLContext([(ExperimentalApi . ENABLE_INCREMENTAL_SUPPORT ): true ]) // no eager flag
1839+ .query(query)
1840+ .build()
1841+ def execFuture = CompletableFuture . supplyAsync {
1842+ testGraphQL. execute(executionInput)
1843+ }
1844+
1845+ then :
1846+ assert ! deferStarted. await(100 , TimeUnit . MILLISECONDS )
1847+ assert ! execFuture. isDone()
1848+
1849+ when :
1850+ def initialResult = execFuture. join() as IncrementalExecutionResult
1851+
1852+ then :
1853+ assert initialResult. toSpecification() == [
1854+ data : [hello : " world" ],
1855+ hasNext : true
1856+ ]
1857+ assert deferStarted. count == 1 // still not started, no subscriber yet
1858+
1859+ when :
1860+ allowDeferredComplete. countDown()
1861+ def incrementalResults = getIncrementalResults(initialResult)
1862+
1863+ then :
1864+ incrementalResults == [
1865+ [
1866+ hasNext : false ,
1867+ incremental : [
1868+ [
1869+ path : [],
1870+ data : [post : [summary : " A summary" ]]
1871+ ]
1872+ ]
1873+ ]
1874+ ]
1875+ }
1876+
17291877
17301878 private ExecutionResult executeQuery (String query ) {
17311879 return this . executeQuery(query, true , [:])
0 commit comments