|
20 | 20 |
|
21 | 21 | import java.util.concurrent.Executors; |
22 | 22 | import java.util.concurrent.TimeUnit; |
23 | | -import java.util.concurrent.atomic.AtomicBoolean; |
24 | 23 |
|
25 | 24 | import org.junit.Before; |
26 | 25 | import org.junit.Test; |
@@ -57,47 +56,35 @@ public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUn |
57 | 56 | } |
58 | 57 |
|
59 | 58 | private static class Interval implements Func1<Observer<Long>, Subscription> { |
60 | | - private final long interval; |
| 59 | + private final long period; |
61 | 60 | private final TimeUnit unit; |
62 | 61 | private final Scheduler scheduler; |
63 | 62 |
|
64 | 63 | private long currentValue; |
65 | | - private final AtomicBoolean complete = new AtomicBoolean(); |
66 | 64 |
|
67 | | - private Interval(long interval, TimeUnit unit, Scheduler scheduler) { |
68 | | - this.interval = interval; |
| 65 | + private Interval(long period, TimeUnit unit, Scheduler scheduler) { |
| 66 | + this.period = period; |
69 | 67 | this.unit = unit; |
70 | 68 | this.scheduler = scheduler; |
71 | 69 | } |
72 | 70 |
|
73 | 71 | @Override |
74 | 72 | public Subscription call(final Observer<Long> observer) { |
75 | | - scheduler.schedule(new IntervalAction(observer), interval, unit); |
76 | | - return Subscriptions.create(new Action0() { |
| 73 | + final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { |
77 | 74 | @Override |
78 | 75 | public void call() { |
79 | | - complete.set(true); |
| 76 | + observer.onNext(currentValue); |
| 77 | + currentValue++; |
80 | 78 | } |
81 | | - }); |
82 | | - } |
83 | | - |
84 | | - private class IntervalAction implements Action0 { |
85 | | - private final Observer<Long> observer; |
86 | | - |
87 | | - private IntervalAction(Observer<Long> observer) { |
88 | | - this.observer = observer; |
89 | | - } |
| 79 | + }, period, period, unit); |
90 | 80 |
|
91 | | - @Override |
92 | | - public void call() { |
93 | | - if (complete.get()) { |
| 81 | + return Subscriptions.create(new Action0() { |
| 82 | + @Override |
| 83 | + public void call() { |
| 84 | + wrapped.unsubscribe(); |
94 | 85 | observer.onCompleted(); |
95 | | - } else { |
96 | | - observer.onNext(currentValue); |
97 | | - currentValue++; |
98 | | - scheduler.schedule(this, interval, unit); |
99 | 86 | } |
100 | | - } |
| 87 | + }); |
101 | 88 | } |
102 | 89 | } |
103 | 90 |
|
|
0 commit comments