Skip to content

Commit 3101649

Browse files
committed
Examples 2.4 Aggregation
1 parent 04043f3 commit 3101649

File tree

9 files changed

+1229
-0
lines changed

9 files changed

+1229
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package itrx.chapter2.aggregation;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Arrays;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.Subscriber;
11+
import rx.observers.TestSubscriber;
12+
13+
public class CountTest {
14+
15+
private class PrintSubscriber extends Subscriber<Object>{
16+
private final String name;
17+
public PrintSubscriber(String name) {
18+
this.name = name;
19+
}
20+
@Override
21+
public void onCompleted() {
22+
System.out.println(name + ": Completed");
23+
}
24+
@Override
25+
public void onError(Throwable e) {
26+
System.out.println(name + ": Error: " + e);
27+
}
28+
@Override
29+
public void onNext(Object v) {
30+
System.out.println(name + ": " + v);
31+
}
32+
}
33+
34+
35+
public void example() {
36+
Observable<Integer> values = Observable.range(0, 3);
37+
38+
values
39+
.subscribe(new PrintSubscriber("Values"));
40+
values
41+
.count()
42+
.subscribe(new PrintSubscriber("Count"));
43+
44+
// Values: 0
45+
// Values: 1
46+
// Values: 2
47+
// Values: Completed
48+
// Count: 3
49+
// Count: Completed
50+
}
51+
52+
public void exampleCountLong() {
53+
Observable<Integer> values = Observable.range(0, 3);
54+
55+
values
56+
.subscribe(new PrintSubscriber("Values"));
57+
values
58+
.countLong()
59+
.subscribe(new PrintSubscriber("Count"));
60+
61+
// Values: 0
62+
// Values: 1
63+
// Values: 2
64+
// Values: Completed
65+
// Count: 3
66+
// Count: Completed
67+
}
68+
69+
@Test
70+
public void test() {
71+
TestSubscriber<Integer> testerSource = new TestSubscriber<>();
72+
TestSubscriber<Integer> testerCount = new TestSubscriber<>();
73+
74+
Observable<Integer> values = Observable.range(0, 3);
75+
76+
values
77+
.subscribe(testerSource);
78+
values
79+
.count()
80+
.subscribe(testerCount);
81+
82+
testerSource.assertReceivedOnNext(Arrays.asList(0,1,2));
83+
testerSource.assertTerminalEvent();
84+
testerSource.assertNoErrors();
85+
86+
testerCount.assertReceivedOnNext(Arrays.asList(3));
87+
testerCount.assertTerminalEvent();
88+
testerCount.assertNoErrors();
89+
}
90+
91+
@Test
92+
public void testCountLong() {
93+
TestSubscriber<Integer> testerSource = new TestSubscriber<>();
94+
TestSubscriber<Long> testerCount = new TestSubscriber<>();
95+
96+
Observable<Integer> values = Observable.range(0, 3);
97+
98+
values
99+
.subscribe(testerSource);
100+
values
101+
.countLong()
102+
.subscribe(testerCount);
103+
104+
testerSource.assertReceivedOnNext(Arrays.asList(0,1,2));
105+
testerSource.assertTerminalEvent();
106+
testerSource.assertNoErrors();
107+
108+
testerCount.assertReceivedOnNext(Arrays.asList(3L));
109+
testerCount.assertTerminalEvent();
110+
testerCount.assertNoErrors();
111+
}
112+
113+
114+
115+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package itrx.chapter2.aggregation;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.Arrays;
6+
import java.util.concurrent.TimeUnit;
7+
8+
import org.junit.Test;
9+
10+
import rx.Observable;
11+
import rx.Subscriber;
12+
import rx.observers.TestSubscriber;
13+
import rx.schedulers.Schedulers;
14+
import rx.schedulers.TestScheduler;
15+
16+
public class FirstTest {
17+
18+
private class PrintSubscriber extends Subscriber<Object>{
19+
private final String name;
20+
public PrintSubscriber(String name) {
21+
this.name = name;
22+
}
23+
@Override
24+
public void onCompleted() {
25+
System.out.println(name + ": Completed");
26+
}
27+
@Override
28+
public void onError(Throwable e) {
29+
System.out.println(name + ": Error: " + e);
30+
}
31+
@Override
32+
public void onNext(Object v) {
33+
System.out.println(name + ": " + v);
34+
}
35+
}
36+
37+
public void exampleFirst() {
38+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
39+
40+
values
41+
.first()
42+
.subscribe(new PrintSubscriber("First"));
43+
44+
// 0
45+
}
46+
47+
public void exampleFirstWithPredicate() {
48+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
49+
50+
values
51+
.first(v -> v>5)
52+
.subscribe(new PrintSubscriber("First"));
53+
54+
// 6
55+
}
56+
57+
public void exampleFirstOrDefault() {
58+
Observable<Long> values = Observable.empty();
59+
60+
values
61+
.firstOrDefault(-1L)
62+
.subscribe(new PrintSubscriber("First"));
63+
64+
// -1
65+
}
66+
67+
public void exampleFirstOrDefaultWithPredicate() {
68+
Observable<Long> values = Observable.empty();
69+
70+
values
71+
.firstOrDefault(-1L, v -> v>5)
72+
.subscribe(new PrintSubscriber("First"));
73+
74+
// -1
75+
}
76+
77+
@Test
78+
public void testFirst() {
79+
TestSubscriber<Long> tester = new TestSubscriber<>();
80+
TestScheduler scheduler = Schedulers.test();
81+
82+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
83+
84+
values
85+
.first()
86+
.subscribe(tester);
87+
88+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
89+
90+
tester.assertReceivedOnNext(Arrays.asList(0L));
91+
tester.assertTerminalEvent();
92+
tester.assertNoErrors();
93+
}
94+
95+
@Test
96+
public void testFirstWithPredicate() {
97+
TestSubscriber<Long> tester = new TestSubscriber<>();
98+
TestScheduler scheduler = Schedulers.test();
99+
100+
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
101+
102+
values
103+
.first(v -> v>5)
104+
.subscribe(tester);
105+
106+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
107+
108+
tester.assertReceivedOnNext(Arrays.asList(6L));
109+
tester.assertTerminalEvent();
110+
tester.assertNoErrors();
111+
}
112+
113+
@Test
114+
public void testFirstOrDefault() {
115+
TestSubscriber<Long> tester = new TestSubscriber<>();
116+
117+
Observable<Long> values = Observable.empty();
118+
119+
values
120+
.firstOrDefault(-1L)
121+
.subscribe(tester);
122+
123+
tester.assertReceivedOnNext(Arrays.asList(-1L));
124+
tester.assertTerminalEvent();
125+
tester.assertNoErrors();
126+
}
127+
128+
@Test
129+
public void testFirstOrDefaultWithPredicate() {
130+
TestSubscriber<Long> tester = new TestSubscriber<>();
131+
132+
Observable<Long> values = Observable.empty();
133+
134+
values
135+
.firstOrDefault(-1L, v -> v>5)
136+
.subscribe(tester);
137+
138+
tester.assertReceivedOnNext(Arrays.asList(-1L));
139+
tester.assertTerminalEvent();
140+
tester.assertNoErrors();
141+
}
142+
143+
}
144+
145+
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package itrx.chapter2.aggregation;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.*;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.Subscriber;
11+
import rx.Subscription;
12+
import rx.observers.TestSubscriber;
13+
import rx.schedulers.Schedulers;
14+
import rx.schedulers.TestScheduler;
15+
import rx.subjects.ReplaySubject;
16+
import rx.subjects.Subject;
17+
18+
public class GroupByTest {
19+
20+
private class PrintSubscriber extends Subscriber<Object>{
21+
private final String name;
22+
public PrintSubscriber(String name) {
23+
this.name = name;
24+
}
25+
@Override
26+
public void onCompleted() {
27+
System.out.println(name + ": Completed");
28+
}
29+
@Override
30+
public void onError(Throwable e) {
31+
System.out.println(name + ": Error: " + e);
32+
}
33+
@Override
34+
public void onNext(Object v) {
35+
System.out.println(name + ": " + v);
36+
}
37+
}
38+
39+
public void exampleGroupBy() {
40+
Observable<String> values = Observable.just(
41+
"first",
42+
"second",
43+
"third",
44+
"forth",
45+
"fifth",
46+
"sixth"
47+
);
48+
49+
values.groupBy(word -> word.charAt(0))
50+
.flatMap(group ->
51+
group.last().map(v -> group.getKey() + ": " + v)
52+
)
53+
.subscribe(v -> System.out.println(v));
54+
55+
// s: sixth
56+
// t: third
57+
// f: fifth
58+
}
59+
60+
61+
//
62+
// Tests
63+
//
64+
65+
@Test
66+
public void testGroupBy() {
67+
TestSubscriber<Object> tester = new TestSubscriber<>();
68+
69+
Observable<String> values = Observable.just(
70+
"first",
71+
"second",
72+
"third",
73+
"forth",
74+
"fifth",
75+
"sixth"
76+
);
77+
78+
values.groupBy(word -> word.charAt(0))
79+
.flatMap(group ->
80+
group.last().map(v -> group.getKey() + ": " + v)
81+
)
82+
.subscribe(tester);
83+
84+
tester.assertReceivedOnNext(Arrays.asList("s: sixth", "t: third", "f: fifth"));
85+
tester.assertTerminalEvent();
86+
tester.assertNoErrors();
87+
}
88+
}
89+
90+

0 commit comments

Comments
 (0)