|
15 | 15 | */ |
16 | 16 | package rx.lang.scala |
17 | 17 |
|
18 | | -import java.util.Date |
19 | 18 | import scala.concurrent.duration.Duration |
20 | | -import rx.util.functions.{Action0, Action1, Func2} |
| 19 | +import rx.util.functions.Action1 |
21 | 20 | import rx.lang.scala.schedulers._ |
| 21 | +import scala.concurrent.duration |
| 22 | +import rx.lang.scala.JavaConversions._ |
22 | 23 |
|
23 | 24 | /** |
24 | 25 | * Represents an object that schedules units of work. |
25 | 26 | */ |
26 | 27 | trait Scheduler { |
27 | | - import rx.lang.scala.ImplicitFunctionConversions._ |
28 | 28 |
|
29 | 29 | private [scala] val asJavaScheduler: rx.Scheduler |
30 | 30 |
|
31 | 31 | /** |
32 | | - * Schedules a cancelable action to be executed. |
| 32 | + * Parallelism available to a Scheduler. |
33 | 33 | * |
34 | | - * @param action Action to schedule. |
35 | | - * @return a subscription to be able to unsubscribe from action. |
36 | | - */ |
37 | | - def schedule(action: Scheduler => Subscription): Subscription = { |
38 | | - this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s): Subscription): Subscription |
39 | | - } |
40 | | - |
41 | | - /** |
42 | | - * Schedules a cancelable action to be executed. |
| 34 | + * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. |
43 | 35 | * |
44 | | - * @param state State to pass into the action. |
45 | | - * @param action Action to schedule. |
46 | | - * @return a subscription to be able to unsubscribe from action. |
| 36 | + * @return the scheduler's available degree of parallelism. |
47 | 37 | */ |
48 | | - private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = { |
49 | | - Subscription(asJavaScheduler.schedule(state, new Func2[rx.Scheduler, T, rx.Subscription] { |
50 | | - def call(t1: rx.Scheduler, t2: T): rx.Subscription = { |
51 | | - action(Scheduler(t1), t2).asJavaSubscription |
52 | | - } |
53 | | - })) |
54 | | - } |
| 38 | + def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism |
55 | 39 |
|
56 | 40 | /** |
57 | | - * Schedules a cancelable action to be executed in delayTime. |
58 | | - * |
59 | | - * @param action Action to schedule. |
60 | | - * @param delayTime Time the action is to be delayed before executing. |
61 | | - * @return a subscription to be able to unsubscribe from action. |
| 41 | + * @return the scheduler's notion of current absolute time in milliseconds. |
62 | 42 | */ |
63 | | - def schedule(delayTime: Duration, action: Scheduler => Subscription): Subscription = { |
64 | | - this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s), delayTime: Duration): Subscription |
65 | | - } |
| 43 | + def now: Long = this.asJavaScheduler.now() |
66 | 44 |
|
67 | 45 | /** |
68 | | - * Schedules a cancelable action to be executed in delayTime. |
| 46 | + * Schedules a cancelable action to be executed. |
69 | 47 | * |
70 | | - * @param state |
71 | | - * State to pass into the action. |
72 | | - * @param action |
73 | | - * Action to schedule. |
74 | | - * @param delayTime |
75 | | - * Time the action is to be delayed before executing. |
| 48 | + * @param action Action to schedule. |
76 | 49 | * @return a subscription to be able to unsubscribe from action. |
77 | 50 | */ |
78 | | - private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = { |
79 | | - Subscription(asJavaScheduler.schedule(state, schedulerActionToFunc2(action), delayTime.length, delayTime.unit)) |
80 | | - } |
81 | | - |
82 | | - /** |
83 | | - * Schedules a cancelable action to be executed periodically. |
84 | | - * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing |
85 | | - * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. |
86 | | - * |
87 | | - * @param action The action to execute periodically. |
88 | | - * @param initialDelay Time to wait before executing the action for the first time. |
89 | | - * @param period The time interval to wait each time in between executing the action. |
90 | | - * @return A subscription to be able to unsubscribe from action. |
91 | | - */ |
92 | | - def schedule(initialDelay: Duration, period: Duration, action: Scheduler => Subscription): Subscription = { |
93 | | - this.schedulePeriodically[Integer](0, (s: Scheduler, x:Integer) => action(s): Subscription, initialDelay: Duration, period: Duration): Subscription |
94 | | - } |
| 51 | + def schedule(action: Inner => Unit): Subscription = this.asJavaScheduler.schedule(action) |
95 | 52 |
|
96 | 53 | /** |
97 | 54 | * Schedules a cancelable action to be executed periodically. |
98 | 55 | * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing |
99 | 56 | * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. |
100 | 57 | * |
101 | | - * @param state |
102 | | - * State to pass into the action. |
103 | 58 | * @param action |
104 | | - * The action to execute periodically. |
| 59 | + * The action to execute periodically. |
105 | 60 | * @param initialDelay |
106 | | - * Time to wait before executing the action for the first time. |
| 61 | + * Time to wait before executing the action for the first time. |
107 | 62 | * @param period |
108 | | - * The time interval to wait each time in between executing the action. |
| 63 | + * The time interval to wait each time in between executing the action. |
109 | 64 | * @return A subscription to be able to unsubscribe from action. |
110 | 65 | */ |
111 | | - private [scala] def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = { |
112 | | - Subscription(asJavaScheduler.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)) |
113 | | - } |
114 | | - |
115 | | - /** |
116 | | - * Schedules a cancelable action to be executed at dueTime. |
117 | | - * |
118 | | - * @param action Action to schedule. |
119 | | - * @param dueTime Time the action is to be executed. If in the past it will be executed immediately. |
120 | | - * @return a subscription to be able to unsubscribe from action. |
121 | | - */ |
122 | | - def schedule(dueTime: Date, action: Scheduler => Subscription): Subscription = { |
123 | | - this.schedule(0: Integer, (s: Scheduler, x: Integer) => action(s): Subscription, dueTime: Date): Subscription |
124 | | - } |
| 66 | + def schedulePeriodically(action: Inner => Unit, initialDelay: Duration, period: Duration): Subscription = |
| 67 | + this.asJavaScheduler.schedulePeriodically ( |
| 68 | + new Action1[rx.Scheduler.Inner] { |
| 69 | + override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner)) |
| 70 | + }, |
| 71 | + initialDelay.toNanos, |
| 72 | + period.toNanos, |
| 73 | + duration.NANOSECONDS |
| 74 | + ) |
125 | 75 |
|
126 | | - /** |
127 | | - * Schedules a cancelable action to be executed at dueTime. |
128 | | - * |
129 | | - * @param state |
130 | | - * State to pass into the action. |
131 | | - * @param action |
132 | | - * Action to schedule. |
133 | | - * @param dueTime |
134 | | - * Time the action is to be executed. If in the past it will be executed immediately. |
135 | | - * @return a subscription to be able to unsubscribe from action. |
136 | | - */ |
137 | | - private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = { |
138 | | - Subscription(asJavaScheduler.schedule(state, action, dueTime)) |
| 76 | + def scheduleRec(work: (=>Unit)=>Unit): Subscription = { |
| 77 | + Subscription(asJavaScheduler.schedule(new Action1[rx.Scheduler.Inner] { |
| 78 | + override def call(inner: rx.Scheduler.Inner): Unit = work{ inner.schedule(this) } |
| 79 | + })) |
139 | 80 | } |
| 81 | +} |
140 | 82 |
|
141 | | - /** |
142 | | - * Schedules an action to be executed. |
143 | | - * |
144 | | - * @param action |
145 | | - * action |
146 | | - * @return a subscription to be able to unsubscribe from action. |
147 | | - */ |
148 | | - def schedule(action: =>Unit): Subscription = { |
149 | | - Subscription(asJavaScheduler.schedule(()=>action)) |
150 | | - } |
| 83 | +object Inner { |
| 84 | + def apply(inner: rx.Scheduler.Inner): Inner = new Inner { private[scala] val asJavaInner = inner } |
| 85 | +} |
151 | 86 |
|
152 | | - /** |
153 | | - * Schedules an action to be executed in delayTime. |
154 | | - * |
155 | | - * @param action action |
156 | | - * @return a subscription to be able to unsubscribe from action. |
157 | | - */ |
158 | | - def schedule(delayTime: Duration, action: =>Unit): Subscription = { |
159 | | - Subscription(asJavaScheduler.schedule(()=>action, delayTime.length, delayTime.unit)) |
160 | | - } |
| 87 | +trait Inner extends Subscription { |
| 88 | + private [scala] val asJavaInner: rx.Scheduler.Inner |
161 | 89 |
|
162 | 90 | /** |
163 | | - * Schedules an action to be executed periodically. |
164 | | - * |
165 | | - * @param action |
166 | | - * The action to execute periodically. |
167 | | - * @param initialDelay |
168 | | - * Time to wait before executing the action for the first time. |
169 | | - * @param period |
170 | | - * The time interval to wait each time in between executing the action. |
171 | | - * @return A subscription to be able to unsubscribe from action. |
| 91 | + * Schedules a cancelable action to be executed in delayTime. |
172 | 92 | */ |
173 | | - def schedule(initialDelay: Duration, period: Duration, action: =>Unit): Subscription = { |
174 | | - Subscription(asJavaScheduler.schedulePeriodically(()=>action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)) |
175 | | - } |
176 | | - |
177 | | - def scheduleRec(work: (=>Unit)=>Unit): Subscription = { |
178 | | - Subscription(asJavaScheduler.schedule(new Action1[Action0] { |
179 | | - def call(t1: Action0){ |
180 | | - work{ t1.call() } |
181 | | - } |
182 | | - })) |
183 | | - } |
| 93 | + def schedule(action: Inner => Unit, delayTime: Duration): Unit = |
| 94 | + this.asJavaInner.schedule( |
| 95 | + new Action1[rx.Scheduler.Inner] { |
| 96 | + override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner)) |
| 97 | + }, |
| 98 | + delayTime.length, |
| 99 | + delayTime.unit) |
184 | 100 |
|
185 | 101 | /** |
186 | | - * Returns the scheduler's notion of current absolute time in milliseconds. |
| 102 | + * Schedules a cancelable action to be executed immediately. |
187 | 103 | */ |
188 | | - def now: Long = { |
189 | | - asJavaScheduler.now |
190 | | - } |
| 104 | + def schedule(action: Inner=>Unit): Unit = this.asJavaInner.schedule( |
| 105 | + new Action1[rx.Scheduler.Inner]{ |
| 106 | + override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner)) |
| 107 | + } |
| 108 | + ) |
191 | 109 |
|
192 | 110 | /** |
193 | | - * Parallelism available to a Scheduler. |
194 | | - * |
195 | | - * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. |
196 | | - * |
197 | | - * @return the scheduler's available degree of parallelism. |
| 111 | + * @return the scheduler's notion of current absolute time in milliseconds. |
198 | 112 | */ |
199 | | - def degreeOfParallelism: Int = { |
200 | | - asJavaScheduler.degreeOfParallelism |
201 | | - } |
202 | | - |
| 113 | + def now: Long = this.asJavaInner.now() |
203 | 114 | } |
204 | 115 |
|
| 116 | + |
205 | 117 | private [scala] object Scheduler { |
206 | 118 | def apply(scheduler: rx.Scheduler): Scheduler = scheduler match { |
207 | | - case s: rx.schedulers.CurrentThreadScheduler => new CurrentThreadScheduler(s) |
208 | | - case s: rx.schedulers.ExecutorScheduler => new ExecutorScheduler(s) |
209 | | - case s: rx.schedulers.ImmediateScheduler => new ImmediateScheduler(s) |
210 | | - case s: rx.schedulers.NewThreadScheduler => new NewThreadScheduler(s) |
211 | 119 | case s: rx.schedulers.TestScheduler => new TestScheduler(s) |
212 | 120 | case s: rx.Scheduler => new Scheduler{ val asJavaScheduler = s } |
213 | 121 | } |
|
0 commit comments