Skip to content

Commit dfc7841

Browse files
committed
Naive schedulers implementation
1 parent bdd91eb commit dfc7841

13 files changed

Lines changed: 513 additions & 26 deletions

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,32 +36,7 @@
3636
import org.mockito.MockitoAnnotations;
3737

3838
import rx.observables.GroupedObservable;
39-
import rx.operators.OperationConcat;
40-
import rx.operators.OperationDefer;
41-
import rx.operators.OperationDematerialize;
42-
import rx.operators.OperationFilter;
43-
import rx.operators.OperationMap;
44-
import rx.operators.OperationMaterialize;
45-
import rx.operators.OperationMerge;
46-
import rx.operators.OperationMergeDelayError;
47-
import rx.operators.OperationMostRecent;
48-
import rx.operators.OperationNext;
49-
import rx.operators.OperationOnErrorResumeNextViaFunction;
50-
import rx.operators.OperationOnErrorResumeNextViaObservable;
51-
import rx.operators.OperationOnErrorReturn;
52-
import rx.operators.OperationScan;
53-
import rx.operators.OperationSkip;
54-
import rx.operators.OperationSynchronize;
55-
import rx.operators.OperationTake;
56-
import rx.operators.OperationTakeLast;
57-
import rx.operators.OperationToObservableFuture;
58-
import rx.operators.OperationToObservableIterable;
59-
import rx.operators.OperationToObservableList;
60-
import rx.operators.OperationToObservableSortedList;
61-
import rx.operators.OperationZip;
62-
import rx.operators.OperatorGroupBy;
63-
import rx.operators.OperatorTakeUntil;
64-
import rx.operators.OperatorToIterator;
39+
import rx.operators.*;
6540
import rx.plugins.RxJavaErrorHandler;
6641
import rx.plugins.RxJavaPlugins;
6742
import rx.subscriptions.Subscriptions;
@@ -766,6 +741,14 @@ public static Observable<Integer> range(int start, int count) {
766741
return from(Range.createWithCount(start, count));
767742
}
768743

744+
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
745+
return _create(OperationSubscribeOn.subscribeOn(source, scheduler));
746+
}
747+
748+
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
749+
return _create(OperationObserveOn.observeOn(source, scheduler));
750+
}
751+
769752
/**
770753
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
771754
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
@@ -2589,6 +2572,14 @@ public Observable<Notification<T>> materialize() {
25892572
return materialize(this);
25902573
}
25912574

2575+
public Observable<T> subscribeOn(Scheduler scheduler) {
2576+
return subscribeOn(this, scheduler);
2577+
}
2578+
2579+
public Observable<T> observeOn(Scheduler scheduler) {
2580+
return observeOn(this, scheduler);
2581+
}
2582+
25922583
/**
25932584
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
25942585
*
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package rx;
2+
3+
import rx.util.functions.Action0;
4+
import rx.util.functions.Func0;
5+
6+
import java.util.concurrent.TimeUnit;
7+
8+
public interface Scheduler {
9+
10+
Subscription schedule(Action0 action);
11+
12+
Subscription schedule(Func0<Subscription> action);
13+
14+
Subscription schedule(Action0 action, long timespan, TimeUnit unit);
15+
16+
Subscription schedule(Func0<Subscription> action, long timespan, TimeUnit unit);
17+
18+
long now();
19+
20+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package rx.concurrency;
2+
3+
import rx.Scheduler;
4+
import rx.Subscription;
5+
import rx.subscriptions.Subscriptions;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Func0;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
public abstract class AbstractScheduler implements Scheduler {
12+
13+
@Override
14+
public Subscription schedule(Action0 action) {
15+
return schedule(asFunc0(action));
16+
}
17+
18+
@Override
19+
public Subscription schedule(Action0 action, long timespan, TimeUnit unit) {
20+
return schedule(asFunc0(action), timespan, unit);
21+
}
22+
23+
@Override
24+
public Subscription schedule(Func0<Subscription> action, long timespan, TimeUnit unit) {
25+
return schedule(new DelayedAction(action, this, timespan, unit));
26+
}
27+
28+
@Override
29+
public long now() {
30+
return System.nanoTime();
31+
}
32+
33+
private static Func0<Subscription> asFunc0(final Action0 action) {
34+
return new Func0<Subscription>() {
35+
@Override
36+
public Subscription call() {
37+
action.call();
38+
return Subscriptions.empty();
39+
}
40+
};
41+
}
42+
43+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package rx.concurrency;
2+
3+
import org.junit.Test;
4+
import rx.Scheduler;
5+
import rx.Subscription;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Func0;
8+
9+
import java.util.LinkedList;
10+
import java.util.Queue;
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class CurrentThreadScheduler extends AbstractScheduler {
14+
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
15+
public static CurrentThreadScheduler getInstance() {
16+
return INSTANCE;
17+
}
18+
19+
private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();
20+
21+
private CurrentThreadScheduler() {
22+
}
23+
24+
@Override
25+
public Subscription schedule(Func0<Subscription> action) {
26+
DiscardableAction discardableAction = new DiscardableAction(action);
27+
enqueue(discardableAction);
28+
return discardableAction;
29+
}
30+
31+
private void enqueue(DiscardableAction action) {
32+
Queue<DiscardableAction> queue = QUEUE.get();
33+
boolean exec = false;
34+
35+
if (queue == null) {
36+
queue = new LinkedList<DiscardableAction>();
37+
QUEUE.set(queue);
38+
exec = true;
39+
}
40+
41+
queue.add(action);
42+
43+
while (exec && !queue.isEmpty()) {
44+
queue.poll().call();
45+
}
46+
}
47+
48+
public static class UnitTest {
49+
50+
@Test
51+
public void testScheduler() {
52+
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
53+
54+
final Action0 firstAction = new Action0() {
55+
@Override
56+
public void call() {
57+
System.out.println("First action start");
58+
System.out.println("First action end");
59+
}
60+
};
61+
final Action0 secondAction = new Action0() {
62+
@Override
63+
public void call() {
64+
System.out.println("Second action start");
65+
scheduler.schedule(firstAction);
66+
System.out.println("Second action end");
67+
68+
}
69+
};
70+
final Action0 thirdAction = new Action0() {
71+
@Override
72+
public void call() {
73+
System.out.println("Third action start");
74+
scheduler.schedule(secondAction);
75+
System.out.println("Third action end");
76+
}
77+
};
78+
79+
scheduler.schedule(thirdAction);
80+
}
81+
82+
}
83+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package rx.concurrency;
2+
3+
import rx.Scheduler;
4+
import rx.Subscription;
5+
import rx.util.functions.Action0;
6+
import rx.util.functions.Func0;
7+
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class DelayedAction implements Func0<Subscription> {
11+
private final Func0<Subscription> underlying;
12+
private final Scheduler scheduler;
13+
private final long execTime;
14+
15+
public DelayedAction(Func0<Subscription> underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
16+
this.underlying = underlying;
17+
this.scheduler = scheduler;
18+
this.execTime = scheduler.now() + timeUnit.toMillis(timespan);
19+
}
20+
21+
@Override
22+
public Subscription call() {
23+
if (execTime < scheduler.now()) {
24+
try {
25+
Thread.sleep(scheduler.now() - execTime);
26+
} catch (InterruptedException e) {
27+
Thread.currentThread().interrupt();
28+
throw new RuntimeException(e);
29+
}
30+
}
31+
32+
return underlying.call();
33+
34+
}
35+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package rx.concurrency;
2+
3+
import rx.Subscription;
4+
import rx.util.AtomicObservableSubscription;
5+
import rx.util.functions.Func0;
6+
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
9+
public class DiscardableAction implements Func0<Subscription>, Subscription {
10+
private final Func0<Subscription> underlying;
11+
12+
private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
13+
private final AtomicBoolean ready = new AtomicBoolean(true);
14+
15+
public DiscardableAction(Func0<Subscription> underlying) {
16+
this.underlying = underlying;
17+
}
18+
19+
@Override
20+
public Subscription call() {
21+
if (ready.compareAndSet(true, false)) {
22+
Subscription subscription = underlying.call();
23+
wrapper.wrap(subscription);
24+
return subscription;
25+
}
26+
return wrapper;
27+
}
28+
29+
@Override
30+
public void unsubscribe() {
31+
ready.set(false);
32+
wrapper.unsubscribe();
33+
}
34+
}
35+
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package rx.concurrency;
2+
3+
import rx.Subscription;
4+
import rx.util.functions.Func0;
5+
6+
import java.util.concurrent.Executor;
7+
8+
public class ExecutorScheduler extends AbstractScheduler {
9+
private final Executor executor;
10+
11+
public ExecutorScheduler(Executor executor) {
12+
this.executor = executor;
13+
}
14+
15+
@Override
16+
public Subscription schedule(Func0<Subscription> action) {
17+
final DiscardableAction discardableAction = new DiscardableAction(action);
18+
19+
executor.execute(new Runnable() {
20+
@Override
21+
public void run() {
22+
discardableAction.call();
23+
}
24+
});
25+
26+
return discardableAction;
27+
28+
}
29+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package rx.concurrency;
2+
3+
import rx.Subscription;
4+
import rx.util.functions.Func0;
5+
6+
public final class ImmediateScheduler extends AbstractScheduler {
7+
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
8+
9+
private ImmediateScheduler() {
10+
11+
}
12+
13+
public static ImmediateScheduler getInstance() {
14+
return INSTANCE;
15+
}
16+
17+
@Override
18+
public Subscription schedule(Func0<Subscription> action) {
19+
return new DiscardableAction(action);
20+
}
21+
22+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package rx.concurrency;
2+
3+
import rx.Subscription;
4+
import rx.util.functions.Func0;
5+
6+
public class NewThreadScheduler extends AbstractScheduler {
7+
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
8+
9+
public static NewThreadScheduler getInstance() {
10+
return INSTANCE;
11+
}
12+
13+
14+
@Override
15+
public Subscription schedule(Func0<Subscription> action) {
16+
final DiscardableAction discardableAction = new DiscardableAction(action);
17+
18+
Thread t = new Thread(new Runnable() {
19+
@Override
20+
public void run() {
21+
discardableAction.call();
22+
}
23+
});
24+
25+
t.start();
26+
27+
return discardableAction;
28+
}
29+
30+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package rx.concurrency;
2+
3+
import rx.Scheduler;
4+
5+
import java.util.concurrent.Executor;
6+
7+
public class Schedulers {
8+
private Schedulers() {
9+
10+
}
11+
12+
public static Scheduler immediate() {
13+
return ImmediateScheduler.getInstance();
14+
}
15+
16+
public static Scheduler currentThread() {
17+
return CurrentThreadScheduler.getInstance();
18+
}
19+
20+
public static Scheduler newThread() {
21+
return NewThreadScheduler.getInstance();
22+
}
23+
24+
public static Scheduler executor(Executor executor) {
25+
return new ExecutorScheduler(executor);
26+
}
27+
}

0 commit comments

Comments
 (0)