1818import io .reactivex .*;
1919import io .reactivex .exceptions .*;
2020import io .reactivex .functions .Function ;
21+ import io .reactivex .internal .functions .ObjectHelper ;
2122import io .reactivex .internal .subscriptions .SubscriptionArbiter ;
2223import io .reactivex .plugins .RxJavaPlugins ;
2324
@@ -35,41 +36,47 @@ public FlowableOnErrorNext(Flowable<T> source,
3536 @ Override
3637 protected void subscribeActual (Subscriber <? super T > s ) {
3738 OnErrorNextSubscriber <T > parent = new OnErrorNextSubscriber <T >(s , nextSupplier , allowFatal );
38- s .onSubscribe (parent . arbiter );
39+ s .onSubscribe (parent );
3940 source .subscribe (parent );
4041 }
4142
42- static final class OnErrorNextSubscriber <T > implements FlowableSubscriber <T > {
43+ static final class OnErrorNextSubscriber <T >
44+ extends SubscriptionArbiter
45+ implements FlowableSubscriber <T > {
46+ private static final long serialVersionUID = 4063763155303814625L ;
47+
4348 final Subscriber <? super T > actual ;
49+
4450 final Function <? super Throwable , ? extends Publisher <? extends T >> nextSupplier ;
51+
4552 final boolean allowFatal ;
46- final SubscriptionArbiter arbiter ;
4753
4854 boolean once ;
4955
5056 boolean done ;
5157
58+ long produced ;
59+
5260 OnErrorNextSubscriber (Subscriber <? super T > actual , Function <? super Throwable , ? extends Publisher <? extends T >> nextSupplier , boolean allowFatal ) {
5361 this .actual = actual ;
5462 this .nextSupplier = nextSupplier ;
5563 this .allowFatal = allowFatal ;
56- this .arbiter = new SubscriptionArbiter ();
5764 }
5865
5966 @ Override
6067 public void onSubscribe (Subscription s ) {
61- arbiter . setSubscription (s );
68+ setSubscription (s );
6269 }
6370
6471 @ Override
6572 public void onNext (T t ) {
6673 if (done ) {
6774 return ;
6875 }
69- actual .onNext (t );
7076 if (!once ) {
71- arbiter . produced ( 1L ) ;
77+ produced ++ ;
7278 }
79+ actual .onNext (t );
7380 }
7481
7582 @ Override
@@ -92,18 +99,16 @@ public void onError(Throwable t) {
9299 Publisher <? extends T > p ;
93100
94101 try {
95- p = nextSupplier .apply (t );
102+ p = ObjectHelper . requireNonNull ( nextSupplier .apply (t ), "The nextSupplier returned a null Publisher" );
96103 } catch (Throwable e ) {
97104 Exceptions .throwIfFatal (e );
98105 actual .onError (new CompositeException (t , e ));
99106 return ;
100107 }
101108
102- if (p == null ) {
103- NullPointerException npe = new NullPointerException ("Publisher is null" );
104- npe .initCause (t );
105- actual .onError (npe );
106- return ;
109+ long mainProduced = produced ;
110+ if (mainProduced != 0L ) {
111+ produced (mainProduced );
107112 }
108113
109114 p .subscribe (this );
0 commit comments