Skip to content

Commit 71248c4

Browse files
author
akarnokd
committed
2.x: operator test elementAt, filter, finally, first
1 parent c0e6019 commit 71248c4

6 files changed

Lines changed: 535 additions & 6 deletions

File tree

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,14 +1100,14 @@ public final Observable<T> doOnUnsubscribe(Runnable onUnsubscribe) {
11001100

11011101
public final Observable<T> elementAt(long index) {
11021102
if (index < 0) {
1103-
throw new IllegalArgumentException("index >= 0 required but it was " + index);
1103+
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
11041104
}
11051105
return lift(new OperatorElementAt<>(index, null));
11061106
}
11071107

11081108
public final Observable<T> elementAt(long index, T defaultValue) {
11091109
if (index < 0) {
1110-
throw new IllegalArgumentException("index >= 0 required but it was " + index);
1110+
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
11111111
}
11121112
Objects.requireNonNull(defaultValue);
11131113
return lift(new OperatorElementAt<>(index, defaultValue));

src/main/java/io/reactivex/internal/operators/OperatorElementAt.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
package io.reactivex.internal.operators;
1515

16-
import java.util.NoSuchElementException;
17-
1816
import org.reactivestreams.*;
1917

2018
import io.reactivex.Observable.Operator;
@@ -87,11 +85,11 @@ public void onError(Throwable t) {
8785

8886
@Override
8987
public void onComplete() {
90-
if (index != count && !done) {
88+
if (index <= count && !done) {
9189
done = true;
9290
T v = defaultValue;
9391
if (v == null) {
94-
actual.onError(new NoSuchElementException());
92+
actual.onError(new IndexOutOfBoundsException());
9593
} else {
9694
actual.onNext(v);
9795
actual.onComplete();
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import static org.junit.Assert.assertEquals;
17+
18+
import org.junit.Test;
19+
20+
import io.reactivex.Observable;
21+
22+
public class OperatorElementAtTest {
23+
24+
@Test
25+
public void testElementAt() {
26+
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toBlocking().single()
27+
.intValue());
28+
}
29+
30+
@Test(expected = IndexOutOfBoundsException.class)
31+
public void testElementAtWithMinusIndex() {
32+
Observable.fromArray(1, 2).elementAt(-1);
33+
}
34+
35+
@Test(expected = IndexOutOfBoundsException.class)
36+
public void testElementAtWithIndexOutOfBounds() {
37+
Observable.fromArray(1, 2).elementAt(2).toBlocking().single();
38+
}
39+
40+
@Test
41+
public void testElementAtOrDefault() {
42+
assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toBlocking()
43+
.single().intValue());
44+
}
45+
46+
@Test
47+
public void testElementAtOrDefaultWithIndexOutOfBounds() {
48+
assertEquals(0, Observable.fromArray(1, 2).elementAt(2, 0).toBlocking()
49+
.single().intValue());
50+
}
51+
52+
@Test(expected = IndexOutOfBoundsException.class)
53+
public void testElementAtOrDefaultWithMinusIndex() {
54+
Observable.fromArray(1, 2).elementAt(-1, 0);
55+
}
56+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import static org.mockito.Matchers.any;
17+
import static org.mockito.Mockito.*;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.function.Predicate;
21+
22+
import org.junit.Test;
23+
import org.mockito.Mockito;
24+
import org.reactivestreams.Subscriber;
25+
26+
import io.reactivex.*;
27+
import io.reactivex.subscribers.TestSubscriber;
28+
29+
public class OperatorFilterTest {
30+
31+
@Test
32+
public void testFilter() {
33+
Observable<String> w = Observable.just("one", "two", "three");
34+
Observable<String> observable = w.filter(new Predicate<String>() {
35+
36+
@Override
37+
public boolean test(String t1) {
38+
return t1.equals("two");
39+
}
40+
});
41+
42+
Subscriber<String> observer = TestHelper.mockSubscriber();
43+
44+
observable.subscribe(observer);
45+
46+
verify(observer, Mockito.never()).onNext("one");
47+
verify(observer, times(1)).onNext("two");
48+
verify(observer, Mockito.never()).onNext("three");
49+
verify(observer, Mockito.never()).onError(any(Throwable.class));
50+
verify(observer, times(1)).onComplete();
51+
}
52+
53+
/**
54+
* Make sure we are adjusting subscriber.request() for filtered items
55+
*/
56+
@Test(timeout = 500)
57+
public void testWithBackpressure() throws InterruptedException {
58+
Observable<String> w = Observable.just("one", "two", "three");
59+
Observable<String> o = w.filter(new Predicate<String>() {
60+
61+
@Override
62+
public boolean test(String t1) {
63+
return t1.equals("three");
64+
}
65+
});
66+
67+
final CountDownLatch latch = new CountDownLatch(1);
68+
TestSubscriber<String> ts = new TestSubscriber<String>() {
69+
70+
@Override
71+
public void onComplete() {
72+
System.out.println("onCompleted");
73+
latch.countDown();
74+
}
75+
76+
@Override
77+
public void onError(Throwable e) {
78+
e.printStackTrace();
79+
latch.countDown();
80+
}
81+
82+
@Override
83+
public void onNext(String t) {
84+
System.out.println("Received: " + t);
85+
// request more each time we receive
86+
request(1);
87+
}
88+
89+
};
90+
// this means it will only request "one" and "two", expecting to receive them before requesting more
91+
ts.request(2);
92+
93+
o.subscribe(ts);
94+
95+
// this will wait forever unless OperatorTake handles the request(n) on filtered items
96+
latch.await();
97+
}
98+
99+
/**
100+
* Make sure we are adjusting subscriber.request() for filtered items
101+
*/
102+
@Test(timeout = 500000)
103+
public void testWithBackpressure2() throws InterruptedException {
104+
Observable<Integer> w = Observable.range(1, Observable.bufferSize() * 2);
105+
Observable<Integer> o = w.filter(new Predicate<Integer>() {
106+
107+
@Override
108+
public boolean test(Integer t1) {
109+
return t1 > 100;
110+
}
111+
});
112+
113+
final CountDownLatch latch = new CountDownLatch(1);
114+
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
115+
116+
@Override
117+
public void onComplete() {
118+
System.out.println("onCompleted");
119+
latch.countDown();
120+
}
121+
122+
@Override
123+
public void onError(Throwable e) {
124+
e.printStackTrace();
125+
latch.countDown();
126+
}
127+
128+
@Override
129+
public void onNext(Integer t) {
130+
System.out.println("Received: " + t);
131+
// request more each time we receive
132+
request(1);
133+
}
134+
};
135+
// this means it will only request 1 item and expect to receive more
136+
ts.request(1);
137+
138+
o.subscribe(ts);
139+
140+
// this will wait forever unless OperatorTake handles the request(n) on filtered items
141+
latch.await();
142+
}
143+
144+
// FIXME subscribers are not allowed to throw
145+
// @Test
146+
// public void testFatalError() {
147+
// try {
148+
// Observable.just(1)
149+
// .filter(new Predicate<Integer>() {
150+
// @Override
151+
// public boolean test(Integer t) {
152+
// return true;
153+
// }
154+
// })
155+
// .first()
156+
// .subscribe(new Consumer<Integer>() {
157+
// @Override
158+
// public void accept(Integer t) {
159+
// throw new TestException();
160+
// }
161+
// });
162+
// Assert.fail("No exception was thrown");
163+
// } catch (OnErrorNotImplementedException ex) {
164+
// if (!(ex.getCause() instanceof TestException)) {
165+
// Assert.fail("Failed to report the original exception, instead: " + ex.getCause());
166+
// }
167+
// }
168+
// }
169+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators;
15+
16+
import static org.mockito.Mockito.*;
17+
18+
import org.junit.*;
19+
import org.reactivestreams.Subscriber;
20+
21+
import io.reactivex.*;
22+
23+
public class OperatorFinallyTest {
24+
25+
private Runnable aAction0;
26+
private Subscriber<String> observer;
27+
28+
// mocking has to be unchecked, unfortunately
29+
@Before
30+
public void before() {
31+
aAction0 = mock(Runnable.class);
32+
observer = TestHelper.mockSubscriber();
33+
}
34+
35+
private void checkActionCalled(Observable<String> input) {
36+
input.finallyDo(aAction0).subscribe(observer);
37+
verify(aAction0, times(1)).run();
38+
}
39+
40+
@Test
41+
public void testFinallyCalledOnComplete() {
42+
checkActionCalled(Observable.fromArray("1", "2", "3"));
43+
}
44+
45+
@Test
46+
public void testFinallyCalledOnError() {
47+
checkActionCalled(Observable.<String> error(new RuntimeException("expected")));
48+
}
49+
}

0 commit comments

Comments
 (0)