@@ -37,6 +37,34 @@ public void exampleReplay() throws InterruptedException {
3737// Second: 3
3838 }
3939
40+ public void exampleReplayWithBufferSize () throws InterruptedException {
41+ ConnectableObservable <Long > source = Observable .interval (1000 , TimeUnit .MILLISECONDS )
42+ .take (5 )
43+ .replay (2 );
44+
45+ source .connect ();
46+ Thread .sleep (4500 );
47+ source .subscribe (System .out ::println );
48+
49+ // 2
50+ // 3
51+ // 4
52+ }
53+
54+ public void exampleReplayWithTime () throws InterruptedException {
55+ ConnectableObservable <Long > source = Observable .interval (1000 , TimeUnit .MILLISECONDS )
56+ .take (5 )
57+ .replay (2000 , TimeUnit .MILLISECONDS );
58+
59+ source .connect ();
60+ Thread .sleep (4500 );
61+ source .subscribe (System .out ::println );
62+
63+ // 2
64+ // 3
65+ // 4
66+ }
67+
4068
4169 //
4270 // Test
@@ -68,5 +96,44 @@ public void testReplay() throws InterruptedException {
6896
6997 connection .unsubscribe ();
7098 }
99+
100+ @ Test
101+ public void testReplayWithBufferSize () {
102+ TestScheduler scheduler = Schedulers .test ();
103+ TestSubscriber <Long > tester = new TestSubscriber <>();
104+
105+ ConnectableObservable <Long > source = Observable .interval (1000 , TimeUnit .MILLISECONDS , scheduler )
106+ .take (5 )
107+ .replay (2 , scheduler );
108+
109+ source .connect ();
110+ scheduler .advanceTimeBy (4500 , TimeUnit .MILLISECONDS );
111+ source .subscribe (tester );
112+ scheduler .triggerActions ();
113+ tester .assertReceivedOnNext (Arrays .asList (2L , 3L ));
114+ scheduler .advanceTimeBy (500 , TimeUnit .MILLISECONDS );
115+ tester .assertReceivedOnNext (Arrays .asList (2L , 3L , 4L ));
116+ }
117+
118+ @ Test
119+ public void testReplayWithTime () throws InterruptedException {
120+ TestScheduler scheduler = Schedulers .test ();
121+ TestSubscriber <Long > tester = new TestSubscriber <>();
122+
123+ ConnectableObservable <Long > source = Observable .interval (1000 , TimeUnit .MILLISECONDS , scheduler )
124+ .take (5 )
125+ .replay (2000 , TimeUnit .MILLISECONDS , scheduler );
126+
127+ source .connect ();
128+ scheduler .advanceTimeBy (4500 , TimeUnit .MILLISECONDS );
129+ source .subscribe (tester );
130+ tester .assertReceivedOnNext (Arrays .asList (2L , 3L ));
131+ scheduler .advanceTimeBy (500 , TimeUnit .MILLISECONDS );
132+ tester .assertReceivedOnNext (Arrays .asList (2L , 3L , 4L ));
133+
134+ // 2
135+ // 3
136+ // 4
137+ }
71138
72139}
0 commit comments