/** * Copyright 2015 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */ package rx; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import rx.Observable.Operator; import rx.annotations.Experimental; import rx.exceptions.Exceptions; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; import rx.functions.Func3; import rx.functions.Func4; import rx.functions.Func5; import rx.functions.Func6; import rx.functions.Func7; import rx.functions.Func8; import rx.functions.Func9; import rx.internal.operators.OnSubscribeToObservableFuture; import rx.internal.operators.OperatorMap; import rx.internal.operators.OperatorObserveOn; import rx.internal.operators.OperatorOnErrorReturn; import rx.internal.operators.OperatorSubscribeOn; import rx.internal.operators.OperatorTimeout; import rx.internal.operators.OperatorZip; import rx.internal.producers.SingleDelayedProducer; import rx.observers.SafeSubscriber; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; /** * The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the * implementation of the Reactive Pattern for a stream or vector of values. *
* {@code Single} behaves the same as {@link Observable} except that it can only emit either a single successful * value, or an error (there is no "onComplete" notification as there is for {@link Observable}) *
* Like an {@link Observable}, a {@code Single} is lazy, can be either "hot" or "cold", synchronous or * asynchronous. *
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams: *
*
*
* For more information see the ReactiveX
* documentation.
*
* @param
* Note: Use {@link #create(OnSubscribe)} to create a Single, instead of this constructor,
* unless you specifically have a need for inheritance.
*
* @param f
* {@code OnExecute} to be executed when {@code execute(SingleSubscriber)} or
* {@code subscribe(Subscriber)} is called
*/
protected Single(final OnSubscribe
*
* Write the function you pass to {@code create} so that it behaves as a Single: It should invoke the
* SingleSubscriber {@link SingleSubscriber#onSuccess onSuccess} and/or
* {@link SingleSubscriber#onError onError} methods appropriately.
*
* A well-formed Single must invoke either the SingleSubscriber's {@code onSuccess} method exactly once or
* its {@code onError} method exactly once.
*
*
* In other words, this allows chaining TaskExecutors together on a Single for acting on the values within
* the Single.
*
* {@code task.map(...).filter(...).lift(new OperatorA()).lift(new OperatorB(...)).subscribe() }
*
* If the operator you are creating is designed to act on the item emitted by a source Single, use
* {@code lift}. If your operator is designed to transform the source Single as a whole (for instance, by
* applying a particular set of existing RxJava operators to it) use {@link #compose}.
*
* This method operates on the Single itself whereas {@link #lift} operates on the Single's Subscribers or
* Observers.
*
* If the operator you are creating is designed to act on the individual item emitted by a Single, use
* {@link #lift}. If your operator is designed to transform the source Single as a whole (for instance, by
* applying a particular set of existing RxJava operators to it) use {@code compose}.
*
*
*
*
*
*
*
*
*
*
*
*
* You can convert any object that supports the {@link Future} interface into a Single that emits the return
* value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
*
* Important note: This Single is blocking; you cannot unsubscribe from it.
*
*
* You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
*
* Important note: This {@code Single} is blocking; you cannot unsubscribe from it.
*
*
* You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
*
*
* To convert any object into a {@code Single} that emits that object, pass that object into the
* {@code just} method.
*
*
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by
* using the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code mergeWith} method.
*
*
*
* By default, when a Single encounters an error that prevents it from emitting the expected item to its
* subscriber, the Single invokes its subscriber's {@link Subscriber#onError} method, and then quits
* without invoking any more of its subscriber's methods. The {@code onErrorReturn} method changes this
* behavior. If you pass a function ({@code resumeFunction}) to a Single's {@code onErrorReturn} method, if
* the original Single encounters an error, instead of invoking its subscriber's
* {@link Subscriber#onError} method, it will instead emit the return value of {@code resumeFunction}.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
*
* Use this only for implementing an {@link Operator} that requires nested subscriptions. For other
* purposes, use {@link #subscribe(Subscriber)} which ensures the Rx contract and other functionality.
*
* A typical implementation of {@code subscribe} does the following:
*
* A {@code Single
* For more information see the
* ReactiveX documentation.
*
* A typical implementation of {@code subscribe} does the following:
*
* A {@code Single
* For more information see the
* ReactiveX documentation.
*
*
*
*
*
*
*
*
*
*
*
* @param
*
*
* @param lift
* the Operator that implements the Single-operating function to be applied to the source Single
* @return a Single that is the result of applying the lifted Operator to the source Single
* @see RxJava wiki: Implementing Your Own Operators
*/
private final
*
*
* @param transformer
* implements the function that transforms the source Single
* @return the source Single, transformed by the transformer function
* @see RxJava wiki: Implementing Your Own Operators
*/
@SuppressWarnings("unchecked")
public
*
* @warn more complete description needed
*/
private static
*
*
*
* @return a Single that emits an Observable that emits the same item as the source Single
* @see ReactiveX operators documentation: To
*/
private final Single
*
*
*
* @param t1
* an Single to be concatenated
* @param t2
* an Single to be concatenated
* @return an Observable that emits items emitted by the two source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @return an Observable that emits items emitted by the three source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @return an Observable that emits items emitted by the four source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @param t5
* a Single to be concatenated
* @return an Observable that emits items emitted by the five source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @param t5
* a Single to be concatenated
* @param t6
* a Single to be concatenated
* @return an Observable that emits items emitted by the six source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @param t5
* a Single to be concatenated
* @param t6
* a Single to be concatenated
* @param t7
* a Single to be concatenated
* @return an Observable that emits items emitted by the seven source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @param t5
* a Single to be concatenated
* @param t6
* a Single to be concatenated
* @param t7
* a Single to be concatenated
* @param t8
* a Single to be concatenated
* @return an Observable that emits items emitted by the eight source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated
* @param t2
* a Single to be concatenated
* @param t3
* a Single to be concatenated
* @param t4
* a Single to be concatenated
* @param t5
* a Single to be concatenated
* @param t6
* a Single to be concatenated
* @param t7
* a Single to be concatenated
* @param t8
* a Single to be concatenated
* @param t9
* a Single to be concatenated
* @return an Observable that emits items emitted by the nine source Singles, one after the other.
* @see ReactiveX operators documentation: Concat
*/
public final static
*
*
*
* @param exception
* the particular Throwable to pass to {@link SingleSubscriber#onError onError}
* @param
*
*
*
* @param future
* the source {@link Future}
* @param
*
*
*
* @param future
* the source {@link Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param
*
*
*
* @param future
* the source {@link Future}
* @param scheduler
* the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
* {@link Schedulers#io()} that can block and wait on the Future
* @param
*
*
*
* @param value
* the item to emit
* @param
*
*
*
* @param source
* a {@code Single} that emits a {@code Single}
* @return a {@code Single} that emits the item that is the result of flattening the {@code Single} emitted
* by {@code source}
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @param t5
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @param t5
* a Single to be merged
* @param t6
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @param t5
* a Single to be merged
* @param t6
* a Single to be merged
* @param t7
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @param t5
* a Single to be merged
* @param t6
* a Single to be merged
* @param t7
* a Single to be merged
* @param t8
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param t1
* a Single to be merged
* @param t2
* a Single to be merged
* @param t3
* a Single to be merged
* @param t4
* a Single to be merged
* @param t5
* a Single to be merged
* @param t6
* a Single to be merged
* @param t7
* a Single to be merged
* @param t8
* a Single to be merged
* @param t9
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param o5
* a fifth source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param o5
* a fifth source Single
* @param o6
* a sixth source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param o5
* a fifth source Single
* @param o6
* a sixth source Single
* @param o7
* a seventh source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param o5
* a fifth source Single
* @param o6
* a sixth source Single
* @param o7
* a seventh source Single
* @param o8
* an eighth source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param o1
* the first source Single
* @param o2
* a second source Single
* @param o3
* a third source Single
* @param o4
* a fourth source Single
* @param o5
* a fifth source Single
* @param o6
* a sixth source Single
* @param o7
* a seventh source Single
* @param o8
* an eighth source Single
* @param o9
* a ninth source Single
* @param zipFunction
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see ReactiveX operators documentation: Zip
*/
public final static
*
*
*
* @param t1
* a Single to be concatenated after the current
* @return an Observable that emits the item emitted by the source Single, followed by the item emitted by
* {@code t1}
* @see ReactiveX operators documentation: Concat
*/
public final Observable
*
*
*
* @param func
* a function that, when applied to the item emitted by the source Single, returns a Single
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
* @see ReactiveX operators documentation: FlatMap
*/
public final
*
*
*
* @param func
* a function that, when applied to the item emitted by the source Single, returns an
* Observable
* @return the Observable returned from {@code func} when applied to the item emitted by the source Single
* @see ReactiveX operators documentation: FlatMap
*/
public final
*
*
*
* @param func
* a function to apply to the item emitted by the Single
* @return a Single that emits the item from the source Single, transformed by the specified function
* @see ReactiveX operators documentation: Map
*/
public final
*
*
*
* @param t1
* a Single to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see ReactiveX operators documentation: Merge
*/
public final Observable
*
*
*
* @param scheduler
* the {@link Scheduler} to notify subscribers on
* @return the source Single modified so that its subscribers are notified on the specified
* {@link Scheduler}
* @see ReactiveX operators documentation: ObserveOn
* @see RxJava Threading Examples
* @see #subscribeOn
*/
public final Single
*
*
*
* @param resumeFunction
* a function that returns an item that the new Single will emit if the source Single encounters
* an error
* @return the original Single with appropriately modified behavior
* @see ReactiveX operators documentation: Catch
*/
public final Single
*
*
* @return a {@link Subscription} reference can request the {@link Single} stop work.
* @throws OnErrorNotImplementedException
* if the Single tries to call {@link Subscriber#onError}
* @see ReactiveX operators documentation: Subscribe
*/
public final Subscription subscribe() {
return subscribe(new Subscriber
*
*
* @param onSuccess
* the {@code Action1
*
*
* @param onSuccess
* the {@code Action1
*
*
* @param subscriber
* the Subscriber that will handle the emission or notification from the Single
*/
public final void unsafeSubscribe(Subscriber super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// TODO add back the hook
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
onSubscribe.call(subscriber);
hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
}
/**
* Subscribes to a Single and provides a Subscriber that implements functions to handle the item the Single
* emits or any error notification it issues.
*
*
*
*
* @param subscriber
* the {@link Subscriber} that will handle the emission or notification from the Single
* @return a {@link Subscription} reference can request the {@link Single} stop work.
* @throws IllegalStateException
* if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function
* @throws IllegalArgumentException
* if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null}
* @throws OnErrorNotImplementedException
* if the {@link Subscriber}'s {@code onError} method is null
* @throws RuntimeException
* if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable}
* @see ReactiveX operators documentation: Subscribe
*/
public final Subscription subscribe(Subscriber super T> subscriber) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber
*
*
*
* @param te
* the {@link SingleSubscriber} that will handle the emission or notification from the Single
* @return a {@link Subscription} reference can request the {@link Single} stop work.
* @throws IllegalStateException
* if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function
* @throws IllegalArgumentException
* if the {@link SingleSubscriber} provided as the argument to {@code subscribe} is {@code null}
* @throws OnErrorNotImplementedException
* if the {@link SingleSubscriber}'s {@code onError} method is null
* @throws RuntimeException
* if the {@link SingleSubscriber}'s {@code onError} method itself threw a {@code Throwable}
* @see ReactiveX operators documentation: Subscribe
*/
public final Subscription subscribe(final SingleSubscriber super T> te) {
Subscriber
*
*
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source Single modified so that its subscriptions happen on the specified {@link Scheduler}
* @see ReactiveX operators documentation: SubscribeOn
* @see RxJava Threading Examples
* @see #observeOn
*/
public final Single
*
* @return an {@link Observable} that emits a single item T.
*/
public final Observable
*
*
*
* @param timeout
* maximum duration before the Single times out
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument.
* @return the source Single modified to notify subscribers of a {@code TimeoutException} in case of a
* timeout
* @see ReactiveX operators documentation: Timeout
*/
public final Single
*
*
*
* @param timeout
* maximum duration before the Single times out
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param scheduler
* the Scheduler to run the timeout timers on
* @return the source Single modified to notify subscribers of a {@code TimeoutException} in case of a
* timeout
* @see ReactiveX operators documentation: Timeout
*/
public final Single
*
*
*
* @param timeout
* maximum time before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param other
* the fallback Single to use in case of a timeout
* @return the source Single modified to switch to the fallback Single in case of a timeout
* @see ReactiveX operators documentation: Timeout
*/
public final Single
*
*
*
* @param timeout
* maximum duration before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param other
* the Single to use as the fallback in case of a timeout
* @param scheduler
* the {@link Scheduler} to run the timeout timers on
* @return the source Single modified so that it will switch to the fallback Singlein case of a timeout
* @see ReactiveX operators documentation: Timeout
*/
public final Single
*
*
*
* @param