22
33import static org .mockito .Matchers .any ;
44import static org .mockito .Matchers .anyInt ;
5+ import static org .mockito .Mockito .mock ;
56import static org .mockito .Mockito .never ;
67import static org .mockito .Mockito .times ;
78import static org .mockito .Mockito .verify ;
2526import android .app .Fragment ;
2627import android .util .Log ;
2728
28- import java .lang .ref . WeakReference ;
29+ import java .lang .reflect . Field ;
2930
30- public class OperationObserveInForeground {
31+ public class OperationObserveFromAndroidComponent {
3132
32- public static <T > Observable <T > observeOnFragment (Observable <T > source , android .app .Fragment fragment ) {
33- return Observable .create (new OnSubscribeFragment <T >(source , new WeakReference < Fragment >( fragment ) ));
33+ public static <T > Observable <T > observeFromAndroidComponent (Observable <T > source , android .app .Fragment fragment ) {
34+ return Observable .create (new OnSubscribeFragment <T >(source , fragment ));
3435 }
3536
36- public static <T > Observable <T > observeOnSupportFragment (Observable <T > source , android .support .v4 .app .Fragment fragment ) {
37- return Observable .create (new OnSubscribeSupportFragment <T >(
38- source , new WeakReference <android .support .v4 .app .Fragment >(fragment )));
37+ public static <T > Observable <T > observeFromAndroidComponent (Observable <T > source , android .support .v4 .app .Fragment fragment ) {
38+ return Observable .create (new OnSubscribeSupportFragment <T >(source , fragment ));
3939 }
4040
41- public static <T > Observable <T > observeOnActivity (Observable <T > source , Activity activity ) {
42- return Observable .create (new OnSubscribeActivity <T >(source , new WeakReference < Activity >( activity ) ));
41+ public static <T > Observable <T > observeFromAndroidComponent (Observable <T > source , Activity activity ) {
42+ return Observable .create (new OnSubscribeActivity <T >(source , activity ));
4343 }
4444
4545 private static abstract class OnSubscribeBase <T , AndroidComponent > implements Observable .OnSubscribeFunc <T > {
4646
47+ private static final String LOG_TAG = OperationObserveFromAndroidComponent .class .getSimpleName ();
48+
4749 private final Observable <T > source ;
48- private final WeakReference < AndroidComponent > componentRef ;
49- private Observer <? super T > actual ;
50+ private volatile AndroidComponent componentRef ;
51+ private volatile Observer <? super T > observerRef ;
5052
51- private OnSubscribeBase (Observable <T > source , WeakReference < AndroidComponent > componentRef ) {
53+ private OnSubscribeBase (Observable <T > source , AndroidComponent component ) {
5254 this .source = source ;
53- this .componentRef = componentRef ;
55+ this .componentRef = component ;
56+ }
57+
58+ private void log (String message ) {
59+ Log .d (LOG_TAG , "componentRef = " + componentRef );
60+ Log .d (LOG_TAG , "observerRef = " + observerRef );
61+ Log .d (LOG_TAG , message );
5462 }
5563
5664 protected abstract boolean isComponentValid (AndroidComponent component );
5765
5866 @ Override
5967 public Subscription onSubscribe (Observer <? super T > observer ) {
60- actual = observer ;
61- return source .observeOn (AndroidSchedulers .mainThread ()).subscribe (new Observer <T >() {
68+ observerRef = observer ;
69+ final Subscription sourceSub = source .observeOn (AndroidSchedulers .mainThread ()).subscribe (new Observer <T >() {
6270 @ Override
6371 public void onCompleted () {
64- AndroidComponent component = componentRef .get ();
65- if (component != null && isComponentValid (component )) {
66- actual .onCompleted ();
72+ if (componentRef != null && isComponentValid (componentRef )) {
73+ observerRef .onCompleted ();
6774 } else {
68- actual = null ;
6975 log ("onComplete: target component released or detached; dropping message" );
76+ releaseReferences ();
7077 }
7178 }
7279
7380 @ Override
7481 public void onError (Throwable e ) {
75- AndroidComponent component = componentRef .get ();
76- if (component != null && isComponentValid (component )) {
77- actual .onError (e );
82+ if (componentRef != null && isComponentValid (componentRef )) {
83+ observerRef .onError (e );
7884 } else {
79- actual = null ;
8085 log ("onError: target component released or detached; dropping message" );
86+ releaseReferences ();
8187 }
8288 }
8389
8490 @ Override
8591 public void onNext (T args ) {
86- AndroidComponent component = componentRef .get ();
87- if (component != null && isComponentValid (component )) {
88- actual .onNext (args );
92+ if (componentRef != null && isComponentValid (componentRef )) {
93+ observerRef .onNext (args );
8994 } else {
90- actual = null ;
9195 log ("onNext: target component released or detached; dropping message" );
96+ releaseReferences ();
9297 }
9398 }
94-
95- private void log (String message ) {
96- Log .i (OperationObserveInForeground .class .getSimpleName (), message );
97- }
9899 });
100+ return new Subscription () {
101+ @ Override
102+ public void unsubscribe () {
103+ log ("unsubscribing from source sequence" );
104+ releaseReferences ();
105+ sourceSub .unsubscribe ();
106+ }
107+ };
108+ }
109+
110+ private void releaseReferences () {
111+ observerRef = null ;
112+ componentRef = null ;
99113 }
100114 }
101115
102116 private static final class OnSubscribeFragment <T > extends OnSubscribeBase <T , android .app .Fragment > {
103117
104- private OnSubscribeFragment (Observable <T > source , WeakReference < android .app .Fragment > fragmentRef ) {
105- super (source , fragmentRef );
118+ private OnSubscribeFragment (Observable <T > source , android .app .Fragment fragment ) {
119+ super (source , fragment );
106120 }
107121
108122 @ Override
@@ -113,7 +127,7 @@ protected boolean isComponentValid(android.app.Fragment fragment) {
113127
114128 private static final class OnSubscribeSupportFragment <T > extends OnSubscribeBase <T , android .support .v4 .app .Fragment > {
115129
116- private OnSubscribeSupportFragment (Observable <T > source , WeakReference < android .support .v4 .app .Fragment > fragment ) {
130+ private OnSubscribeSupportFragment (Observable <T > source , android .support .v4 .app .Fragment fragment ) {
117131 super (source , fragment );
118132 }
119133
@@ -125,7 +139,7 @@ protected boolean isComponentValid(android.support.v4.app.Fragment fragment) {
125139
126140 private static final class OnSubscribeActivity <T > extends OnSubscribeBase <T , Activity > {
127141
128- private OnSubscribeActivity (Observable <T > source , WeakReference < Activity > activity ) {
142+ private OnSubscribeActivity (Observable <T > source , Activity activity ) {
129143 super (source , activity );
130144 }
131145
@@ -159,14 +173,14 @@ public void setupMocks() {
159173
160174 @ Test
161175 public void itObservesTheSourceSequenceOnTheMainUIThread () {
162- OperationObserveInForeground . observeOnFragment (mockObservable , mockFragment ).subscribe (mockObserver );
176+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (mockObservable , mockFragment ).subscribe (mockObserver );
163177 verify (mockObservable ).observeOn (AndroidSchedulers .mainThread ());
164178 }
165179
166180 @ Test
167181 public void itForwardsOnNextOnCompletedSequenceToTargetObserver () {
168182 Observable <Integer > source = Observable .from (1 , 2 , 3 );
169- OperationObserveInForeground . observeOnFragment (source , mockFragment ).subscribe (mockObserver );
183+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockFragment ).subscribe (mockObserver );
170184 verify (mockObserver , times (3 )).onNext (anyInt ());
171185 verify (mockObserver ).onCompleted ();
172186 verify (mockObserver , never ()).onError (any (Exception .class ));
@@ -176,22 +190,21 @@ public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
176190 public void itForwardsOnErrorToTargetObserver () {
177191 final Exception exception = new Exception ();
178192 Observable <Integer > source = Observable .error (exception );
179- OperationObserveInForeground . observeOnFragment (source , mockFragment ).subscribe (mockObserver );
193+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockFragment ).subscribe (mockObserver );
180194 verify (mockObserver ).onError (exception );
181195 verify (mockObserver , never ()).onNext (anyInt ());
182196 verify (mockObserver , never ()).onCompleted ();
183197 }
184198
185199 @ Test
186- public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone () {
200+ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone () throws Throwable {
187201 PublishSubject <Integer > source = PublishSubject .create ();
188- WeakReference <Fragment > fragmentRef = new WeakReference <Fragment >(mockFragment );
189202
190- final OnSubscribeFragment <Integer > operator = new OnSubscribeFragment <Integer >(source , fragmentRef );
203+ final OnSubscribeFragment <Integer > operator = new OnSubscribeFragment <Integer >(source , mockFragment );
191204 operator .onSubscribe (mockObserver );
192205
193206 source .onNext (1 );
194- fragmentRef . clear ( );
207+ releaseComponentRef ( operator );
195208
196209 source .onNext (2 );
197210 source .onNext (3 );
@@ -202,26 +215,31 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() {
202215 }
203216
204217 @ Test
205- public void itDropsOnErrorIfTargetComponentIsGone () {
218+ public void itDropsOnErrorIfTargetComponentIsGone () throws Throwable {
206219 PublishSubject <Integer > source = PublishSubject .create ();
207- WeakReference <Fragment > fragmentRef = new WeakReference <Fragment >(mockFragment );
208220
209- final OnSubscribeFragment <Integer > operator = new OnSubscribeFragment <Integer >(source , fragmentRef );
221+ final OnSubscribeFragment <Integer > operator = new OnSubscribeFragment <Integer >(source , mockFragment );
210222 operator .onSubscribe (mockObserver );
211223
212224 source .onNext (1 );
213- fragmentRef . clear ( );
225+ releaseComponentRef ( operator );
214226
215227 source .onError (new Exception ());
216228
217229 verify (mockObserver ).onNext (1 );
218230 verifyNoMoreInteractions (mockObserver );
219231 }
220232
233+ private void releaseComponentRef (OnSubscribeFragment <Integer > operator ) throws NoSuchFieldException , IllegalAccessException {
234+ final Field componentRef = operator .getClass ().getSuperclass ().getDeclaredField ("componentRef" );
235+ componentRef .setAccessible (true );
236+ componentRef .set (operator , null );
237+ }
238+
221239 @ Test
222240 public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached () {
223241 PublishSubject <Integer > source = PublishSubject .create ();
224- OperationObserveInForeground . observeOnFragment (source , mockFragment ).subscribe (mockObserver );
242+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockFragment ).subscribe (mockObserver );
225243
226244 source .onNext (1 );
227245
@@ -237,7 +255,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
237255 @ Test
238256 public void itDoesNotForwardOnErrorIfFragmentIsDetached () {
239257 PublishSubject <Integer > source = PublishSubject .create ();
240- OperationObserveInForeground . observeOnFragment (source , mockFragment ).subscribe (mockObserver );
258+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockFragment ).subscribe (mockObserver );
241259
242260 source .onNext (1 );
243261
@@ -251,7 +269,7 @@ public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
251269 @ Test
252270 public void isDoesNotForwardOnNextOnCompletedSequenceIfActivityIsFinishing () {
253271 PublishSubject <Integer > source = PublishSubject .create ();
254- OperationObserveInForeground . observeOnActivity (source , mockActivity ).subscribe (mockObserver );
272+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockActivity ).subscribe (mockObserver );
255273
256274 source .onNext (1 );
257275
@@ -267,7 +285,7 @@ public void isDoesNotForwardOnNextOnCompletedSequenceIfActivityIsFinishing() {
267285 @ Test
268286 public void itDoesNotForwardOnErrorIfActivityIsFinishing () {
269287 PublishSubject <Integer > source = PublishSubject .create ();
270- OperationObserveInForeground . observeOnActivity (source , mockActivity ).subscribe (mockObserver );
288+ OperationObserveFromAndroidComponent . observeFromAndroidComponent (source , mockActivity ).subscribe (mockObserver );
271289
272290 source .onNext (1 );
273291
@@ -277,5 +295,18 @@ public void itDoesNotForwardOnErrorIfActivityIsFinishing() {
277295 verify (mockObserver ).onNext (1 );
278296 verify (mockObserver , never ()).onError (any (Exception .class ));
279297 }
298+
299+ @ Test
300+ public void itUnsubscribesFromTheSourceSequence () {
301+ Subscription underlying = mock (Subscription .class );
302+ when (mockObservable .observeOn (AndroidSchedulers .mainThread ())).thenReturn (mockObservable );
303+ when (mockObservable .subscribe (any (Observer .class ))).thenReturn (underlying );
304+
305+ Subscription sub = OperationObserveFromAndroidComponent .observeFromAndroidComponent (
306+ mockObservable , mockActivity ).subscribe (mockObserver );
307+ sub .unsubscribe ();
308+
309+ verify (underlying ).unsubscribe ();
310+ }
280311 }
281312}
0 commit comments