1515 */
1616package rx .subscriptions ;
1717
18- import static java .util .Arrays .asList ;
19- import static java .util .Collections .unmodifiableSet ;
20-
2118import java .util .ArrayList ;
19+ import java .util .Arrays ;
2220import java .util .Collection ;
23- import java .util .Collections ;
24- import java .util .HashSet ;
25- import java .util .Set ;
21+ import java .util .List ;
2622import java .util .concurrent .atomic .AtomicReference ;
2723
2824import rx .Subscription ;
3531 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3632 */
3733public class CompositeSubscription implements Subscription {
38- /** Sentinel to indicate a thread is modifying the subscription set. */
39- private static final Set <Subscription > MUTATE_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
40- /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */
41- private static final Set <Subscription > UNSUBSCRIBED_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
42- /** The reference to the set of subscriptions. */
43- private final AtomicReference <Set <Subscription >> reference = new AtomicReference <Set <Subscription >>();
34+
35+ private final AtomicReference <State > state = new AtomicReference <State >();
36+
37+ private static final class State {
38+ final boolean isUnsubscribed ;
39+ final List <Subscription > subscriptions ;
40+
41+ State (boolean u , List <Subscription > s ) {
42+ this .isUnsubscribed = u ;
43+ this .subscriptions = s ;
44+ }
45+
46+ State unsubscribe () {
47+ return new State (true , subscriptions );
48+ }
49+
50+ State add (Subscription s ) {
51+ List <Subscription > newSubscriptions = new ArrayList <Subscription >();
52+ newSubscriptions .addAll (subscriptions );
53+ newSubscriptions .add (s );
54+ return new State (isUnsubscribed , newSubscriptions );
55+ }
56+
57+ State remove (Subscription s ) {
58+ List <Subscription > newSubscriptions = new ArrayList <Subscription >();
59+ newSubscriptions .addAll (subscriptions );
60+ newSubscriptions .remove (s ); // only first occurrence
61+ return new State (isUnsubscribed , newSubscriptions );
62+ }
63+
64+ State clear () {
65+ return new State (isUnsubscribed , new ArrayList <Subscription >());
66+ }
67+ }
4468
4569 public CompositeSubscription (final Subscription ... subscriptions ) {
46- reference .set (new HashSet < Subscription >( asList (subscriptions )));
70+ state .set (new State ( false , Arrays . asList (subscriptions )));
4771 }
4872
4973 public boolean isUnsubscribed () {
50- return reference .get () == UNSUBSCRIBED_SENTINEL ;
74+ return state .get (). isUnsubscribed ;
5175 }
5276
5377 public void add (final Subscription s ) {
78+ State current ;
79+ State newState ;
5480 do {
55- final Set < Subscription > existing = reference .get ();
56- if (existing == UNSUBSCRIBED_SENTINEL ) {
81+ current = state .get ();
82+ if (current . isUnsubscribed ) {
5783 s .unsubscribe ();
58- break ;
59- }
60-
61- if (existing == MUTATE_SENTINEL ) {
62- continue ;
84+ return ;
85+ } else {
86+ newState = current .add (s );
6387 }
64-
65- if (reference .compareAndSet (existing , MUTATE_SENTINEL )) {
66- existing .add (s );
67- reference .set (existing );
68- break ;
69- }
70- } while (true );
88+ } while (!state .compareAndSet (current , newState ));
7189 }
7290
7391 public void remove (final Subscription s ) {
92+ State current ;
93+ State newState ;
7494 do {
75- final Set <Subscription > subscriptions = reference .get ();
76- if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
77- s .unsubscribe ();
78- break ;
79- }
80-
81- if (subscriptions == MUTATE_SENTINEL ) {
82- continue ;
95+ current = state .get ();
96+ if (current .isUnsubscribed ) {
97+ return ;
98+ } else {
99+ newState = current .remove (s );
83100 }
84-
85- if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
86- // also unsubscribe from it:
87- // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
88- subscriptions .remove (s );
89- reference .set (subscriptions );
90- s .unsubscribe ();
91- break ;
92- }
93- } while (true );
101+ } while (!state .compareAndSet (current , newState ));
102+ // if we removed successfully we then need to call unsubscribe on it
103+ s .unsubscribe ();
94104 }
95105
96106 public void clear () {
107+ State current ;
108+ State newState ;
97109 do {
98- final Set <Subscription > subscriptions = reference .get ();
99- if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
100- break ;
101- }
102-
103- if (subscriptions == MUTATE_SENTINEL ) {
104- continue ;
110+ current = state .get ();
111+ if (current .isUnsubscribed ) {
112+ return ;
113+ } else {
114+ newState = current .clear ();
105115 }
116+ } while (!state .compareAndSet (current , newState ));
117+ // if we cleared successfully we then need to call unsubscribe on all previous
118+ // current is now "previous"
119+ unsubscribeFromAll (current .subscriptions );
120+ }
106121
107- if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
108- final Set <Subscription > copy = new HashSet <Subscription >(
109- subscriptions );
110- subscriptions .clear ();
111- reference .set (subscriptions );
112-
113- unsubscribeAll (copy );
114- break ;
122+ @ Override
123+ public void unsubscribe () {
124+ State current ;
125+ State newState ;
126+ do {
127+ current = state .get ();
128+ if (current .isUnsubscribed ) {
129+ return ;
130+ } else {
131+ newState = current .unsubscribe ();
115132 }
116- } while (true );
133+ } while (!state .compareAndSet (current , newState ));
134+ // current is now "previous"
135+ unsubscribeFromAll (current .subscriptions );
117136 }
118137
119- /**
120- * Unsubscribe from the collection of subscriptions.
121- * <p>
122- * Exceptions thrown by any of the {@code unsubscribe()} methods are
123- * collected into a {@link CompositeException} and thrown once
124- * all unsubscriptions have been attempted.
125- *
126- * @param subs
127- * the collection of subscriptions
128- */
129- private void unsubscribeAll (Collection <Subscription > subs ) {
138+ private static void unsubscribeFromAll (Collection <Subscription > subscriptions ) {
130139 final Collection <Throwable > es = new ArrayList <Throwable >();
131- for (final Subscription s : subs ) {
140+ for (Subscription s : subscriptions ) {
132141 try {
133142 s .unsubscribe ();
134- } catch (final Throwable e ) {
143+ } catch (Throwable e ) {
135144 es .add (e );
136145 }
137146 }
@@ -140,23 +149,4 @@ private void unsubscribeAll(Collection<Subscription> subs) {
140149 "Failed to unsubscribe to 1 or more subscriptions." , es );
141150 }
142151 }
143-
144- @ Override
145- public void unsubscribe () {
146- do {
147- final Set <Subscription > subscriptions = reference .get ();
148- if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
149- break ;
150- }
151-
152- if (subscriptions == MUTATE_SENTINEL ) {
153- continue ;
154- }
155-
156- if (reference .compareAndSet (subscriptions , UNSUBSCRIBED_SENTINEL )) {
157- unsubscribeAll (subscriptions );
158- break ;
159- }
160- } while (true );
161- }
162152}
0 commit comments