Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/main/java/graphql/ExecutionInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CompletableFuture;

import java.util.function.Consumer;

import static graphql.Assert.assertNotNull;
Expand All @@ -34,7 +35,7 @@ public class ExecutionInput {
private final DataLoaderRegistry dataLoaderRegistry;
private final ExecutionId executionId;
private final Locale locale;
private final AtomicBoolean cancelled;
private final CompletableFuture<Void> cancellationFuture;
private final boolean profileExecution;

/**
Expand All @@ -60,7 +61,7 @@ private ExecutionInput(Builder builder) {
this.locale = builder.locale != null ? builder.locale : Locale.getDefault(); // always have a locale in place
this.localContext = builder.localContext;
this.extensions = builder.extensions;
this.cancelled = builder.cancelled;
this.cancellationFuture = builder.cancellationFuture;
this.profileExecution = builder.profileExecution;
}

Expand Down Expand Up @@ -211,15 +212,26 @@ public Map<String, Object> getExtensions() {
* @return true if the execution should be cancelled
*/
public boolean isCancelled() {
return cancelled.get();
return cancellationFuture.isDone();
}

/**
* This can be called to cancel the graphql execution. Remember this is a cooperative cancellation
* and the graphql engine needs to be running on a thread to allow is to respect this flag.
*/
public void cancel() {
cancelled.set(true);
cancellationFuture.complete(null);
}

/**
* Returns a {@link CompletableFuture} that completes when {@link #cancel()} is called.
* This allows async code to race against cancellation without polling.
*
* @return a future that completes (with null) when this execution is cancelled
*/
@Internal
public CompletableFuture<Void> getCancellationFuture() {
return cancellationFuture;
}


Expand All @@ -241,7 +253,7 @@ public ExecutionInput transform(Consumer<Builder> builderConsumer) {
.operationName(this.operationName)
.context(this.context)
.internalTransferContext(this.graphQLContext)
.internalTransferCancelBoolean(this.cancelled)
.internalTransferCancellationFuture(this.cancellationFuture)
.localContext(this.localContext)
.root(this.root)
.dataLoaderRegistry(this.dataLoaderRegistry)
Expand Down Expand Up @@ -306,7 +318,7 @@ public static class Builder {
private DataLoaderRegistry dataLoaderRegistry = EMPTY_DATALOADER_REGISTRY;
private Locale locale = Locale.getDefault();
private ExecutionId executionId;
private AtomicBoolean cancelled = new AtomicBoolean(false);
private CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
private boolean profileExecution;

/**
Expand Down Expand Up @@ -412,9 +424,8 @@ private Builder internalTransferContext(GraphQLContext graphQLContext) {
return this;
}

// hidden on purpose
private Builder internalTransferCancelBoolean(AtomicBoolean cancelled) {
this.cancelled = cancelled;
private Builder internalTransferCancellationFuture(CompletableFuture<Void> cancellationFuture) {
this.cancellationFuture = cancellationFuture;
return this;
}

Expand Down
57 changes: 57 additions & 0 deletions src/main/java/graphql/GraphQLUnusualConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ public ResponseMapFactoryConfig responseMapFactory() {
return new ResponseMapFactoryConfig(this);
}

/**
* @return an element that allows you to control cancellation behavior
*/
@ExperimentalApi
public CancellationConfig cancellation() {
return new CancellationConfig(this);
}

private void put(String named, Object value) {
if (graphQLContext != null) {
graphQLContext.put(named, value);
Expand Down Expand Up @@ -410,4 +418,53 @@ public ResponseMapFactoryConfig setFactory(ResponseMapFactory factory) {
return this;
}
}

public static class CancellationConfig extends BaseContextConfig {

/**
* The context key used to enable capturing partial results when an execution is cancelled.
*/
@ExperimentalApi
public static final String CAPTURE_PARTIAL_RESULTS_ON_CANCEL = "graphql.capturePartialResultsOnCancel";

/**
* The context key used to store the cancellation {@link java.util.concurrent.CompletableFuture}
* that completes when {@link ExecutionInput#cancel()} is called.
* This is only set when {@link #CAPTURE_PARTIAL_RESULTS_ON_CANCEL} is enabled.
*/
@Internal
public static final String CANCELLATION_FUTURE_KEY = CAPTURE_PARTIAL_RESULTS_ON_CANCEL + ".cancelFuture";

private CancellationConfig(GraphQLContextConfiguration contextConfig) {
super(contextConfig);
}

/**
* Returns true if partial results should be captured when the execution is cancelled via
* {@link ExecutionInput#cancel()}.
*
* @return true if partial results capture on cancel is enabled
*/
@ExperimentalApi
public boolean isCapturePartialResultsOnCancelEnabled() {
return contextConfig.getBoolean(CAPTURE_PARTIAL_RESULTS_ON_CANCEL);
}

/**
* When enabled, if {@link ExecutionInput#cancel()} is called during execution, the engine will
* return the partial results of any fields that have already completed, along with an error
* indicating the execution was cancelled.
* <p>
* By default this is false and cancellation returns only the cancellation error with null data.
*
* @param enable true to enable capturing partial results on cancel
*
* @return this config object for chaining
*/
@ExperimentalApi
public CancellationConfig capturePartialResultsOnCancel(boolean enable) {
contextConfig.put(CAPTURE_PARTIAL_RESULTS_ON_CANCEL, enable);
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import graphql.ExecutionResult;
import graphql.ExecutionResultImpl;
import graphql.GraphQLContext;
import graphql.PublicSpi;

import java.util.concurrent.CompletionException;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,8 +32,47 @@ protected BiConsumer<List<Object>, Throwable> handleResults(ExecutionContext exe
return;
}

Map<String, Object> resolvedValuesByField = executionContext.getResponseMapFactory().createInsertionOrdered(fieldNames, results);
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
completeResultFuture(overallResult, executionContext, fieldNames, results);
};
}

protected BiConsumer<List<Object>, Throwable> handleResultsWithPartialData(ExecutionContext executionContext, List<String> fieldNames, CompletableFuture<ExecutionResult> overallResult) {
return (List<Object> results, Throwable exception) -> {
// when partial results on cancel is enabled the results list will already have partial data
// (already-completed fields) so we can build a partial response even if exception is set
if (exception != null) {
Throwable cause = exception instanceof CompletionException ? exception.getCause() : exception;
if (cause instanceof AbortExecutionException && results != null
&& capturePartialResults(executionContext)) {
executionContext.addError((AbortExecutionException) cause);
completeResultFuture(overallResult, executionContext, fieldNames, results);
return;
}
handleNonNullException(executionContext, overallResult, exception);
return;
}

// check if cancel fired while results were being gathered (no exception, but cancelled)
Throwable cancelException = executionContext.possibleCancellation(null);
if (cancelException != null) {
Throwable cancelCause = cancelException instanceof CompletionException ? cancelException.getCause() : cancelException;
if (cancelCause instanceof AbortExecutionException && results != null
&& capturePartialResults(executionContext)) {
// we have partial data from already-completed CFs — use it
executionContext.addError((AbortExecutionException) cancelCause);
completeResultFuture(overallResult, executionContext, fieldNames, results);
} else {
handleNonNullException(executionContext, overallResult, cancelException);
}
return;
}

completeResultFuture(overallResult, executionContext, fieldNames, results);
};
}

protected void completeResultFuture(CompletableFuture<ExecutionResult> overallResult, ExecutionContext executionContext, List<String> fieldNames, List<Object> results) {
Map<String, Object> resolvedValuesByField = executionContext.getResponseMapFactory().createInsertionOrdered(fieldNames, results);
overallResult.complete(new ExecutionResultImpl(resolvedValuesByField, executionContext.getErrors()));
}
}
120 changes: 120 additions & 0 deletions src/main/java/graphql/execution/Async.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package graphql.execution;

import graphql.Assert;
import graphql.GraphQLContext;
import graphql.Internal;
import graphql.GraphQLUnusualConfiguration;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

Expand All @@ -21,6 +23,8 @@
import java.util.stream.Collectors;

import static graphql.Assert.assertTrue;
import static graphql.GraphQLUnusualConfiguration.CancellationConfig.CANCELLATION_FUTURE_KEY;
import static graphql.GraphQLUnusualConfiguration.CancellationConfig.CAPTURE_PARTIAL_RESULTS_ON_CANCEL;
import static java.util.stream.Collectors.toList;

@Internal
Expand Down Expand Up @@ -57,6 +61,17 @@ public interface CombinedBuilder<T> {
*/
CompletableFuture<List<T>> await();

/**
* Like {@link #await()} but when {@link GraphQLUnusualConfiguration.CancellationConfig#CAPTURE_PARTIAL_RESULTS_ON_CANCEL}
* is enabled in the context and an {@link AbortExecutionException} causes a CF to fail, the already-completed
* CFs will have their values harvested and returned as partial results rather than completing exceptionally.
*
* @param graphQLContext the context to check for the partial results flag
*
* @return a CompletableFuture to a List of values (possibly partial on cancellation)
*/
CompletableFuture<List<T>> await(GraphQLContext graphQLContext);

/**
* This will return a {@code CompletableFuture<List<T>>} if ANY of the input values are async
* otherwise it just return a materialised {@code List<T>}
Expand Down Expand Up @@ -104,6 +119,11 @@ public CompletableFuture<List<T>> await() {
return typedEmpty();
}

@Override
public CompletableFuture<List<T>> await(GraphQLContext graphQLContext) {
return await();
}

@Override
public Object awaitPolymorphic() {
Assert.assertTrue(ix == 0, () -> "expected size was " + 0 + " got " + ix);
Expand Down Expand Up @@ -149,6 +169,11 @@ public CompletableFuture<List<T>> await() {
return CompletableFuture.completedFuture(Collections.singletonList((T) value));
}

@Override
public CompletableFuture<List<T>> await(GraphQLContext graphQLContext) {
return await();
}

@Override
public Object awaitPolymorphic() {
commonSizeAssert();
Expand Down Expand Up @@ -232,6 +257,101 @@ public CompletableFuture<List<T>> await() {
return overallResult;
}

@SuppressWarnings("unchecked")
@Override
public CompletableFuture<List<T>> await(GraphQLContext graphQLContext) {
commonSizeAssert();
if (cfCount == 0) {
return CompletableFuture.completedFuture(materialisedList(array));
}
if (!graphQLContext.getBoolean(CAPTURE_PARTIAL_RESULTS_ON_CANCEL)) {
return await();
}

CompletableFuture<Void> cancellationFuture = graphQLContext.get(CANCELLATION_FUTURE_KEY);
if (cancellationFuture == null) {
return await();
}

CompletableFuture<List<T>> overallResult = new CompletableFuture<>();
CompletableFuture<T>[] cfsArr = copyOnlyCFsToArray();
CompletableFuture<Void> allOf = CompletableFuture.allOf(cfsArr);

// race: either all CFs complete normally, or cancellation fires first
CompletableFuture.anyOf(allOf, cancellationFuture)
.whenComplete((ignored, exception) -> {
if (overallResult.isDone()) {
return;
}
if (exception != null) {
// a CF failed — not our abort path, propagate
overallResult.completeExceptionally(exception);
return;
}
if (!allOf.isDone()) {
// cancellation fired before allOf: harvest already-completed CFs
List<T> partialResults = harvestPartialResults(array);
overallResult.complete(partialResults);
return;
}
// allOf finished normally: collect all results
overallResult.complete(collectAllResults(array, cfsArr));
});

// also handle when allOf completes (either normally or exceptionally) after the race
allOf.whenComplete((ignored, exception) -> {
if (overallResult.isDone()) {
return;
}
if (exception != null) {
// a CF in allOf failed — propagate
overallResult.completeExceptionally(exception);
return;
}
overallResult.complete(collectAllResults(array, cfsArr));
});

return overallResult;
}

@SuppressWarnings("unchecked")
private List<T> harvestPartialResults(Object[] array) {
List<T> partialResults = new ArrayList<>(array.length);
for (Object object : array) {
if (object instanceof CompletableFuture) {
CompletableFuture<T> cf = (CompletableFuture<T>) object;
if (cf.isDone() && !cf.isCompletedExceptionally()) {
partialResults.add(cf.join());
} else {
partialResults.add(null);
}
} else {
partialResults.add((T) object);
}
}
return partialResults;
}

@SuppressWarnings("unchecked")
private List<T> collectAllResults(Object[] array, CompletableFuture<T>[] cfsArr) {
List<T> results = new ArrayList<>(array.length);
if (cfsArr.length == array.length) {
for (CompletableFuture<T> cf : cfsArr) {
results.add(cf.join());
}
} else {
for (Object object : array) {
if (object instanceof CompletableFuture) {
CompletableFuture<T> cf = (CompletableFuture<T>) object;
results.add(cf.join());
} else {
results.add((T) object);
}
}
}
return results;
}

@SuppressWarnings("unchecked")
@NonNull
private CompletableFuture<T>[] copyOnlyCFsToArray() {
Expand Down
Loading
Loading