11/**
2- * Copyright 2014 Netflix, Inc.
2+ * Copyright 2013 Netflix, Inc.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1515 */
1616package rx ;
1717
18- import java .util .Date ;
1918import java .util .concurrent .TimeUnit ;
20- import java .util .concurrent .atomic .AtomicBoolean ;
2119
22- import rx .subscriptions .CompositeSubscription ;
23- import rx .subscriptions .MultipleAssignmentSubscription ;
24- import rx .subscriptions .Subscriptions ;
25- import rx .util .functions .Action0 ;
2620import rx .util .functions .Action1 ;
27- import rx .util .functions .Func2 ;
2821
2922/**
3023 * Represents an object that schedules units of work.
4841public abstract class Scheduler {
4942
5043 /**
51- * Schedules a cancelable action to be executed .
44+ * Schedules an Action on a new Scheduler instance (typically another thread) for execution .
5245 *
53- * @param state
54- * State to pass into the action.
5546 * @param action
5647 * Action to schedule.
5748 * @return a subscription to be able to unsubscribe from action.
5849 */
59- public abstract <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action );
50+
51+ public abstract Subscription schedule (Action1 <Scheduler .Inner > action );
6052
6153 /**
62- * Schedules a cancelable action to be executed in delayTime .
54+ * Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point in the future .
6355 *
64- * @param state
65- * State to pass into the action.
6656 * @param action
67- * Action to schedule.
6857 * @param delayTime
69- * Time the action is to be delayed before executing.
7058 * @param unit
71- * Time unit of the delay time.
72- * @return a subscription to be able to unsubscribe from action.
59+ * @return
7360 */
74- public abstract < T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long delayTime , TimeUnit unit );
61+ public abstract Subscription schedule (final Action1 < Scheduler . Inner > action , final long delayTime , final TimeUnit unit );
7562
7663 /**
7764 * Schedules a cancelable action to be executed periodically.
@@ -90,152 +77,58 @@ public abstract class Scheduler {
9077 * The time unit the interval above is given in.
9178 * @return A subscription to be able to unsubscribe from action.
9279 */
93- public < T > Subscription schedulePeriodically (T state , final Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long initialDelay , long period , TimeUnit unit ) {
80+ public Subscription schedulePeriodically (final Action1 < Scheduler . Inner > action , long initialDelay , long period , TimeUnit unit ) {
9481 final long periodInNanos = unit .toNanos (period );
95- final AtomicBoolean complete = new AtomicBoolean ();
9682
97- final Func2 <Scheduler , T , Subscription > recursiveAction = new Func2 <Scheduler , T , Subscription >() {
83+ final Action1 <Scheduler . Inner > recursiveAction = new Action1 <Scheduler . Inner >() {
9884 @ Override
99- public Subscription call (Scheduler scheduler , T state0 ) {
100- if (!complete . get ()) {
85+ public void call (Inner inner ) {
86+ if (!inner . isUnsubscribed ()) {
10187 long startedAt = now ();
102- final Subscription sub1 = action .call (scheduler , state0 );
88+ action .call (inner );
10389 long timeTakenByActionInNanos = TimeUnit .MILLISECONDS .toNanos (now () - startedAt );
104- final Subscription sub2 = schedule (state0 , this , periodInNanos - timeTakenByActionInNanos , TimeUnit .NANOSECONDS );
105- return Subscriptions .create (new Action0 () {
106- @ Override
107- public void call () {
108- sub1 .unsubscribe ();
109- sub2 .unsubscribe ();
110- }
111- });
90+ inner .schedule (this , periodInNanos - timeTakenByActionInNanos , TimeUnit .NANOSECONDS );
11291 }
113- return Subscriptions .empty ();
11492 }
11593 };
116- final Subscription sub = schedule (state , recursiveAction , initialDelay , unit );
117- return Subscriptions .create (new Action0 () {
118- @ Override
119- public void call () {
120- complete .set (true );
121- sub .unsubscribe ();
122- }
123- });
94+ return schedule (recursiveAction , initialDelay , unit );
12495 }
12596
126- /**
127- * Schedules a cancelable action to be executed at dueTime.
128- *
129- * @param state
130- * State to pass into the action.
131- * @param action
132- * Action to schedule.
133- * @param dueTime
134- * Time the action is to be executed. If in the past it will be executed immediately.
135- * @return a subscription to be able to unsubscribe from action.
136- */
137- public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action , Date dueTime ) {
138- long scheduledTime = dueTime .getTime ();
139- long timeInFuture = scheduledTime - now ();
140- if (timeInFuture <= 0 ) {
141- return schedule (state , action );
142- } else {
143- return schedule (state , action , timeInFuture , TimeUnit .MILLISECONDS );
97+ public abstract static class Inner implements Subscription {
98+
99+ /**
100+ * Schedules an action to be executed in delayTime.
101+ *
102+ * @param delayTime
103+ * Time the action is to be delayed before executing.
104+ * @param unit
105+ * Time unit of the delay time.
106+ */
107+ public abstract void schedule (Action1 <Scheduler .Inner > action , long delayTime , TimeUnit unit );
108+
109+ /**
110+ * Schedules a cancelable action to be executed in delayTime.
111+ *
112+ */
113+ public abstract void schedule (Action1 <Scheduler .Inner > action );
114+
115+ /**
116+ * @return the scheduler's notion of current absolute time in milliseconds.
117+ */
118+ public long now () {
119+ return System .currentTimeMillis ();
144120 }
145121 }
146122
147123 /**
148- * Schedules an action and receives back an action for recursive execution.
149- *
150- * @param action
151- * action
152- * @return a subscription to be able to unsubscribe from action.
153- */
154- public Subscription schedule (final Action1 <Action0 > action ) {
155- final CompositeSubscription parentSubscription = new CompositeSubscription ();
156- final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription ();
157- parentSubscription .add (childSubscription );
158-
159- final Func2 <Scheduler , Func2 , Subscription > parentAction = new Func2 <Scheduler , Func2 , Subscription >() {
160-
161- @ Override
162- public Subscription call (final Scheduler scheduler , final Func2 parentAction ) {
163- action .call (new Action0 () {
164-
165- @ Override
166- public void call () {
167- if (!parentSubscription .isUnsubscribed ()) {
168- childSubscription .set (scheduler .schedule (parentAction , parentAction ));
169- }
170- }
171-
172- });
173- return childSubscription ;
174- }
175- };
176-
177- parentSubscription .add (schedule (parentAction , parentAction ));
178-
179- return parentSubscription ;
180- }
181-
182- /**
183- * Schedules an action to be executed.
184- *
185- * @param action
186- * action
187- * @return a subscription to be able to unsubscribe from action.
188- */
189- public Subscription schedule (final Action0 action ) {
190- return schedule (null , new Func2 <Scheduler , Void , Subscription >() {
191-
192- @ Override
193- public Subscription call (Scheduler scheduler , Void state ) {
194- action .call ();
195- return Subscriptions .empty ();
196- }
197- });
198- }
199-
200- /**
201- * Schedules an action to be executed in delayTime.
202- *
203- * @param action
204- * action
205- * @return a subscription to be able to unsubscribe from action.
206- */
207- public Subscription schedule (final Action0 action , long delayTime , TimeUnit unit ) {
208- return schedule (null , new Func2 <Scheduler , Void , Subscription >() {
209-
210- @ Override
211- public Subscription call (Scheduler scheduler , Void state ) {
212- action .call ();
213- return Subscriptions .empty ();
214- }
215- }, delayTime , unit );
216- }
217-
218- /**
219- * Schedules an action to be executed periodically.
124+ * Parallelism available to a Scheduler.
125+ * <p>
126+ * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
220127 *
221- * @param action
222- * The action to execute periodically.
223- * @param initialDelay
224- * Time to wait before executing the action for the first time.
225- * @param period
226- * The time interval to wait each time in between executing the action.
227- * @param unit
228- * The time unit the interval above is given in.
229- * @return A subscription to be able to unsubscribe from action.
128+ * @return the scheduler's available degree of parallelism.
230129 */
231- public Subscription schedulePeriodically (final Action0 action , long initialDelay , long period , TimeUnit unit ) {
232- return schedulePeriodically (null , new Func2 <Scheduler , Void , Subscription >() {
233- @ Override
234- public Subscription call (Scheduler scheduler , Void state ) {
235- action .call ();
236- return Subscriptions .empty ();
237- }
238- }, initialDelay , period , unit );
130+ public int degreeOfParallelism () {
131+ return Runtime .getRuntime ().availableProcessors ();
239132 }
240133
241134 /**
@@ -245,14 +138,4 @@ public long now() {
245138 return System .currentTimeMillis ();
246139 }
247140
248- /**
249- * Parallelism available to a Scheduler.
250- * <p>
251- * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
252- *
253- * @return the scheduler's available degree of parallelism.
254- */
255- public int degreeOfParallelism () {
256- return Runtime .getRuntime ().availableProcessors ();
257- }
258141}
0 commit comments