Skip to content

Commit 24a8371

Browse files
author
akarnokd
committed
2.x: operator tests: sample, scan and sequenceEqual + their bugfixes
1 parent 5377cad commit 24a8371

File tree

5 files changed

+791
-7
lines changed

5 files changed

+791
-7
lines changed

src/main/java/io/reactivex/internal/operators/OperatorScanSeed.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.internal.queue.SpscArrayQueue;
2121
import io.reactivex.internal.subscribers.*;
2222
import io.reactivex.internal.subscriptions.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
public final class OperatorScanSeed<T, R> implements Operator<R, T> {
2526
final BiFunction<R, ? super T, R> accumulator;
@@ -104,13 +105,20 @@ public void onNext(T t) {
104105

105106
@Override
106107
public void onError(Throwable t) {
108+
if (done) {
109+
RxJavaPlugins.onError(t);
110+
return;
111+
}
107112
error = t;
108113
done = true;
109114
drain(false);
110115
}
111116

112117
@Override
113118
public void onComplete() {
119+
if (done) {
120+
return;
121+
}
114122
done = true;
115123
drain(false);
116124
}
@@ -119,6 +127,7 @@ public void onComplete() {
119127
public void request(long n) {
120128
requested(n);
121129
s.request(n);
130+
drain(false);
122131
}
123132

124133
@Override

src/main/java/io/reactivex/internal/operators/PublisherSequenceEqual.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.internal.operators;
1515

16-
import java.util.*;
16+
import java.util.Queue;
1717
import java.util.concurrent.atomic.*;
1818
import java.util.function.BiPredicate;
1919

@@ -40,8 +40,8 @@ public PublisherSequenceEqual(Publisher<? extends T> first, Publisher<? extends
4040

4141
@Override
4242
public void subscribe(Subscriber<? super Boolean> s) {
43-
// TODO Auto-generated method stub
44-
43+
EqualCoordinator<T> ec = new EqualCoordinator<>(s, bufferSize, first, second, comparer);
44+
ec.subscribe();
4545
}
4646

4747
static final class EqualCoordinator<T> extends AtomicInteger implements Subscription {
@@ -79,6 +79,12 @@ public EqualCoordinator(Subscriber<? super Boolean> actual, int bufferSize,
7979
boolean setSubscription(Subscription s, int index) {
8080
return resources.setResource(index, s);
8181
}
82+
83+
void subscribe() {
84+
EqualSubscriber<T>[] as = subscribers;
85+
first.subscribe(as[0]);
86+
second.subscribe(as[1]);
87+
}
8288

8389
@Override
8490
public void request(long n) {
@@ -117,14 +123,12 @@ void drain() {
117123
return;
118124
}
119125

120-
121-
122126
int missed = 1;
123127
EqualSubscriber<T>[] as = subscribers;
124128

125129
final EqualSubscriber<T> s1 = as[0];
126130
final Queue<T> q1 = s1.queue;
127-
final EqualSubscriber<T> s2 = as[0];
131+
final EqualSubscriber<T> s2 = as[1];
128132
final Queue<T> q2 = s2.queue;
129133

130134
for (;;) {
@@ -170,7 +174,7 @@ void drain() {
170174
actual.onComplete();
171175
return;
172176
}
173-
if ((d1 && e1) != (d2 && e2)) {
177+
if ((d1 && d2) && (e1 != e2)) {
174178
cancel(q1, q2);
175179

176180
actual.onNext(false);
@@ -248,6 +252,7 @@ public EqualSubscriber(EqualCoordinator<T> parent, int index, int bufferSize) {
248252
@Override
249253
public void onSubscribe(Subscription s) {
250254
if (parent.setSubscription(s, index)) {
255+
this.s = s;
251256
s.request(bufferSize);
252257
}
253258
}
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
package io.reactivex.internal.operators;
2+
3+
import static org.mockito.Matchers.any;
4+
import static org.mockito.Mockito.*;
5+
6+
import java.util.concurrent.TimeUnit;
7+
8+
import org.junit.*;
9+
import org.mockito.InOrder;
10+
import org.reactivestreams.*;
11+
12+
import io.reactivex.*;
13+
import io.reactivex.internal.subscriptions.EmptySubscription;
14+
import io.reactivex.schedulers.TestScheduler;
15+
import io.reactivex.subjects.PublishSubject;
16+
17+
public class OperatorSampleTest {
18+
private TestScheduler scheduler;
19+
private Scheduler.Worker innerScheduler;
20+
private Subscriber<Long> observer;
21+
private Subscriber<Object> observer2;
22+
23+
@Before
24+
// due to mocking
25+
public void before() {
26+
scheduler = new TestScheduler();
27+
innerScheduler = scheduler.createWorker();
28+
observer = TestHelper.mockSubscriber();
29+
observer2 = TestHelper.mockSubscriber();
30+
}
31+
32+
@Test
33+
public void testSample() {
34+
Observable<Long> source = Observable.create(new Publisher<Long>() {
35+
@Override
36+
public void subscribe(final Subscriber<? super Long> observer1) {
37+
observer1.onSubscribe(EmptySubscription.INSTANCE);
38+
innerScheduler.schedule(new Runnable() {
39+
@Override
40+
public void run() {
41+
observer1.onNext(1L);
42+
}
43+
}, 1, TimeUnit.SECONDS);
44+
innerScheduler.schedule(new Runnable() {
45+
@Override
46+
public void run() {
47+
observer1.onNext(2L);
48+
}
49+
}, 2, TimeUnit.SECONDS);
50+
innerScheduler.schedule(new Runnable() {
51+
@Override
52+
public void run() {
53+
observer1.onComplete();
54+
}
55+
}, 3, TimeUnit.SECONDS);
56+
}
57+
});
58+
59+
Observable<Long> sampled = source.sample(400L, TimeUnit.MILLISECONDS, scheduler);
60+
sampled.subscribe(observer);
61+
62+
InOrder inOrder = inOrder(observer);
63+
64+
scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS);
65+
verify(observer, never()).onNext(any(Long.class));
66+
verify(observer, never()).onComplete();
67+
verify(observer, never()).onError(any(Throwable.class));
68+
69+
scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS);
70+
inOrder.verify(observer, times(1)).onNext(1L);
71+
verify(observer, never()).onNext(2L);
72+
verify(observer, never()).onComplete();
73+
verify(observer, never()).onError(any(Throwable.class));
74+
75+
scheduler.advanceTimeTo(1600L, TimeUnit.MILLISECONDS);
76+
inOrder.verify(observer, never()).onNext(1L);
77+
verify(observer, never()).onNext(2L);
78+
verify(observer, never()).onComplete();
79+
verify(observer, never()).onError(any(Throwable.class));
80+
81+
scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS);
82+
inOrder.verify(observer, never()).onNext(1L);
83+
inOrder.verify(observer, times(1)).onNext(2L);
84+
verify(observer, never()).onComplete();
85+
verify(observer, never()).onError(any(Throwable.class));
86+
87+
scheduler.advanceTimeTo(3000L, TimeUnit.MILLISECONDS);
88+
inOrder.verify(observer, never()).onNext(1L);
89+
inOrder.verify(observer, never()).onNext(2L);
90+
verify(observer, times(1)).onComplete();
91+
verify(observer, never()).onError(any(Throwable.class));
92+
}
93+
94+
@Test
95+
public void sampleWithSamplerNormal() {
96+
PublishSubject<Integer> source = PublishSubject.create();
97+
PublishSubject<Integer> sampler = PublishSubject.create();
98+
99+
Observable<Integer> m = source.sample(sampler);
100+
m.subscribe(observer2);
101+
102+
source.onNext(1);
103+
source.onNext(2);
104+
sampler.onNext(1);
105+
source.onNext(3);
106+
source.onNext(4);
107+
sampler.onNext(2);
108+
source.onComplete();
109+
sampler.onNext(3);
110+
111+
InOrder inOrder = inOrder(observer2);
112+
inOrder.verify(observer2, never()).onNext(1);
113+
inOrder.verify(observer2, times(1)).onNext(2);
114+
inOrder.verify(observer2, never()).onNext(3);
115+
inOrder.verify(observer2, times(1)).onNext(4);
116+
inOrder.verify(observer2, times(1)).onComplete();
117+
verify(observer, never()).onError(any(Throwable.class));
118+
}
119+
120+
@Test
121+
public void sampleWithSamplerNoDuplicates() {
122+
PublishSubject<Integer> source = PublishSubject.create();
123+
PublishSubject<Integer> sampler = PublishSubject.create();
124+
125+
Observable<Integer> m = source.sample(sampler);
126+
m.subscribe(observer2);
127+
128+
source.onNext(1);
129+
source.onNext(2);
130+
sampler.onNext(1);
131+
sampler.onNext(1);
132+
133+
source.onNext(3);
134+
source.onNext(4);
135+
sampler.onNext(2);
136+
sampler.onNext(2);
137+
138+
source.onComplete();
139+
sampler.onNext(3);
140+
141+
InOrder inOrder = inOrder(observer2);
142+
inOrder.verify(observer2, never()).onNext(1);
143+
inOrder.verify(observer2, times(1)).onNext(2);
144+
inOrder.verify(observer2, never()).onNext(3);
145+
inOrder.verify(observer2, times(1)).onNext(4);
146+
inOrder.verify(observer2, times(1)).onComplete();
147+
verify(observer, never()).onError(any(Throwable.class));
148+
}
149+
150+
@Test
151+
public void sampleWithSamplerTerminatingEarly() {
152+
PublishSubject<Integer> source = PublishSubject.create();
153+
PublishSubject<Integer> sampler = PublishSubject.create();
154+
155+
Observable<Integer> m = source.sample(sampler);
156+
m.subscribe(observer2);
157+
158+
source.onNext(1);
159+
source.onNext(2);
160+
sampler.onNext(1);
161+
sampler.onComplete();
162+
163+
source.onNext(3);
164+
source.onNext(4);
165+
166+
InOrder inOrder = inOrder(observer2);
167+
inOrder.verify(observer2, never()).onNext(1);
168+
inOrder.verify(observer2, times(1)).onNext(2);
169+
inOrder.verify(observer2, times(1)).onComplete();
170+
inOrder.verify(observer2, never()).onNext(any());
171+
verify(observer, never()).onError(any(Throwable.class));
172+
}
173+
174+
@Test
175+
public void sampleWithSamplerEmitAndTerminate() {
176+
PublishSubject<Integer> source = PublishSubject.create();
177+
PublishSubject<Integer> sampler = PublishSubject.create();
178+
179+
Observable<Integer> m = source.sample(sampler);
180+
m.subscribe(observer2);
181+
182+
source.onNext(1);
183+
source.onNext(2);
184+
sampler.onNext(1);
185+
source.onNext(3);
186+
source.onComplete();
187+
sampler.onNext(2);
188+
sampler.onComplete();
189+
190+
InOrder inOrder = inOrder(observer2);
191+
inOrder.verify(observer2, never()).onNext(1);
192+
inOrder.verify(observer2, times(1)).onNext(2);
193+
inOrder.verify(observer2, never()).onNext(3);
194+
inOrder.verify(observer2, times(1)).onComplete();
195+
inOrder.verify(observer2, never()).onNext(any());
196+
verify(observer, never()).onError(any(Throwable.class));
197+
}
198+
199+
@Test
200+
public void sampleWithSamplerEmptySource() {
201+
PublishSubject<Integer> source = PublishSubject.create();
202+
PublishSubject<Integer> sampler = PublishSubject.create();
203+
204+
Observable<Integer> m = source.sample(sampler);
205+
m.subscribe(observer2);
206+
207+
source.onComplete();
208+
sampler.onNext(1);
209+
210+
InOrder inOrder = inOrder(observer2);
211+
inOrder.verify(observer2, times(1)).onComplete();
212+
verify(observer2, never()).onNext(any());
213+
verify(observer, never()).onError(any(Throwable.class));
214+
}
215+
216+
@Test
217+
public void sampleWithSamplerSourceThrows() {
218+
PublishSubject<Integer> source = PublishSubject.create();
219+
PublishSubject<Integer> sampler = PublishSubject.create();
220+
221+
Observable<Integer> m = source.sample(sampler);
222+
m.subscribe(observer2);
223+
224+
source.onNext(1);
225+
source.onError(new RuntimeException("Forced failure!"));
226+
sampler.onNext(1);
227+
228+
InOrder inOrder = inOrder(observer2);
229+
inOrder.verify(observer2, times(1)).onError(any(Throwable.class));
230+
verify(observer2, never()).onNext(any());
231+
verify(observer, never()).onComplete();
232+
}
233+
234+
@Test
235+
public void sampleWithSamplerThrows() {
236+
PublishSubject<Integer> source = PublishSubject.create();
237+
PublishSubject<Integer> sampler = PublishSubject.create();
238+
239+
Observable<Integer> m = source.sample(sampler);
240+
m.subscribe(observer2);
241+
242+
source.onNext(1);
243+
sampler.onNext(1);
244+
sampler.onError(new RuntimeException("Forced failure!"));
245+
246+
InOrder inOrder = inOrder(observer2);
247+
inOrder.verify(observer2, times(1)).onNext(1);
248+
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
249+
verify(observer, never()).onComplete();
250+
}
251+
252+
@Test
253+
public void testSampleUnsubscribe() {
254+
final Subscription s = mock(Subscription.class);
255+
Observable<Integer> o = Observable.create(
256+
new Publisher<Integer>() {
257+
@Override
258+
public void subscribe(Subscriber<? super Integer> subscriber) {
259+
subscriber.onSubscribe(s);
260+
}
261+
}
262+
);
263+
o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().dispose();
264+
verify(s).cancel();
265+
}
266+
}

0 commit comments

Comments
 (0)