From 6fb6de86076bff42a51f22bb885edfadf1c36c62 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sun, 25 May 2025 09:52:37 +1000 Subject: [PATCH 1/5] make DataLoader work per Subscription event --- .../java/graphql/execution/Execution.java | 9 +-- .../SubscriptionExecutionStrategy.java | 23 +++++- .../SubscriptionExecutionStrategyTest.groovy | 78 +++++++++++++++++++ 3 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/main/java/graphql/execution/Execution.java b/src/main/java/graphql/execution/Execution.java index 60447c9996..effec32d66 100644 --- a/src/main/java/graphql/execution/Execution.java +++ b/src/main/java/graphql/execution/Execution.java @@ -13,7 +13,6 @@ import graphql.execution.instrumentation.Instrumentation; import graphql.execution.instrumentation.InstrumentationContext; import graphql.execution.instrumentation.InstrumentationState; -import graphql.execution.instrumentation.dataloader.FallbackDataLoaderDispatchStrategy; import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy; import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters; import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters; @@ -254,11 +253,7 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon if (executionContext.getDataLoaderRegistry() == EMPTY_DATALOADER_REGISTRY || doNotAutomaticallyDispatchDataLoader) { return DataLoaderDispatchStrategy.NO_OP; } - if (!executionContext.isSubscriptionOperation()) { - return new PerLevelDataLoaderDispatchStrategy(executionContext); - } else { - return new FallbackDataLoaderDispatchStrategy(executionContext); - } + return new PerLevelDataLoaderDispatchStrategy(executionContext); } @@ -284,7 +279,7 @@ private ExecutionResult mergeExtensionsBuilderIfPresent(ExecutionResult executio private boolean propagateErrorsOnNonNullContractFailure(List directives) { boolean jvmWideEnabled = Directives.isExperimentalDisableErrorPropagationDirectiveEnabled(); - if (! jvmWideEnabled) { + if (!jvmWideEnabled) { return true; } Directive foundDirective = NodeUtil.findNodeByName(directives, EXPERIMENTAL_DISABLE_ERROR_PROPAGATION_DIRECTIVE_DEFINITION.getName()); diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index 365e3e3737..1ae862536b 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -4,6 +4,7 @@ import graphql.ExecutionResultImpl; import graphql.GraphQLContext; import graphql.PublicApi; +import graphql.execution.incremental.DeferredCallContext; import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext; import graphql.execution.instrumentation.Instrumentation; import graphql.execution.instrumentation.InstrumentationContext; @@ -106,7 +107,7 @@ private boolean keepOrdered(GraphQLContext graphQLContext) { */ private CompletableFuture> createSourceEventStream(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { - ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext,parameters); + ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext, parameters); CompletableFuture fieldFetched = Async.toCompletableFuture(fetchField(executionContext, newParameters)); return fieldFetched.thenApply(fetchedValue -> { @@ -133,6 +134,8 @@ private CompletableFuture> createSourceEventStream(ExecutionCo */ private CompletableFuture executeSubscriptionEvent(ExecutionContext executionContext, ExecutionStrategyParameters parameters, Object eventPayload) { + System.out.println("new event"); + Instrumentation instrumentation = executionContext.getInstrumentation(); ExecutionContext newExecutionContext = executionContext.transform(builder -> builder @@ -148,7 +151,13 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon )); FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload); + FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue); + MergedSelectionSet fields = parameters.getFields(); + MergedField firstField = fields.getSubField(fields.getKeys().get(0)); + //TODO: make it nicer + executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, newParameters, null, null, null); + executionContext.getDataLoaderDispatcherStrategy().deferredOnFieldValue(firstField.getResultKey(), fieldValueInfo, null, newParameters); CompletableFuture overallResult = fieldValueInfo .getFieldValueFuture() .thenApply(val -> new ExecutionResultImpl(val, newExecutionContext.getErrors())) @@ -185,8 +194,16 @@ private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionC ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(firstField.getSingleField())); NonNullableFieldValidator nonNullableFieldValidator = new NonNullableFieldValidator(executionContext); - return parameters.transform(builder -> builder - .field(firstField).path(fieldPath).nonNullFieldValidator(nonNullableFieldValidator)); + + + ExecutionStrategyParameters newParameters = parameters.transform(builder -> builder + .field(firstField) + .path(fieldPath) + .nonNullFieldValidator(nonNullableFieldValidator) + .deferredCallContext(new DeferredCallContext(1, 1)) + ); + return newParameters; + } private ExecutionStepInfo createSubscribedFieldStepInfo(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { diff --git a/src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy b/src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy index 3d7bb28086..a92a669fd6 100644 --- a/src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy +++ b/src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy @@ -22,11 +22,15 @@ import graphql.schema.DataFetchingEnvironment import graphql.schema.PropertyDataFetcher import graphql.schema.idl.RuntimeWiring import org.awaitility.Awaitility +import org.dataloader.BatchLoader +import org.dataloader.DataLoaderFactory +import org.dataloader.DataLoaderRegistry import org.reactivestreams.Publisher import spock.lang.Specification import spock.lang.Unroll import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicInteger import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring @@ -721,4 +725,78 @@ class SubscriptionExecutionStrategyTest extends Specification { } } + + def "DataLoader works on each subscription event"() { + given: + def sdl = """ + type Query { + hello: String + } + + type Subscription { + newDogs: [Dog] + } + + type Dog { + name: String + } + """ + + AtomicInteger batchLoaderCalled = new AtomicInteger(0) + BatchLoader batchLoader = { keys -> + println "batchLoader called with keys: $keys" + batchLoaderCalled.incrementAndGet() + assert keys.size() == 2 + CompletableFuture.completedFuture(["Luna", "Skipper"]) + } + def dataLoader = DataLoaderFactory.newDataLoader("dogsNameLoader", batchLoader) + DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry() + dataLoaderRegistry.register("dogsNameLoader", dataLoader) + + DataFetcher dogsNameDF = { env -> + println "dogsNameDF called" + env.getDataLoader("dogsNameLoader").load(env.getSource()) + } + DataFetcher newDogsDF = { env -> + println "newDogsDF called" + def dogNames = ["Luna", "Skipper"] + return new ReactiveStreamsObjectPublisher(3, { int index -> + return dogNames.collect { it -> it + "$index" } + }) + } + + def query = '''subscription { + newDogs { + name + } + }''' + def schema = TestUtil.schema(sdl, [ + Subscription: [newDogs: newDogsDF], + Dog : [name: dogsNameDF] + ]) + ExecutionInput ei = ExecutionInput.newExecutionInput() + .query(query) + .dataLoaderRegistry(dataLoaderRegistry) + .build() + def graphQL = GraphQL.newGraphQL(schema) + .build() + + + when: + def executionResult = graphQL.execute(ei) + Publisher msgStream = executionResult.getData() + def capturingSubscriber = new CapturingSubscriber(3) + msgStream.subscribe(capturingSubscriber) + Awaitility.await().untilTrue(capturingSubscriber.isDone()) + + then: + def events = capturingSubscriber.events + events.size() == 3 + batchLoaderCalled.get() == 3 // batchLoader should be called once for each event + + events[0].data == ["newDogs": [[name: "Luna"], [name: "Skipper"]]] + events[1].data == ["newDogs": [[name: "Luna"], [name: "Skipper"]]] + events[2].data == ["newDogs": [[name: "Luna"], [name: "Skipper"]]] + + } } From a3f95a23f4f85cbe6922fc19e1b058ed04486bf1 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Sun, 25 May 2025 09:58:35 +1000 Subject: [PATCH 2/5] make DataLoader work per Subscription event --- .../graphql/execution/DataLoaderDispatchStrategy.java | 5 +++++ .../graphql/execution/SubscriptionExecutionStrategy.java | 4 +--- .../execution/incremental/DeferredCallContext.java | 2 ++ .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 9 +++++++++ 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index 8799797d1f..de7d692c5f 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -1,6 +1,7 @@ package graphql.execution; import graphql.Internal; +import graphql.execution.incremental.DeferredCallContext; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; @@ -59,4 +60,8 @@ default void fieldFetched(ExecutionContext executionContext, default DataFetcher modifyDataFetcher(DataFetcher dataFetcher) { return dataFetcher; } + + default void newSubscriptionExecution(FieldValueInfo fieldValueInfo, DeferredCallContext deferredCallContext) { + + } } diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index 1ae862536b..8ff9a70e1c 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -155,9 +155,7 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue); MergedSelectionSet fields = parameters.getFields(); MergedField firstField = fields.getSubField(fields.getKeys().get(0)); - //TODO: make it nicer - executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, newParameters, null, null, null); - executionContext.getDataLoaderDispatcherStrategy().deferredOnFieldValue(firstField.getResultKey(), fieldValueInfo, null, newParameters); + executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(fieldValueInfo, newParameters.getDeferredCallContext()); CompletableFuture overallResult = fieldValueInfo .getFieldValueFuture() .thenApply(val -> new ExecutionResultImpl(val, newExecutionContext.getErrors())) diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/DeferredCallContext.java index e7e2ec0658..885c66af4b 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/DeferredCallContext.java @@ -15,6 +15,8 @@ *

* Some behaviours, like error capturing, need to be scoped to a single {@link DeferredFragmentCall}, because each defer payload * contains its own distinct list of errors. + * + * This class is used also by the Subscription execution strategy to maintain a DataLoader dispatching context per event */ @Internal public class DeferredCallContext { diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index e8f198c231..2fd0a0b42a 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -290,6 +290,15 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); } + + @Override + public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, DeferredCallContext deferredCallContext) { + CallStack callStack = getCallStack(deferredCallContext); + callStack.increaseFetchCount(1); + callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); + onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); + } + @Override public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { From 37a4913e7ac23e6e84297bb303d8923909c218b0 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Mon, 9 Jun 2025 15:28:56 +1000 Subject: [PATCH 3/5] cleanup --- .../SubscriptionExecutionStrategy.java | 7 +---- .../FallbackDataLoaderDispatchStrategy.java | 31 ------------------- 2 files changed, 1 insertion(+), 37 deletions(-) delete mode 100644 src/main/java/graphql/execution/instrumentation/dataloader/FallbackDataLoaderDispatchStrategy.java diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index 8ff9a70e1c..432b3dbb09 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -134,7 +134,6 @@ private CompletableFuture> createSourceEventStream(ExecutionCo */ private CompletableFuture executeSubscriptionEvent(ExecutionContext executionContext, ExecutionStrategyParameters parameters, Object eventPayload) { - System.out.println("new event"); Instrumentation instrumentation = executionContext.getInstrumentation(); @@ -151,10 +150,7 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon )); FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload); - FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue); - MergedSelectionSet fields = parameters.getFields(); - MergedField firstField = fields.getSubField(fields.getKeys().get(0)); executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(fieldValueInfo, newParameters.getDeferredCallContext()); CompletableFuture overallResult = fieldValueInfo .getFieldValueFuture() @@ -194,13 +190,12 @@ private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionC NonNullableFieldValidator nonNullableFieldValidator = new NonNullableFieldValidator(executionContext); - ExecutionStrategyParameters newParameters = parameters.transform(builder -> builder + return parameters.transform(builder -> builder .field(firstField) .path(fieldPath) .nonNullFieldValidator(nonNullableFieldValidator) .deferredCallContext(new DeferredCallContext(1, 1)) ); - return newParameters; } diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/FallbackDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/FallbackDataLoaderDispatchStrategy.java deleted file mode 100644 index f33657cb63..0000000000 --- a/src/main/java/graphql/execution/instrumentation/dataloader/FallbackDataLoaderDispatchStrategy.java +++ /dev/null @@ -1,31 +0,0 @@ -package graphql.execution.instrumentation.dataloader; - -import graphql.Internal; -import graphql.execution.DataLoaderDispatchStrategy; -import graphql.execution.ExecutionContext; -import graphql.schema.DataFetcher; - - -/** - * Used when we cant guarantee the fields will be counted right: simply dispatch always after each DF. - */ -@Internal -public class FallbackDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy { - - private final ExecutionContext executionContext; - - public FallbackDataLoaderDispatchStrategy(ExecutionContext executionContext) { - this.executionContext = executionContext; - } - - - @Override - public DataFetcher modifyDataFetcher(DataFetcher dataFetcher) { - return (DataFetcher) environment -> { - Object obj = dataFetcher.get(environment); - executionContext.getDataLoaderRegistry().dispatchAll(); - return obj; - }; - - } -} From d9c3abd1f9f507c6f01334b698486e79919dfb3e Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 12 Jun 2025 13:02:06 +1000 Subject: [PATCH 4/5] dataloader for subscriptions --- .../SubscriptionExecutionStrategy.java | 30 +++++++++++-------- .../incremental/DeferredCallContext.java | 5 ++-- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index 432b3dbb09..901ab82ac3 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -107,7 +107,7 @@ private boolean keepOrdered(GraphQLContext graphQLContext) { */ private CompletableFuture> createSourceEventStream(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { - ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext, parameters); + ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext, parameters, false); CompletableFuture fieldFetched = Async.toCompletableFuture(fetchField(executionContext, newParameters)); return fieldFetched.thenApply(fetchedValue -> { @@ -141,7 +141,7 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon .root(eventPayload) .resetErrors() ); - ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(newExecutionContext, parameters); + ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(newExecutionContext, parameters, true); ExecutionStepInfo subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters); InstrumentationFieldParameters i13nFieldParameters = new InstrumentationFieldParameters(executionContext, () -> subscribedFieldStepInfo); @@ -149,12 +149,12 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon i13nFieldParameters, executionContext.getInstrumentationState() )); - FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload); + FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, newParameters, eventPayload); FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue); executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(fieldValueInfo, newParameters.getDeferredCallContext()); CompletableFuture overallResult = fieldValueInfo .getFieldValueFuture() - .thenApply(val -> new ExecutionResultImpl(val, newExecutionContext.getErrors())) + .thenApply(val -> new ExecutionResultImpl(val, newParameters.getDeferredCallContext().getErrors())) .thenApply(executionResult -> wrapWithRootFieldName(newParameters, executionResult)); // dispatch instrumentation so they can know about each subscription event @@ -182,7 +182,9 @@ private String getRootFieldName(ExecutionStrategyParameters parameters) { return rootField.getResultKey(); } - private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionContext executionContext, + ExecutionStrategyParameters parameters, + boolean newCallContext) { MergedSelectionSet fields = parameters.getFields(); MergedField firstField = fields.getSubField(fields.getKeys().get(0)); @@ -190,16 +192,20 @@ private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionC NonNullableFieldValidator nonNullableFieldValidator = new NonNullableFieldValidator(executionContext); - return parameters.transform(builder -> builder - .field(firstField) - .path(fieldPath) - .nonNullFieldValidator(nonNullableFieldValidator) - .deferredCallContext(new DeferredCallContext(1, 1)) - ); + return parameters.transform(builder -> { + builder + .field(firstField) + .path(fieldPath) + .nonNullFieldValidator(nonNullableFieldValidator); + if (newCallContext) { + builder.deferredCallContext(new DeferredCallContext(1, 1)); + } + }); } - private ExecutionStepInfo createSubscribedFieldStepInfo(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { + private ExecutionStepInfo createSubscribedFieldStepInfo(ExecutionContext + executionContext, ExecutionStrategyParameters parameters) { Field field = parameters.getField().getSingleField(); GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType(); GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, field); diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/DeferredCallContext.java index 885c66af4b..f511c06722 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/DeferredCallContext.java @@ -4,8 +4,9 @@ import graphql.Internal; import graphql.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * Contains data relevant to the execution of a {@link DeferredFragmentCall}. @@ -24,7 +25,7 @@ public class DeferredCallContext { private final int startLevel; private final int fields; - private final List errors = new CopyOnWriteArrayList<>(); + private final List errors = Collections.synchronizedList(new ArrayList<>()); public DeferredCallContext(int startLevel, int fields) { this.startLevel = startLevel; From cbd4505a56dc9fea9d11eae49acfd65399a95b49 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 12 Jun 2025 13:07:09 +1000 Subject: [PATCH 5/5] rename DeferredCallContext.java --- .../execution/DataLoaderDispatchStrategy.java | 4 +-- .../ExecutionStrategyParameters.java | 34 +++++++++---------- .../SubscriptionExecutionStrategy.java | 4 +-- ...ntext.java => AlternativeCallContext.java} | 12 +++---- .../incremental/DeferredExecutionSupport.java | 10 +++--- .../incremental/DeferredFragmentCall.java | 8 ++--- .../PerLevelDataLoaderDispatchStrategy.java | 22 ++++++------ .../schema/DataFetchingEnvironmentImpl.java | 22 ++++++------ .../graphql/schema/DataLoaderWithContext.java | 6 ++-- .../incremental/DeferredCallTest.groovy | 6 ++-- .../IncrementalCallStateDeferTest.groovy | 6 ++-- 11 files changed, 66 insertions(+), 68 deletions(-) rename src/main/java/graphql/execution/incremental/{DeferredCallContext.java => AlternativeCallContext.java} (81%) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index de7d692c5f..13d918bdd9 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -1,7 +1,7 @@ package graphql.execution; import graphql.Internal; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; @@ -61,7 +61,7 @@ default DataFetcher modifyDataFetcher(DataFetcher dataFetcher) { return dataFetcher; } - default void newSubscriptionExecution(FieldValueInfo fieldValueInfo, DeferredCallContext deferredCallContext) { + default void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { } } diff --git a/src/main/java/graphql/execution/ExecutionStrategyParameters.java b/src/main/java/graphql/execution/ExecutionStrategyParameters.java index 87dd7057ae..ecb52d973b 100644 --- a/src/main/java/graphql/execution/ExecutionStrategyParameters.java +++ b/src/main/java/graphql/execution/ExecutionStrategyParameters.java @@ -2,7 +2,7 @@ import graphql.Internal; import graphql.PublicApi; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import org.jspecify.annotations.Nullable; import java.util.function.Consumer; @@ -22,7 +22,7 @@ public class ExecutionStrategyParameters { private final ResultPath path; private final MergedField currentField; private final ExecutionStrategyParameters parent; - private final DeferredCallContext deferredCallContext; + private final AlternativeCallContext alternativeCallContext; private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, Object source, @@ -32,7 +32,7 @@ private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, ResultPath path, MergedField currentField, ExecutionStrategyParameters parent, - DeferredCallContext deferredCallContext) { + AlternativeCallContext alternativeCallContext) { this.executionStepInfo = assertNotNull(executionStepInfo, () -> "executionStepInfo is null"); this.localContext = localContext; @@ -42,7 +42,7 @@ private ExecutionStrategyParameters(ExecutionStepInfo executionStepInfo, this.path = path; this.currentField = currentField; this.parent = parent; - this.deferredCallContext = deferredCallContext; + this.alternativeCallContext = alternativeCallContext; } public ExecutionStepInfo getExecutionStepInfo() { @@ -95,8 +95,8 @@ public ExecutionStrategyParameters getParent() { */ @Nullable @Internal - public DeferredCallContext getDeferredCallContext() { - return deferredCallContext; + public AlternativeCallContext getDeferredCallContext() { + return alternativeCallContext; } /** @@ -105,7 +105,7 @@ public DeferredCallContext getDeferredCallContext() { * @return true if we're in the scope of a deferred call */ public boolean isInDeferredContext() { - return deferredCallContext != null; + return alternativeCallContext != null; } /** @@ -128,7 +128,7 @@ ExecutionStrategyParameters transform(MergedField currentField, path, currentField, parent, - deferredCallContext); + alternativeCallContext); } @Internal @@ -143,7 +143,7 @@ ExecutionStrategyParameters transform(ExecutionStepInfo executionStepInfo, path, currentField, parent, - deferredCallContext); + alternativeCallContext); } @Internal @@ -159,7 +159,7 @@ ExecutionStrategyParameters transform(ExecutionStepInfo executionStepInfo, path, currentField, parent, - deferredCallContext); + alternativeCallContext); } @Internal @@ -174,7 +174,7 @@ ExecutionStrategyParameters transform(ExecutionStepInfo executionStepInfo, path, currentField, parent, - deferredCallContext); + alternativeCallContext); } @Internal @@ -189,7 +189,7 @@ ExecutionStrategyParameters transform(MergedField currentField, path, currentField, parent, - deferredCallContext); + alternativeCallContext); } public ExecutionStrategyParameters transform(Consumer builderConsumer) { @@ -221,7 +221,7 @@ public static class Builder { ResultPath path = ResultPath.rootPath(); MergedField currentField; ExecutionStrategyParameters parent; - DeferredCallContext deferredCallContext; + AlternativeCallContext alternativeCallContext; /** * @see ExecutionStrategyParameters#newParameters() @@ -239,7 +239,7 @@ private Builder(ExecutionStrategyParameters oldParameters) { this.fields = oldParameters.fields; this.nonNullableFieldValidator = oldParameters.nonNullableFieldValidator; this.currentField = oldParameters.currentField; - this.deferredCallContext = oldParameters.deferredCallContext; + this.alternativeCallContext = oldParameters.alternativeCallContext; this.path = oldParameters.path; this.parent = oldParameters.parent; } @@ -289,13 +289,13 @@ public Builder parent(ExecutionStrategyParameters parent) { return this; } - public Builder deferredCallContext(DeferredCallContext deferredCallContext) { - this.deferredCallContext = deferredCallContext; + public Builder deferredCallContext(AlternativeCallContext alternativeCallContext) { + this.alternativeCallContext = alternativeCallContext; return this; } public ExecutionStrategyParameters build() { - return new ExecutionStrategyParameters(executionStepInfo, source, localContext, fields, nonNullableFieldValidator, path, currentField, parent, deferredCallContext); + return new ExecutionStrategyParameters(executionStepInfo, source, localContext, fields, nonNullableFieldValidator, path, currentField, parent, alternativeCallContext); } } } diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index 901ab82ac3..f0d9e9593a 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -4,7 +4,7 @@ import graphql.ExecutionResultImpl; import graphql.GraphQLContext; import graphql.PublicApi; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext; import graphql.execution.instrumentation.Instrumentation; import graphql.execution.instrumentation.InstrumentationContext; @@ -198,7 +198,7 @@ private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionC .path(fieldPath) .nonNullFieldValidator(nonNullableFieldValidator); if (newCallContext) { - builder.deferredCallContext(new DeferredCallContext(1, 1)); + builder.deferredCallContext(new AlternativeCallContext(1, 1)); } }); diff --git a/src/main/java/graphql/execution/incremental/DeferredCallContext.java b/src/main/java/graphql/execution/incremental/AlternativeCallContext.java similarity index 81% rename from src/main/java/graphql/execution/incremental/DeferredCallContext.java rename to src/main/java/graphql/execution/incremental/AlternativeCallContext.java index f511c06722..47e956798e 100644 --- a/src/main/java/graphql/execution/incremental/DeferredCallContext.java +++ b/src/main/java/graphql/execution/incremental/AlternativeCallContext.java @@ -9,31 +9,29 @@ import java.util.List; /** - * Contains data relevant to the execution of a {@link DeferredFragmentCall}. + * Contains data relevant to the execution of a {@link DeferredFragmentCall} and Subscription events. *

* The responsibilities of this class are similar to {@link graphql.execution.ExecutionContext}, but restricted to the * execution of a deferred call (instead of the whole GraphQL execution like {@link graphql.execution.ExecutionContext}). *

- * Some behaviours, like error capturing, need to be scoped to a single {@link DeferredFragmentCall}, because each defer payload + * Some behaviours, like error capturing, need to be scoped to a single {@link DeferredFragmentCall} for deferred, because each defer payload * contains its own distinct list of errors. - * - * This class is used also by the Subscription execution strategy to maintain a DataLoader dispatching context per event */ @Internal -public class DeferredCallContext { +public class AlternativeCallContext { private final int startLevel; private final int fields; private final List errors = Collections.synchronizedList(new ArrayList<>()); - public DeferredCallContext(int startLevel, int fields) { + public AlternativeCallContext(int startLevel, int fields) { this.startLevel = startLevel; this.fields = fields; } @VisibleForTesting - public DeferredCallContext() { + public AlternativeCallContext() { this.startLevel = 0; this.fields = 0; } diff --git a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java index ade6242d24..3055e14c48 100644 --- a/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java +++ b/src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java @@ -117,26 +117,26 @@ public Set> createCalls() { private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution) { int level = parameters.getPath().getLevel() + 1; - DeferredCallContext deferredCallContext = new DeferredCallContext(level, deferredFields.size()); + AlternativeCallContext alternativeCallContext = new AlternativeCallContext(level, deferredFields.size()); List mergedFields = deferredExecutionToFields.get(deferredExecution); List>> calls = FpKit.arrayListSizedTo(mergedFields); for (MergedField currentField : mergedFields) { - calls.add(this.createResultSupplier(currentField, deferredCallContext)); + calls.add(this.createResultSupplier(currentField, alternativeCallContext)); } return new DeferredFragmentCall( deferredExecution.getLabel(), this.parameters.getPath(), calls, - deferredCallContext + alternativeCallContext ); } private Supplier> createResultSupplier( MergedField currentField, - DeferredCallContext deferredCallContext + AlternativeCallContext alternativeCallContext ) { Map fields = new LinkedHashMap<>(); fields.put(currentField.getResultKey(), currentField); @@ -145,7 +145,7 @@ private Supplier>> calls; - private final DeferredCallContext deferredCallContext; + private final AlternativeCallContext alternativeCallContext; public DeferredFragmentCall( String label, ResultPath path, List>> calls, - DeferredCallContext deferredCallContext + AlternativeCallContext alternativeCallContext ) { this.label = label; this.path = path; this.calls = calls; - this.deferredCallContext = deferredCallContext; + this.alternativeCallContext = alternativeCallContext; } @Override @@ -100,7 +100,7 @@ private DeferPayload handleNonNullableFieldError(DeferPayload result, Throwable } private DeferPayload transformToDeferredPayload(List fieldWithExecutionResults) { - List errorsEncountered = deferredCallContext.getErrors(); + List errorsEncountered = alternativeCallContext.getErrors(); Map dataMap = new HashMap<>(); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 2198dff8a6..61b0914d31 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -7,7 +7,7 @@ import graphql.execution.ExecutionContext; import graphql.execution.ExecutionStrategyParameters; import graphql.execution.FieldValueInfo; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import graphql.util.InterThreadMemoizedSupplier; @@ -48,7 +48,7 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L; - private final Map deferredCallStackMap = new ConcurrentHashMap<>(); + private final Map deferredCallStackMap = new ConcurrentHashMap<>(); private static class CallStack { @@ -253,14 +253,14 @@ private CallStack getCallStack(ExecutionStrategyParameters parameters) { return getCallStack(parameters.getDeferredCallContext()); } - private CallStack getCallStack(@Nullable DeferredCallContext deferredCallContext) { - if (deferredCallContext == null) { + private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallContext) { + if (alternativeCallContext == null) { return this.initialCallStack; } else { - return deferredCallStackMap.computeIfAbsent(deferredCallContext, k -> { + return deferredCallStackMap.computeIfAbsent(alternativeCallContext, k -> { CallStack callStack = new CallStack(); - int startLevel = deferredCallContext.getStartLevel(); - int fields = deferredCallContext.getFields(); + int startLevel = alternativeCallContext.getStartLevel(); + int fields = alternativeCallContext.getFields(); callStack.lock.runLocked(() -> { // we make sure that startLevel-1 is considered done callStack.expectedExecuteObjectCallsPerLevel.set(0, 0); // set to 1 in the constructor of CallStack @@ -292,8 +292,8 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa @Override - public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, DeferredCallContext deferredCallContext) { - CallStack callStack = getCallStack(deferredCallContext); + public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { + CallStack callStack = getCallStack(alternativeCallContext); callStack.increaseFetchCount(1); callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); @@ -541,12 +541,12 @@ public void dispatchDLCFImpl(Set resultPathsToDispatch, Integer level, C public void newDataLoaderLoadCall(String resultPath, int level, DataLoader dataLoader, String - dataLoaderName, Object key, @Nullable DeferredCallContext deferredCallContext) { + dataLoaderName, Object key, @Nullable AlternativeCallContext alternativeCallContext) { if (!enableDataLoaderChaining) { return; } ResultPathWithDataLoader resultPathWithDataLoader = new ResultPathWithDataLoader(resultPath, level, dataLoader, dataLoaderName, key); - CallStack callStack = getCallStack(deferredCallContext); + CallStack callStack = getCallStack(alternativeCallContext); boolean levelFinished = callStack.lock.callLocked(() -> { boolean finished = callStack.dispatchingFinishedPerLevel.contains(level); callStack.allResultPathWithDataLoader.add(resultPathWithDataLoader); diff --git a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java index 39c1c4b1c5..23a31f4c44 100644 --- a/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java +++ b/src/main/java/graphql/schema/DataFetchingEnvironmentImpl.java @@ -13,7 +13,7 @@ import graphql.execution.ExecutionStepInfo; import graphql.execution.MergedField; import graphql.execution.directives.QueryDirectives; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys; import graphql.language.Document; import graphql.language.Field; @@ -81,7 +81,7 @@ private DataFetchingEnvironmentImpl(Builder builder) { this.queryDirectives = builder.queryDirectives; // internal state - this.dfeInternalState = new DFEInternalState(builder.dataLoaderDispatchStrategy, builder.deferredCallContext); + this.dfeInternalState = new DFEInternalState(builder.dataLoaderDispatchStrategy, builder.alternativeCallContext); } /** @@ -287,7 +287,7 @@ public static class Builder { private ImmutableMapWithNullValues variables; private QueryDirectives queryDirectives; private DataLoaderDispatchStrategy dataLoaderDispatchStrategy; - private DeferredCallContext deferredCallContext; + private AlternativeCallContext alternativeCallContext; public Builder(DataFetchingEnvironmentImpl env) { this.source = env.source; @@ -312,7 +312,7 @@ public Builder(DataFetchingEnvironmentImpl env) { this.variables = env.variables; this.queryDirectives = env.queryDirectives; this.dataLoaderDispatchStrategy = env.dfeInternalState.dataLoaderDispatchStrategy; - this.deferredCallContext = env.dfeInternalState.deferredCallContext; + this.alternativeCallContext = env.dfeInternalState.alternativeCallContext; } public Builder() { @@ -432,8 +432,8 @@ public Builder queryDirectives(QueryDirectives queryDirectives) { return this; } - public Builder deferredCallContext(DeferredCallContext deferredCallContext) { - this.deferredCallContext = deferredCallContext; + public Builder deferredCallContext(AlternativeCallContext alternativeCallContext) { + this.alternativeCallContext = alternativeCallContext; return this; } @@ -450,19 +450,19 @@ public Builder dataLoaderDispatchStrategy(DataLoaderDispatchStrategy dataLoaderD @Internal public static class DFEInternalState { final DataLoaderDispatchStrategy dataLoaderDispatchStrategy; - final DeferredCallContext deferredCallContext; + final AlternativeCallContext alternativeCallContext; - public DFEInternalState(DataLoaderDispatchStrategy dataLoaderDispatchStrategy, DeferredCallContext deferredCallContext) { + public DFEInternalState(DataLoaderDispatchStrategy dataLoaderDispatchStrategy, AlternativeCallContext alternativeCallContext) { this.dataLoaderDispatchStrategy = dataLoaderDispatchStrategy; - this.deferredCallContext = deferredCallContext; + this.alternativeCallContext = alternativeCallContext; } public DataLoaderDispatchStrategy getDataLoaderDispatchStrategy() { return dataLoaderDispatchStrategy; } - public DeferredCallContext getDeferredCallContext() { - return deferredCallContext; + public AlternativeCallContext getDeferredCallContext() { + return alternativeCallContext; } } } diff --git a/src/main/java/graphql/schema/DataLoaderWithContext.java b/src/main/java/graphql/schema/DataLoaderWithContext.java index 972a53d27b..a9086e334a 100644 --- a/src/main/java/graphql/schema/DataLoaderWithContext.java +++ b/src/main/java/graphql/schema/DataLoaderWithContext.java @@ -1,7 +1,7 @@ package graphql.schema; import graphql.Internal; -import graphql.execution.incremental.DeferredCallContext; +import graphql.execution.incremental.AlternativeCallContext; import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy; import org.dataloader.DataLoader; import org.dataloader.DelegatingDataLoader; @@ -31,10 +31,10 @@ public CompletableFuture load(@NonNull K key, @Nullable Object keyContext) { DataFetchingEnvironmentImpl dfeImpl = (DataFetchingEnvironmentImpl) dfe; DataFetchingEnvironmentImpl.DFEInternalState dfeInternalState = (DataFetchingEnvironmentImpl.DFEInternalState) dfeImpl.toInternal(); if (dfeInternalState.getDataLoaderDispatchStrategy() instanceof PerLevelDataLoaderDispatchStrategy) { - DeferredCallContext deferredCallContext = dfeInternalState.getDeferredCallContext(); + AlternativeCallContext alternativeCallContext = dfeInternalState.getDeferredCallContext(); int level = dfe.getExecutionStepInfo().getPath().getLevel(); String path = dfe.getExecutionStepInfo().getPath().toString(); - ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderLoadCall(path, level, delegate, dataLoaderName, key, deferredCallContext); + ((PerLevelDataLoaderDispatchStrategy) dfeInternalState.dataLoaderDispatchStrategy).newDataLoaderLoadCall(path, level, delegate, dataLoaderName, key, alternativeCallContext); } return result; } diff --git a/src/test/groovy/graphql/execution/incremental/DeferredCallTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferredCallTest.groovy index 77da81298a..d8a011b8c4 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferredCallTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferredCallTest.groovy @@ -17,7 +17,7 @@ class DeferredCallTest extends Specification { def "test call capture gives a CF"() { given: DeferredFragmentCall call = new DeferredFragmentCall("my-label", parse("/path"), - [createResolvedFieldCall("field", "some data")], new DeferredCallContext()) + [createResolvedFieldCall("field", "some data")], new AlternativeCallContext()) when: def future = call.invoke() @@ -37,7 +37,7 @@ class DeferredCallTest extends Specification { createResolvedFieldCall("field2", "some data 2"), createResolvedFieldCall("field3", "some data 3") ], - new DeferredCallContext() + new AlternativeCallContext() ) when: @@ -52,7 +52,7 @@ class DeferredCallTest extends Specification { def "can handle non-nullable field error"() { given: - def deferredCallContext = new DeferredCallContext() + def deferredCallContext = new AlternativeCallContext() def mockedException = Mock(NonNullableFieldWasNullException) { getMessage() >> "Field value can't be null" getPath() >> ResultPath.parse("/path") diff --git a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy index d99b49fae4..db9085274f 100644 --- a/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/IncrementalCallStateDeferTest.groovy @@ -167,7 +167,7 @@ class IncrementalCallStateDeferTest extends Specification { } } - def deferredCall = new DeferredFragmentCall(null, ResultPath.parse("/field/path"), [call1, call2], new DeferredCallContext()) + def deferredCall = new DeferredFragmentCall(null, ResultPath.parse("/field/path"), [call1, call2], new AlternativeCallContext()) when: def incrementalCallState = new IncrementalCallState() @@ -291,7 +291,7 @@ class IncrementalCallStateDeferTest extends Specification { } } - return new DeferredFragmentCall(null, ResultPath.parse(path), [callSupplier], new DeferredCallContext()) + return new DeferredFragmentCall(null, ResultPath.parse(path), [callSupplier], new AlternativeCallContext()) } private static DeferredFragmentCall offThreadCallWithinCall(IncrementalCallState incrementalCallState, String dataParent, String dataChild, int sleepTime, String path) { @@ -305,7 +305,7 @@ class IncrementalCallStateDeferTest extends Specification { }) } } - return new DeferredFragmentCall(null, ResultPath.parse("/field/path"), [callSupplier], new DeferredCallContext()) + return new DeferredFragmentCall(null, ResultPath.parse("/field/path"), [callSupplier], new AlternativeCallContext()) } private static void assertResultsSizeAndHasNextRule(int expectedSize, List results) {