Skip to content

Commit 15fb701

Browse files
authored
This fixes the Subscription publisher so that it does not drop values that have been received but not yet mapped by the graphql layer (#1801)
1 parent 0b49de6 commit 15fb701

3 files changed

Lines changed: 109 additions & 9 deletions

File tree

src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
import org.reactivestreams.Subscriber;
55
import org.reactivestreams.Subscription;
66

7+
import java.util.ArrayDeque;
8+
import java.util.Queue;
79
import java.util.concurrent.CompletionStage;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
import java.util.function.BiConsumer;
813
import java.util.function.Function;
914

1015
/**
@@ -33,6 +38,10 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<
3338
public void subscribe(Subscriber<? super D> downstreamSubscriber) {
3439
upstreamPublisher.subscribe(new Subscriber<U>() {
3540
Subscription delegatingSubscription;
41+
final Queue<CompletionStage<?>> inFlightDataQ = new ArrayDeque<>();
42+
final AtomicReference<Runnable> onCompleteOrErrorRun = new AtomicReference<>();
43+
final AtomicBoolean onCompleteOrErrorRunCalled = new AtomicBoolean(false);
44+
3645

3746
@Override
3847
public void onSubscribe(Subscription subscription) {
@@ -42,19 +51,36 @@ public void onSubscribe(Subscription subscription) {
4251

4352
@Override
4453
public void onNext(U u) {
45-
CompletionStage<D> completionStage;
54+
// for safety - no more data after we have called done/error - we should not get this BUT belts and braces
55+
if (onCompleteOrErrorRunCalled.get()) {
56+
return;
57+
}
4658
try {
47-
completionStage = mapper.apply(u);
48-
completionStage.whenComplete((d, throwable) -> {
59+
CompletionStage<D> completionStage = mapper.apply(u);
60+
offerToInFlightQ(completionStage);
61+
completionStage.whenComplete(whenNextFinished(completionStage));
62+
} catch (RuntimeException throwable) {
63+
handleThrowable(throwable);
64+
}
65+
}
66+
67+
private BiConsumer<D, Throwable> whenNextFinished(CompletionStage<D> completionStage) {
68+
return (d, throwable) -> {
69+
try {
4970
if (throwable != null) {
5071
handleThrowable(throwable);
5172
} else {
5273
downstreamSubscriber.onNext(d);
5374
}
54-
});
55-
} catch (RuntimeException throwable) {
56-
handleThrowable(throwable);
57-
}
75+
} finally {
76+
Runnable runOnCompleteOrErrorRun = onCompleteOrErrorRun.get();
77+
boolean empty = removeFromInFlightQAndCheckIfEmpty(completionStage);
78+
if (empty && runOnCompleteOrErrorRun != null) {
79+
onCompleteOrErrorRun.set(null);
80+
runOnCompleteOrErrorRun.run();
81+
}
82+
}
83+
};
5884
}
5985

6086
private void handleThrowable(Throwable throwable) {
@@ -71,12 +97,47 @@ private void handleThrowable(Throwable throwable) {
7197

7298
@Override
7399
public void onError(Throwable t) {
74-
downstreamSubscriber.onError(t);
100+
onCompleteOrError(() -> {
101+
onCompleteOrErrorRunCalled.set(true);
102+
downstreamSubscriber.onError(t);
103+
});
75104
}
76105

77106
@Override
78107
public void onComplete() {
79-
downstreamSubscriber.onComplete();
108+
onCompleteOrError(() -> {
109+
onCompleteOrErrorRunCalled.set(true);
110+
downstreamSubscriber.onComplete();
111+
});
112+
}
113+
114+
private void onCompleteOrError(Runnable doneCodeToRun) {
115+
if (inFlightQIsEmpty()) {
116+
// run right now
117+
doneCodeToRun.run();
118+
} else {
119+
onCompleteOrErrorRun.set(doneCodeToRun);
120+
}
121+
}
122+
123+
private void offerToInFlightQ(CompletionStage<?> completionStage) {
124+
synchronized (inFlightDataQ) {
125+
inFlightDataQ.offer(completionStage);
126+
}
127+
}
128+
129+
private boolean removeFromInFlightQAndCheckIfEmpty(CompletionStage<?> completionStage) {
130+
// uncontested locks in java are cheap - we dont expect much contention here
131+
synchronized (inFlightDataQ) {
132+
inFlightDataQ.remove(completionStage);
133+
return inFlightDataQ.isEmpty();
134+
}
135+
}
136+
137+
private boolean inFlightQIsEmpty() {
138+
synchronized (inFlightDataQ) {
139+
return inFlightDataQ.isEmpty();
140+
}
80141
}
81142
});
82143
}

src/test/groovy/graphql/execution/pubsub/CapturingSubscriber.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,28 @@ public class CapturingSubscriber<T> implements Subscriber<T> {
1919

2020
@Override
2121
public void onSubscribe(Subscription subscription) {
22+
System.out.println("onSubscribe called at " + System.nanoTime());
2223
this.subscription = subscription;
2324
subscription.request(1);
2425
}
2526

2627
@Override
2728
public void onNext(T t) {
29+
System.out.println("onNext called at " + System.nanoTime());
2830
events.add(t);
2931
subscription.request(1);
3032
}
3133

3234
@Override
3335
public void onError(Throwable t) {
36+
System.out.println("onError called at " + System.nanoTime());
3437
this.throwable = t;
3538
done.set(true);
3639
}
3740

3841
@Override
3942
public void onComplete() {
43+
System.out.println("onComplete called at " + System.nanoTime());
4044
done.set(true);
4145
}
4246

src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package graphql.execution.reactive
22

33
import graphql.execution.pubsub.CapturingSubscriber
44
import io.reactivex.Flowable
5+
import org.awaitility.Awaitility
56
import org.reactivestreams.Publisher
67
import spock.lang.Specification
78

@@ -119,4 +120,38 @@ class CompletionStageMappingPublisherTest extends Specification {
119120

120121
}
121122

123+
124+
def "asynchronous mapping works with completion"() {
125+
126+
when:
127+
Publisher<Integer> rxIntegers = Flowable.range(0, 10)
128+
129+
Function<Integer, CompletionStage<String>> mapper = mapperThatDelaysFor(100)
130+
Publisher<String> rxStrings = new CompletionStageMappingPublisher<String, Integer>(rxIntegers, mapper)
131+
132+
def capturingSubscriber = new CapturingSubscriber<>()
133+
rxStrings.subscribe(capturingSubscriber)
134+
135+
then:
136+
137+
Awaitility.await().untilTrue(capturingSubscriber.isDone())
138+
139+
capturingSubscriber.events.size() == 10
140+
capturingSubscriber.events[0] instanceof String
141+
capturingSubscriber.events[0] == "0"
142+
}
143+
144+
Function<Integer, CompletionStage<String>> mapperThatDelaysFor(int delay) {
145+
def mapper = new Function<Integer, CompletionStage<String>>() {
146+
@Override
147+
CompletionStage<String> apply(Integer integer) {
148+
return CompletableFuture.supplyAsync({
149+
Thread.sleep(delay)
150+
return String.valueOf(integer)
151+
})
152+
}
153+
}
154+
mapper
155+
}
156+
122157
}

0 commit comments

Comments
 (0)