Skip to content

Commit c0e12e5

Browse files
JakeWhartonakarnokd
authored andcommitted
Promote lift implementations to top-level types. (ReactiveX#4307)
1 parent 9315ec4 commit c0e12e5

File tree

6 files changed

+135
-25
lines changed

6 files changed

+135
-25
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,12 +1909,7 @@ public final Flowable<T> doOnLifecycle(final Consumer<? super Subscription> onSu
19091909
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
19101910
Objects.requireNonNull(onRequest, "onRequest is null");
19111911
Objects.requireNonNull(onCancel, "onCancel is null");
1912-
return lift(new FlowableOperator<T, T>() {
1913-
@Override
1914-
public Subscriber<? super T> apply(Subscriber<? super T> s) {
1915-
return new SubscriptionLambdaSubscriber<T>(s, onSubscribe, onRequest, onCancel);
1916-
}
1917-
});
1912+
return new FlowableDoOnLifecycle<T>(this, onSubscribe, onRequest, onCancel);
19181913
}
19191914

19201915
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@@ -2898,12 +2893,7 @@ public final <R> Flowable<R> scanWith(Callable<R> seedSupplier, BiFunction<R, ?
28982893
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
28992894
@SchedulerSupport(SchedulerSupport.NONE)
29002895
public final Flowable<T> serialize() {
2901-
return lift(new FlowableOperator<T, T>() {
2902-
@Override
2903-
public Subscriber<? super T> apply(Subscriber<? super T> s) {
2904-
return new SerializedSubscriber<T>(s);
2905-
}
2906-
});
2896+
return new FlowableSerialized<T>(this);
29072897
}
29082898

29092899
@BackpressureSupport(BackpressureKind.FULL)
@@ -3983,5 +3973,4 @@ public final TestSubscriber<T> test(long initialRequest, int fusionMode, boolean
39833973
subscribe(ts);
39843974
return ts;
39853975
}
3986-
39873976
}

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,12 +1611,7 @@ public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
16111611
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Runnable onCancel) {
16121612
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
16131613
Objects.requireNonNull(onCancel, "onCancel is null");
1614-
return lift(new ObservableOperator<T, T>() {
1615-
@Override
1616-
public Observer<? super T> apply(Observer<? super T> s) {
1617-
return new SubscriptionLambdaObserver<T>(s, onSubscribe, onCancel);
1618-
}
1619-
});
1614+
return new ObservableDoOnLifecycle<T>(this, onSubscribe, onCancel);
16201615
}
16211616

16221617
@SchedulerSupport(SchedulerSupport.NONE)
@@ -2451,12 +2446,7 @@ public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,
24512446

24522447
@SchedulerSupport(SchedulerSupport.NONE)
24532448
public final Observable<T> serialize() {
2454-
return lift(new ObservableOperator<T, T>() {
2455-
@Override
2456-
public Observer<? super T> apply(Observer<? super T> s) {
2457-
return new SerializedObserver<T>(s);
2458-
}
2459-
});
2449+
return new ObservableSerialized<T>(this);
24602450
}
24612451

24622452
@SchedulerSupport(SchedulerSupport.NONE)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.flowable;
14+
15+
import io.reactivex.Flowable;
16+
import io.reactivex.functions.Consumer;
17+
import io.reactivex.functions.LongConsumer;
18+
import io.reactivex.internal.subscribers.flowable.SubscriptionLambdaSubscriber;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
22+
public final class FlowableDoOnLifecycle<T> extends FlowableSource<T, T> {
23+
private final Consumer<? super Subscription> onSubscribe;
24+
private final LongConsumer onRequest;
25+
private final Runnable onCancel;
26+
27+
public FlowableDoOnLifecycle(Flowable<T> source, Consumer<? super Subscription> onSubscribe,
28+
LongConsumer onRequest, Runnable onCancel) {
29+
super(source);
30+
this.onSubscribe = onSubscribe;
31+
this.onRequest = onRequest;
32+
this.onCancel = onCancel;
33+
}
34+
35+
@Override
36+
protected void subscribeActual(Subscriber<? super T> s) {
37+
source.subscribe(new SubscriptionLambdaSubscriber<T>(s, onSubscribe, onRequest, onCancel));
38+
}
39+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.flowable;
14+
15+
import io.reactivex.Flowable;
16+
import io.reactivex.subscribers.SerializedSubscriber;
17+
import org.reactivestreams.Subscriber;
18+
19+
public final class FlowableSerialized<T> extends FlowableSource<T, T> {
20+
public FlowableSerialized(Flowable<T> source) {
21+
super(source);
22+
}
23+
24+
@Override
25+
protected void subscribeActual(Subscriber<? super T> s) {
26+
source.subscribe(new SerializedSubscriber<T>(s));
27+
}
28+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.functions.Consumer;
19+
import io.reactivex.internal.subscribers.observable.SubscriptionLambdaObserver;
20+
21+
public final class ObservableDoOnLifecycle<T> extends ObservableWithUpstream<T, T> {
22+
private final Consumer<? super Disposable> onSubscribe;
23+
private final Runnable onCancel;
24+
25+
public ObservableDoOnLifecycle(Observable<T> upstream, Consumer<? super Disposable> onSubscribe,
26+
Runnable onCancel) {
27+
super(upstream);
28+
this.onSubscribe = onSubscribe;
29+
this.onCancel = onCancel;
30+
}
31+
32+
@Override
33+
protected void subscribeActual(Observer<? super T> observer) {
34+
source.subscribe(new SubscriptionLambdaObserver<T>(observer, onSubscribe, onCancel));
35+
}
36+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import io.reactivex.Observable;
16+
import io.reactivex.Observer;
17+
import io.reactivex.observers.SerializedObserver;
18+
19+
public final class ObservableSerialized<T> extends ObservableWithUpstream<T, T> {
20+
public ObservableSerialized(Observable<T> upstream) {
21+
super(upstream);
22+
}
23+
24+
@Override
25+
protected void subscribeActual(Observer<? super T> observer) {
26+
source.subscribe(new SerializedObserver<T>(observer));
27+
}
28+
}

0 commit comments

Comments
 (0)