Skip to content

Commit c532ddd

Browse files
committed
Examples 3.5 Time-shifted sequences
1 parent 34a0d4b commit c532ddd

File tree

6 files changed

+761
-0
lines changed

6 files changed

+761
-0
lines changed
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
package itrx.chapter3.timeshifted;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.observers.TestSubscriber;
11+
import rx.schedulers.Schedulers;
12+
import rx.schedulers.TestScheduler;
13+
14+
public class BufferTest {
15+
16+
public void exampleByCount() {
17+
Observable.range(0, 10)
18+
.buffer(4)
19+
.subscribe(System.out::println);
20+
21+
// [0, 1, 2, 3]
22+
// [4, 5, 6, 7]
23+
// [8, 9]
24+
}
25+
26+
public void exampleByTime() {
27+
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
28+
.buffer(250, TimeUnit.MILLISECONDS)
29+
.subscribe(System.out::println);
30+
31+
// [0, 1]
32+
// [2, 3]
33+
// [4, 5, 6]
34+
// [7, 8]
35+
// [9]
36+
}
37+
38+
public void exampleByCountAndTime() {
39+
Observable.interval(100, TimeUnit.MILLISECONDS)
40+
.take(10)
41+
.buffer(250, TimeUnit.MILLISECONDS, 2)
42+
.subscribe(System.out::println);
43+
44+
// [0, 1]
45+
// []
46+
// [2, 3]
47+
// []
48+
// [4, 5]
49+
// [6]
50+
// [7, 8]
51+
// []
52+
// [9]
53+
}
54+
55+
public void exampleWithSignal() {
56+
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
57+
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
58+
.subscribe(System.out::println);
59+
60+
// [0, 1]
61+
// [2, 3]
62+
// [4, 5, 6]
63+
// [7, 8]
64+
// [9]
65+
}
66+
67+
public void exampleOverlappingByCount() {
68+
Observable.range(0,10)
69+
.buffer(4, 3)
70+
.subscribe(System.out::println);
71+
72+
// [0, 1, 2, 3]
73+
// [3, 4, 5, 6]
74+
// [6, 7, 8, 9]
75+
// [9]
76+
}
77+
78+
public void exampleOverlappingByTime() {
79+
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
80+
.buffer(350, 200, TimeUnit.MILLISECONDS)
81+
.subscribe(System.out::println);
82+
83+
// [0, 1, 2]
84+
// [2, 3, 4]
85+
// [3, 4, 5, 6]
86+
// [5, 6, 7, 8]
87+
// [7, 8, 9]
88+
// [9]
89+
}
90+
91+
public void exampleOverlappingBySignal() {
92+
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
93+
.buffer(
94+
Observable.interval(250, TimeUnit.MILLISECONDS),
95+
i -> Observable.timer(200, TimeUnit.MILLISECONDS))
96+
.subscribe(System.out::println);
97+
98+
// [2, 3]
99+
// [4, 5]
100+
// [7, 8]
101+
// [9]
102+
}
103+
104+
105+
//
106+
// Tests
107+
//
108+
109+
@Test
110+
public void testByCount() {
111+
TestSubscriber<List<Integer>> tester = new TestSubscriber<>();
112+
113+
Observable.range(0, 10)
114+
.buffer(4)
115+
.subscribe(tester);
116+
117+
tester.assertReceivedOnNext(Arrays.asList(
118+
Arrays.asList(0, 1, 2, 3),
119+
Arrays.asList(4, 5, 6, 7),
120+
Arrays.asList(8, 9)
121+
));
122+
tester.assertTerminalEvent();
123+
tester.assertNoErrors();
124+
}
125+
126+
@Test
127+
public void testByTime() {
128+
TestSubscriber<List<Long>> tester = new TestSubscriber<>();
129+
TestScheduler scheduler = Schedulers.test();
130+
131+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(10)
132+
.buffer(250, TimeUnit.MILLISECONDS, scheduler)
133+
.subscribe(tester);
134+
135+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
136+
137+
tester.assertReceivedOnNext(Arrays.asList(
138+
Arrays.asList(0L, 1L),
139+
Arrays.asList(2L, 3L),
140+
Arrays.asList(4L, 5L, 6L),
141+
Arrays.asList(7L, 8L),
142+
Arrays.asList(9L)
143+
));
144+
tester.assertTerminalEvent();
145+
tester.assertNoErrors();
146+
}
147+
148+
@Test
149+
public void testByCountAndTime() {
150+
TestSubscriber<List<Long>> tester = new TestSubscriber<>();
151+
TestScheduler scheduler = Schedulers.test();
152+
153+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
154+
.take(10)
155+
.buffer(250, TimeUnit.MILLISECONDS, 2, scheduler)
156+
.subscribe(tester);
157+
158+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
159+
160+
tester.assertReceivedOnNext(Arrays.asList(
161+
Arrays.asList(0L, 1L),
162+
Arrays.asList(),
163+
Arrays.asList(2L, 3L),
164+
Arrays.asList(),
165+
Arrays.asList(4L, 5L),
166+
Arrays.asList(6L),
167+
Arrays.asList(7L, 8L),
168+
Arrays.asList(),
169+
Arrays.asList(9L)
170+
));
171+
tester.assertTerminalEvent();
172+
tester.assertNoErrors();
173+
}
174+
175+
@Test
176+
public void testWithSignal() {
177+
TestSubscriber<List<Long>> tester = new TestSubscriber<>();
178+
TestScheduler scheduler = Schedulers.test();
179+
180+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(10)
181+
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS, scheduler))
182+
.subscribe(tester);
183+
184+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
185+
186+
tester.assertReceivedOnNext(Arrays.asList(
187+
Arrays.asList(0L, 1L),
188+
Arrays.asList(2L, 3L),
189+
Arrays.asList(4L, 5L, 6L),
190+
Arrays.asList(7L, 8L),
191+
Arrays.asList(9L)
192+
));
193+
tester.assertTerminalEvent();
194+
tester.assertNoErrors();
195+
}
196+
197+
@Test
198+
public void testOverlappingByCount() {
199+
TestSubscriber<List<Integer>> tester = new TestSubscriber<>();
200+
201+
Observable.range(0,10)
202+
.buffer(4, 3)
203+
.subscribe(tester);
204+
205+
tester.assertReceivedOnNext(Arrays.asList(
206+
Arrays.asList(0, 1, 2, 3),
207+
Arrays.asList(3, 4, 5, 6),
208+
Arrays.asList(6, 7, 8, 9),
209+
Arrays.asList(9)
210+
));
211+
tester.assertTerminalEvent();
212+
tester.assertNoErrors();
213+
}
214+
215+
@Test
216+
public void testOverlappingByTime() {
217+
TestSubscriber<List<Long>> tester = new TestSubscriber<>();
218+
TestScheduler scheduler = Schedulers.test();
219+
220+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(10)
221+
.buffer(350, 200, TimeUnit.MILLISECONDS, scheduler)
222+
.subscribe(tester);
223+
224+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
225+
226+
tester.assertReceivedOnNext(Arrays.asList(
227+
Arrays.asList(0L, 1L, 2L),
228+
Arrays.asList(1L, 2L, 3L, 4L),
229+
Arrays.asList(3L, 4L, 5L, 6L),
230+
Arrays.asList(5L, 6L, 7L, 8L),
231+
Arrays.asList(7L, 8L, 9L),
232+
Arrays.asList(9L)
233+
));
234+
tester.assertTerminalEvent();
235+
tester.assertNoErrors();
236+
}
237+
238+
@Test
239+
public void testOverlappingBySignal() {
240+
TestSubscriber<List<Long>> tester = new TestSubscriber<>();
241+
TestScheduler scheduler = Schedulers.test();
242+
243+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(10)
244+
.buffer(
245+
Observable.interval(250, TimeUnit.MILLISECONDS, scheduler),
246+
i -> Observable.timer(200, TimeUnit.MILLISECONDS, scheduler))
247+
.subscribe(tester);
248+
249+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
250+
251+
tester.assertReceivedOnNext(Arrays.asList(
252+
Arrays.asList(2L, 3L),
253+
Arrays.asList(4L, 5L),
254+
Arrays.asList(7L, 8L),
255+
Arrays.asList(9L)
256+
));
257+
tester.assertTerminalEvent();
258+
tester.assertNoErrors();
259+
}
260+
261+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package itrx.chapter3.timeshifted;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class DebounceTest {
14+
15+
public void exampleDebounce() {
16+
Observable.concat(
17+
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
18+
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
19+
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
20+
)
21+
.scan(0, (acc, v) -> acc+1)
22+
.debounce(150, TimeUnit.MILLISECONDS)
23+
.subscribe(System.out::println);
24+
25+
// 3
26+
// 4
27+
// 5
28+
// 9
29+
}
30+
31+
public void exampleDebounceDynamic() {
32+
Observable.concat(
33+
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
34+
Observable.interval(500, TimeUnit.MILLISECONDS).take(3),
35+
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
36+
)
37+
.scan(0, (acc, v) -> acc+1)
38+
.debounce(i -> Observable.timer(i * 50, TimeUnit.MILLISECONDS))
39+
.subscribe(System.out::println);
40+
41+
// 1
42+
// 3
43+
// 4
44+
// 5
45+
// 9
46+
}
47+
48+
49+
//
50+
// Test
51+
//
52+
53+
@Test
54+
public void testDebounce() {
55+
TestScheduler scheduler = Schedulers.test();
56+
TestSubscriber<Integer> tester = new TestSubscriber<>();
57+
58+
Observable.concat(
59+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3),
60+
Observable.interval(500, TimeUnit.MILLISECONDS, scheduler).take(3),
61+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3)
62+
)
63+
.scan(0, (acc, v) -> acc+1)
64+
.debounce(150, TimeUnit.MILLISECONDS, scheduler)
65+
.subscribe(tester);
66+
67+
scheduler.advanceTimeBy(2100, TimeUnit.MILLISECONDS);
68+
tester.assertReceivedOnNext(Arrays.asList(3, 4, 5, 9));
69+
}
70+
71+
@Test
72+
public void testDebounceDynamic() {
73+
TestScheduler scheduler = Schedulers.test();
74+
TestSubscriber<Integer> tester = new TestSubscriber<>();
75+
76+
Observable.concat(
77+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3),
78+
Observable.interval(500, TimeUnit.MILLISECONDS, scheduler).take(3),
79+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3)
80+
)
81+
.scan(0, (acc, v) -> acc+1)
82+
.debounce(i -> Observable.timer(i * 50, TimeUnit.MILLISECONDS, scheduler))
83+
.subscribe(tester);
84+
85+
scheduler.advanceTimeBy(2100, TimeUnit.MILLISECONDS);
86+
tester.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 9));
87+
}
88+
89+
}

0 commit comments

Comments
 (0)