11package graphql .execution .reactive ;
22
33import graphql .Internal ;
4- import graphql . util . LockKit ;
4+ import org . jetbrains . annotations . NotNull ;
55import org .reactivestreams .Publisher ;
66import org .reactivestreams .Subscriber ;
7- import org .reactivestreams .Subscription ;
87
9- import java .util .ArrayDeque ;
10- import java .util .Queue ;
118import java .util .concurrent .CompletionStage ;
12- import java .util .concurrent .atomic .AtomicBoolean ;
13- import java .util .concurrent .atomic .AtomicReference ;
14- import java .util .function .BiConsumer ;
159import java .util .function .Function ;
1610
11+ import static graphql .Assert .assertNotNullWithNPE ;
12+
1713/**
1814 * A reactive Publisher that bridges over another Publisher of `D` and maps the results
1915 * to type `U` via a CompletionStage, handling errors in that stage
2016 *
21- * @param <D> the down stream type
22- * @param <U> the up stream type to be mapped to
17+ * @param <D> the downstream type
18+ * @param <U> the upstream type to be mapped to
2319 */
24- @ SuppressWarnings ("ReactiveStreamsPublisherImplementation" )
2520@ Internal
2621public class CompletionStageMappingPublisher <D , U > implements Publisher <D > {
27- private final Publisher <U > upstreamPublisher ;
28- private final Function <U , CompletionStage <D >> mapper ;
22+ protected final Publisher <U > upstreamPublisher ;
23+ protected final Function <U , CompletionStage <D >> mapper ;
2924
3025 /**
3126 * You need the following :
@@ -40,9 +35,16 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function<
4035
4136 @ Override
4237 public void subscribe (Subscriber <? super D > downstreamSubscriber ) {
43- upstreamPublisher .subscribe (new CompletionStageSubscriber (downstreamSubscriber ));
38+ assertNotNullWithNPE (downstreamSubscriber , () -> "Subscriber passed to subscribe must not be null" );
39+ upstreamPublisher .subscribe (createSubscriber (downstreamSubscriber ));
40+ }
41+
42+ @ NotNull
43+ protected Subscriber <? super U > createSubscriber (Subscriber <? super D > downstreamSubscriber ) {
44+ return new CompletionStageSubscriber <>(mapper , downstreamSubscriber );
4445 }
4546
47+
4648 /**
4749 * Get instance of an upstreamPublisher
4850 *
@@ -52,126 +54,4 @@ public Publisher<U> getUpstreamPublisher() {
5254 return upstreamPublisher ;
5355 }
5456
55- @ SuppressWarnings ("ReactiveStreamsSubscriberImplementation" )
56- @ Internal
57- public class CompletionStageSubscriber implements Subscriber <U > {
58- private final Subscriber <? super D > downstreamSubscriber ;
59- Subscription delegatingSubscription ;
60- final Queue <CompletionStage <?>> inFlightDataQ ;
61- final LockKit .ReentrantLock lock = new LockKit .ReentrantLock ();
62- final AtomicReference <Runnable > onCompleteOrErrorRun ;
63- final AtomicBoolean onCompleteOrErrorRunCalled ;
64-
65- public CompletionStageSubscriber (Subscriber <? super D > downstreamSubscriber ) {
66- this .downstreamSubscriber = downstreamSubscriber ;
67- inFlightDataQ = new ArrayDeque <>();
68- onCompleteOrErrorRun = new AtomicReference <>();
69- onCompleteOrErrorRunCalled = new AtomicBoolean (false );
70- }
71-
72-
73- @ Override
74- public void onSubscribe (Subscription subscription ) {
75- delegatingSubscription = new DelegatingSubscription (subscription );
76- downstreamSubscriber .onSubscribe (delegatingSubscription );
77- }
78-
79- @ Override
80- public void onNext (U u ) {
81- // for safety - no more data after we have called done/error - we should not get this BUT belts and braces
82- if (onCompleteOrErrorRunCalled .get ()) {
83- return ;
84- }
85- try {
86- CompletionStage <D > completionStage = mapper .apply (u );
87- offerToInFlightQ (completionStage );
88- completionStage .whenComplete (whenNextFinished (completionStage ));
89- } catch (RuntimeException throwable ) {
90- handleThrowable (throwable );
91- }
92- }
93-
94- private BiConsumer <D , Throwable > whenNextFinished (CompletionStage <D > completionStage ) {
95- return (d , throwable ) -> {
96- try {
97- if (throwable != null ) {
98- handleThrowable (throwable );
99- } else {
100- downstreamSubscriber .onNext (d );
101- }
102- } finally {
103- Runnable runOnCompleteOrErrorRun = onCompleteOrErrorRun .get ();
104- boolean empty = removeFromInFlightQAndCheckIfEmpty (completionStage );
105- if (empty && runOnCompleteOrErrorRun != null ) {
106- onCompleteOrErrorRun .set (null );
107- runOnCompleteOrErrorRun .run ();
108- }
109- }
110- };
111- }
112-
113- private void handleThrowable (Throwable throwable ) {
114- downstreamSubscriber .onError (throwable );
115- //
116- // reactive semantics say that IF an exception happens on a publisher
117- // then onError is called and no more messages flow. But since the exception happened
118- // during the mapping, the upstream publisher does not no about this.
119- // so we cancel to bring the semantics back together, that is as soon as an exception
120- // has happened, no more messages flow
121- //
122- delegatingSubscription .cancel ();
123- }
124-
125- @ Override
126- public void onError (Throwable t ) {
127- onCompleteOrError (() -> {
128- onCompleteOrErrorRunCalled .set (true );
129- downstreamSubscriber .onError (t );
130- });
131- }
132-
133- @ Override
134- public void onComplete () {
135- onCompleteOrError (() -> {
136- onCompleteOrErrorRunCalled .set (true );
137- downstreamSubscriber .onComplete ();
138- });
139- }
140-
141- /**
142- * Get instance of downstream subscriber
143- *
144- * @return {@link Subscriber}
145- */
146- public Subscriber <? super D > getDownstreamSubscriber () {
147- return downstreamSubscriber ;
148- }
149-
150- private void onCompleteOrError (Runnable doneCodeToRun ) {
151- if (inFlightQIsEmpty ()) {
152- // run right now
153- doneCodeToRun .run ();
154- } else {
155- onCompleteOrErrorRun .set (doneCodeToRun );
156- }
157- }
158-
159- private void offerToInFlightQ (CompletionStage <?> completionStage ) {
160- lock .runLocked (() ->
161- inFlightDataQ .offer (completionStage )
162- );
163- }
164-
165- private boolean removeFromInFlightQAndCheckIfEmpty (CompletionStage <?> completionStage ) {
166- // uncontested locks in java are cheap - we don't expect much contention here
167- return lock .callLocked (() -> {
168- inFlightDataQ .remove (completionStage );
169- return inFlightDataQ .isEmpty ();
170- });
171- }
172-
173- private boolean inFlightQIsEmpty () {
174- return lock .callLocked (inFlightDataQ ::isEmpty );
175- }
176- }
17757}
0 commit comments