44import org .reactivestreams .Subscriber ;
55import org .reactivestreams .Subscription ;
66
7+ import java .util .ArrayDeque ;
8+ import java .util .Queue ;
79import java .util .concurrent .CompletionStage ;
10+ import java .util .concurrent .atomic .AtomicBoolean ;
11+ import java .util .concurrent .atomic .AtomicReference ;
12+ import java .util .function .BiConsumer ;
813import java .util .function .Function ;
914
1015/**
@@ -33,6 +38,10 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<
3338 public void subscribe (Subscriber <? super D > downstreamSubscriber ) {
3439 upstreamPublisher .subscribe (new Subscriber <U >() {
3540 Subscription delegatingSubscription ;
41+ final Queue <CompletionStage <?>> inFlightDataQ = new ArrayDeque <>();
42+ final AtomicReference <Runnable > onCompleteOrErrorRun = new AtomicReference <>();
43+ final AtomicBoolean onCompleteOrErrorRunCalled = new AtomicBoolean (false );
44+
3645
3746 @ Override
3847 public void onSubscribe (Subscription subscription ) {
@@ -42,19 +51,36 @@ public void onSubscribe(Subscription subscription) {
4251
4352 @ Override
4453 public void onNext (U u ) {
45- CompletionStage <D > completionStage ;
54+ // for safety - no more data after we have called done/error - we should not get this BUT belts and braces
55+ if (onCompleteOrErrorRunCalled .get ()) {
56+ return ;
57+ }
4658 try {
47- completionStage = mapper .apply (u );
48- completionStage .whenComplete ((d , throwable ) -> {
59+ CompletionStage <D > completionStage = mapper .apply (u );
60+ offerToInFlightQ (completionStage );
61+ completionStage .whenComplete (whenNextFinished (completionStage ));
62+ } catch (RuntimeException throwable ) {
63+ handleThrowable (throwable );
64+ }
65+ }
66+
67+ private BiConsumer <D , Throwable > whenNextFinished (CompletionStage <D > completionStage ) {
68+ return (d , throwable ) -> {
69+ try {
4970 if (throwable != null ) {
5071 handleThrowable (throwable );
5172 } else {
5273 downstreamSubscriber .onNext (d );
5374 }
54- });
55- } catch (RuntimeException throwable ) {
56- handleThrowable (throwable );
57- }
75+ } finally {
76+ Runnable runOnCompleteOrErrorRun = onCompleteOrErrorRun .get ();
77+ boolean empty = removeFromInFlightQAndCheckIfEmpty (completionStage );
78+ if (empty && runOnCompleteOrErrorRun != null ) {
79+ onCompleteOrErrorRun .set (null );
80+ runOnCompleteOrErrorRun .run ();
81+ }
82+ }
83+ };
5884 }
5985
6086 private void handleThrowable (Throwable throwable ) {
@@ -71,12 +97,47 @@ private void handleThrowable(Throwable throwable) {
7197
7298 @ Override
7399 public void onError (Throwable t ) {
74- downstreamSubscriber .onError (t );
100+ onCompleteOrError (() -> {
101+ onCompleteOrErrorRunCalled .set (true );
102+ downstreamSubscriber .onError (t );
103+ });
75104 }
76105
77106 @ Override
78107 public void onComplete () {
79- downstreamSubscriber .onComplete ();
108+ onCompleteOrError (() -> {
109+ onCompleteOrErrorRunCalled .set (true );
110+ downstreamSubscriber .onComplete ();
111+ });
112+ }
113+
114+ private void onCompleteOrError (Runnable doneCodeToRun ) {
115+ if (inFlightQIsEmpty ()) {
116+ // run right now
117+ doneCodeToRun .run ();
118+ } else {
119+ onCompleteOrErrorRun .set (doneCodeToRun );
120+ }
121+ }
122+
123+ private void offerToInFlightQ (CompletionStage <?> completionStage ) {
124+ synchronized (inFlightDataQ ) {
125+ inFlightDataQ .offer (completionStage );
126+ }
127+ }
128+
129+ private boolean removeFromInFlightQAndCheckIfEmpty (CompletionStage <?> completionStage ) {
130+ // uncontested locks in java are cheap - we dont expect much contention here
131+ synchronized (inFlightDataQ ) {
132+ inFlightDataQ .remove (completionStage );
133+ return inFlightDataQ .isEmpty ();
134+ }
135+ }
136+
137+ private boolean inFlightQIsEmpty () {
138+ synchronized (inFlightDataQ ) {
139+ return inFlightDataQ .isEmpty ();
140+ }
80141 }
81142 });
82143 }
0 commit comments