2525import rx .Scheduler ;
2626import rx .Subscription ;
2727import rx .subscriptions .CompositeSubscription ;
28+ import rx .subscriptions .MultipleAssignmentSubscription ;
2829import rx .subscriptions .Subscriptions ;
30+ import rx .util .functions .Action0 ;
2931import rx .util .functions .Func2 ;
3032
3133/**
@@ -46,6 +48,7 @@ private NewThreadScheduler() {
4648
4749 private static class EventLoopScheduler extends Scheduler {
4850 private final ExecutorService executor ;
51+ private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription ();
4952
5053 private EventLoopScheduler () {
5154 executor = Executors .newFixedThreadPool (1 , new ThreadFactory () {
@@ -61,21 +64,30 @@ public Thread newThread(Runnable r) {
6164
6265 @ Override
6366 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
67+ CompositeSubscription s = new CompositeSubscription ();
6468 final DiscardableAction <T > discardableAction = new DiscardableAction <T >(state , action );
65- // all subscriptions that may need to be unsubscribed
66- final CompositeSubscription subscription = new CompositeSubscription (discardableAction );
67-
69+ s .add (discardableAction );
70+
6871 final Scheduler _scheduler = this ;
69- subscription .add (Subscriptions .from (executor .submit (new Runnable () {
72+ s .add (Subscriptions .from (executor .submit (new Runnable () {
7073
7174 @ Override
7275 public void run () {
73- Subscription s = discardableAction .call (_scheduler );
74- subscription .add (s );
76+ discardableAction .call (_scheduler );
7577 }
7678 })));
77-
78- return subscription ;
79+
80+ // replace the EventLoopScheduler child subscription with this one
81+ childSubscription .set (s );
82+ /*
83+ * If `schedule` is run concurrently instead of recursively then we'd lose subscriptions as the `childSubscription`
84+ * only remembers the last one scheduled. However, the parent subscription will shutdown the entire EventLoopScheduler
85+ * and the ExecutorService which will terminate all outstanding tasks so this childSubscription is actually somewhat
86+ * superfluous for stopping and cleanup ... though childSubscription does ensure exactness as can be seen by
87+ * the `testUnSubscribeForScheduler()` unit test which fails if the `childSubscription` does not exist.
88+ */
89+
90+ return childSubscription ;
7991 }
8092
8193 @ Override
@@ -103,12 +115,26 @@ public void run() {
103115 return subscription ;
104116 }
105117
118+ private void shutdownNow () {
119+ executor .shutdownNow ();
120+ }
121+
106122 }
107123
108124 @ Override
109125 public <T > Subscription schedule (T state , Func2 <? super Scheduler , ? super T , ? extends Subscription > action ) {
110- EventLoopScheduler s = new EventLoopScheduler ();
111- return s .schedule (state , action );
126+ final EventLoopScheduler s = new EventLoopScheduler ();
127+ CompositeSubscription cs = new CompositeSubscription ();
128+ cs .add (s .schedule (state , action ));
129+ cs .add (Subscriptions .create (new Action0 () {
130+
131+ @ Override
132+ public void call () {
133+ // shutdown the executor, all tasks queued to run and clean up resources
134+ s .shutdownNow ();
135+ }
136+ }));
137+ return cs ;
112138 }
113139
114140 @ Override
0 commit comments