Skip to content

Commit 08d7f62

Browse files
committed
Examples 4.4 Backpressure
1 parent e792fc8 commit 08d7f62

File tree

5 files changed

+455
-0
lines changed

5 files changed

+455
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package itrx.chapter4.backpressure;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.observers.TestSubscriber;
11+
import rx.schedulers.Schedulers;
12+
import rx.schedulers.TestScheduler;
13+
14+
public class ConsumerSideTest {
15+
16+
public void exampleSample() {
17+
Observable.interval(1, TimeUnit.MILLISECONDS)
18+
.observeOn(Schedulers.newThread())
19+
.sample(100, TimeUnit.MILLISECONDS)
20+
.take(3)
21+
.subscribe(
22+
i -> {
23+
System.out.println(i);
24+
try {
25+
Thread.sleep(100);
26+
} catch (Exception e) { }
27+
},
28+
System.out::println);
29+
30+
// 82
31+
// 182
32+
// 283
33+
}
34+
35+
public void exampleBuffer() {
36+
Observable.interval(10, TimeUnit.MILLISECONDS)
37+
.observeOn(Schedulers.newThread())
38+
.buffer(100, TimeUnit.MILLISECONDS)
39+
.take(3)
40+
.subscribe(
41+
i -> {
42+
System.out.println(i);
43+
try {
44+
Thread.sleep(100);
45+
} catch (Exception e) { }
46+
},
47+
System.out::println);
48+
49+
// [0, 1, 2, 3, 4, 5, 6, 7]
50+
// [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
51+
// [18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
52+
}
53+
54+
55+
//
56+
// Test
57+
//
58+
59+
@Test
60+
public void testSample() {
61+
TestScheduler scheduler = Schedulers.test();
62+
TestSubscriber<Long> tester = new TestSubscriber<Long>() {
63+
@Override
64+
public void onNext(Long t) {
65+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
66+
super.onNext(t);
67+
}
68+
};
69+
70+
Observable.interval(1, TimeUnit.MILLISECONDS, scheduler)
71+
.observeOn(scheduler)
72+
.sample(100, TimeUnit.MILLISECONDS, scheduler)
73+
.take(3)
74+
.subscribe(tester);
75+
76+
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
77+
tester.assertReceivedOnNext(Arrays.asList(98L, 199L, 299L));
78+
tester.assertNoErrors();
79+
}
80+
81+
@Test
82+
public void testBuffer() {
83+
TestScheduler scheduler = Schedulers.test();
84+
TestSubscriber<List<Long>> tester = new TestSubscriber<List<Long>>() {
85+
@Override
86+
public void onNext(List<Long> t) {
87+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
88+
super.onNext(t);
89+
}
90+
};
91+
92+
Observable.interval(10, TimeUnit.MILLISECONDS, scheduler)
93+
.observeOn(scheduler)
94+
.buffer(100, TimeUnit.MILLISECONDS, scheduler)
95+
.take(3)
96+
.subscribe(tester);
97+
98+
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
99+
tester.assertReceivedOnNext(Arrays.asList(
100+
Arrays.asList( 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L),
101+
Arrays.asList( 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, 17L, 18L, 19L),
102+
Arrays.asList(20L, 21L, 22L, 23L, 24L, 25L, 26L, 27L, 28L, 29L)
103+
));
104+
tester.assertNoErrors();
105+
}
106+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package itrx.chapter4.backpressure;
2+
3+
import rx.Subscriber;
4+
import rx.functions.Action0;
5+
import rx.functions.Action1;
6+
7+
/**
8+
* An Rx Subscriber that does not accept any items unless manually requested to.
9+
*
10+
* @author Chris
11+
*
12+
* @param <T>
13+
*/
14+
public class ControlledPullSubscriber<T> extends Subscriber<T> {
15+
16+
private final Action1<T> onNextAction;
17+
private final Action1<Throwable> onErrorAction;
18+
private final Action0 onCompletedAction;
19+
20+
public ControlledPullSubscriber(
21+
Action1<T> onNextAction,
22+
Action1<Throwable> onErrorAction,
23+
Action0 onCompletedAction) {
24+
this.onNextAction = onNextAction;
25+
this.onErrorAction = onErrorAction;
26+
this.onCompletedAction = onCompletedAction;
27+
}
28+
29+
public ControlledPullSubscriber(
30+
Action1<T> onNextAction,
31+
Action1<Throwable> onErrorAction) {
32+
this(onNextAction, onErrorAction, () -> {});
33+
}
34+
35+
public ControlledPullSubscriber(Action1<T> onNextAction) {
36+
this(onNextAction, e -> {}, () -> {});
37+
}
38+
39+
@Override
40+
public void onStart() {
41+
request(0);
42+
}
43+
44+
@Override
45+
public void onCompleted() {
46+
onCompletedAction.call();
47+
}
48+
49+
@Override
50+
public void onError(Throwable e) {
51+
onErrorAction.call(e);
52+
}
53+
54+
@Override
55+
public void onNext(T t) {
56+
onNextAction.call(t);
57+
}
58+
59+
public void requestMore(int n) {
60+
request(n);
61+
}
62+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package itrx.chapter4.backpressure;
2+
3+
import static org.hamcrest.CoreMatchers.instanceOf;
4+
import static org.junit.Assert.*;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.observers.TestSubscriber;
15+
import rx.schedulers.Schedulers;
16+
import rx.schedulers.TestScheduler;
17+
18+
public class NoBackpressureTest {
19+
20+
public void exampleSynchronous() {
21+
// Produce
22+
Observable<Integer> producer = Observable.create(o -> {
23+
o.onNext(1);
24+
o.onNext(2);
25+
o.onCompleted();
26+
});
27+
// Consume
28+
producer.subscribe(i -> {
29+
try {
30+
Thread.sleep(1000);
31+
System.out.println(i);
32+
} catch (Exception e) { }
33+
});
34+
35+
// 1
36+
// 2
37+
}
38+
39+
public void exampleNoBackpressure() {
40+
Observable.interval(1, TimeUnit.MILLISECONDS)
41+
.observeOn(Schedulers.newThread())
42+
.subscribe(
43+
i -> {
44+
System.out.println(i);
45+
try {
46+
Thread.sleep(100);
47+
} catch (Exception e) { }
48+
},
49+
System.out::println);
50+
51+
// 0
52+
// 1
53+
// rx.exceptions.MissingBackpressureException
54+
}
55+
56+
57+
//
58+
// Tests
59+
//
60+
61+
@Test
62+
public void testSynchronous() {
63+
List<String> execution = new ArrayList<String>();
64+
65+
// Produce
66+
Observable<Integer> producer = Observable.create(o -> {
67+
execution.add("Producing 1");
68+
o.onNext(1);
69+
execution.add("Producing 2");
70+
o.onNext(2);
71+
o.onCompleted();
72+
});
73+
// Consume
74+
producer.subscribe(i -> execution.add("Processed " + i));
75+
76+
assertEquals(
77+
Arrays.asList(
78+
"Producing 1",
79+
"Processed 1",
80+
"Producing 2",
81+
"Processed 2"
82+
),
83+
execution);
84+
}
85+
86+
@Test
87+
public void testNoBackpressure() {
88+
TestScheduler scheduler = Schedulers.test();
89+
TestSubscriber<Long> tester = new TestSubscriber<Long>() {
90+
@Override
91+
public void onNext(Long t) {
92+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
93+
super.onNext(t);
94+
}
95+
};
96+
97+
Observable.interval(1, TimeUnit.MILLISECONDS, scheduler)
98+
.observeOn(scheduler)
99+
.subscribe(tester);
100+
101+
scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
102+
assertThat(
103+
tester.getOnErrorEvents().get(0),
104+
instanceOf(rx.exceptions.MissingBackpressureException.class));
105+
106+
}
107+
108+
}

0 commit comments

Comments
 (0)