Skip to content

Commit ad19801

Browse files
committed
4.1 Added unsubscribeOn examples
1 parent 56351eb commit ad19801

File tree

1 file changed

+71
-0
lines changed

1 file changed

+71
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package itrx.chapter4.scheduling;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Arrays;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.schedulers.Schedulers;
11+
12+
public class UnsubscribeOnTest {
13+
14+
public static void example() {
15+
Observable<Object> source = Observable.using(
16+
() -> {
17+
System.out.println("Subscribed on " + Thread.currentThread().getId());
18+
return Arrays.asList(1,2);
19+
},
20+
(ints) -> {
21+
System.out.println("Producing on " + Thread.currentThread().getId());
22+
return Observable.from(ints);
23+
},
24+
(ints) -> {
25+
System.out.println("Unubscribed on " + Thread.currentThread().getId());
26+
}
27+
);
28+
29+
source
30+
.unsubscribeOn(Schedulers.newThread())
31+
.subscribe(System.out::println);
32+
33+
// Subscribed on 1
34+
// Producing on 1
35+
// 1
36+
// 2
37+
// Unubscribed on 11
38+
}
39+
40+
41+
//
42+
// Test
43+
//
44+
45+
@Test
46+
public void test() {
47+
long[] threads = {0, 0, 0};
48+
49+
Observable<Object> source = Observable.using(
50+
() -> {
51+
threads[0] = Thread.currentThread().getId();
52+
return Arrays.asList(1,2);
53+
},
54+
(ints) -> {
55+
threads[1] = Thread.currentThread().getId();
56+
return Observable.from(ints);
57+
},
58+
(ints) -> {
59+
threads[2] = Thread.currentThread().getId();
60+
}
61+
);
62+
63+
source
64+
.unsubscribeOn(Schedulers.newThread())
65+
.subscribe();
66+
67+
assertEquals(threads[0], threads[1]);
68+
assertNotEquals(threads[0], threads[2]);
69+
}
70+
71+
}

0 commit comments

Comments
 (0)