/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.AtomicObservableSubscription;
import rx.operators.AtomicObserver;
import rx.operators.OperationAll;
import rx.operators.OperationCache;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationGroupBy;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMulticast;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationWhere;
import rx.operators.OperationZip;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.Range;
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.FunctionLanguageAdaptor;
import rx.util.functions.Functions;
/**
* The Observable interface that implements the Reactive Pattern.
*
* It provides overloaded methods for subscribing as well as delegate methods to the various operators.
*
* The documentation for this interface makes use of marble diagrams. The following legend explains
* these diagrams:
*
*
*
* For more information see the RxJava Wiki
*
* @param
*/
public class Observable {
// Execution hook obtained once from RxJavaPlugins; subscribe(Observer) passes the subscribe function,
// the resulting Subscription and any subscribe-time error through it.
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
// Function run when an Observer subscribes; null when the no-argument constructor was used.
private final Func1, Subscription> onSubscribe;
/**
 * Creates an Observable without an onSubscribe function.
 *
 * For such an instance {@link #subscribe(Observer)} throws an IllegalStateException unless the
 * execution hook supplies a non-null function.
 */
protected Observable() {
this(null);
}
/**
* Construct an Observable with Function to execute when subscribed to.
*
* NOTE: Generally you're better off using {@link #create(Func1)} to create an Observable instead of using inheritance.
*
* @param onSubscribe
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
*/
protected Observable(Func1, Subscription> onSubscribe) {
// stored as given (null is accepted here); subscribe(Observer) validates the function after consulting the hook
this.onSubscribe = onSubscribe;
}
/**
* An {@link Observer} must call an Observable's subscribe method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* subscribe method does the following:
*
* It stores a reference to the Observer in a collection object, such as a List
* object.
*
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
*
* At any given time, a particular instance of an Observable implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular Observable implementation indicates otherwise,
* Observers should make no assumptions about the Observable implementation, such
* as the order of notifications that multiple Observers will receive.
*
* For more information see the RxJava Wiki
*
*
* @param observer
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer observer) {
// allow the hook to intercept and/or decorate
Func1, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
// validate and proceed
// NOTE: this check sits outside the try block below, so the IllegalStateException reaches the caller directly
// instead of being routed to observer.onError
if (onSubscribeFunction == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
}
try {
/**
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
// isInternalImplementation is defined outside this section; presumably it recognizes observers that
// do not need protective wrapping — confirm against its definition
if (isInternalImplementation(observer)) {
// trusted path: the observer is handed to the subscribe function unwrapped
Subscription s = onSubscribeFunction.call(observer);
if (s == null) {
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
// we want to gracefully handle it the same as AtomicObservableSubscription does
return hook.onSubscribeReturn(this, Subscriptions.empty());
} else {
return hook.onSubscribeReturn(this, s);
}
} else {
// untrusted path: the observer is wrapped in an AtomicObserver tied to a new AtomicObservableSubscription,
// and the Subscription returned by the subscribe function is wrapped by that same subscription object
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new AtomicObserver(subscription, observer)));
return hook.onSubscribeReturn(this, subscription);
}
} catch (Exception e) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
// (the hook sees the exception first and its return value is what the observer receives)
try {
observer.onError(hook.onSubscribeError(this, e));
} catch (Exception e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
// the new exception carries e2 as its cause; of the original failure only the message text is kept
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(this, r);
throw r;
}
// error was delivered through onError; this empty Subscription is returned directly, without hook.onSubscribeReturn
return Subscriptions.empty();
}
}
/**
* An {@link Observer} must call an Observable's subscribe method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* subscribe method does the following:
*
* It stores a reference to the Observer in a collection object, such as a List
* object.
*
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
*
* At any given time, a particular instance of an Observable implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular Observable implementation indicates otherwise,
* Observers should make no assumptions about the Observable implementation, such
* as the order of notifications that multiple Observers will receive.
*
* For more information see the RxJava Wiki
*
*
* @param observer
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer observer, Scheduler scheduler) {
    // Two steps: bind the sequence to the scheduler via subscribeOn, then subscribe the observer to the result.
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(observer);
}
/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer o) {
    // The AtomicObserver and the returned Subscription share the same AtomicObservableSubscription instance.
    final AtomicObservableSubscription guard = new AtomicObservableSubscription();
    final AtomicObserver protectedObserver = new AtomicObserver(guard, o);
    final Subscription actual = subscribe(protectedObserver);
    return guard.wrap(actual);
}
/**
 * Subscribes using callbacks looked up by name in a map.
 *
 * The map must contain an "onNext" entry; "onError" and "onCompleted" entries are optional and are
 * looked up each time the corresponding notification arrives. Every error is also passed to the
 * {@link RxJavaErrorHandler} before the "onError" callback (if any) is invoked.
 *
 * @param callbacks
 *            map from callback name to a function object understood by {@link Functions#from}
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 * @throws RuntimeException
 *             if the map has no "onNext" entry
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map callbacks) {
    // onNext is mandatory: resolve and adapt it once, up front
    final Object rawOnNext = callbacks.get("onNext");
    if (rawOnNext == null) {
        throw new RuntimeException("onNext must be implemented");
    }
    final FuncN nextFunction = Functions.from(rawOnNext);

    // Wrapped because raw user-supplied functions are invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer mapBacked = new Observer() {
        @Override
        public void onCompleted() {
            final Object completed = callbacks.get("onCompleted");
            if (completed == null) {
                return;
            }
            Functions.from(completed).call();
        }

        @Override
        public void onError(Exception e) {
            handleError(e);
            final Object failed = callbacks.get("onError");
            if (failed == null) {
                return;
            }
            Functions.from(failed).call(e);
        }

        @Override
        public void onNext(Object args) {
            nextFunction.call(args);
        }
    };
    return protectivelyWrapAndSubscribe(mapBacked);
}
/**
 * Subscribes with a map of named callbacks after applying {@code subscribeOn(scheduler)}.
 *
 * @param callbacks
 *            callbacks keyed by name, as accepted by the single-argument Map overload
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Map callbacks, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(callbacks);
}
/**
 * Subscribes with a single function object that is invoked for every emitted item.
 *
 * If the argument is actually an {@link Observer} the call is forwarded to
 * {@link #subscribe(Observer)}. Otherwise the object is adapted with {@link Functions#from};
 * errors go to the {@link RxJavaErrorHandler} only and completion is ignored.
 *
 * @param o
 *            the onNext function object, or an {@link Observer}
 * @return the {@link Subscription} for this subscription
 * @throws RuntimeException
 *             if {@code o} is null
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object o) {
    // null can never be an Observer, so rejecting it first is equivalent to testing it after the instanceof check
    if (o == null) {
        throw new RuntimeException("onNext must be implemented");
    }
    if (o instanceof Observer) {
        // a dynamic language may fail to pick the Observer overload; forward to it explicitly
        return subscribe((Observer) o);
    }
    final FuncN nextFunction = Functions.from(o);

    // Wrapped because a raw user-supplied function is invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer itemOnly = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to call
        }

        @Override
        public void onError(Exception e) {
            // no user callback; only the plugin error handler is notified
            handleError(e);
        }

        @Override
        public void onNext(Object args) {
            nextFunction.call(args);
        }
    };
    return protectivelyWrapAndSubscribe(itemOnly);
}
/**
 * Subscribes with a single onNext function object after applying {@code subscribeOn(scheduler)}.
 *
 * @param o
 *            the onNext function object, as accepted by the single-argument Object overload
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Object o, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(o);
}
/**
 * Subscribes with a typed callback that receives each emitted item.
 *
 * Errors are passed to the {@link RxJavaErrorHandler} only and completion is ignored. A null
 * {@code onNext} is not rejected at subscribe time; the RuntimeException is raised from inside
 * the wrapped observer when an item arrives.
 *
 * @param onNext
 *            callback invoked with each item
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 */
public Subscription subscribe(final Action1 onNext) {
    // Wrapped because a raw user-supplied function is invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer itemOnly = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to call
        }

        @Override
        public void onError(Exception e) {
            // no user callback; only the plugin error handler is notified
            handleError(e);
        }

        @Override
        public void onNext(T args) {
            if (onNext != null) {
                onNext.call(args);
            } else {
                throw new RuntimeException("onNext must be implemented");
            }
        }
    };
    return protectivelyWrapAndSubscribe(itemOnly);
}
/**
 * Subscribes with a typed onNext callback after applying {@code subscribeOn(scheduler)}.
 *
 * @param onNext
 *            callback invoked with each item
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Action1 onNext, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(onNext);
}
/**
 * Subscribes with function objects for items and errors.
 *
 * {@code onNext} is adapted once with {@link Functions#from}; {@code onError} is optional and is
 * adapted each time an error arrives, after the error has been passed to the
 * {@link RxJavaErrorHandler}. Completion is ignored.
 *
 * @param onNext
 *            function object invoked with each item
 * @param onError
 *            function object invoked with the exception, or null
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 * @throws RuntimeException
 *             if {@code onNext} is null
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
    // onNext is mandatory: validate and adapt it once
    if (onNext == null) {
        throw new RuntimeException("onNext must be implemented");
    }
    final FuncN nextFunction = Functions.from(onNext);

    // Wrapped because raw user-supplied functions are invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer relay = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to call
        }

        @Override
        public void onError(Exception e) {
            handleError(e);
            if (onError == null) {
                return;
            }
            Functions.from(onError).call(e);
        }

        @Override
        public void onNext(Object args) {
            nextFunction.call(args);
        }
    };
    return protectivelyWrapAndSubscribe(relay);
}
/**
 * Subscribes with onNext/onError function objects after applying {@code subscribeOn(scheduler)}.
 *
 * @param onNext
 *            function object invoked with each item
 * @param onError
 *            function object invoked with the exception, or null
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(onNext, onError);
}
/**
 * Subscribes with typed callbacks for items and errors.
 *
 * Every error is first passed to the {@link RxJavaErrorHandler}, then to {@code onError} when it
 * is non-null. Completion is ignored. A null {@code onNext} is not rejected at subscribe time;
 * the RuntimeException is raised from inside the wrapped observer when an item arrives.
 *
 * @param onNext
 *            callback invoked with each item
 * @param onError
 *            callback invoked with the exception, or null
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 */
public Subscription subscribe(final Action1 onNext, final Action1 onError) {
    // Wrapped because raw user-supplied functions are invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer relay = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to call
        }

        @Override
        public void onError(Exception e) {
            handleError(e);
            if (onError == null) {
                return;
            }
            onError.call(e);
        }

        @Override
        public void onNext(T args) {
            if (onNext != null) {
                onNext.call(args);
            } else {
                throw new RuntimeException("onNext must be implemented");
            }
        }
    };
    return protectivelyWrapAndSubscribe(relay);
}
/**
 * Subscribes with typed onNext/onError callbacks after applying {@code subscribeOn(scheduler)}.
 *
 * @param onNext
 *            callback invoked with each item
 * @param onError
 *            callback invoked with the exception, or null
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(onNext, onError);
}
/**
 * Subscribes with function objects for items, errors and completion.
 *
 * {@code onNext} is adapted once with {@link Functions#from}. {@code onError} and
 * {@code onComplete} are optional and are adapted when the corresponding notification arrives;
 * every error is passed to the {@link RxJavaErrorHandler} before {@code onError} is invoked.
 *
 * @param onNext
 *            function object invoked with each item
 * @param onError
 *            function object invoked with the exception, or null
 * @param onComplete
 *            function object invoked on completion, or null
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 * @throws RuntimeException
 *             if {@code onNext} is null
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
    // onNext is mandatory: validate and adapt it once
    if (onNext == null) {
        throw new RuntimeException("onNext must be implemented");
    }
    final FuncN nextFunction = Functions.from(onNext);

    // Wrapped because raw user-supplied functions are invoked
    // (Guideline 6.4, see https://github.com/Netflix/RxJava/issues/216).
    final Observer relay = new Observer() {
        @Override
        public void onCompleted() {
            if (onComplete == null) {
                return;
            }
            Functions.from(onComplete).call();
        }

        @Override
        public void onError(Exception e) {
            handleError(e);
            if (onError == null) {
                return;
            }
            Functions.from(onError).call(e);
        }

        @Override
        public void onNext(Object args) {
            nextFunction.call(args);
        }
    };
    return protectivelyWrapAndSubscribe(relay);
}
/**
 * Subscribes with onNext/onError/onComplete function objects after applying
 * {@code subscribeOn(scheduler)}.
 *
 * @param onNext
 *            function object invoked with each item
 * @param onError
 *            function object invoked with the exception, or null
 * @param onComplete
 *            function object invoked on completion, or null
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(onNext, onError, onComplete);
}
/**
 * Subscribes with typed callbacks for items, errors and completion.
 *
 * Every error is first passed to the {@link RxJavaErrorHandler}, then to {@code onError} when it
 * is non-null. {@code onComplete} is likewise optional: a null value means completion is ignored,
 * matching the Object-based three-argument overload. A null {@code onNext} is not rejected at
 * subscribe time; the RuntimeException is raised from inside the wrapped observer when an item
 * arrives.
 *
 * @param onNext
 *            callback invoked with each item
 * @param onError
 *            callback invoked with the exception, or null
 * @param onComplete
 *            callback invoked on completion, or null
 * @return the {@link Subscription} produced by the protectively wrapped subscription
 */
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) {
    /**
     * Wrapping since raw functions provided by the user are being invoked.
     *
     * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
     */
    return protectivelyWrapAndSubscribe(new Observer() {
        @Override
        public void onCompleted() {
            // previously called unconditionally, which threw a NullPointerException on completion
            // when no completion callback was supplied
            if (onComplete != null) {
                onComplete.call();
            }
        }

        @Override
        public void onError(Exception e) {
            handleError(e);
            if (onError != null) {
                onError.call(e);
            }
        }

        @Override
        public void onNext(T args) {
            if (onNext == null) {
                throw new RuntimeException("onNext must be implemented");
            }
            onNext.call(args);
        }
    });
}
/**
 * Subscribes with typed onNext/onError/onComplete callbacks after applying
 * {@code subscribeOn(scheduler)}.
 *
 * @param onNext
 *            callback invoked with each item
 * @param onError
 *            callback invoked with the exception, or null
 * @param onComplete
 *            callback invoked on completion
 * @param scheduler
 *            the {@link Scheduler} passed to subscribeOn
 * @return the {@link Subscription} returned by the delegated subscribe call
 */
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) {
    final Observable scheduled = subscribeOn(scheduler);
    return scheduled.subscribe(onNext, onError, onComplete);
}
/**
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*
* @param subject
* the subject to push source elements into.
* @param
* result type
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public ConnectableObservable multicast(Subject subject) {
// instance convenience form: passes this Observable as the source to the two-argument multicast
// (that overload is defined outside this section)
return multicast(this, subject);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
* @param e
*/
private void handleError(Exception e) {
    // The handler is looked up on every call instead of being cached, since errors are expected to be rare.
    final RxJavaErrorHandler errorHandler = RxJavaPlugins.getInstance().getErrorHandler();
    errorHandler.handleError(e);
}
/**
* An Observable that never sends any information to an {@link Observer}.
*
* This Observable is useful primarily for testing purposes.
*
* @param
* the type of item emitted by the Observable
*/
private static class NeverObservable extends Observable {
public NeverObservable() {
// the subscribe function ignores the observer completely: no onNext, onError or onCompleted is ever called
super(new Func1, Subscription>() {
@Override
public Subscription call(Observer t1) {
// nothing was started, so an empty Subscription is handed back
return Subscriptions.empty();
}
});
}
}
/**
* An Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
*
* @param
* the type of object returned by the Observable
*/
private static class ThrowObservable extends Observable {
// exception is captured by the anonymous function below and delivered to every subscribing observer
public ThrowObservable(final Exception exception) {
super(new Func1, Subscription>() {
/**
* Accepts an {@link Observer} and calls its onError method.
*
* @param observer
* an {@link Observer} of this Observable
* @return a reference to the subscription
*/
@Override
public Subscription call(Observer observer) {
// onError is invoked directly inside the subscribe function; no onNext or onCompleted is emitted
observer.onError(exception);
return Subscriptions.empty();
}
});
}
}
/**
* Creates an Observable that will execute the given function when a {@link Observer} subscribes to it.
*
* Write the function you pass to create so that it behaves as an Observable - calling the passed-in
* onNext, onError, and onCompleted methods appropriately.
*
* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once.
*
* See Rx Design Guidelines (PDF) for detailed information.
*
* @param
* the type emitted by the Observable sequence
* @param func
* a function that accepts an Observer and calls its onNext, onError, and onCompleted methods
* as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable)
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function
*/
public static Observable create(Func1, Subscription> func) {
// plain factory over the protected constructor; func is stored as given and is not validated here
return new Observable(func);
}
/**
* Creates an Observable that will execute the given function when a {@link Observer} subscribes to it.
*
* This method accepts {@link Object} to allow different languages to pass in closures using {@link FunctionLanguageAdaptor}.
*
* Write the function you pass to create so that it behaves as an Observable - calling the passed-in
* onNext, onError, and onCompleted methods appropriately.
*
* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once.
*
* See Rx Design Guidelines (PDF) for detailed information.
*
* @param
* the type emitted by the Observable sequence
* @param func
* a function that accepts an Observer and calls its onNext, onError, and onCompleted methods
* as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable)
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function
*/
public static Observable create(final Object func) {
// adapt the supplied function object once, so the same FuncN is reused for every subscription
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(func);
return create(new Func1, Subscription>() {
@Override
public Subscription call(Observer t1) {
// the adapted function is expected to return a Subscription; any other result type fails this cast
return (Subscription) _f.call(t1);
}
});
}
/**
* Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method.
*
*
*
* @param
* the type of item emitted by the Observable
* @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method
*/
public static Observable empty() {
    // An Observable over a fresh, empty list: there is nothing to emit.
    final List noItems = new ArrayList();
    return Observable.toObservable(noItems);
}
/**
* Returns an Observable that calls onError when an {@link Observer} subscribes to it.
*
*
* @param exception
* the error to throw
* @param
* the type of object returned by the Observable
* @return an Observable object that calls onError when an {@link Observer} subscribes
*/
public static Observable error(Exception exception) {
    // ThrowObservable hands the given exception to onError for each subscribing Observer.
    final Observable failing = new ThrowObservable(exception);
    return failing;
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable filter(Observable that, Func1 predicate) {
    // The value returned by OperationFilter.filter is handed to create(...) as the function to run on subscribe.
    return Observable.create(OperationFilter.filter(that, predicate));
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param function
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable filter(Observable that, final Object function) {
    // Adapt the function object once, then expose it as a Func1 predicate for the typed overload.
    @SuppressWarnings("rawtypes")
    final FuncN adapted = Functions.from(function);
    final Func1 predicate = new Func1() {
        @Override
        public Boolean call(T item) {
            // the adapted function must yield a Boolean; any other result type fails this cast
            return (Boolean) adapted.call(item);
        }
    };
    return Observable.filter(that, predicate);
}
/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
*
*
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
*/
public static Observable where(Observable that, Func1 predicate) {
    // Same shape as filter, but backed by OperationWhere; its result becomes the subscribe function.
    return Observable.create(OperationWhere.where(that, predicate));
}
/**
* Converts an {@link Iterable} sequence to an Observable sequence.
*
* @param iterable
* the source {@link Iterable} sequence
* @param
* the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable} sequence
* @see #toObservable(Iterable)
*/
public static Observable from(Iterable iterable) {
    // Alias: forwards straight to toObservable for the Iterable.
    return Observable.toObservable(iterable);
}
/**
* Converts an Array to an Observable sequence.
*
* @param items
* the source Array
* @param
* the type of items in the Array, and the type of items emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see #toObservable(Object...)
*/
public static Observable from(T... items) {
    // Alias: the array is passed on unchanged to toObservable.
    return Observable.toObservable(items);
}
/**
* Generates an observable sequence of integral numbers within a specified range.
*
* @param start
* The value of the first integer in the sequence
* @param count
* The number of sequential integers to generate.
*
* @return An observable sequence that contains a range of sequential integral numbers.
*
* @see Observable.Range Method (Int32, Int32)
*/
public static Observable range(int start, int count) {
    // Range.createWithCount(start, count) supplies the values; from(...) converts them into an Observable.
    return Observable.from(Range.createWithCount(start, count));
}
/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @param
* the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static Observable subscribeOn(Observable source, Scheduler scheduler) {
    // OperationSubscribeOn's result is wrapped by create(...) as the function to run on subscribe.
    return Observable.create(OperationSubscribeOn.subscribeOn(source, scheduler));
}
/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to notify observers on.
* @param
* the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static Observable observeOn(Observable source, Scheduler scheduler) {
    // OperationObserveOn's result is wrapped by create(...) as the function to run on subscribe.
    return Observable.create(OperationObserveOn.observeOn(source, scheduler));
}
/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
* subscribes to the sequence. This is useful to allow an observer to easily obtain an updated or refreshed version
* of the sequence.
*
* @param observableFactory
* the observable factory function to invoke for each observer that subscribes to the resulting sequence.
* @param
* the type of the observable.
* @return the observable sequence whose observers trigger an invocation of the given observable factory function.
*/
public static Observable defer(Func0> observableFactory) {
    // The factory is only passed along here; OperationDefer's result becomes the subscribe function.
    return Observable.create(OperationDefer.defer(observableFactory));
}
/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
* subscribes to the sequence. This is useful to allow an observer to easily obtain an updated or refreshed version
* of the sequence.
*
* @param observableFactory
* the observable factory function to invoke for each observer that subscribes to the resulting sequence.
* @param
* the type of the observable.
* @return the observable sequence whose observers trigger an invocation of the given observable factory function.
*/
public static Observable defer(Object observableFactory) {
// adapt the factory object once; the resulting FuncN is called with no arguments by the Func0 below
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(observableFactory);
return create(OperationDefer.defer(new Func0>() {
@Override
@SuppressWarnings("unchecked")
public Observable call() {
// the adapted factory must return an Observable; any other result type fails this cast
return (Observable) _f.call();
}
}));
}
/**
* Returns an Observable that notifies an {@link Observer} of a single value and then completes.
*
* To convert any object into an Observable that emits that object, pass that object into the just method.
*
* This is similar to the {@link #toObservable} method, except that toObservable will convert
* an {@link Iterable} object into an Observable that emits each of the items in the {@link Iterable}, one
* at a time, while the just method would convert the {@link Iterable} into an Observable
* that emits the entire {@link Iterable} as a single item.
*
*
*
* @param value
* the value to pass to the Observer's onNext method
* @param
* the type of the value
* @return an Observable that notifies an {@link Observer} of a single value and then completes
*/
public static Observable just(T value) {
    // Wrap the value in a one-element list so that an Iterable value is emitted whole rather than expanded.
    final List single = new ArrayList();
    single.add(value);
    return Observable.toObservable(single);
}
/**
* Applies a function of your choosing to every notification emitted by an Observable, and returns
* this transformation as a new Observable sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item in the sequence emitted by the source Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items returned by map function
* @return an Observable that is the result of applying the transformation function to each item
* in the sequence emitted by the source Observable
*/
public static Observable map(Observable sequence, Func1 func) {
    // OperationMap.map's result is wrapped by create(...) as the function to run on subscribe.
    return Observable.create(OperationMap.map(sequence, func));
}
/**
* Applies a function of your choosing to every notification emitted by an Observable, and returns
* this transformation as a new Observable sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item in the sequence emitted by the source Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items returned by map function
* @return an Observable that is the result of applying the transformation function to each item
* in the sequence emitted by the source Observable
*/
public static Observable map(Observable sequence, final Object func) {
    // Adapt the function object once, then expose it as a Func1 for the typed overload.
    @SuppressWarnings("rawtypes")
    final FuncN adapted = Functions.from(func);
    final Func1 mapper = new Func1() {
        @SuppressWarnings("unchecked")
        @Override
        public R call(T item) {
            return (R) adapted.call(item);
        }
    };
    return Observable.map(sequence, mapper);
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
* Note: mapMany and flatMap are equivalent.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable, generating a
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
* @see #flatMap(Observable, Func1)
*/
public static Observable mapMany(Observable sequence, Func1> func) {
    // OperationMap.mapMany's result is wrapped by create(...) as the function to run on subscribe.
    return Observable.create(OperationMap.mapMany(sequence, func));
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
*
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable, generating a
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
*/
public static Observable mapMany(Observable sequence, final Object func) {
    // Adapt the function object once, then expose it as a Func1 for the typed overload.
    @SuppressWarnings("rawtypes")
    final FuncN adapted = Functions.from(func);
    final Func1 mapper = new Func1() {
        @SuppressWarnings("unchecked")
        @Override
        public R call(T item) {
            return (R) adapted.call(item);
        }
    };
    return Observable.mapMany(sequence, mapper);
}
/**
* Materializes the implicit notifications of an observable sequence as explicit notification values.
*
*
*
* @param sequence
* An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of materializing the notifications of the given sequence.
* @see MSDN: Observable.Materialize
*/
public static Observable> materialize(final Observable sequence) {
    // OperationMaterialize's result is wrapped by create(...) as the function to run on subscribe.
    return Observable.create(OperationMaterialize.materialize(sequence));
}
/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
* Implemented by passing the result of {@code OperationDematerialize.dematerialize(sequence)} to {@code create}.
*
* @param sequence
* An observable sequence containing explicit notification values which have to be turned into implicit notifications.
* @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values.
* @see MSDN: Observable.Dematerialize
*/
public static Observable dematerialize(final Observable> sequence) {
return create(OperationDematerialize.dematerialize(sequence));
}
/**
* Flattens the Observable sequences from a list of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the merge method.
*
* Implemented by passing the result of {@code OperationMerge.merge(source)} to {@code create}.
*
* @param source
* a list of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source list of Observables
* @see MSDN: Observable.Merge
*/
public static Observable merge(List> source) {
return create(OperationMerge.merge(source));
}
/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by an
* Observable into one Observable sequence without any transformation. You can combine the output
* of multiple Observables so that they act like a single Observable, by using the merge method.
*
* Implemented by passing the result of {@code OperationMerge.merge(source)} to {@code create}.
*
* @param source
* an Observable that emits Observables
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the source Observable
* @see MSDN: Observable.Merge Method
*/
public static Observable merge(Observable> source) {
return create(OperationMerge.merge(source));
}
/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the merge method.
*
* Implemented by passing the result of {@code OperationMerge.merge(source)} to {@code create}.
*
* @param source
* a series of Observables (varargs) that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable merge(Observable... source) {
return create(OperationMerge.merge(source));
}
/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
* Unlike most operators in this class, the result of {@code OperationTakeUntil.takeUntil(source, other)}
* is returned directly rather than being passed to {@code create}.
*
* @param source
* the source sequence to propagate elements for.
* @param other
* the observable sequence that terminates propagation of elements of the source sequence.
* @param
* the type of source.
* @param
* the other type.
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
public static Observable takeUntil(final Observable source, final Observable other) {
return OperationTakeUntil.takeUntil(source, other);
}
/**
* Combines the objects emitted by two or more Observables, and emits the result as a single Observable,
* by using the concat method.
*
* Implemented by passing the result of {@code OperationConcat.concat(source)} to {@code create}.
*
* @param source
* a series of Observables (varargs) that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of combining the
* output from the source Observables
* @see MSDN: Observable.Concat Method
*/
public static Observable concat(Observable... source) {
return create(OperationConcat.concat(source));
}
/**
* Emits the same objects as the given Observable, calling the given action
* when it calls onComplete or onError.
*
* Implemented by passing the result of {@code OperationFinally.finallyDo(source, action)} to {@code create}.
*
* @param source
* an observable
* @param action
* an action to be called when the source completes or errors.
* @return an Observable that emits the same objects, then calls the action.
* @see MSDN: Observable.Finally Method
*/
public static Observable finallyDo(Observable source, Action0 action) {
return create(OperationFinally.finallyDo(source, action));
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
* Note: mapMany and flatMap are equivalent; this method simply forwards its arguments to mapMany.
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable, generating an
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
* @see #mapMany(Observable, Func1)
*/
public static Observable flatMap(Observable sequence, Func1> func) {
return mapMany(sequence, func);
}
/**
* Creates a new Observable sequence by applying a function that you supply to each object in the
* original Observable sequence, where that function is itself an Observable that emits objects,
* and then merges the results of that function applied to every item emitted by the original
* Observable, emitting these merged results as its own sequence.
*
* Note: mapMany and flatMap are equivalent; this method simply forwards its arguments to the
* mapMany overload that accepts the function as an Object.
*
* @param sequence
* the source Observable
* @param func
* a function object to apply to each item emitted by the source Observable, generating an
* Observable
* @param
* the type emitted by the source Observable
* @param
* the type emitted by the Observables emitted by func
* @return an Observable that emits a sequence that is the result of applying the transformation
* function to each item emitted by the source Observable and merging the results of
* the Observables obtained from this transformation
* @see #mapMany(Observable, Object)
*/
public static Observable flatMap(Observable sequence, final Object func) {
return mapMany(sequence, func);
}
/**
* Groups the elements of an observable and selects the resulting elements by using a specified function.
*
* Implemented by passing the result of
* {@code OperationGroupBy.groupBy(source, keySelector, elementSelector)} to {@code create}.
*
* @param source
* an observable whose elements to group.
* @param keySelector
* a function to extract the key for each element.
* @param elementSelector
* a function to map each source element to an element in an observable group.
* @param
* the key type.
* @param
* the source type.
* @param
* the resulting observable type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) {
return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
}
/**
* Groups the elements of an observable according to a specified key selector function.
*
* Implemented by passing the result of {@code OperationGroupBy.groupBy(source, keySelector)}
* to {@code create}.
*
* @param source
* an observable whose elements to group.
* @param keySelector
* a function to extract the key for each element.
* @param
* the key type.
* @param
* the source type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static Observable> groupBy(Observable source, final Func1 keySelector) {
return create(OperationGroupBy.groupBy(source, keySelector));
}
/**
* Same functionality as merge, except that errors delivered to onError are held until all sequences have finished (onComplete/onError) before the error is sent.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
* Implemented by passing the result of {@code OperationMergeDelayError.mergeDelayError(source)} to {@code create}.
*
* @param source
* a list of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source list of Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(List> source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Same functionality as merge, except that errors delivered to onError are held until all sequences have finished (onComplete/onError) before the error is sent.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
* Implemented by passing the result of {@code OperationMergeDelayError.mergeDelayError(source)} to {@code create}.
*
* @param source
* an Observable that emits Observables
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the source Observable
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(Observable> source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Same functionality as merge, except that errors delivered to onError are held until all sequences have finished (onComplete/onError) before the error is sent.
*
* Only the first onError received will be sent.
*
* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented.
*
* Implemented by passing the result of {@code OperationMergeDelayError.mergeDelayError(source)} to {@code create}.
*
* @param source
* a series of Observables (varargs) that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the source Observables
* @see MSDN: Observable.Merge Method
*/
public static Observable mergeDelayError(Observable... source) {
return create(OperationMergeDelayError.mergeDelayError(source));
}
/**
* Returns an Observable that never sends any information to an {@link Observer}.
*
* This observable is useful primarily for testing purposes.
*
* Each call returns a newly constructed {@code NeverObservable}.
*
* @param
* the type of item (not) emitted by the Observable
* @return an Observable that never sends any information to an {@link Observer}
*/
public static Observable never() {
return new NeverObservable();
}
/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s methods. The onErrorResumeNext method changes this behavior. If you pass a
* function that returns an Observable (resumeFunction) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
* Implemented by passing the result of
* {@code OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)} to {@code create}.
*
* @param that
* the source Observable
* @param resumeFunction
* a function that returns an Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) {
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
}
/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s methods. The onErrorResumeNext method changes this behavior. If you pass a
* function that returns an Observable (resumeFunction) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
* This overload accepts the function as a plain Object, converts it with {@code Functions.from}, and
* delegates to the Func1-based onErrorResumeNext overload.
*
* @param that
* the source Observable
* @param resumeFunction
* a function object that returns an Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) {
// Convert the loosely typed function object into a FuncN so it can be invoked via call(...).
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
// Adapt _f to a Func1 that receives the Exception and yields the fallback Observable.
return onErrorResumeNext(that, new Func1>() {
@SuppressWarnings("unchecked")
@Override
public Observable call(Exception e) {
// Unchecked cast: the wrapped function's result type is not known statically here.
return (Observable) _f.call(e);
}
});
}
/**
* Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer,
* the Observable calls its {@link Observer}'s onError function, and then quits without calling any more
* of its {@link Observer}'s methods. The onErrorResumeNext method changes this behavior. If you pass
* another Observable (resumeSequence) to an Observable's onErrorResumeNext method,
* if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it
* will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if
* it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may
* never know that an error happened.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
* Implemented by passing the result of
* {@code OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)} to {@code create}.
*
* @param that
* the source Observable
* @param resumeSequence
* the Observable that will take over if the source Observable
* encounters an error
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) {
return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
}
/**
* Instruct an Observable to emit a particular item to its Observer's onNext function
* rather than calling onError if it encounters an error.
*
* By default, when an Observable encounters an error that prevents it from emitting the expected item to its
* {@link Observer}, the Observable calls its {@link Observer}'s onError function, and then quits
* without calling any more of its {@link Observer}'s methods. The onErrorReturn method changes
* this behavior. If you pass a function (resumeFunction) to an Observable's onErrorReturn
* method, if the original Observable encounters an error, instead of calling its {@link Observer}'s
* onError function, it will instead pass the return value of resumeFunction to the {@link Observer}'s onNext method.
*
* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered.
*
* Implemented by passing the result of {@code OperationOnErrorReturn.onErrorReturn(that, resumeFunction)} to {@code create}.
*
* @param that
* the source Observable
* @param resumeFunction
* a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would
* otherwise cause it to call onError
* @return the source Observable, with its behavior modified as described
*/
public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) {
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
}
/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
*
* Implemented by multicasting the source through a newly created {@code ReplaySubject}.
*
* @param that
* the source Observable
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static ConnectableObservable replay(final Observable that) {
return OperationMulticast.multicast(that, ReplaySubject. create());
}
/**
* Similar to {@link #replay(Observable)} except that this auto-subscribes to the source sequence.
*
* This is useful when returning an Observable that you wish to cache responses but can't control the
* subscribe/unsubscribe behavior of all the Observers.
*
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
* use this on infinite or very large sequences that will use up memory. This is similar to
* the {@link Observable#toList()} operator in this caution.
*
* Implemented by passing the result of {@code OperationCache.cache(that)} to {@code create}.
*
* @param that
* the source Observable
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
*/
public static Observable cache(final Observable that) {
return create(OperationCache.cache(that));
}
/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
* Implemented by multicasting the source through a newly created {@code PublishSubject}.
*
* @param that
* the source Observable
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static ConnectableObservable publish(final Observable that) {
return OperationMulticast.multicast(that, PublishSubject. create());
}
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
* by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole
* output.
*
* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject
* method that does a similar operation on lists.
*
*
*
* @param
* the type of item emitted by the source Observable
* @param sequence
* the source Observable
* @param accumulator
* an accumulator function to be invoked on each element from the sequence, whose
* result will be used in the next accumulator call (if applicable)
*
* @return an Observable that emits a single element that is the result of accumulating the
* output from applying the accumulator to the sequence of items emitted by the source
* Observable
* @see MSDN: Observable.Aggregate
* @see Wikipedia: Fold (higher-order function)
*/
public static Observable