1717
1818import java .util .PriorityQueue ;
1919import java .util .concurrent .TimeUnit ;
20- import java .util .concurrent .atomic .AtomicInteger ;
20+ import java .util .concurrent .atomic .AtomicLong ;
2121
2222import rx .Scheduler ;
2323import rx .Subscription ;
24+ import rx .subscriptions .CompositeSubscription ;
25+ import rx .subscriptions .MultipleAssignmentSubscription ;
2426import rx .util .functions .Func2 ;
2527
2628/**
2729 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
2830 */
2931public class CurrentThreadScheduler extends Scheduler {
3032 private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler ();
33+ private static final AtomicLong counter = new AtomicLong (0 );
3134
3235 public static CurrentThreadScheduler getInstance () {
3336 return INSTANCE ;
@@ -38,25 +41,27 @@ public static CurrentThreadScheduler getInstance() {
3841 /* package accessible for unit tests */ CurrentThreadScheduler () {
3942 }
4043
41- private final AtomicInteger counter = new AtomicInteger (0 );
42-
4344 @ Override
4445 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
46+ // immediately move to the InnerCurrentThreadScheduler
47+ InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler ();
4548 DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
46- enqueue (discardableAction , now ());
47- return discardableAction ;
49+ enqueue (innerScheduler , discardableAction , now ());
50+ return innerScheduler ;
4851 }
4952
5053 @ Override
5154 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long dueTime , TimeUnit unit ) {
5255 long execTime = now () + unit .toMillis (dueTime );
5356
57+ // immediately move to the InnerCurrentThreadScheduler
58+ InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler ();
5459 DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , new SleepingAction <T >(action , this , execTime ));
55- enqueue (discardableAction , execTime );
60+ enqueue (innerScheduler , discardableAction , execTime );
5661 return discardableAction ;
5762 }
5863
59- private void enqueue (DiscardableAction <?> action , long execTime ) {
64+ private static void enqueue (Scheduler scheduler , DiscardableAction <?> action , long execTime ) {
6065 PriorityQueue <TimedAction > queue = QUEUE .get ();
6166 boolean exec = queue == null ;
6267
@@ -69,19 +74,50 @@ private void enqueue(DiscardableAction<?> action, long execTime) {
6974
7075 if (exec ) {
7176 while (!queue .isEmpty ()) {
72- queue .poll ().action .call (this );
77+ queue .poll ().action .call (scheduler );
7378 }
7479
7580 QUEUE .set (null );
7681 }
7782 }
7883
84+ private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
85+ private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription ();
86+
87+ @ Override
88+ public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
89+ DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
90+ childSubscription .set (discardableAction );
91+ enqueue (this , discardableAction , now ());
92+ return childSubscription ;
93+ }
94+
95+ @ Override
96+ public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long delayTime , TimeUnit unit ) {
97+ long execTime = now () + unit .toMillis (delayTime );
98+
99+ DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
100+ childSubscription .set (discardableAction );
101+ enqueue (this , discardableAction , execTime );
102+ return childSubscription ;
103+ }
104+
105+ @ Override
106+ public void unsubscribe () {
107+ childSubscription .unsubscribe ();
108+ }
109+
110+ }
111+
112+ /**
113+ * Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
114+ */
79115 private static class TimedAction implements Comparable <TimedAction > {
80116 final DiscardableAction <?> action ;
81117 final Long execTime ;
82- final Integer count ; // In case if time between enqueueing took less than 1ms
118+ final Long count ; // In case if time between enqueueing took less than 1ms
83119
84- private TimedAction (DiscardableAction <?> action , Long execTime , Integer count ) {
120+ private TimedAction (DiscardableAction <?> action , Long execTime , Long count ) {
85121 this .action = action ;
86122 this .execTime = execTime ;
87123 this .count = count ;
0 commit comments