Skip to content

Commit ca59ab3

Browse files
committed
Examples 3.2 Leaving the monad
1 parent 08d7f62 commit ca59ab3

File tree

4 files changed

+590
-0
lines changed

4 files changed

+590
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package itrx.chapter3.leaving;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import org.junit.Test;
11+
12+
import rx.Observable;
13+
14+
public class FirstLastSingleTest {
15+
16+
public void exampleFirst() {
17+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
18+
19+
long value = values
20+
.take(5)
21+
.toBlocking()
22+
.first(i -> i>2);
23+
System.out.println(value);
24+
25+
// 3
26+
}
27+
28+
public void exampleSingleError() {
29+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
30+
31+
try {
32+
long value = values
33+
.take(5)
34+
.toBlocking()
35+
.single(i -> i>2);
36+
System.out.println(value);
37+
}
38+
catch (Exception e) {
39+
System.out.println("Caught: " + e);
40+
}
41+
42+
// Caught: java.lang.IllegalArgumentException: Sequence contains too many elements
43+
}
44+
45+
46+
//
47+
// Tests
48+
//
49+
50+
@Test
51+
public void testFirst() throws InterruptedException {
52+
List<Integer> received = new ArrayList<>();
53+
54+
Observable<Integer> values = Observable.range(0,5);
55+
56+
int value = values
57+
.take(5)
58+
.toBlocking()
59+
.first(i -> i>2);
60+
received.add(value);
61+
62+
assertEquals(received, Arrays.asList(3));
63+
}
64+
65+
@Test(expected = IllegalArgumentException.class)
66+
public void testSingleError() {
67+
Observable<Integer> values = Observable.range(0, 5);
68+
69+
long value = values
70+
.take(5)
71+
.toBlocking()
72+
.single(i -> i>2);
73+
System.out.println(value);
74+
}
75+
76+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package itrx.chapter3.leaving;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.lang.Thread.State;
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.schedulers.Schedulers;
15+
import rx.schedulers.TestScheduler;
16+
17+
public class ForEachTest {
18+
19+
public void exampleObservableForEach() {
20+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
21+
22+
values
23+
.take(5)
24+
.forEach(
25+
v -> System.out.println(v));
26+
System.out.println("Subscribed");
27+
28+
// Subscribed
29+
// 0
30+
// 1
31+
// 2
32+
// 3
33+
// 4
34+
}
35+
36+
public void exampleBlockingForEach() {
37+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
38+
39+
values
40+
.take(5)
41+
.toBlocking()
42+
.forEach(
43+
v -> System.out.println(v));
44+
System.out.println("Subscribed");
45+
46+
// 0
47+
// 1
48+
// 2
49+
// 3
50+
// 4
51+
// Subscribed
52+
}
53+
54+
public void exampleBlockingForEachError() {
55+
Observable<Long> values = Observable.error(new Exception("Oops"));
56+
57+
try {
58+
values
59+
.take(5)
60+
.toBlocking()
61+
.forEach(
62+
v -> System.out.println(v));
63+
}
64+
catch (Exception e) {
65+
System.out.println("Caught: " + e.getMessage());
66+
}
67+
System.out.println("Subscribed");
68+
69+
// Caught: java.lang.Exception: Oops
70+
// Subscribed
71+
}
72+
73+
74+
//
75+
// Tests
76+
//
77+
78+
@Test
79+
public void testObservableForEach() {
80+
List<Long> received = new ArrayList<>();
81+
TestScheduler scheduler = Schedulers.test();
82+
83+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
84+
85+
values
86+
.take(5)
87+
.forEach(
88+
i -> received.add(i));
89+
received.add(-1L); // Mark that forEach statement returned
90+
91+
assertEquals(received, Arrays.asList(-1L));
92+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
93+
assertEquals(received, Arrays.asList(-1L, 0L, 1L, 2L, 3L, 4L));
94+
}
95+
96+
@Test
97+
public void testBlockingForEach() throws InterruptedException {
98+
List<Long> received = new ArrayList<>();
99+
TestScheduler scheduler = Schedulers.test();
100+
101+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
102+
103+
// Blocking call on new thread
104+
Thread thread = new Thread(() -> {
105+
values
106+
.take(5)
107+
.toBlocking()
108+
.forEach(
109+
i -> received.add(i));
110+
received.add(-1L); // Mark that forEach statement returned
111+
112+
});
113+
thread.start();
114+
115+
assertEquals(received, Arrays.asList());
116+
// Wait for blocking call to block before producing values
117+
while (thread.getState() != State.WAITING)
118+
Thread.sleep(1);
119+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
120+
// Wait for processing to complete
121+
thread.join(50);
122+
assertEquals(received, Arrays.asList(0L, 1L, 2L, 3L, 4L, -1L));
123+
}
124+
125+
@Test
126+
public void testBlockingForEachError() {
127+
boolean caughtException = false;
128+
129+
Observable<Long> values = Observable.error(new Exception("Oops"));
130+
131+
try {
132+
values
133+
.take(5)
134+
.toBlocking()
135+
.forEach(
136+
v -> {});
137+
}
138+
catch (Exception e) {
139+
caughtException = true;
140+
}
141+
142+
assertTrue(caughtException);
143+
}
144+
145+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package itrx.chapter3.leaving;
2+
3+
4+
import static org.junit.Assert.*;
5+
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.Future;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import org.junit.Test;
11+
12+
import rx.Observable;
13+
14+
public class FutureTest {
15+
16+
public void exampleFuture() {
17+
Observable<Long> values = Observable.timer(500, TimeUnit.MILLISECONDS);
18+
19+
values.subscribe(v -> System.out.println("Emitted: " + v));
20+
21+
Future<Long> future = values.toBlocking().toFuture();
22+
try {
23+
System.out.println(future.get());
24+
} catch (InterruptedException e) {
25+
e.printStackTrace();
26+
} catch (ExecutionException e) {
27+
e.printStackTrace();
28+
}
29+
30+
// Emitted: 0
31+
// 0
32+
}
33+
34+
35+
//
36+
//
37+
38+
@Test
39+
public void testFuture() throws InterruptedException, ExecutionException {
40+
Observable<Integer> sequence = Observable.just(0);
41+
Future<Integer> future = sequence.toBlocking().toFuture();
42+
int value = future.get();
43+
assertEquals(0, value);
44+
}
45+
46+
}

0 commit comments

Comments
 (0)