Skip to content

Commit dbfe4cd

Browse files
committed
make zip(Ob, Func1) not double subscribe to the observable.
1 parent b45f418 commit dbfe4cd

1 file changed

Lines changed: 26 additions & 2 deletions

File tree

src/main/java/rx/BiObservable.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import rx.functions.Func1;
77
import rx.functions.Func2;
88
import rx.functions.Func3;
9-
import rx.subjects.PublishSubject;
109

1110
import java.util.HashMap;
1211
import java.util.Map;
@@ -55,7 +54,32 @@ public void call(Subscriber<? super R> child) {
5554
}
5655

5756
public static <T0, T1> BiObservable<T0, T1> zip(final Observable<? extends T0> ob0, final Func1<? super T0, ? extends T1> f) {
58-
return zip(ob0, ob0.map(f));
57+
return create(new BiOnSubscribe<T0, T1>() {
58+
@Override
59+
public void call(DualSubscriber<? super T0, ? super T1> subscriber) {
60+
ob0.unsafeSubscribe(new Subscriber<T0>() {
61+
@Override
62+
public void onCompleted() {
63+
subscriber.onComplete();
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
subscriber.onError(e);
69+
}
70+
71+
@Override
72+
public void onNext(T0 t0) {
73+
try {
74+
T1 t1 = f.call(t0);
75+
subscriber.onNext(t0, t1);
76+
} catch(Throwable e) {
77+
subscriber.onError(e);
78+
}
79+
}
80+
});
81+
}
82+
});
5983
}
6084

6185
public static <T0, T1> BiObservable<T0, T1> zip(final Observable<? extends T0> ob0, final Observable<? extends T1> ob1) {

0 commit comments

Comments
 (0)