Skip to content

Commit 5ebbd9a

Browse files
committed
2.x: operator test onError and onException, relevant bugfixes
1 parent 1980024 commit 5ebbd9a

File tree

7 files changed

+1083
-17
lines changed

7 files changed

+1083
-17
lines changed

src/main/java/io/reactivex/internal/operators/OperatorOnErrorNext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.reactivex.Observable.Operator;
2121
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class OperatorOnErrorNext<T> implements Operator<T, T> {
2425
final Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier;
@@ -44,6 +45,8 @@ static final class OnErrorNextSubscriber<T> implements Subscriber<T> {
4445

4546
boolean once;
4647

48+
boolean done;
49+
4750
public OnErrorNextSubscriber(Subscriber<? super T> actual, Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier, boolean allowFatal) {
4851
this.actual = actual;
4952
this.nextSupplier = nextSupplier;
@@ -58,6 +61,9 @@ public void onSubscribe(Subscription s) {
5861

5962
@Override
6063
public void onNext(T t) {
64+
if (done) {
65+
return;
66+
}
6167
actual.onNext(t);
6268
if (!once) {
6369
arbiter.produced(1L);
@@ -67,6 +73,10 @@ public void onNext(T t) {
6773
@Override
6874
public void onError(Throwable t) {
6975
if (once) {
76+
if (done) {
77+
RxJavaPlugins.onError(t);
78+
return;
79+
}
7080
actual.onError(t);
7181
return;
7282
}
@@ -99,6 +109,10 @@ public void onError(Throwable t) {
99109

100110
@Override
101111
public void onComplete() {
112+
if (done) {
113+
return;
114+
}
115+
done = true;
102116
once = true;
103117
actual.onComplete();
104118
}

src/main/java/io/reactivex/internal/operators/OperatorOnErrorReturn.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -138,27 +138,28 @@ public void request(long n) {
138138
if (SubscriptionHelper.validateRequest(n)) {
139139
return;
140140
}
141-
if (BackpressureHelper.add(this, n) == 0) {
142-
if (done) {
143-
for (;;) {
144-
int s = state;
145-
if (s == NO_REQUEST_HAS_VALUE) {
146-
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
147-
T v = value;
148-
value = null;
149-
actual.onNext(v);
150-
actual.onComplete();
151-
return;
152-
}
153-
} else
154-
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
155-
return;
156-
} else
157-
if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) {
141+
BackpressureHelper.add(this, n);
142+
if (done) {
143+
for (;;) {
144+
int s = state;
145+
if (s == NO_REQUEST_HAS_VALUE) {
146+
if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
147+
T v = value;
148+
value = null;
149+
actual.onNext(v);
150+
actual.onComplete();
158151
return;
159152
}
153+
} else
154+
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
155+
return;
156+
} else
157+
if (STATE.compareAndSet(this, s, HAS_REQUEST_NO_VALUE)) {
158+
return;
160159
}
161160
}
161+
} else {
162+
s.request(n);
162163
}
163164
}
164165

src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ public void cancel() {
3434
// no-op
3535
}
3636

37+
@Override
38+
public String toString() {
39+
return "EmptySubscription";
40+
}
41+
3742
/**
3843
* Sets the empty subscription instance on the subscriber and then
3944
* calls onError with the supplied error.

0 commit comments

Comments
 (0)