From 3764bbdc44c7770c5b5a724a1e7383c38ef929e9 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Thu, 21 May 2020 08:48:45 +1000 Subject: [PATCH] This fixes the Subscription publisher so that it does not drop values that have been received but not yet mapped by the graphql layer (#1801) --- .../CompletionStageMappingPublisher.java | 79 ++++++++++++++++--- .../execution/pubsub/CapturingSubscriber.java | 4 + ...CompletionStageMappingPublisherTest.groovy | 35 ++++++++ 3 files changed, 109 insertions(+), 9 deletions(-) diff --git a/src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java b/src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java index b686889795..30cfde82b8 100644 --- a/src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java +++ b/src/main/java/graphql/execution/reactive/CompletionStageMappingPublisher.java @@ -4,7 +4,12 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Function; /** @@ -33,6 +38,10 @@ public CompletionStageMappingPublisher(Publisher upstreamPublisher, Function< public void subscribe(Subscriber downstreamSubscriber) { upstreamPublisher.subscribe(new Subscriber() { Subscription delegatingSubscription; + final Queue> inFlightDataQ = new ArrayDeque<>(); + final AtomicReference onCompleteOrErrorRun = new AtomicReference<>(); + final AtomicBoolean onCompleteOrErrorRunCalled = new AtomicBoolean(false); + @Override public void onSubscribe(Subscription subscription) { @@ -42,19 +51,36 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(U u) { - CompletionStage completionStage; + // for safety - no more data after we have called done/error - we should not get this BUT belts and braces + if (onCompleteOrErrorRunCalled.get()) { + return; + } try { - completionStage = mapper.apply(u); - completionStage.whenComplete((d, throwable) -> { + CompletionStage completionStage = mapper.apply(u); + offerToInFlightQ(completionStage); + completionStage.whenComplete(whenNextFinished(completionStage)); + } catch (RuntimeException throwable) { + handleThrowable(throwable); + } + } + + private BiConsumer whenNextFinished(CompletionStage completionStage) { + return (d, throwable) -> { + try { if (throwable != null) { handleThrowable(throwable); } else { downstreamSubscriber.onNext(d); } - }); - } catch (RuntimeException throwable) { - handleThrowable(throwable); - } + } finally { + Runnable runOnCompleteOrErrorRun = onCompleteOrErrorRun.get(); + boolean empty = removeFromInFlightQAndCheckIfEmpty(completionStage); + if (empty && runOnCompleteOrErrorRun != null) { + onCompleteOrErrorRun.set(null); + runOnCompleteOrErrorRun.run(); + } + } + }; } private void handleThrowable(Throwable throwable) { @@ -71,12 +97,47 @@ private void handleThrowable(Throwable throwable) { @Override public void onError(Throwable t) { - downstreamSubscriber.onError(t); + onCompleteOrError(() -> { + onCompleteOrErrorRunCalled.set(true); + downstreamSubscriber.onError(t); + }); } @Override public void onComplete() { - downstreamSubscriber.onComplete(); + onCompleteOrError(() -> { + onCompleteOrErrorRunCalled.set(true); + downstreamSubscriber.onComplete(); + }); + } + + private void onCompleteOrError(Runnable doneCodeToRun) { + if (inFlightQIsEmpty()) { + // run right now + doneCodeToRun.run(); + } else { + onCompleteOrErrorRun.set(doneCodeToRun); + } + } + + private void offerToInFlightQ(CompletionStage completionStage) { + synchronized (inFlightDataQ) { + inFlightDataQ.offer(completionStage); + } + } + + private boolean removeFromInFlightQAndCheckIfEmpty(CompletionStage completionStage) { + // uncontested locks in java are cheap - we dont expect much contention here + synchronized (inFlightDataQ) { + inFlightDataQ.remove(completionStage); + return inFlightDataQ.isEmpty(); + } + } + + private boolean inFlightQIsEmpty() { + synchronized (inFlightDataQ) { + return inFlightDataQ.isEmpty(); + } } }); } diff --git a/src/test/groovy/graphql/execution/pubsub/CapturingSubscriber.java b/src/test/groovy/graphql/execution/pubsub/CapturingSubscriber.java index 97e7d48315..e6c3de62d4 100644 --- a/src/test/groovy/graphql/execution/pubsub/CapturingSubscriber.java +++ b/src/test/groovy/graphql/execution/pubsub/CapturingSubscriber.java @@ -19,24 +19,28 @@ public class CapturingSubscriber implements Subscriber { @Override public void onSubscribe(Subscription subscription) { + System.out.println("onSubscribe called at " + System.nanoTime()); this.subscription = subscription; subscription.request(1); } @Override public void onNext(T t) { + System.out.println("onNext called at " + System.nanoTime()); events.add(t); subscription.request(1); } @Override public void onError(Throwable t) { + System.out.println("onError called at " + System.nanoTime()); this.throwable = t; done.set(true); } @Override public void onComplete() { + System.out.println("onComplete called at " + System.nanoTime()); done.set(true); } diff --git a/src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy b/src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy index cd2771b605..c2117f9a8d 100644 --- a/src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy +++ b/src/test/groovy/graphql/execution/reactive/CompletionStageMappingPublisherTest.groovy @@ -2,6 +2,7 @@ package graphql.execution.reactive import graphql.execution.pubsub.CapturingSubscriber import io.reactivex.Flowable +import org.awaitility.Awaitility import org.reactivestreams.Publisher import spock.lang.Specification @@ -119,4 +120,38 @@ class CompletionStageMappingPublisherTest extends Specification { } + + def "asynchronous mapping works with completion"() { + + when: + Publisher rxIntegers = Flowable.range(0, 10) + + Function> mapper = mapperThatDelaysFor(100) + Publisher rxStrings = new CompletionStageMappingPublisher(rxIntegers, mapper) + + def capturingSubscriber = new CapturingSubscriber<>() + rxStrings.subscribe(capturingSubscriber) + + then: + + Awaitility.await().untilTrue(capturingSubscriber.isDone()) + + capturingSubscriber.events.size() == 10 + capturingSubscriber.events[0] instanceof String + capturingSubscriber.events[0] == "0" + } + + Function> mapperThatDelaysFor(int delay) { + def mapper = new Function>() { + @Override + CompletionStage apply(Integer integer) { + return CompletableFuture.supplyAsync({ + Thread.sleep(delay) + return String.valueOf(integer) + }) + } + } + mapper + } + }