File tree Expand file tree Collapse file tree 1 file changed +37
-0
lines changed
tests/java/itrx/chapter2/creating Expand file tree Collapse file tree 1 file changed +37
-0
lines changed Original file line number Diff line number Diff line change 22
33import java .util .Arrays ;
44import java .util .concurrent .FutureTask ;
5+ import java .util .concurrent .TimeUnit ;
56
67import org .junit .Test ;
78
@@ -28,6 +29,24 @@ public void exampleFromFuture() {
2829 // Received: 21
2930 // Completed
3031 }
32+
33+ public void exampleFromFutureTimeout () {
34+ FutureTask <Integer > f = new FutureTask <Integer >(() -> {
35+ Thread .sleep (2000 );
36+ return 21 ;
37+ });
38+ new Thread (f ).start ();
39+
40+ Observable <Integer > values = Observable .from (f , 1000 , TimeUnit .MILLISECONDS );
41+
42+ values .subscribe (
43+ v -> System .out .println ("Received: " + v ),
44+ e -> System .out .println ("Error: " + e ),
45+ () -> System .out .println ("Completed" )
46+ );
47+
48+ // Error: java.util.concurrent.TimeoutException
49+ }
3150
3251 public void exampleFromArray () {
3352 Integer [] is = {1 ,2 ,3 };
@@ -64,6 +83,24 @@ public void exampleFromIterable() {
6483 // Tests
6584 //
6685
86+ @ Test
87+ public void testFromFuture () {
88+ TestSubscriber <Integer > tester = new TestSubscriber <Integer >();
89+
90+ FutureTask <Integer > f = new FutureTask <Integer >(() -> {
91+ return 21 ;
92+ });
93+ new Thread (f ).start ();
94+
95+ Observable <Integer > values = Observable .from (f );
96+
97+ values .subscribe (tester );
98+
99+ tester .assertReceivedOnNext (Arrays .asList (21 ));
100+ tester .assertNoErrors ();
101+ tester .assertTerminalEvent ();
102+ }
103+
67104 @ Test
68105 public void testFromArray () {
69106 TestSubscriber <Integer > tester = new TestSubscriber <Integer >();
You can’t perform that action at this time.
0 commit comments