Skip to content

Commit 23994c0

Browse files
committed
3.3 Added retryWhen example
1 parent 06a4dca commit 23994c0

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package itrx.chapter3.error;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.io.IOException;
6+
import java.util.Arrays;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import junit.framework.Assert;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.observers.TestSubscriber;
15+
import rx.schedulers.Schedulers;
16+
import rx.schedulers.TestScheduler;
17+
18+
public class RetryWhenTest {
19+
20+
public static void main(String[] args) throws IOException {
21+
new RetryWhenTest().example();
22+
System.in.read();
23+
}
24+
25+
public void example() {
26+
Observable<Integer> source = Observable.create(o -> {
27+
o.onNext(1);
28+
o.onNext(2);
29+
o.onError(new Exception("Failed"));
30+
});
31+
32+
source.retryWhen((o) -> o
33+
.take(2)
34+
.delay(100, TimeUnit.MILLISECONDS)
35+
.concatWith(Observable.error(new Exception("Done"))))
36+
.timeInterval()
37+
.subscribe(
38+
System.out::println,
39+
System.out::println);
40+
41+
// TimeInterval [intervalInMilliseconds=17, value=1]
42+
// TimeInterval [intervalInMilliseconds=0, value=2]
43+
// TimeInterval [intervalInMilliseconds=102, value=1]
44+
// TimeInterval [intervalInMilliseconds=0, value=2]
45+
// TimeInterval [intervalInMilliseconds=102, value=1]
46+
// TimeInterval [intervalInMilliseconds=0, value=2]
47+
}
48+
49+
50+
//
51+
// Test
52+
//
53+
54+
@Test
55+
public void test() {
56+
TestScheduler scheduler = Schedulers.test();
57+
TestSubscriber<Long> intervals = new TestSubscriber<>();
58+
59+
Observable<Integer> source = Observable.create(o -> {
60+
o.onNext(1);
61+
o.onNext(2);
62+
o.onError(new Exception("Failed"));
63+
});
64+
source.retryWhen((o) -> o
65+
.take(2)
66+
.delay(100, TimeUnit.MILLISECONDS, scheduler)
67+
.concatWith(Observable.error(new Exception("Done"))), scheduler)
68+
.timeInterval(scheduler)
69+
.map(i -> i.getIntervalInMilliseconds())
70+
.subscribe(intervals);
71+
72+
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
73+
intervals.assertReceivedOnNext(Arrays.asList(0L, 0L, 100L, 0L, 100L, 0L));
74+
intervals.assertTerminalEvent();
75+
assertEquals(1, intervals.getOnErrorEvents().size());
76+
}
77+
}

0 commit comments

Comments
 (0)