Skip to content

Commit 7f865e1

Browse files
committed
2.x: operator tests unsubscribeOn, withLatestFrom, zip (partial)
+ fixed cancellation behavior of observeOn, subscribeOn and unsubscribeOn (when and what to call cancel on) + fixed infinite loop in ScheduledRunnable + fixed zip not quitting eagerly if one of the sources was shorter + added specific ZipIterable because zip-iterable tests expect it to be not prefetching any of the sources (the plain zip does prefetch) + made the fromIterable more resilient to Iterable/Iterator crashes and added null-value checks
1 parent 0a455d5 commit 7f865e1

16 files changed

+1189
-49
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,11 +2407,7 @@ public final <U, R> Observable<R> withLatestFrom(Publisher<? extends U> other, B
24072407
}
24082408

24092409
public final <U, R> Observable<R> zipWith(Iterable<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper) {
2410-
return zip(this, new PublisherIterableSource<>(other), zipper);
2411-
}
2412-
2413-
public final <U, R> Observable<R> zipWith(Iterable<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper, int bufferSize) {
2414-
return zip(this, new PublisherIterableSource<>(other), zipper, false, bufferSize);
2410+
return create(new PublisherZipIterable<>(this, other, zipper));
24152411
}
24162412

24172413
public final <U, R> Observable<R> zipWith(Publisher<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper) {

src/main/java/io/reactivex/internal/operators/OperatorObserveOn.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,8 @@ public void request(long n) {
167167
public void cancel() {
168168
if (!cancelled) {
169169
cancelled = true;
170-
if (getAndIncrement() == 0) {
171-
s.cancel();
172-
worker.dispose();
173-
}
170+
s.cancel();
171+
worker.dispose();
174172
}
175173
}
176174

src/main/java/io/reactivex/internal/operators/OperatorUnsubscribeOn.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.reactivex.Observable.Operator;
2121
import io.reactivex.Scheduler;
22-
import io.reactivex.plugins.RxJavaPlugins;
22+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2323

2424
public final class OperatorUnsubscribeOn<T> implements Operator<T, T> {
2525
final Scheduler scheduler;
@@ -48,9 +48,7 @@ public UnsubscribeSubscriber(Subscriber<? super T> actual, Scheduler scheduler)
4848

4949
@Override
5050
public void onSubscribe(Subscription s) {
51-
if (this.s != null) {
52-
s.cancel();
53-
RxJavaPlugins.onError(new IllegalStateException("Subscription already set!"));
51+
if (SubscriptionHelper.validateSubscription(this.s, s)) {
5452
return;
5553
}
5654
this.s = s;
@@ -64,20 +62,12 @@ public void onNext(T t) {
6462

6563
@Override
6664
public void onError(Throwable t) {
67-
try {
68-
actual.onError(t);
69-
} finally {
70-
cancel();
71-
}
65+
actual.onError(t);
7266
}
7367

7468
@Override
7569
public void onComplete() {
76-
try {
77-
actual.onComplete();
78-
} finally {
79-
cancel();
80-
}
70+
actual.onComplete();
8171
}
8272

8373
@Override
@@ -88,7 +78,9 @@ public void request(long n) {
8878
@Override
8979
public void cancel() {
9080
if (compareAndSet(false, true)) {
91-
scheduler.scheduleDirect(s::cancel);
81+
scheduler.scheduleDirect(() -> {
82+
s.cancel();
83+
});
9284
}
9385
}
9486
}

src/main/java/io/reactivex/internal/operators/OperatorWithLatestFrom.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ public void onNext(T t) {
121121
return;
122122
}
123123
actual.onNext(r);
124-
} else {
125-
s.request(1);
126124
}
127125
}
128126

src/main/java/io/reactivex/internal/operators/PublisherIterableSource.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020

2121
import org.reactivestreams.*;
2222

23-
import io.reactivex.internal.subscriptions.EmptySubscription;
23+
import io.reactivex.internal.subscriptions.*;
2424
import io.reactivex.internal.util.BackpressureHelper;
25-
import io.reactivex.plugins.RxJavaPlugins;
2625

2726
/**
2827
*
@@ -45,7 +44,14 @@ public void subscribe(Subscriber<? super T> s) {
4544
EmptySubscription.error(e, s);
4645
return;
4746
}
48-
if (!it.hasNext()) {
47+
boolean hasNext;
48+
try {
49+
hasNext = it.hasNext();
50+
} catch (Throwable e) {
51+
EmptySubscription.error(e, s);
52+
return;
53+
}
54+
if (!hasNext) {
4955
EmptySubscription.complete(s);
5056
return;
5157
}
@@ -66,8 +72,7 @@ public IteratorSourceSubscription(Iterator<? extends T> it, Subscriber<? super T
6672
}
6773
@Override
6874
public void request(long n) {
69-
if (n <= 0) {
70-
RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
75+
if (SubscriptionHelper.validateRequest(n)) {
7176
return;
7277
}
7378
if (BackpressureHelper.add(this, n) != 0L) {
@@ -81,21 +86,40 @@ public void request(long n) {
8186
if (cancelled) {
8287
return;
8388
}
84-
if (!it.hasNext()) {
85-
subscriber.onComplete();
86-
return;
87-
}
88-
long e = 0;
89+
90+
long e = 0L;
8991
while (r != 0L) {
90-
T v = it.next();
92+
T v;
93+
try {
94+
v = it.next();
95+
} catch (Throwable ex) {
96+
subscriber.onError(ex);
97+
return;
98+
}
99+
100+
if (v == null) {
101+
subscriber.onError(new NullPointerException("Iterator returned a null element"));
102+
return;
103+
}
104+
91105
subscriber.onNext(v);
106+
92107
if (cancelled) {
93108
return;
94109
}
95-
if (!it.hasNext()) {
110+
111+
boolean hasNext;
112+
try {
113+
hasNext = it.hasNext();
114+
} catch (Throwable ex) {
115+
subscriber.onError(ex);
116+
return;
117+
}
118+
if (!hasNext) {
96119
subscriber.onComplete();
97120
return;
98121
}
122+
99123
r--;
100124
e--;
101125
}

src/main/java/io/reactivex/internal/operators/PublisherSubscribeOn.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,20 @@ public void onNext(T t) {
8181

8282
@Override
8383
public void onError(Throwable t) {
84-
cancel();
85-
actual.onError(t);
84+
try {
85+
actual.onError(t);
86+
} finally {
87+
worker.dispose();
88+
}
8689
}
8790

8891
@Override
8992
public void onComplete() {
90-
cancel();
91-
actual.onComplete();
93+
try {
94+
actual.onComplete();
95+
} finally {
96+
worker.dispose();
97+
}
9298
}
9399

94100
@Override

src/main/java/io/reactivex/internal/operators/PublisherZip.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators;
1515

16-
import java.util.*;
16+
import java.util.Queue;
1717
import java.util.concurrent.atomic.*;
1818
import java.util.function.Function;
1919

@@ -161,9 +161,9 @@ public void drain() {
161161
boolean unbounded = r == Long.MAX_VALUE;
162162
long e = 0;
163163

164-
outer:
165164
while (r != 0) {
166165
int i = 0;
166+
int emptyCount = 0;
167167
for (ZipSubscriber<T, R> z : zs) {
168168
boolean d = z.done;
169169
T v = z.queue.peek();
@@ -174,13 +174,17 @@ public void drain() {
174174
}
175175

176176
if (empty) {
177-
break outer;
177+
emptyCount++;
178+
continue;
178179
}
179180

180181
os[i] = v;
181182
i++;
182183
}
183184

185+
if (emptyCount != 0) {
186+
break;
187+
}
184188
// consume the row
185189
for (ZipSubscriber<T, R> z : zs) {
186190
z.queue.poll();

0 commit comments

Comments
 (0)