Skip to content

Commit fe8951d

Browse files
committed
Initial code for subscription support
1 parent 2d4f518 commit fe8951d

9 files changed

Lines changed: 765 additions & 0 deletions

File tree

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
compile 'org.antlr:antlr4-runtime:4.5.1'
3434
compile 'org.slf4j:slf4j-api:' + slf4jVersion
3535
compile 'com.graphql-java:java-dataloader:2.0.1'
36+
compile 'org.reactivestreams:reactive-streams:1.0.1'
3637
antlr "org.antlr:antlr4:4.5.1"
3738
testCompile group: 'junit', name: 'junit', version: '4.11'
3839
testCompile 'org.spockframework:spock-core:1.0-groovy-2.4'
@@ -44,6 +45,7 @@ dependencies {
4445
testCompile 'com.fasterxml.jackson.core:jackson-databind:2.8.8.1'
4546
testCompile 'org.slf4j:slf4j-simple:' + slf4jVersion
4647
testCompile 'org.awaitility:awaitility-groovy:3.0.0'
48+
testCompile 'org.reactivestreams:reactive-streams-tck:1.0.1'
4749

4850
}
4951

docs/execution.rst

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,91 @@ This behaviour makes it unsuitable to be used as a mutation execution strategy.
425425
.build();
426426
427427
428+
SubscriptionExecutionStrategy
429+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
430+
431+
Using graphql subscriptions allows you to create stateful subscriptions to graphql data. Initially it returns an execution
432+
result that contains a ``Publisher<ExecutionResult>`` event stream. You then subscribe to that event stream and you will be
433+
given further updates as ``ExecutionResult`` objects as that stream encounters new events.
434+
435+
.. code-block:: java
436+
437+
DataFetcher pubSubDataFetcher = new DataFetcher() {
438+
@Override
439+
public Object get(DataFetchingEnvironment environment) {
440+
int roomId = environment.getArgument("roomId");
441+
Publisher<Object> publisher = new Publisher<Object>() {
442+
@Override
443+
public void subscribe(Subscriber subscriber) {
444+
//
445+
// use your pub sub system of choice to actually implement proper publish and subscribe
446+
subscribeToRoom(subscriber, roomId);
447+
}
448+
};
449+
return publisher;
450+
}
451+
};
452+
453+
GraphQLObjectType messageType = newObject().name("Message")
454+
.field(newFieldDefinition().name("sender").type(Scalars.GraphQLString))
455+
.field(newFieldDefinition().name("text").type(Scalars.GraphQLString))
456+
.build();
457+
458+
459+
GraphQLObjectType subscriptionType = newObject().name("Subscription")
460+
.field(newFieldDefinition()
461+
.name("newMessage")
462+
.type(messageType)
463+
.dataFetcher(pubSubDataFetcher))
464+
.build();
465+
466+
GraphQLSchema graphQLSchema = GraphQLSchema.newSchema().subscription(subscriptionType).build();
467+
468+
SubscriptionExecutionStrategy subscriptionES = new SubscriptionExecutionStrategy();
469+
GraphQL graphql = GraphQL.newGraphQL(graphQLSchema)
470+
.subscriptionExecutionStrategy(subscriptionES)
471+
.build();
472+
473+
ExecutionResult subscriptionER = graphql.execute("" +
474+
"subscription NewMessages {" +
475+
" newMessage(roomId: 123) {" +
476+
" sender" +
477+
" text" +
478+
" }" +
479+
"}");
480+
481+
Publisher<ExecutionResult> eventStream = subscriptionER.getData();
482+
//
483+
// now new events will be supplied in this reactive event stream
484+
//
485+
// See http://www.reactive-streams.org/ for more information
486+
//
487+
eventStream.subscribe(new Subscriber<ExecutionResult>() {
488+
private Subscription subscription;
489+
490+
@Override
491+
public void onSubscribe(Subscription s) {
492+
this.subscription = s;
493+
subscription.request(1);
494+
}
495+
496+
@Override
497+
public void onNext(ExecutionResult executionResult) {
498+
handleNewData(executionResult);
499+
}
500+
501+
@Override
502+
public void onError(Throwable t) {
503+
// decide what to do if the event stream has gone into error
504+
}
505+
506+
@Override
507+
public void onComplete() {
508+
// clean up any resources
509+
}
510+
});
511+
512+
428513
429514
BatchedExecutionStrategy
430515
^^^^^^^^^^^^^^^^^^^^^^^^

src/main/java/graphql/execution/ExecutionContext.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.LinkedHashMap;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.function.Consumer;
1718

1819
@PublicApi
1920
public class ExecutionContext {
@@ -125,4 +126,26 @@ public ExecutionStrategy getMutationStrategy() {
125126
public ExecutionStrategy getSubscriptionStrategy() {
126127
return subscriptionStrategy;
127128
}
129+
130+
131+
public ExecutionContext transform(Consumer<ExecutionContextBuilder> builderConsumer) {
132+
ExecutionContextBuilder builder = new ExecutionContextBuilder()
133+
.graphQLSchema(this.graphQLSchema)
134+
.executionId(this.executionId)
135+
.instrumentationState(instrumentationState)
136+
.queryStrategy(this.queryStrategy)
137+
.mutationStrategy(this.mutationStrategy)
138+
.subscriptionStrategy(this.subscriptionStrategy)
139+
.fragmentsByName(this.fragmentsByName)
140+
.operationDefinition(this.operationDefinition)
141+
.document(this.document)
142+
.variables(this.variables)
143+
.root(this.root)
144+
.context(this.context)
145+
.instrumentation(this.instrumentation);
146+
147+
builderConsumer.accept(builder);
148+
return builder.build();
149+
}
150+
128151
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package graphql.execution;
2+
3+
import graphql.ExecutionResult;
4+
import graphql.ExecutionResultImpl;
5+
import graphql.language.Field;
6+
import org.reactivestreams.Processor;
7+
import org.reactivestreams.Publisher;
8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
import static graphql.Assert.assertNotNull;
17+
import static graphql.Assert.assertTrue;
18+
19+
/**
20+
* An execution strategy that implements graphql subscriptions by using reactive-streams
21+
* as the output result of the subscription query.
22+
*
23+
* Afterwards each object delivered on that stream will be mapped via running the original selection set over that object and hence producing an ExecutionResult
24+
* just like a normal graphql query.
25+
*
26+
* See https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
27+
* See http://www.reactive-streams.org/
28+
*/
29+
public class SubscriptionExecutionStrategy extends ExecutionStrategy {
30+
31+
public SubscriptionExecutionStrategy() {
32+
super();
33+
}
34+
35+
public SubscriptionExecutionStrategy(DataFetcherExceptionHandler dataFetcherExceptionHandler) {
36+
super(dataFetcherExceptionHandler);
37+
}
38+
39+
@Override
40+
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
41+
42+
CompletableFuture<Publisher<Object>> sourceEventStream = createSourceEventStream(executionContext, parameters);
43+
44+
//
45+
// when the upstream source event stream completes, subscribe to it and wire in our adapter
46+
return sourceEventStream.thenApply((publisher) -> {
47+
if (publisher == null) {
48+
return new ExecutionResultImpl(null, executionContext.getErrors());
49+
}
50+
Processor<Object, ExecutionResult> mapSourceToResponse = new ExecutionResultProcessor(executionContext, parameters);
51+
publisher.subscribe(mapSourceToResponse);
52+
return new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
53+
});
54+
}
55+
56+
/*
57+
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
58+
59+
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
60+
61+
Let {subscriptionType} be the root Subscription type in {schema}.
62+
Assert: {subscriptionType} is an Object type.
63+
Let {selectionSet} be the top level Selection Set in {subscription}.
64+
Let {rootField} be the first top level field in {selectionSet}.
65+
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
66+
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
67+
Return {fieldStream}.
68+
*/
69+
70+
private CompletableFuture<Publisher<Object>> createSourceEventStream(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
71+
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(parameters);
72+
73+
CompletableFuture<Object> fieldFetched = fetchField(executionContext, newParameters);
74+
return fieldFetched.thenApply(publisher -> {
75+
if (publisher != null) {
76+
assertTrue(publisher instanceof Publisher, "You data fetcher must return a Publisher of events when using graphql subscriptions");
77+
}
78+
//noinspection unchecked
79+
return (Publisher<Object>) publisher;
80+
});
81+
}
82+
83+
/*
84+
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
85+
86+
Let {subscriptionType} be the root Subscription type in {schema}.
87+
Assert: {subscriptionType} is an Object type.
88+
Let {selectionSet} be the top level Selection Set in {subscription}.
89+
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
90+
Let {errors} be any field errors produced while executing the selection set.
91+
Return an unordered map containing {data} and {errors}.
92+
93+
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
94+
*/
95+
96+
private CompletableFuture<ExecutionResult> executeSubscriptionEvent(ExecutionContext executionContext, ExecutionStrategyParameters parameters, Object eventPayload) {
97+
ExecutionContext newExecutionContext = executionContext.transform(builder -> builder.root(eventPayload));
98+
99+
ExecutionStrategyParameters newParameters = firstFieldOfSubscriptionSelection(parameters);
100+
101+
return completeField(newExecutionContext, newParameters, eventPayload);
102+
}
103+
104+
private ExecutionStrategyParameters firstFieldOfSubscriptionSelection(ExecutionStrategyParameters parameters) {
105+
Map<String, List<Field>> fields = parameters.fields();
106+
List<String> fieldNames = new ArrayList<>(fields.keySet());
107+
108+
List<Field> firstField = fields.get(fieldNames.get(0));
109+
110+
ExecutionPath fieldPath = parameters.path().segment(firstField.get(0).getName());
111+
return parameters.transform(builder -> builder.field(firstField).path(fieldPath));
112+
}
113+
114+
/**
115+
* A simple subscription that delegates to another
116+
*/
117+
private class DelegatingSubscription implements Subscription {
118+
private final Subscription upstreamSubscription;
119+
120+
DelegatingSubscription(Subscription upstreamSubscription) {
121+
this.upstreamSubscription = assertNotNull(upstreamSubscription);
122+
}
123+
124+
@Override
125+
public void request(long n) {
126+
upstreamSubscription.request(n);
127+
}
128+
129+
@Override
130+
public void cancel() {
131+
upstreamSubscription.cancel();
132+
}
133+
}
134+
135+
136+
/**
137+
* This class is a processor, that is it subscribes upstream and maps
138+
* the returned objects into graphql {@link graphql.ExecutionResult} by
139+
* completing each value from the upstream event source.
140+
*/
141+
private class ExecutionResultProcessor implements Processor<Object, ExecutionResult> {
142+
143+
private final ExecutionContext executionContext;
144+
private final ExecutionStrategyParameters parameters;
145+
private final List<Subscriber<? super ExecutionResult>> subscribers;
146+
private Subscription upstreamSubscription;
147+
148+
public ExecutionResultProcessor(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
149+
this.executionContext = executionContext;
150+
this.parameters = parameters;
151+
subscribers = new ArrayList<>();
152+
}
153+
154+
@Override
155+
public void onSubscribe(Subscription subscription) {
156+
this.upstreamSubscription = subscription;
157+
}
158+
159+
@Override
160+
public void subscribe(Subscriber<? super ExecutionResult> subscriber) {
161+
subscribers.add(subscriber);
162+
//
163+
// give the down stream subscribers a delegate subscription that just asks the upstream event stream
164+
// for more data
165+
subscriber.onSubscribe(new DelegatingSubscription(upstreamSubscription));
166+
}
167+
168+
@Override
169+
public void onNext(Object eventPayload) {
170+
CompletableFuture<ExecutionResult> resultPromise = executeSubscriptionEvent(executionContext, parameters, eventPayload);
171+
resultPromise.whenComplete((executionResult, throwable) -> {
172+
if (throwable != null) {
173+
subscribers.forEach(subscriber -> subscriber.onError(throwable));
174+
} else {
175+
subscribers.forEach(subscriber -> subscriber.onNext(executionResult));
176+
}
177+
});
178+
}
179+
180+
181+
@Override
182+
public void onError(Throwable throwable) {
183+
subscribers.forEach(subscriber -> subscriber.onError(throwable));
184+
}
185+
186+
@Override
187+
public void onComplete() {
188+
subscribers.forEach(Subscriber::onComplete);
189+
}
190+
}
191+
}

0 commit comments

Comments
 (0)