Skip to content

Commit 916232b

Browse files
committed
This has a fix to buffer published events to keep them in order - tweaks on tests and how we handle things
1 parent 23c3689 commit 916232b

2 files changed

Lines changed: 22 additions & 22 deletions

File tree

src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ public class CompletionStageSubscriber implements Subscriber<U> {
6666
final LockKit.ReentrantLock lock = new LockKit.ReentrantLock();
6767
final AtomicReference<Runnable> onCompleteOrErrorRun;
6868
final AtomicBoolean onCompleteOrErrorRunCalled;
69-
final AtomicBoolean upstreamCancelled;
69+
final AtomicBoolean subscriptionCancelled;
7070

7171
public CompletionStageSubscriber(Subscriber<? super D> downstreamSubscriber) {
7272
this.downstreamSubscriber = downstreamSubscriber;
7373
inFlightDataQ = new ArrayDeque<>();
7474
onCompleteOrErrorRun = new AtomicReference<>();
7575
onCompleteOrErrorRunCalled = new AtomicBoolean(false);
76-
upstreamCancelled = new AtomicBoolean(false);
76+
subscriptionCancelled = new AtomicBoolean(false);
7777
}
7878

7979

@@ -123,6 +123,9 @@ private BiConsumer<D, Throwable> whenNextFinished() {
123123
}
124124

125125
private void emptyInFlightQueueIfWeCan() {
126+
if (subscriptionCancelled.get()) {
127+
return;
128+
}
126129
// done inside a memory lock, so we cant offer new CFs to the queue
127130
// until we have processed any completed ones from the start of
128131
// the queue.
@@ -162,7 +165,7 @@ private void emptyInFlightQueueIfWeCan() {
162165

163166
private void handleThrowable(Throwable throwable) {
164167
// only do this once
165-
if (upstreamCancelled.compareAndSet(false,true)) {
168+
if (subscriptionCancelled.compareAndSet(false,true)) {
166169
downstreamSubscriber.onError(throwable);
167170
//
168171
// Reactive semantics say that IF an exception happens on a publisher,

src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import spock.lang.Specification
1010
import java.util.concurrent.CompletableFuture
1111
import java.util.concurrent.CompletionException
1212
import java.util.concurrent.CompletionStage
13-
import java.util.concurrent.CountDownLatch
14-
import java.util.concurrent.TimeUnit
1513
import java.util.function.Function
1614

1715
class CompletionStageMappingPublisherTest extends Specification {
@@ -132,30 +130,29 @@ class CompletionStageMappingPublisherTest extends Specification {
132130
when:
133131
Publisher<Integer> rxIntegers = Flowable.range(0, 10)
134132

135-
CountDownLatch latch = new CountDownLatch(5)
136-
CountDownLatch exceptionLatch = new CountDownLatch(1)
133+
List<Runnable> completions = []
134+
137135
def mapper = new Function<Integer, CompletionStage<String>>() {
138136
@Override
139137
CompletionStage<String> apply(Integer integer) {
140138

141-
if (integer == 5) {
142-
return CompletableFuture.supplyAsync {
143-
exceptionLatch.await(10_000, TimeUnit.SECONDS)
144-
sleep(100)
145-
throw new RuntimeException("Bang")
146-
}
139+
if (integer < 5) {
140+
def cf = new CompletableFuture<String>()
141+
completions.add({ cf.complete(String.valueOf(integer)) })
142+
return cf
143+
} else if (integer == 5) {
144+
def cf = new CompletableFuture<String>()
145+
completions.add({ cf.completeExceptionally(new RuntimeException("Bang")) })
146+
return cf
147147
} else if (integer == 6) {
148-
return CompletableFuture.supplyAsync {
149-
latch.countDown()
150-
exceptionLatch.countDown() // number 5 is now alive
151-
return String.valueOf(integer)
148+
// complete 5,4,3,2,1 so we have to queue
149+
def reverse = completions.reverse()
150+
for (Runnable r : (reverse)) {
151+
r.run()
152152
}
153+
return CompletableFuture.completedFuture(String.valueOf(integer))
153154
} else {
154-
return CompletableFuture.supplyAsync {
155-
latch.countDown()
156-
latch.await(10_000, TimeUnit.SECONDS)
157-
return String.valueOf(integer)
158-
}
155+
return CompletableFuture.completedFuture(String.valueOf(integer))
159156
}
160157
}
161158
}

0 commit comments

Comments
 (0)