/** * Copyright 2013 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package rx; import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.AtomicObservableSubscription; import rx.operators.AtomicObserver; import rx.operators.OperationAll; import rx.operators.OperationCache; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationGroupBy; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMulticast; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import 
rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; import rx.operators.OperationWhere; import rx.operators.OperationZip; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.Range; import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.FuncN; import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; /** * The Observable interface that implements the Reactive Pattern. *

* It provides overloaded methods for subscribing as well as delegate methods to the various operators. *

* The documentation for this interface makes use of marble diagrams. The following legend explains * these diagrams: *

* *

* For more information see the RxJava Wiki * * @param <T> the type of item emitted by the Observable */
public class Observable<T> {

    // Execution hook fetched once from the RxJavaPlugins singleton and shared by every Observable instance.
    private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    // Function supplied at construction; subscribe(Observer) hands it to the hook. Null when the no-arg constructor is used.
    private final Func1<Observer<T>, Subscription> onSubscribe;

    /**
     * Creates an Observable without an onSubscribe function by delegating to {@link #Observable(Func1)} with {@code null}.
     */
    protected Observable() {
        this(null);
    }

    /**
     * Construct an Observable with Function to execute when subscribed to.
     * <p>
     * NOTE: Generally you're better off using {@link #create(Func1)} to create an Observable instead of using inheritance.
     * 
     * @param onSubscribe
     *            {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
     */
    protected Observable(Func1<Observer<T>, Subscription> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** * an {@link Observer} must call an Observable's subscribe method in order to register itself * to receive push-based notifications from the Observable. A typical implementation of the * subscribe method does the following: *

* It stores a reference to the Observer in a collection object, such as a List * object. *

* It returns a reference to the {@link Subscription} interface. This enables * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. *

* At any given time, a particular instance of an Observable implementation is * responsible for accepting all subscriptions and notifying all subscribers. Unless the * documentation for a particular Observable implementation indicates otherwise, * Observers should make no assumptions about the Observable implementation, such * as the order of notifications that multiple Observers will receive. *

* For more information see the RxJava Wiki * * * @param observer * @return a {@link Subscription} reference that allows observers * to stop receiving notifications before the provider has finished sending them */ public Subscription subscribe(Observer observer) { // allow the hook to intercept and/or decorate Func1, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe); // validate and proceed if (onSubscribeFunction == null) { throw new IllegalStateException("onSubscribe function can not be null."); // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { /** * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ if (isInternalImplementation(observer)) { Subscription s = onSubscribeFunction.call(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens // we want to gracefully handle it the same as AtomicObservableSubscription does return hook.onSubscribeReturn(this, Subscriptions.empty()); } else { return hook.onSubscribeReturn(this, s); } } else { AtomicObservableSubscription subscription = new AtomicObservableSubscription(); subscription.wrap(onSubscribeFunction.call(new AtomicObserver(subscription, observer))); return hook.onSubscribeReturn(this, subscription); } } catch (Exception e) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(hook.onSubscribeError(this, e)); } catch (Exception e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); hook.onSubscribeError(this, r); 
throw r; } return Subscriptions.empty(); } } /** * an {@link Observer} must call an Observable's subscribe method in order to register itself * to receive push-based notifications from the Observable. A typical implementation of the * subscribe method does the following: *

* It stores a reference to the Observer in a collection object, such as a List * object. *

* It returns a reference to the {@link Subscription} interface. This enables * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. *

* At any given time, a particular instance of an Observable implementation is * responsible for accepting all subscriptions and notifying all subscribers. Unless the * documentation for a particular Observable implementation indicates otherwise, * Observers should make no assumptions about the Observable implementation, such * as the order of notifications that multiple Observers will receive. *

* For more information see the RxJava Wiki * * * @param observer * @param scheduler * The {@link Scheduler} that the sequence is subscribed to on. * @return a {@link Subscription} reference that allows observers * to stop receiving notifications before the provider has finished sending them */
    public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
        // derive a sequence bound to the scheduler first, then subscribe to that derived sequence
        return subscribeOn(scheduler).subscribe(observer);
    }

    /** * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. *

* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { AtomicObservableSubscription subscription = new AtomicObservableSubscription(); return subscription.wrap(subscribe(new AtomicObserver(subscription, o))); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Map callbacks) { // lookup and memoize onNext Object _onNext = callbacks.get("onNext"); if (_onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNext = Functions.from(_onNext); /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { Functions.from(onComplete).call(); } } @Override public void onError(Exception e) { handleError(e); Object onError = callbacks.get("onError"); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNext.call(args); } }); } public Subscription subscribe(final Map callbacks, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(callbacks); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object o) { if (o instanceof Observer) { // in case a dynamic language is not correctly handling the overloaded methods and we receive an Observer just forward to the correct method. return subscribe((Observer) o); } // lookup and memoize onNext if (o == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNext = Functions.from(o); /** * Wrapping since raw functions provided by the user are being invoked. 
* * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); // no callback defined } @Override public void onNext(Object args) { onNext.call(args); } }); } public Subscription subscribe(final Object o, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(o); } public Subscription subscribe(final Action1 onNext) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); // no callback defined } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError) { // lookup and memoize onNext if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNextFunction = Functions.from(onNext); /** * Wrapping since raw functions provided by the user are being invoked. 
* * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNextFunction.call(args); } }); } public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } public Subscription subscribe(final Action1 onNext, final Action1 onError) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Exception e) { handleError(e); if (onError != null) { onError.call(e); } } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) { // lookup and memoize onNext if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } final FuncN onNextFunction = Functions.from(onNext); /** * Wrapping since raw functions provided by the user are being invoked. 
* * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { if (onComplete != null) { Functions.from(onComplete).call(); } } @Override public void onError(Exception e) { handleError(e); if (onError != null) { Functions.from(onError).call(e); } } @Override public void onNext(Object args) { onNextFunction.call(args); } }); } public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { onComplete.call(); } @Override public void onError(Exception e) { handleError(e); if (onError != null) { onError.call(e); } } @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); } onNext.call(args); } }); } public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } /** * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. * * @param subject * the subject to push source elements into. * @param * result type * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. 
*/ public ConnectableObservable multicast(Subject subject) { return multicast(this, subject); } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * * @param e */ private void handleError(Exception e) { // onError should be rare so we'll only fetch when needed RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } /** * An Observable that never sends any information to an {@link Observer}. * * This Observable is useful primarily for testing purposes. * * @param * the type of item emitted by the Observable */ private static class NeverObservable extends Observable { public NeverObservable() { super(new Func1, Subscription>() { @Override public Subscription call(Observer t1) { return Subscriptions.empty(); } }); } } /** * an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes. * * @param * the type of object returned by the Observable */ private static class ThrowObservable extends Observable { public ThrowObservable(final Exception exception) { super(new Func1, Subscription>() { /** * Accepts an {@link Observer} and calls its onError method. * * @param observer * an {@link Observer} of this Observable * @return a reference to the subscription */ @Override public Subscription call(Observer observer) { observer.onError(exception); return Subscriptions.empty(); } }); } } /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

* Write the function you pass to create so that it behaves as an Observable - calling the passed-in * onNext, onError, and onCompleted methods appropriately. *

* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once. *

* See Rx Design Guidelines (PDF) for detailed information. * * @param * the type emitted by the Observable sequence * @param func * a function that accepts an Observer and calls its onNext, onError, and onCompleted methods * as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable) * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function */ public static Observable create(Func1, Subscription> func) { return new Observable(func); } /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

* This method accept {@link Object} to allow different languages to pass in closures using {@link FunctionLanguageAdaptor}. *

* Write the function you pass to create so that it behaves as an Observable - calling the passed-in * onNext, onError, and onCompleted methods appropriately. *

* A well-formed Observable must call either the {@link Observer}'s onCompleted method exactly once or its onError method exactly once. *

* See Rx Design Guidelines (PDF) for detailed information. * * @param * the type emitted by the Observable sequence * @param func * a function that accepts an Observer and calls its onNext, onError, and onCompleted methods * as appropriate, and returns a {@link Subscription} to allow canceling the subscription (if applicable) * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given function */ public static Observable create(final Object func) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(func); return create(new Func1, Subscription>() { @Override public Subscription call(Observer t1) { return (Subscription) _f.call(t1); } }); } /** * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method. *

* * * @param <T> * the type of item emitted by the Observable * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method */
    public static <T> Observable<T> empty() {
        // built from an empty list via toObservable
        return toObservable(new ArrayList<T>());
    }

    /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

* * @param exception * the error to throw * @param * the type of object returned by the Observable * @return an Observable object that calls onError when an {@link Observer} subscribes */ public static Observable error(Exception exception) { return new ThrowObservable(exception); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param predicate * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */
    public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> predicate) {
        return create(OperationFilter.filter(that, predicate));
    }

    /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param function * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */
    public static <T> Observable<T> filter(Observable<T> that, final Object function) {
        // convert the language-specific closure once, then adapt it to the typed predicate overload
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(function);
        return filter(that, new Func1<T, Boolean>() {

            @Override
            public Boolean call(T t1) {
                // the closure's result is cast directly to Boolean
                return (Boolean) _f.call(t1);
            }

        });
    }

    /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param that * the Observable to filter * @param predicate * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable where(Observable that, Func1 predicate) { return create(OperationWhere.where(that, predicate)); } /** * Converts an {@link Iterable} sequence to an Observable sequence. * * @param iterable * the source {@link Iterable} sequence * @param * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable * @return an Observable that emits each item in the source {@link Iterable} sequence * @see #toObservable(Iterable) */ public static Observable from(Iterable iterable) { return toObservable(iterable); } /** * Converts an Array to an Observable sequence. * * @param items * the source Array * @param * the type of items in the Array, and the type of items emitted by the resulting Observable * @return an Observable that emits each item in the source Array * @see #toObservable(Object...) */ public static Observable from(T... items) { return toObservable(items); } /** * Generates an observable sequence of integral numbers within a specified range. * * @param start * The value of the first integer in the sequence * @param count * The number of sequential integers to generate. * * @return An observable sequence that contains a range of sequential integral numbers. * * @see Observable.Range Method (Int32, Int32) */ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } /** * Asynchronously subscribes and unsubscribes observers on the specified scheduler. * * @param source * the source observable. * @param scheduler * the scheduler to perform subscription and unsubscription actions on. * @param * the type of observable. 
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. */ public static Observable subscribeOn(Observable source, Scheduler scheduler) { return create(OperationSubscribeOn.subscribeOn(source, scheduler)); } /** * Asynchronously notify observers on the specified scheduler. * * @param source * the source observable. * @param scheduler * the scheduler to notify observers on. * @param * the type of observable. * @return the source sequence whose observations happen on the specified scheduler. */ public static Observable observeOn(Observable source, Scheduler scheduler) { return create(OperationObserveOn.observeOn(source, scheduler)); } /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer * subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version * of the sequence. * * @param observableFactory * the observable factory function to invoke for each observer that subscribes to the resulting sequence. * @param * the type of the observable. * @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Func0> observableFactory) { return create(OperationDefer.defer(observableFactory)); } /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer * subscribes to the sequence. This is useful to allow an observer to easily obtain an updates or refreshed version * of the sequence. * * @param observableFactory * the observable factory function to invoke for each observer that subscribes to the resulting sequence. * @param * the type of the observable. 
* @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Object observableFactory) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(observableFactory); return create(OperationDefer.defer(new Func0>() { @Override @SuppressWarnings("unchecked") public Observable call() { return (Observable) _f.call(); } })); } /** * Returns an Observable that notifies an {@link Observer} of a single value and then completes. *

* To convert any object into an Observable that emits that object, pass that object into the just method. *

* This is similar to the {@link #toObservable} method, except that toObservable will convert * an {@link Iterable} object into an Observable that emits each of the items in the {@link Iterable}, one * at a time, while the just method would convert the {@link Iterable} into an Observable * that emits the entire {@link Iterable} as a single item. *

* * * @param value * the value to pass to the Observer's onNext method * @param <T> * the type of the value * @return an Observable that notifies an {@link Observer} of a single value and then completes */
    public static <T> Observable<T> just(T value) {
        // wrap the value in a one-element list so toObservable emits it as a single item
        List<T> list = new ArrayList<T>();
        list.add(value);

        return toObservable(list);
    }

    /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item in the sequence emitted by the source Observable * @param <T> * the type of items emitted by the source Observable * @param <R> * the type of items returned by map function * @return an Observable that is the result of applying the transformation function to each item * in the sequence emitted by the source Observable */
    public static <T, R> Observable<R> map(Observable<T> sequence, Func1<T, R> func) {
        return create(OperationMap.map(sequence, func));
    }

    /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item in the sequence emitted by the source Observable * @param * the type of items emitted by the the source Observable * @param * the type of items returned by map function * @return an Observable that is the result of applying the transformation function to each item * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, final Object func) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(func); return map(sequence, new Func1() { @SuppressWarnings("unchecked") @Override public R call(T t1) { return (R) _f.call(t1); } }); } /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation * @see #flatMap(Observable, Func1) */ public static Observable mapMany(Observable sequence, Func1> func) { return create(OperationMap.mapMany(sequence, func)); } /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, final Object func) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(func); return mapMany(sequence, new Func1() { @SuppressWarnings("unchecked") @Override public R call(T t1) { return (R) _f.call(t1); } }); } /** * Materializes the implicit notifications of an observable sequence as explicit notification values. *

* * * @param sequence * An observable sequence of elements to project. * @return An observable sequence whose elements are the result of materializing the notifications of the given sequence. * @see MSDN: Observable.Materialize */ public static Observable> materialize(final Observable sequence) { return create(OperationMaterialize.materialize(sequence)); } /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * * @param sequence * An observable sequence containing explicit notification values which have to be turned into implicit notifications. * @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. * @see MSDN: Observable.Dematerialize */ public static Observable dematerialize(final Observable> sequence) { return create(OperationDematerialize.dematerialize(sequence)); } /** * Flattens the Observable sequences from a list of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they * act like a single Observable, by using the merge method. *

* * * @param source * a list of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source list of Observables * @see MSDN: Observable.Merge */
    public static Observable merge(List> source) {
        // List-based overload: OperationMerge.merge builds the merge operation, create wraps it in an Observable.
        return create(OperationMerge.merge(source));
    }

    /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output * of multiple Observables so that they act like a single Observable, by using the merge method. *

* * * @param source * an Observable that emits Observables * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the Observables emitted by the source Observable * @see MSDN: Observable.Merge Method */
    public static Observable merge(Observable> source) {
        // Observable-of-Observables overload: same delegation to OperationMerge.merge, wrapped by create.
        return create(OperationMerge.merge(source));
    }

    /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they * act like a single Observable, by using the merge method. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source Observables * @see MSDN: Observable.Merge Method */
    public static Observable merge(Observable... source) {
        // Varargs overload: the array is handed straight to OperationMerge.merge and wrapped by create.
        return create(OperationMerge.merge(source));
    }

    /**
     * Returns the values from the source observable sequence until the other observable sequence produces a value.
     *
     * @param source
     *            the source sequence to propagate elements for.
     * @param other
     *            the observable sequence that terminates propagation of elements of the source sequence.
     * @param
     *            the type of source.
     * @param
     *            the other type.
     * @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
     */
    public static Observable takeUntil(final Observable source, final Observable other) {
        // Unlike the neighbouring operators, the result of OperationTakeUntil.takeUntil is returned
        // directly rather than being passed through create.
        return OperationTakeUntil.takeUntil(source, other);
    }

    /** * Combines the objects emitted by two or more Observables, and emits the result as a single Observable, * by using the concat method. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of combining the * output from the source Observables * @see MSDN: Observable.Concat Method */
    public static Observable concat(Observable... source) {
        // OperationConcat.concat builds the concatenation operation; create wraps it in an Observable.
        return create(OperationConcat.concat(source));
    }

    /**
     * Emits the same objects as the given Observable, calling the given action
     * when it calls onCompleted or onError.
     *
     * @param source
     *            an observable
     * @param action
     *            an action to be called when the source completes or errors.
     * @return an Observable that emits the same objects, then calls the action.
     * @see MSDN: Observable.Finally Method
     */
    public static Observable finallyDo(Observable source, Action0 action) {
        // OperationFinally.finallyDo pairs the source with the action; create wraps the result in an Observable.
        return create(OperationFinally.finallyDo(source, action));
    }

    /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation * @see #mapMany(Observable, Func1) */
    public static Observable flatMap(Observable sequence, Func1> func) {
        // flatMap is purely an alias: all work is done by mapMany.
        return mapMany(sequence, func);
    }

    /** * Creates a new Observable sequence by applying a function that you supply to each object in the * original Observable sequence, where that function is itself an Observable that emits objects, * and then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param sequence * the source Observable * @param func * a function to apply to each item emitted by the source Observable, generating a * Observable * @param * the type emitted by the source Observable * @param * the type emitted by the Observables emitted by func * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item emitted by the source Observable and merging the results of * the Observables obtained from this transformation * @see #mapMany(Observable, Func1) */
    public static Observable flatMap(Observable sequence, final Object func) {
        // Alias for the mapMany overload that accepts an untyped function object.
        return mapMany(sequence, func);
    }

    /**
     * Groups the elements of an observable and selects the resulting elements by using a specified function.
     *
     * @param source
     *            an observable whose elements to group.
     * @param keySelector
     *            a function to extract the key for each element.
     * @param elementSelector
     *            a function to map each source element to an element in an observable group.
     * @param
     *            the key type.
     * @param
     *            the source type.
     * @param
     *            the resulting observable type.
     * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
     */
    public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) {
        // Three-argument form: both selectors are forwarded to OperationGroupBy.groupBy.
        return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
    }

    /**
     * Groups the elements of an observable according to a specified key selector function.
     *
     * @param source
     *            an observable whose elements to group.
     * @param keySelector
     *            a function to extract the key for each element.
     * @param
     *            the key type.
     * @param
     *            the source type.
     * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
     */
    public static Observable> groupBy(Observable source, final Func1 keySelector) {
        // Two-argument form: no element selector is supplied, so the two-argument OperationGroupBy.groupBy is used.
        return create(OperationGroupBy.groupBy(source, keySelector));
    }

    /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * a list of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source list of Observables * @see MSDN: Observable.Merge Method */
    public static Observable mergeDelayError(List> source) {
        // List-based overload: OperationMergeDelayError.mergeDelayError builds the operation, create wraps it.
        return create(OperationMergeDelayError.mergeDelayError(source));
    }

    /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * an Observable that emits Observables * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the Observables emitted by the source Observable * @see MSDN: Observable.Merge Method */
    public static Observable mergeDelayError(Observable> source) {
        // Observable-of-Observables overload: same delegation to OperationMergeDelayError, wrapped by create.
        return create(OperationMergeDelayError.mergeDelayError(source));
    }

    /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

* Only the first onError received will be sent. *

* This enables receiving all successes from merged sequences without one onError from one sequence causing all onNext calls to be prevented. *

* * * @param source * a series of Observables that emit sequences of items * @return an Observable that emits a sequence of elements that are the result of flattening the * output from the source Observables * @see MSDN: Observable.Merge Method */
    public static Observable mergeDelayError(Observable... source) {
        // Varargs overload: the array is handed straight to OperationMergeDelayError.mergeDelayError.
        return create(OperationMergeDelayError.mergeDelayError(source));
    }

    /**
     * Returns an Observable that never sends any information to an {@link Observer}.
     *
     * This observable is useful primarily for testing purposes.
     *
     * @param
     *            the type of item (not) emitted by the Observable
     * @return an Observable that never sends any information to an {@link Observer}
     */
    public static Observable never() {
        // NeverObservable is declared outside this section; a fresh instance is created on every call.
        return new NeverObservable();
    }

    /** * Instruct an Observable to pass control to another Observable (the return value of a function) * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */
    public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) {
        // Function-based variant: the resume function is forwarded to OperationOnErrorResumeNextViaFunction.
        return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
    }

    /** * Instruct an Observable to pass control to another Observable (the return value of a function) * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */
    public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) {
        // resumeFunction is an untyped function object; Functions.from adapts it to a FuncN.
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(resumeFunction);
        // Delegate to the Func1-based overload, forwarding the Exception to the adapted function.
        return onErrorResumeNext(that, new Func1>() {

            @SuppressWarnings("unchecked")
            @Override
            public Observable call(Exception e) {
                // The adapted function's untyped result is cast to Observable; a different
                // result type would fail this cast at runtime.
                return (Observable) _f.call(e);
            }
        });
    }

    /** * Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its Observer, * the Observable calls its {@link Observer}'s onError function, and then quits without calling any more * of its {@link Observer}'s closures. The onErrorResumeNext method changes this behavior. If you pass a * function that emits an Observable (resumeFunction) to an Observable's onErrorResumeNext method, * if the original Observable encounters an error, instead of calling its {@link Observer}'s onError function, it * will instead relinquish control to this new Observable, which will call the {@link Observer}'s onNext method if * it is able to do so. In such a case, because no Observable necessarily invokes onError, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. *

* * * @param that * the source Observable * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the source Observable, with its behavior modified as described */
    public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) {
        // NOTE: in this overload resumeSequence is itself an Observable (see the parameter type),
        // not a function producing one; it is forwarded to OperationOnErrorResumeNextViaObservable.
        return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
    }

    /** * Instruct an Observable to emit a particular item to its Observer's onNext function * rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected item to its {@link Observer}, the Observable calls its {@link Observer}'s onError * function, and then quits * without calling any more of its {@link Observer}'s closures. The onErrorReturn method changes * this behavior. If you pass a function (resumeFunction) to an Observable's onErrorReturn * method, if the original Observable encounters an error, instead of calling its {@link Observer}'s * onError function, it will instead pass the return value of resumeFunction to the {@link Observer}'s onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors be encountered. * * @param that * the source Observable * @param resumeFunction * a function that returns a value that will be passed into an {@link Observer}'s onNext function if the Observable encounters an error that would * otherwise cause it to call onError * @return the source Observable, with its behavior modified as described */
    public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) {
        // OperationOnErrorReturn.onErrorReturn builds the operation; create wraps it in an Observable.
        return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
    }

    /**
     * Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
     *
     * @param that
     *            the source Observable
     * @return a connectable observable sequence that upon connection causes the source sequence to push results into a newly created {@link ReplaySubject}.
     */
    public static ConnectableObservable replay(final Observable that) {
        // Implemented as a multicast through a fresh ReplaySubject created on each call.
        return OperationMulticast.multicast(that, ReplaySubject. create());
    }

    /** * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. *

* This is useful when returning an Observable that you wish to cache responses but can't control the * subscribe/unsubscribe behavior of all the Observers. *

* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not * use this on infinite or very large sequences that will use up memory. This is similar to * the {@link Observable#toList()} operator in this caution. * * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. */
    public static Observable cache(final Observable that) {
        // 'that' is the source Observable; OperationCache.cache builds the operation and create wraps it.
        return create(OperationCache.cache(that));
    }

    /**
     * Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
     *
     * @param that
     *            the source Observable
     * @return a connectable observable sequence that upon connection causes the source sequence to push results into a newly created {@link PublishSubject}.
     */
    public static ConnectableObservable publish(final Observable that) {
        // Implemented as a multicast through a fresh PublishSubject created on each call.
        return OperationMulticast.multicast(that, PublishSubject. create());
    }

    /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */
    public static Observable reduce(Observable sequence, Func2 accumulator) {
        // reduce is expressed as scan followed by takeLast(..., 1): only the final accumulated value is kept.
        return takeLast(create(OperationScan.scan(sequence, accumulator)), 1);
    }

    /**
     * Used by dynamic languages.
     * <p>
     * The accumulator is an untyped function object; it is adapted with {@code Functions.from} and
     * invoked with the two arguments of each accumulation step.
     *
     * @see #reduce(Observable, Func2)
     */
    public static Observable reduce(final Observable sequence, final Object accumulator) {
        // Adapt the untyped accumulator to a FuncN so it can be called with two arguments.
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(accumulator);
        return reduce(sequence, new Func2() {

            @SuppressWarnings("unchecked")
            @Override
            public T call(T t1, T t2) {
                // Untyped result of the adapted function, cast back to T.
                return (T) _f.call(t1, t2);
            }
        });
    }

    /**
     * Alias for {@code reduce}; simply delegates to it.
     *
     * @see #reduce(Observable, Func2)
     */
    public static Observable aggregate(Observable sequence, Func2 accumulator) {
        return reduce(sequence, accumulator);
    }

    /**
     * Used by dynamic languages. Alias for the {@code reduce} overload that accepts an untyped
     * accumulator object.
     *
     * @see #reduce(Observable, Func2)
     */
    public static Observable aggregate(Observable sequence, Object accumulator) {
        return reduce(sequence, accumulator);
    }

    /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. *

* * * @param * the type item emitted by the source Observable * @param * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue * a seed passed into the first execution of the accumulator function * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be used in the next accumulator call (if applicable) * * @return an Observable that emits a single element that is the result of accumulating the * output from applying the accumulator to the sequence of items emitted by the source * Observable * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */
    public static Observable reduce(Observable sequence, R initialValue, Func2 accumulator) {
        // Seeded reduce: a seeded scan followed by takeLast(..., 1) so only the final value is kept.
        return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
    }

    /**
     * Used by dynamic languages.
     * <p>
     * The accumulator is an untyped function object; it is adapted with {@code Functions.from} and
     * invoked with the running value and the next item.
     *
     * @see #reduce(Observable, Object, Func2)
     */
    public static Observable reduce(final Observable sequence, final R initialValue, final Object accumulator) {
        // Adapt the untyped accumulator to a FuncN so it can be called with (r, t).
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(accumulator);
        return reduce(sequence, initialValue, new Func2() {

            @SuppressWarnings("unchecked")
            @Override
            public R call(R r, T t) {
                // Untyped result of the adapted function, cast back to R.
                return (R) _f.call(r, t);
            }
        });
    }

    /**
     * Alias for the seeded {@code reduce}; simply delegates to it.
     *
     * @see #reduce(Observable, Object, Func2)
     */
    public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) {
        return reduce(sequence, initialValue, accumulator);
    }

    /**
     * Used by dynamic languages. Alias for the seeded {@code reduce} overload that accepts an
     * untyped accumulator object.
     *
     * @see #reduce(Observable, Object, Func2)
     */
    public static Observable aggregate(Observable sequence, R initialValue, Object accumulator) {
        return reduce(sequence, initialValue, accumulator);
    }

    /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param sequence * the source Observable * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */
    public static Observable scan(Observable sequence, Func2 accumulator) {
        // OperationScan.scan builds the unseeded scan operation; create wraps it in an Observable.
        return create(OperationScan.scan(sequence, accumulator));
    }

    /**
     * Used by dynamic languages.
     * <p>
     * The accumulator is an untyped function object; it is adapted with {@code Functions.from} and
     * invoked with the two arguments of each accumulation step.
     *
     * @see #scan(Observable, Func2)
     */
    public static Observable scan(final Observable sequence, final Object accumulator) {
        // Adapt the untyped accumulator to a FuncN so it can be called with two arguments.
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(accumulator);
        return scan(sequence, new Func2() {

            @SuppressWarnings("unchecked")
            @Override
            public T call(T t1, T t2) {
                // Untyped result of the adapted function, cast back to T.
                return (T) _f.call(t1, t2);
            }
        });
    }

    /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the result of each of these iterations as its own sequence. *

* * * @param * the type item emitted by the source Observable * @param * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each element from the sequence, whose * result will be emitted and used in the next accumulator call (if applicable) * @return an Observable that emits a sequence of items that are the result of accumulating the * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */
    public static Observable scan(Observable sequence, R initialValue, Func2 accumulator) {
        // OperationScan.scan builds the seeded scan operation; create wraps it in an Observable.
        return create(OperationScan.scan(sequence, initialValue, accumulator));
    }

    /**
     * Used by dynamic languages.
     * <p>
     * The accumulator is an untyped function object; it is adapted with {@code Functions.from} and
     * invoked with the running value and the next item.
     *
     * @see #scan(Observable, Object, Func2)
     */
    public static Observable scan(final Observable sequence, final R initialValue, final Object accumulator) {
        // Adapt the untyped accumulator to a FuncN so it can be called with (r, t).
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(accumulator);
        return scan(sequence, initialValue, new Func2() {

            @SuppressWarnings("unchecked")
            @Override
            public R call(R r, T t) {
                // Untyped result of the adapted function, cast back to R.
                return (R) _f.call(r, t);
            }
        });
    }

    /**
     * Determines whether all elements of an observable sequence satisfy a condition.
     *
     * @param sequence
     *            an observable sequence whose elements to apply the predicate to.
     * @param predicate
     *            a function to test each element for a condition.
     * @param
     *            the type of observable.
     * @return true if all elements of an observable sequence satisfy a condition; otherwise, false.
     */
    public static Observable all(final Observable sequence, final Func1 predicate) {
        // OperationAll.all builds the operation; create wraps it in an Observable.
        return create(OperationAll.all(sequence, predicate));
    }

    /**
     * Determines whether all elements of an observable sequence satisfy a condition.
     * <p>
     * This overload accepts the predicate as an untyped function object, adapts it with
     * {@code Functions.from}, and delegates to the overload that takes a Func1.
     *
     * @param sequence
     *            an observable sequence whose elements to apply the predicate to.
     * @param predicate
     *            a function to test each element for a condition.
     * @param
     *            the type of observable.
     * @return true if all elements of an observable sequence satisfy a condition; otherwise, false.
     */
    public static Observable all(final Observable sequence, Object predicate) {
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(predicate);
        return all(sequence, new Func1() {

            @Override
            public Boolean call(T t) {
                // The adapted function must yield a Boolean; any other result type fails this cast.
                return (Boolean) _f.call(t);
            }
        });
    }

    /** * Returns an Observable that skips the first num items emitted by the source * Observable. You can ignore the first num items emitted by an Observable and attend * only to those items that come after, by modifying the Observable with the skip method. *

* * * @param items * the source Observable * @param num * the number of items to skip * @return an Observable that emits the same sequence of items emitted by the source Observable, * except for the first num items * @see MSDN: Observable.Skip Method */
    public static Observable skip(final Observable items, int num) {
        // OperationSkip.skip builds the operation; create wraps it in an Observable.
        return create(OperationSkip.skip(items, num));
    }

    /**
     * Accepts an {@link Observable} sequence of {@link Observable} sequences, and transforms it into a single
     * {@link Observable} sequence, which publishes the values of the most recently published {@link Observable} sequence.
     *
     * @param sequenceOfSequences
     *            the {@link Observable} sequence of {@link Observable} sequences.
     * @return an {@link Observable} which publishes only the values of the most recently published
     *         {@link Observable} sequence.
     */
    public static Observable switchDo(Observable> sequenceOfSequences) {
        // OperationSwitch.switchDo builds the operation; create wraps it in an Observable.
        return create(OperationSwitch.switchDo(sequenceOfSequences));
    }

    /** * Accepts an Observable and wraps it in another Observable that ensures that the resulting * Observable is chronologically well-behaved. *

* A well-behaved observable ensures onNext, onCompleted, or onError calls to its subscribers are not interleaved, onCompleted and * onError are only called once respectively, and no * onNext calls follow onCompleted and onError calls. * * @param observable * the source Observable * @param * the type of item emitted by the source Observable * @return an Observable that is a chronologically well-behaved version of the source Observable */
    public static Observable synchronize(Observable observable) {
        // OperationSynchronize.synchronize builds the operation; create wraps it in an Observable.
        return create(OperationSynchronize.synchronize(observable));
    }

    /** * Returns an Observable that emits the first num items emitted by the source * Observable. *

* You can choose to pay attention only to the first num values emitted by an Observable by calling its take method. This method returns an Observable that will call a * subscribing Observer's onNext function a * maximum of num times before calling onCompleted. *

* * * @param items * the source Observable * @param num * the number of items from the start of the sequence emitted by the source * Observable to emit * @return an Observable that only emits the first num items emitted by the source * Observable */
    public static Observable take(final Observable items, final int num) {
        // OperationTake.take builds the operation; create wraps it in an Observable.
        return create(OperationTake.take(items, num));
    }

    /**
     * Returns an Observable that emits the last count items emitted by the source
     * Observable.
     *
     * @param items
     *            the source Observable
     * @param count
     *            the number of items from the end of the sequence emitted by the source
     *            Observable to emit
     * @return an Observable that only emits the last count items emitted by the source
     *         Observable
     */
    public static Observable takeLast(final Observable items, final int count) {
        // OperationTakeLast.takeLast builds the operation; create wraps it in an Observable.
        return create(OperationTakeLast.takeLast(items, count));
    }

    /**
     * Returns the values from the start of an observable sequence while a given predicate remains true.
     *
     * @param items
     *            the source observable sequence
     * @param predicate
     *            a function to test each source element for a condition
     * @return the values from the start of the given sequence
     */
    public static Observable takeWhile(final Observable items, Func1 predicate) {
        // OperationTakeWhile.takeWhile builds the operation; create wraps it in an Observable.
        return create(OperationTakeWhile.takeWhile(items, predicate));
    }

    /**
     * Returns the values from the start of an observable sequence while a given predicate remains true.
     * <p>
     * This overload accepts the predicate as an untyped function object, adapts it with
     * {@code Functions.from}, and delegates to the overload that takes a Func1.
     *
     * @param items
     *            the source observable sequence
     * @param predicate
     *            a function to test each source element for a condition
     * @return the values from the start of the given sequence
     */
    public static Observable takeWhile(final Observable items, Object predicate) {
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(predicate);
        return takeWhile(items, new Func1() {

            @Override
            public Boolean call(T t) {
                // The adapted function must yield a Boolean; any other result type fails this cast.
                return (Boolean) _f.call(t);
            }
        });
    }

    /**
     * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
     *
     * @param items
     *            the source observable sequence
     * @param predicate
     *            a function to test each element for a condition; the second parameter of the function represents the index of the source element.
     * @return the values from the start of the given sequence
     */
    public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) {
        // OperationTakeWhile.takeWhileWithIndex builds the operation; create wraps it in an Observable.
        return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
    }

    /**
     * Variant of takeWhileWithIndex that accepts the predicate as an untyped function object.
     * The object is adapted with {@code Functions.from} and invoked with the element and its
     * Integer index; its result is cast to Boolean.
     *
     * @param items
     *            the source observable sequence
     * @param predicate
     *            a function object taking an element and an index and returning a Boolean
     * @return the values from the start of the given sequence
     */
    public static Observable takeWhileWithIndex(final Observable items, Object predicate) {
        @SuppressWarnings("rawtypes")
        final FuncN _f = Functions.from(predicate);
        // Unlike takeWhile(Observable, Object), this calls OperationTakeWhile directly instead of
        // delegating to the Func2 overload; the wrapped Func2 is built inline.
        return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2() {

            @Override
            public Boolean call(T t, Integer integer) {
                // The adapted function must yield a Boolean; any other result type fails this cast.
                return (Boolean) _f.call(t, integer);
            }
        }));
    }

    /**
     * Adds a timestamp to each item emitted by this observable.
     * <p>
     * Note that this is an instance method operating on {@code this}, unlike the static
     * operators around it.
     *
     * @return An observable sequence of timestamped items.
     */
    public Observable> timestamp() {
        return create(OperationTimestamp.timestamp(this));
    }

    /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. *

* Normally, an Observable that returns multiple items will do so by calling its Observer's onNext function for each such item. You can change this behavior, instructing the * Observable * to * compose a list of all of these multiple items and * then to call the Observer's onNext function once, passing it the entire list, by calling the Observable object's toList method prior to calling its * subscribe * method. *

* * * @param that * the source Observable * @return an Observable that emits a single item: a List containing all of the * items emitted by the source Observable */
    public static Observable> toList(final Observable that) {
        // OperationToObservableList.toObservableList builds the operation; create wraps it in an Observable.
        return create(OperationToObservableList.toObservableList(that));
    }

    /**
     * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
     *
     * @param source
     *            the source sequence whose elements will be pushed into the specified subject.
     * @param subject
     *            the subject to push source elements into.
     * @param
     *            source type
     * @param
     *            result type
     * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
     */
    public static ConnectableObservable multicast(Observable source, final Subject subject) {
        // The ConnectableObservable from OperationMulticast.multicast is returned directly; no create call is involved.
        return OperationMulticast.multicast(source, subject);
    }

    /** * Converts an Iterable sequence to an Observable sequence. * * Any object that supports the Iterable interface can be converted into an Observable that emits * each iterable item in the object, by passing the object into the toObservable method. *

* * * @param iterable * the source Iterable sequence * @param * the type of items in the iterable sequence and the type emitted by the resulting * Observable * @return an Observable that emits each item in the source Iterable sequence */ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } /** * Converts a Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param * the type of object that the Future returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future * @deprecated Replaced by {@link #from(Future)} */ @Deprecated public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } /** * Converts a Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the from method. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param * the type of object that the Future returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future */ public static Observable from(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } /** * Converts a Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. * The subscribe method on this Observable runs synchronously, so the Subscription returned does nothing. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param timeout * the maximum time to wait * @param unit * the time unit of the timeout argument * @param * the type of object that the Future returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future * @deprecated Replaced by {@link #from(Future, long, TimeUnit)} */ @Deprecated public static Observable toObservable(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** * Converts a Future to an Observable sequence. * * Any object that supports the {@link Future} interface can be converted into an Observable that emits * the return value of the get() method in the object, by passing the object into the from method. * The subscribe method on this Observable runs synchronously, so the Subscription returned does nothing. *

* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. * * @param future * the source {@link Future} * @param timeout * the maximum time to wait * @param unit * the time unit of the timeout argument * @param * the type of object that the Future returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future */ public static Observable from(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** * Converts an Array sequence to an Observable sequence. * * An Array can be converted into an Observable that emits each item in the Array, by passing the * Array into the toObservable method. *

* * The array is wrapped with Arrays.asList and passed to the Iterable overload of toObservable. * * @param items * the source Array * @param * the type of items in the Array, and the type of items emitted by the resulting * Observable * @return an Observable that emits each item in the source Array * @deprecated Use {@link #from(Object...)} */ @Deprecated public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } /** * Sort T objects by their natural order (object must implement Comparable). *

* * * @param sequence * the source Observable whose items are sorted * @throws ClassCastException * if T objects do not implement Comparable * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence) { return create(OperationToObservableSortedList.toSortedList(sequence)); } /** * Sort T objects using the defined sort function. *

* * * @param sequence * the source Observable whose items are sorted * @param sortFunction * the function used to order the items * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); } /** * Sort T objects using the defined sort function. *

* * * @param sequence * the source Observable whose items are sorted * @param sortFunction * a function object called with two items; it is adapted with {@link Functions#from(Object)} and its result is cast to Integer * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(sortFunction); return create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { @Override public Integer call(T t1, T t2) { return (Integer) _f.call(t1, t2); } })); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by two other Observables, with the results of this function becoming the * sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0 * and the first item emitted by w1; the * second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0 and the second item emitted by w1; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param reduceFunction * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { return create(OperationZip.zip(w0, w1, reduceFunction)); } /** * Determines whether two sequences are equal by comparing the elements pairwise. * * Elements are compared with equals. Two null elements are considered equal, and a null element is never equal to a non-null one. * * @param first * observable to compare * @param second * observable to compare * @param * type of sequence * @return sequence of booleans, one for each pair of elements: true if the pair is equal; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second) { return sequenceEqual(first, second, new Func2() { @Override public Boolean call(T first, T second) { /* null-safe so that a null element yields a comparison result instead of a NullPointerException */ return first == null ? second == null : first.equals(second); } }); } /** * Determines whether two sequences are equal by comparing the elements pairwise using a specified equality function. * * The comparison is implemented with zip, so the equality function is applied to each pair of elements and its results are emitted. * * @param first * observable sequence to compare * @param second * observable sequence to compare * @param equality * a function used to compare elements of both sequences * @param * type of sequence * @return sequence of booleans, one for each pair of elements: true if the pair is equal according to the equality function; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { return zip(first, second, equality); } /** * Determines whether two sequences are equal by comparing the elements pairwise using a specified equality function. 
* * The comparison is implemented with zip, so the equality function is applied to each pair of elements and its results are emitted. * * @param first * observable sequence to compare * @param second * observable sequence to compare * @param equality * a function object used to compare elements of both sequences; it is passed unchanged to zip * @param * type of sequence * @return sequence of booleans, one for each pair of elements: true if the pair is equal according to the equality function; otherwise, false. */ public static Observable sequenceEqual(Observable first, Observable second, Object equality) { return zip(first, second, equality); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by two other Observables, with the results of this function becoming the * sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0 * and the first item emitted by w1; the * second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0 and the second item emitted by w1; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable; the object is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to the result type * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, final Object function) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(function); return zip(w0, w1, new Func2() { @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1) { return (R) _f.call(t0, t1); } }); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by three other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, and the * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item * emitted by w1, and the second item * emitted by w2; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the results of applying the function to the items zipped from w0, w1 and w2 */ public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { return create(OperationZip.zip(w0, w1, w2, function)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by three other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, and the * first item emitted by w2; the second item emitted by the new Observable will be the result of the function applied to the second item emitted by w0, the second item * emitted by w1, and the second item * emitted by w2; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable; the object is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to the result type * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, final Object function) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(function); return zip(w0, w1, w2, new Func3() { @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1, T2 t2) { return (R) _f.call(t0, t1, t2); } }); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by four other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, the * first item emitted by w2, and the first item emitted by w3; the second item emitted by the new Observable will be the result of the function applied to the second item * emitted by each of those Observables; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param w3 * a fourth source Observable * @param reduceFunction * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable * @return an Observable that emits the results of applying the function to the items zipped from w0, w1, w2 and w3 */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); } /** * Returns an Observable that applies a function of your choosing to the combination of items * emitted, in sequence, by four other Observables, with the results of this function becoming * the sequence emitted by the returned Observable. *

* zip applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by * w0, * the first item emitted by w1, the * first item emitted by w2, and the first item emitted by w3; the second item emitted by the new Observable will be the result of the function applied to the second item * emitted by each of those Observables; and so forth. *

* The resulting Observable returned from zip will call onNext as many times as the number of onNext calls of the source Observable with the * shortest sequence. *

* * * @param w0 * one source Observable * @param w1 * another source Observable * @param w2 * a third source Observable * @param w3 * a fourth source Observable * @param function * a function that, when applied to an item emitted by each of the source Observables, * results in a value that will be emitted by the resulting Observable; the object is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to the result type * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, final Object function) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(function); return zip(w0, w1, w2, w3, new Func4() { @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1, T2 t2, T3 t3) { return (R) _f.call(t0, t1, t2, t3); } }); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as true * @see #filter(Object) */ public Observable filter(Func1 predicate) { return filter(this, predicate); } /** * Registers an action to be called when this observable calls * onComplete or onError. * * @param action * an action to be called when this observable completes or errors. * @return an Observable that emits the same objects as this observable, then calls the action. * @see MSDN: Observable.Finally Method */ public Observable finallyDo(Action0 action) { return create(OperationFinally.finallyDo(this, action)); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param callback * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter; the object is adapted with {@link Functions#from(Object)} and its result is cast to Boolean * @return an Observable that emits only those items in the original Observable that the filter * evaluates as "true" */ public Observable filter(final Object callback) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(callback); return filter(this, new Func1() { @Override public Boolean call(T t1) { return (Boolean) _f.call(t1); } }); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function returns an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param func * a function to apply to each item in the sequence, that returns an Observable. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. * @see #mapMany(Func1) */ public Observable flatMap(Func1> func) { return mapMany(func); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function returns an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param callback * a function to apply to each item in the sequence that returns an Observable; it is passed unchanged to mapMany. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. * @see #mapMany(Object) */ public Observable flatMap(final Object callback) { return mapMany(callback); } /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

* * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as true * @see #filter(Func1) */ public Observable where(Func1 predicate) { return where(this, predicate); } /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. *

* * * @param func * a function to apply to each item in the sequence. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the sequence emitted by the input Observable. * @see #map(Object) */ public Observable map(Func1 func) { return map(this, func); } /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. *

* * * @param callback * a function to apply to each item in the sequence; the object is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to the result type. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the sequence emitted by the input Observable. */ public Observable map(final Object callback) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(callback); return map(this, new Func1() { @Override @SuppressWarnings("unchecked") public R call(T t1) { return (R) _f.call(t1); } }); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function returns an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param func * a function to apply to each item in the sequence, that returns an Observable. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. * @see #flatMap(Func1) */ public Observable mapMany(Func1> func) { return mapMany(this, func); } /** * Creates a new Observable sequence by applying a function that you supply to each item in the * original Observable sequence, where that function returns an Observable that emits items, and * then merges the results of that function applied to every item emitted by the original * Observable, emitting these merged results as its own sequence. *

* Note: mapMany and flatMap are equivalent. *

* * * @param callback * a function to apply to each item in the sequence that returns an Observable; the object is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to Observable. * @return an Observable that emits a sequence that is the result of applying the transformation * function to each item in the input sequence and merging the results of the * Observables obtained from this transformation. * @see #flatMap(Object) */ public Observable mapMany(final Object callback) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(callback); return mapMany(this, new Func1>() { @Override @SuppressWarnings("unchecked") public Observable call(T t1) { return (Observable) _f.call(t1); } }); } /** * Materializes the implicit notifications of this observable sequence as explicit notification values. *

* * * @return An observable sequence whose elements are the result of materializing the notifications of the given sequence. * @see MSDN: Observable.materialize */ public Observable> materialize() { return materialize(this); } /** * Asynchronously subscribes and unsubscribes observers on the specified scheduler. * * @param scheduler * the scheduler to perform subscription and unsubscription actions on. * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. */ public Observable subscribeOn(Scheduler scheduler) { return subscribeOn(this, scheduler); } /** * Asynchronously notifies observers on the specified scheduler. * * @param scheduler * the scheduler to notify observers on. * @return the source sequence whose observations happen on the specified scheduler. */ public Observable observeOn(Scheduler scheduler) { return observeOn(this, scheduler); } /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * * This Observable is cast (unchecked) to the type expected by the static dematerialize method before being passed to it. * * @return An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. * @see MSDN: Observable.dematerialize * @throws Exception * if attempted on Observable not of type {@code Observable>}. */ @SuppressWarnings("unchecked") public Observable dematerialize() { return dematerialize((Observable>) this); } /** * Instruct an Observable to pass control to another Observable rather than calling onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass a function that returns another Observable * (resumeFunction) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead relinquish control to * the Observable returned by resumeFunction, which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeFunction * a function that receives the error and returns the Observable to resume with * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Func1> resumeFunction) { return onErrorResumeNext(this, resumeFunction); } /** * Instruct an Observable to pass control to another Observable rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass a function that returns another Observable * (resumeFunction) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead relinquish control to * the Observable returned by resumeFunction, which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeFunction * a function object that receives the error and returns the Observable to resume with; it is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to Observable * @return the original Observable with appropriately modified behavior */ public Observable onErrorResumeNext(final Object resumeFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, new Func1>() { @Override @SuppressWarnings("unchecked") public Observable call(Exception e) { return (Observable) _f.call(e); } }); } /** * Instruct an Observable to pass control to another Observable rather than calling * onError if it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * item to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorResumeNext method changes this behavior. If you pass another Observable * (resumeSequence) to an Observable's onErrorResumeNext method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead relinquish control to * resumeSequence which will call the Observer's onNext method if it * is able to do so. In such a case, because no Observable necessarily invokes * onError, the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. *

* * * @param resumeSequence * the Observable to pass control to if this Observable encounters an error * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Observable resumeSequence) { return onErrorResumeNext(this, resumeSequence); } /** * Instruct an Observable to emit a particular item rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * object to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorReturn method changes this behavior. If you pass a function * (resumeFunction) to an Observable's onErrorReturn method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead pass the return value of * resumeFunction to the Observer's onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * a function that receives the error and returns the item to emit in its place * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(Func1 resumeFunction) { return onErrorReturn(this, resumeFunction); } /** * Instruct an Observable to emit a particular item rather than calling onError if * it encounters an error. *

* By default, when an Observable encounters an error that prevents it from emitting the expected * object to its Observer, the Observable calls its Observer's onError function, and * then quits without calling any more of its Observer's closures. The * onErrorReturn method changes this behavior. If you pass a function * (resumeFunction) to an Observable's onErrorReturn method, if the * original Observable encounters an error, instead of calling its Observer's * onError function, it will instead pass the return value of * resumeFunction to the Observer's onNext method. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * a function object that receives the error and returns the item to emit in its place; it is adapted with {@link Functions#from(Object)} and its result is cast (unchecked) to the item type * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(final Object resumeFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, new Func1() { @Override @SuppressWarnings("unchecked") public T call(Exception e) { return (T) _f.call(e); } }); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an * inject method that does a similar operation on lists. *

* * * @param accumulator * An accumulator function to be invoked on each element from the sequence, whose result * will be used in the next accumulator call (if applicable). * * @return An observable sequence with a single element from the result of accumulating the * output from the list of Observables. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications. * * @return a connectable observable sequence that shares a single subscription to this sequence and replays all notifications. */ public ConnectableObservable replay() { return replay(this); } /** * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. *

* This is useful when returning an Observable that you wish to cache responses but can't control the * subscribe/unsubscribe behavior of all the Observers. *

* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not * use this on infinite or very large sequences that will use up memory. This is similar to * the {@link Observable#toList()} operator in this caution. * * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. */ public Observable cache() { return cache(this); } /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * * @return a connectable observable sequence that shares a single subscription to this sequence. */ public ConnectableObservable publish() { return publish(this); } /** * Used by dynamic languages. Passes this Observable and the accumulator object to the static reduce method. * * @see #reduce(Func2) */ public Observable reduce(Object accumulator) { return reduce(this, accumulator); } /** * Passes this Observable and the accumulator to the static aggregate method. * * @see #reduce(Func2) */ public Observable aggregate(Func2 accumulator) { return aggregate(this, accumulator); } /** * Used by dynamic languages. Passes this Observable and the accumulator object to the static aggregate method. * * @see #reduce(Func2) */ public Observable aggregate(Object accumulator) { return aggregate(this, accumulator); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by an Observable into the same function, and so on until all items have been emitted by the * source Observable, emitting the final result from the final call to your function as its sole * output. *

* <p>
     * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate,"
     * "compress," or "inject" in other programming contexts. Groovy, for instance, has an
     * {@code inject} method that does a similar operation on lists.
     *
     * @param initialValue
     *            the initial (seed) accumulator value
     * @param accumulator
     *            the accumulator function invoked on each element of the sequence; its result is
     *            used in the next accumulator call (if applicable)
     * @return an Observable that emits a single element: the result of accumulating the output
     *         of this Observable, starting from {@code initialValue}
     * @see "MSDN: Observable.Aggregate"
     * @see "Wikipedia: Fold (higher-order function)"
     */
    public Observable reduce(R initialValue, Func2 accumulator) {
        return reduce(this, initialValue, accumulator);
    }

    /**
     * Used by dynamic languages.
     *
     * @see #reduce(Object, Func2)
     */
    public Observable reduce(R initialValue, Object accumulator) {
        return reduce(this, initialValue, accumulator);
    }

    /**
     * Instance form of {@code aggregate} with a seed value; see {@link #reduce(Object, Func2)}
     * for the described behavior.
     *
     * @see #reduce(Object, Func2)
     */
    public Observable aggregate(R initialValue, Func2 accumulator) {
        return aggregate(this, initialValue, accumulator);
    }

    /**
     * Used by dynamic languages.
     *
     * @see #reduce(Object, Func2)
     */
    public Observable aggregate(R initialValue, Object accumulator) {
        return aggregate(this, initialValue, accumulator);
    }

    /**
     * Returns an Observable that applies a function of your choosing to the first item emitted by a
     * source Observable, then feeds the result of that function along with the second item emitted
     * by an Observable into the same function, and so on until all items have been emitted by the
     * source Observable. The result of each of these iterations is emitted as a sequence from the
     * returned Observable. This sort of function is sometimes called an accumulator.
     *

*
     * @param accumulator
     *            the accumulator function invoked on each element of the sequence; its result is
     *            sent via {@code onNext} and used in the next accumulator call (if applicable)
     * @return an Observable sequence whose elements are the results of accumulating the output of
     *         this Observable
     * @see "MSDN: Observable.Scan"
     */
    public Observable scan(Func2 accumulator) {
        return scan(this, accumulator);
    }

    /**
     * Samples the observable sequence at each interval.
     *
     * @param period
     *            the period of time that defines the sampling rate
     * @param unit
     *            the time unit for the sampling rate time period
     * @return an observable sequence whose elements are the results of sampling the current
     *         observable sequence
     */
    public Observable sample(long period, TimeUnit unit) {
        // wraps the function produced by OperationSample in a new Observable
        return create(OperationSample.sample(this, period, unit));
    }

    /**
     * Samples the observable sequence at each interval.
     *
     * @param period
     *            the period of time that defines the sampling rate
     * @param unit
     *            the time unit for the sampling rate time period
     * @param scheduler
     *            the scheduler to use for sampling
     * @return an observable sequence whose elements are the results of sampling the current
     *         observable sequence
     */
    public Observable sample(long period, TimeUnit unit, Scheduler scheduler) {
        return create(OperationSample.sample(this, period, unit, scheduler));
    }

    /**
     * Used by dynamic languages.
     *
     * @see #scan(Func2)
     */
    public Observable scan(final Object accumulator) {
        return scan(this, accumulator);
    }

    /**
     * Returns an Observable that applies a function of your choosing to the first item emitted by a
     * source Observable, then feeds the result of that function along with the second item emitted
     * by an Observable into the same function, and so on until all items have been emitted by the
     * source Observable, emitting the result of each of these iterations. This sort of function is
     * sometimes called an accumulator.
     *

*
     * @param initialValue
     *            the initial (seed) accumulator value
     * @param accumulator
     *            the accumulator function invoked on each element of the sequence; its result is
     *            sent via {@code onNext} and used in the next accumulator call (if applicable)
     * @return an Observable sequence whose elements are the results of accumulating the output of
     *         this Observable
     * @see "MSDN: Observable.Scan"
     */
    public Observable scan(R initialValue, Func2 accumulator) {
        return scan(this, initialValue, accumulator);
    }

    /**
     * Used by dynamic languages.
     *
     * @see #scan(Object, Func2)
     */
    public Observable scan(final R initialValue, final Object accumulator) {
        return scan(this, initialValue, accumulator);
    }

    /**
     * Determines whether all elements of an observable sequence satisfy a condition.
     *
     * @param predicate
     *            a function to test each element for a condition
     * @return an Observable reporting the outcome: true if all elements of the observable sequence
     *         satisfy the condition; otherwise, false
     */
    public Observable all(Func1 predicate) {
        return all(this, predicate);
    }

    /**
     * Determines whether all elements of an observable sequence satisfy a condition. This overload
     * accepts the predicate as a plain {@code Object}.
     *
     * @param predicate
     *            a function to test each element for a condition
     * @return an Observable reporting the outcome: true if all elements of the observable sequence
     *         satisfy the condition; otherwise, false
     */
    public Observable all(Object predicate) {
        return all(this, predicate);
    }

    /**
     * Returns an Observable that skips the first {@code num} items emitted by the source
     * Observable.
     * <p>
     * You can ignore the first {@code num} items emitted by an Observable and attend only to
     * those items that come after, by modifying the Observable with the {@code skip} method.
     *

*
     * @param num
     *            the number of items to skip
     * @return an Observable sequence that is identical to the source Observable except that it
     *         does not emit the first {@code num} items from that sequence
     */
    public Observable skip(int num) {
        return skip(this, num);
    }

    /**
     * Returns an Observable that emits the first {@code num} items emitted by the source
     * Observable.
     * <p>
     * You can choose to pay attention only to the first {@code num} values emitted by an
     * Observable by calling its {@code take} method. This method returns an Observable that will
     * call a subscribing Observer's {@code onNext} function a maximum of {@code num} times
     * before calling {@code onCompleted}.
     *

*
     * @param num
     *            the maximum number of items to emit
     * @return an Observable that emits only the first {@code num} items from the source
     *         Observable, or all of the items from the source Observable if that Observable emits
     *         fewer than {@code num} items
     */
    public Observable take(final int num) {
        return take(this, num);
    }

    /**
     * Returns an Observable that emits items emitted by the source Observable as long as a
     * specified condition is true.
     *
     * @param predicate
     *            a function to test each source element for a condition
     * @return the values from the start of the given sequence
     */
    public Observable takeWhile(final Func1 predicate) {
        return takeWhile(this, predicate);
    }

    /**
     * Returns an Observable that emits items emitted by the source Observable as long as a
     * specified condition is true. This overload accepts the predicate as a plain {@code Object}.
     *
     * @param predicate
     *            a function to test each source element for a condition
     * @return the values from the start of the given sequence
     */
    public Observable takeWhile(final Object predicate) {
        return takeWhile(this, predicate);
    }

    /**
     * Returns values from an observable sequence as long as a specified condition is true, and
     * then skips the remaining values.
     *
     * @param predicate
     *            a function to test each element for a condition; the second parameter of the
     *            function represents the index of the source element
     * @return the values from the start of the given sequence
     */
    public Observable takeWhileWithIndex(final Func2 predicate) {
        return takeWhileWithIndex(this, predicate);
    }

    /**
     * Returns values from an observable sequence as long as a specified condition is true, and
     * then skips the remaining values. This overload accepts the predicate as a plain
     * {@code Object}.
     *
     * @param predicate
     *            a function to test each element for a condition; the second parameter of the
     *            function represents the index of the source element
     * @return the values from the start of the given sequence
     */
    public Observable takeWhileWithIndex(final Object predicate) {
        return takeWhileWithIndex(this, predicate);
    }

    /**
     * Returns an Observable that emits the last {@code count} items emitted by the source
     * Observable.
*
     * @param count
     *            the number of items from the end of the sequence emitted by the source
     *            Observable to emit
     * @return an Observable that only emits the last {@code count} items emitted by the source
     *         Observable
     */
    public Observable takeLast(final int count) {
        return takeLast(this, count);
    }

    /**
     * Returns the values from the source observable sequence until the other observable sequence
     * produces a value.
     * <p>
     * NOTE(review): the original comment also carried a type-parameter tag described as "the
     * other type", but the parameter's name is missing and no type parameter is visible on the
     * signature below — confirm and restore the tag.
     *
     * @param other
     *            the observable sequence that terminates propagation of elements of the source
     *            sequence
     * @return an observable sequence containing the elements of the source sequence up to the
     *         point the other sequence interrupted further propagation
     */
    public Observable takeUntil(Observable other) {
        return takeUntil(this, other);
    }

    /**
     * Returns an Observable that emits a single item, a list composed of all the items emitted by
     * the source Observable.
     * <p>
     * Normally, an Observable that returns multiple items will do so by calling its Observer's
     * {@code onNext} function for each such item. You can change this behavior, instructing
     * the Observable to compose a list of all of these multiple items and then to call the
     * Observer's {@code onNext} function once, passing it the entire list, by calling the
     * Observable object's {@code toList} method prior to calling its {@code subscribe}
     * method.
     *

*
     * @return an Observable that emits a single item: a List containing all of the items emitted
     *         by the source Observable
     */
    // NOTE(review): the stray '>' in the return type suggests generic type arguments were lost
    // from this signature — confirm against the upstream source.
    public Observable> toList() {
        return toList(this);
    }

    /**
     * Sort T objects by their natural order (object must implement Comparable).
     *

*
     * @throws ClassCastException
     *             if T objects do not implement Comparable
     * @return an observable containing the sorted list
     */
    // NOTE(review): the stray '>' in the return type suggests generic type arguments were lost
    // from this signature (and the overload below) — confirm against the upstream source.
    public Observable> toSortedList() {
        return toSortedList(this);
    }

    /**
     * Sort T objects using the defined sort function.
     *
     * @param sortFunction
     *            the function that defines the sort order
     * @return an observable containing the sorted list
     */
    public Observable> toSortedList(Func2 sortFunction) {
        return toSortedList(this, sortFunction);
    }

    /**
     * Sort T objects using the defined sort function.
     *

* <p>
     * This overload accepts the sort function as a plain {@code Object}.
     *
     * @param sortFunction
     *            the function that defines the sort order
     * @return an observable containing the sorted list
     */
    // NOTE(review): the stray '>' in this and the groupBy return types below suggests generic
    // type arguments were lost from the signatures — confirm against the upstream source.
    public Observable> toSortedList(final Object sortFunction) {
        return toSortedList(this, sortFunction);
    }

    /**
     * Returns the concatenation of an Observable built from the given values followed by this
     * Observable.
     *
     * @param values
     *            the values placed ahead of this Observable in the concatenation
     * @return the result of concatenating {@code Observable.from(values)} with this Observable
     */
    @SuppressWarnings("unchecked")
    public Observable startWith(T... values) {
        return concat(Observable. from(values), this);
    }

    /**
     * Groups the elements of an observable and selects the resulting elements by using a specified function.
     * <p>
     * NOTE(review): the original comment also documented two type parameters ("the key type" and
     * "the resulting observable type") whose names are missing — confirm and restore the tags.
     *
     * @param keySelector
     *            a function to extract the key for each element
     * @param elementSelector
     *            a function to map each source element to an element in an observable group
     * @return an observable of observable groups, each of which corresponds to a unique key value,
     *         containing all elements that share that same key value
     */
    public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) {
        return groupBy(this, keySelector, elementSelector);
    }

    /**
     * Groups the elements of an observable according to a specified key selector function.
     * <p>
     * NOTE(review): the original comment also documented a type parameter ("the key type") whose
     * name is missing — confirm and restore the tag.
     *
     * @param keySelector
     *            a function to extract the key for each element
     * @return an observable of observable groups, each of which corresponds to a unique key value,
     *         containing all elements that share that same key value
     */
    public Observable> groupBy(final Func1 keySelector) {
        return groupBy(this, keySelector);
    }

    /**
     * Returns the {@link BlockingObservable} obtained by passing this Observable to
     * {@code BlockingObservable.from}.
     *
     * @return the result of {@code BlockingObservable.from(this)}
     */
    public BlockingObservable toBlockingObservable() {
        return BlockingObservable.from(this);
    }

    /**
     * Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
     *

* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" * * NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface. * * @param f * @return {@code true} if the given function is an internal implementation, and {@code false} otherwise. */ private boolean isInternalImplementation(Object o) { if (o == null) { return true; } // prevent double-wrapping (yeah it happens) if (o instanceof AtomicObserver) return true; // we treat the following package as "internal" and don't wrap it return o.getClass().getPackage().getName().startsWith("rx.operators"); } public static class UnitTest { @Mock Observer w; @Before public void before() { MockitoAnnotations.initMocks(this); } @Test public void testCreate() { Observable observable = create(new Func1, Subscription>() { @Override public Subscription call(Observer Observer) { Observer.onNext("one"); Observer.onNext("two"); Observer.onNext("three"); Observer.onCompleted(); return Subscriptions.empty(); } }); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); observable.subscribe(aObserver); verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); verify(aObserver, Mockito.never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); } @Test public void testReduce() { Observable Observable = toObservable(1, 2, 3, 4); reduce(Observable, new Func2() { @Override public Integer call(Integer t1, Integer t2) { return t1 + t2; } }).subscribe(w); // we should be called only once verify(w, times(1)).onNext(anyInt()); verify(w).onNext(10); } @Test public void testReduceWithInitialValue() { Observable Observable = toObservable(1, 2, 3, 4); reduce(Observable, 50, new Func2() { @Override public Integer call(Integer t1, Integer t2) 
{ return t1 + t2; } }).subscribe(w); // we should be called only once verify(w, times(1)).onNext(anyInt()); verify(w).onNext(60); } @Test public void testSequenceEqual() { Observable first = toObservable(1, 2, 3); Observable second = toObservable(1, 2, 4); @SuppressWarnings("unchecked") Observer result = mock(Observer.class); sequenceEqual(first, second).subscribe(result); verify(result, times(2)).onNext(true); verify(result, times(1)).onNext(false); } @Test public void testOnSubscribeFails() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); final RuntimeException re = new RuntimeException("bad impl"); Observable o = Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer t1) { throw re; } }); o.subscribe(observer); verify(observer, times(0)).onNext(anyString()); verify(observer, times(0)).onCompleted(); verify(observer, times(1)).onError(re); } @Test public void testMaterializeDematerializeChaining() { Observable obs = Observable.just(1); Observable chained = obs.materialize().dematerialize(); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); chained.subscribe(observer); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onCompleted(); verify(observer, times(0)).onError(any(Exception.class)); } /** * The error from the user provided Observer is not handled by the subscribe method try/catch. * * It is handled by the AtomicObserver that wraps the provided Observer. 
* * Result: Passes (if AtomicObserver functionality exists) */ @Test public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override public Subscription call(final Observer observer) { final BooleanSubscription s = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { try { if (!s.isUnsubscribed()) { observer.onNext("1"); observer.onNext("2"); observer.onNext("three"); observer.onNext("4"); observer.onCompleted(); } } finally { latch.countDown(); } } }).start(); return s; } }).subscribe(new Observer() { @Override public void onCompleted() { System.out.println("completed"); } @Override public void onError(Exception e) { error.set(e); System.out.println("error"); e.printStackTrace(); } @Override public void onNext(String v) { int num = Integer.parseInt(v); System.out.println(num); // doSomething(num); count.incrementAndGet(); } }); // wait for async sequence to complete latch.await(); assertEquals(2, count.get()); assertNotNull(error.get()); if (!(error.get() instanceof NumberFormatException)) { fail("It should be a NumberFormatException"); } } /** * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous * * Result: Passes */ @Test public void testCustomObservableWithErrorInObserverSynchronous() { final AtomicInteger count = new AtomicInteger(); final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { observer.onNext("1"); observer.onNext("2"); observer.onNext("three"); observer.onNext("4"); observer.onCompleted(); return Subscriptions.empty(); } }).subscribe(new Observer() { @Override public void onCompleted() { System.out.println("completed"); } 
@Override public void onError(Exception e) { error.set(e); System.out.println("error"); e.printStackTrace(); } @Override public void onNext(String v) { int num = Integer.parseInt(v); System.out.println(num); // doSomething(num); count.incrementAndGet(); } }); assertEquals(2, count.get()); assertNotNull(error.get()); if (!(error.get() instanceof NumberFormatException)) { fail("It should be a NumberFormatException"); } } /** * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous * * * Result: Passes */ @Test public void testCustomObservableWithErrorInObservableSynchronous() { final AtomicInteger count = new AtomicInteger(); final AtomicReference error = new AtomicReference(); Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { observer.onNext("1"); observer.onNext("2"); throw new NumberFormatException(); } }).subscribe(new Observer() { @Override public void onCompleted() { System.out.println("completed"); } @Override public void onError(Exception e) { error.set(e); System.out.println("error"); e.printStackTrace(); } @Override public void onNext(String v) { System.out.println(v); count.incrementAndGet(); } }); assertEquals(2, count.get()); assertNotNull(error.get()); if (!(error.get() instanceof NumberFormatException)) { fail("It should be a NumberFormatException"); } } @Test public void testPublish() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); ConnectableObservable o = Observable.create(new Func1, Subscription>() { @Override public Subscription call(final Observer observer) { final BooleanSubscription subscription = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { counter.incrementAndGet(); observer.onNext("one"); observer.onCompleted(); } }).start(); return subscription; } }).publish(); final CountDownLatch latch = new CountDownLatch(2); // subscribe once o.subscribe(new Action1() { 
@Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); // subscribe again o.subscribe(new Action1() { @Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); Subscription s = o.connect(); try { if (!latch.await(1000, TimeUnit.MILLISECONDS)) { fail("subscriptions did not receive values"); } assertEquals(1, counter.get()); } finally { s.unsubscribe(); } } @Test public void testReplay() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); ConnectableObservable o = Observable.create(new Func1, Subscription>() { @Override public Subscription call(final Observer observer) { final BooleanSubscription subscription = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { counter.incrementAndGet(); observer.onNext("one"); observer.onCompleted(); } }).start(); return subscription; } }).replay(); // we connect immediately and it will emit the value Subscription s = o.connect(); try { // we then expect the following 2 subscriptions to get that same value final CountDownLatch latch = new CountDownLatch(2); // subscribe once o.subscribe(new Action1() { @Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); // subscribe again o.subscribe(new Action1() { @Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); if (!latch.await(1000, TimeUnit.MILLISECONDS)) { fail("subscriptions did not receive values"); } assertEquals(1, counter.get()); } finally { s.unsubscribe(); } } @Test public void testCache() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(); Observable o = Observable.create(new Func1, Subscription>() { @Override public Subscription call(final Observer observer) { final BooleanSubscription subscription = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { counter.incrementAndGet(); observer.onNext("one"); observer.onCompleted(); } 
}).start(); return subscription; } }).cache(); // we then expect the following 2 subscriptions to get that same value final CountDownLatch latch = new CountDownLatch(2); // subscribe once o.subscribe(new Action1() { @Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); // subscribe again o.subscribe(new Action1() { @Override public void call(String v) { assertEquals("one", v); latch.countDown(); } }); if (!latch.await(1000, TimeUnit.MILLISECONDS)) { fail("subscriptions did not receive values"); } assertEquals(1, counter.get()); } } }