2323import rx .Observable .OnSubscribe ;
2424import rx .Observer ;
2525import rx .Subscriber ;
26- import rx .Subscription ;
2726import rx .functions .Action0 ;
2827import rx .functions .Action1 ;
29- import rx .operators .SafeObservableSubscription ;
3028import rx .subscriptions .Subscriptions ;
3129
3230/* package */ class SubjectSubscriptionManager <T > {
3937 * Always runs at the beginning of 'subscribe' regardless of terminal state.
4038 * @param onTerminated
4139 * Only runs if Subject is in terminal state and the Observer ends up not being registered.
42- * @param onUnsubscribe called after the child subscription is removed from the state
40+ * @param onUnsubscribe
41+ * called after the child subscription is removed from the state
4342 * @return
4443 */
45- public OnSubscribe <T > getOnSubscribeFunc (final Action1 <SubjectObserver <? super T >> onSubscribe ,
44+ public OnSubscribe <T > getOnSubscribeFunc (final Action1 <SubjectObserver <? super T >> onSubscribe ,
4645 final Action1 <SubjectObserver <? super T >> onTerminated ,
4746 final Action1 <SubjectObserver <? super T >> onUnsubscribe ) {
4847 return new OnSubscribe <T >() {
@@ -73,10 +72,9 @@ public void call(Subscriber<? super T> actualOperator) {
7372 }
7473 break ;
7574 } else {
76- final SafeObservableSubscription subscription = new SafeObservableSubscription ();
77- actualOperator .add (subscription ); // add to parent if the Subject itself is unsubscribed
7875 addedObserver = true ;
79- subscription .wrap (Subscriptions .create (new Action0 () {
76+ // add to parent if the Subject itself is unsubscribed
77+ actualOperator .add (Subscriptions .create (new Action0 () {
8078
8179 @ Override
8280 public void call () {
@@ -85,19 +83,19 @@ public void call() {
8583 do {
8684 current = state .get ();
8785 // on unsubscribe remove it from the map of outbound observers to notify
88- newState = current .removeObserver (subscription );
86+ newState = current .removeObserver (observer );
8987 } while (!state .compareAndSet (current , newState ));
9088 if (onUnsubscribe != null ) {
9189 onUnsubscribe .call (observer );
9290 }
9391 }
9492 }));
95- if (subscription .isUnsubscribed ()) {
96- addedObserver = false ;
93+ if (actualOperator .isUnsubscribed ()) {
94+ // we've been unsubscribed while working so return and do nothing
9795 return ;
9896 }
9997 // on subscribe add it to the map of outbound observers to notify
100- newState = current .addObserver (subscription , observer );
98+ newState = current .addObserver (observer );
10199 }
102100 } while (!state .compareAndSet (current , newState ));
103101
@@ -126,7 +124,7 @@ protected Collection<SubjectObserver<? super T>> terminate(Action1<Collection<Su
126124 }
127125 } while (!state .compareAndSet (current , newState ));
128126
129- Collection <SubjectObserver <? super T >> observerCollection = (Collection )Arrays .asList (newState .observers );
127+ Collection <SubjectObserver <? super T >> observerCollection = (Collection ) Arrays .asList (newState .observers );
130128 /*
131129 * if we get here then we won setting the state to terminated
132130 * and have a deterministic set of Observers to emit to (concurrent subscribes
@@ -158,97 +156,79 @@ public SubjectObserver<Object>[] rawSnapshot() {
158156 protected static class State <T > {
159157 final boolean terminated ;
160158 final CountDownLatch terminationLatch ;
161- final Subscription [] subscriptions ;
162159 final SubjectObserver [] observers ;
163160 // to avoid lots of empty arrays
164- final Subscription [] EMPTY_S = new Subscription [0 ];
165- // to avoid lots of empty arrays
166161 final SubjectObserver [] EMPTY_O = new SubjectObserver [0 ];
167162
168- private State (boolean isTerminated , CountDownLatch terminationLatch ,
169- Subscription [] subscriptions , SubjectObserver [] observers ) {
163+ private State (boolean isTerminated , CountDownLatch terminationLatch , SubjectObserver [] observers ) {
170164 this .terminationLatch = terminationLatch ;
171165 this .terminated = isTerminated ;
172- this .subscriptions = subscriptions ;
173166 this .observers = observers ;
174167 }
175168
176169 State () {
177170 this .terminated = false ;
178171 this .terminationLatch = null ;
179- this .subscriptions = EMPTY_S ;
180172 this .observers = EMPTY_O ;
181173 }
182174
183175 public State <T > terminate () {
184176 if (terminated ) {
185177 throw new IllegalStateException ("Already terminated." );
186178 }
187- return new State <T >(true , new CountDownLatch (1 ), subscriptions , observers );
179+ return new State <T >(true , new CountDownLatch (1 ), observers );
188180 }
189181
190- public State <T > addObserver (Subscription s , SubjectObserver <? super T > observer ) {
182+ public State <T > addObserver (SubjectObserver <? super T > observer ) {
191183 int n = this .observers .length ;
192184
193- Subscription [] newsubscriptions = Arrays .copyOf (this .subscriptions , n + 1 );
194185 SubjectObserver [] newobservers = Arrays .copyOf (this .observers , n + 1 );
195186
196- newsubscriptions [n ] = s ;
197187 newobservers [n ] = observer ;
198188
199- return createNewWith (newsubscriptions , newobservers );
189+ return createNewWith (newobservers );
200190 }
201191
202- private State <T > createNewWith (Subscription [] newsubscriptions , SubjectObserver [] newobservers ) {
203- return new State <T >(terminated , terminationLatch , newsubscriptions , newobservers );
192+ private State <T > createNewWith (SubjectObserver [] newobservers ) {
193+ return new State <T >(terminated , terminationLatch , newobservers );
204194 }
205195
206- public State <T > removeObserver (Subscription s ) {
196+ public State <T > removeObserver (SubjectObserver <? super T > o ) {
207197 // we are empty, nothing to remove
208198 if (this .observers .length == 0 ) {
209199 return this ;
210- } else
211- if (this .observers .length == 1 ) {
212- if (this .subscriptions [0 ].equals (s )) {
213- return createNewWith (EMPTY_S , EMPTY_O );
214- }
215- return this ;
216200 }
201+
217202 int n = this .observers .length - 1 ;
218203 int copied = 0 ;
219- Subscription [] newsubscriptions = new Subscription [n ];
220204 SubjectObserver [] newobservers = new SubjectObserver [n ];
221205
222- for (int i = 0 ; i < this .subscriptions .length ; i ++) {
223- Subscription s0 = this .subscriptions [i ];
224- if (!s0 .equals (s )) {
206+ for (int i = 0 ; i < this .observers .length ; i ++) {
207+ SubjectObserver s0 = this .observers [i ];
208+ if (!s0 .equals (o )) {
225209 if (copied == n ) {
226210 // if s was not found till the end of the iteration
227211 // we return ourselves since no modification should
228212 // have happened
229213 return this ;
230214 }
231- newsubscriptions [copied ] = s0 ;
232- newobservers [copied ] = this .observers [i ];
215+ newobservers [copied ] = s0 ;
233216 copied ++;
234217 }
235218 }
236219
237220 if (copied == 0 ) {
238- return createNewWith (EMPTY_S , EMPTY_O );
221+ return createNewWith (EMPTY_O );
239222 }
240223 // if somehow copied less than expected, truncate the arrays
241224 // if s is unique, this should never happen
242225 if (copied < n ) {
243- Subscription [] newsubscriptions2 = new Subscription [copied ];
244- System .arraycopy (newsubscriptions , 0 , newsubscriptions2 , 0 , copied );
245-
246226 SubjectObserver [] newobservers2 = new SubjectObserver [copied ];
247227 System .arraycopy (newobservers , 0 , newobservers2 , 0 , copied );
248228
249- return createNewWith (newsubscriptions2 , newobservers2 );
229+ return createNewWith (newobservers2 );
250230 }
251- return createNewWith (newsubscriptions , newobservers );
231+ return createNewWith (newobservers );
252232 }
253233 }
254234
0 commit comments