Skip to content

Commit d9c3abd

Browse files
committed
dataloader for subscriptions
1 parent 37a4913 commit d9c3abd

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

src/main/java/graphql/execution/SubscriptionExecutionStrategy.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private boolean keepOrdered(GraphQLContext graphQLContext) {
107107
*/
108108

109109
private CompletableFuture<Publisher<Object>> createSourceEventStream(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
110-
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext, parameters);
110+
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(executionContext, parameters, false);
111111

112112
CompletableFuture<FetchedValue> fieldFetched = Async.toCompletableFuture(fetchField(executionContext, newParameters));
113113
return fieldFetched.thenApply(fetchedValue -> {
@@ -141,20 +141,20 @@ private CompletableFuture<ExecutionResult> executeSubscriptionEvent(ExecutionCon
141141
.root(eventPayload)
142142
.resetErrors()
143143
);
144-
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(newExecutionContext, parameters);
144+
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(newExecutionContext, parameters, true);
145145
ExecutionStepInfo subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters);
146146

147147
InstrumentationFieldParameters i13nFieldParameters = new InstrumentationFieldParameters(executionContext, () -> subscribedFieldStepInfo);
148148
InstrumentationContext<ExecutionResult> subscribedFieldCtx = nonNullCtx(instrumentation.beginSubscribedFieldEvent(
149149
i13nFieldParameters, executionContext.getInstrumentationState()
150150
));
151151

152-
FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload);
152+
FetchedValue fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, newParameters, eventPayload);
153153
FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue);
154154
executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(fieldValueInfo, newParameters.getDeferredCallContext());
155155
CompletableFuture<ExecutionResult> overallResult = fieldValueInfo
156156
.getFieldValueFuture()
157-
.thenApply(val -> new ExecutionResultImpl(val, newExecutionContext.getErrors()))
157+
.thenApply(val -> new ExecutionResultImpl(val, newParameters.getDeferredCallContext().getErrors()))
158158
.thenApply(executionResult -> wrapWithRootFieldName(newParameters, executionResult));
159159

160160
// dispatch instrumentation so they can know about each subscription event
@@ -182,24 +182,30 @@ private String getRootFieldName(ExecutionStrategyParameters parameters) {
182182
return rootField.getResultKey();
183183
}
184184

185-
private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
185+
private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionContext executionContext,
186+
ExecutionStrategyParameters parameters,
187+
boolean newCallContext) {
186188
MergedSelectionSet fields = parameters.getFields();
187189
MergedField firstField = fields.getSubField(fields.getKeys().get(0));
188190

189191
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(firstField.getSingleField()));
190192
NonNullableFieldValidator nonNullableFieldValidator = new NonNullableFieldValidator(executionContext);
191193

192194

193-
return parameters.transform(builder -> builder
194-
.field(firstField)
195-
.path(fieldPath)
196-
.nonNullFieldValidator(nonNullableFieldValidator)
197-
.deferredCallContext(new DeferredCallContext(1, 1))
198-
);
195+
return parameters.transform(builder -> {
196+
builder
197+
.field(firstField)
198+
.path(fieldPath)
199+
.nonNullFieldValidator(nonNullableFieldValidator);
200+
if (newCallContext) {
201+
builder.deferredCallContext(new DeferredCallContext(1, 1));
202+
}
203+
});
199204

200205
}
201206

202-
private ExecutionStepInfo createSubscribedFieldStepInfo(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
207+
private ExecutionStepInfo createSubscribedFieldStepInfo(ExecutionContext
208+
executionContext, ExecutionStrategyParameters parameters) {
203209
Field field = parameters.getField().getSingleField();
204210
GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType();
205211
GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, field);

src/main/java/graphql/execution/incremental/DeferredCallContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import graphql.Internal;
55
import graphql.VisibleForTesting;
66

7+
import java.util.ArrayList;
8+
import java.util.Collections;
79
import java.util.List;
8-
import java.util.concurrent.CopyOnWriteArrayList;
910

1011
/**
1112
* Contains data relevant to the execution of a {@link DeferredFragmentCall}.
@@ -24,7 +25,7 @@ public class DeferredCallContext {
2425
private final int startLevel;
2526
private final int fields;
2627

27-
private final List<GraphQLError> errors = new CopyOnWriteArrayList<>();
28+
private final List<GraphQLError> errors = Collections.synchronizedList(new ArrayList<>());
2829

2930
public DeferredCallContext(int startLevel, int fields) {
3031
this.startLevel = startLevel;

0 commit comments

Comments
 (0)