Skip to content

Commit 9a2fc21

Browse files
committed
Examples 3.7 Custom operators
1 parent 9bd4441 commit 9a2fc21

File tree

2 files changed

+184
-0
lines changed

2 files changed

+184
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package itrx.chapter3.custom;
2+
3+
import java.util.Arrays;
4+
5+
import org.junit.Test;
6+
7+
import rx.Observable;
8+
import rx.observers.TestSubscriber;
9+
10+
public class ComposeTest {
11+
12+
/**
13+
* A custom operator for calculating a running average
14+
*
15+
* @author Chris
16+
*
17+
*/
18+
public static class RunningAverage implements Observable.Transformer<Integer, Double> {
19+
private static class AverageAcc {
20+
public final int sum;
21+
public final int count;
22+
public AverageAcc(int sum, int count) {
23+
this.sum = sum;
24+
this.count = count;
25+
}
26+
}
27+
28+
final int threshold;
29+
30+
public RunningAverage() {
31+
this.threshold = Integer.MAX_VALUE;
32+
}
33+
34+
public RunningAverage(int threshold) {
35+
this.threshold = threshold;
36+
}
37+
38+
@Override
39+
public Observable<Double> call(Observable<Integer> source) {
40+
return source
41+
.filter(i -> i< this.threshold)
42+
.scan(
43+
new AverageAcc(0,0),
44+
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
45+
.filter(acc -> acc.count > 0)
46+
.map(acc -> acc.sum/(double)acc.count);
47+
}
48+
}
49+
50+
public void exampleComposeFromClass() {
51+
Observable.just(2, 3, 10, 12, 4)
52+
.compose(new RunningAverage())
53+
.subscribe(System.out::println);
54+
55+
// 2.0
56+
// 2.5
57+
// 5.0
58+
// 6.75
59+
// 6.2
60+
}
61+
62+
public void exampleComposeParameterised() {
63+
Observable.just(2, 3, 10, 12, 4)
64+
.compose(new RunningAverage(5))
65+
.subscribe(System.out::println);
66+
67+
// 2.0
68+
// 2.5
69+
// 3.0
70+
}
71+
72+
73+
//
74+
// Test
75+
//
76+
77+
@Test
78+
public void testComposeFromClass() {
79+
TestSubscriber<Double> tester = new TestSubscriber<>();
80+
81+
Observable.just(2, 3, 10, 12, 4)
82+
.compose(new RunningAverage())
83+
.subscribe(tester);
84+
85+
tester.assertReceivedOnNext(Arrays.asList(2.0, 2.5, 5.0, 6.75, 6.2));
86+
tester.assertTerminalEvent();
87+
tester.assertNoErrors();
88+
}
89+
90+
@Test
91+
public void testComposeParameterised() {
92+
TestSubscriber<Double> tester = new TestSubscriber<>();
93+
94+
Observable.just(2, 3, 10, 12, 4)
95+
.compose(new RunningAverage(5))
96+
.subscribe(tester);
97+
98+
tester.assertReceivedOnNext(Arrays.asList(2.0, 2.5, 3.0));
99+
tester.assertTerminalEvent();
100+
tester.assertNoErrors();
101+
}
102+
103+
104+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package itrx.chapter3.custom;
2+
3+
import java.util.Arrays;
4+
5+
import org.junit.Test;
6+
7+
import rx.Observable;
8+
import rx.Subscriber;
9+
import rx.functions.Func1;
10+
import rx.observers.TestSubscriber;
11+
12+
public class LiftTest {
13+
14+
public static class MyMap<T,R> implements Observable.Operator<R, T> {
15+
private Func1<T,R> transformer;
16+
17+
public MyMap(Func1<T,R> transformer) {
18+
this.transformer = transformer;
19+
}
20+
21+
@Override
22+
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
23+
return new Subscriber<T>() {
24+
25+
@Override
26+
public void onCompleted() {
27+
if (!subscriber.isUnsubscribed())
28+
subscriber.onCompleted();
29+
}
30+
31+
@Override
32+
public void onError(Throwable e) {
33+
if (!subscriber.isUnsubscribed())
34+
subscriber.onError(e);
35+
}
36+
37+
@Override
38+
public void onNext(T t) {
39+
if (!subscriber.isUnsubscribed())
40+
subscriber.onNext(transformer.call(t));
41+
}
42+
43+
};
44+
}
45+
46+
public static <T,R> MyMap<T,R> create(Func1<T,R> transformer) {
47+
return new MyMap<T, R>(transformer);
48+
}
49+
}
50+
51+
public void exampleLift() {
52+
Observable.range(0, 5)
53+
.lift(MyMap.create(i -> i + "!"))
54+
.subscribe(System.out::println);
55+
56+
// 0!
57+
// 1!
58+
// 2!
59+
// 3!
60+
// 4!
61+
}
62+
63+
64+
//
65+
// Tests
66+
//
67+
68+
@Test
69+
public void testLift() {
70+
TestSubscriber<String> tester = new TestSubscriber<>();
71+
72+
Observable.range(0, 5)
73+
.lift(MyMap.create(i -> i + "!"))
74+
.subscribe(tester);
75+
76+
tester.assertReceivedOnNext(Arrays.asList("0!", "1!", "2!", "3!", "4!"));
77+
tester.assertTerminalEvent();
78+
tester.assertNoErrors();
79+
}
80+
}

0 commit comments

Comments
 (0)