Skip to content

Commit 3aee7f3

Browse files
committed
Examples 3.1 Side effects
1 parent 33e1aff commit 3aee7f3

File tree

3 files changed

+398
-0
lines changed

3 files changed

+398
-0
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package itrx.chapter3.sideeffects;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Arrays;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.Subscriber;
11+
import rx.Subscription;
12+
import rx.functions.Func0;
13+
import rx.observers.TestSubscriber;
14+
import rx.subjects.ReplaySubject;
15+
16+
public class DoOnTest {
17+
18+
private static class PrintSubscriber extends Subscriber<Object>{
19+
private final String name;
20+
public PrintSubscriber(String name) {
21+
this.name = name;
22+
}
23+
@Override
24+
public void onCompleted() {
25+
System.out.println(name + ": Completed");
26+
}
27+
@Override
28+
public void onError(Throwable e) {
29+
System.out.println(name + ": Error: " + e);
30+
}
31+
@Override
32+
public void onNext(Object v) {
33+
System.out.println(name + ": " + v);
34+
}
35+
}
36+
37+
public void exampleDoOnEach() {
38+
Observable<String> values = Observable.just("side", "effects");
39+
40+
values
41+
.doOnEach(new PrintSubscriber("Log"))
42+
.map(s -> s.toUpperCase())
43+
.subscribe(new PrintSubscriber("Process"));
44+
45+
// Log: side
46+
// Process: SIDE
47+
// Log: effects
48+
// Process: EFFECTS
49+
// Log: Completed
50+
// Process: Completed
51+
}
52+
53+
public void exampleDoOnEachEncapsulation() {
54+
Func0<Observable<String>> service = () ->
55+
Observable
56+
.just("First", "Second", "Third")
57+
.doOnEach(new PrintSubscriber("Log"));
58+
59+
service.call()
60+
.map(s -> s.toUpperCase())
61+
.filter(s -> s.length() > 5)
62+
.subscribe(new PrintSubscriber("Process"));
63+
64+
// Log: First
65+
// Log: Second
66+
// Process: SECOND
67+
// Log: Third
68+
// Log: Completed
69+
// Process: Completed
70+
}
71+
72+
public void exampleOnSubscriber() {
73+
ReplaySubject<Integer> subject = ReplaySubject.create();
74+
Observable<Integer> values = subject
75+
.doOnSubscribe(() -> System.out.println("New subscription"))
76+
.doOnUnsubscribe(() -> System.out.println("Subscription over"));
77+
78+
Subscription s1 = values.subscribe(new PrintSubscriber("1st"));
79+
subject.onNext(0);
80+
values.subscribe(new PrintSubscriber("2st"));
81+
subject.onNext(1);
82+
s1.unsubscribe();
83+
subject.onNext(2);
84+
subject.onNext(3);
85+
subject.onCompleted();
86+
87+
// New subscription
88+
// 1st: 0
89+
// New subscription
90+
// 2st: 0
91+
// 1st: 1
92+
// 2st: 1
93+
// Subscription over
94+
// 2st: 2
95+
// 2st: 3
96+
// 2st: Completed
97+
// Subscription over
98+
}
99+
100+
101+
//
102+
// Tests
103+
//
104+
105+
@Test
106+
public void testDoOnEach() {
107+
TestSubscriber<String> testerLog = new TestSubscriber<>();
108+
TestSubscriber<String> testerFinal = new TestSubscriber<>();
109+
110+
Observable<String> values = Observable.just("side", "effects");
111+
112+
values
113+
.doOnEach(testerLog)
114+
.map(s -> s.toUpperCase())
115+
.subscribe(testerFinal);
116+
117+
testerLog.assertReceivedOnNext(Arrays.asList("side", "effects"));
118+
testerLog.assertTerminalEvent();
119+
testerLog.assertNoErrors();
120+
testerFinal.assertReceivedOnNext(Arrays.asList("SIDE", "EFFECTS"));
121+
testerFinal.assertTerminalEvent();
122+
testerFinal.assertNoErrors();
123+
}
124+
125+
@Test
126+
public void testDoOnEachEncapsulation() {
127+
TestSubscriber<String> testerLog = new TestSubscriber<>();
128+
TestSubscriber<String> testerFinal = new TestSubscriber<>();
129+
130+
Func0<Observable<String>> service = () ->
131+
Observable
132+
.just("First", "Second", "Third")
133+
.doOnEach(testerLog);
134+
135+
service.call()
136+
.map(s -> s.toUpperCase())
137+
.filter(s -> s.length() > 5)
138+
.subscribe(testerFinal);
139+
140+
testerLog.assertReceivedOnNext(Arrays.asList("First", "Second", "Third"));
141+
testerLog.assertTerminalEvent();
142+
testerLog.assertNoErrors();
143+
testerFinal.assertReceivedOnNext(Arrays.asList("SECOND"));
144+
testerFinal.assertTerminalEvent();
145+
testerFinal.assertNoErrors();
146+
}
147+
148+
@Test
149+
public void testOnSubscriber() {
150+
int[] counts = {0, 0};
151+
152+
ReplaySubject<Integer> subject = ReplaySubject.create();
153+
Observable<Integer> values = subject
154+
.doOnSubscribe(() -> counts[0]++)
155+
.doOnUnsubscribe(() -> counts[1]++);
156+
157+
assertArrayEquals(counts, new int[]{0, 0});
158+
Subscription s1 = values.subscribe();
159+
assertArrayEquals(counts, new int[]{1, 0});
160+
subject.onNext(0);
161+
values.subscribe();
162+
assertArrayEquals(counts, new int[]{2, 0});
163+
subject.onNext(1);
164+
s1.unsubscribe();
165+
assertArrayEquals(counts, new int[]{2, 1});
166+
subject.onNext(2);
167+
subject.onNext(3);
168+
subject.onCompleted();
169+
assertArrayEquals(counts, new int[]{2, 2});
170+
}
171+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package itrx.chapter3.sideeffects;
2+
3+
import java.util.Arrays;
4+
5+
import org.junit.Test;
6+
7+
import rx.Observable;
8+
import rx.observers.TestSubscriber;
9+
10+
public class MutablePipelineTest {
11+
12+
private static class Data {
13+
public int id;
14+
public String name;
15+
public Data(int id, String name) {
16+
this.id = id;
17+
this.name = name;
18+
}
19+
}
20+
21+
public void example() {
22+
Observable<Data> data = Observable.just(
23+
new Data(1, "Microsoft"),
24+
new Data(2, "Netflix")
25+
);
26+
27+
data.subscribe(d -> d.name = "Garbage");
28+
data.subscribe(d -> System.out.println(d.id + ": " + d.name));
29+
30+
// 1: Garbage
31+
// 2: Garbage
32+
}
33+
34+
@Test
35+
public void test() {
36+
TestSubscriber<String> tester = new TestSubscriber<>();
37+
38+
Observable<Data> data = Observable.just(
39+
new Data(1, "Microsoft"),
40+
new Data(2, "Netflix")
41+
);
42+
43+
data.subscribe(d -> d.name = "Garbage");
44+
data.map(d -> d.name)
45+
.subscribe(tester);
46+
47+
tester.assertReceivedOnNext(Arrays.asList("Garbage", "Garbage"));
48+
tester.assertTerminalEvent();
49+
tester.assertNoErrors();
50+
}
51+
}

0 commit comments

Comments
 (0)