Skip to content

Commit 3305c46

Browse files
committed
Merge remote-tracking branch 'origin/master' into start-drain-on-publisher-subscribe
2 parents 6483500 + 285d2c0 commit 3305c46

20 files changed

+1258
-161
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ dependencies {
117117

118118
testImplementation 'org.reactivestreams:reactive-streams-tck:' + reactiveStreamsVersion
119119
testImplementation "io.reactivex.rxjava2:rxjava:2.2.21"
120+
testImplementation "io.projectreactor:reactor-core:3.6.5"
120121

121122
testImplementation 'org.testng:testng:7.10.2' // use for reactive streams test inheritance
122123

src/main/java/graphql/execution/SubscriptionExecutionStrategy.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import graphql.ExecutionResult;
44
import graphql.ExecutionResultImpl;
5+
import graphql.GraphQLContext;
56
import graphql.PublicApi;
67
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext;
78
import graphql.execution.instrumentation.Instrumentation;
@@ -37,6 +38,14 @@
3738
@PublicApi
3839
public class SubscriptionExecutionStrategy extends ExecutionStrategy {
3940

41+
/**
42+
* If a boolean value is placed into the {@link GraphQLContext} with this key then the order
43+
* of the subscription events can be controlled. By default, subscription events are published
44+
* as the graphql subselection calls complete, and not in the order they originally arrived from the
45+
* source publisher. But this can be changed to {@link Boolean#TRUE} to keep them in order.
46+
*/
47+
public static final String KEEP_SUBSCRIPTION_EVENTS_ORDERED = "KEEP_SUBSCRIPTION_EVENTS_ORDERED";
48+
4049
public SubscriptionExecutionStrategy() {
4150
super();
4251
}
@@ -64,7 +73,8 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
6473
return new ExecutionResultImpl(null, executionContext.getErrors());
6574
}
6675
Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload);
67-
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction);
76+
boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext());
77+
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered);
6878
return new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
6979
});
7080

@@ -75,6 +85,10 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7585
return overallResult;
7686
}
7787

88+
private boolean keepOrdered(GraphQLContext graphQLContext) {
89+
return graphQLContext.getOrDefault(KEEP_SUBSCRIPTION_EVENTS_ORDERED, false);
90+
}
91+
7892

7993
/*
8094
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
@@ -99,7 +113,7 @@ private CompletableFuture<Publisher<Object>> createSourceEventStream(ExecutionCo
99113
if (publisher != null) {
100114
assertTrue(publisher instanceof Publisher, () -> "Your data fetcher must return a Publisher of events when using graphql subscriptions");
101115
}
102-
//noinspection unchecked
116+
//noinspection unchecked,DataFlowIssue
103117
return (Publisher<Object>) publisher;
104118
});
105119
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package graphql.execution.reactive;
2+
3+
import graphql.Internal;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
8+
import java.util.concurrent.CompletionStage;
9+
import java.util.function.Function;
10+
11+
/**
12+
* A reactive Publisher that bridges over another Publisher of `D` and maps the results
13+
* to type `U` via a CompletionStage, handling errors in that stage but keeps the results
14+
* in order of downstream publishing. This means it must queue unfinished
15+
* completion stages in memory in arrival order.
16+
*
17+
* @param <D> the downstream type
18+
* @param <U> the upstream type to be mapped to
19+
*/
20+
@Internal
21+
public class CompletionStageMappingOrderedPublisher<D, U> extends CompletionStageMappingPublisher<D, U> {
22+
/**
23+
* You need the following :
24+
*
25+
* @param upstreamPublisher an upstream source of data
26+
* @param mapper a mapper function that turns upstream data into a promise of mapped D downstream data
27+
*/
28+
public CompletionStageMappingOrderedPublisher(Publisher<U> upstreamPublisher, Function<U, CompletionStage<D>> mapper) {
29+
super(upstreamPublisher, mapper);
30+
}
31+
32+
@Override
33+
protected @NotNull Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
34+
return new CompletionStageOrderedSubscriber<>(mapper, downstreamSubscriber);
35+
}
36+
}
Lines changed: 15 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,26 @@
11
package graphql.execution.reactive;
22

33
import graphql.Internal;
4-
import graphql.util.LockKit;
4+
import org.jetbrains.annotations.NotNull;
55
import org.reactivestreams.Publisher;
66
import org.reactivestreams.Subscriber;
7-
import org.reactivestreams.Subscription;
87

9-
import java.util.ArrayDeque;
10-
import java.util.Queue;
118
import java.util.concurrent.CompletionStage;
12-
import java.util.concurrent.atomic.AtomicBoolean;
13-
import java.util.concurrent.atomic.AtomicReference;
14-
import java.util.function.BiConsumer;
159
import java.util.function.Function;
1610

11+
import static graphql.Assert.assertNotNullWithNPE;
12+
1713
/**
1814
* A reactive Publisher that bridges over another Publisher of `D` and maps the results
1915
* to type `U` via a CompletionStage, handling errors in that stage
2016
*
21-
* @param <D> the down stream type
22-
* @param <U> the up stream type to be mapped to
17+
* @param <D> the downstream type
18+
* @param <U> the upstream type to be mapped to
2319
*/
24-
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
2520
@Internal
2621
public class CompletionStageMappingPublisher<D, U> implements Publisher<D> {
27-
private final Publisher<U> upstreamPublisher;
28-
private final Function<U, CompletionStage<D>> mapper;
22+
protected final Publisher<U> upstreamPublisher;
23+
protected final Function<U, CompletionStage<D>> mapper;
2924

3025
/**
3126
* You need the following :
@@ -40,9 +35,16 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<
4035

4136
@Override
4237
public void subscribe(Subscriber<? super D> downstreamSubscriber) {
43-
upstreamPublisher.subscribe(new CompletionStageSubscriber(downstreamSubscriber));
38+
assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null");
39+
upstreamPublisher.subscribe(createSubscriber(downstreamSubscriber));
40+
}
41+
42+
@NotNull
43+
protected Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) {
44+
return new CompletionStageSubscriber<>(mapper, downstreamSubscriber);
4445
}
4546

47+
4648
/**
4749
* Get instance of an upstreamPublisher
4850
*
@@ -52,126 +54,4 @@ public Publisher<U> getUpstreamPublisher() {
5254
return upstreamPublisher;
5355
}
5456

55-
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
56-
@Internal
57-
public class CompletionStageSubscriber implements Subscriber<U> {
58-
private final Subscriber<? super D> downstreamSubscriber;
59-
Subscription delegatingSubscription;
60-
final Queue<CompletionStage<?>> inFlightDataQ;
61-
final LockKit.ReentrantLock lock = new LockKit.ReentrantLock();
62-
final AtomicReference<Runnable> onCompleteOrErrorRun;
63-
final AtomicBoolean onCompleteOrErrorRunCalled;
64-
65-
public CompletionStageSubscriber(Subscriber<? super D> downstreamSubscriber) {
66-
this.downstreamSubscriber = downstreamSubscriber;
67-
inFlightDataQ = new ArrayDeque<>();
68-
onCompleteOrErrorRun = new AtomicReference<>();
69-
onCompleteOrErrorRunCalled = new AtomicBoolean(false);
70-
}
71-
72-
73-
@Override
74-
public void onSubscribe(Subscription subscription) {
75-
delegatingSubscription = new DelegatingSubscription(subscription);
76-
downstreamSubscriber.onSubscribe(delegatingSubscription);
77-
}
78-
79-
@Override
80-
public void onNext(U u) {
81-
// for safety - no more data after we have called done/error - we should not get this BUT belts and braces
82-
if (onCompleteOrErrorRunCalled.get()) {
83-
return;
84-
}
85-
try {
86-
CompletionStage<D> completionStage = mapper.apply(u);
87-
offerToInFlightQ(completionStage);
88-
completionStage.whenComplete(whenNextFinished(completionStage));
89-
} catch (RuntimeException throwable) {
90-
handleThrowable(throwable);
91-
}
92-
}
93-
94-
private BiConsumer<D, Throwable> whenNextFinished(CompletionStage<D> completionStage) {
95-
return (d, throwable) -> {
96-
try {
97-
if (throwable != null) {
98-
handleThrowable(throwable);
99-
} else {
100-
downstreamSubscriber.onNext(d);
101-
}
102-
} finally {
103-
Runnable runOnCompleteOrErrorRun = onCompleteOrErrorRun.get();
104-
boolean empty = removeFromInFlightQAndCheckIfEmpty(completionStage);
105-
if (empty && runOnCompleteOrErrorRun != null) {
106-
onCompleteOrErrorRun.set(null);
107-
runOnCompleteOrErrorRun.run();
108-
}
109-
}
110-
};
111-
}
112-
113-
private void handleThrowable(Throwable throwable) {
114-
downstreamSubscriber.onError(throwable);
115-
//
116-
// reactive semantics say that IF an exception happens on a publisher
117-
// then onError is called and no more messages flow. But since the exception happened
118-
// during the mapping, the upstream publisher does not no about this.
119-
// so we cancel to bring the semantics back together, that is as soon as an exception
120-
// has happened, no more messages flow
121-
//
122-
delegatingSubscription.cancel();
123-
}
124-
125-
@Override
126-
public void onError(Throwable t) {
127-
onCompleteOrError(() -> {
128-
onCompleteOrErrorRunCalled.set(true);
129-
downstreamSubscriber.onError(t);
130-
});
131-
}
132-
133-
@Override
134-
public void onComplete() {
135-
onCompleteOrError(() -> {
136-
onCompleteOrErrorRunCalled.set(true);
137-
downstreamSubscriber.onComplete();
138-
});
139-
}
140-
141-
/**
142-
* Get instance of downstream subscriber
143-
*
144-
* @return {@link Subscriber}
145-
*/
146-
public Subscriber<? super D> getDownstreamSubscriber() {
147-
return downstreamSubscriber;
148-
}
149-
150-
private void onCompleteOrError(Runnable doneCodeToRun) {
151-
if (inFlightQIsEmpty()) {
152-
// run right now
153-
doneCodeToRun.run();
154-
} else {
155-
onCompleteOrErrorRun.set(doneCodeToRun);
156-
}
157-
}
158-
159-
private void offerToInFlightQ(CompletionStage<?> completionStage) {
160-
lock.runLocked(() ->
161-
inFlightDataQ.offer(completionStage)
162-
);
163-
}
164-
165-
private boolean removeFromInFlightQAndCheckIfEmpty(CompletionStage<?> completionStage) {
166-
// uncontested locks in java are cheap - we don't expect much contention here
167-
return lock.callLocked(() -> {
168-
inFlightDataQ.remove(completionStage);
169-
return inFlightDataQ.isEmpty();
170-
});
171-
}
172-
173-
private boolean inFlightQIsEmpty() {
174-
return lock.callLocked(inFlightDataQ::isEmpty);
175-
}
176-
}
17757
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package graphql.execution.reactive;
2+
3+
import graphql.Internal;
4+
import org.reactivestreams.Subscriber;
5+
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.CompletionException;
8+
import java.util.concurrent.CompletionStage;
9+
import java.util.function.Function;
10+
11+
/**
12+
* This subscriber can be used to map between a {@link org.reactivestreams.Publisher} of U
13+
* elements and map them into {@link CompletionStage} of D promises, and it keeps them in the order
14+
* the Publisher provided them.
15+
*
16+
* @param <U> published upstream elements
17+
* @param <D> mapped downstream values
18+
*/
19+
@Internal
20+
public class CompletionStageOrderedSubscriber<U, D> extends CompletionStageSubscriber<U, D> implements Subscriber<U> {
21+
22+
public CompletionStageOrderedSubscriber(Function<U, CompletionStage<D>> mapper, Subscriber<? super D> downstreamSubscriber) {
23+
super(mapper, downstreamSubscriber);
24+
}
25+
26+
@Override
27+
protected void whenNextFinished(CompletionStage<D> completionStage, D d, Throwable throwable) {
28+
try {
29+
if (throwable != null) {
30+
handleThrowableDuringMapping(throwable);
31+
} else {
32+
emptyInFlightQueueIfWeCan();
33+
}
34+
} finally {
35+
boolean empty = inFlightQIsEmpty();
36+
finallyAfterEachPromiseFinishes(empty);
37+
}
38+
}
39+
40+
private void emptyInFlightQueueIfWeCan() {
41+
// done inside a memory lock, so we cant offer new CFs to the queue
42+
// until we have processed any completed ones from the start of
43+
// the queue.
44+
lock.runLocked(() -> {
45+
//
46+
// from the top of the in flight queue, take all the CFs that have
47+
// completed... but stop if they are not done
48+
while (!inFlightDataQ.isEmpty()) {
49+
CompletionStage<?> cs = inFlightDataQ.peek();
50+
if (cs != null) {
51+
//
52+
CompletableFuture<?> cf = cs.toCompletableFuture();
53+
if (cf.isDone()) {
54+
// take it off the queue
55+
inFlightDataQ.poll();
56+
D value;
57+
try {
58+
//noinspection unchecked
59+
value = (D) cf.join();
60+
} catch (RuntimeException rte) {
61+
//
62+
// if we get an exception while joining on a value, we
63+
// send it into the exception handling and break out
64+
handleThrowableDuringMapping(cfExceptionUnwrap(rte));
65+
break;
66+
}
67+
downstreamSubscriber.onNext(value);
68+
} else {
69+
// if the CF is not done, then we have to stop processing
70+
// to keep the results in order inside the inFlightQueue
71+
break;
72+
}
73+
}
74+
}
75+
});
76+
}
77+
78+
private Throwable cfExceptionUnwrap(Throwable throwable) {
79+
if (throwable instanceof CompletionException & throwable.getCause() != null) {
80+
return throwable.getCause();
81+
}
82+
return throwable;
83+
}
84+
}

0 commit comments

Comments
 (0)