2525import rx .Scheduler ;
2626import rx .Subscription ;
2727import rx .subscriptions .CompositeSubscription ;
28+ import rx .subscriptions .MultipleAssignmentSubscription ;
2829import rx .subscriptions .Subscriptions ;
2930import rx .util .functions .Func2 ;
3031
@@ -68,19 +69,18 @@ public void run() {
6869 @ Override
6970 public <T > Subscription schedule (final T state , final Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long delayTime , TimeUnit unit ) {
7071 final DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
71- final Scheduler _scheduler = this ;
72+ final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler (executor );
73+
7274 // all subscriptions that may need to be unsubscribed
73- final CompositeSubscription subscription = new CompositeSubscription (discardableAction );
75+ final CompositeSubscription subscription = new CompositeSubscription (discardableAction , _scheduler );
7476
7577 if (executor instanceof ScheduledExecutorService ) {
7678 // we are a ScheduledExecutorService so can do proper scheduling
7779 ScheduledFuture <?> f = ((ScheduledExecutorService ) executor ).schedule (new Runnable () {
7880 @ Override
7981 public void run () {
8082 // when the delay has passed we now do the work on the actual scheduler
81- Subscription s = discardableAction .call (_scheduler );
82- // add the subscription to the CompositeSubscription so it is unsubscribed
83- subscription .add (s );
83+ discardableAction .call (_scheduler );
8484 }
8585 }, delayTime , unit );
8686 // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
@@ -89,9 +89,7 @@ public void run() {
8989 // we are not a ScheduledExecutorService so can't directly schedule
9090 if (delayTime == 0 ) {
9191 // no delay so put on the thread-pool right now
92- Subscription s = schedule (state , action );
93- // add the subscription to the CompositeSubscription so it is unsubscribed
94- subscription .add (s );
92+ return schedule (state , action );
9593 } else {
9694 // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
9795 // to handle the scheduling and once it's ready then execute on this Executor
@@ -100,9 +98,7 @@ public void run() {
10098 @ Override
10199 public void run () {
102100 // now execute on the real Executor (by using the other overload that schedules for immediate execution)
103- Subscription s = _scheduler .schedule (state , action );
104- // add the subscription to the CompositeSubscription so it is unsubscribed
105- subscription .add (s );
101+ _scheduler .schedule (state , action );
106102 }
107103 }, delayTime , unit );
108104 // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
@@ -114,33 +110,138 @@ public void run() {
114110
115111 @ Override
116112 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
113+ CompositeSubscription s = new CompositeSubscription ();
117114 final DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
118- final Scheduler _scheduler = this ;
119- // all subscriptions that may need to be unsubscribed
120- final CompositeSubscription subscription = new CompositeSubscription (discardableAction );
115+ s .add (discardableAction );
116+
117+ final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler (executor );
118+ s .add (_scheduler );
121119
122- // work to be done on a thread
123- Runnable r = new Runnable () {
120+ s .add (execute (executor , new Runnable () {
124121 @ Override
125122 public void run () {
126- Subscription s = discardableAction .call (_scheduler );
127- // add the subscription to the CompositeSubscription so it is unsubscribed
128- subscription .add (s );
123+ discardableAction .call (_scheduler );
129124 }
130- };
125+ }));
126+
127+ return s ;
128+ }
131129
130+ /**
131+ * Execute on the given Executor and retrieve a Subscription
132+ *
133+ * @param executor
134+ * @param r
135+ * @return
136+ */
137+ private static Subscription execute (Executor executor , Runnable r ) {
132138 // submit for immediate execution
133139 if (executor instanceof ExecutorService ) {
134140 // we are an ExecutorService so get a Future back that supports unsubscribe
135141 Future <?> f = ((ExecutorService ) executor ).submit (r );
136142 // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
137- subscription . add ( Subscriptions .from (f ) );
143+ return Subscriptions .from (f );
138144 } else {
139145 // we are the lowest common denominator so can't unsubscribe once we execute
140146 executor .execute (r );
147+ return Subscriptions .empty ();
141148 }
149+ }
142150
143- return subscription ;
151+ private static class InnerExecutorScheduler extends Scheduler implements Subscription {
152+
153+ private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription ();
154+ private final Executor executor ;
155+
156+ InnerExecutorScheduler (Executor executor ) {
157+ this .executor = executor ;
158+ }
159+
160+ @ Override
161+ public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
162+ if (childSubscription .isUnsubscribed ()) {
163+ return childSubscription ;
164+ }
165+
166+ CompositeSubscription s = new CompositeSubscription ();
167+ final DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
168+ s .add (discardableAction );
169+
170+ final Scheduler _scheduler = this ;
171+
172+ s .add (execute (executor , new Runnable () {
173+
174+ @ Override
175+ public void run () {
176+ discardableAction .call (_scheduler );
177+ }
178+ }));
179+
180+ // replace the InnerExecutorScheduler child subscription with this one
181+ childSubscription .set (s );
182+ /*
183+ * TODO: Consider what will happen if `schedule` is run concurrently instead of recursively
184+ * and we lose subscriptions as the `childSubscription` only remembers the last one scheduled.
185+ *
186+ * Not obvious that this should ever happen. Can it?
187+ *
188+ * benjchristensen => Haven't been able to come up with a valid test case to prove this as an issue
189+ * so it may not be.
190+ */
191+
192+ return childSubscription ;
193+ }
194+
195+ @ Override
196+ public <T > Subscription schedule (final T state , final Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long delayTime , TimeUnit unit ) {
197+ if (childSubscription .isUnsubscribed ()) {
198+ return childSubscription ;
199+ }
200+
201+ CompositeSubscription s = new CompositeSubscription ();
202+ final DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
203+ s .add (discardableAction );
204+
205+ final Scheduler _scheduler = this ;
206+
207+ if (executor instanceof ScheduledExecutorService ) {
208+ // we are a ScheduledExecutorService so can do proper scheduling
209+ ScheduledFuture <?> f = ((ScheduledExecutorService ) executor ).schedule (new Runnable () {
210+ @ Override
211+ public void run () {
212+ // when the delay has passed we now do the work on the actual scheduler
213+ discardableAction .call (_scheduler );
214+ }
215+ }, delayTime , unit );
216+ // replace the InnerExecutorScheduler child subscription with this one
217+ childSubscription .set (Subscriptions .from (f ));
218+ } else {
219+ // we are not a ScheduledExecutorService so can't directly schedule
220+ if (delayTime == 0 ) {
221+ // no delay so put on the thread-pool right now
222+ return schedule (state , action );
223+ } else {
224+ // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
225+ // to handle the scheduling and once it's ready then execute on this Executor
226+ ScheduledFuture <?> f = GenericScheduledExecutorService .getInstance ().schedule (new Runnable () {
227+
228+ @ Override
229+ public void run () {
230+ // now execute on the real Executor (by using the other overload that schedules for immediate execution)
231+ _scheduler .schedule (state , action );
232+ }
233+ }, delayTime , unit );
234+ // replace the InnerExecutorScheduler child subscription with this one
235+ childSubscription .set (Subscriptions .from (f ));
236+ }
237+ }
238+ return childSubscription ;
239+ }
240+
241+ @ Override
242+ public void unsubscribe () {
243+ childSubscription .unsubscribe ();
244+ }
144245
145246 }
146247
0 commit comments