|
17 | 17 |
|
18 | 18 | import java.util.concurrent.TimeUnit; |
19 | 19 |
|
20 | | -import rx.functions.Action1; |
| 20 | +import rx.functions.Action0; |
| 21 | +import rx.schedulers.Schedulers; |
21 | 22 |
|
22 | 23 | /** |
23 | 24 | * Represents an object that schedules units of work. |
24 | 25 | * <p> |
25 | | - * The methods left to implement are: |
26 | | - * <ul> |
27 | | - * <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li> |
28 | | - * <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li> |
29 | | - * </ul> |
| 26 | + * Common implementations can be found in {@link Schedulers}. |
30 | 27 | * <p> |
31 | 28 | * Why is this an abstract class instead of an interface? |
32 | 29 | * <p> |
33 | 30 | * <ol> |
34 | 31 | * <li>Java doesn't support extension methods and there are many overload methods needing default |
35 | | - * implementations.</li> |
| 32 | + * implementations.</li> |
36 | 33 | * <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for |
37 | | - * a long time.</li> |
| 34 | + * a long time.</li> |
38 | 35 | * <li>If only an interface were used Scheduler implementations would then need to extend from an |
39 | | - * AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the |
40 | | - * functionality.</li> |
| 36 | + * AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the |
| 37 | + * functionality.</li> |
41 | 38 | * <li>Without virtual extension methods even additive changes are breaking and thus severely impede library |
42 | | - * maintenance.</li> |
| 39 | + * maintenance.</li> |
43 | 40 | * </ol> |
44 | 41 | */ |
45 | 42 | public abstract class Scheduler { |
46 | 43 |
|
47 | 44 | /** |
48 | | - * Schedules an Action on a new Scheduler instance (typically another thread) for execution. |
49 | | - * |
50 | | - * @param action |
51 | | - * Action to schedule |
52 | | - * @return a subscription to be able to unsubscribe from action |
53 | | - */ |
54 | | - |
55 | | - public abstract Subscription schedule(Action1<Scheduler.Inner> action); |
56 | | - |
57 | | - /** |
58 | | - * Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point |
59 | | - * in the future. |
| 45 | + * Retrieve or create a new {@link Scheduler.Inner} that represents serial execution of actions. |
| 46 | + * <p> |
| 47 | + * When work is completed it should be unsubscribed. Work on a {@link Scheduler.Inner} is guaranteed to be sequential. |
60 | 48 | * |
61 | | - * @param action |
62 | | - * the Action to schedule |
63 | | - * @param delayTime |
64 | | - * time to wait before executing the action |
65 | | - * @param unit |
66 | | - * the time unit the delay time is given in |
67 | | - * @return a subscription to be able to unsubscribe from action |
| 49 | + * @return Inner representing a serial queue of actions to be executed |
68 | 50 | */ |
69 | | - public abstract Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit); |
| 51 | + public abstract Inner inner(); |
70 | 52 |
|
71 | 53 | /** |
72 | | - * Schedules a cancelable action to be executed periodically. This default implementation schedules |
73 | | - * recursively and waits for actions to complete (instead of potentially executing long-running actions |
74 | | - * concurrently). Each scheduler that can do periodic scheduling in a better way should override this. |
75 | | - * |
76 | | - * @param action |
77 | | - * the Action to execute periodically |
78 | | - * @param initialDelay |
79 | | - * time to wait before executing the action for the first time |
80 | | - * @param period |
81 | | - * the time interval to wait each time in between executing the action |
82 | | - * @param unit |
83 | | - * the time unit the interval above is given in |
84 | | - * @return a subscription to be able to unsubscribe from action |
| 54 | + * Sequential Scheduler for executing actions on a single thread or event loop. |
| 55 | + * <p> |
| 56 | + * Unsubscribing the {@Inner} unschedules all outstanding work and allows resources cleanup. |
85 | 57 | */ |
86 | | - public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit) { |
87 | | - final long periodInNanos = unit.toNanos(period); |
88 | | - |
89 | | - final Action1<Scheduler.Inner> recursiveAction = new Action1<Scheduler.Inner>() { |
90 | | - @Override |
91 | | - public void call(Inner inner) { |
92 | | - if (!inner.isUnsubscribed()) { |
93 | | - long startedAt = now(); |
94 | | - action.call(inner); |
95 | | - long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); |
96 | | - inner.schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); |
97 | | - } |
98 | | - } |
99 | | - }; |
100 | | - return schedule(recursiveAction, initialDelay, unit); |
101 | | - } |
102 | | - |
103 | | - public final Subscription scheduleRecursive(final Action1<Recurse> action) { |
104 | | - return schedule(new Action1<Inner>() { |
105 | | - |
106 | | - @Override |
107 | | - public void call(Inner inner) { |
108 | | - action.call(new Recurse(inner, action)); |
109 | | - } |
110 | | - |
111 | | - }); |
112 | | - } |
113 | | - |
114 | | - public static final class Recurse { |
115 | | - private final Action1<Recurse> action; |
116 | | - private final Inner inner; |
117 | | - |
118 | | - private Recurse(Inner inner, Action1<Recurse> action) { |
119 | | - this.inner = inner; |
120 | | - this.action = action; |
121 | | - } |
122 | | - |
123 | | - /** |
124 | | - * Schedule the current function for execution immediately. |
125 | | - */ |
126 | | - public final void schedule() { |
127 | | - final Recurse self = this; |
128 | | - inner.schedule(new Action1<Inner>() { |
129 | | - |
130 | | - @Override |
131 | | - public void call(Inner _inner) { |
132 | | - action.call(self); |
133 | | - } |
134 | | - |
135 | | - }); |
136 | | - } |
| 58 | + public abstract static class Inner implements Subscription { |
137 | 59 |
|
138 | 60 | /** |
139 | | - * Schedule the current function for execution in the future. |
| 61 | + * Schedules an Action for execution. |
| 62 | + * |
| 63 | + * @param action |
| 64 | + * Action to schedule |
| 65 | + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) |
140 | 66 | */ |
141 | | - public final void schedule(long delay, TimeUnit unit) { |
142 | | - final Recurse self = this; |
143 | | - inner.schedule(new Action1<Inner>() { |
144 | | - |
145 | | - @Override |
146 | | - public void call(Inner _inner) { |
147 | | - action.call(self); |
148 | | - } |
149 | 67 |
|
150 | | - }, delay, unit); |
151 | | - } |
152 | | - } |
153 | | - |
154 | | - public abstract static class Inner implements Subscription { |
| 68 | + public abstract Subscription schedule(Action0 action); |
155 | 69 |
|
156 | 70 | /** |
157 | | - * Schedules an action to be executed in delayTime. |
| 71 | + * Schedules an Action for execution at some point in the future. |
158 | 72 | * |
| 73 | + * @param action |
| 74 | + * the Action to schedule |
159 | 75 | * @param delayTime |
160 | | - * time the action is to be delayed before executing |
| 76 | + * time to wait before executing the action |
161 | 77 | * @param unit |
162 | | - * time unit of the delay time |
| 78 | + * the time unit the delay time is given in |
| 79 | + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) |
163 | 80 | */ |
164 | | - public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit); |
| 81 | + public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit); |
165 | 82 |
|
166 | 83 | /** |
167 | | - * Schedules a cancelable action to be executed in delayTime. |
| 84 | + * Schedules a cancelable action to be executed periodically. This default implementation schedules |
| 85 | + * recursively and waits for actions to complete (instead of potentially executing long-running actions |
| 86 | + * concurrently). Each scheduler that can do periodic scheduling in a better way should override this. |
168 | 87 | * |
| 88 | + * @param action |
| 89 | + * the Action to execute periodically |
| 90 | + * @param initialDelay |
| 91 | + * time to wait before executing the action for the first time |
| 92 | + * @param period |
| 93 | + * the time interval to wait each time in between executing the action |
| 94 | + * @param unit |
| 95 | + * the time unit the interval above is given in |
| 96 | + * @return a subscription to be able to unsubscribe the action (unschedule it if not executed) |
169 | 97 | */ |
170 | | - public abstract void schedule(Action1<Scheduler.Inner> action); |
| 98 | + public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { |
| 99 | + final long periodInNanos = unit.toNanos(period); |
| 100 | + |
| 101 | + final Action0 recursiveAction = new Action0() { |
| 102 | + @Override |
| 103 | + public void call() { |
| 104 | + if (!isUnsubscribed()) { |
| 105 | + long startedAt = now(); |
| 106 | + action.call(); |
| 107 | + long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); |
| 108 | + schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); |
| 109 | + } |
| 110 | + } |
| 111 | + }; |
| 112 | + return schedule(recursiveAction, initialDelay, unit); |
| 113 | + } |
171 | 114 |
|
172 | 115 | /** |
173 | 116 | * @return the scheduler's notion of current absolute time in milliseconds. |
|
0 commit comments