File tree Expand file tree Collapse file tree 2 files changed +44
-2
lines changed
Expand file tree Collapse file tree 2 files changed +44
-2
lines changed Original file line number Diff line number Diff line change @@ -671,7 +671,7 @@ public static <T> Single<T> merge(final Single<? extends Single<? extends T>> so
671671
672672 @ Override
673673 public void call (final SingleSubscriber <? super T > child ) {
674- source . subscribe ( new SingleSubscriber <Single <? extends T >>() {
674+ SingleSubscriber < Single <? extends T >> parent = new SingleSubscriber <Single <? extends T >>() {
675675
676676 @ Override
677677 public void onSuccess (Single <? extends T > innerSingle ) {
@@ -683,7 +683,9 @@ public void onError(Throwable error) {
683683 child .onError (error );
684684 }
685685
686- });
686+ };
687+ child .add (parent );
688+ source .subscribe (parent );
687689 }
688690 });
689691 }
Original file line number Diff line number Diff line change @@ -1914,4 +1914,44 @@ public void subscribeWithNullObserver() {
19141914 assertEquals ("observer is null" , ex .getMessage ());
19151915 }
19161916 }
1917+
1918+ @ Test
1919+ public void unsubscribeComposesThrough () {
1920+ PublishSubject <Integer > ps = PublishSubject .create ();
1921+
1922+ Subscription s = ps .toSingle ()
1923+ .flatMap (new Func1 <Integer , Single <Integer >>() {
1924+ @ Override
1925+ public Single <Integer > call (Integer v ) {
1926+ return Single .just (1 );
1927+ }
1928+ })
1929+ .subscribe ();
1930+
1931+ s .unsubscribe ();
1932+
1933+ assertFalse ("Observers present?!" , ps .hasObservers ());
1934+ }
1935+
1936+ @ Test (timeout = 1000 )
1937+ public void unsubscribeComposesThroughAsync () {
1938+ PublishSubject <Integer > ps = PublishSubject .create ();
1939+
1940+ Subscription s = ps .toSingle ()
1941+ .subscribeOn (Schedulers .io ())
1942+ .flatMap (new Func1 <Integer , Single <Integer >>() {
1943+ @ Override
1944+ public Single <Integer > call (Integer v ) {
1945+ return Single .just (1 );
1946+ }
1947+ })
1948+ .subscribe ();
1949+
1950+ while (!ps .hasObservers () && !Thread .currentThread ().isInterrupted ()) ;
1951+
1952+ s .unsubscribe ();
1953+
1954+ assertFalse ("Observers present?!" , ps .hasObservers ());
1955+ }
1956+
19171957}
You can’t perform that action at this time.
0 commit comments