1919import java .util .LinkedList ;
2020import java .util .List ;
2121import java .util .Queue ;
22+ import java .util .concurrent .atomic .AtomicBoolean ;
23+
2224import rx .Notification ;
2325import rx .Observable ;
24- import rx .subscriptions . SingleAssignmentSubscription ;
26+ import rx .operators . SafeObservableSubscription ;
2527import rx .util .functions .Action1 ;
2628
2729/**
@@ -33,14 +35,15 @@ public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implem
3335 private final Action1 <Throwable > onError ;
3436 private final List <ActivePlan0 > activePlans ;
3537 private final Queue <Notification <T >> queue ;
36- private final SingleAssignmentSubscription subscription ;
38+ private final SafeObservableSubscription subscription ;
3739 private volatile boolean done ;
40+ private final AtomicBoolean subscribed = new AtomicBoolean (false );
3841
3942 public JoinObserver1 (Observable <T > source , Action1 <Throwable > onError ) {
4043 this .source = source ;
4144 this .onError = onError ;
4245 queue = new LinkedList <Notification <T >>();
43- subscription = new SingleAssignmentSubscription ();
46+ subscription = new SafeObservableSubscription ();
4447 activePlans = new ArrayList <ActivePlan0 >();
4548 }
4649 public Queue <Notification <T >> queue () {
@@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) {
5154 }
5255 @ Override
5356 public void subscribe (Object gate ) {
54- this .gate = gate ;
55- subscription .set (source .materialize ().subscribe (this ));
57+ if (subscribed .compareAndSet (false , true )) {
58+ this .gate = gate ;
59+ subscription .wrap (source .materialize ().subscribe (this ));
60+ } else {
61+ throw new IllegalStateException ("Can only be subscribed to once." );
62+ }
5663 }
5764
5865 @ Override
0 commit comments