1616package rx .concurrency ;
1717
1818import java .util .concurrent .TimeUnit ;
19+ import java .util .concurrent .atomic .AtomicBoolean ;
1920
2021import rx .Scheduler ;
2122import rx .Subscription ;
@@ -34,22 +35,12 @@ public Subscription schedule(Action0 action) {
3435
3536 @ Override
3637 public Subscription schedule (final Func1 <Scheduler , Subscription > action ) {
37- return schedule (new Func0 <Subscription >() {
38- @ Override
39- public Subscription call () {
40- return action .call (AbstractScheduler .this );
41- }
42- });
38+ return schedule (func0ForwardingToFunc1 (action ));
4339 }
4440
4541 @ Override
4642 public <T > Subscription schedule (final T state , final Func2 <Scheduler , T , Subscription > action ) {
47- return schedule (new Func0 <Subscription >() {
48- @ Override
49- public Subscription call () {
50- return action .call (AbstractScheduler .this , state );
51- }
52- });
43+ return schedule (func0ForwardingToFunc2 (action , state ));
5344 }
5445
5546 @ Override
@@ -59,29 +50,92 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
5950
6051 @ Override
6152 public Subscription schedule (final Func1 <Scheduler , Subscription > action , long dueTime , TimeUnit unit ) {
62- return schedule (new Func0 <Subscription >() {
63- @ Override
64- public Subscription call () {
65- return action .call (AbstractScheduler .this );
66- }
67- }, dueTime , unit );
53+ return schedule (func0ForwardingToFunc1 (action ), dueTime , unit );
6854 }
6955
7056 @ Override
7157 public <T > Subscription schedule (final T state , final Func2 <Scheduler , T , Subscription > action , long dueTime , TimeUnit unit ) {
72- return schedule (new Func0 <Subscription >() {
58+ return schedule (func0ForwardingToFunc2 (action , state ), dueTime , unit );
59+ }
60+
61+ @ Override
62+ public Subscription schedulePeriodically (Action0 action , long initialDelay , long period , TimeUnit unit ) {
63+ return schedulePeriodically (asFunc0 (action ), initialDelay , period , unit );
64+ }
65+
66+ /**
67+ * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
68+ * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
69+ */
70+ @ Override
71+ public Subscription schedulePeriodically (final Func0 <Subscription > action , long initialDelay , final long period , final TimeUnit unit ) {
72+ final long periodInNanos = unit .toNanos (period );
73+ final AtomicBoolean complete = new AtomicBoolean ();
74+
75+ final Func0 <Subscription > recursiveAction = new Func0 <Subscription >() {
7376 @ Override
7477 public Subscription call () {
75- return action .call (AbstractScheduler .this , state );
78+ if (! complete .get ()) {
79+ long startedAt = System .nanoTime ();
80+ final Subscription sub1 = action .call ();
81+ long timeTakenByActionInNanos = System .nanoTime () - startedAt ;
82+ final Subscription sub2 = schedule (this , periodInNanos - timeTakenByActionInNanos , TimeUnit .NANOSECONDS );
83+ return Subscriptions .create (new Action0 () {
84+ @ Override
85+ public void call () {
86+ sub1 .unsubscribe ();
87+ sub2 .unsubscribe ();
88+ }
89+ });
90+ }
91+ return Subscriptions .empty ();
92+ }
93+ };
94+ final Subscription sub = schedule (recursiveAction , initialDelay , unit );
95+ return Subscriptions .create (new Action0 () {
96+ @ Override
97+ public void call () {
98+ complete .set (true );
99+ sub .unsubscribe ();
76100 }
77- }, dueTime , unit );
101+ });
78102 }
79-
103+
104+ @ Override
105+ public Subscription schedulePeriodically (Func1 <Scheduler , Subscription > action , long initialDelay , long period , TimeUnit unit ) {
106+ return schedulePeriodically (func0ForwardingToFunc1 (action ), initialDelay , period , unit );
107+ }
108+
109+ @ Override
110+ public <T > Subscription schedulePeriodically (T state , Func2 <Scheduler , T , Subscription > action , long initialDelay , long period , TimeUnit unit ) {
111+ return schedulePeriodically (func0ForwardingToFunc2 (action , state ), initialDelay , period , unit );
112+ }
113+
80114 @ Override
81115 public long now () {
82116 return System .nanoTime ();
83117 }
84118
119+ @ SuppressWarnings ("static-method" ) // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
120+ private Func0 <Subscription > func0ForwardingToFunc1 (final Func1 <Scheduler , Subscription > func1 ) {
121+ return new Func0 <Subscription >() {
122+ @ Override
123+ public Subscription call () {
124+ return func1 .call (AbstractScheduler .this );
125+ }
126+ };
127+ }
128+
129+ @ SuppressWarnings ("static-method" ) // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
130+ private <T > Func0 <Subscription > func0ForwardingToFunc2 (final Func2 <Scheduler , T , Subscription > func2 , final T state ) {
131+ return new Func0 <Subscription >() {
132+ @ Override
133+ public Subscription call () {
134+ return func2 .call (AbstractScheduler .this , state );
135+ }
136+ };
137+ }
138+
85139 private static Func0 <Subscription > asFunc0 (final Action0 action ) {
86140 return new Func0 <Subscription >() {
87141 @ Override
0 commit comments