2424
2525import android .app .Activity ;
2626import android .app .Fragment ;
27+ import android .os .Looper ;
2728import android .util .Log ;
2829
2930import java .lang .reflect .Field ;
31+ import java .util .concurrent .Callable ;
32+ import java .util .concurrent .Executors ;
33+ import java .util .concurrent .Future ;
34+ import java .util .concurrent .TimeUnit ;
3035
3136public class OperationObserveFromAndroidComponent {
3237
@@ -47,8 +52,8 @@ private static abstract class OnSubscribeBase<T, AndroidComponent> implements Ob
4752 private static final String LOG_TAG = OperationObserveFromAndroidComponent .class .getSimpleName ();
4853
4954 private final Observable <T > source ;
50- private volatile AndroidComponent componentRef ;
51- private volatile Observer <? super T > observerRef ;
55+ private AndroidComponent componentRef ;
56+ private Observer <? super T > observerRef ;
5257
5358 private OnSubscribeBase (Observable <T > source , AndroidComponent component ) {
5459 this .source = source ;
@@ -65,6 +70,7 @@ private void log(String message) {
6570
6671 @ Override
6772 public Subscription onSubscribe (Observer <? super T > observer ) {
73+ assertUiThread ();
6874 observerRef = observer ;
6975 final Subscription sourceSub = source .observeOn (AndroidSchedulers .mainThread ()).subscribe (new Observer <T >() {
7076 @ Override
@@ -111,6 +117,12 @@ private void releaseReferences() {
111117 observerRef = null ;
112118 componentRef = null ;
113119 }
120+
121+ private void assertUiThread () {
122+ if (Looper .getMainLooper () != Looper .myLooper ()) {
123+ throw new IllegalStateException ("Observers must subscribe from the main UI thread, but was " + Thread .currentThread ());
124+ }
125+ }
114126 }
115127
116128 private static final class OnSubscribeFragment <T > extends OnSubscribeBase <T , android .app .Fragment > {
@@ -171,6 +183,21 @@ public void setupMocks() {
171183 when (mockFragment .isAdded ()).thenReturn (true );
172184 }
173185
186+ @ Test
187+ public void itThrowsIfObserverSubscribesFromBackgroundThread () throws Exception {
188+ final Future <Object > future = Executors .newSingleThreadExecutor ().submit (new Callable <Object >() {
189+ @ Override
190+ public Object call () throws Exception {
191+ OperationObserveFromAndroidComponent .observeFromAndroidComponent (
192+ mockObservable , mockFragment ).subscribe (mockObserver );
193+ return null ;
194+ }
195+ });
196+ future .get (1 , TimeUnit .SECONDS );
197+ verify (mockObserver ).onError (any (IllegalStateException .class ));
198+ verifyNoMoreInteractions (mockObserver );
199+ }
200+
174201 @ Test
175202 public void itObservesTheSourceSequenceOnTheMainUIThread () {
176203 OperationObserveFromAndroidComponent .observeFromAndroidComponent (mockObservable , mockFragment ).subscribe (mockObserver );
0 commit comments