Skip to content

Commit b8aec3a

Browse files
committed
3.6 Added examples for replay overloads
1 parent 16838e4 commit b8aec3a

File tree

1 file changed

+67
-0
lines changed

1 file changed

+67
-0
lines changed

tests/java/itrx/chapter3/hotandcold/ReplayTest.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,34 @@ public void exampleReplay() throws InterruptedException {
3737
// Second: 3
3838
}
3939

40+
public void exampleReplayWithBufferSize() throws InterruptedException {
41+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
42+
.take(5)
43+
.replay(2);
44+
45+
source.connect();
46+
Thread.sleep(4500);
47+
source.subscribe(System.out::println);
48+
49+
// 2
50+
// 3
51+
// 4
52+
}
53+
54+
public void exampleReplayWithTime() throws InterruptedException {
55+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS)
56+
.take(5)
57+
.replay(2000, TimeUnit.MILLISECONDS);
58+
59+
source.connect();
60+
Thread.sleep(4500);
61+
source.subscribe(System.out::println);
62+
63+
// 2
64+
// 3
65+
// 4
66+
}
67+
4068

4169
//
4270
// Test
@@ -68,5 +96,44 @@ public void testReplay() throws InterruptedException {
6896

6997
connection.unsubscribe();
7098
}
99+
100+
@Test
101+
public void testReplayWithBufferSize() {
102+
TestScheduler scheduler = Schedulers.test();
103+
TestSubscriber<Long> tester = new TestSubscriber<>();
104+
105+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS, scheduler)
106+
.take(5)
107+
.replay(2, scheduler);
108+
109+
source.connect();
110+
scheduler.advanceTimeBy(4500, TimeUnit.MILLISECONDS);
111+
source.subscribe(tester);
112+
scheduler.triggerActions();
113+
tester.assertReceivedOnNext(Arrays.asList(2L, 3L));
114+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
115+
tester.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
116+
}
117+
118+
@Test
119+
public void testReplayWithTime() throws InterruptedException {
120+
TestScheduler scheduler = Schedulers.test();
121+
TestSubscriber<Long> tester = new TestSubscriber<>();
122+
123+
ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS, scheduler)
124+
.take(5)
125+
.replay(2000, TimeUnit.MILLISECONDS, scheduler);
126+
127+
source.connect();
128+
scheduler.advanceTimeBy(4500, TimeUnit.MILLISECONDS);
129+
source.subscribe(tester);
130+
tester.assertReceivedOnNext(Arrays.asList(2L, 3L));
131+
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
132+
tester.assertReceivedOnNext(Arrays.asList(2L, 3L, 4L));
133+
134+
// 2
135+
// 3
136+
// 4
137+
}
71138

72139
}

0 commit comments

Comments
 (0)