Skip to content

Commit 28a6e60

Browse files
committed
2.5 Added concatMap
1 parent b50059f commit 28a6e60

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package itrx.chapter2.transforming;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class ConcatMapTest {
14+
15+
public void exampleConcatMap() {
16+
Observable.just(100, 150)
17+
.concatMap(i ->
18+
Observable.interval(i, TimeUnit.MILLISECONDS)
19+
.map(v -> i)
20+
.take(3))
21+
.subscribe(
22+
System.out::println,
23+
System.out::println,
24+
() -> System.out.println("Completed"));
25+
26+
// 100
27+
// 100
28+
// 100
29+
// 150
30+
// 150
31+
// 150
32+
// Completed
33+
}
34+
35+
36+
//
37+
// Test
38+
//
39+
40+
@Test
41+
public void testConcatMap() {
42+
TestSubscriber<Integer> tester = new TestSubscriber<>();
43+
TestScheduler scheduler = Schedulers.test();
44+
45+
Observable.just(100, 150)
46+
.concatMap(i ->
47+
Observable.interval(i, TimeUnit.MILLISECONDS, scheduler)
48+
.map(v -> i)
49+
.take(3)
50+
)
51+
.subscribe(tester);
52+
53+
scheduler.advanceTimeBy(750, TimeUnit.MILLISECONDS);
54+
tester.assertReceivedOnNext(Arrays.asList(100, 100, 100, 150, 150, 150));
55+
tester.assertTerminalEvent();
56+
tester.assertNoErrors();
57+
}
58+
59+
}

0 commit comments

Comments
 (0)