Skip to content

Commit d608ad5

Browse files
committed
Add targeted unit tests for ExhaustedDataLoaderDispatchStrategy
Add 8 deterministic tests that directly exercise the concurrent state machine in ExhaustedDataLoaderDispatchStrategy, recovering branch and line coverage lost when PR #4299 replaced race-condition-dependent timing with deterministic latch-based synchronization. Tests cover: - Basic dispatch cycle (finishedFetching triggers dispatch) - Early return in newDataLoaderInvocation when flag already set - Dispatch from newDataLoaderInvocation when objectRunningCount is 0 - Multiple dispatch rounds with chained data loader invocations - executionSerialStrategy state reset - Subscription alternative call context with separate call stack - startComplete/stopComplete dispatch coordination - Deferred call context lazy call stack creation via computeIfAbsent https://claude.ai/code/session_01WwTVwjXS1wakJQmZYsXLvY
1 parent 20e1dc6 commit d608ad5

File tree

1 file changed

+301
-0
lines changed

1 file changed

+301
-0
lines changed
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
package graphql.execution.instrumentation.dataloader
2+
3+
import graphql.ExecutionInput
4+
import graphql.EngineRunningState
5+
import graphql.GraphQLContext
6+
import graphql.Profiler
7+
import graphql.execution.AsyncExecutionStrategy
8+
import graphql.execution.CoercedVariables
9+
import graphql.execution.ExecutionContext
10+
import graphql.execution.ExecutionContextBuilder
11+
import graphql.execution.ExecutionId
12+
import graphql.execution.ExecutionStepInfo
13+
import graphql.execution.ExecutionStrategyParameters
14+
import graphql.execution.NonNullableFieldValidator
15+
import graphql.execution.ResultPath
16+
import graphql.execution.incremental.AlternativeCallContext
17+
import graphql.schema.GraphQLSchema
18+
import org.dataloader.BatchLoader
19+
import org.dataloader.DataLoaderFactory
20+
import org.dataloader.DataLoaderRegistry
21+
import spock.lang.Specification
22+
23+
import java.util.concurrent.CompletableFuture
24+
import java.util.concurrent.CompletionStage
25+
import java.util.concurrent.CountDownLatch
26+
import java.util.concurrent.TimeUnit
27+
import java.util.concurrent.atomic.AtomicInteger
28+
29+
import static graphql.Scalars.GraphQLString
30+
import static graphql.execution.ExecutionStepInfo.newExecutionStepInfo
31+
import static graphql.execution.ExecutionStrategyParameters.newParameters
32+
33+
class ExhaustedDataLoaderDispatchStrategyTest extends Specification {
34+
35+
AtomicInteger batchLoaderInvocations
36+
DataLoaderRegistry dataLoaderRegistry
37+
ExecutionContext executionContext
38+
ExhaustedDataLoaderDispatchStrategy strategy
39+
40+
ExecutionStrategyParameters rootParams
41+
42+
def setup() {
43+
batchLoaderInvocations = new AtomicInteger()
44+
}
45+
46+
private void setupStrategy(BatchLoader<String, String> batchLoader) {
47+
dataLoaderRegistry = new DataLoaderRegistry()
48+
def dataLoader = DataLoaderFactory.newDataLoader(batchLoader)
49+
dataLoaderRegistry.register("testLoader", dataLoader)
50+
51+
def executionInput = ExecutionInput.newExecutionInput()
52+
.query("{ dummy }")
53+
.build()
54+
def engineRunningState = new EngineRunningState(executionInput, Profiler.NO_OP)
55+
56+
def executionStrategy = new AsyncExecutionStrategy()
57+
executionContext = new ExecutionContextBuilder()
58+
.executionId(ExecutionId.generate())
59+
.graphQLSchema(GraphQLSchema.newSchema().query(
60+
graphql.schema.GraphQLObjectType.newObject()
61+
.name("Query")
62+
.field({ f -> f.name("dummy").type(GraphQLString) })
63+
.build()
64+
).build())
65+
.queryStrategy(executionStrategy)
66+
.mutationStrategy(executionStrategy)
67+
.subscriptionStrategy(executionStrategy)
68+
.graphQLContext(GraphQLContext.newContext().build())
69+
.coercedVariables(CoercedVariables.emptyVariables())
70+
.dataLoaderRegistry(dataLoaderRegistry)
71+
.executionInput(executionInput)
72+
.profiler(Profiler.NO_OP)
73+
.engineRunningState(engineRunningState)
74+
.build()
75+
76+
strategy = new ExhaustedDataLoaderDispatchStrategy(executionContext)
77+
78+
rootParams = newParameters()
79+
.executionStepInfo(newExecutionStepInfo()
80+
.type(GraphQLString)
81+
.path(ResultPath.rootPath())
82+
.build())
83+
.source(new Object())
84+
.fields(graphql.execution.MergedSelectionSet.newMergedSelectionSet().build())
85+
.nonNullFieldValidator(new NonNullableFieldValidator(executionContext))
86+
.build()
87+
}
88+
89+
private BatchLoader<String, String> simpleBatchLoader() {
90+
return new BatchLoader<String, String>() {
91+
@Override
92+
CompletionStage<List<String>> load(List<String> keys) {
93+
batchLoaderInvocations.incrementAndGet()
94+
return CompletableFuture.completedFuture(keys)
95+
}
96+
}
97+
}
98+
99+
def "basic dispatch cycle - finishedFetching triggers dispatch when objectRunning reaches 0"() {
100+
given:
101+
setupStrategy(simpleBatchLoader())
102+
// Load a key so the data loader has pending work
103+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
104+
105+
when:
106+
// executionStrategy: increments running count to 1
107+
strategy.executionStrategy(executionContext, rootParams, 1)
108+
// newDataLoaderInvocation: sets dataLoaderToDispatch = true; running > 0 so no dispatch yet
109+
strategy.newDataLoaderInvocation(null)
110+
// finishedFetching: decrements running to 0; all conditions met -> dispatch fires
111+
strategy.finishedFetching(executionContext, rootParams)
112+
113+
// Give async dispatch a moment to complete
114+
Thread.sleep(100)
115+
116+
then:
117+
batchLoaderInvocations.get() == 1
118+
}
119+
120+
def "early return in newDataLoaderInvocation when dataLoaderToDispatch already set"() {
121+
given:
122+
setupStrategy(simpleBatchLoader())
123+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
124+
125+
when:
126+
strategy.executionStrategy(executionContext, rootParams, 1)
127+
// First call sets dataLoaderToDispatch = true
128+
strategy.newDataLoaderInvocation(null)
129+
// Second call: flag already true -> early return at line 223
130+
strategy.newDataLoaderInvocation(null)
131+
// Dispatch via finishedFetching
132+
strategy.finishedFetching(executionContext, rootParams)
133+
134+
Thread.sleep(100)
135+
136+
then:
137+
// Batch loader should be called exactly once despite two newDataLoaderInvocation calls
138+
batchLoaderInvocations.get() == 1
139+
}
140+
141+
def "dispatch triggered from newDataLoaderInvocation when objectRunningCount is already 0"() {
142+
given:
143+
setupStrategy(simpleBatchLoader())
144+
145+
when:
146+
// executionStrategy: running count = 1
147+
strategy.executionStrategy(executionContext, rootParams, 1)
148+
// finishedFetching: running count = 0, but dataLoaderToDispatch is false -> no dispatch
149+
strategy.finishedFetching(executionContext, rootParams)
150+
151+
// Now load a key so there's pending work
152+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
153+
154+
// newDataLoaderInvocation: sets dataLoaderToDispatch = true; running == 0 -> dispatches from line 233
155+
strategy.newDataLoaderInvocation(null)
156+
157+
Thread.sleep(100)
158+
159+
then:
160+
batchLoaderInvocations.get() == 1
161+
}
162+
163+
def "multiple dispatch rounds when data loader invocation happens during dispatch"() {
164+
given:
165+
def secondRoundLatch = new CountDownLatch(1)
166+
AtomicInteger roundCount = new AtomicInteger()
167+
168+
// A batch loader that on the first call, loads another key (triggering a second dispatch round)
169+
def chainedBatchLoader = new BatchLoader<String, String>() {
170+
@Override
171+
CompletionStage<List<String>> load(List<String> keys) {
172+
int round = roundCount.incrementAndGet()
173+
if (round == 1) {
174+
// During first batch, load another key to trigger second dispatch
175+
dataLoaderRegistry.getDataLoader("testLoader").load("key2")
176+
strategy.newDataLoaderInvocation(null)
177+
}
178+
if (round == 2) {
179+
secondRoundLatch.countDown()
180+
}
181+
return CompletableFuture.completedFuture(keys)
182+
}
183+
}
184+
setupStrategy(chainedBatchLoader)
185+
186+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
187+
188+
when:
189+
strategy.executionStrategy(executionContext, rootParams, 1)
190+
strategy.newDataLoaderInvocation(null)
191+
strategy.finishedFetching(executionContext, rootParams)
192+
193+
// Wait for second dispatch round
194+
def completed = secondRoundLatch.await(2, TimeUnit.SECONDS)
195+
196+
then:
197+
completed
198+
roundCount.get() == 2
199+
}
200+
201+
def "executionSerialStrategy clears and re-initializes state"() {
202+
given:
203+
setupStrategy(simpleBatchLoader())
204+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
205+
206+
when:
207+
// Start with a root execution
208+
strategy.executionStrategy(executionContext, rootParams, 1)
209+
// executionSerialStrategy clears state and re-inits running count
210+
strategy.executionSerialStrategy(executionContext, rootParams)
211+
// Set dataLoaderToDispatch
212+
strategy.newDataLoaderInvocation(null)
213+
// Finish fetching -> should dispatch
214+
strategy.finishedFetching(executionContext, rootParams)
215+
216+
Thread.sleep(100)
217+
218+
then:
219+
batchLoaderInvocations.get() == 1
220+
}
221+
222+
def "alternative call context - subscription creates separate call stack"() {
223+
given:
224+
setupStrategy(simpleBatchLoader())
225+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
226+
def altCtx = new AlternativeCallContext()
227+
228+
when:
229+
// Also start the initial call stack so it doesn't interfere
230+
strategy.executionStrategy(executionContext, rootParams, 1)
231+
232+
// Create subscription call stack
233+
strategy.newSubscriptionExecution(altCtx)
234+
// Signal data loader invocation on subscription context
235+
strategy.newDataLoaderInvocation(altCtx)
236+
// Complete subscription event -> triggers dispatch on subscription call stack
237+
strategy.subscriptionEventCompletionDone(altCtx)
238+
239+
Thread.sleep(100)
240+
241+
then:
242+
batchLoaderInvocations.get() == 1
243+
}
244+
245+
def "startComplete and stopComplete affect dispatch"() {
246+
given:
247+
setupStrategy(simpleBatchLoader())
248+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
249+
250+
when:
251+
strategy.executionStrategy(executionContext, rootParams, 1)
252+
// startComplete increments running count
253+
strategy.startComplete(rootParams)
254+
// finishedFetching decrements, but running count is still > 0 due to startComplete
255+
strategy.finishedFetching(executionContext, rootParams)
256+
// Set dataLoaderToDispatch
257+
strategy.newDataLoaderInvocation(null)
258+
// stopComplete decrements to 0 -> triggers dispatch
259+
strategy.stopComplete(rootParams)
260+
261+
Thread.sleep(100)
262+
263+
then:
264+
batchLoaderInvocations.get() == 1
265+
}
266+
267+
def "deferred call context creates lazy call stack via computeIfAbsent"() {
268+
given:
269+
setupStrategy(simpleBatchLoader())
270+
dataLoaderRegistry.getDataLoader("testLoader").load("key1")
271+
272+
// Create params with a deferred call context
273+
def deferCtx = new AlternativeCallContext(1, 1)
274+
def deferredParams = newParameters()
275+
.executionStepInfo(newExecutionStepInfo()
276+
.type(GraphQLString)
277+
.path(ResultPath.rootPath())
278+
.build())
279+
.source(new Object())
280+
.fields(graphql.execution.MergedSelectionSet.newMergedSelectionSet().build())
281+
.nonNullFieldValidator(new NonNullableFieldValidator(executionContext))
282+
.deferredCallContext(deferCtx)
283+
.build()
284+
285+
when:
286+
// Start initial execution
287+
strategy.executionStrategy(executionContext, rootParams, 1)
288+
289+
// finishedFetching with deferred params triggers lazy call stack creation
290+
// The computeIfAbsent in getCallStack creates a new CallStack and increments its running count
291+
// Then finishedFetching decrements it -> running count = 0
292+
strategy.newDataLoaderInvocation(deferCtx)
293+
strategy.finishedFetching(executionContext, deferredParams)
294+
295+
Thread.sleep(100)
296+
297+
then:
298+
// The deferred call stack dispatches independently
299+
batchLoaderInvocations.get() == 1
300+
}
301+
}

0 commit comments

Comments
 (0)