Skip to content

Commit 9fbaf84

Browse files
committed
3.7 Added serialize + unsafeSubscribe examples
1 parent ad19801 commit 9fbaf84

File tree

1 file changed

+175
-0
lines changed

1 file changed

+175
-0
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package itrx.chapter3.custom;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Arrays;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.Subscriber;
11+
import rx.observers.TestSubscriber;
12+
13+
public class SerializeTest {
14+
15+
public void exampleSafeSubscribe() {
16+
Observable<Integer> source = Observable.create(o -> {
17+
o.onNext(1);
18+
o.onNext(2);
19+
o.onCompleted();
20+
o.onNext(3);
21+
o.onCompleted();
22+
});
23+
24+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
25+
.subscribe(
26+
System.out::println,
27+
System.out::println,
28+
() -> System.out.println("Completed"));
29+
30+
// 1
31+
// 2
32+
// Completed
33+
// Unsubscribed
34+
}
35+
36+
public void exampleUnsafeSubscribe() {
37+
Observable<Integer> source = Observable.create(o -> {
38+
o.onNext(1);
39+
o.onNext(2);
40+
o.onCompleted();
41+
o.onNext(3);
42+
o.onCompleted();
43+
});
44+
45+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
46+
.unsafeSubscribe(new Subscriber<Integer>() {
47+
@Override
48+
public void onCompleted() {
49+
System.out.println("Completed");
50+
}
51+
52+
@Override
53+
public void onError(Throwable e) {
54+
System.out.println(e);
55+
}
56+
57+
@Override
58+
public void onNext(Integer t) {
59+
System.out.println(t);
60+
}
61+
});
62+
63+
// 1
64+
// 2
65+
// Completed
66+
// 3
67+
// Completed
68+
}
69+
70+
public void exampleSerialize() {
71+
Observable<Integer> source = Observable.create(o -> {
72+
o.onNext(1);
73+
o.onNext(2);
74+
o.onCompleted();
75+
o.onNext(3);
76+
o.onCompleted();
77+
})
78+
.cast(Integer.class)
79+
.serialize();;
80+
81+
82+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
83+
.unsafeSubscribe(new Subscriber<Integer>() {
84+
@Override
85+
public void onCompleted() {
86+
System.out.println("Completed");
87+
}
88+
89+
@Override
90+
public void onError(Throwable e) {
91+
System.out.println(e);
92+
}
93+
94+
@Override
95+
public void onNext(Integer t) {
96+
System.out.println(t);
97+
}
98+
});
99+
100+
// 1
101+
// 2
102+
// Completed
103+
}
104+
105+
106+
//
107+
// Tests
108+
//
109+
110+
@Test
111+
public void testSafeSubscribe() {
112+
TestSubscriber<Integer> tester = new TestSubscriber<>();
113+
114+
Observable<Integer> source = Observable.create(o -> {
115+
o.onNext(1);
116+
o.onNext(2);
117+
o.onCompleted();
118+
o.onNext(3);
119+
o.onCompleted();
120+
});
121+
122+
source.subscribe(tester);
123+
124+
tester.assertReceivedOnNext(Arrays.asList(1, 2));
125+
tester.assertTerminalEvent();
126+
tester.assertNoErrors();
127+
tester.assertUnsubscribed();
128+
}
129+
130+
@Test
131+
public void testUnsafeSubscribe() {
132+
TestSubscriber<Integer> tester = new TestSubscriber<>();
133+
134+
Observable<Integer> source = Observable.create(o -> {
135+
o.onNext(1);
136+
o.onNext(2);
137+
o.onCompleted();
138+
o.onNext(3);
139+
o.onCompleted();
140+
});
141+
142+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
143+
.unsafeSubscribe(tester);
144+
145+
tester.assertReceivedOnNext(Arrays.asList(1, 2, 3));
146+
assertEquals(2, tester.getOnCompletedEvents().size());
147+
tester.assertNoErrors();
148+
assertFalse(tester.isUnsubscribed());
149+
}
150+
151+
@Test
152+
public void testSerialize() {
153+
TestSubscriber<Integer> tester = new TestSubscriber<>();
154+
155+
Observable<Integer> source = Observable.create(o -> {
156+
o.onNext(1);
157+
o.onNext(2);
158+
o.onCompleted();
159+
o.onNext(3);
160+
o.onCompleted();
161+
})
162+
.cast(Integer.class)
163+
.serialize();;
164+
165+
166+
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
167+
.unsafeSubscribe(tester);
168+
169+
tester.assertReceivedOnNext(Arrays.asList(1, 2));
170+
tester.assertTerminalEvent();
171+
tester.assertNoErrors();
172+
assertFalse(tester.isUnsubscribed());
173+
}
174+
175+
}

0 commit comments

Comments
 (0)