Skip to content

Commit ca6f7fa

Browse files
andimarekbbakerman
authored andcommitted
Dataloader multilevel dispatch (graphql-java#990)
* mulitlevel datoader dispatch * cleanup: remove not used code * cleanup * Made the code be contained in a separate "approach" class * Added code to allow deferred fields to work with data loader * Added Sam Le Berriguards combining calls approach to the branch * Parameters not used * Tweaked tests * Now runs BOTH approaches in the tests * Hmm it seems to be failing on travis - why?? * Async makes the tests uncertain * Unused exports * improve test by making DF truly async * working * fix batch loader, cleanup * cleanup * cleanup * try to fix NonNull handling a bit * naming * naming * fixing non null handling * cleanup * improve field tracking * Fixed up ES with new return type * Fixed tests, renamed some methods and added a builder. Also deleted a test that just isn't valid any more * Renamed the resolveField2 and made the resolveField method delegate to it. This preserves backwards compatibility * working on deferred field support * make deferred fields work * cleanup/refactoring * fix bug * More tests around data loader and depth of @defer inside it * Added more test the use async data loading * Added tests that fail if the fetching is made async * git hash debug output * Now made the batch loaders be async and not the DFs * Added documentation about the design challenge
1 parent 5e542a9 commit ca6f7fa

44 files changed

Lines changed: 1542 additions & 371 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/batching.rst

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,79 @@ build out a new ``GraphQL`` set of objects on each request.
186186
187187
// you can now throw away the GraphQL and hence DataLoaderDispatcherInstrumentation
188188
// and DataLoaderRegistry objects since they are really cheap to build per request
189+
190+
191+
Async Calls On Your Batch Loader Function Only
192+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
193+
194+
The data loader code pattern works by combining all the outstanding data loader calls into more efficient batch loading calls.
195+
196+
graphql-java tracks what outstanding data loader calls have been made and it is its responsibility to call ``dispatch``
197+
in the background at the most optimal time, which is when all graphql fields have been examined and dispatched.
198+
199+
However there is a code pattern that will cause your data loader calls to never complete and these *MUST* be avoided. This bad
200+
pattern consists of making a an asynchronous off thread call to a ``DataLoader`` in your data fetcher.
201+
202+
The following will not work (it will never complete).
203+
204+
.. code-block:: java
205+
206+
BatchLoader<String, Object> batchLoader = new BatchLoader<String, Object>() {
207+
@Override
208+
public CompletionStage<List<Object>> load(List<String> keys) {
209+
return CompletableFuture.completedFuture(getTheseCharacters(keys));
210+
}
211+
};
212+
213+
DataLoader<String, Object> characterDataLoader = new DataLoader<>(batchLoader);
214+
215+
DataFetcher dataFetcherThatCallsTheDataLoader = new DataFetcher() {
216+
@Override
217+
public Object get(DataFetchingEnvironment environment) {
218+
//
219+
// Don't DO THIS!
220+
//
221+
return CompletableFuture.supplyAsync(() -> {
222+
String argId = environment.getArgument("id");
223+
return characterDataLoader.load(argId);
224+
});
225+
}
226+
};
227+
228+
In the example above, the call to ``characterDataLoader.load(argId)`` can happen some time in the future on another thread. The graphql-java
229+
engine has no way of knowing when it's good time to dispatch outstanding ``DataLoader`` calls and hence the data loader call might never complete
230+
as expected and no results will be returned.
231+
232+
Remember a data loader call is just a promise to actually get a value later when its an optimal time for all outstanding calls to be batched
233+
together. The most optimal time is when the graphql field tree has been examined and all field values are currently dispatched.
234+
235+
The following is how you can still have asynchronous code, by placing it into the ``BatchLoader`` itself.
236+
237+
.. code-block:: java
238+
239+
BatchLoader<String, Object> batchLoader = new BatchLoader<String, Object>() {
240+
@Override
241+
public CompletionStage<List<Object>> load(List<String> keys) {
242+
return CompletableFuture.supplyAsync(() -> getTheseCharacters(keys));
243+
}
244+
};
245+
246+
DataLoader<String, Object> characterDataLoader = new DataLoader<>(batchLoader);
247+
248+
DataFetcher dataFetcherThatCallsTheDataLoader = new DataFetcher() {
249+
@Override
250+
public Object get(DataFetchingEnvironment environment) {
251+
//
252+
// This is OK
253+
//
254+
String argId = environment.getArgument("id");
255+
return characterDataLoader.load(argId);
256+
}
257+
};
258+
259+
Notice above the ``characterDataLoader.load(argId)`` returns immediately. This will enqueue the call for data until a later time when all
260+
the graphql fields are dispatched.
261+
262+
Then later when the ``DataLoader`` is dispatched, it's ``BatchLoader`` function is called. This code can be asynchronous so that if you have multiple batch loader
263+
functions they all can run at once. In the code above ``CompletableFuture.supplyAsync(() -> getTheseCharacters(keys));`` will run the ``getTheseCharacters``
264+
method in another thread.

src/main/java/graphql/execution/Async.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package graphql.execution;
22

3+
import graphql.Assert;
4+
import graphql.Internal;
5+
36
import java.util.ArrayList;
47
import java.util.Iterator;
58
import java.util.List;
69
import java.util.concurrent.CompletableFuture;
710
import java.util.concurrent.CompletionException;
811
import java.util.concurrent.CompletionStage;
912
import java.util.function.BiFunction;
10-
11-
import graphql.Assert;
12-
import graphql.Internal;
13+
import java.util.function.Supplier;
1314

1415
@Internal
1516
public class Async {
@@ -104,4 +105,30 @@ public static <T> CompletableFuture<T> toCompletableFuture(T t) {
104105
}
105106
}
106107

108+
public static <T> CompletableFuture<T> tryCatch(Supplier<CompletableFuture<T>> supplier) {
109+
try {
110+
return supplier.get();
111+
} catch (Exception e) {
112+
CompletableFuture<T> result = new CompletableFuture<>();
113+
result.completeExceptionally(e);
114+
return result;
115+
}
116+
}
117+
118+
public static <T> CompletableFuture<T> exceptionallyCompletedFuture(Throwable exception) {
119+
CompletableFuture<T> result = new CompletableFuture<>();
120+
result.completeExceptionally(exception);
121+
return result;
122+
}
123+
124+
public static <T> void copyResults(CompletableFuture<T> source, CompletableFuture<T> target) {
125+
source.whenComplete((o, throwable) -> {
126+
if (throwable != null) {
127+
target.completeExceptionally(throwable);
128+
return;
129+
}
130+
target.complete(o);
131+
});
132+
}
133+
107134
}

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,22 @@
44
import graphql.execution.defer.DeferSupport;
55
import graphql.execution.defer.DeferredCall;
66
import graphql.execution.defer.DeferredErrorSupport;
7+
import graphql.execution.instrumentation.DeferredFieldInstrumentationContext;
8+
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext;
79
import graphql.execution.instrumentation.Instrumentation;
8-
import graphql.execution.instrumentation.InstrumentationContext;
10+
import graphql.execution.instrumentation.parameters.InstrumentationDeferredFieldParameters;
911
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters;
1012
import graphql.language.Field;
13+
import graphql.schema.GraphQLFieldDefinition;
1114

1215
import java.util.ArrayList;
16+
import java.util.HashMap;
1317
import java.util.List;
1418
import java.util.Map;
1519
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.Supplier;
22+
import java.util.stream.Collectors;
1623

1724
/**
1825
* The standard graphql execution strategy that runs fields asynchronously non-blocking.
@@ -41,46 +48,90 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
4148
Instrumentation instrumentation = executionContext.getInstrumentation();
4249
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
4350

44-
InstrumentationContext<ExecutionResult> executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters);
51+
ExecutionStrategyInstrumentationContext executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters);
4552

4653
Map<String, List<Field>> fields = parameters.getFields();
4754
List<String> fieldNames = new ArrayList<>(fields.keySet());
48-
List<CompletableFuture<ExecutionResult>> futures = new ArrayList<>();
55+
List<CompletableFuture<FieldValueInfo>> futures = new ArrayList<>();
4956
List<String> resolvedFields = new ArrayList<>();
5057
for (String fieldName : fieldNames) {
5158
List<Field> currentField = fields.get(fieldName);
5259

5360
ExecutionPath fieldPath = parameters.getPath().segment(fieldName);
5461
ExecutionStrategyParameters newParameters = parameters
55-
.transform(builder -> builder.field(currentField).path(fieldPath));
62+
.transform(builder -> builder.field(currentField).path(fieldPath).parent(parameters));
5663

5764
if (isDeferred(executionContext, newParameters, currentField)) {
65+
executionStrategyCtx.onDeferredField(currentField);
5866
continue;
5967
}
6068
resolvedFields.add(fieldName);
61-
CompletableFuture<ExecutionResult> future = resolveField(executionContext, newParameters);
69+
CompletableFuture<FieldValueInfo> future = resolveFieldWithInfo(executionContext, newParameters);
6270
futures.add(future);
6371
}
64-
6572
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
6673
executionStrategyCtx.onDispatched(overallResult);
6774

68-
Async.each(futures).whenComplete(handleResults(executionContext, resolvedFields, overallResult));
75+
Async.each(futures).whenComplete((completeValueInfos, throwable) -> {
76+
BiConsumer<List<ExecutionResult>, Throwable> handleResultsConsumer = handleResults(executionContext, resolvedFields, overallResult);
77+
if (throwable != null) {
78+
handleResultsConsumer.accept(null, throwable.getCause());
79+
return;
80+
}
81+
List<CompletableFuture<ExecutionResult>> executionResultFuture = completeValueInfos.stream().map(FieldValueInfo::getFieldValue).collect(Collectors.toList());
82+
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
83+
Async.each(executionResultFuture).whenComplete(handleResultsConsumer);
84+
});
6985

7086
overallResult.whenComplete(executionStrategyCtx::onCompleted);
7187
return overallResult;
7288
}
7389

74-
private boolean isDeferred(ExecutionContext executionContext, ExecutionStrategyParameters newParameters, List<Field> currentField) {
90+
private boolean isDeferred(ExecutionContext executionContext, ExecutionStrategyParameters parameters, List<Field> currentField) {
7591
DeferSupport deferSupport = executionContext.getDeferSupport();
7692
if (deferSupport.checkForDeferDirective(currentField)) {
7793
DeferredErrorSupport errorSupport = new DeferredErrorSupport();
78-
ExecutionStrategyParameters callParameters = newParameters.transform(builder -> builder.deferredErrorSupport(errorSupport));
7994

80-
DeferredCall call = new DeferredCall(() -> resolveField(executionContext, callParameters), errorSupport);
95+
// with a deferred field we are really resetting where we execute from, that is from this current field onwards
96+
Map<String, List<Field>> fields = new HashMap<>();
97+
fields.put(currentField.get(0).getName(), currentField);
98+
99+
ExecutionStrategyParameters callParameters = parameters.transform(builder ->
100+
builder.deferredErrorSupport(errorSupport)
101+
.field(currentField)
102+
.fields(fields)
103+
.parent(null) // this is a break in the parent -> child chain - its a new start effectively
104+
.listSize(0)
105+
.currentListIndex(0)
106+
);
107+
108+
DeferredCall call = new DeferredCall(deferredExecutionResult(executionContext, callParameters), errorSupport);
81109
deferSupport.enqueue(call);
82110
return true;
83111
}
84112
return false;
85113
}
114+
115+
private Supplier<CompletableFuture<ExecutionResult>> deferredExecutionResult(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
116+
return () -> {
117+
GraphQLFieldDefinition fieldDef = getFieldDef(executionContext, parameters, parameters.getField().get(0));
118+
119+
Instrumentation instrumentation = executionContext.getInstrumentation();
120+
DeferredFieldInstrumentationContext fieldCtx = instrumentation.beginDeferredField(
121+
new InstrumentationDeferredFieldParameters(executionContext, parameters, fieldDef, fieldTypeInfo(parameters, fieldDef))
122+
);
123+
CompletableFuture<ExecutionResult> result = new CompletableFuture<>();
124+
fieldCtx.onDispatched(result);
125+
CompletableFuture<FieldValueInfo> fieldValueInfoFuture = resolveFieldWithInfo(executionContext, parameters);
126+
127+
fieldValueInfoFuture.whenComplete((fieldValueInfo, throwable) -> {
128+
fieldCtx.onFieldValueInfo(fieldValueInfo);
129+
130+
CompletableFuture<ExecutionResult> execResultFuture = fieldValueInfo.getFieldValue();
131+
execResultFuture.whenComplete(fieldCtx::onCompleted);
132+
Async.copyResults(execResultFuture, result);
133+
});
134+
return result;
135+
};
136+
}
86137
}

src/main/java/graphql/execution/ExecutionPath.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,43 @@ private ExecutionPath(ExecutionPath parent, PathSegment segment) {
4646
pathList = toListImpl();
4747
}
4848

49+
public int getLevel() {
50+
int counter = 0;
51+
ExecutionPath currentPath = this;
52+
while (currentPath != null) {
53+
if (currentPath.segment instanceof StringPathSegment) {
54+
counter++;
55+
}
56+
currentPath = currentPath.parent;
57+
}
58+
return counter;
59+
}
60+
61+
public ExecutionPath getPathWithoutListEnd() {
62+
if(this == ROOT_PATH) {
63+
return ROOT_PATH;
64+
}
65+
if (segment instanceof StringPathSegment) {
66+
return this;
67+
}
68+
return parent;
69+
}
70+
71+
public String getSegmentName() {
72+
if (segment instanceof StringPathSegment) {
73+
return ((StringPathSegment) segment).getValue();
74+
} else {
75+
if (parent == null) {
76+
return null;
77+
}
78+
return ((StringPathSegment) parent.segment).getValue();
79+
}
80+
}
81+
4982
/**
5083
* Parses an execution path from the provided path string in the format /segment1/segment2[index]/segmentN
5184
*
5285
* @param pathString the path string
53-
*
5486
* @return a parsed execution path
5587
*/
5688
public static ExecutionPath parse(String pathString) {
@@ -79,7 +111,6 @@ public static ExecutionPath parse(String pathString) {
79111
* This will create an execution path from the list of objects
80112
*
81113
* @param objects the path objects
82-
*
83114
* @return a new execution path
84115
*/
85116
public static ExecutionPath fromList(List<?> objects) {
@@ -103,7 +134,6 @@ private static String mkErrMsg() {
103134
* Takes the current path and adds a new segment to it, returning a new path
104135
*
105136
* @param segment the string path segment to add
106-
*
107137
* @return a new path containing that segment
108138
*/
109139
public ExecutionPath segment(String segment) {
@@ -114,7 +144,6 @@ public ExecutionPath segment(String segment) {
114144
* Takes the current path and adds a new segment to it, returning a new path
115145
*
116146
* @param segment the int path segment to add
117-
*
118147
* @return a new path containing that segment
119148
*/
120149
public ExecutionPath segment(int segment) {
@@ -125,7 +154,7 @@ public ExecutionPath segment(int segment) {
125154
* @return converts the path into a list of segments
126155
*/
127156
public List<Object> toList() {
128-
return pathList;
157+
return new ArrayList<>(pathList);
129158
}
130159

131160
private List<Object> toListImpl() {

0 commit comments

Comments
 (0)