Skip to content

Commit 0140398

Browse files
authored
improve async handling (graphql-java#613)
refactoring async CF handling make completeValueForList non blocking
1 parent ef3de64 commit 0140398

File tree

9 files changed

+241
-139
lines changed

9 files changed

+241
-139
lines changed

src/main/java/graphql/GraphQL.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CompletionException;
3031
import java.util.function.UnaryOperator;
3132

3233
import static graphql.Assert.assertNotNull;
@@ -324,7 +325,15 @@ public ExecutionResult execute(UnaryOperator<ExecutionInput.Builder> builderFunc
324325
* @return an {@link ExecutionResult} which can include errors
325326
*/
326327
public ExecutionResult execute(ExecutionInput executionInput) {
327-
return executeAsync(executionInput).join();
328+
try {
329+
return executeAsync(executionInput).join();
330+
} catch (CompletionException e) {
331+
if (e.getCause() instanceof RuntimeException) {
332+
throw (RuntimeException) e.getCause();
333+
} else {
334+
throw e;
335+
}
336+
}
328337
}
329338

330339
/**

src/main/java/graphql/execution/AbstractAsyncExecutionStrategy.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.List;
88
import java.util.Map;
99
import java.util.concurrent.CompletableFuture;
10-
import java.util.concurrent.CompletionException;
10+
import java.util.function.BiConsumer;
1111

1212

1313
public abstract class AbstractAsyncExecutionStrategy extends ExecutionStrategy {
@@ -19,30 +19,20 @@ public AbstractAsyncExecutionStrategy(DataFetcherExceptionHandler dataFetcherExc
1919
super(dataFetcherExceptionHandler);
2020
}
2121

22-
protected void handleException(ExecutionContext executionContext, CompletableFuture<ExecutionResult> result, Throwable e) {
23-
if (e instanceof CompletionException && e.getCause() instanceof NonNullableFieldWasNullException) {
24-
assertNonNullFieldPrecondition((NonNullableFieldWasNullException) e.getCause(), result);
25-
if (!result.isDone()) {
26-
result.complete(new ExecutionResultImpl(null, executionContext.getErrors()));
22+
protected BiConsumer<List<ExecutionResult>, Throwable> handleResults(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
23+
return (List<ExecutionResult> results, Throwable exception) -> {
24+
if (exception != null) {
25+
handleNonNullException(executionContext, overallResult, exception);
26+
return;
2727
}
28-
} else {
29-
result.completeExceptionally(e);
30-
}
31-
}
28+
Map<String, Object> resolvedValuesByField = new LinkedHashMap<>();
29+
int ix = 0;
30+
for (ExecutionResult executionResult : results) {
3231

33-
protected void completeCompletableFuture(ExecutionContext executionContext, List<String> fieldNames, List<CompletableFuture<ExecutionResult>> futures, CompletableFuture<ExecutionResult> result) {
34-
Map<String, Object> resolvedValuesByField = new LinkedHashMap<>();
35-
int ix = 0;
36-
for (CompletableFuture<ExecutionResult> future : futures) {
37-
38-
if (future.isCompletedExceptionally()) {
39-
future.whenComplete((Null, e) -> handleException(executionContext, result, e));
40-
return;
32+
String fieldName = fieldNames.get(ix++);
33+
resolvedValuesByField.put(fieldName, executionResult.getData());
4134
}
42-
String fieldName = fieldNames.get(ix++);
43-
ExecutionResult resolvedResult = future.join();
44-
resolvedValuesByField.put(fieldName, resolvedResult.getData());
45-
}
46-
result.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
35+
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
36+
};
4737
}
4838
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package graphql.execution;
2+
3+
import graphql.Assert;
4+
import graphql.Internal;
5+
6+
import java.util.ArrayList;
7+
import java.util.Iterator;
8+
import java.util.List;
9+
import java.util.concurrent.CompletableFuture;
10+
11+
@Internal
12+
public class Async {
13+
14+
@FunctionalInterface
15+
public interface CFFactory<T, U> {
16+
CompletableFuture<U> apply(T input, int index, List<U> previousResults);
17+
}
18+
19+
public static <U> CompletableFuture<List<U>> each(List<CompletableFuture<U>> futures) {
20+
CompletableFuture<List<U>> overallResult = new CompletableFuture<>();
21+
CompletableFuture
22+
.allOf(futures.toArray(new CompletableFuture[futures.size()]))
23+
.whenComplete((noUsed, exception) -> {
24+
if (exception != null) {
25+
overallResult.completeExceptionally(exception);
26+
return;
27+
}
28+
List<U> results = new ArrayList<>();
29+
for (CompletableFuture<U> future : futures) {
30+
results.add(future.join());
31+
}
32+
overallResult.complete(results);
33+
});
34+
return overallResult;
35+
}
36+
37+
38+
public static <T, U> CompletableFuture<List<U>> eachSequentially(Iterable<T> list, CFFactory<T, U> cfFactory) {
39+
CompletableFuture<List<U>> result = new CompletableFuture<>();
40+
eachSequentiallyImpl(list.iterator(), cfFactory, 0, new ArrayList<>(), result);
41+
return result;
42+
}
43+
44+
private static <T, U> void eachSequentiallyImpl(Iterator<T> iterator, CFFactory<T, U> cfFactory, int index, List<U> tmpResult, CompletableFuture<List<U>> overallResult) {
45+
if (!iterator.hasNext()) {
46+
overallResult.complete(tmpResult);
47+
}
48+
CompletableFuture<U> cf;
49+
try {
50+
cf = cfFactory.apply(iterator.next(), index, tmpResult);
51+
Assert.assertNotNull(cf, "cfFactory must return a non null value");
52+
} catch (Exception e) {
53+
cf = new CompletableFuture<>();
54+
cf.completeExceptionally(e);
55+
}
56+
cf.whenComplete((cfResult, exception) -> {
57+
if (exception != null) {
58+
overallResult.completeExceptionally(exception);
59+
return;
60+
}
61+
tmpResult.add(cfResult);
62+
eachSequentiallyImpl(iterator, cfFactory, index + 1, tmpResult, overallResult);
63+
});
64+
}
65+
}

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.List;
88
import java.util.Map;
99
import java.util.concurrent.CompletableFuture;
10-
import java.util.function.BiConsumer;
1110

1211
/**
1312
* The standard graphql execution strategy that runs fields asynchronously non-blocking.
@@ -47,22 +46,9 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
4746
futures.add(future);
4847
}
4948

50-
CompletableFuture<ExecutionResult> result = new CompletableFuture<>();
51-
CompletableFuture
52-
.allOf(futures.toArray(new CompletableFuture[futures.size()]))
53-
.whenComplete(futuresCompleted(executionContext, fieldNames, futures, result));
54-
55-
return result;
56-
}
57-
58-
private BiConsumer<Void, Throwable> futuresCompleted(ExecutionContext executionContext,
59-
List<String> fieldNames,
60-
List<CompletableFuture<ExecutionResult>> futures,
61-
CompletableFuture<ExecutionResult> result) {
62-
return (notUsed1, notUsed2) -> {
63-
completeCompletableFuture(executionContext, fieldNames, futures, result);
64-
};
49+
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
50+
Async.each(futures).whenComplete(handleResults(executionContext, fieldNames, overallResult));
51+
return overallResult;
6552
}
6653

67-
6854
}
Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package graphql.execution;
22

33
import graphql.ExecutionResult;
4+
import graphql.ExecutionResultImpl;
45
import graphql.language.Field;
56

67
import java.util.ArrayList;
8+
import java.util.LinkedHashMap;
79
import java.util.List;
810
import java.util.Map;
911
import java.util.concurrent.CompletableFuture;
12+
import java.util.function.BiConsumer;
1013

1114
/**
1215
* Async non-blocking execution, but serial: only one field at the the time will be resolved.
@@ -25,35 +28,21 @@ public AsyncSerialExecutionStrategy(DataFetcherExceptionHandler exceptionHandler
2528
@Override
2629
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
2730

28-
CompletableFuture<ExecutionResult> result = new CompletableFuture<>();
29-
resolveNthField(executionContext, parameters, 0, new ArrayList<>(), result);
30-
31-
return result;
32-
}
33-
34-
private void resolveNthField(ExecutionContext executionContext,
35-
ExecutionStrategyParameters parameters,
36-
int index,
37-
List<CompletableFuture<ExecutionResult>> allFutures,
38-
CompletableFuture<ExecutionResult> overallResult) {
3931
Map<String, List<Field>> fields = parameters.fields();
4032
List<String> fieldNames = new ArrayList<>(fields.keySet());
41-
String fieldName = fieldNames.get(index);
42-
43-
List<Field> currentField = fields.get(fieldName);
44-
ExecutionPath fieldPath = parameters.path().segment(fieldName);
45-
ExecutionStrategyParameters newParameters = parameters
46-
.transform(builder -> builder.field(currentField).path(fieldPath));
47-
CompletableFuture<ExecutionResult> future = resolveField(executionContext, newParameters);
48-
future.whenComplete((notUsed1, notUsed2) -> {
49-
allFutures.add(future);
50-
if (index + 1 == fields.size()) {
51-
completeCompletableFuture(executionContext, fieldNames, allFutures, overallResult);
52-
} else {
53-
resolveNthField(executionContext, parameters, index + 1, allFutures, overallResult);
54-
}
33+
34+
CompletableFuture<List<ExecutionResult>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, index, prevResults) -> {
35+
List<Field> currentField = fields.get(fieldName);
36+
ExecutionPath fieldPath = parameters.path().segment(fieldName);
37+
ExecutionStrategyParameters newParameters = parameters
38+
.transform(builder -> builder.field(currentField).path(fieldPath));
39+
return resolveField(executionContext, newParameters);
5540
});
56-
}
5741

42+
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
43+
BiConsumer<List<ExecutionResult>, Throwable> listThrowableBiConsumer = handleResults(executionContext, fieldNames, overallResult);
44+
resultsFuture.whenComplete(listThrowableBiConsumer);
45+
return overallResult;
46+
}
5847

5948
}

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 33 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package graphql.execution;
22

3-
import graphql.Assert;
43
import graphql.ExecutionResult;
54
import graphql.ExecutionResultImpl;
65
import graphql.GraphQLException;
@@ -486,15 +485,13 @@ private Object handleCoercionProblem(ExecutionContext context, ExecutionStrategy
486485
* @return an {@link ExecutionResult}
487486
*/
488487
protected CompletableFuture<ExecutionResult> completeValueForList(ExecutionContext executionContext, ExecutionStrategyParameters parameters, Iterable<Object> iterableValues) {
489-
List<CompletableFuture<ExecutionResult>> completedValues = new ArrayList<>();
488+
490489
ExecutionTypeInfo typeInfo = parameters.typeInfo();
491490
GraphQLList fieldType = typeInfo.castType(GraphQLList.class);
492491

493-
boolean forceListToBeNull = false;
494-
int idx = 0;
495-
for (Object item : iterableValues) {
492+
CompletableFuture<List<ExecutionResult>> resultsFuture = Async.eachSequentially(iterableValues, (item, index, prevResults) -> {
496493

497-
ExecutionPath indexedPath = parameters.path().segment(idx);
494+
ExecutionPath indexedPath = parameters.path().segment(index);
498495

499496
ExecutionTypeInfo wrappedTypeInfo = ExecutionTypeInfo.newTypeInfo()
500497
.parentInfo(typeInfo)
@@ -513,26 +510,25 @@ protected CompletableFuture<ExecutionResult> completeValueForList(ExecutionConte
513510
.source(item)
514511
.build();
515512

516-
CompletableFuture<ExecutionResult> completedValue;
517-
try {
518-
completedValue = completeValue(executionContext, newParameters);
519-
} catch (NonNullableFieldWasNullException nne) {
520-
assertNonNullFieldPrecondition(nne);
521-
// the list becomes null if the wrapped type is non null but a null list entry is encountered
522-
forceListToBeNull = true;
523-
break;
513+
return completeValue(executionContext, newParameters);
514+
});
515+
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
516+
resultsFuture.whenComplete((results, exception) -> {
517+
if (exception != null) {
518+
handleNonNullException(executionContext, overallResult, exception);
519+
return;
524520
}
525-
completedValues.add(completedValue);
526-
idx++;
527-
}
528-
if (forceListToBeNull) {
529-
return completedFuture(new ExecutionResultImpl(null, executionContext.getErrors()));
530-
} else {
531-
ExecutionResult executionResult = joinAllOf(completedValues);
532-
return completedFuture(executionResult);
533-
}
521+
List<Object> completedResults = new ArrayList<>();
522+
for (ExecutionResult completedValue : results) {
523+
completedResults.add(completedValue.getData());
524+
}
525+
overallResult.complete(new ExecutionResultImpl(completedResults, null));
526+
return;
527+
});
528+
return overallResult;
534529
}
535530

531+
536532
/**
537533
* Called to discover the field definition give the current parameters and the AST {@link Field}
538534
*
@@ -604,29 +600,20 @@ protected void assertNonNullFieldPrecondition(NonNullableFieldWasNullException e
604600
}
605601
}
606602

607-
/**
608-
* This will join all of the promises of a result as one and return a execution result that
609-
* is a list of all the promised values
610-
*
611-
* @param completableFutures the list of futures to wait for to complete
612-
*
613-
* @return a new execution result of all the values in the promises
614-
*
615-
* @throws CompletionException if anything bad happens while waiting
616-
*/
617-
protected ExecutionResult joinAllOf(List<CompletableFuture<ExecutionResult>> completableFutures) throws CompletionException {
618-
CompletableFuture[] stages = completableFutures.toArray(new CompletableFuture[completableFutures.size()]);
619-
620-
CompletableFuture.allOf(stages).join();
621-
// they are all now complete (or an runtime exception has been thrown)
622-
623-
List<Object> completedResults = new ArrayList<>();
624-
completableFutures.forEach(future -> {
625-
ExecutionResult completedValue = future.getNow(null);
626-
Assert.assertNotNull(completedValue, "A null execution result value is not allowed");
627-
completedResults.add(completedValue.getData());
628-
});
629-
return new ExecutionResultImpl(completedResults, null);
603+
protected void handleNonNullException(ExecutionContext executionContext, CompletableFuture<ExecutionResult> result, Throwable e) {
604+
if (e instanceof NonNullableFieldWasNullException) {
605+
assertNonNullFieldPrecondition((NonNullableFieldWasNullException) e, result);
606+
if (!result.isDone()) {
607+
result.complete(new ExecutionResultImpl(null, executionContext.getErrors()));
608+
}
609+
} else if (e instanceof CompletionException && e.getCause() instanceof NonNullableFieldWasNullException) {
610+
assertNonNullFieldPrecondition((NonNullableFieldWasNullException) e.getCause(), result);
611+
if (!result.isDone()) {
612+
result.complete(new ExecutionResultImpl(null, executionContext.getErrors()));
613+
}
614+
} else {
615+
result.completeExceptionally(e);
616+
}
630617
}
631618

632619

src/main/java/graphql/execution/instrumentation/ChainedInstrumentation.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package graphql.execution.instrumentation;
22

33
import graphql.ExecutionResult;
4+
import graphql.execution.Async;
45
import graphql.execution.instrumentation.parameters.InstrumentationDataFetchParameters;
56
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters;
67
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters;
@@ -118,29 +119,12 @@ public DataFetcher<?> instrumentDataFetcher(DataFetcher<?> dataFetcher, Instrume
118119

119120
@Override
120121
public CompletableFuture<ExecutionResult> instrumentExecutionResult(ExecutionResult executionResult, InstrumentationExecutionParameters parameters) {
121-
CompletableFuture<ExecutionResult> result = new CompletableFuture<>();
122-
123-
chainNthResult(0, executionResult, parameters, result);
124-
return result;
125-
}
126-
127-
private void chainNthResult(int index, ExecutionResult lastResult, InstrumentationExecutionParameters parameters, CompletableFuture<ExecutionResult> overallResult) {
128-
if (index == instrumentations.size()) {
129-
overallResult.complete(lastResult);
130-
return;
131-
}
132-
Instrumentation instrumentation = instrumentations.get(index);
133-
InstrumentationState state = getState(instrumentation, parameters.getInstrumentationState());
134-
CompletableFuture<ExecutionResult> chainedResultFuture = instrumentation.instrumentExecutionResult(lastResult, parameters.withNewState(state));
135-
136-
chainedResultFuture.whenComplete((newResult, exception) -> {
137-
if (exception != null) {
138-
overallResult.completeExceptionally(exception);
139-
} else {
140-
chainNthResult(index + 1, newResult, parameters, overallResult);
141-
}
122+
CompletableFuture<List<ExecutionResult>> resultsFuture = Async.eachSequentially(instrumentations, (instrumentation, index, prevResults) -> {
123+
InstrumentationState state = getState(instrumentation, parameters.getInstrumentationState());
124+
ExecutionResult lastResult = prevResults.size() > 0 ? prevResults.get(prevResults.size() - 1) : executionResult;
125+
return instrumentation.instrumentExecutionResult(lastResult, parameters.withNewState(state));
142126
});
143-
127+
return resultsFuture.thenApply((results) -> results.get(results.size() - 1));
144128
}
145129

146130
private static class ChainedInstrumentationState implements InstrumentationState {

0 commit comments

Comments
 (0)