Skip to content

Commit bd67c78

Browse files
committed
3662 - fixes dataloader dispatching during subscriptions
1 parent 579c174 commit bd67c78

File tree

4 files changed

+150
-27
lines changed

4 files changed

+150
-27
lines changed

src/main/java/graphql/execution/ExecutionContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class ExecutionContext {
8686
this.errors.set(builder.errors);
8787
this.localContext = builder.localContext;
8888
this.executionInput = builder.executionInput;
89+
this.dataLoaderDispatcherStrategy = builder.dataLoaderDispatcherStrategy;
8990
this.queryTree = FpKit.interThreadMemoize(() -> ExecutableNormalizedOperationFactory.createExecutableNormalizedOperation(graphQLSchema, operationDefinition, fragmentsByName, coercedVariables));
9091
}
9192

src/main/java/graphql/execution/ExecutionContextBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ExecutionContextBuilder {
4545
ValueUnboxer valueUnboxer;
4646
Object localContext;
4747
ExecutionInput executionInput;
48+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = DataLoaderDispatchStrategy.NO_OP;
4849

4950
/**
5051
* @return a new builder of {@link graphql.execution.ExecutionContext}s
@@ -90,6 +91,7 @@ public ExecutionContextBuilder() {
9091
errors = ImmutableList.copyOf(other.getErrors());
9192
valueUnboxer = other.getValueUnboxer();
9293
executionInput = other.getExecutionInput();
94+
dataLoaderDispatcherStrategy = other.getDataLoaderDispatcherStrategy();
9395
}
9496

9597
public ExecutionContextBuilder instrumentation(Instrumentation instrumentation) {
@@ -203,6 +205,12 @@ public ExecutionContextBuilder executionInput(ExecutionInput executionInput) {
203205
return this;
204206
}
205207

208+
@Internal
209+
public ExecutionContextBuilder dataLoaderDispatcherStrategy(DataLoaderDispatchStrategy dataLoaderDispatcherStrategy) {
210+
this.dataLoaderDispatcherStrategy = dataLoaderDispatcherStrategy;
211+
return this;
212+
}
213+
206214
public ExecutionContextBuilder resetErrors() {
207215
this.errors = emptyList();
208216
return this;

src/test/groovy/graphql/execution/ExecutionContextBuilderTest.groovy

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,26 @@ class ExecutionContextBuilderTest extends Specification {
2525
def operation = document.definitions[0] as OperationDefinition
2626
def fragment = document.definitions[1] as FragmentDefinition
2727
def dataLoaderRegistry = new DataLoaderRegistry()
28+
def mockDataLoaderDispatcherStrategy = Mock(DataLoaderDispatchStrategy)
2829

2930
def "builds the correct ExecutionContext"() {
3031
when:
3132
def executionContext = new ExecutionContextBuilder()
32-
.instrumentation(instrumentation)
33-
.queryStrategy(queryStrategy)
34-
.mutationStrategy(mutationStrategy)
35-
.subscriptionStrategy(subscriptionStrategy)
36-
.graphQLSchema(schema)
37-
.executionId(executionId)
38-
.context(context) // Retain deprecated builder for test coverage
39-
.graphQLContext(graphQLContext)
40-
.root(root)
41-
.operationDefinition(operation)
42-
.fragmentsByName([MyFragment: fragment])
43-
.variables([var: 'value']) // Retain deprecated builder for test coverage
44-
.dataLoaderRegistry(dataLoaderRegistry)
45-
.build()
33+
.instrumentation(instrumentation)
34+
.queryStrategy(queryStrategy)
35+
.mutationStrategy(mutationStrategy)
36+
.subscriptionStrategy(subscriptionStrategy)
37+
.graphQLSchema(schema)
38+
.executionId(executionId)
39+
.context(context) // Retain deprecated builder for test coverage
40+
.graphQLContext(graphQLContext)
41+
.root(root)
42+
.operationDefinition(operation)
43+
.fragmentsByName([MyFragment: fragment])
44+
.variables([var: 'value']) // Retain deprecated builder for test coverage
45+
.dataLoaderRegistry(dataLoaderRegistry)
46+
.dataLoaderDispatcherStrategy(mockDataLoaderDispatcherStrategy)
47+
.build()
4648

4749
then:
4850
executionContext.executionId == executionId
@@ -58,6 +60,7 @@ class ExecutionContextBuilderTest extends Specification {
5860
executionContext.getFragmentsByName() == [MyFragment: fragment]
5961
executionContext.operationDefinition == operation
6062
executionContext.dataLoaderRegistry == dataLoaderRegistry
63+
executionContext.dataLoaderDispatcherStrategy == mockDataLoaderDispatcherStrategy
6164
}
6265

6366
def "builds the correct ExecutionContext with coerced variables"() {
@@ -66,19 +69,19 @@ class ExecutionContextBuilderTest extends Specification {
6669

6770
when:
6871
def executionContext = new ExecutionContextBuilder()
69-
.instrumentation(instrumentation)
70-
.queryStrategy(queryStrategy)
71-
.mutationStrategy(mutationStrategy)
72-
.subscriptionStrategy(subscriptionStrategy)
73-
.graphQLSchema(schema)
74-
.executionId(executionId)
75-
.graphQLContext(graphQLContext)
76-
.root(root)
77-
.operationDefinition(operation)
78-
.fragmentsByName([MyFragment: fragment])
79-
.coercedVariables(coercedVariables)
80-
.dataLoaderRegistry(dataLoaderRegistry)
81-
.build()
72+
.instrumentation(instrumentation)
73+
.queryStrategy(queryStrategy)
74+
.mutationStrategy(mutationStrategy)
75+
.subscriptionStrategy(subscriptionStrategy)
76+
.graphQLSchema(schema)
77+
.executionId(executionId)
78+
.graphQLContext(graphQLContext)
79+
.root(root)
80+
.operationDefinition(operation)
81+
.fragmentsByName([MyFragment: fragment])
82+
.coercedVariables(coercedVariables)
83+
.dataLoaderRegistry(dataLoaderRegistry)
84+
.build()
8285

8386
then:
8487
executionContext.executionId == executionId
@@ -205,4 +208,32 @@ class ExecutionContextBuilderTest extends Specification {
205208
executionContext.operationDefinition == operation
206209
executionContext.dataLoaderRegistry == dataLoaderRegistry
207210
}
211+
212+
def "transform copies dispatcher"() {
213+
given:
214+
def oldCoercedVariables = CoercedVariables.emptyVariables()
215+
def executionContextOld = new ExecutionContextBuilder()
216+
.instrumentation(instrumentation)
217+
.queryStrategy(queryStrategy)
218+
.mutationStrategy(mutationStrategy)
219+
.subscriptionStrategy(subscriptionStrategy)
220+
.graphQLSchema(schema)
221+
.executionId(executionId)
222+
.graphQLContext(graphQLContext)
223+
.root(root)
224+
.operationDefinition(operation)
225+
.coercedVariables(oldCoercedVariables)
226+
.fragmentsByName([MyFragment: fragment])
227+
.dataLoaderRegistry(dataLoaderRegistry)
228+
.dataLoaderDispatcherStrategy(DataLoaderDispatchStrategy.NO_OP)
229+
.build()
230+
231+
when:
232+
def executionContext = executionContextOld
233+
.transform(builder -> builder
234+
.dataLoaderDispatcherStrategy(mockDataLoaderDispatcherStrategy))
235+
236+
then:
237+
executionContext.getDataLoaderDispatcherStrategy() == mockDataLoaderDispatcherStrategy
238+
}
208239
}

src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherTest.groovy

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
package graphql.execution.instrumentation.dataloader
22

33
import graphql.ExecutionInput
4+
import graphql.ExecutionResult
45
import graphql.GraphQL
56
import graphql.TestUtil
67
import graphql.execution.AsyncSerialExecutionStrategy
78
import graphql.execution.instrumentation.ChainedInstrumentation
89
import graphql.execution.instrumentation.InstrumentationState
910
import graphql.execution.instrumentation.SimplePerformantInstrumentation
1011
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
12+
import graphql.execution.pubsub.CapturingSubscriber
1113
import graphql.schema.DataFetcher
14+
import graphql.schema.DataFetchingEnvironment
15+
import org.awaitility.Awaitility
1216
import org.dataloader.BatchLoader
1317
import org.dataloader.DataLoaderFactory
1418
import org.dataloader.DataLoaderRegistry
1519
import org.jetbrains.annotations.NotNull
20+
import org.reactivestreams.Publisher
21+
import reactor.core.publisher.Mono
1622
import spock.lang.Specification
1723
import spock.lang.Unroll
1824

@@ -275,4 +281,81 @@ class DataLoaderDispatcherTest extends Specification {
275281
er.errors.isEmpty()
276282
er.data == support.buildResponse(depth)
277283
}
284+
285+
def "issue 3662 - dataloader dispatching can work with subscriptions"() {
286+
287+
def sdl = '''
288+
type Query {
289+
field : String
290+
}
291+
292+
type Subscription {
293+
onSub : OnSub
294+
}
295+
296+
type OnSub {
297+
x : String
298+
y : String
299+
}
300+
'''
301+
302+
// the dispatching is ALWAYS so not really batching but it completes
303+
BatchLoader batchLoader = { keys ->
304+
CompletableFuture.supplyAsync {
305+
Thread.sleep(50) // some delay
306+
keys
307+
}
308+
}
309+
310+
DataFetcher dlDF = { DataFetchingEnvironment env ->
311+
def dataLoader = env.getDataLoaderRegistry().getDataLoader("dl")
312+
return dataLoader.load("working as expected")
313+
}
314+
DataFetcher dlSub = { DataFetchingEnvironment env ->
315+
return Mono.just([x: "X", y: "Y"])
316+
}
317+
def runtimeWiring = newRuntimeWiring()
318+
.type(newTypeWiring("OnSub")
319+
.dataFetcher("x", dlDF)
320+
.dataFetcher("y", dlDF)
321+
.build()
322+
)
323+
.type(newTypeWiring("Subscription")
324+
.dataFetcher("onSub", dlSub)
325+
.build()
326+
)
327+
.build()
328+
329+
def graphql = TestUtil.graphQL(sdl, runtimeWiring).build()
330+
331+
DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry()
332+
dataLoaderRegistry.register("dl", DataLoaderFactory.newDataLoader(batchLoader))
333+
334+
when:
335+
def query = """
336+
subscription s {
337+
onSub {
338+
x, y
339+
}
340+
}
341+
"""
342+
def executionInput = newExecutionInput()
343+
.dataLoaderRegistry(dataLoaderRegistry)
344+
.query(query)
345+
.build()
346+
def er = graphql.execute(executionInput)
347+
348+
then:
349+
er.errors.isEmpty()
350+
def subscriber = new CapturingSubscriber()
351+
Publisher pub = er.data
352+
pub.subscribe(subscriber)
353+
354+
Awaitility.await().untilTrue(subscriber.isDone())
355+
356+
subscriber.getEvents().size() == 1
357+
358+
def msgER = subscriber.getEvents()[0] as ExecutionResult
359+
msgER.data == [onSub: [x: "working as expected", y: "working as expected"]]
360+
}
278361
}

0 commit comments

Comments
 (0)