2121
2222import rx .Scheduler ;
2323import rx .Subscription ;
24- import rx .subscriptions .CompositeSubscription ;
2524import rx .subscriptions .MultipleAssignmentSubscription ;
25+ import rx .util .functions .Func1 ;
2626import rx .util .functions .Func2 ;
2727
2828/**
@@ -36,7 +36,17 @@ public static CurrentThreadScheduler getInstance() {
3636 return INSTANCE ;
3737 }
3838
39- private static final ThreadLocal <PriorityQueue <TimedAction >> QUEUE = new ThreadLocal <PriorityQueue <TimedAction >>();
39+ private static final ThreadLocal <PriorityQueue <TimedAction >> QUEUE = new ThreadLocal <PriorityQueue <TimedAction >>() {
40+ protected java .util .PriorityQueue <TimedAction > initialValue () {
41+ return new PriorityQueue <TimedAction >();
42+ };
43+ };
44+
45+ private static final ThreadLocal <Boolean > PROCESSING = new ThreadLocal <Boolean >() {
46+ protected Boolean initialValue () {
47+ return Boolean .FALSE ;
48+ };
49+ };
4050
4151 /* package accessible for unit tests */ CurrentThreadScheduler () {
4252 }
@@ -45,50 +55,57 @@ public static CurrentThreadScheduler getInstance() {
4555 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
4656 // immediately move to the InnerCurrentThreadScheduler
4757 InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler ();
48- DiscardableAction < T > discardableAction = new DiscardableAction <T >(state , action );
49- enqueue (innerScheduler , discardableAction , now ());
58+ innerScheduler . enqueue ( new DiscardableAction <T >(state , action ), now () );
59+ enqueueFromOuter (innerScheduler , now ());
5060 return innerScheduler ;
5161 }
5262
5363 @ Override
5464 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action , long dueTime , TimeUnit unit ) {
5565 long execTime = now () + unit .toMillis (dueTime );
5666
57- // immediately move to the InnerCurrentThreadScheduler
67+ // create an inner scheduler and queue it for execution
5868 InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler ();
59- DiscardableAction < T > discardableAction = new DiscardableAction <T >(state , new SleepingAction <T >(action , this , execTime ));
60- enqueue (innerScheduler , discardableAction , execTime );
61- return discardableAction ;
69+ innerScheduler . enqueue ( new DiscardableAction <T >(state , new SleepingAction <T >(action , this , execTime )), execTime );
70+ enqueueFromOuter (innerScheduler , execTime );
71+ return innerScheduler ;
6272 }
6373
64- private static void enqueue (Scheduler scheduler , DiscardableAction <?> action , long execTime ) {
74+ /*
75+ * This will accept InnerCurrentThreadScheduler instances and execute them in order they are received
76+ * and on each of them will loop internally until each is complete.
77+ */
78+ private void enqueueFromOuter (final InnerCurrentThreadScheduler innerScheduler , long execTime ) {
79+ // Note that everything here is single-threaded so we won't have race conditions
6580 PriorityQueue <TimedAction > queue = QUEUE .get ();
66- boolean exec = queue == null ;
81+ queue . add ( new TimedAction ( new Func1 < Scheduler , Subscription >() {
6782
68- if ( exec ) {
69- queue = new PriorityQueue < TimedAction >();
70- QUEUE . set ( queue );
71- }
72-
73- queue . add ( new TimedAction ( action , execTime , counter .incrementAndGet ()));
83+ @ Override
84+ public Subscription call ( Scheduler _) {
85+ // when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks
86+ return innerScheduler . startProcessing ();
87+ }
88+ } , execTime , counter .incrementAndGet ()));
7489
75- if (exec ) {
90+ // first time through starts the loop
91+ if (!PROCESSING .get ()) {
92+ PROCESSING .set (Boolean .TRUE );
7693 while (!queue .isEmpty ()) {
77- queue .poll ().action .call (scheduler );
94+ queue .poll ().action .call (innerScheduler );
7895 }
79-
80- QUEUE .set (null );
96+ PROCESSING .set (Boolean .FALSE );
8197 }
8298 }
8399
84100 private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
85101 private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription ();
102+ private final PriorityQueue <TimedAction > innerQueue = new PriorityQueue <TimedAction >();
86103
87104 @ Override
88105 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
89106 DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
90107 childSubscription .set (discardableAction );
91- enqueue (this , discardableAction , now ());
108+ enqueue (discardableAction , now ());
92109 return childSubscription ;
93110 }
94111
@@ -98,10 +115,21 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
98115
99116 DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
100117 childSubscription .set (discardableAction );
101- enqueue (this , discardableAction , execTime );
118+ enqueue (discardableAction , execTime );
102119 return childSubscription ;
103120 }
104121
122+ private void enqueue (Func1 <Scheduler , Subscription > action , long execTime ) {
123+ innerQueue .add (new TimedAction (action , execTime , counter .incrementAndGet ()));
124+ }
125+
126+ private Subscription startProcessing () {
127+ while (!innerQueue .isEmpty ()) {
128+ innerQueue .poll ().action .call (this );
129+ }
130+ return this ;
131+ }
132+
105133 @ Override
106134 public void unsubscribe () {
107135 childSubscription .unsubscribe ();
@@ -113,11 +141,11 @@ public void unsubscribe() {
113141 * Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
114142 */
115143 private static class TimedAction implements Comparable <TimedAction > {
116- final DiscardableAction <? > action ;
144+ final Func1 < Scheduler , Subscription > action ;
117145 final Long execTime ;
118146 final Long count ; // In case if time between enqueueing took less than 1ms
119147
120- private TimedAction (DiscardableAction <? > action , Long execTime , Long count ) {
148+ private TimedAction (Func1 < Scheduler , Subscription > action , Long execTime , Long count ) {
121149 this .action = action ;
122150 this .execTime = execTime ;
123151 this .count = count ;
0 commit comments