1616package rx .subscriptions ;
1717
1818import java .util .concurrent .atomic .AtomicBoolean ;
19- import java .util .concurrent .atomic .AtomicInteger ;
2019import java .util .concurrent .atomic .AtomicReference ;
20+
2121import rx .Subscription ;
2222
2323/**
2727 * @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.disposables.refcountdisposable.aspx'>MSDN RefCountDisposable</a>
2828 */
2929public class RefCountSubscription implements Subscription {
30- /** The state for the atomic operations. */
31- private enum State {
32- ACTIVE ,
33- MUTATING ,
34- UNSUBSCRIBED
35- }
36-
3730 /** The reference to the actual subscription. */
38- private volatile Subscription main ;
39- /** The current state. */
40- private final AtomicReference <State > state = new AtomicReference <State >();
41- /** Counts the number of sub-subscriptions. */
42- private final AtomicInteger count = new AtomicInteger ();
43- /** Indicate the request to unsubscribe from the main. */
44- private final AtomicBoolean mainDone = new AtomicBoolean ();
31+ private final Subscription actual ;
32+ /** Counts the number of subscriptions (1 parent + multiple children) */
33+ private final AtomicReference <State > state = new AtomicReference <State >(new State (false , 0 ));
34+
35+ private static final class State {
36+ final boolean isUnsubscribed ;
37+ final int children ;
38+
39+ State (boolean u , int c ) {
40+ this .isUnsubscribed = u ;
41+ this .children = c ;
42+ }
43+
44+ State addChild () {
45+ return new State (isUnsubscribed , children + 1 );
46+ }
47+
48+ State removeChild () {
49+ return new State (isUnsubscribed , children - 1 );
50+ }
51+
52+ State unsubscribe () {
53+ return new State (true , children );
54+ }
55+
56+ }
4557
4658 /**
4759 * Create a RefCountSubscription by wrapping the given non-null Subscription.
@@ -52,87 +64,52 @@ public RefCountSubscription(Subscription s) {
5264 if (s == null ) {
5365 throw new IllegalArgumentException ("s" );
5466 }
55- this .main = s ;
67+ this .actual = s ;
5668 }
5769
5870 /**
5971 * Returns a new sub-subscription.
6072 */
6173 public Subscription getSubscription () {
74+ State current ;
75+ State newState ;
6276 do {
63- State s = state .get ();
64- if (s == State . UNSUBSCRIBED ) {
77+ current = state .get ();
78+ if (current . isUnsubscribed ) {
6579 return Subscriptions .empty ();
80+ } else {
81+ newState = current .addChild ();
6682 }
67- if (s == State .MUTATING ) {
68- continue ;
69- }
70- if (state .compareAndSet (s , State .MUTATING )) {
71- count .incrementAndGet ();
72- state .set (State .ACTIVE );
73- return new InnerSubscription ();
74- }
75- } while (true );
83+ } while (!state .compareAndSet (current , newState ));
84+
85+ return new InnerSubscription ();
7686 }
7787
7888 /**
7989 * Check if this subscription is already unsubscribed.
8090 */
8191 public boolean isUnsubscribed () {
82- return state .get () == State . UNSUBSCRIBED ;
92+ return state .get (). isUnsubscribed ;
8393 }
8494
8595 @ Override
8696 public void unsubscribe () {
97+ State current ;
98+ State newState ;
8799 do {
88- State s = state .get ();
89- if (s == State . UNSUBSCRIBED ) {
100+ current = state .get ();
101+ if (current . isUnsubscribed ) {
90102 return ;
91103 }
92- if (s == State .MUTATING ) {
93- continue ;
94- }
95- if (state .compareAndSet (s , State .MUTATING )) {
96- if (mainDone .compareAndSet (false , true ) && count .get () == 0 ) {
97- terminate ();
98- return ;
99- }
100- state .set (State .ACTIVE );
101- break ;
102- }
103- } while (true );
104- }
105-
106- /**
107- * Terminate this subscription by unsubscribing from main and setting the
108- * state to UNSUBSCRIBED.
109- */
110- private void terminate () {
111- state .set (State .UNSUBSCRIBED );
112- Subscription r = main ;
113- main = null ;
114- r .unsubscribe ();
104+ newState = current .unsubscribe ();
105+ } while (!state .compareAndSet (current , newState ));
106+ unsubscribeActualIfApplicable (newState );
115107 }
116108
117- /** Remove an inner subscription. */
118- void innerDone () {
119- do {
120- State s = state .get ();
121- if (s == State .UNSUBSCRIBED ) {
122- return ;
123- }
124- if (s == State .MUTATING ) {
125- continue ;
126- }
127- if (state .compareAndSet (s , State .MUTATING )) {
128- if (count .decrementAndGet () == 0 && mainDone .get ()) {
129- terminate ();
130- return ;
131- }
132- state .set (State .ACTIVE );
133- break ;
134- }
135- } while (true );
109+ private void unsubscribeActualIfApplicable (State state ) {
110+ if (state .isUnsubscribed && state .children == 0 ) {
111+ actual .unsubscribe ();
112+ }
136113 }
137114
138115 /** The individual sub-subscriptions. */
@@ -142,7 +119,13 @@ class InnerSubscription implements Subscription {
142119 @ Override
143120 public void unsubscribe () {
144121 if (innerDone .compareAndSet (false , true )) {
145- innerDone ();
122+ State current ;
123+ State newState ;
124+ do {
125+ current = state .get ();
126+ newState = current .removeChild ();
127+ } while (!state .compareAndSet (current , newState ));
128+ unsubscribeActualIfApplicable (newState );
146129 }
147130 }
148131 };
0 commit comments