1- /**
2- * Copyright 2013 Netflix, Inc.
3- *
4- * Licensed under the Apache License, Version 2.0 (the "License");
5- * you may not use this file except in compliance with the License.
6- * You may obtain a copy of the License at
7- *
8- * http://www.apache.org/licenses/LICENSE-2.0
9- *
10- * Unless required by applicable law or agreed to in writing, software
11- * distributed under the License is distributed on an "AS IS" BASIS,
12- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13- * See the License for the specific language governing permissions and
14- * limitations under the License.
15- */
1+ /**
2+ * Copyright 2013 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
1616package rx .subscriptions ;
1717
1818import static java .util .Arrays .asList ;
3131/**
3232 * Subscription that represents a group of Subscriptions that are unsubscribed
3333 * together.
34- *
35- * @see <a
36- * href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
37- * equivalent CompositeDisposable</a>
34+ *
35+ * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
3836 */
3937public class CompositeSubscription implements Subscription {
4038 /** Sentinel to indicate a thread is modifying the subscription set. */
41- private static final Set <Subscription > MUTATE_SENTINEL = unmodifiableSet (Collections .<Subscription >emptySet ());
42- /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
43- private static final Set <Subscription > UNSUBSCRIBED_SENTINEL = unmodifiableSet (Collections .<Subscription >emptySet ());
39+ private static final Set <Subscription > MUTATE_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
40+ /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */
41+ private static final Set <Subscription > UNSUBSCRIBED_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
4442 /** The reference to the set of subscriptions. */
4543 private final AtomicReference <Set <Subscription >> reference = new AtomicReference <Set <Subscription >>();
46-
44+
4745 public CompositeSubscription (final Subscription ... subscriptions ) {
4846 reference .set (new HashSet <Subscription >(asList (subscriptions )));
4947 }
50-
48+
5149 public boolean isUnsubscribed () {
5250 return reference .get () == UNSUBSCRIBED_SENTINEL ;
5351 }
54-
52+
5553 public void add (final Subscription s ) {
5654 do {
5755 final Set <Subscription > existing = reference .get ();
5856 if (existing == UNSUBSCRIBED_SENTINEL ) {
5957 s .unsubscribe ();
6058 break ;
6159 }
62-
60+
6361 if (existing == MUTATE_SENTINEL ) {
6462 continue ;
6563 }
66-
64+
6765 if (reference .compareAndSet (existing , MUTATE_SENTINEL )) {
6866 existing .add (s );
6967 reference .set (existing );
7068 break ;
7169 }
7270 } while (true );
7371 }
74-
72+
7573 public void remove (final Subscription s ) {
7674 do {
7775 final Set <Subscription > subscriptions = reference .get ();
7876 if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
7977 s .unsubscribe ();
8078 break ;
8179 }
82-
80+
8381 if (subscriptions == MUTATE_SENTINEL ) {
8482 continue ;
8583 }
86-
84+
8785 if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
8886 // also unsubscribe from it:
8987 // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
@@ -94,36 +92,39 @@ public void remove(final Subscription s) {
9492 }
9593 } while (true );
9694 }
97-
95+
9896 public void clear () {
9997 do {
10098 final Set <Subscription > subscriptions = reference .get ();
10199 if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
102100 break ;
103101 }
104-
102+
105103 if (subscriptions == MUTATE_SENTINEL ) {
106104 continue ;
107105 }
108-
106+
109107 if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
110108 final Set <Subscription > copy = new HashSet <Subscription >(
111109 subscriptions );
112110 subscriptions .clear ();
113111 reference .set (subscriptions );
114-
112+
115113 unsubscribeAll (copy );
116114 break ;
117115 }
118116 } while (true );
119117 }
118+
120119 /**
121120 * Unsubscribe from the collection of subscriptions.
122121 * <p>
123122 * Exceptions thrown by any of the {@code unsubscribe()} methods are
124123 * collected into a {@link CompositeException} and thrown once
125124 * all unsubscriptions have been attempted.
126- * @param subs the collection of subscriptions
125+ *
126+ * @param subs
127+ * the collection of subscriptions
127128 */
128129 private void unsubscribeAll (Collection <Subscription > subs ) {
129130 final Collection <Throwable > es = new ArrayList <Throwable >();
@@ -139,18 +140,19 @@ private void unsubscribeAll(Collection<Subscription> subs) {
139140 "Failed to unsubscribe to 1 or more subscriptions." , es );
140141 }
141142 }
143+
142144 @ Override
143145 public void unsubscribe () {
144146 do {
145147 final Set <Subscription > subscriptions = reference .get ();
146148 if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
147149 break ;
148150 }
149-
151+
150152 if (subscriptions == MUTATE_SENTINEL ) {
151153 continue ;
152154 }
153-
155+
154156 if (reference .compareAndSet (subscriptions , UNSUBSCRIBED_SENTINEL )) {
155157 unsubscribeAll (subscriptions );
156158 break ;
0 commit comments