1515 */
1616package rx .operators ;
1717
18+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
1819import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
1920
2021import rx .Observable .Operator ;
2122import rx .Scheduler ;
2223import rx .Subscriber ;
24+ import rx .Subscription ;
2325import rx .functions .Action0 ;
2426import rx .schedulers .ImmediateScheduler ;
2527import rx .schedulers .TrampolineScheduler ;
@@ -59,6 +61,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
5961 private static final class ObserveOnSubscriber <T > extends Subscriber <T > {
6062 final Subscriber <? super T > observer ;
6163 private final Scheduler .Worker recursiveScheduler ;
64+ private final ScheduledUnsubscribe scheduledUnsubscribe ;
6265 final NotificationLite <T > on = NotificationLite .instance ();
6366 /** Guarded by this. */
6467 private FastList queue = new FastList ();
@@ -72,11 +75,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber
7275 super (subscriber );
7376 this .observer = subscriber ;
7477 this .recursiveScheduler = scheduler .createWorker ();
75- subscriber .add (recursiveScheduler );
78+ this .scheduledUnsubscribe = new ScheduledUnsubscribe (recursiveScheduler );
79+ subscriber .add (scheduledUnsubscribe );
7680 }
7781
7882 @ Override
7983 public void onNext (final T t ) {
84+ if (scheduledUnsubscribe .isUnsubscribed ()) {
85+ return ;
86+ }
8087 synchronized (this ) {
8188 queue .add (on .next (t ));
8289 }
@@ -85,6 +92,9 @@ public void onNext(final T t) {
8592
8693 @ Override
8794 public void onCompleted () {
95+ if (scheduledUnsubscribe .isUnsubscribed ()) {
96+ return ;
97+ }
8898 synchronized (this ) {
8999 queue .add (on .completed ());
90100 }
@@ -93,6 +103,9 @@ public void onCompleted() {
93103
94104 @ Override
95105 public void onError (final Throwable e ) {
106+ if (scheduledUnsubscribe .isUnsubscribed ()) {
107+ return ;
108+ }
96109 synchronized (this ) {
97110 queue .add (on .error (e ));
98111 }
@@ -153,4 +166,32 @@ public void add(Object o) {
153166 size = s + 1 ;
154167 }
155168 }
169+ static final class ScheduledUnsubscribe implements Subscription {
170+ final Scheduler .Worker worker ;
171+ volatile int once ;
172+ static final AtomicIntegerFieldUpdater <ScheduledUnsubscribe > ONCE_UPDATER
173+ = AtomicIntegerFieldUpdater .newUpdater (ScheduledUnsubscribe .class , "once" );
174+
175+ public ScheduledUnsubscribe (Scheduler .Worker worker ) {
176+ this .worker = worker ;
177+ }
178+
179+ @ Override
180+ public boolean isUnsubscribed () {
181+ return once != 0 ;
182+ }
183+
184+ @ Override
185+ public void unsubscribe () {
186+ if (ONCE_UPDATER .getAndSet (this , 1 ) == 0 ) {
187+ worker .schedule (new Action0 () {
188+ @ Override
189+ public void call () {
190+ worker .unsubscribe ();
191+ }
192+ });
193+ }
194+ }
195+
196+ }
156197}
0 commit comments