Skip to content

Commit 16838e4

Browse files
committed
Examples 4.1 Scheduling
1 parent 9a2fc21 commit 16838e4

File tree

5 files changed

+599
-0
lines changed

5 files changed

+599
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package itrx.chapter4.scheduling;
2+
3+
import static org.junit.Assert.*;
4+
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.schedulers.Schedulers;
10+
11+
public class ObserveOnTest {
12+
13+
public void exampleObserveOn() {
14+
Observable.create(o -> {
15+
System.out.println("Created on " + Thread.currentThread().getId());
16+
o.onNext(1);
17+
o.onNext(2);
18+
o.onCompleted();
19+
})
20+
.observeOn(Schedulers.newThread())
21+
.subscribe(i ->
22+
System.out.println("Received " + i + " on " + Thread.currentThread().getId()));
23+
24+
// Created on 1
25+
// Received 1 on 13
26+
// Received 2 on 13
27+
}
28+
29+
public void exampleObserveOnBeforeAfter() {
30+
Observable.create(o -> {
31+
System.out.println("Created on " + Thread.currentThread().getId());
32+
o.onNext(1);
33+
o.onNext(2);
34+
o.onCompleted();
35+
})
36+
.doOnNext(i ->
37+
System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
38+
.observeOn(Schedulers.newThread())
39+
.doOnNext(i ->
40+
System.out.println("After " + i + " on " + Thread.currentThread().getId()))
41+
.subscribe();
42+
43+
// Created on 1
44+
// Before 1 on 1
45+
// Before 2 on 1
46+
// After 1 on 13
47+
// After 2 on 13
48+
}
49+
50+
51+
//
52+
// Test
53+
//
54+
55+
@Test
56+
public void testObserveOn() {
57+
long[] threads = {0, 0};
58+
59+
Observable.create(o -> {
60+
threads[0] = Thread.currentThread().getId();
61+
o.onNext(1);
62+
o.onNext(2);
63+
o.onCompleted();
64+
})
65+
.observeOn(Schedulers.newThread())
66+
.subscribe(i -> threads[1] = Thread.currentThread().getId());
67+
68+
Assert.assertNotEquals("Create and receive on different threads", threads[0], threads[1]);
69+
}
70+
71+
@Test
72+
public void testObserveOnBeforeAfter() {
73+
long[] threads = {0, 0, 0, 0, 0};
74+
75+
threads[0] = Thread.currentThread().getId();
76+
77+
Observable.create(o -> {
78+
threads[1] = Thread.currentThread().getId();
79+
o.onNext(1);
80+
o.onNext(2);
81+
o.onCompleted();
82+
})
83+
.doOnNext(i -> threads[2] = Thread.currentThread().getId())
84+
.observeOn(Schedulers.newThread())
85+
.doOnNext(i -> threads[3] = Thread.currentThread().getId())
86+
.subscribe(i -> threads[4] = Thread.currentThread().getId());
87+
88+
assertEquals("Create on main thread", threads[0], threads[1]);
89+
assertEquals("Synchronous before observeOn", threads[1], threads[2]);
90+
assertEquals("Synchronous after observeOn", threads[3], threads[4]);
91+
assertNotEquals("Before and after observeOn on different threads", threads[2], threads[3]);
92+
}
93+
94+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package itrx.chapter4.scheduling;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.lang.Thread.State;
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.junit.Test;
12+
13+
import rx.Scheduler;
14+
import rx.schedulers.Schedulers;
15+
import rx.schedulers.TestScheduler;
16+
17+
public class SchedulerTest {
18+
19+
public void exampleSchedule() {
20+
Scheduler scheduler = Schedulers.immediate();
21+
22+
Scheduler.Worker worker = scheduler.createWorker();
23+
worker.schedule(
24+
() -> System.out.println("Action"));
25+
}
26+
27+
public void exampleScheduleFuture() {
28+
Scheduler scheduler = Schedulers.newThread();
29+
long start = System.currentTimeMillis();
30+
Scheduler.Worker worker = scheduler.createWorker();
31+
worker.schedule(
32+
() -> System.out.println(System.currentTimeMillis()-start),
33+
5, TimeUnit.SECONDS);
34+
worker.schedule(
35+
() -> System.out.println(System.currentTimeMillis()-start),
36+
5, TimeUnit.SECONDS);
37+
38+
// 5033
39+
// 5035
40+
}
41+
42+
public void exampleCancelWork() {
43+
Scheduler scheduler = Schedulers.newThread();
44+
long start = System.currentTimeMillis();
45+
Scheduler.Worker worker = scheduler.createWorker();
46+
worker.schedule(
47+
() -> {
48+
System.out.println(System.currentTimeMillis()-start);
49+
worker.unsubscribe();
50+
},
51+
5, TimeUnit.SECONDS);
52+
worker.schedule(
53+
() -> System.out.println(System.currentTimeMillis()-start),
54+
5, TimeUnit.SECONDS);
55+
56+
// 5032
57+
}
58+
59+
public void exampleCancelWithInterrupt() throws InterruptedException {
60+
Scheduler scheduler = Schedulers.newThread();
61+
Scheduler.Worker worker = scheduler.createWorker();
62+
worker.schedule(() -> {
63+
try {
64+
Thread.sleep(2000);
65+
System.out.println("Action completed");
66+
} catch (InterruptedException e) {
67+
System.out.println("Action interrupted");
68+
}
69+
});
70+
Thread.sleep(500);
71+
worker.unsubscribe();
72+
73+
// Action interrupted
74+
}
75+
76+
77+
//
78+
// Test
79+
//
80+
81+
@Test
82+
public void testSchedule() {
83+
List<Boolean> executed = new ArrayList<>();
84+
85+
Scheduler scheduler = Schedulers.immediate();
86+
Scheduler.Worker worker = scheduler.createWorker();
87+
worker.schedule(
88+
() -> executed.add(true));
89+
90+
assertEquals(Arrays.asList(new Boolean(true)), executed);
91+
}
92+
93+
@Test
94+
public void testScheduleFuture() {
95+
long[] executionTimes = {0, 0};
96+
97+
TestScheduler scheduler = Schedulers.test();
98+
Scheduler.Worker worker = scheduler.createWorker();
99+
worker.schedule(
100+
() -> executionTimes[0] = scheduler.now(),
101+
5, TimeUnit.SECONDS);
102+
worker.schedule(
103+
() -> executionTimes[1] = scheduler.now(),
104+
5, TimeUnit.SECONDS);
105+
106+
scheduler.advanceTimeTo(5000, TimeUnit.MILLISECONDS);
107+
assertEquals("First task executed on time", 5000, executionTimes[0]);
108+
assertEquals("Second task executed on time", 5000, executionTimes[1]);
109+
}
110+
111+
@Test
112+
public void testCancelWork() {
113+
long[] executionTimes = {0, 0};
114+
115+
TestScheduler scheduler = Schedulers.test();
116+
Scheduler.Worker worker = scheduler.createWorker();
117+
worker.schedule(
118+
() -> {
119+
executionTimes[0] = scheduler.now();
120+
worker.unsubscribe();
121+
},
122+
5, TimeUnit.SECONDS);
123+
worker.schedule(
124+
() -> executionTimes[1] = scheduler.now(),
125+
5, TimeUnit.SECONDS);
126+
127+
scheduler.advanceTimeTo(5000, TimeUnit.MILLISECONDS);
128+
assertEquals("First task executed on time", 5000, executionTimes[0]);
129+
assertEquals("Second task never executed", 0, executionTimes[1]);
130+
}
131+
132+
@Test
133+
public void testCancelWithInterrupt() throws InterruptedException {
134+
Scheduler scheduler = Schedulers.newThread();
135+
Scheduler.Worker worker = scheduler.createWorker();
136+
Thread[] workerThread = {null};
137+
Boolean[] interrupted = {false};
138+
worker.schedule(() -> {
139+
try {
140+
workerThread[0] = Thread.currentThread();
141+
Thread.sleep(100);
142+
} catch (InterruptedException e) {
143+
interrupted[0] = true;
144+
}
145+
});
146+
147+
while (workerThread[0] == null ||
148+
workerThread[0].getState() != State.TIMED_WAITING)
149+
Thread.sleep(1); // Wait for task to sleep
150+
worker.unsubscribe();
151+
workerThread[0].join();
152+
assertTrue("Task must be interrupted before completing", interrupted[0]);
153+
}
154+
155+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package itrx.chapter4.scheduling;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.concurrent.Semaphore;
9+
10+
import org.junit.Test;
11+
12+
import rx.Scheduler;
13+
import rx.schedulers.Schedulers;
14+
15+
public class SchedulersTest {
16+
17+
public static void printThread(String message) {
18+
System.out.println(message + " on " + Thread.currentThread().getId());
19+
}
20+
21+
public void exampleImmediate() {
22+
Scheduler scheduler = Schedulers.immediate();
23+
Scheduler.Worker worker = scheduler.createWorker();
24+
worker.schedule(() -> {
25+
System.out.println("Start");
26+
worker.schedule(() -> System.out.println("Inner"));
27+
System.out.println("End");
28+
});
29+
30+
// Start
31+
// Inner
32+
// End
33+
}
34+
35+
public void exampleTrampoline() {
36+
Scheduler scheduler = Schedulers.trampoline();
37+
Scheduler.Worker worker = scheduler.createWorker();
38+
worker.schedule(() -> {
39+
System.out.println("Start");
40+
worker.schedule(() -> System.out.println("Inner"));
41+
System.out.println("End");
42+
});
43+
44+
// Start
45+
// End
46+
// Inner
47+
}
48+
49+
public void exampleNewThread() throws InterruptedException {
50+
printThread("Main");
51+
Scheduler scheduler = Schedulers.newThread();
52+
Scheduler.Worker worker = scheduler.createWorker();
53+
worker.schedule(() -> {
54+
printThread("Start");
55+
worker.schedule(() -> printThread("Inner"));
56+
printThread("End");
57+
});
58+
Thread.sleep(500);
59+
worker.schedule(() -> printThread("Again"));
60+
61+
// Main on 1
62+
// Start on 11
63+
// End on 11
64+
// Inner on 11
65+
// Again on 11
66+
}
67+
68+
69+
//
70+
// Test
71+
//
72+
73+
@Test
74+
public void testImmediate() {
75+
List<String> execution = new ArrayList<>();
76+
77+
Scheduler scheduler = Schedulers.immediate();
78+
Scheduler.Worker worker = scheduler.createWorker();
79+
worker.schedule(() -> {
80+
execution.add("Start");
81+
worker.schedule(() -> execution.add("Inner"));
82+
execution.add("End");
83+
});
84+
85+
assertEquals(Arrays.asList("Start", "Inner", "End"), execution);
86+
}
87+
88+
@Test
89+
public void testTrampoline() {
90+
List<String> execution = new ArrayList<>();
91+
92+
Scheduler scheduler = Schedulers.trampoline();
93+
Scheduler.Worker worker = scheduler.createWorker();
94+
worker.schedule(() -> {
95+
execution.add("Start");
96+
worker.schedule(() -> execution.add("Inner"));
97+
execution.add("End");
98+
});
99+
100+
assertEquals(Arrays.asList("Start", "End", "Inner"), execution);
101+
}
102+
103+
@Test
104+
public void testNewThread() throws InterruptedException {
105+
List<String> execution = new ArrayList<>();
106+
List<Thread> threads = new ArrayList<>();
107+
Semaphore workfinished = new Semaphore(-2);
108+
109+
Scheduler scheduler = Schedulers.newThread();
110+
Scheduler.Worker worker = scheduler.createWorker();
111+
worker.schedule(() -> {
112+
threads.add(Thread.currentThread());
113+
execution.add("Start");
114+
worker.schedule(() -> {
115+
execution.add("Inner");
116+
workfinished.release();
117+
});
118+
execution.add("End");
119+
workfinished.release();
120+
});
121+
worker.schedule(() -> {
122+
threads.add(Thread.currentThread());
123+
workfinished.release();
124+
});
125+
126+
workfinished.acquire();
127+
128+
assertEquals("Same worker schedules on the same thread",
129+
threads.get(0),
130+
threads.get(1));
131+
assertEquals("New thread used as trampoline",
132+
Arrays.asList("Start", "End", "Inner"),
133+
execution);
134+
}
135+
136+
}

0 commit comments

Comments
 (0)