Skip to content

Commit 715dcec

Browse files
committed
Drop use of WeakReferences and use a custom subscription instead
1 parent cb08eba commit 715dcec

2 files changed

Lines changed: 104 additions & 50 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package rx.android;
2+
3+
import rx.Observable;
4+
import rx.operators.OperationObserveFromAndroidComponent;
5+
6+
import android.app.Activity;
7+
import android.app.Fragment;
8+
9+
public class AndroidObservables {
10+
11+
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
12+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
13+
}
14+
15+
public static <T> Observable<T> fromFragment(Fragment fragment, Observable<T> sourceObservable) {
16+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, fragment);
17+
}
18+
19+
public static <T> Observable<T> fromFragment(android.support.v4.app.Fragment fragment, Observable<T> sourceObservable) {
20+
return OperationObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, fragment);
21+
}
22+
23+
}

rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveInForeground.java renamed to rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java

Lines changed: 81 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.mockito.Matchers.any;
44
import static org.mockito.Matchers.anyInt;
5+
import static org.mockito.Mockito.mock;
56
import static org.mockito.Mockito.never;
67
import static org.mockito.Mockito.times;
78
import static org.mockito.Mockito.verify;
@@ -25,84 +26,97 @@
2526
import android.app.Fragment;
2627
import android.util.Log;
2728

28-
import java.lang.ref.WeakReference;
29+
import java.lang.reflect.Field;
2930

30-
public class OperationObserveInForeground {
31+
public class OperationObserveFromAndroidComponent {
3132

32-
public static <T> Observable<T> observeOnFragment(Observable<T> source, android.app.Fragment fragment) {
33-
return Observable.create(new OnSubscribeFragment<T>(source, new WeakReference<Fragment>(fragment)));
33+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
34+
return Observable.create(new OnSubscribeFragment<T>(source, fragment));
3435
}
3536

36-
public static <T> Observable<T> observeOnSupportFragment(Observable<T> source, android.support.v4.app.Fragment fragment) {
37-
return Observable.create(new OnSubscribeSupportFragment<T>(
38-
source, new WeakReference<android.support.v4.app.Fragment>(fragment)));
37+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.support.v4.app.Fragment fragment) {
38+
return Observable.create(new OnSubscribeSupportFragment<T>(source, fragment));
3939
}
4040

41-
public static <T> Observable<T> observeOnActivity(Observable<T> source, Activity activity) {
42-
return Observable.create(new OnSubscribeActivity<T>(source, new WeakReference<Activity>(activity)));
41+
public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, Activity activity) {
42+
return Observable.create(new OnSubscribeActivity<T>(source, activity));
4343
}
4444

4545
private static abstract class OnSubscribeBase<T, AndroidComponent> implements Observable.OnSubscribeFunc<T> {
4646

47+
private static final String LOG_TAG = OperationObserveFromAndroidComponent.class.getSimpleName();
48+
4749
private final Observable<T> source;
48-
private final WeakReference<AndroidComponent> componentRef;
49-
private Observer<? super T> actual;
50+
private volatile AndroidComponent componentRef;
51+
private volatile Observer<? super T> observerRef;
5052

51-
private OnSubscribeBase(Observable<T> source, WeakReference<AndroidComponent> componentRef) {
53+
private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
5254
this.source = source;
53-
this.componentRef = componentRef;
55+
this.componentRef = component;
56+
}
57+
58+
private void log(String message) {
59+
Log.d(LOG_TAG, "componentRef = " + componentRef);
60+
Log.d(LOG_TAG, "observerRef = " + observerRef);
61+
Log.d(LOG_TAG, message);
5462
}
5563

5664
protected abstract boolean isComponentValid(AndroidComponent component);
5765

5866
@Override
5967
public Subscription onSubscribe(Observer<? super T> observer) {
60-
actual = observer;
61-
return source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
68+
observerRef = observer;
69+
final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
6270
@Override
6371
public void onCompleted() {
64-
AndroidComponent component = componentRef.get();
65-
if (component != null && isComponentValid(component)) {
66-
actual.onCompleted();
72+
if (componentRef != null && isComponentValid(componentRef)) {
73+
observerRef.onCompleted();
6774
} else {
68-
actual = null;
6975
log("onComplete: target component released or detached; dropping message");
76+
releaseReferences();
7077
}
7178
}
7279

7380
@Override
7481
public void onError(Throwable e) {
75-
AndroidComponent component = componentRef.get();
76-
if (component != null && isComponentValid(component)) {
77-
actual.onError(e);
82+
if (componentRef != null && isComponentValid(componentRef)) {
83+
observerRef.onError(e);
7884
} else {
79-
actual = null;
8085
log("onError: target component released or detached; dropping message");
86+
releaseReferences();
8187
}
8288
}
8389

8490
@Override
8591
public void onNext(T args) {
86-
AndroidComponent component = componentRef.get();
87-
if (component != null && isComponentValid(component)) {
88-
actual.onNext(args);
92+
if (componentRef != null && isComponentValid(componentRef)) {
93+
observerRef.onNext(args);
8994
} else {
90-
actual = null;
9195
log("onNext: target component released or detached; dropping message");
96+
releaseReferences();
9297
}
9398
}
94-
95-
private void log(String message) {
96-
Log.i(OperationObserveInForeground.class.getSimpleName(), message);
97-
}
9899
});
100+
return new Subscription() {
101+
@Override
102+
public void unsubscribe() {
103+
log("unsubscribing from source sequence");
104+
releaseReferences();
105+
sourceSub.unsubscribe();
106+
}
107+
};
108+
}
109+
110+
private void releaseReferences() {
111+
observerRef = null;
112+
componentRef = null;
99113
}
100114
}
101115

102116
private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {
103117

104-
private OnSubscribeFragment(Observable<T> source, WeakReference<android.app.Fragment> fragmentRef) {
105-
super(source, fragmentRef);
118+
private OnSubscribeFragment(Observable<T> source, android.app.Fragment fragment) {
119+
super(source, fragment);
106120
}
107121

108122
@Override
@@ -113,7 +127,7 @@ protected boolean isComponentValid(android.app.Fragment fragment) {
113127

114128
private static final class OnSubscribeSupportFragment<T> extends OnSubscribeBase<T, android.support.v4.app.Fragment> {
115129

116-
private OnSubscribeSupportFragment(Observable<T> source, WeakReference<android.support.v4.app.Fragment> fragment) {
130+
private OnSubscribeSupportFragment(Observable<T> source, android.support.v4.app.Fragment fragment) {
117131
super(source, fragment);
118132
}
119133

@@ -125,7 +139,7 @@ protected boolean isComponentValid(android.support.v4.app.Fragment fragment) {
125139

126140
private static final class OnSubscribeActivity<T> extends OnSubscribeBase<T, Activity> {
127141

128-
private OnSubscribeActivity(Observable<T> source, WeakReference<Activity> activity) {
142+
private OnSubscribeActivity(Observable<T> source, Activity activity) {
129143
super(source, activity);
130144
}
131145

@@ -159,14 +173,14 @@ public void setupMocks() {
159173

160174
@Test
161175
public void itObservesTheSourceSequenceOnTheMainUIThread() {
162-
OperationObserveInForeground.observeOnFragment(mockObservable, mockFragment).subscribe(mockObserver);
176+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(mockObservable, mockFragment).subscribe(mockObserver);
163177
verify(mockObservable).observeOn(AndroidSchedulers.mainThread());
164178
}
165179

166180
@Test
167181
public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
168182
Observable<Integer> source = Observable.from(1, 2, 3);
169-
OperationObserveInForeground.observeOnFragment(source, mockFragment).subscribe(mockObserver);
183+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
170184
verify(mockObserver, times(3)).onNext(anyInt());
171185
verify(mockObserver).onCompleted();
172186
verify(mockObserver, never()).onError(any(Exception.class));
@@ -176,22 +190,21 @@ public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
176190
public void itForwardsOnErrorToTargetObserver() {
177191
final Exception exception = new Exception();
178192
Observable<Integer> source = Observable.error(exception);
179-
OperationObserveInForeground.observeOnFragment(source, mockFragment).subscribe(mockObserver);
193+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
180194
verify(mockObserver).onError(exception);
181195
verify(mockObserver, never()).onNext(anyInt());
182196
verify(mockObserver, never()).onCompleted();
183197
}
184198

185199
@Test
186-
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() {
200+
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable {
187201
PublishSubject<Integer> source = PublishSubject.create();
188-
WeakReference<Fragment> fragmentRef = new WeakReference<Fragment>(mockFragment);
189202

190-
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, fragmentRef);
203+
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
191204
operator.onSubscribe(mockObserver);
192205

193206
source.onNext(1);
194-
fragmentRef.clear();
207+
releaseComponentRef(operator);
195208

196209
source.onNext(2);
197210
source.onNext(3);
@@ -202,26 +215,31 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() {
202215
}
203216

204217
@Test
205-
public void itDropsOnErrorIfTargetComponentIsGone() {
218+
public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
206219
PublishSubject<Integer> source = PublishSubject.create();
207-
WeakReference<Fragment> fragmentRef = new WeakReference<Fragment>(mockFragment);
208220

209-
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, fragmentRef);
221+
final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
210222
operator.onSubscribe(mockObserver);
211223

212224
source.onNext(1);
213-
fragmentRef.clear();
225+
releaseComponentRef(operator);
214226

215227
source.onError(new Exception());
216228

217229
verify(mockObserver).onNext(1);
218230
verifyNoMoreInteractions(mockObserver);
219231
}
220232

233+
private void releaseComponentRef(OnSubscribeFragment<Integer> operator) throws NoSuchFieldException, IllegalAccessException {
234+
final Field componentRef = operator.getClass().getSuperclass().getDeclaredField("componentRef");
235+
componentRef.setAccessible(true);
236+
componentRef.set(operator, null);
237+
}
238+
221239
@Test
222240
public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
223241
PublishSubject<Integer> source = PublishSubject.create();
224-
OperationObserveInForeground.observeOnFragment(source, mockFragment).subscribe(mockObserver);
242+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
225243

226244
source.onNext(1);
227245

@@ -237,7 +255,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
237255
@Test
238256
public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
239257
PublishSubject<Integer> source = PublishSubject.create();
240-
OperationObserveInForeground.observeOnFragment(source, mockFragment).subscribe(mockObserver);
258+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
241259

242260
source.onNext(1);
243261

@@ -251,7 +269,7 @@ public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
251269
@Test
252270
public void isDoesNotForwardOnNextOnCompletedSequenceIfActivityIsFinishing() {
253271
PublishSubject<Integer> source = PublishSubject.create();
254-
OperationObserveInForeground.observeOnActivity(source, mockActivity).subscribe(mockObserver);
272+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockActivity).subscribe(mockObserver);
255273

256274
source.onNext(1);
257275

@@ -267,7 +285,7 @@ public void isDoesNotForwardOnNextOnCompletedSequenceIfActivityIsFinishing() {
267285
@Test
268286
public void itDoesNotForwardOnErrorIfActivityIsFinishing() {
269287
PublishSubject<Integer> source = PublishSubject.create();
270-
OperationObserveInForeground.observeOnActivity(source, mockActivity).subscribe(mockObserver);
288+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockActivity).subscribe(mockObserver);
271289

272290
source.onNext(1);
273291

@@ -277,5 +295,18 @@ public void itDoesNotForwardOnErrorIfActivityIsFinishing() {
277295
verify(mockObserver).onNext(1);
278296
verify(mockObserver, never()).onError(any(Exception.class));
279297
}
298+
299+
@Test
300+
public void itUnsubscribesFromTheSourceSequence() {
301+
Subscription underlying = mock(Subscription.class);
302+
when(mockObservable.observeOn(AndroidSchedulers.mainThread())).thenReturn(mockObservable);
303+
when(mockObservable.subscribe(any(Observer.class))).thenReturn(underlying);
304+
305+
Subscription sub = OperationObserveFromAndroidComponent.observeFromAndroidComponent(
306+
mockObservable, mockActivity).subscribe(mockObserver);
307+
sub.unsubscribe();
308+
309+
verify(underlying).unsubscribe();
310+
}
280311
}
281312
}

0 commit comments

Comments
 (0)