Skip to content

Commit f869436

Browse files
JakeWhartonakarnokd
authored andcommitted
Normalize empty and never sources. (ReactiveX#4316)
1 parent 0e0edb5 commit f869436

9 files changed

Lines changed: 59 additions & 30 deletions

File tree

src/main/java/io/reactivex/Completable.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,6 @@
3232
* The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?
3333
*/
3434
public abstract class Completable implements CompletableSource {
35-
/** Single instance of a complete Completable. */
36-
static final Completable COMPLETE = new CompletableEmpty();
37-
38-
/** Single instance of a never Completable. */
39-
static final Completable NEVER = new CompletableNever();
40-
4135
/**
4236
* Wraps the given CompletableSource into a Completable
4337
* if not already Completable.
@@ -93,7 +87,7 @@ public static Completable amb(final Iterable<? extends CompletableSource> source
9387
*/
9488
@SchedulerSupport(SchedulerSupport.NONE)
9589
public static Completable complete() {
96-
return COMPLETE;
90+
return CompletableEmpty.INSTANCE;
9791
}
9892

9993
/**
@@ -432,7 +426,7 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
432426
* @return the singleton instance that never calls onError or onComplete
433427
*/
434428
public static Completable never() {
435-
return NEVER;
429+
return CompletableNever.INSTANCE;
436430
}
437431

438432
/**

src/main/java/io/reactivex/Flowable.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
3232
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3333
import io.reactivex.internal.subscribers.flowable.*;
34-
import io.reactivex.internal.subscriptions.EmptySubscription;
3534
import io.reactivex.internal.util.ArrayListSupplier;
3635
import io.reactivex.internal.util.HashMapSupplier;
3736
import io.reactivex.plugins.RxJavaPlugins;
@@ -46,14 +45,6 @@ public abstract class Flowable<T> implements Publisher<T> {
4645
BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
4746
}
4847

49-
/** A never observable instance as there is no need to instantiate this more than once. */
50-
static final Flowable<Object> NEVER = new Flowable<Object>() { // FIXME factor out
51-
@Override
52-
public void subscribeActual(Subscriber<? super Object> s) {
53-
s.onSubscribe(EmptySubscription.INSTANCE);
54-
}
55-
};
56-
5748
public static <T> Flowable<T> amb(Iterable<? extends Publisher<? extends T>> sources) {
5849
Objects.requireNonNull(sources, "sources is null");
5950
return new FlowableAmb<T>(null, sources);
@@ -480,8 +471,9 @@ public static <T> Flowable<T> defer(Callable<? extends Publisher<? extends T>> s
480471

481472
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
482473
@SchedulerSupport(SchedulerSupport.NONE)
474+
@SuppressWarnings("unchecked")
483475
public static <T> Flowable<T> empty() {
484-
return FlowableEmpty.empty();
476+
return (Flowable<T>) FlowableEmpty.INSTANCE;
485477
}
486478

487479
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@@ -1014,7 +1006,7 @@ public static <T> Flowable<T> mergeDelayError(
10141006
@SchedulerSupport(SchedulerSupport.NONE)
10151007
@SuppressWarnings("unchecked")
10161008
public static <T> Flowable<T> never() {
1017-
return (Flowable<T>)NEVER;
1009+
return (Flowable<T>) FlowableNever.INSTANCE;
10181010
}
10191011

10201012
@BackpressureSupport(BackpressureKind.FULL)

src/main/java/io/reactivex/Single.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
* @param <T> the value type
3535
*/
3636
public abstract class Single<T> implements SingleSource<T> {
37-
38-
static final Single<Object> NEVER = new SingleNever<Object>();
39-
4037
static <T> Single<T> wrap(SingleSource<T> source) {
4138
Objects.requireNonNull(source, "source is null");
4239
if (source instanceof Single) {
@@ -389,7 +386,7 @@ public static <T> Flowable<T> merge(
389386

390387
@SuppressWarnings("unchecked")
391388
public static <T> Single<T> never() {
392-
return (Single<T>)NEVER;
389+
return (Single<T>) SingleNever.INSTANCE;
393390
}
394391

395392
public static Single<Long> timer(long delay, TimeUnit unit) {

src/main/java/io/reactivex/internal/operators/completable/CompletableEmpty.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
import io.reactivex.internal.disposables.EmptyDisposable;
1818

1919
public final class CompletableEmpty extends Completable {
20+
public static final Completable INSTANCE = new CompletableEmpty();
21+
22+
private CompletableEmpty() {
23+
}
24+
2025
@Override
2126
public void subscribeActual(CompletableObserver s) {
2227
s.onSubscribe(EmptyDisposable.INSTANCE);

src/main/java/io/reactivex/internal/operators/completable/CompletableNever.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
import io.reactivex.internal.disposables.EmptyDisposable;
1818

1919
public final class CompletableNever extends Completable {
20+
public static final Completable INSTANCE = new CompletableNever();
21+
22+
private CompletableNever() {
23+
}
2024

2125
@Override
2226
protected void subscribeActual(CompletableObserver s) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableEmpty.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
*/
2525
public final class FlowableEmpty extends Flowable<Object> implements ScalarCallable<Object> {
2626

27-
static final Flowable<Object> INSTANCE = new FlowableEmpty();
27+
public static final Flowable<Object> INSTANCE = new FlowableEmpty();
2828

29-
@SuppressWarnings("unchecked")
30-
public static <T> Flowable<T> empty() {
31-
return (Flowable<T>)INSTANCE;
29+
private FlowableEmpty() {
3230
}
3331

3432
@Override
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.flowable;
14+
15+
import io.reactivex.Flowable;
16+
import io.reactivex.internal.subscriptions.EmptySubscription;
17+
import org.reactivestreams.Subscriber;
18+
19+
public final class FlowableNever extends Flowable<Object> {
20+
public static final Flowable<Object> INSTANCE = new FlowableNever();
21+
22+
private FlowableNever() {
23+
}
24+
25+
@Override
26+
public void subscribeActual(Subscriber<? super Object> s) {
27+
s.onSubscribe(EmptySubscription.INSTANCE);
28+
}
29+
}

src/main/java/io/reactivex/internal/operators/observable/ObservableEmpty.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
import io.reactivex.Observable;
1616
import io.reactivex.Observer;
1717
import io.reactivex.internal.disposables.EmptyDisposable;
18+
import io.reactivex.internal.fuseable.ScalarCallable;
1819

19-
public final class ObservableEmpty extends Observable<Object> {
20+
public final class ObservableEmpty extends Observable<Object> implements ScalarCallable<Object> {
2021
public static final Observable<Object> INSTANCE = new ObservableEmpty();
2122

2223
private ObservableEmpty() {
@@ -27,4 +28,9 @@ protected void subscribeActual(Observer<? super Object> o) {
2728
o.onSubscribe(EmptyDisposable.INSTANCE);
2829
o.onComplete();
2930
}
31+
32+
@Override
33+
public Object call() {
34+
return null; // null scalar is interpreted as being empty
35+
}
3036
}

src/main/java/io/reactivex/internal/operators/single/SingleNever.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
import io.reactivex.*;
1717
import io.reactivex.internal.disposables.EmptyDisposable;
1818

19-
public final class SingleNever<T> extends Single<T> {
19+
public final class SingleNever extends Single<Object> {
20+
public static final Single<Object> INSTANCE = new SingleNever();
21+
22+
private SingleNever() {
23+
}
2024

2125
@Override
22-
protected void subscribeActual(SingleObserver<? super T> s) {
26+
protected void subscribeActual(SingleObserver<? super Object> s) {
2327
s.onSubscribe(EmptyDisposable.INSTANCE);
2428
}
2529

0 commit comments

Comments
 (0)