Skip to content

Commit 92713a5

Browse files
committed
This has a fix to buffer published events to keep them in order - the subscribers now inhrent from each other and outstanding futures are now cancelled if the Publisher fails
1 parent db46b1d commit 92713a5

9 files changed

+222
-45
lines changed

src/main/java/graphql/execution/SubscriptionExecutionStrategy.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import graphql.ExecutionResult;
44
import graphql.ExecutionResultImpl;
5+
import graphql.GraphQLContext;
56
import graphql.PublicApi;
67
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext;
78
import graphql.execution.instrumentation.Instrumentation;
@@ -37,6 +38,14 @@
3738
@PublicApi
3839
public class SubscriptionExecutionStrategy extends ExecutionStrategy {
3940

41+
/**
42+
* If a boolean value is placed into the {@link GraphQLContext} with this key then the order
43+
* of the subscription events can be controlled. By default, subscription events are published
44+
* as the graphql subselection calls complete, and not in the order they originally arrived from the
45+
* source publisher. But this can be changed to {@link Boolean#TRUE} to keep them in order.
46+
*/
47+
public static final String KEEP_SUBSCRIPTION_EVENTS_ORDERED = "KEEP_SUBSCRIPTION_EVENTS_ORDERED";
48+
4049
public SubscriptionExecutionStrategy() {
4150
super();
4251
}
@@ -64,7 +73,8 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
6473
return new ExecutionResultImpl(null, executionContext.getErrors());
6574
}
6675
Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload);
67-
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction);
76+
boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext());
77+
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered);
6878
return new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
6979
});
7080

@@ -75,6 +85,10 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7585
return overallResult;
7686
}
7787

88+
private boolean keepOrdered(GraphQLContext graphQLContext) {
89+
return graphQLContext.getOrDefault(KEEP_SUBSCRIPTION_EVENTS_ORDERED, false);
90+
}
91+
7892

7993
/*
8094
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
@@ -99,7 +113,7 @@ private CompletableFuture<Publisher<Object>> createSourceEventStream(ExecutionCo
99113
if (publisher != null) {
100114
assertTrue(publisher instanceof Publisher, () -> "Your data fetcher must return a Publisher of events when using graphql subscriptions");
101115
}
102-
//noinspection unchecked
116+
//noinspection unchecked,DataFlowIssue
103117
return (Publisher<Object>) publisher;
104118
});
105119
}
Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package graphql.execution.reactive;
22

33
import graphql.Internal;
4+
import org.jetbrains.annotations.NotNull;
45
import org.reactivestreams.Publisher;
56
import org.reactivestreams.Subscriber;
67

78
import java.util.concurrent.CompletionStage;
89
import java.util.function.Function;
910

10-
import static graphql.Assert.assertNotNullWithNPE;
11-
1211
/**
1312
* A reactive Publisher that bridges over another Publisher of `D` and maps the results
1413
* to type `U` via a CompletionStage, handling errors in that stage but keeps the results
@@ -19,34 +18,19 @@
1918
* @param <U> the upstream type to be mapped to
2019
*/
2120
@Internal
22-
public class CompletionStageMappingOrderedPublisher<D, U> implements Publisher<D> {
23-
private final Publisher<U> upstreamPublisher;
24-
private final Function<U, CompletionStage<D>> mapper;
25-
21+
public class CompletionStageMappingOrderedPublisher<D, U> extends CompletionStageMappingPublisher<D, U> {
2622
/**
2723
* You need the following :
2824
*
2925
* @param upstreamPublisher an upstream source of data
3026
* @param mapper a mapper function that turns upstream data into a promise of mapped D downstream data
3127
*/
3228
public CompletionStageMappingOrderedPublisher(Publisher<U> upstreamPublisher, Function<U, CompletionStage<D>> mapper) {
33-
this.upstreamPublisher = upstreamPublisher;
34-
this.mapper = mapper;
29+
super(upstreamPublisher, mapper);
3530
}
3631

3732
@Override
38-
public void subscribe(Subscriber<? super D> downstreamSubscriber) {
39-
assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null");
40-
upstreamPublisher.subscribe(new CompletionStageOrderedSubscriber<>(mapper, downstreamSubscriber));
41-
}
42-
43-
/**
44-
* Get instance of an upstreamPublisher
45-
*
46-
* @return upstream instance of {@link Publisher}
47-
*/
48-
public Publisher<U> getUpstreamPublisher() {
49-
return upstreamPublisher;
33+
protected @NotNull Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
34+
return new CompletionStageOrderedSubscriber<>(mapper, downstreamSubscriber);
5035
}
51-
5236
}

src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package graphql.execution.reactive;
22

33
import graphql.Internal;
4+
import org.jetbrains.annotations.NotNull;
45
import org.reactivestreams.Publisher;
56
import org.reactivestreams.Subscriber;
67

78
import java.util.concurrent.CompletionStage;
89
import java.util.function.Function;
910

11+
import static graphql.Assert.assertNotNullWithNPE;
12+
1013
/**
1114
* A reactive Publisher that bridges over another Publisher of `D` and maps the results
1215
* to type `U` via a CompletionStage, handling errors in that stage
@@ -16,8 +19,8 @@
1619
*/
1720
@Internal
1821
public class CompletionStageMappingPublisher<D, U> implements Publisher<D> {
19-
private final Publisher<U> upstreamPublisher;
20-
private final Function<U, CompletionStage<D>> mapper;
22+
protected final Publisher<U> upstreamPublisher;
23+
protected final Function<U, CompletionStage<D>> mapper;
2124

2225
/**
2326
* You need the following :
@@ -32,9 +35,16 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<
3235

3336
@Override
3437
public void subscribe(Subscriber<? super D> downstreamSubscriber) {
35-
upstreamPublisher.subscribe(new CompletionStageSubscriber<>(mapper, downstreamSubscriber));
38+
assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null");
39+
upstreamPublisher.subscribe(createSubscriber(downstreamSubscriber));
40+
}
41+
42+
@NotNull
43+
protected Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
44+
return new CompletionStageSubscriber<>(mapper, downstreamSubscriber);
3645
}
3746

47+
3848
/**
3949
* Get instance of an upstreamPublisher
4050
*

src/main/java/graphql/execution/reactive/CompletionStageSubscriber.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ protected void handleThrowableDuringMapping(Throwable throwable) {
125125
// has happened, no more messages flow
126126
//
127127
delegatingSubscription.cancel();
128+
129+
cancelInFlightFutures();
128130
}
129131
}
130132

@@ -133,6 +135,7 @@ public void onError(Throwable t) {
133135
// we immediately terminate - we don't wait for any promises to complete
134136
if (isTerminal.compareAndSet(false, true)) {
135137
downstreamSubscriber.onError(t);
138+
cancelInFlightFutures();
136139
}
137140
}
138141

@@ -168,6 +171,21 @@ private boolean removeFromInFlightQAndCheckIfEmpty(CompletionStage<?> completion
168171
});
169172
}
170173

174+
/**
175+
* If the promise is backed by frameworks such as Reactor, then the cancel()
176+
* can cause them to propagate the cancel back into the reactive chain
177+
*/
178+
private void cancelInFlightFutures() {
179+
lock.runLocked(() -> {
180+
while (!inFlightDataQ.isEmpty()) {
181+
CompletionStage<?> cs = inFlightDataQ.poll();
182+
if (cs != null) {
183+
cs.toCompletableFuture().cancel(false);
184+
}
185+
}
186+
});
187+
}
188+
171189
protected boolean inFlightQIsEmpty() {
172190
return lock.callLocked(inFlightDataQ::isEmpty);
173191
}

src/main/java/graphql/execution/reactive/SubscriptionPublisher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,15 @@ public class SubscriptionPublisher implements Publisher<ExecutionResult> {
3131
*
3232
* @param upstreamPublisher the original publisher of objects that then have a graphql selection set applied to them
3333
* @param mapper a mapper that turns object into promises to execution results which are then published on this stream
34+
* @param keepOrdered this indicates that the order of results should be kep in the same order as the source events arrive
3435
*/
3536
@Internal
36-
public SubscriptionPublisher(Publisher<Object> upstreamPublisher, Function<Object, CompletionStage<ExecutionResult>> mapper) {
37-
mappingPublisher = new CompletionStageMappingPublisher<>(upstreamPublisher, mapper);
37+
public SubscriptionPublisher(Publisher<Object> upstreamPublisher, Function<Object, CompletionStage<ExecutionResult>> mapper, boolean keepOrdered) {
38+
if (keepOrdered) {
39+
mappingPublisher = new CompletionStageMappingOrderedPublisher<>(upstreamPublisher, mapper);
40+
} else {
41+
mappingPublisher = new CompletionStageMappingPublisher<>(upstreamPublisher, mapper);
42+
}
3843
}
3944

4045
/**

src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,4 +628,97 @@ class SubscriptionExecutionStrategyTest extends Specification {
628628
instrumentResultCalls.size() == 11 // one for the initial execution and then one for each stream event
629629
}
630630
631+
def "emits results in the order they complete"() {
632+
List<Runnable> promises = []
633+
Publisher<Object> publisher = new RxJavaMessagePublisher(10)
634+
635+
DataFetcher newMessageDF = { env -> return publisher }
636+
DataFetcher senderDF = dfThatDoesNotComplete("sender", promises)
637+
DataFetcher textDF = PropertyDataFetcher.fetching("text")
638+
639+
GraphQL graphQL = buildSubscriptionQL(newMessageDF, senderDF, textDF)
640+
641+
def executionInput = ExecutionInput.newExecutionInput().query("""
642+
subscription NewMessages {
643+
newMessage(roomId: 123) {
644+
sender
645+
text
646+
}
647+
}
648+
""").build()
649+
650+
def executionResult = graphQL.execute(executionInput)
651+
652+
when:
653+
Publisher<ExecutionResult> msgStream = executionResult.getData()
654+
def capturingSubscriber = new CapturingSubscriber<ExecutionResult>(100)
655+
msgStream.subscribe(capturingSubscriber)
656+
657+
// make them all complete but in reverse order
658+
promises.reverse().forEach { it.run() }
659+
660+
then:
661+
Awaitility.await().untilTrue(capturingSubscriber.isDone())
662+
663+
// in order they completed - which was reversed
664+
def messages = capturingSubscriber.events
665+
messages.size() == 10
666+
for (int i = 0, j = messages.size() - 1; i < messages.size(); i++, j--) {
667+
def message = messages[i].data
668+
assert message == ["newMessage": [sender: "sender" + j, text: "text" + j]]
669+
}
670+
}
671+
672+
def "emits results in the order they where emitted by source"() {
673+
List<Runnable> promises = []
674+
Publisher<Object> publisher = new RxJavaMessagePublisher(10)
675+
676+
DataFetcher newMessageDF = { env -> return publisher }
677+
DataFetcher senderDF = dfThatDoesNotComplete("sender", promises)
678+
DataFetcher textDF = PropertyDataFetcher.fetching("text")
679+
680+
GraphQL graphQL = buildSubscriptionQL(newMessageDF, senderDF, textDF)
681+
682+
def executionInput = ExecutionInput.newExecutionInput().query("""
683+
subscription NewMessages {
684+
newMessage(roomId: 123) {
685+
sender
686+
text
687+
}
688+
}
689+
""").graphQLContext([(SubscriptionExecutionStrategy.KEEP_SUBSCRIPTION_EVENTS_ORDERED): true]).build()
690+
691+
def executionResult = graphQL.execute(executionInput)
692+
693+
when:
694+
Publisher<ExecutionResult> msgStream = executionResult.getData()
695+
def capturingSubscriber = new CapturingSubscriber<ExecutionResult>(100)
696+
msgStream.subscribe(capturingSubscriber)
697+
698+
// make them all complete but in reverse order
699+
promises.reverse().forEach { it.run() }
700+
701+
then:
702+
Awaitility.await().untilTrue(capturingSubscriber.isDone())
703+
704+
// in order they were emitted originally - they have been buffered
705+
def messages = capturingSubscriber.events
706+
messages.size() == 10
707+
for (int i = 0; i < messages.size(); i++) {
708+
def message = messages[i].data
709+
assert message == ["newMessage": [sender: "sender" + i, text: "text" + i]]
710+
}
711+
}
712+
713+
private static DataFetcher<?> dfThatDoesNotComplete(String propertyName, List<Runnable> promises) {
714+
{ env ->
715+
def df = PropertyDataFetcher.fetching(propertyName)
716+
def value = df.get(env)
717+
718+
def cf = new CompletableFuture()
719+
promises.add({ cf.complete(value) })
720+
return cf
721+
}
722+
}
723+
631724
}

src/test/groovy/graphql/execution/reactive/CompletionStageOrderedSubscriberTest.groovy

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import java.util.concurrent.CompletableFuture
88
import java.util.concurrent.CompletionStage
99
import java.util.function.Function
1010

11+
import static graphql.execution.reactive.CompletionStageSubscriberTest.mapperThatDoesNotComplete
12+
1113
class CompletionStageOrderedSubscriberTest extends Specification {
1214

1315
def "basic test of mapping"() {
@@ -140,6 +142,32 @@ class CompletionStageOrderedSubscriberTest extends Specification {
140142
capturingSubscriber.events == []
141143
}
142144

145+
def "if onError is called, then futures are cancelled"() {
146+
def capturingSubscriber = new CapturingSubscriber<>()
147+
def subscription = new CapturingSubscription()
148+
List<CompletableFuture> promises = []
149+
Function<Integer, CompletionStage<String>> mapper = mapperThatDoesNotComplete([], promises)
150+
def completionStageSubscriber = new CompletionStageSubscriber<Integer, String>(mapper, capturingSubscriber)
151+
152+
when:
153+
completionStageSubscriber.onSubscribe(subscription)
154+
completionStageSubscriber.onNext(0)
155+
completionStageSubscriber.onNext(1)
156+
completionStageSubscriber.onNext(2)
157+
completionStageSubscriber.onNext(3)
158+
completionStageSubscriber.onError(new RuntimeException("Bang"))
159+
160+
then:
161+
!capturingSubscriber.isCompleted()
162+
capturingSubscriber.isCompletedExceptionally()
163+
capturingSubscriber.events == []
164+
165+
promises.size() == 4
166+
for (CompletableFuture<?> cf : promises) {
167+
assert cf.isCancelled(), "The CF was not cancelled?"
168+
}
169+
}
170+
143171
def "emits values in the order they arrive not the order they complete"() {
144172
def capturingSubscriber = new CapturingSubscriber<>()
145173
def subscription = new CapturingSubscription()
@@ -166,19 +194,8 @@ class CompletionStageOrderedSubscriberTest extends Specification {
166194
then:
167195
!subscription.isCancelled()
168196
capturingSubscriber.isCompleted()
169-
capturingSubscriber.events == ["0","1","2","3"]
197+
capturingSubscriber.events == ["0", "1", "2", "3"]
170198
}
171199

172-
private static Function<Integer, CompletionStage<String>> mapperThatDoesNotComplete(List<Runnable> promises) {
173-
def mapper = new Function<Integer, CompletionStage<String>>() {
174-
@Override
175-
CompletionStage<String> apply(Integer integer) {
176-
def cf = new CompletableFuture<String>()
177-
promises.add({ cf.complete(String.valueOf(integer)) })
178-
return cf
179-
}
180-
}
181-
mapper
182-
}
183200

184201
}

0 commit comments

Comments
 (0)