11package itrx ;
22
33import java .util .Arrays ;
4+ import java .util .concurrent .TimeUnit ;
45
56import org .junit .Test ;
67
78import rx .observers .TestSubscriber ;
9+ import rx .schedulers .Schedulers ;
10+ import rx .schedulers .TestScheduler ;
811import rx .subjects .ReplaySubject ;
912
1013public class ReplaySubjectTest {
11-
12- public static void main (String [] args ) {
13- ReplaySubject <Integer > s = ReplaySubject .create ();
14- s .subscribe (v -> System .out .println ("Early:" + v ));
15- s .onNext (0 );
16- s .onNext (1 );
17- s .subscribe (v -> System .out .println ("Late: " + v ));
18- s .onNext (2 );
19- }
2014
2115 @ Test
22- public void test () {
16+ public void testEarlyLate () {
2317 TestSubscriber <Integer > tester = new TestSubscriber <Integer >();
2418
2519 ReplaySubject <Integer > s = ReplaySubject .create ();
@@ -32,5 +26,36 @@ public void test() {
3226 tester .assertReceivedOnNext (Arrays .asList (0 , 1 , 0 , 1 , 2 , 2 ));
3327 }
3428
29+ @ Test
30+ public void testWithSize () {
31+ TestSubscriber <Integer > tester = new TestSubscriber <Integer >();
32+
33+ ReplaySubject <Integer > s = ReplaySubject .createWithSize (2 );
34+ s .onNext (0 );
35+ s .onNext (1 );
36+ s .onNext (2 );
37+ s .subscribe (tester );
38+ s .onNext (3 );
39+
40+ tester .assertReceivedOnNext (Arrays .asList (1 ,2 ,3 ));
41+ }
42+
43+ @ Test
44+ public void testWithTime () throws InterruptedException {
45+ TestSubscriber <Integer > tester = new TestSubscriber <Integer >();
46+ TestScheduler scheduler = Schedulers .test ();
47+
48+ ReplaySubject <Integer > s = ReplaySubject .createWithTime (150 , TimeUnit .MILLISECONDS , scheduler );
49+ s .onNext (0 );
50+ scheduler .advanceTimeBy (100 , TimeUnit .MILLISECONDS );
51+ s .onNext (1 );
52+ scheduler .advanceTimeBy (100 , TimeUnit .MILLISECONDS );
53+ s .onNext (2 );
54+ s .subscribe (tester );
55+ s .onNext (3 );
56+
57+ tester .assertReceivedOnNext (Arrays .asList (1 ,2 ,3 ));
58+ }
59+
3560
3661}
0 commit comments