11package itrx .chapter2 .transforming ;
22
3- import static org .junit .Assert .assertEquals ;
3+ import static org .junit .Assert .* ;
44
5+ import java .util .Arrays ;
56import java .util .concurrent .TimeUnit ;
67
78import org .junit .Test ;
1112import rx .observers .TestSubscriber ;
1213import rx .schedulers .Schedulers ;
1314import rx .schedulers .TestScheduler ;
14- import rx .schedulers .TimeInterval ;
15- import rx .schedulers .Timestamped ;
1615
1716public class FlatMapTest {
1817
@@ -35,30 +34,90 @@ public void onNext(Object v) {
3534 }
3635 }
3736
38- public void exampleTimestamp () {
39- Observable <Long > values = Observable .interval ( 100 , TimeUnit . MILLISECONDS );
37+ public void exampleFlatMap () {
38+ Observable <Integer > values = Observable .just ( 2 );
4039
41- values .take (3 )
42- .timestamp ()
43- .subscribe (new PrintSubscriber ("Timestamp" ));
44-
45- // Timestamp: Timestamped(timestampMillis = 1428611094943, value = 0)
46- // Timestamp: Timestamped(timestampMillis = 1428611095037, value = 1)
47- // Timestamp: Timestamped(timestampMillis = 1428611095136, value = 2)
48- // Timestamp: Completed
40+ values
41+ .flatMap (i -> Observable .range (0 ,i ))
42+ .subscribe (new PrintSubscriber ("flatMap" ));
43+
44+ // flatMap: 0
45+ // flatMap: 1
46+ // flatMap: Completed
47+ }
48+
49+ public void exampleFlatMapMultipleValues () {
50+ Observable <Integer > values = Observable .range (1 ,3 );
51+
52+ values
53+ .flatMap (i -> Observable .range (0 ,i ))
54+ .subscribe (new PrintSubscriber ("flatMap" ));
55+
56+ // flatMap: 0
57+ // flatMap: 0
58+ // flatMap: 1
59+ // flatMap: 0
60+ // flatMap: 1
61+ // flatMap: 2
62+ // flatMap: Completed
4963 }
5064
51- public void exampleTimeInteval () {
52- Observable <Long > values = Observable .interval ( 100 , TimeUnit . MILLISECONDS );
65+ public void exampleFlatMapNewType () {
66+ Observable <Integer > values = Observable .just ( 1 );
5367
54- values .take (3 )
55- .timeInterval ()
56- .subscribe (new PrintSubscriber ("TimeInterval" ));
57-
58- // TimeInterval: TimeInterval [intervalInMilliseconds=131, value=0]
59- // TimeInterval: TimeInterval [intervalInMilliseconds=75, value=1]
60- // TimeInterval: TimeInterval [intervalInMilliseconds=100, value=2]
61- // TimeInterval: Completed
68+ values
69+ .flatMap (i ->
70+ Observable .just (
71+ Character .valueOf ((char )(i +64 ))
72+ ))
73+ .subscribe (new PrintSubscriber ("flatMap" ));
74+
75+ // flatMap: A
76+ // flatMap: Completed
77+ }
78+
79+ public void exampleFlatMapFilter () {
80+ Observable <Integer > values = Observable .range (0 ,30 );
81+
82+ values
83+ .flatMap (i -> {
84+ if (0 < i && i <= 26 )
85+ return Observable .just (Character .valueOf ((char )(i +64 )));
86+ else
87+ return Observable .empty ();
88+ })
89+ .subscribe (new PrintSubscriber ("flatMap" ));
90+
91+ // flatMap: A
92+ // flatMap: B
93+ // flatMap: C
94+ // ...
95+ // flatMap: X
96+ // flatMap: Y
97+ // flatMap: Z
98+ // flatMap: Completed
99+ }
100+
101+ public void exampleFlatMapAsynchronous () {
102+ Observable .just (100 , 150 )
103+ .flatMap (i ->
104+ Observable .interval (i , TimeUnit .MILLISECONDS )
105+ .map (v -> i )
106+ )
107+ .take (10 )
108+ .subscribe (new PrintSubscriber ("flatMap" ));
109+
110+ // flatMap: 100
111+ // flatMap: 150
112+ // flatMap: 100
113+ // flatMap: 100
114+ // flatMap: 150
115+ // flatMap: 100
116+ // flatMap: 150
117+ // flatMap: 100
118+ // flatMap: 100
119+ // flatMap: 150
120+ // flatMap: Completed
62121 }
63122
64123
@@ -67,43 +126,91 @@ public void exampleTimeInteval() {
67126 //
68127
69128 @ Test
70- public void testTimestamp () {
71- TestSubscriber <Timestamped <Long >> tester = new TestSubscriber <>();
72- TestScheduler scheduler = Schedulers .test ();
129+ public void testFlatMap () {
130+ TestSubscriber <Integer > tester = new TestSubscriber <>();
73131
74- Observable <Long > values = Observable .interval ( 100 , TimeUnit . MILLISECONDS , scheduler );
132+ Observable <Integer > values = Observable .just ( 2 );
75133
76- values . take ( 3 )
77- .timestamp ( scheduler )
134+ values
135+ .flatMap ( i -> Observable . range ( 0 , i ) )
78136 .subscribe (tester );
79137
80- scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
138+ tester .assertReceivedOnNext (Arrays .asList (0 ,1 ));
139+ tester .assertTerminalEvent ();
140+ tester .assertNoErrors ();
141+ }
142+
143+ @ Test
144+ public void testFlatMapMultipleValues () {
145+ TestSubscriber <Integer > tester = new TestSubscriber <>();
146+
147+ Observable <Integer > values = Observable .range (1 ,3 );
148+
149+ values
150+ .flatMap (i -> Observable .range (0 ,i ))
151+ .subscribe (tester );
81152
82- assertEquals (tester .getOnNextEvents ().get (0 ).getTimestampMillis (), 100 );
83- assertEquals (tester .getOnNextEvents ().get (1 ).getTimestampMillis (), 200 );
84- assertEquals (tester .getOnNextEvents ().get (2 ).getTimestampMillis (), 300 );
153+ tester .assertReceivedOnNext (Arrays .asList (0 ,0 ,1 ,0 ,1 ,2 ));
85154 tester .assertTerminalEvent ();
86155 tester .assertNoErrors ();
156+
87157 }
88158
89159 @ Test
90- public void testTimeInteval () {
91- TestSubscriber <TimeInterval <Long >> tester = new TestSubscriber <>();
92- TestScheduler scheduler = Schedulers .test ();
160+ public void testFlatMapNewType () {
161+ TestSubscriber <Character > tester = new TestSubscriber <>();
93162
94- Observable <Long > values = Observable .interval ( 100 , TimeUnit . MILLISECONDS , scheduler );
163+ Observable <Integer > values = Observable .just ( 1 );
95164
96- values .take (3 )
97- .timeInterval (scheduler )
165+ values
166+ .flatMap (i ->
167+ Observable .just (
168+ Character .valueOf ((char )(i +64 ))
169+ ))
98170 .subscribe (tester );
99171
100- scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
172+ tester .assertReceivedOnNext (Arrays .asList ('A' ));
173+ tester .assertTerminalEvent ();
174+ tester .assertNoErrors ();
175+ }
176+
177+ @ Test
178+ public void testFlatMapFilter () {
179+ TestSubscriber <Character > tester = new TestSubscriber <>();
180+
181+ Observable <Integer > values = Observable .range (0 ,30 );
182+
183+ values
184+ .flatMap (i -> {
185+ if (0 < i && i <= 26 )
186+ return Observable .just (Character .valueOf ((char )(i +64 )));
187+ else
188+ return Observable .empty ();
189+ })
190+ .subscribe (tester );
101191
102- assertEquals (tester .getOnNextEvents ().get (0 ).getIntervalInMilliseconds (), 100 );
103- assertEquals (tester .getOnNextEvents ().get (1 ).getIntervalInMilliseconds (), 100 );
104- assertEquals (tester .getOnNextEvents ().get (2 ).getIntervalInMilliseconds (), 100 );
192+ assertEquals (tester .getOnNextEvents ().size (), 26 );
105193 tester .assertTerminalEvent ();
106194 tester .assertNoErrors ();
107195 }
108196
197+ @ Test
198+ public void testFlatMapAsynchronous () {
199+ TestSubscriber <Object > tester = new TestSubscriber <>();
200+ TestScheduler scheduler = Schedulers .test ();
201+
202+ Observable .just (100 , 150 )
203+ .flatMap (i ->
204+ Observable .interval (i , TimeUnit .MILLISECONDS , scheduler )
205+ .map (v -> i )
206+ )
207+ .take (10 )
208+ .distinctUntilChanged ()
209+ .subscribe (tester );
210+
211+ scheduler .advanceTimeBy (2 , TimeUnit .SECONDS );
212+
213+ assertTrue (tester .getOnNextEvents ().size () > 2 ); // 100 and 150 succeeded each other more than once
214+ tester .assertNoErrors ();
215+ }
109216}
0 commit comments