/** * Copyright 2014 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package rx; import static rx.util.functions.Functions.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import rx.joins.Pattern2; import rx.joins.Plan0; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; import rx.operators.OperationAmb; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationAverage; import rx.operators.OperationBuffer; import rx.operators.OperationCache; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFlatMap; import rx.operators.OperationGroupByUntil; import rx.operators.OperationGroupJoin; import rx.operators.OperationInterval; import rx.operators.OperationJoin; import rx.operators.OperationJoinPatterns; import rx.operators.OperationMaterialize; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; import rx.operators.OperationRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSequenceEqual; import rx.operators.OperationSingle; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; import rx.operators.OperationTimeInterval; import rx.operators.OperationTimeout; import rx.operators.OperationTimer; import rx.operators.OperationToMap; import rx.operators.OperationToMultimap; import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; import rx.operators.OperationZip; import rx.operators.OperatorCast; import rx.operators.OperatorFromIterable; import rx.operators.OperatorGroupBy; import rx.operators.OperatorMap; import rx.operators.OperatorMerge; import rx.operators.OperatorParallel; import rx.operators.OperatorTake; import rx.operators.OperatorTakeTimed; import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; import rx.operators.OperatorToObservableSortedList; import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; import rx.subjects.BehaviorSubject; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.OnErrorNotImplementedException; import rx.util.Range; import rx.util.TimeInterval; import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.Func5; import rx.util.functions.Func6; import rx.util.functions.Func7; import rx.util.functions.Func8; import rx.util.functions.Func9; import rx.util.functions.FuncN; import rx.util.functions.Function; import rx.util.functions.Functions; /** * The Observable class that implements the Reactive Pattern. *

* This class provides methods for subscribing to the Observable as well as delegate methods to the * various operators. *

* The documentation for this class makes use of marble diagrams. The following legend explains * these diagrams: *

* *

* For more information see the * RxJava Wiki * * @param * the type of the items emitted by the Observable */ public class Observable { final Action1> f; /** * Observable with Function to execute when subscribed to. *

* Note: Use {@link #create(OnSubscribeFunc)} to create an Observable, instead of this * constructor, unless you specifically have a need for inheritance. * * @param onSubscribe * {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called */ protected Observable(Action1> f) { this.f = f; } /** * Function interface for work to be performed when an {@link Observable} is subscribed to via {@link Observable#subscribe(Observer)} * * @param * @deprecated */ @Deprecated public static interface OnSubscribeFunc extends Function { public Subscription onSubscribe(Observer t1); } private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); /** * Returns an Observable that will execute the specified function when an {@link Observer} subscribes to it. *

* *

* Write the function you pass to {@code create} so that it behaves as an Observable: It should * invoke the Observer's {@link Observer#onNext onNext}, {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods appropriately. *

* A well-formed Observable must invoke either the Observer's {@code onCompleted} method * exactly once or its {@code onError} method exactly once. *

* See Rx Design Guidelines (PDF) * for detailed information. * * @param * the type of the items that this Observable emits * @param func * a function that accepts an {@code Observer}, invokes its {@code onNext}, {@code onError}, and {@code onCompleted} methods as appropriate, and returns a {@link Subscription} that * allows the Observer to cancel the subscription * @return an Observable that, when an {@link Observer} subscribes to it, will execute the * specified function * @see RxJava Wiki: create() * @see MSDN: Observable.Create */ public final static Observable create(final Action1> f) { return new Observable(f); } /** * Returns an Observable that will execute the specified function when an {@link Observer} subscribes to it. *

* *

* Write the function you pass to {@code create} so that it behaves as an Observable: It should * invoke the Observer's {@link Observer#onNext onNext}, {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods appropriately. *

* A well-formed Observable must invoke either the Observer's {@code onCompleted} method * exactly once or its {@code onError} method exactly once. *

* See Rx Design Guidelines (PDF) * for detailed information. * * @param * the type of the items that this Observable emits * @param func * a function that accepts an {@code Observer}, invokes its {@code onNext}, {@code onError}, and {@code onCompleted} methods as appropriate, and returns a {@link Subscription} that * allows the Observer to cancel the subscription * @return an Observable that, when an {@link Observer} subscribes to it, will execute the * specified function * @see RxJava Wiki: create() * @see MSDN: Observable.Create * @deprecated */ @Deprecated public final static Observable create(final OnSubscribeFunc func) { return new Observable(new Action1>() { @Override public void call(Operator o) { o.add(func.onSubscribe(o)); } }); } /** * Bind a function to the current Observable and return a new Observable that when subscribed to will pass the values of the current Observable through the function. *

* In other words, this allows chaining operators together on an Observable for acting on the values within the Observable. *

* {@code * observable.map(...).filter(...).take(5).bind(new OperatorA()).bind(new OperatorB(...)).subscribe() * } * * @param bind * @return an Observable that emits values that are the result of applying the bind function to the values of the current Observable */ public Observable bind(final Func1, Operator> bind) { return new Observable(new Action1>() { @Override public void call(Operator o) { subscribe(bind.call(o)); } }); } /* ****************************************************************************** * Operators Below Here * ****************************************************************************** */ /** * Mirror the one Observable in an Iterable of several Observables that first emits an item. *

* * * @param sources * an Iterable of Observable sources competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Iterable> sources) { return create(OperationAmb.amb(sources)); } /** * Given two Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2) { return create(OperationAmb.amb(o1, o2)); } /** * Given three Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3) { return create(OperationAmb.amb(o1, o2, o3)); } /** * Given four Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4) { return create(OperationAmb.amb(o1, o2, o3, o4)); } /** * Given five Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @param o5 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { return create(OperationAmb.amb(o1, o2, o3, o4, o5)); } /** * Given six Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @param o5 * an Observable competing to react first * @param o6 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6)); } /** * Given seven Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @param o5 * an Observable competing to react first * @param o6 * an Observable competing to react first * @param o7 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7)); } /** * Given eight Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @param o5 * an Observable competing to react first * @param o6 * an Observable competing to react first * @param o7 * an Observable competing to react first * @param o8 * an observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8)); } /** * Given nine Observables, mirror the one that first emits an item. *

* * * @param o1 * an Observable competing to react first * @param o2 * an Observable competing to react first * @param o3 * an Observable competing to react first * @param o4 * an Observable competing to react first * @param o5 * an Observable competing to react first * @param o6 * an Observable competing to react first * @param o7 * an Observable competing to react first * @param o8 * an Observable competing to react first * @param o9 * an Observable competing to react first * @return an Observable that emits the same sequence of items as whichever of the source * Observables first emitted an item * @see RxJava Wiki: amb() * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9)); } /** * @deprecated use {@link #averageInteger} */ @Deprecated public final static Observable average(Observable source) { return OperationAverage.average(source); } /** * Returns an Observable that emits the average of the Doubles emitted by the source Observable. *

* * * @param source * source Observable to compute the average of * @return an Observable that emits a single item: the average of all the Doubles emitted * by the source Observable * @see RxJava Wiki: averageDouble() * @see MSDN: Observable.Average */ public final static Observable averageDouble(Observable source) { return OperationAverage.averageDoubles(source); } /** * Returns an Observable that emits the average of the Floats emitted by the source Observable. *

* * * @param source * source Observable to compute the average of * @return an Observable that emits a single item: the average of all the Floats emitted by * the source Observable * @see RxJava Wiki: averageFloat() * @see MSDN: Observable.Average */ public final static Observable averageFloat(Observable source) { return OperationAverage.averageFloats(source); } /** * Returns an Observable that emits the average of the Integers emitted by the source * Observable. *

* * * @param source * source Observable to compute the average of * @return an Observable that emits a single item: the average of all the Integers emitted * by the source Observable * @throws IllegalArgumentException * if the source Observable emits no items * @see RxJava Wiki: averageInteger() * @see MSDN: Observable.Average */ public final static Observable averageInteger(Observable source) { return OperationAverage.average(source); } /** * Returns an Observable that emits the average of the Longs emitted by the source Observable. *

* * * @param source * source Observable to compute the average of * @return an Observable that emits a single item: the average of all the Longs emitted by * the source Observable * @see RxJava Wiki: averageLong() * @see MSDN: Observable.Average */ public final static Observable averageLong(Observable source) { return OperationAverage.averageLongs(source); } /** * Combines two source Observables by emitting an item that aggregates the latest values of each * of the source Observables each time an item is received from either of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Func2 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction)); } /** * Combines three source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Func3 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction)); } /** * Combines four source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Func4 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction)); } /** * Combines five source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param o5 * the fifth source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, combineFunction)); } /** * Combines six source Observables by emitting an item that aggregates the latest values of each * of the source Observables each time an item is received from any of the source Observables, * where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param o5 * the fifth source Observable * @param o6 * the sixth source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Func6 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, combineFunction)); } /** * Combines seven source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param o5 * the fifth source Observable * @param o6 * the sixth source Observable * @param o7 * the seventh source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Func7 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, combineFunction)); } /** * Combines eight source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param o5 * the fifth source Observable * @param o6 * the sixth source Observable * @param o7 * the seventh source Observable * @param o8 * the eighth source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Func8 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, combineFunction)); } /** * Combines nine source Observables by emitting an item that aggregates the latest values of * each of the source Observables each time an item is received from any of the source * Observables, where this aggregation is defined by a specified function. *

* * * @param o1 * the first source Observable * @param o2 * the second source Observable * @param o3 * the third source Observable * @param o4 * the fourth source Observable * @param o5 * the fifth source Observable * @param o6 * the sixth source Observable * @param o7 * the seventh source Observable * @param o8 * the eighth source Observable * @param o9 * the ninth source Observable * @param combineFunction * the aggregation function used to combine the items emitted by the source * Observables * @return an Observable that emits items that are the result of combining the items emitted by * the source Observables by means of the given aggregation function * @see RxJava Wiki: combineLatest() */ public final static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9, Func9 combineFunction) { return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction)); } /** * Returns an Observable that emits the items emitted by each of the Observables emitted by an * Observable, one after the other, without interleaving them. *

* * * @param observables * an Observable that emits Observables * @return an Observable that emits items all of the items emitted by the Observables emitted by {@code observables}, one after the other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ public final static Observable concat(Observable> observables) { return create(OperationConcat.concat(observables)); } /** * Returns an Observable that emits the items emitted by two Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @return an Observable that emits items emitted by the two source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2) { return create(OperationConcat.concat(t1, t2)); } /** * Returns an Observable that emits the items emitted by three Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @return an Observable that emits items emitted by the three source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3) { return create(OperationConcat.concat(t1, t2, t3)); } /** * Returns an Observable that emits the items emitted by four Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @return an Observable that emits items emitted by the four source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4) { return create(OperationConcat.concat(t1, t2, t3, t4)); } /** * Returns an Observable that emits the items emitted by five Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @param t5 * an Observable to be concatenated * @return an Observable that emits items emitted by the five source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { return create(OperationConcat.concat(t1, t2, t3, t4, t5)); } /** * Returns an Observable that emits the items emitted by six Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @param t5 * an Observable to be concatenated * @param t6 * an Observable to be concatenated * @return an Observable that emits items emitted by the six source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6)); } /** * Returns an Observable that emits the items emitted by seven Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @param t5 * an Observable to be concatenated * @param t6 * an Observable to be concatenated * @param t7 * an Observable to be concatenated * @return an Observable that emits items emitted by the seven source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7)); } /** * Returns an Observable that emits the items emitted by eight Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @param t5 * an Observable to be concatenated * @param t6 * an Observable to be concatenated * @param t7 * an Observable to be concatenated * @param t8 * an Observable to be concatenated * @return an Observable that emits items emitted by the eight source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8)); } /** * Returns an Observable that emits the items emitted by nine Observables, one after the other, * without interleaving them. *

* * * @param t1 * an Observable to be concatenated * @param t2 * an Observable to be concatenated * @param t3 * an Observable to be concatenated * @param t4 * an Observable to be concatenated * @param t5 * an Observable to be concatenated * @param t6 * an Observable to be concatenated * @param t7 * an Observable to be concatenated * @param t8 * an Observable to be concatenated * @param t9 * an Observable to be concatenated * @return an Observable that emits items emitted by the nine source Observables, one after the * other, without interleaving them * @see RxJava Wiki: concat() * @see MSDN: Observable.Concat */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** * Returns an Observable that calls an Observable factory to create its Observable for each new * Observer that subscribes. That is, for each subscriber, the actual Observable that subscriber * observs is determined by the factory function. *

* *

* The defer operator allows you to defer or delay emitting items from an Observable until such * time as an Observer subscribes to the Observable. This allows an {@link Observer} to easily * obtain updates or a refreshed version of the sequence. * * @param observableFactory * the Observable factory function to invoke for each {@link Observer} that * subscribes to the resulting Observable * @param * the type of the items emitted by the Observable * @return an Observable whose {@link Observer}s' subscriptions trigger an invocation of the * given Observable factory function * @see RxJava Wiki: defer() */ public final static Observable defer(Func0> observableFactory) { return create(OperationDefer.defer(observableFactory)); } /** * Returns an Observable that emits no items to the {@link Observer} and immediately invokes its {@link Observer#onCompleted onCompleted} method. *

* * * @param * the type of the items (ostensibly) emitted by the Observable * @return an Observable that emits no items to the {@link Observer} but immediately invokes the {@link Observer}'s {@link Observer#onCompleted() onCompleted} method * @see RxJava Wiki: empty() * @see MSDN: Observable.Empty */ public final static Observable empty() { return from(new ArrayList()); } /** * Returns an Observable that emits no items to the {@link Observer} and immediately invokes its {@link Observer#onCompleted onCompleted} method on the specified scheduler. *

* * * @param scheduler * the scheduler to use to call the {@link Observer#onCompleted onCompleted} method * @param * the type of the items (ostensibly) emitted by the Observable * @return an Observable that emits no items to the {@link Observer} but immediately invokes the {@link Observer}'s {@link Observer#onCompleted() onCompleted} method with the * specified {@code scheduler} * @see RxJava Wiki: empty() * @see MSDN: Observable.Empty Method (IScheduler) */ public final static Observable empty(Scheduler scheduler) { return Observable. empty().subscribeOn(scheduler); } /** * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it. *

* * * @param exception * the particular Throwable to pass to {@link Observer#onError onError} * @param * the type of the items (ostensibly) emitted by the Observable * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it * @see RxJava Wiki: error() * @see MSDN: Observable.Throw */ public final static Observable error(Throwable exception) { return new ThrowObservable(exception); } /** * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} method on the specified scheduler. *

* * * @param exception * the particular Throwable to pass to {@link Observer#onError onError} * @param scheduler * the scheduler on which to call {@link Observer#onError onError} * @param * the type of the items (ostensibly) emitted by the Observable * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method, on the specified scheduler * @see RxJava Wiki: error() * @see MSDN: Observable.Throw */ public final static Observable error(Throwable exception, Scheduler scheduler) { return Observable. error(exception).subscribeOn(scheduler); } /** * Converts a {@link Future} into an Observable. *

* *

* You can convert any object that supports the {@link Future} interface into an Observable that * emits the return value of the {@link Future#get} method of that object, by passing the object * into the {@code from} method. *

* Important note: This Observable is blocking; you cannot unsubscribe from it. * * @param future * the source {@link Future} * @param * the type of object that the {@link Future} returns, and also the type of item to * be emitted by the resulting Observable * @return an Observable that emits the item from the source {@link Future} * @see RxJava Wiki: from() */ public final static Observable from(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } /** * Converts a {@link Future} into an Observable, with a timeout on the Future. *

* *

* You can convert any object that supports the {@link Future} interface into an Observable that * emits the return value of the {@link Future#get} method of that object, by passing the object * into the {@code from} method. *

* Important note: This Observable is blocking; you cannot unsubscribe from it. * * @param future * the source {@link Future} * @param timeout * the maximum time to wait before calling {@code get()} * @param unit * the {@link TimeUnit} of the {@code timeout} argument * @param * the type of object that the {@link Future} returns, and also the type of item to * be emitted by the resulting Observable * @return an Observable that emits the item from the source {@link Future} * @see RxJava Wiki: from() */ public final static Observable from(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** * Converts a {@link Future}, operating on a specified {@link Scheduler}, into an Observable. *

* *

* You can convert any object that supports the {@link Future} interface into an Observable that * emits the return value of the {@link Future#get} method of that object, by passing the object * into the {@code from} method. *

* * @param future * the source {@link Future} * @param scheduler * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as {@link Schedulers#threadPoolForIO()} that can block and wait on the future. * @param * the type of object that the {@link Future} returns, and also the type of item to * be emitted by the resulting Observable * @return an Observable that emits the item from the source {@link Future} * @see RxJava Wiki: from() */ public final static Observable from(Future future, Scheduler scheduler) { return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); } /** * Converts an {@link Iterable} sequence into an Observable that emits the items in the * sequence. *

* *

* Note: the entire iterable sequence is immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, it is not possible * to unsubscribe from the sequence before it completes. * * @param iterable * the source {@link Iterable} sequence * @param * the type of items in the {@link Iterable} sequence and the type of items to be * emitted by the resulting Observable * @return an Observable that emits each item in the source {@link Iterable} sequence * @see RxJava Wiki: from() */ public final static Observable from(Iterable iterable) { return create(new OperatorFromIterable(iterable)); } /** * Converts an {@link Iterable} sequence into an Observable that operates on the specified * scheduler, emitting each item from the sequence. *

* * * @param iterable * the source {@link Iterable} sequence * @param scheduler * the scheduler on which the Observable is to emit the items of the iterable * @param * the type of items in the {@link Iterable} sequence and the type of items to be * emitted by the resulting Observable * @return an Observable that emits each item in the source {@link Iterable} sequence, on the * specified scheduler * @see RxJava Wiki: from() * @see MSDN: Observable.ToObservable */ public final static Observable from(Iterable iterable, Scheduler scheduler) { return create(new OperatorFromIterable(iterable)).subscribeOn(scheduler); } /** * Converts an item into an Observable that emits that item. *

* *

* Note: the item is immediately emitted each time an {@link Observer} subscribes. Since this * occurs before the {@link Subscription} is returned, it is not possible to unsubscribe from * the sequence before it completes. * * @param t1 * the item * @param * the type of the item * @return an Observable that emits the item * @see RxJava Wiki: from() */ // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1) { return from(Arrays.asList(t1)); } /** * Converts two items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))} */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2) { return from(Arrays.asList(t1, t2)); } /** * Converts three items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3) { return from(Arrays.asList(t1, t2, t3)); } /** * Converts four items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4) { return from(Arrays.asList(t1, t2, t3, t4)); } /** * Converts five items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5) { return from(Arrays.asList(t1, t2, t3, t4, t5)); } /** * Converts six items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param t6 * sixth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6)); } /** * Converts seven items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param t6 * sixth item * @param t7 * seventh item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)); } /** * Converts eight items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param t6 * sixth item * @param t7 * seventh item * @param t8 * eighth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)); } /** * Converts nine items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} subscribes. Since * this occurs before the {@link Subscription} is returned, it is not possible to unsubscribe * from the sequence before it completes. * * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param t6 * sixth item * @param t7 * seventh item * @param t8 * eighth item * @param t9 * ninth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** * Converts ten items into an Observable that emits those items. *

* *

* * @param t1 * first item * @param t2 * second item * @param t3 * third item * @param t4 * fourth item * @param t5 * fifth item * @param t6 * sixth item * @param t7 * seventh item * @param t8 * eighth item * @param t9 * ninth item * @param t10 * tenth item * @param * the type of these items * @return an Observable that emits each item * @see RxJava Wiki: from() * @deprecated Use {@link #from(Iterable)} instead such as {@code from(Arrays.asList(t1))}. */ @Deprecated // suppress unchecked because we are using varargs inside the method public final static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10)); } /** * Converts an Array into an Observable that emits the items in the Array. *

* *

* Note: the entire array is immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, it is not possible * to unsubscribe from the sequence before it completes. * * @param items * the source array * @param * the type of items in the Array and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item in the source Array * @see RxJava Wiki: from() */ // @SafeVarargs // commenting out until we figure out if we can do Java7 compilation without breaking Android for just this feature public final static Observable from(T... t1) { return from(Arrays.asList(t1)); } /** * Converts an Array into an Observable that emits the items in the Array on a specified * scheduler. *

* *

* Note: the entire array is immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, it is not * possible to unsubscribe from the sequence before it completes. * * @param items * the source array * @param scheduler * the scheduler on which the Observable emits the items of the array * @param * the type of items in the Array and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item in the source Array * @see RxJava Wiki: from() */ public final static Observable from(T[] items, Scheduler scheduler) { return from(Arrays.asList(items), scheduler); } /** * Returns an Observable that emits a sequential number every specified interval of time. *

* * * @param interval * interval size in time units (see below) * @param unit * time units to use for the interval size * @return an Observable that emits a sequential number each time interval * @see RxJava Wiki: interval() * @see MSDN: Observable.Interval */ public final static Observable interval(long interval, TimeUnit unit) { return create(OperationInterval.interval(interval, unit)); } /** * Returns an Observable that emits a sequential number every specified interval of time, on a * specified scheduler. *

* * * @param interval * interval size in time units (see below) * @param unit * time units to use for the interval size * @param scheduler * the scheduler to use for scheduling the items * @return an Observable that emits a sequential number each time interval * @see RxJava Wiki: interval() * @see MSDN: Observable.Interval */ public final static Observable interval(long interval, TimeUnit unit, Scheduler scheduler) { return create(OperationInterval.interval(interval, unit, scheduler)); } /** * Returns an Observable that emits a single item and then completes. *

* *

* To convert any object into an Observable that emits that object, pass that object into the {@code just} method. *

* This is similar to the {@link #from(java.lang.Object[])} method, except that {@code from()} will convert an {@link Iterable} object into an Observable that emits each of the items in * the Iterable, one at a time, while the {@code just()} method converts an Iterable into an * Observable that emits the entire Iterable as a single item. * * @param value * the item to emit * @param * the type of that item * @return an Observable that emits {@code value} as a single item and then completes * @see RxJava Wiki: just() * @deprecated Use {@link #from(T)} */ @Deprecated public final static Observable just(T value) { return from(Arrays.asList((value))); } /** * Returns an Observable that emits a single item and then completes, on a specified scheduler. *

* *

* This is a scheduler version of {@link #just(Object)}. * * @param value * the item to emit * @param * the type of that item * @param scheduler * the scheduler to emit the single item on * @return an Observable that emits {@code value} as a single item and then completes, on a * specified scheduler * @see RxJava Wiki: just() * @deprecated Use {@link #from(T)} */ @Deprecated public final static Observable just(T value, Scheduler scheduler) { return from(Arrays.asList((value)), scheduler); } /** * Returns an Observable that emits the single item emitted by the source Observable with the * maximum numeric value. If there is more than one item with the same maximum value, it emits * the last-emitted of these. *

* * * @param source * an Observable to scan for the maximum emitted item * @return an Observable that emits this maximum item * @throws IllegalArgumentException * if the source is empty * @see RxJava Wiki: max() * @see MSDN: Observable.Max */ public final static > Observable max(Observable source) { return OperationMinMax.max(source); } /** * Flattens an Iterable of Observables into one Observable, without any transformation. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the Iterable of Observables * @return an Observable that emits items that are the result of flattening the items emitted by * the Observables in the Iterable * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Iterable> sequences) { return merge(from(sequences)); } /** * Flattens an Iterable of Observables into one Observable, without any transformation, while * limiting the number of concurrent subscriptions to these Observables. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the Iterable of Observables * @param maxConcurrent * the maximum number of Observables that may be subscribed to concurrently * @return an Observable that emits items that are the result of flattening the items emitted by * the Observables in the Iterable * @throw IllegalArgumentException if {@code maxConcurrent} is less than or equal to 0 * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Iterable> sequences, int maxConcurrent) { return merge(from(sequences), maxConcurrent); } /** * Flattens an Iterable of Observables into one Observable, without any transformation, while * limiting the number of concurrent subscriptions to these Observables, and subscribing to * these Observables on a specified scheduler. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the Iterable of Observables * @param maxConcurrent * the maximum number of Observables that may be subscribed to concurrently * @param scheduler * the scheduler on which to traverse the Iterable of Observables * @return an Observable that emits items that are the result of flattening the items emitted by * the Observables in the Iterable * @throw IllegalArgumentException if {@code maxConcurrent} is less than or equal to 0 * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Iterable> sequences, int maxConcurrent, Scheduler scheduler) { return merge(from(sequences, scheduler), maxConcurrent); } /** * Flattens an Iterable of Observables into one Observable, without any transformation, * subscribing to these Observables on a specified scheduler. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the Iterable of Observables * @param scheduler * the scheduler on which to traverse the Iterable of Observables * @return an Observable that emits items that are the result of flattening the items emitted by * the Observables in the Iterable * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Iterable> sequences, Scheduler scheduler) { return merge(from(sequences, scheduler)); } /** * Flattens an Observable that emits Observables into a single Observable that emits the items * emitted by those Observables, without any transformation. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param source * an Observable that emits Observables * @return an Observable that emits items that are the result of flattening the Observables * emitted by the {@code source} Observable * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable> source) { return source.bind(new OperatorMerge()); // any idea how to get these generics working?! } /** * Flattens an Observable that emits Observables into a single Observable that emits the items * emitted by those Observables, without any transformation, while limiting the maximum number * of concurrent subscriptions to these Observables. *

* *

* You can combine the items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param source * an Observable that emits Observables * @param maxConcurrent * the maximum number of Observables that may be subscribed to concurrently * @return an Observable that emits items that are the result of flattening the Observables * emitted by the {@code source} Observable * @throw IllegalArgumentException if {@code maxConcurrent} is less than or equal to 0 * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable> source, int maxConcurrent) { return source.bind(new OperatorMerge(maxConcurrent)); // any idea how to get these generics working?! } /** * Flattens two Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2) { return merge(from(t1, t2)); } /** * Flattens three Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3) { return merge(from(t1, t2, t3)); } /** * Flattens four Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4) { return merge(from(t1, t2, t3, t4)); } /** * Flattens five Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { return merge(from(t1, t2, t3, t4, t5)); } /** * Flattens six Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { return merge(from(t1, t2, t3, t4, t5, t6)); } /** * Flattens seven Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { return merge(from(t1, t2, t3, t4, t5, t6, t7)); } /** * Flattens eight Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @param t8 * an Observable to be merged * @return an Observable that emits items that are the result of flattening * the items emitted by the {@code source} Observables * @return an Observable that emits all of the items emitted by the source Observables * @see MSDN: Observable.Merge */ public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { return merge(from(t1, t2, t3, t4, t5, t6, t7, t8)); } /** * Flattens nine Observables into a single Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @param t8 * an Observable to be merged * @param t9 * an Observable to be merged * @return an Observable that emits all of the items emitted by the source Observables * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ // suppress because the types are checked by the method signature before using a vararg public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { return merge(from(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** * Flattens an array of Observables into one Observable, without any transformation. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the array of Observables * @return an Observable that emits all of the items emitted by the Observables in the array * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable[] sequences) { return merge(from(sequences)); } /** * Flattens an array of Observables into one Observable, without any transformation, traversing * the array on a specified scheduler. *

* *

* You can combine items emitted by multiple Observables so that they appear as a single * Observable, by using the {@code merge} method. * * @param sequences * the array of Observables * @param scheduler * the scheduler on which to traverse the array * @return an Observable that emits all of the items emitted by the Observables in the array * @see RxJava Wiki: merge() * @see MSDN: Observable.Merge */ public final static Observable merge(Observable[] sequences, Scheduler scheduler) { return merge(from(sequences, scheduler)); } /** * This behaves like {@link #merge(Observable)} except that if any of the merged Observables * notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain * from propagating that error notification until all of the merged Observables have finished * emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to observe all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param source * an Observable that emits Observables * @return an Observable that emits all of the items emitted by the Observables emitted by the {@code source} Observable * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ public final static Observable mergeDelayError(Observable> source) { return create(OperationMergeDelayError.mergeDelayError(source)); } /** * This behaves like {@link #merge(Observable, Observable)} except that if any of the merged * Observables notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that error notification until all of the merged Observables * have finished emitting items. *

* *

* Even if both merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from each of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the two source * Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2) { return create(OperationMergeDelayError.mergeDelayError(t1, t2)); } /** * This behaves like {@link #merge(Observable, Observable, Observable)} except that if any of * the merged Observables notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that error notification until all of * the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable)} except that * if any of the merged Observables notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that error notification until all of * the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables notify of an error via {@link Observer#onError onError} * , {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables notify of an error via * {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables notify of an error via * {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables notify of an error * via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @param t8 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8)); } /** * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables notify * of an error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that * error notification until all of the merged Observables have finished emitting items. *

* *

* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. * * @param t1 * an Observable to be merged * @param t2 * an Observable to be merged * @param t3 * an Observable to be merged * @param t4 * an Observable to be merged * @param t5 * an Observable to be merged * @param t6 * an Observable to be merged * @param t7 * an Observable to be merged * @param t8 * an Observable to be merged * @param t9 * an Observable to be merged * @return an Observable that emits all of the items that are emitted by the source Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public final static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** * Returns an Observable that emits the single numerically minimum item emitted by the source * Observable. If there is more than one such item, it returns the last-emitted one. *

* * * @param source * an Observable to determine the minimum item of * @return an Observable that emits the minimum item emitted by the source Observable * @throws IllegalArgumentException * if the source is empty * @see MSDN: Observable.Min */ public final static > Observable min(Observable source) { return OperationMinMax.min(source); } /** * Returns an Observable that never sends any items or notifications to an {@link Observer}. *

* *

* This Observable is useful primarily for testing purposes. * * @param * the type of items (not) emitted by the Observable * @return an Observable that never emits any items or sends any notifications to an {@link Observer} * @see RxJava Wiki: never() */ public final static Observable never() { return new NeverObservable(); } /** * Converts an {@code Observable>} into another {@code Observable>} whose emitted Observables emit the same items, but the number of such Observables is * restricted by {@code parallelObservables}. *

* For example, if the original {@code Observable>} emits 100 Observables and {@code parallelObservables} is 8, the items emitted by the 100 original Observables will be * distributed among 8 Observables emitted by the resulting Observable. *

* *

* This is a mechanism for efficiently processing n number of Observables on a smaller * m number of resources (typically CPU cores). * * @param parallelObservables * the number of Observables to merge into * @return an Observable of Observables constrained in number by {@code parallelObservables} * @see RxJava Wiki: parallelMerge() */ public final static Observable> parallelMerge(Observable> source, int parallelObservables) { return OperationParallelMerge.parallelMerge(source, parallelObservables); } /** * Converts an {@code Observable>} into another {@code Observable>} whose emitted Observables emit the same items, but the number of such Observables is * restricted by {@code parallelObservables}, and each runs on a defined Scheduler. *

* For example, if the original {@code Observable>} emits 100 Observables and {@code parallelObservables} is 8, the items emitted by the 100 original Observables will be * distributed among 8 Observables emitted by the resulting Observable. *

* *

* This is a mechanism for efficiently processing n number of Observables on a smaller * m number of resources (typically CPU cores). * * @param parallelObservables * the number of Observables to merge into * @param scheduler * the {@link Scheduler} to run each Observable on * @return an Observable of Observables constrained in number by {@code parallelObservables} * @see RxJava Wiki: parallelMerge() */ public final static Observable> parallelMerge(Observable> source, int parallelObservables, Scheduler scheduler) { return OperationParallelMerge.parallelMerge(source, parallelObservables, scheduler); } /** * Returns an Observable that emits a sequence of Integers within a specified range. *

* * * @param start * the value of the first Integer in the sequence * @param count * the number of sequential Integers to generate * @return an Observable that emits a range of sequential Integers * @see RxJava Wiki: range() * @see MSDN: Observable.Range */ public final static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } /** * Returns an Observable that emits a sequence of Integers within a specified range, on a * specified scheduler. *

* * * @param start * the value of the first Integer in the sequence * @param count * the number of sequential Integers to generate * @param scheduler * the scheduler to run the generator loop on * @return an Observable that emits a range of sequential Integers * @see RxJava Wiki: range() * @see MSDN: Observable.Range */ public final static Observable range(int start, int count, Scheduler scheduler) { return from(Range.createWithCount(start, count), scheduler); } /** * Returns an Observable that emits a Boolean value that indicates whether two Observable * sequences are the same by comparing the items emitted by each Observable pairwise. *

* * * @param first * the first Observable to compare * @param second * the second Observable to compare * @param * the type of items emitted by each Observable * @return an Observable that emits a Boolean value that indicates whether the two sequences are * the same * @see RxJava Wiki: sequenceEqual() */ public final static Observable sequenceEqual(Observable first, Observable second) { return sequenceEqual(first, second, new Func2() { @Override public final Boolean call(T first, T second) { if (first == null) { return second == null; } return first.equals(second); } }); } /** * Returns an Observable that emits a Boolean value that indicates whether two Observable * sequences are the same by comparing the items emitted by each Observable pairwise based on * the results of a specified equality function. *

* * * @param first * the first Observable to compare * @param second * the second Observable to compare * @param equality * a function used to compare items emitted by each Observable * @param * the type of items emitted by each Observable * @return an Observable that emits a Boolean value that indicates whether the two Observable * two sequences are the same according to the specified function * @see RxJava Wiki: sequenceEqual() */ public final static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { return OperationSequenceEqual.sequenceEqual(first, second, equality); } /** * @deprecated use {@link #sumInteger} */ @Deprecated public final static Observable sum(Observable source) { return OperationSum.sum(source); } /** * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable. *

* * * @param source * the source Observable to compute the sum of * @return an Observable that emits a single item: the sum of all the Doubles emitted by the * source Observable * @see RxJava Wiki: sumDouble() * @see MSDN: Observable.Sum */ public final static Observable sumDouble(Observable source) { return OperationSum.sumDoubles(source); } /** * Returns an Observable that emits the sum of all the Floats emitted by the source Observable. *

* * * @param source * the source Observable to compute the sum of * @return an Observable that emits a single item: the sum of all the Floats emitted by the * source Observable * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum */ public final static Observable sumFloat(Observable source) { return OperationSum.sumFloats(source); } /** * Returns an Observable that emits the sum of all the Integers emitted by the source * Observable. *

* * * @param source * source Observable to compute the sum of * @return an Observable that emits a single item: the sum of all the Integers emitted by the * source Observable * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum */ public final static Observable sumInteger(Observable source) { return OperationSum.sum(source); } /** * Returns an Observable that emits the sum of all the Longs emitted by the source Observable. *

* * * @param source * source Observable to compute the sum of * @return an Observable that emits a single item: the sum of all the Longs emitted by the * source Observable * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum */ public final static Observable sumLong(Observable source) { return OperationSum.sumLongs(source); } /** * Given an Observable that emits Observables, returns an Observable that emits the items * emitted by the most recently emitted of those Observables. *

* * * @param sequenceOfSequences * the source Observable that emits Observables * @return an Observable that emits the items emitted by the Observable most recently emitted by * the source Observable * @see RxJava Wiki: switchOnNext() * @deprecated use {@link #switchOnNext} */ @Deprecated public final static Observable switchDo(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * Given an Observable that emits Observables, returns an Observable that emits the items * emitted by the most recently emitted of those Observables. *

* * * @param sequenceOfSequences * the source Observable that emits Observables * @return an Observable that emits the items emitted by the Observable most recently emitted by * the source Observable * @see RxJava Wiki: switchOnNext() * @see {@link #switchOnNext(Observable)} */ public final static Observable switchLatest(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * Given an Observable that emits Observables, returns an Observable that emits the items * emitted by the most recently emitted of those Observables. *

* * * @param sequenceOfSequences * the source Observable that emits Observables * @return an Observable that emits the items emitted by the Observable most recently emitted by * the source Observable * @see RxJava Wiki: switchOnNext() */ public final static Observable switchOnNext(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * @deprecated use {@link #synchronize()} or {@link #synchronize(Object)} */ @Deprecated public final static Observable synchronize(Observable source) { return create(OperationSynchronize.synchronize(source)); } /** * Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing * numbers after each {@code period} of time thereafter. *

* * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period * the period of time between emissions of the subsequent numbers * @param unit * the time unit for both {@code initialDelay} and {@code period} * @return an Observable that emits a 0L after the {@code initialDelay} and ever increasing * numbers after each {@code period} of time thereafter * @see RxJava Wiki: timer() * @see MSDN: Observable.Timer */ public final static Observable timer(long initialDelay, long period, TimeUnit unit) { return timer(initialDelay, period, unit, Schedulers.computation()); } /** * Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing * numbers after each {@code period} of time thereafter, on a specified Scheduler. *

* * * @param initialDelay * the initial delay time to wait before emitting the first value of 0L * @param period * the period of time between emissions of the subsequent numbers * @param unit * the time unit for both {@code initialDelay} and {@code period} * @param scheduler * the scheduler on which the waiting happens and items are emitted * @return an Observable that emits a 0L after the {@code initialDelay} and ever increasing * numbers after each {@code period} of time thereafter, while running on the given {@code scheduler} * @see RxJava Wiki: timer() * @see MSDN: Observable.Timer */ public final static Observable timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler)); } /** * Returns an Observable that emits one item after a specified delay, and then completes. *

* * * @param delay * the initial delay before emitting a single 0L * @param unit * time units to use for the delay * @see RxJava wiki: timer() */ public final static Observable timer(long delay, TimeUnit unit) { return timer(delay, unit, Schedulers.computation()); } /** * Returns an Observable that emits one item after a specified delay, on a specified scheduler, * and then completes. *

* * * @param delay * the initial delay before emitting a single 0L * @param unit * time units to use for the delay * @param scheduler * the scheduler to use for scheduling the item * @see RxJava wiki: timer() */ public final static Observable timer(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OperationTimer.TimerOnce(delay, unit, scheduler)); } /** * Constructs an Observable that creates a dependent resource object. *

* * * @param resourceFactory * the factory function to create a resource object that depends on the Observable * @param observableFactory * the factory function to obtain an Observable * @return the Observable whose lifetime controls the lifetime of the dependent resource object * @see RxJava Wiki: using() * @see MSDN: Observable.Using */ public final static Observable using(Func0 resourceFactory, Func1> observableFactory) { return create(OperationUsing.using(resourceFactory, observableFactory)); } /** * Joins together the results from several patterns via their plans. *

* * * @param plans * a series of plans created by use of the {@link #then} operator on patterns * @return an Observable that emits the results from matching several patterns * @throws NullPointerException * if {@code plans} is null * @see RxJava Wiki: when() * @see MSDN: Observable.When */ public final static Observable when(Iterable> plans) { if (plans == null) { throw new NullPointerException("plans"); } return create(OperationJoinPatterns.when(plans)); } /** * Joins together the results from several patterns via their plans. *

* * * @param plans * a series of plans created by use of the {@link #then} operator on patterns * @return an Observable that emits the results from matching several patterns * @throws NullPointerException * if {@code plans} is null * @see RxJava Wiki: when() * @see MSDN: Observable.When */ public final static Observable when(Plan0... plans) { return create(OperationJoinPatterns.when(plans)); } /** * Joins the results from a pattern via its plan. *

* * * @param p1 * the plan to join, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching a pattern * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1) { return create(OperationJoinPatterns.when(p1)); } /** * Joins together the results from two patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching two patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2) { return create(OperationJoinPatterns.when(p1, p2)); } /** * Joins together the results from three patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching three patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3) { return create(OperationJoinPatterns.when(p1, p2, p3)); } /** * Joins together the results from four patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching four patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4) { return create(OperationJoinPatterns.when(p1, p2, p3, p4)); } /** * Joins together the results from five patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @param p5 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching five patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5)); } /** * Joins together the results from six patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @param p5 * a plan, created by use of the {@link #then} operator on a pattern * @param p6 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching six patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6)); } /** * Joins together the results from seven patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @param p5 * a plan, created by use of the {@link #then} operator on a pattern * @param p6 * a plan, created by use of the {@link #then} operator on a pattern * @param p7 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching seven patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7)); } /** * Joins together the results from eight patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @param p5 * a plan, created by use of the {@link #then} operator on a pattern * @param p6 * a plan, created by use of the {@link #then} operator on a pattern * @param p7 * a plan, created by use of the {@link #then} operator on a pattern * @param p8 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching eight patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8)); } /** * Joins together the results from nine patterns via their plans. *

* * * @param p1 * a plan, created by use of the {@link #then} operator on a pattern * @param p2 * a plan, created by use of the {@link #then} operator on a pattern * @param p3 * a plan, created by use of the {@link #then} operator on a pattern * @param p4 * a plan, created by use of the {@link #then} operator on a pattern * @param p5 * a plan, created by use of the {@link #then} operator on a pattern * @param p6 * a plan, created by use of the {@link #then} operator on a pattern * @param p7 * a plan, created by use of the {@link #then} operator on a pattern * @param p8 * a plan, created by use of the {@link #then} operator on a pattern * @param p9 * a plan, created by use of the {@link #then} operator on a pattern * @return an Observable that emits the results from matching nine patterns * @see RxJava Wiki: when() * @see MSDN: Observable.When */ @SuppressWarnings("unchecked") public final static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7, Plan0 p8, Plan0 p9) { return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations items emitted, in sequence, by an Iterable of other Observables. *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by each of * the source Observables; the second item emitted by the new Observable will be the result of * the function applied to the second item emitted by each of those Observables; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@code onNext} as * many times as the number of {@code onNext} invokations of the source Observable that emits * the fewest items. *

* * * @param ws * an Iterable of source Observables * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Iterable> ws, FuncN zipFunction) { return create(OperationZip.zip(ws, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of n items emitted, in sequence, by the n Observables emitted by * a specified Observable. *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by each of * the Observables emitted by the source Observable; the second item emitted by the new * Observable will be the result of the function applied to the second item emitted by each of * those Observables; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@code onNext} as * many times as the number of {@code onNext} invokations of the source Observable that emits * the fewest items. *

* * * @param ws * an Observable of source Observables * @param zipFunction * a function that, when applied to an item emitted by each of the Observables * emitted by {@code ws}, results in an item that will be emitted by the resulting * Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable> ws, final FuncN zipFunction) { return ws.toList().mergeMap(new Func1>, Observable>() { @Override public final Observable call(List> wsList) { return create(OperationZip.zip(wsList, zipFunction)); } }); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of two items emitted, in sequence, by two other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by {@code o1} and the first item emitted by {@code o2}; the second item emitted by the new Observable will * be the result of the function applied to the second item emitted by {@code o1} and the second * item emitted by {@code o2}; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Func2 zipFunction) { return create(OperationZip.zip(o1, o2, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of three items emitted, in sequence, by three other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by {@code o1}, the first item emitted by {@code o2}, and the first item emitted by {@code o3}; * the second item emitted by the new Observable will be the result of the function applied to * the second item emitted by {@code o1}, the second item emitted by {@code o2}, and the second * item emitted by {@code o3}; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Func3 zipFunction) { return create(OperationZip.zip(o1, o2, o3, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of four items emitted, in sequence, by four other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by {@code o1}, the first item emitted by {@code o2}, the first item emitted by {@code o3}, and * the first item emitted by {@code 04}; the second item emitted by the new Observable will be * the result of the function applied to the second item emitted by each of those Observables; * and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Func4 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of five items emitted, in sequence, by five other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted by {@code o1}, the first item emitted by {@code o2}, the first item emitted by {@code o3}, the * first item emitted by {@code o4}, and the first item emitted by {@code o5}; the second item * emitted by the new Observable will be the result of the function applied to the second item * emitted by each of those Observables; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param o5 * a fifth source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of six items emitted, in sequence, by six other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted each source * Observable, the second item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables, and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param o5 * a fifth source Observable * @param o6 * a sixth source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Func6 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of seven items emitted, in sequence, by seven other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted each source * Observable, the second item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables, and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param o5 * a fifth source Observable * @param o6 * a sixth source Observable * @param o7 * a seventh source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Func7 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of eight items emitted, in sequence, by eight other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted each source * Observable, the second item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables, and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param o5 * a fifth source Observable * @param o6 * a sixth source Observable * @param o7 * a seventh source Observable * @param o8 * an eighth source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Func8 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of nine items emitted, in sequence, by nine other Observables. *

* *

{@code zip} applies this function in strict sequence, so the first item emitted by the new * Observable will be the result of the function applied to the first item emitted each source * Observable, the second item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables, and so forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of * the source Observable that emits the fewest items. * * @param o1 * the first source Observable * @param o2 * a second source Observable * @param o3 * a third source Observable * @param o4 * a fourth source Observable * @param o5 * a fifth source Observable * @param o6 * a sixth source Observable * @param o7 * a seventh source Observable * @param o8 * an eighth source Observable * @param o9 * a ninth source Observable * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public final static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9, Func9 zipFunction) { return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction)); } /** * Synonymous with {@code reduce()}. *

* * * @see RxJava Wiki: reduce() * @see #reduce(Func2) * @deprecated use #reduce(Func2) */ @Deprecated public final Observable aggregate(Func2 accumulator) { return reduce(accumulator); } /** * Synonymous with {@code reduce()}. *

* * * @see RxJava Wiki: reduce() * @see #reduce(Object, Func2) * @deprecated use #reduce(Object, Func2) */ @Deprecated public final Observable aggregate(R initialValue, Func2 accumulator) { return reduce(initialValue, accumulator); } /** * Returns an Observable that emits a Boolean that indicates whether all of the items emitted by * the source Observable satisfy a condition. *

* * * @param predicate * a function that evaluates an item and returns a Boolean * @return an Observable that emits {@code true} if all items emitted by the source Observable * satisfy the predicate; otherwise, {@code false} * @see RxJava Wiki: all() */ public final Observable all(Func1 predicate) { return create(OperationAll.all(this, predicate)); } /** * Returns a Pattern that matches when both Observables emit an item. *

* * * @param right * an Observable to match with the source Observable * @return a Pattern object that matches when both Observables emit an item * @throws NullPointerException * if {@code right} is null * @see RxJava Wiki: and() * @see MSDN: Observable.And */ public final Pattern2 and(Observable right) { return OperationJoinPatterns.and(this, right); } /** * Hides the identity of this Observable. Useful for instance when you have an implementation * of a subclass of Observable but you want to hide the properties and methods of this subclass * from whomever you are passing the Observable to. * * @return an Observable that hides the identity of this Observable */ public final Observable asObservable() { return create(new OperationAsObservable(this)); } /** * Returns an Observable that transforms items emitted by the source Observable into Doubles by * using a function you provide and then emits the Double average of the complete sequence of * transformed values. *

* * * @param valueExtractor * the function to transform an item emitted by the source Observable into a Double * @return an Observable that emits a single item: the Double average of the complete sequence * of items emitted by the source Observable when transformed into Doubles by the * specified function * @see RxJava Wiki: averageDouble() * @see MSDN: Observable.Average */ public final Observable averageDouble(Func1 valueExtractor) { return create(new OperationAverage.AverageDoubleExtractor(this, valueExtractor)); } /** * Returns an Observable that transforms items emitted by the source Observable into Floats by * using a function you provide and then emits the Float average of the complete sequence of * transformed values. *

* * * @param valueExtractor * the function to transform an item emitted by the source Observable into a Float * @return an Observable that emits a single item: the Float average of the complete sequence of * items emitted by the source Observable when transformed into Floats by the specified * function * @see RxJava Wiki: averageFloat() * @see MSDN: Observable.Average */ public final Observable averageFloat(Func1 valueExtractor) { return create(new OperationAverage.AverageFloatExtractor(this, valueExtractor)); } /** * Returns an Observable that transforms items emitted by the source Observable into Integers by * using a function you provide and then emits the Integer average of the complete sequence of * transformed values. *

* * * @param valueExtractor * the function to transform an item emitted by the source Observable into an Integer * @return an Observable that emits a single item: the Integer average of the complete sequence * of items emitted by the source Observable when transformed into Integers by the * specified function * @see RxJava Wiki: averageInteger() * @see MSDN: Observable.Average */ public final Observable averageInteger(Func1 valueExtractor) { return create(new OperationAverage.AverageIntegerExtractor(this, valueExtractor)); } /** * Returns an Observable that transforms items emitted by the source Observable into Longs by * using a function you provide and then emits the Long average of the complete sequence of * transformed values. *

* * * @param valueExtractor * the function to transform an item emitted by the source Observable into a Long * @return an Observable that emits a single item: the Long average of the complete sequence of * items emitted by the source Observable when transformed into Longs by the specified * function * @see RxJava Wiki: averageLong() * @see MSDN: Observable.Average */ public final Observable averageLong(Func1 valueExtractor) { return create(new OperationAverage.AverageLongExtractor(this, valueExtractor)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers. It emits the current * buffer and replaces it with a new buffer when the Observable produced by the specified {@code bufferClosingSelector} emits an item. It then uses the {@code bufferClosingSelector} to create a * new Observable to observe to indicate the end of the next buffer. *

* * * @param bufferClosingSelector * a {@link Func0} that produces an Observable for each buffer created. When * this {@code Observable} emits an item, {@code buffer()} emits the associated * buffer and replaces it with a new one. * @return an Observable that emits a connected, non-overlapping buffer of items from * the source Observable each time the current Observable created with the {@code bufferClosingSelector} argument emits an item * @see RxJava Wiki: buffer() */ public final Observable> buffer(Func0> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferClosingSelector)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers, each containing {@code count} items. When the source Observable completes or encounters an error, the * resulting Observable emits the current buffer and propagates the notification from the source * Observable. *

* * * @param count * the maximum number of items in each buffer before it should be emitted * @return an Observable that emits connected, non-overlapping buffers, each containing * at most {@code count} items from the source Observable * @see RxJava Wiki: buffer() */ public final Observable> buffer(int count) { return create(OperationBuffer.buffer(this, count)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits buffers every {@code skip} items, each containing {@code count} items. When the source Observable completes or encounters an error, the * resulting Observable emits the current buffer and propagates the notification from the source * Observable. *

* * * @param count * the maximum size of each buffer before it should be emitted * @param skip * how many items emitted by the source Observable should be skipped before starting * a new buffer. Note that when {@code skip} and {@code count} are equal, this is the * same operation as {@link #buffer(int)}. * @return an Observable that emits buffers for every {@code skip} item from the source * Observable and containing at most {@code count} items * @see RxJava Wiki: buffer() */ public final Observable> buffer(int count, int skip) { return create(OperationBuffer.buffer(this, count, skip)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable starts a new buffer periodically, as determined by the {@code timeshift} argument. It emits each buffer after a fixed timespan, specified by the {@code timespan} * argument. When the source Observable completes or encounters an error, the * resulting Observable emits the current buffer and propagates the notification from the source * Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift * the period of time after which a new buffer will be created * @param unit * the unit of time that applies to the {@code timespan} and {@code timeshift} arguments * @return an Observable that emits new buffers of items emitted by the source * Observable periodically after a fixed timespan has elapsed * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, long timeshift, TimeUnit unit) { return create(OperationBuffer.buffer(this, timespan, timeshift, unit)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable starts a new buffer periodically, as determined by the {@code timeshift} argument, and on the specified {@code scheduler}. It emits each buffer * after a fixed timespan, specified by the {@code timespan} argument. When the source * Observable completes or encounters an error, the resulting Observable emits the current * buffer propagates the notification from the source Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted * @param timeshift * the period of time after which a new buffer will be created * @param unit * the unit of time that applies to the {@code timespan} and {@code timeshift} arguments * @param scheduler * the {@link Scheduler} to use when determining the end and start of a buffer * @return an Observable that emits new buffers of items emitted by the source * Observable periodically after a fixed timespan has elapsed * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { return create(OperationBuffer.buffer(this, timespan, timeshift, unit, scheduler)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers, each of a fixed duration * specified by the {@code timespan} argument. When the source Observable completes or * encounters an error, the resulting Observable emits the current buffer and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted and replaced * with a new buffer * @param unit * the unit of time that applies to the {@code timespan} argument * @return an Observable that emits connected, non-overlapping buffers of items emitted * by the source Observable within a fixed duration * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit) { return create(OperationBuffer.buffer(this, timespan, unit)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers, each of a fixed duration * specified by the {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached first). When the source Observable completes or encounters an * error, the resulting Observable emits the current buffer and propagates the notification from * the source Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted and replaced * with a new buffer * @param unit * the unit of time which applies to the {@code timespan} argument * @param count * the maximum size of each buffer before it is emitted * @return an Observable that emits connected, non-overlapping buffers of items emitted * by the source Observable, after a fixed duration or when the buffer reaches maximum * capacity (whichever occurs first) * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, int count) { return create(OperationBuffer.buffer(this, timespan, unit, count)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers, each of a fixed duration * specified by the {@code timespan} argument as measured on the specified {@code scheduler}, or * a maximum size specified by the {@code count} argument (whichever is reached first). When the * source Observable completes or encounters an error, the resulting Observable emits the * current buffer and propagates the notification from the source Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted and replaced * with a new buffer * @param unit * the unit of time which applies to the {@code timespan} argument * @param count * the maximum size of each buffer before it is emitted * @param scheduler * the {@link Scheduler} to use when determining the end and start of a buffer * @return an Observable that emits connected, non-overlapping buffers of items emitted * by the source Observable after a fixed duration or when the buffer reaches maximum * capacity (whichever occurs first) * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) { return create(OperationBuffer.buffer(this, timespan, unit, count, scheduler)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping buffers, each of a fixed duration * specified by the {@code timespan} argument and on the specified {@code scheduler}. When the * source Observable completes or encounters an error, the resulting Observable emits the * current buffer and propagates the notification from the source Observable. *

* * * @param timespan * the period of time each buffer collects items before it is emitted and replaced * with a new buffer * @param unit * the unit of time which applies to the {@code timespan} argument * @param scheduler * the {@link Scheduler} to use when determining the end and start of a buffer * @return an Observable that emits connected, non-overlapping buffers of items emitted * by the source Observable within a fixed duration * @see RxJava Wiki: buffer() */ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) { return create(OperationBuffer.buffer(this, timespan, unit, scheduler)); } /** * Returns an Observable that emits buffers of items it collects from the source Observable. * The resulting Observable emits buffers that it creates when the specified {@code bufferOpenings} Observable emits an item, and closes when the Observable returned from * {@code bufferClosingSelector} emits an item. *

* * * @param bufferOpenings * the Observable that, when it emits an item, causes a new buffer to be * created * @param bufferClosingSelector * the {@link Func1} that is used to produce an Observable for every buffer * created. When this Observable emits an item, the associated buffer is * emitted. * @return an Observable that emits buffers, containing items from the source * Observable, that are created and closed when the specified Observables emit * items * @see RxJava Wiki: buffer() */ public final Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector)); } /** * Returns an Observable that emits non-overlapping buffered items from the source Observable * each time the specified boundary Observable emits an item. *

* *

* Completion of either the source or the boundary Observable causes the returned Observable * to emit the latest buffer and complete. * * @param * the boundary value type (ignored) * @param boundary * the boundary Observable * @return an Observable that emits buffered items from the source Observable when the boundary * Observable emits an item * @see #buffer(rx.Observable, int) * @see RxJava Wiki: buffer() */ public final Observable> buffer(Observable boundary) { return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary)); } /** * Returns an Observable that emits non-overlapping buffered items from the source Observable * each time the specified boundary Observable emits an item. *

* *

* Completion of either the source or the boundary Observable causes the returned Observable * to emit the latest buffer and complete. * * @param * the boundary value type (ignored) * @param boundary * the boundary Observable * @param initialCapacity * the initial capacity of each buffer chunk * @return an Observable that emits buffered items from the source Observable when the boundary * Observable emits an item * @see RxJava Wiki: buffer() * @see #buffer(rx.Observable, int) */ public final Observable> buffer(Observable boundary, int initialCapacity) { return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity)); } /** * This method has similar behavior to {@link #replay} except that this auto-subscribes to the * source Observable rather than returning a {@link ConnectableObservable}. *

* *

* This is useful when you want an Observable to cache responses and you can't control the * subscribe/unsubscribe behavior of all the {@link Observer}s. *

* When you call {@code cache()}, it does not yet subscribe to the source Observable. This only * happens when {@code subscribe} is called the first time on the Observable returned by {@code cache()}. *

* Note: You sacrifice the ability to unsubscribe from the origin when you use the {@code cache()} operator so be careful not to use this operator on Observables that emit an * infinite or very large number of items that will use up memory. * * @return an Observable that, when first subscribed to, caches all of its items and * notifications for the benefit of subsequent observers * @see RxJava Wiki: cache() */ public final Observable cache() { return create(OperationCache.cache(this)); } /** * Returns an Observable that emits the items emitted by the source Observable, converted to the * specified type. *

* * * @param klass * the target class type that the items emitted by the source Observable will be * converted to before being emitted by the resulting Observable * @return an Observable that emits each item from the source Observable after converting it to * the specified type * @see RxJava Wiki: cast() * @see MSDN: Observable.Cast */ public final Observable cast(final Class klass) { return bind(new OperatorCast(klass)); } /** * Collect values into a single mutable data structure. *

* This is a simplified version of {@code reduce} that does not need to return the state on each * pass. *

* * @param state * FIXME FIXME FIXME * @param collector * FIXME FIXME FIXME * @return FIXME FIXME FIXME */ public final Observable collect(R state, final Action2 collector) { Func2 accumulator = new Func2() { @Override public final R call(R state, T value) { collector.call(state, value); return state; } }; return reduce(state, accumulator); } /** * Returns a new Observable that emits items resulting from applying a function that you supply * to each item emitted by the source Observable, where that function returns an Observable, and * then emitting the items that result from concatinating those resulting Observables. *

* * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable * @return an Observable that emits the result of applying the transformation function to each * item emitted by the source Observable and concatinating the Observables obtained from * this transformation */ public final Observable concatMap(Func1> func) { return concat(map(func)); } /** * Returns an Observable that emits a Boolean that indicates whether the source Observable * emitted a specified item. *

* * * @param element * the item to search for in the emissions from the source Observable * @return an Observable that emits {@code true} if the specified item is emitted by the source * Observable, or {@code false} if the source Observable completes without emitting that * item * @see RxJava Wiki: contains() * @see MSDN: Observable.Contains */ public final Observable contains(final T element) { return exists(new Func1() { public final Boolean call(T t1) { return element == null ? t1 == null : element.equals(t1); } }); } /** * Returns an Observable emits the count of the total number of items emitted by the source * Observable. *

* * * @return an Observable that emits a single item: the number of elements emitted by the * source Observable * @see RxJava Wiki: count() * @see MSDN: Observable.Count * @see #longCount() */ public final Observable count() { return reduce(0, new Func2() { @Override public final Integer call(Integer t1, T t2) { return t1 + 1; } }); } /** * Return an Observable that mirrors the source Observable, except that it drops items emitted * by the source Observable that are followed by another item within a computed debounce * duration. *

* * * @param * the debounce value type (ignored) * @param debounceSelector * function to retrieve a sequence that indicates the throttle duration for each item * @return an Observable that omits items emitted by the source Observable that are followed by * another item within a computed debounce duration */ public final Observable debounce(Func1> debounceSelector) { return create(OperationDebounce.debounceSelector(this, debounceSelector)); } /** * Return an Observable that mirrors the source Observable, except that it drops items emitted * by the source Observable that are followed by newer items before a timeout value expires. * The timer resets on each emission. *

* Note: If items keep being emitted by the source Observable faster than the timeout * then no items will be emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout * the time each item has to be "the most recent" of those emitted by the source * Observable to ensure that it's not dropped * @param unit * the {@link TimeUnit} for the timeout * @return an Observable that filters out items from the source Observable that are too * quickly followed by newer items * @see RxJava Wiki: debounce() * @see #throttleWithTimeout(long, TimeUnit) */ public final Observable debounce(long timeout, TimeUnit unit) { return create(OperationDebounce.debounce(this, timeout, unit)); } /** * Return an Observable that mirrors the source Observable, except that it drops items emitted * by the source Observable that are followed by newer items before a timeout value expires on * a specified Scheduler. The timer resets on each emission. *

* Note: If items keep being emitted by the source Observable faster than the timeout * then no items will be emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout * the time each item has to be "the most recent" of those emitted by the source * Observable to ensure that it's not dropped * @param unit * the unit of time for the specified timeout * @param scheduler * the {@link Scheduler} to use internally to manage the timers that handle the * timeout for each item * @return an Observable that filters out items from the source Observable that are too * quickly followed by newer items * @see RxJava Wiki: debounce() * @see #throttleWithTimeout(long, TimeUnit, Scheduler) */ public final Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); } /** * Returns an Observable that emits the items emitted by the source Observable or a specified * default item if the source Observable is empty. *

* * * @param defaultValue * the item to emit if the source Observable emits no items * @return an Observable that emits either the specified default item if the source Observable * emits no items, or the items emitted by the source Observable * @see RxJava Wiki: defaultIfEmpty() * @see MSDN: Observable.DefaultIfEmpty */ public final Observable defaultIfEmpty(T defaultValue) { return create(OperationDefaultIfEmpty.defaultIfEmpty(this, defaultValue)); } /** * Returns an Observable that delays the subscription to and emissions from the souce Observable * via another Observable on a per-item basis. *

* *

* Note: the resulting Observable will immediately propagate any {@code onError} notification from the source Observable. * * @param * the subscription delay value type (ignored) * @param * the item delay value type (ignored) * @param subscriptionDelay * a function that returns an Observable that triggers the subscription to the source * Observable once it emits any item * @param itemDelay * a function that returns an Observable for each item emitted by the source * Observable, which is then used to delay the emission of that item by the resulting * Observable until the Observable returned from {@code itemDelay} emits an item * @return an Observable that delays the subscription and emissions of the source Observable via * another Observable on a per-item basis */ public final Observable delay( Func0> subscriptionDelay, Func1> itemDelay) { return create(OperationDelay.delay(this, subscriptionDelay, itemDelay)); } /** * Returns an Observable that delays the emissions of the source Observable via another * Observable on a per-item basis. *

* *

* Note: the resulting Observable will immediately propagate any {@code onError} notification from the source Observable. * * @param * the item delay value type (ignored) * @param itemDelay * a function that returns an Observable for each item emitted by the source * Observable, which is then used to delay the emission of that item by the resulting * Observable until the Observable returned from {@code itemDelay} emits an item * @return an Observable that delays the emissions of the source Observable via another * Observable on a per-item basis */ public final Observable delay(Func1> itemDelay) { return create(OperationDelay.delay(this, itemDelay)); } /** * Returns an Observable that emits the items emitted by the source Observable shifted forward * in time by a specified delay. Error notifications from the source Observable are not delayed. *

* * * @param delay * the delay to shift the source by * @param unit * the {@link TimeUnit} in which {@code period} is defined * @return the source Observable shifted in time by the specified delay * @see RxJava Wiki: delay() * @see MSDN: Observable.Delay */ public final Observable delay(long delay, TimeUnit unit) { return OperationDelay.delay(this, delay, unit, Schedulers.computation()); } /** * Returns an Observable that emits the items emitted by the source Observable shifted forward * in time by a specified delay. Error notifications from the source Observable are not delayed. *

* * * @param delay * the delay to shift the source by * @param unit * the {@link TimeUnit} in which {@code period} is defined * @param scheduler * the {@link Scheduler} to use for delaying * @return the source Observable shifted in time by the specified delay * @see RxJava Wiki: delay() * @see MSDN: Observable.Delay */ public final Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { return OperationDelay.delay(this, delay, unit, scheduler); } /** * Return an Observable that delays the subscription to the source Observable by a given amount * of time. *

* * * @param delay * the time to delay the subscription * @param unit * the time unit * @return an Observable that delays the subscription to the source Observable by the given * amount */ public final Observable delaySubscription(long delay, TimeUnit unit) { return delaySubscription(delay, unit, Schedulers.computation()); } /** * Return an Observable that delays the subscription to the source Observable by a given amount * of time, both waiting and subscribing on a given Scheduler. *

* * * @param delay * the time to delay the subscription * @param unit * the time unit of {@code delay} * @param scheduler * the scheduler on which the waiting and subscription will happen * @return an Observable that delays the subscription to the source Observable by a given * amount, waiting and subscribing on the given Scheduler */ public final Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { return create(OperationDelay.delaySubscription(this, delay, unit, scheduler)); } /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by * transforming the {@link Notification} objects emitted by the source Observable into the items * or notifications they represent. *

* * * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects emitted by the source Observable * @throws Throwable * if the source Observable is not of type {@code Observable>} * @see RxJava Wiki: dematerialize() * @see MSDN: Observable.dematerialize */ @SuppressWarnings("unchecked") public final Observable dematerialize() { return create(OperationDematerialize.dematerialize((Observable>) this)); } /** * Returns an Observable that emits all items emitted by the source Observable that are * distinct. *

* * * @return an Observable that emits only those items emitted by the source Observable that are * distinct from each other * @see RxJava Wiki: distinct() * @see MSDN: Observable.distinct */ public final Observable distinct() { return create(OperationDistinct.distinct(this)); } /** * Returns an Observable that emits all items emitted by the source Observable that are distinct * according to a key selector function. *

* * * @param keySelector * a function that projects an emitted item to a key value that is used to decide * whether an item is distinct from another one or not * @return an Observable that emits those items emitted by the source Observable that have * distinct keys * @see RxJava Wiki: distinct() * @see MSDN: Observable.distinct */ public final Observable distinct(Func1 keySelector) { return create(OperationDistinct.distinct(this, keySelector)); } /** * Returns an Observable that emits all items emitted by the source Observable that are distinct * from their immediate predecessors. *

* * * @return an Observable that emits those items from the source Observable that are distinct * from their immediate predecessors * @see RxJava Wiki: distinctUntilChanged() * @see MSDN: Observable.distinctUntilChanged */ public final Observable distinctUntilChanged() { return create(OperationDistinctUntilChanged.distinctUntilChanged(this)); } /** * Returns an Observable that emits all items emitted by the source Observable that are distinct * from their immediate predecessors, according to a key selector function. *

* * * @param keySelector * a function that projects an emitted item to a key value that is used to decide * whether an item is distinct from another one or not * @return an Observable that emits those items from the source Observable whose keys are * distinct from those of their immediate predecessors * @see RxJava Wiki: distinctUntilChanged() * @see MSDN: Observable.distinctUntilChanged */ public final Observable distinctUntilChanged(Func1 keySelector) { return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector)); } /** * Modifies an Observable so that it invokes an action when it calls {@code onCompleted}. *

* * * @param onCompleted * the action to invoke when the source Observable calls {@code onCompleted} * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnCompleted() * @see MSDN: Observable.Do */ public final Observable doOnCompleted(final Action0 onCompleted) { Observer observer = new Observer() { @Override public final void onCompleted() { onCompleted.call(); } @Override public final void onError(Throwable e) { } @Override public final void onNext(T args) { } }; return create(OperationDoOnEach.doOnEach(this, observer)); } /** * Modifies an Observable so that it invokes an action for each item it emits. *

* * * @param observer * the action to invoke for each item emitted by the source Observable * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnEach() * @see MSDN: Observable.Do */ public final Observable doOnEach(final Action1> onNotification) { Observer observer = new Observer() { @Override public final void onCompleted() { onNotification.call(new Notification()); } @Override public final void onError(Throwable e) { onNotification.call(new Notification(e)); } @Override public final void onNext(T v) { onNotification.call(new Notification(v)); } }; return create(OperationDoOnEach.doOnEach(this, observer)); } /** * Modifies an Observable so that it notifies an Observer for each item it emits. *

* * * @param observer * the action to invoke for each item emitted by the source Observable * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnEach() * @see MSDN: Observable.Do */ public final Observable doOnEach(Observer observer) { return create(OperationDoOnEach.doOnEach(this, observer)); } /** * Modifies an Observable so that it invokes an action if it calls {@code onError}. *

* * * @param onError * the action to invoke if the source Observable calls {@code onError} * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnError() * @see MSDN: Observable.Do */ public final Observable doOnError(final Action1 onError) { Observer observer = new Observer() { @Override public final void onCompleted() { } @Override public final void onError(Throwable e) { onError.call(e); } @Override public final void onNext(T args) { } }; return create(OperationDoOnEach.doOnEach(this, observer)); } /** * Modifies an Observable so that it invokes an action when it calls {@code onNext}. *

* * * @param onNext * the action to invoke when the source Observable calls {@code onNext} * @return the source Observable with the side-effecting behavior applied * @see RxJava Wiki: doOnNext() * @see MSDN: Observable.Do */ public final Observable doOnNext(final Action1 onNext) { Observer observer = new Observer() { @Override public final void onCompleted() { } @Override public final void onError(Throwable e) { } @Override public final void onNext(T args) { onNext.call(args); } }; return create(OperationDoOnEach.doOnEach(this, observer)); } /** * Returns an Observable that emits the single item at a specified index in a sequence of * emissions from a source Observbable. *

* * * @param index * the zero-based index of the item to retrieve * @return an Observable that emits a single item: the item at the specified position in the * sequence of those emitted by the source Observable * @throws IndexOutOfBoundsException * if {@code index} is greater than or equal to the number of items emitted by the * source Observable * @throws IndexOutOfBoundsException * if {@code index} is less than 0 * @see RxJava Wiki: elementAt() */ public final Observable elementAt(int index) { return create(OperationElementAt.elementAt(this, index)); } /** * Returns an Observable that emits the item found at a specified index in a sequence of * emissions from a source Observable, or a default item if that index is out of range. *

* * * @param index * the zero-based index of the item to retrieve * @param defaultValue * the default item * @return an Observable that emits the item at the specified position in the sequence emitted * by the source Observable, or the default item if that index is outside the bounds of * the source sequence * @throws IndexOutOfBoundsException * if {@code index} is less than 0 * @see RxJava Wiki: elementAtOrDefault() */ public final Observable elementAtOrDefault(int index, T defaultValue) { return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue)); } /** * Returns an Observable that emits {@code true} if any item emitted by the source Observable * satisfies a specified condition, otherwise {@code false}. Note: this always emits {@code false} if the source Observable is empty. *

* *

* In Rx.Net this is the {@code any} operator but we renamed it in RxJava to better match Java * naming idioms. * * @param predicate * the condition to test items emitted by the source Observable * @return an Observable that emits a Boolean that indicates whether any item emitted by the * source Observable satisfies the {@code predicate} * @see RxJava Wiki: exists() * @see MSDN: Observable.Any Note: the description in this page was wrong at the time of this writing. */ public final Observable exists(Func1 predicate) { return create(OperationAny.exists(this, predicate)); } /** * Filter items emitted by an Observable. *

* * * @param predicate * a function that evaluates the items emitted by the source Observable, returning {@code true} if they pass the filter * @return an Observable that emits only those items emitted by the source Observable that the * filter evaluates as {@code true} * @see RxJava Wiki: filter() */ public final Observable filter(Func1 predicate) { return create(OperationFilter.filter(this, predicate)); } /** * Registers an {@link Action0} to be called when this Observable invokes either {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. *

* * * @param action * an {@link Action0} to be invoked when the source Observable finishes * @return an Observable that emits the same items as the source Observable, then invokes the {@link Action0} * @see RxJava Wiki: finallyDo() * @see MSDN: Observable.Finally */ public final Observable finallyDo(Action0 action) { return create(OperationFinally.finallyDo(this, action)); } /** * Returns an Observable that emits only the very first item emitted by the source Observable, * or raises an {@code IllegalArgumentException} if the source Observable is empty. *

* * * @return an Observable that emits only the very first item emitted by the source Observable, * or raises an {@code IllegalArgumentException} if the source Observable is empty * @see RxJava Wiki: first() * @see MSDN: {@code Observable.firstAsync()} */ public final Observable first() { return take(1).single(); } /** * Returns an Observable that emits only the very first item emitted by the source Observable * that satisfies a specified condition, or raises an {@code IllegalArgumentException} if no * such items are emitted. *

* * * @param predicate * the condition that an item emitted by the source Observable has to satisfy * @return an Observable that emits only the very first item emitted by the source Observable * that satisfies the {@code predicate}, or raises an {@code IllegalArgumentException} if no such items are emitted * @see RxJava Wiki: first() * @see MSDN: {@code Observable.firstAsync()} */ public final Observable first(Func1 predicate) { return takeFirst(predicate).single(); } /** * Returns an Observable that emits only the very first item emitted by the source Observable, * or a default item if the source Observable completes without emitting anything. *

* * * @param defaultValue * the default item to emit if the source Observable doesn't emit anything * @return an Observable that emits only the very first item from the source, or a default item * if the source Observable completes without emitting any items * @see RxJava Wiki: firstOrDefault() * @see MSDN: {@code Observable.firstOrDefaultAsync()} */ public final Observable firstOrDefault(T defaultValue) { return take(1).singleOrDefault(defaultValue); } /** * Returns an Observable that emits only the very first item emitted by the source Observable * that satisfies a specified condition, or a default item if the source Observable emits no * such items. *

* * * @param predicate * the condition any item emitted by the source Observable has to satisfy * @param defaultValue * the default item to emit if the source Observable doesn't emit anything that * satisfies the {@code predicate} * @return an Observable that emits only the very first item emitted by the source Observable * that satisfies the {@code predicate}, or a default item if the source Observable * emits no such items * @see RxJava Wiki: firstOrDefault() * @see MSDN: {@code Observable.firstOrDefaultAsync()} */ public final Observable firstOrDefault(T defaultValue, Func1 predicate) { return takeFirst(predicate).singleOrDefault(defaultValue); } /** * Returns an Observable that emits items based on applying a function that you supply to each * item emitted by the source Observable, where that function returns an Observable, and then * merging those resulting Observables and emitting the results of this merger. *

* *

* Note: {@code mapMany} and {@code flatMap} are equivalent. * * @param func * a function that, when applied to an item emitted by the source Observable, * returns an Observable * @return an Observable that emits the result of applying the transformation function to each * item emitted by the source Observable and merging the results of the Observables * obtained from this transformation * @see RxJava Wiki: flatMap() * @see #mapMany(Func1) */ public final Observable flatMap(Func1> func) { return mergeMap(func); } /** * Groups the items emitted by an Observable according to a specified criterion, and emits these * grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group. *

* * * @param keySelector * a function that extracts the key for each item * @param * the key type * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a * unique key value and each of which emits those items from the source Observable that * share that key value * @see RxJava Wiki: groupBy */ public final Observable> groupBy(final Func1 keySelector) { return bind(new OperatorGroupBy(keySelector)); } /** * Groups the items emitted by an Observable according to a specified criterion, and emits these * grouped items, transformed by a selector, within {@link GroupedObservable}s, one {@code GroupedObservable} per group. *

* * * @param keySelector * a function that extracts the key from an item * @param elementSelector * a function to map a source item to an item emitted by a {@link GroupedObservable} * @param * the key type * @param * the type of items emitted by the resulting {@link GroupedObservable}s * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a * unique key value and emits transformed items corresponding to items from the source * Observable that share that key value * @see RxJava Wiki: groupBy */ public final Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) { return null; } /** * Groups the items emitted by an Observable according to a specified key selector function * until the duration Observable expires for the key. *

* * * @param keySelector * a function to extract the key for each item * @param durationSelector * a function to signal the expiration of a group * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a * key value and each of which emits all items emitted by the source Observable during * that key's duration that share that same key value * @see RxJava Wiki: groupByUntil() * @see MSDN: Observable.GroupByUntil */ public final Observable> groupByUntil(Func1 keySelector, Func1, ? extends Observable> durationSelector) { return groupByUntil(keySelector, Functions. identity(), durationSelector); } /** * Groups the items emitted by an Observable (transformed by a selector) according to a * specified key selector function until the duration Observable expires for the key. *

* * * @param keySelector * a function to extract the key for each item * @param valueSelector * a function to map each item emitted by the source Observable to an item emitted by * one of the resulting {@link GroupedObservable}s * @param durationSelector * a function to signal the expiration of a group * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a * key value and each of which emits all items emitted by the source Observable during * that key's duration that share that same key value, transformed by the value selector * @see RxJava Wiki: groupByUntil() * @see MSDN: Observable.GroupByUntil */ public final Observable> groupByUntil(Func1 keySelector, Func1 valueSelector, Func1, ? extends Observable> durationSelector) { return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } /** * Return an Observable that correlates two Observables when they overlap in time and groups the * results. *

* * * @param right * the other Observable to correlate items from the source Observable with * @param leftDuration * a function that returns an Observable whose emissions indicate the duration of the * values of the source Observable * @param rightDuration * a function that returns an Observable whose emissions indicate the duration of the * values of the {@code right} Observable * @param resultSelector * a function that takes an item emitted by each Observable and returns the value to * be emitted by the resulting Observable * @return an Observable that emits items based on combining those items emitted by the source * Observables whose durations overlap * @see RxJava Wiiki: groupJoin * @see MSDN: Observable.GroupJoin */ public final Observable groupJoin(Observable right, Func1> leftDuration, Func1> rightDuration, Func2, ? extends R> resultSelector) { return create(new OperationGroupJoin(this, right, leftDuration, rightDuration, resultSelector)); } /** * Ignores all items emitted by the source Observable and only calls {@code onCompleted} or {@code onError}. *

* * * @return an empty Observable that only calls {@code onCompleted} or {@code onError}, based on * which one is called by the source Observable * @see RxJava Wiki: ignoreElements() * @see MSDN: Observable.IgnoreElements */ public final Observable ignoreElements() { return filter(alwaysFalse()); } /** * Returns an Observable that emits {@code true} if the source Observable is empty, otherwise {@code false}. *

* In Rx.Net this is negated as the {@code any} operator but we renamed this in RxJava to better * match Java naming idioms. *

* * * @return an Observable that emits a Boolean * @see RxJava Wiki: isEmpty() * @see MSDN: Observable.Any */ public final Observable isEmpty() { return create(OperationAny.isEmpty(this)); } /** * Correlates the items emitted by two Observables based on overlapping durations. *

* * * @param right * the second Observable to join items from * @param leftDurationSelector * a function to select a duration for each item emitted by the source Observable, * used to determine overlap * @param rightDurationSelector * a function to select a duration for each item emitted by the {@code right} Observable, used to determine overlap * @param resultSelector * a function that computes an item to be emitted by the resulting Observable for any * two overlapping items emitted by the two Observables * @return an Observable that emits items correlating to items emitted by the source Observables * that have overlapping durations * @see RxJava Wiki: join() * @see MSDN: Observable.Join */ public final Observable join(Observable right, Func1> leftDurationSelector, Func1> rightDurationSelector, Func2 resultSelector) { return create(new OperationJoin(this, right, leftDurationSelector, rightDurationSelector, resultSelector)); } /** * Returns an Observable that emits the last item emitted by the source Observable or notifies * observers of an {@code IllegalArgumentException} if the source Observable is empty. *

* * * @return an Observable that emits the last item from the source Observable or notifies * observers of an error * @see RxJava Wiki: last() * @see MSDN: {@code Observable.lastAsync()} */ public final Observable last() { return takeLast(1).single(); } /** * Returns an Observable that emits only the last item emitted by the source Observable that * satisfies a given condition, or an {@code IllegalArgumentException} if no such items are * emitted. *

* * * @param predicate * the condition any source emitted item has to satisfy * @return an Observable that emits only the last item satisfying the given condition from the * source, or an {@code IllegalArgumentException} if no such items are emitted * @throws IllegalArgumentException * if no items that match the predicate are emitted by the source Observable * @see RxJava Wiki: last() * @see MSDN: {@code Observable.lastAsync()} */ public final Observable last(Func1 predicate) { return filter(predicate).takeLast(1).single(); } /** * Returns an Observable that emits only the last item emitted by the source Observable, or a * default item if the source Observable completes without emitting any items. *

* * * @param defaultValue * the default item to emit if the source Observable is empty * @return an Observable that emits only the last item emitted by the source Observable, or a * default item if the source Observable is empty * @see RxJava Wiki: lastOrDefault() * @see MSDN: {@code Observable.lastOrDefaultAsync()} */ public final Observable lastOrDefault(T defaultValue) { return takeLast(1).singleOrDefault(defaultValue); } /** * Returns an Observable that emits only the last item emitted by the source Observable that * satisfies a specified condition, or a default item if no such item is emitted by the source * Observable. *

* * * @param defaultValue * the default item to emit if the source Observable doesn't emit anything that * satisfies the specified {@code predicate} * @param predicate * the condition any item emitted by the source Observable has to satisfy * @return an Observable that emits only the last item emitted by the source Observable that * satisfies the given condition, or a default item if no such item is emitted by the * source Observable * @see RxJava Wiki: lastOrDefault() * @see MSDN: {@code Observable.lastOrDefaultAsync()} */ public final Observable lastOrDefault(T defaultValue, Func1 predicate) { return filter(predicate).takeLast(1).singleOrDefault(defaultValue); } /** * Returns an Observable that counts the total number of items emitted by the source Observable * and emits this count as a 64-bit Long. *

* * * @return an Observable that emits a single item: the number of items emitted by the source * Observable as a 64-bit Long item * @see RxJava Wiki: count() * @see MSDN: Observable.LongCount * @see #count() */ public final Observable longCount() { return reduce(0L, new Func2() { @Override public final Long call(Long t1, T t2) { return t1 + 1; } }); } /** * Returns an Observable that applies a specified function to each item emitted by the source * Observable and emits the results of these function applications. *

* * * @param func * a function to apply to each item emitted by the Observable * @return an Observable that emits the items from the source Observable, transformed by the * specified function * @see RxJava Wiki: map() * @see MSDN: Observable.Select */ public final Observable map(Func1 func) { return bind(new OperatorMap(func)); } /** * Returns a new Observable by applying a function that you supply to each item emitted by the * source Observable, where that function returns an Observable, and then merging those * resulting Observables and emitting the results of this merger. *

* *

* Note: {@code mapMany} and {@code flatMap} are equivalent. * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable * @return an Observable that emits the result of applying the transformation function to each * item emitted by the source Observable and merging the results of the Observables * obtained from these transformations * @see RxJava Wiki: mapMany() * @see #flatMap(Func1) * @deprecated use {@link #flatMap(Func1)} */ @Deprecated public final Observable mapMany(Func1> func) { return mergeMap(func); } /** * Turns all of the emissions and notifications from a source Observable into emissions marked * with their original types within {@link Notification} objects. *

* * * @return an Observable that emits items that are the result of materializing the items and * notifications of the source Observable * @see RxJava Wiki: materialize() * @see MSDN: Observable.materialize */ public final Observable> materialize() { return create(OperationMaterialize.materialize(this)); } /** * Returns an Observable that emits the maximum item emitted by the source Observable, according * to the specified comparator. If there is more than one item with the same maximum value, it * emits the last-emitted of these. *

* * * @param comparator * the comparer used to compare items * @return an Observable that emits the maximum item emitted by the source Observable, according * to the specified comparator * @throws IllegalArgumentException * if the source is empty * @see RxJava Wiki: max() * @see MSDN: Observable.Max */ public final Observable max(Comparator comparator) { return OperationMinMax.max(this, comparator); } /** * Returns an Observable that emits a List of items emitted by the source Observable that have * the maximum key value. For a source Observable that emits no items, the resulting Observable * emits an empty List. *

* * * @param selector * this function accepts an item emitted by the source Observable and returns a key * @return an Observable that emits a List of those items emitted by the source Observable that * had the largest key value of all of the emitted items * @see RxJava Wiki: maxBy() * @see MSDN: Observable.MaxBy */ public final > Observable> maxBy(Func1 selector) { return OperationMinMax.maxBy(this, selector); } /** * Returns an Observable that emits a List of items emitted by the source Observable that have * the maximum key value according to a specified comparator. For a source Observable that emits * no items, the resulting Observable emits an empty List. *

* * * @param selector * this function accepts an item emitted by the source Observable and returns a key * @param comparator * the comparator used to compare key values * @return an Observable that emits a List of those items emitted by the source Observable that * had the largest key value of all of the emitted items according to the specified * comparator * @see RxJava Wiki: maxBy() * @see MSDN: Observable.MaxBy */ public final Observable> maxBy(Func1 selector, Comparator comparator) { return OperationMinMax.maxBy(this, selector, comparator); } /** * Returns an Observable that emits the results of applying a specified function to each item * emitted by the source Observable, where that function returns an Observable, and then merging * those resulting Observables and emitting the results of this merger. *

* * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable * @return an Observable that emits the result of applying the transformation function to each * item emitted by the source Observable and merging the results of the Observables * obtained from these transformations * @see RxJava Wiki: flatMap() * @see #flatMap(Func1) */ public final Observable mergeMap(Func1> func) { return merge(map(func)); } /** * Returns an Observable that applies a function to each item emitted or notification raised by * the source Observable and then flattens the Observables returned from these functions and * emits the resulting items. *

* * * @param * the result type * @param onNext * a function that returns an Observable to merge for each item emitted by the source * Observable * @param onError * a function that returns an Observable to merge for an onError notification from * the source Observable * @param onCompleted * a function that returns an Observable to merge for an onCompleted notification * from the source Observable * @return an Observable that emits the results of merging the Observables returned from * applying the specified functions to the emissions and notifications of the source * Observable */ public final Observable mergeMap( Func1> onNext, Func1> onError, Func0> onCompleted) { return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted)); } /** * Returns an Observable that emits the results of a specified function to the pair of values * emitted by the source Observable and a specified collection Observable. *

* * * @param * the type of items emitted by the collection Observable * @param * the type of items emitted by the resulting Observable * @param collectionSelector * a function that returns an Observable for each item emitted by the source * Observable * @param resultSelector * a function that combines one item emitted by each of the source and collection * Observables and returns an item to be emitted by the resulting Observable * @return an Observable that emits the results of applying a function to a pair of values * emitted by the source Observable and the collection Observable */ public final Observable mergeMap(Func1> collectionSelector, Func2 resultSelector) { return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector)); } /** * Returns an Observable that merges each item emitted by the source Observable with the values * in an Iterable corresponding to that item that is generated by a selector. *

* * * @param * the type of item emitted by the resulting Observable * @param collectionSelector * a function that returns an Iterable sequence of values for when given an item * emitted by the source Observable * @return an Observable that emits the results of merging the items emitted by the source * Observable with the values in the Iterables corresponding to those items, as * generated by {@code collectionSelector} */ public final Observable mergeMapIterable(Func1> collectionSelector) { return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector))); } /** * Returns an Observable that emits the results of applying a function to the pair of values * from the source Observable and an Iterable corresponding to that item that is generated by * a selector. *

* * * @param * the collection element type * @param * the type of item emited by the resulting Observable * @param collectionSelector * a function that returns an Iterable sequence of values for each item emitted by * the source Observable * @param resultSelector * a function that returns an item based on the item emitted by the source Observable * and the Iterable returned for that item by the {@code collectionSelector} * @return an Observable that emits the items returned by {@code resultSelector} for each item * in the source Observable */ public final Observable mergeMapIterable(Func1> collectionSelector, Func2 resultSelector) { return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector); } /** * Returns an Observable that emits the minimum item emitted by the source Observable, according * to a specified comparator. If there is more than one such item, it returns the last-emitted * one. *

* * * @param comparator * the comparer used to compare elements * @return an Observable that emits the minimum item emitted by the source Observable according * to the specified comparator * @throws IllegalArgumentException * if the source is empty * @see RxJava Wiki: min() * @see MSDN: Observable.Min */ public final Observable min(Comparator comparator) { return OperationMinMax.min(this, comparator); } /** * Returns an Observable that emits a List of items emitted by the source Observable that have * the minimum key value. For a source Observable that emits no items, the resulting Observable * emits an empty List. *

* * * @param selector * the key selector function * @return an Observable that emits a List of all of the items from the source Observable that * had the lowest key value of any items emitted by the source Observable * @see RxJava Wiki: minBy() * @see MSDN: Observable.MinBy */ public final > Observable> minBy(Func1 selector) { return OperationMinMax.minBy(this, selector); } /** * Returns an Observable that emits a List of items emitted by the source Observable that have * the minimum key value according to a given comparator function. For a source Observable that * emits no items, the resulting Observable emits an empty List. *

* * * @param selector * the key selector function * @param comparator * the comparator used to compare key values * @return an Observable that emits a List of all of the items from the source Observable that * had the lowest key value of any items emitted by the source Observable according to * the specified comparator * @see RxJava Wiki: minBy() * @see MSDN: Observable.MinBy */ public final Observable> minBy(Func1 selector, Comparator comparator) { return OperationMinMax.minBy(this, selector, comparator); } /** * Returns an Observable that emits items produced by multicasting the source Observable within * a selector function. * * @param subjectFactory * the {@link Subject} factory * @param selector * the selector function, which can use the multicasted source Observable subject to * the policies enforced by the created {@code Subject} * @return an Observable that emits the items produced by multicasting the source Observable * within a selector function * @see RxJava: Observable.publish() and Observable.multicast() * @see MSDN: Observable.Multicast */ public final Observable multicast( final Func0> subjectFactory, final Func1, ? extends Observable> selector) { return OperationMulticast.multicast(this, subjectFactory, selector); } /** * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to * push results into the specified subject. * * @param subject * the {@link Subject} for the {@link ConnectableObservable} to push source items * into * @param * the type of items emitted by the resulting {@code ConnectableObservable} * @return a {@link ConnectableObservable} that upon connection causes the source Observable to * push results into the specified {@link Subject} * @see RxJava Wiki: Observable.publish() and Observable.multicast() */ public final ConnectableObservable multicast(Subject subject) { return OperationMulticast.multicast(this, subject); } /** * Modify the source Observable so that it asynchronously notifies {@link Observer}s on the * specified {@link Scheduler}. *

* * * @param scheduler * the {@link Scheduler} to notify {@link Observer}s on * @return the source Observable modified so that its {@link Observer}s are notified on the * specified {@link Scheduler} * @see RxJava Wiki: observeOn() */ public final Observable observeOn(Scheduler scheduler) { return create(OperationObserveOn.observeOn(this, scheduler)); } /** * Filters the items emitted by an Observable, only emitting those of the specified type. *

* * * @param klass * the class type to filter the items emitted by the source Observable * @return an Observable that emits items from the source Observable of type {@code klass} * @see RxJava Wiki: ofType() * @see MSDN: Observable.OfType */ public final Observable ofType(final Class klass) { return filter(new Func1() { public final Boolean call(T t) { return klass.isInstance(t); } }).cast(klass); } /** * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error. *

* *

* By default, when an Observable encounters an error that prevents it from emitting the * expected item to its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits without invoking any more of its Observer's methods. The * {@code onErrorResumeNext} method changes this behavior. If you pass a function that returns * an Observable ({@code resumeFunction}) to {@code onErrorResumeNext}, if the original * Observable encounters an error, instead of invoking its Observer's {@code onError} method, it * will instead relinquish control to the Observable returned from {@code resumeFunction}, which * will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In * such a case, because no Observable necessarily invokes {@code onError}, the Observer may * never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onErrorResumeNext() */ public final Observable onErrorResumeNext(final Func1> resumeFunction) { return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction)); } /** * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error. *

* *

* By default, when an Observable encounters an error that prevents it from emitting the * expected item to its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits without invoking any more of its Observer's methods. The * {@code onErrorResumeNext} method changes this behavior. If you pass another Observable * ({@code resumeSequence}) to an Observable's {@code onErrorResumeNext} method, if the original * Observable encounters an error, instead of invoking its Observer's {@code onError} method, it * will instead relinquish control to {@code resumeSequence} which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no * Observable necessarily invokes {@code onError}, the Observer may never know that an error * happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onErrorResumeNext() */ public final Observable onErrorResumeNext(final Observable resumeSequence) { return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(this, resumeSequence)); } /** * Instruct an Observable to emit an item (returned by a specified function) rather than * invoking {@link Observer#onError onError} if it encounters an error. *

* *

* By default, when an Observable encounters an error that prevents it from emitting the * expected item to its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits without invoking any more of its Observer's methods. The * {@code onErrorReturn} method changes this behavior. If you pass a function * ({@code resumeFunction}) to an Observable's {@code onErrorReturn} method, if the original * Observable encounters an error, instead of invoking its Observer's {@code onError} method, it * will instead emit the return value of {@code resumeFunction}. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. * * @param resumeFunction * a function that returns an item that the new Observable will emit if the source * Observable encounters an error * @return the original Observable with appropriately modified behavior * @see RxJava Wiki: onErrorReturn() */ public final Observable onErrorReturn(Func1 resumeFunction) { return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction)); } /** * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an {@link java.lang.Exception}. *

* This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through. *

* *

* By default, when an Observable encounters an exception that prevents it from emitting the * expected item to its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits without invoking any more of its Observer's methods. The * {@code onExceptionResumeNext} method changes this behavior. If you pass another Observable * ({@code resumeSequence}) to an Observable's {@code onExceptionResumeNext} method, if the * original Observable encounters an exception, instead of invoking its Observer's {@code onError} method, it will instead relinquish control to {@code resumeSequence} which * will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In * such a case, because no Observable necessarily invokes {@code onError}, the Observer may * never know that an exception happened. *

* You can use this to prevent exceptions from propagating or to supply fallback data should * exceptions be encountered. * * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an exception * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onExceptionResumeNext() */ public final Observable onExceptionResumeNext(final Observable resumeSequence) { return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence)); } /** * Perform work on the source {@code Observable} in parallel by sharding it on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler}, and return the resulting {@code Observable}. *

* * * @param f * a {@link Func1} that applies Observable operators to {@code Observable} in * parallel and returns an {@code Observable} * @return an Observable that emits the results of applying {@link Func1} to the items emitted * by the source Observable * @see RxJava Wiki: parallel() */ public final Observable parallel(Func1, Observable> f) { return bind(new OperatorParallel(f, Schedulers.computation())); } /** * Perform work on the source {@code Observable} in parallel by sharding it on a {@link Scheduler}, and return the resulting {@code Observable}. *

* * * @param f * a {@link Func1} that applies Observable operators to {@code Observable} in * parallel and returns an {@code Observable} * @param s * a {@link Scheduler} to perform the work on * @return an Observable that emits the results of applying {@link Func1} to the items emitted * by the source Observable * @see RxJava Wiki: parallel() */ public final Observable parallel(final Func1, Observable> f, final Scheduler s) { return bind(new OperatorParallel(f, s)); } /** * Protects against errors being thrown from Observer implementations and ensures * onNext/onError/onCompleted contract compliance. *

* See https://github.com/Netflix/RxJava/issues/216 for a discussion on "Guideline 6.4: Protect * calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { SafeObservableSubscription subscription = new SafeObservableSubscription(); return subscription.wrap(subscribe(new SafeObserver(subscription, o))); } /** * Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting * items to those {@link Observer}s that have subscribed to it. *

* * * @return a {@link ConnectableObservable} that upon connection causes the source Observable to * emit items to its {@link Observer}s * @see RxJava Wiki: publish() */ public final ConnectableObservable publish() { return OperationMulticast.multicast(this, PublishSubject. create()); } /** * Returns an Observable that emits the results of invoking a specified selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying sequence. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a function that can use the multicasted source sequence as many times as needed, * without causing multiple subscriptions to the source sequence. Subscribers to the * given source will receive all notifications of the source from the time of the * subscription forward. * @return an Observable that emits the results of invoking the selector on the items emitted by * a {@link ConnectableObservable} that shares a single subscription to the underlying * sequence */ public final Observable publish(Func1, ? extends Observable> selector) { return multicast(new Func0>() { @Override public final Subject call() { return PublishSubject.create(); } }, selector); } /** * Returns an Observable that emits the results of invoking a specified selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable that shares a single subscription to the underlying sequence and starts * with {@code initialValue}. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a function that can use the multicasted source sequence as many times as needed, * without causing multiple subscriptions to the source Observable. Subscribers to * the source will receive all notifications of the source from the time of the * subscription forward * @param initialValue * the initial value of the underlying {@link BehaviorSubject} * @return an Observable that emits {@code initialValue} followed by the results of invoking the * selector on a {@ConnectableObservable} that shares a single subscription to the * underlying sequence */ public final Observable publish(Func1, ? extends Observable> selector, final T initialValue) { return multicast(new Func0>() { @Override public final Subject call() { return BehaviorSubject.create(initialValue); } }, selector); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and starts with {@code initialValue}. *

* * * @param initialValue * the initial value of the underlying {@link BehaviorSubject} * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and starts with {@code initialValue} */ public final ConnectableObservable publish(T initialValue) { return OperationMulticast.multicast(this, BehaviorSubject. create(initialValue)); } /** * Returns a {@link ConnectableObservable} that emits only the last item emitted by the source * Observable. *

* * * @return a {@link ConnectableObservable} that emits only the last item emitted by the source * Observable * @see RxJava Wiki: publishLast() */ public final ConnectableObservable publishLast() { return OperationMulticast.multicast(this, AsyncSubject. create()); } /** * Retusna an Observable that emits items that are results of invoking a specified selector on * items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying sequence but contains only its last emission. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a function that can use the multicasted source sequence as many times as needed, * without causing multiple subscriptions to the source Observable. Subscribers to * the source will only receive the last item emitted by the source. * @return an Observable that emits items that are the result of invoking the selector on a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable but contains only its last emission. */ public final Observable publishLast(Func1, ? extends Observable> selector) { return multicast(new Func0>() { @Override public final Subject call() { return AsyncSubject.create(); } }, selector); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by * a source Observable, then feeds the result of that function along with the second item * emitted by the source Observable into the same function, and so on until all items have been * emitted by the source Observable, and emits the final result from the final call to your * function as its sole item. *

* *

* This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an {@code inject()} method that does a similar operation on lists. * * @param accumulator * an accumulator function to be invoked on each item emitted by the source * Observable, whose result will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the items * emitted by the source Observable * @throws IllegalArgumentException * if the source Observable emits no items * @see RxJava Wiki: reduce() * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public final Observable reduce(Func2 accumulator) { /* * Discussion and confirmation of implementation at * https://github.com/Netflix/RxJava/issues/423#issuecomment-27642532 * * It should use last() not takeLast(1) since it needs to emit an error if the sequence is * empty. */ return create(OperationScan.scan(this, accumulator)).last(); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable and a specified seed value, then feeds the result of that function along * with the second item emitted by an Observable into the same function, and so on until all * items have been emitted by the source Observable, emitting the final result from the final * call to your function as its sole item. *

* *

* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an {@code inject()} method that does a similar operation on lists. * * @param initialValue * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each item emitted by the source * Observable, the result of which will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the output * from the items emitted by the source Observable * @see RxJava Wiki: reduce() * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public final Observable reduce(R initialValue, Func2 accumulator) { return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); } /** * Returns an Observable that repeats the sequence of items emitted by the source Observable * indefinitely. *

* * * @return an Observable that emits the items emitted by the source Observable repeatedly and in * sequence * @see RxJava Wiki: repeat() * @see MSDN: Observable.Repeat */ public final Observable repeat() { return this.repeat(Schedulers.currentThread()); } /** * Returns an Observable that repeats the sequence of items emitted by the source Observable * indefinitely, on a particular scheduler. *

* * * @param scheduler * the scheduler to emit the items on * @return an Observable that emits the items emitted by the source Observable repeatedly and in * sequence * @see RxJava Wiki: repeat() * @see MSDN: Observable.Repeat */ public final Observable repeat(Scheduler scheduler) { return create(OperationRepeat.repeat(this, scheduler)); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. *

* * * @return a {@link ConnectableObservable} that upon connection causes the source Observable to * emit its items to its {@link Observer}s * @see RxJava Wiki: replay() */ public final ConnectableObservable replay() { return OperationMulticast.multicast(this, ReplaySubject. create()); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on the items emitted by a {@link ConnectableObservable} that shares a single subscription to * the underlying Observable. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * the selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @return an Observable that emits items that are the results of invoking the selector on a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector) { return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return ReplaySubject.create(); } }, selector); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying {@code bufferSize} notifications. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * the selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param bufferSize * the buffer size that limits the number of items the connectable observable can * replay * @return an Observable that emits items that are the results of invoking the selector on * items emitted by a {@link ConnectableObservable} that shares a single subscription to * the underlying Observable replaying no more than {@code bufferSize} items * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return OperationReplay.replayBuffered(bufferSize); } }, selector); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying no more than {@code bufferSize} items that were emitted * within a specified time window. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param bufferSize * the buffer size that limits the number of items the connectable observable can * replay * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, and replay no more than {@code bufferSize} items that were * emitted within the window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, int bufferSize, long time, TimeUnit unit) { return replay(selector, bufferSize, time, unit, Schedulers.computation()); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying no more than {@code bufferSize} items that were emitted * within a specified time window. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param bufferSize * the buffer size that limits the number of items the connectable observable can * replay * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @param scheduler * the scheduler that is the time source for the window * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, and replay no more than {@code bufferSize} items that were * emitted within the window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler); } }, selector); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying a maximum of {@code bufferSize} items. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param bufferSize * the buffer size that limits the number of items the connectable observable can * replay * @param scheduler * the scheduler on which the replay is observed * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying no more than {@code bufferSize} notifications * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return OperationReplay. createScheduledSubject(OperationReplay. replayBuffered(bufferSize), scheduler); } }, selector); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying all items that were emitted within a specified time window. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying all items that were emitted within the window * defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, long time, TimeUnit unit) { return replay(selector, time, unit, Schedulers.computation()); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying all items that were emitted within a specified time window. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @param scheduler * the scheduler that is the time source for the window * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying all items that were emitted within the window * defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return OperationReplay.replayWindowed(time, unit, -1, scheduler); } }, selector); } /** * Returns an Observable that emits items that are the results of invoking a specified selector * on items emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable. *

* * * @param * the type of items emitted by the resulting Observable * @param selector * a selector function, which can use the multicasted sequence as many times as * needed, without causing multiple subscriptions to the Observable * @param scheduler * the scheduler where the replay is observed * @return an Observable that emits items that are the results of invoking the selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the * underlying Observable, replaying all items * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { return OperationMulticast.multicast(this, new Func0>() { @Override public final Subject call() { return OperationReplay.createScheduledSubject(ReplaySubject. create(), scheduler); } }, selector); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source * Observable that replays at most {@code bufferSize} items emitted by that Observable. *

* * * @param bufferSize * the buffer size that limits the number of items that can be replayed * @return a {@link ConnectableObservable} that shares a single subscription to the source * Observable and replays at most {@code bufferSize} items emitted by that Observable * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize) { return OperationMulticast.multicast(this, OperationReplay. replayBuffered(bufferSize)); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays at most {@code bufferSize} items that were emitted during a specified * time window. *

* * * @param bufferSize * the buffer size that limits the number of items that can be replayed * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays at most {@code bufferSize} items that were emitted during the * window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit unit) { return replay(bufferSize, time, unit, Schedulers.computation()); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and that replays a maximum of {@code bufferSize} items that are emitted within * a specified time window. *

* * * @param bufferSize * the buffer size that limits the number of items that can be replayed * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @param scheduler * the scheduler that is used as a time source for the window * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays at most {@code bufferSize} items that were emitted during the * window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) { if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } return OperationMulticast.multicast(this, OperationReplay. replayWindowed(time, unit, bufferSize, scheduler)); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source * Observable and replays at most {@code bufferSize} items emitted by that Observable. *

* * * @param bufferSize * the buffer size that limits the number of items that can be replayed * @param scheduler * the scheduler on which the Observers will observe the emitted items * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays at most {@code bufferSize} items that were emitted by the * Observable * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(int bufferSize, Scheduler scheduler) { return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject( OperationReplay. replayBuffered(bufferSize), scheduler)); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source * Observable and replays all items emitted by that Observable within a specified time window. *

* * * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays the items that were emitted during the window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(long time, TimeUnit unit) { return replay(time, unit, Schedulers.computation()); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source * Observable and replays all items emitted by that Observable within a specified time window. *

* * * @param time * the duration of the window in which the replayed items must have been emitted * @param unit * the time unit of {@code time} * @param scheduler * the scheduler that is the time source for the window * @return a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable and replays the items that were emitted during the window defined by {@code time} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) { return OperationMulticast.multicast(this, OperationReplay. replayWindowed(time, unit, -1, scheduler)); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer} on the given {@link Scheduler}. *

* * * @param scheduler * the scheduler on which the Observers will observe the emitted items * @return a {@link ConnectableObservable} that shares a single subscription to the source * Observable that will replay all of its items and notifications to any future {@link Observer} on the given {@link Scheduler} * @see RxJava Wiki: replay() * @see MSDN: Observable.Replay */ public final ConnectableObservable replay(Scheduler scheduler) { return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject. create(), scheduler)); } /** * Return an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError} (infinite retry count). *

* *

* If the source Observable calls {@link Observer#onError}, this method will resubscribe to the * source Observable. *

* Any and all items emitted by the source Observable will be emitted by the resulting * Observable, even those emitted during failed subscriptions. For example, if an Observable * fails at first but emits {@code [1, 2]} then succeeds the second time and emits {@code [1, 2, 3, 4, 5]} then the complete sequence of emissions and notifications would be * {@code [1, 2, 1, 2, 3, 4, 5, onCompleted]}. * * @return the source Observable modified with retry logic * @see RxJava Wiki: retry() */ public final Observable retry() { return create(OperationRetry.retry(this)); } /** * Return an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError} up to a certain number of retries. *

* *

* If the source Observable calls {@link Observer#onError}, this method will resubscribe to the * source Observable for a maximum of {@code retryCount} resubscriptions. *

* Any and all items emitted by the source Observable will be emitted by the resulting * Observable, even those emitted during failed subscriptions. For example, if an Observable * fails at first but emits {@code [1, 2]} then succeeds the second time and emits {@code [1, 2, 3, 4, 5]} then the complete sequence of emissions and notifications would be * {@code [1, 2, 1, 2, 3, 4, 5, onCompleted]}. * * @param retryCount * number of retry attempts before failing * @return the source Observable modified with retry logic * @see RxJava Wiki: retry() */ public final Observable retry(int retryCount) { return create(OperationRetry.retry(this, retryCount)); } /** * Returns an Observable that emits the results of sampling the items emitted by the source * Observable at a specified time interval. *

* * * @param period * the sampling rate * @param unit * the {@link TimeUnit} in which {@code period} is defined * @return an Observable that emits the results of sampling the items emitted by the source * Observable at the specified time interval * @see RxJava Wiki: sample() */ public final Observable sample(long period, TimeUnit unit) { return create(OperationSample.sample(this, period, unit)); } /** * Returns an Observable that emits the results of sampling the items emitted by the source * Observable at a specified time interval. *

* * * @param period * the sampling rate * @param unit * the {@link TimeUnit} in which {@code period} is defined * @param scheduler * the {@link Scheduler} to use when sampling * @return an Observable that emits the results of sampling the items emitted by the source * Observable at the specified time interval * @see RxJava Wiki: sample() */ public final Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } /** * Return an Observable that emits the results of sampling the items emitted by the source * Observable whenever the specified {@code sampler} Observable emits an item or completes. *

* * * @param sampler * the Observable to use for sampling the source Observable * @return an Observable that emits the results of sampling the items emitted by this Observable * whenever the {@code sampler} Observable emits an item or completes * @see RxJava Wiki: sample() */ public final Observable sample(Observable sampler) { return create(new OperationSample.SampleWithObservable(this, sampler)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted * by the source Observable into the same function, and so on until all items have been emitted * by the source Observable, emitting the result of each of these iterations. *

* *

* This sort of function is sometimes called an accumulator. * * @param accumulator * an accumulator function to be invoked on each item emitted by the source * Observable, whose result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the next accumulator call * @return an Observable that emits the results of each call to the accumulator function * @see RxJava Wiki: scan() * @see MSDN: Observable.Scan */ public final Observable scan(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)); } /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable and a seed value, then feeds the result of that function along with the * second item emitted by the source Observable into the same function, and so on until all * items have been emitted by the source Observable, emitting the result of each of these * iterations. *

* *

* This sort of function is sometimes called an accumulator. *

* Note that the Observable that results from this method will emit {@code initialValue} as its * first emitted item. * * @param initialValue * the initial (seed) accumulator item * @param accumulator * an accumulator function to be invoked on each item emitted by the source * Observable, whose result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the next accumulator call * @return an Observable that emits the results of each call to the accumulator function * @see RxJava Wiki: scan() * @see MSDN: Observable.Scan */ public final Observable scan(R initialValue, Func2 accumulator) { return create(OperationScan.scan(this, initialValue, accumulator)); } /** * If the source Observable completes after emitting a single item, return an Observable * that emits that item. If the source Observable emits more than one item or no items, throw * an {@code IllegalArgumentException}. *

* * * @return an Observable that emits the single item emitted by the source Observable * @throws IllegalArgumentException * if the source emits more than one item or no items * @see RxJava Wiki: single() * @see MSDN: {@code Observable.singleAsync()} */ public final Observable single() { return create(OperationSingle. single(this)); } /** * If the Observable completes after emitting a single item that matches a specified predicate, * return an Observable that emits that item. If the source Observable emits more than one such * item or no such items, throw an {@code IllegalArgumentException}. *

* * * @param predicate * a predicate function to evaluate items emitted by the source Observable * @return an Observable that emits the single item emitted by the source Observable that * matches the predicate * @throws IllegalArgumentException * if the source Observable emits either more than one item that matches the * predicate or no items that match the predicate * @see RxJava Wiki: single() * @see MSDN: {@code Observable.singleAsync()} */ public final Observable single(Func1 predicate) { return filter(predicate).single(); } /** * If the source Observable completes after emitting a single item, return an Observable that * emits that item; if the source Observable is empty, return an Observable that emits a default * item. If the source Observable emits more than one item, throw an {@code IllegalArgumentException.}

* * * @param defaultValue * a default value to emit if the source Observable emits no item * @return an Observable that emits the single item emitted by the source Observable, or a * default item if the source Observable is empty * @throws IllegalArgumentException * if the source Observable emits more than one item * @see RxJava Wiki: single() * @see MSDN: {@code Observable.singleOrDefaultAsync()} */ public final Observable singleOrDefault(T defaultValue) { return create(OperationSingle. singleOrDefault(this, defaultValue)); } /** * If the Observable completes after emitting a single item that matches a predicate, return an * Observable that emits that item; if the source Observable emits no such item, return an * Observable that emits a default item. If the source Observable emits more than one such item, * throw an {@code IllegalArgumentException}. *

* * * @param defaultValue * a default item to emit if the source Observable emits no matching items * @param predicate * a predicate function to evaluate items emitted by the source Observable * @return an Observable that emits the single item emitted by the source Observable that * matches the predicate, or the default item if no emitted item matches the predicate * @throws IllegalArgumentException * if the source Observable emits more than one item that matches the predicate * @see RxJava Wiki: single() * @see MSDN: {@code Observable.singleOrDefaultAsync()} */ public final Observable singleOrDefault(T defaultValue, Func1 predicate) { return filter(predicate).singleOrDefault(defaultValue); } /** * Returns an Observable that skips the first {@code num} items emitted by the source Observable * and emits the remainder. *

* * * @param num * the number of items to skip * @return an Observable that is identical to the source Observable except that it does not * emit the first {@code num} items that the source Observable emits * @see RxJava Wiki: skip() */ public final Observable skip(int num) { return create(OperationSkip.skip(this, num)); } /** * Returns an Observable that skips values emitted by the source Observable before a specified * time window elapses. *

* * * @param time * the length of the time window to skip * @param unit * the time unit of {@code time} * @return an Observable that skips values emitted by the source Observable before the time * window defined by {@code time} elapses * @see RxJava Wiki: skip() */ public final Observable skip(long time, TimeUnit unit) { return skip(time, unit, Schedulers.computation()); } /** * Returns an Observable that skips values emitted by the source Observable before a specified * time window on a specified {@link Scheduler} elapses. *

* * * @param time * the length of the time window to skip * @param unit * the time unit of {@code time} * @param scheduler * the {@link Scheduler} on which the timed wait happens * @return an Observable that skips values emitted by the source Observable before the time * window defined by {@code time} and {@code scheduler} elapses * @see RxJava Wiki: skip() */ public final Observable skip(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperationSkip.SkipTimed(this, time, unit, scheduler)); } /** * Returns an Observable that drops a specified number of items from the end of the sequence * emitted by the source Observable. *

* *

* This operator accumulates a queue long enough to store the first {@code count} items. As more * items are received, items are taken from the front of the queue and emitted by the returned * Observable. This causes such items to be delayed. * * @param count * number of items to drop from the end of the source sequence * @return an Observable that emits the items emitted by the source Observable except for the * dropped ones at the end * @throws IndexOutOfBoundsException * if {@code count} is less than zero * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public final Observable skipLast(int count) { return create(OperationSkipLast.skipLast(this, count)); } /** * Returns an Observable that drops items emitted by the source Observable during a specified * time window before the source completes. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that drops those items emitted by the source Observable in a time * window before the source completes defined by {@code time} * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public final Observable skipLast(long time, TimeUnit unit) { return skipLast(time, unit, Schedulers.computation()); } /** * Returns an Observable that drops items emitted by the source Observable during a specified * time window (defined on a specified scheduler) before the source completes. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the scheduler used as the time source * @return an Observable that drops those items emitted by the source Observable in a time * window before the source completes defined by {@code time} and {@code scheduler} * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public final Observable skipLast(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperationSkipLast.SkipLastTimed(this, time, unit, scheduler)); } /** * Returns an Observable that skips items emitted by the source Observable until a second * Observable emits an item. *

* * * @param other * the second Observable that has to emit an item before the source Observable's * elements begin to be mirrored by the resulting Observable * @return an Observable that skips items from the source Observable until the second * Observable emits an item, then emits the remaining items * @see RxJava Wiki: skipUntil() * @see MSDN: Observable.SkipUntil */ public final Observable skipUntil(Observable other) { return create(new OperationSkipUntil(this, other)); } /** * Returns an Observable that skips all items emitted by the source Observable as long as a * specified condition holds true, but emits all further source items as soon as the condition * becomes false. *

* * * @param predicate * a function to test each item emitted from the source Observable * @return an Observable that begins emitting items emitted by the source Observable when the * specified predicate becomes false * @see RxJava Wiki: skipWhile() * @see MSDN: Observable.SkipWhile */ public final Observable skipWhile(Func1 predicate) { return create(OperationSkipWhile.skipWhile(this, predicate)); } /** * Returns an Observable that skips all items emitted by the source Observable as long as a * specified condition holds true, but emits all further source items as soon as the condition * becomes false. *

* * * @param predicate * a function to test each item emitted from the source Observable. It takes the * emitted item as the first parameter and the sequential index of the emitted item * as a second parameter. * @return an Observable that begins emitting items emitted by the source Observable when the * specified predicate becomes false * @see RxJava Wiki: skipWhileWithIndex() * @see MSDN: Observable.SkipWhile */ public final Observable skipWhileWithIndex(Func2 predicate) { return create(OperationSkipWhile.skipWhileWithIndex(this, predicate)); } /** * Returns an Observable that emits the items in a specified {@link Iterable} before it begins * to emit items emitted by the source Observable. *

* * * @param values * an Iterable that contains the items you want the modified Observable to emit first * @return an Observable that emits the items in the specified {@link Iterable} and then emits * the items emitted by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(Iterable values) { return concat(Observable. from(values), this); } /** * Returns an Observable that emits the items in a specified {@link Iterable}, on a specified {@link Scheduler} before it begins to emit items emitted by the source Observable. *

* * * @param values * an Iterable that contains the items you want the modified Observable to emit first * @param scheduler * the scheduler to emit the prepended values on * @return an Observable that emits the items in the specified {@link Iterable} and then emits * the items emitted by the source Observable * @see RxJava Wiki: startWith() * @see MSDN: Observable.StartWith */ public final Observable startWith(Iterable values, Scheduler scheduler) { return concat(from(values, scheduler), this); } /** * Returns an Observable that emits a specified item before it begins to emit items emitted by * the source Observable. *

* * * @param t1 * the item to emit * @return an Observable that emits the specified item before it begins to emit items emitted by * the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1) { return concat(Observable. from(t1), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2) { return concat(Observable. from(t1, t2), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3) { return concat(Observable. from(t1, t2, t3), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4) { return concat(Observable. from(t1, t2, t3, t4), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @param t5 * the fifth item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4, T t5) { return concat(Observable. from(t1, t2, t3, t4, t5), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @param t5 * the fifth item to emit * @param t6 * the sixth item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6) { return concat(Observable. from(t1, t2, t3, t4, t5, t6), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @param t5 * the fifth item to emit * @param t6 * the sixth item to emit * @param t7 * the seventh item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) { return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @param t5 * the fifth item to emit * @param t6 * the sixth item to emit * @param t7 * the seventh item to emit * @param t8 * the eighth item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) { return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7, t8), this); } /** * Returns an Observable that emits the specified items before it begins to emit items emitted * by the source Observable. *

* * * @param t1 * the first item to emit * @param t2 * the second item to emit * @param t3 * the third item to emit * @param t4 * the fourth item to emit * @param t5 * the fifth item to emit * @param t6 * the sixth item to emit * @param t7 * the seventh item to emit * @param t8 * the eighth item to emit * @param t9 * the ninth item to emit * @return an Observable that emits the specified items before it begins to emit items emitted * by the source Observable * @see RxJava Wiki: startWith() */ public final Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) { return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7, t8, t9), this); } /** * Returns an Observable that emits the items from a specified array, on a specified scheduler, * before it begins to emit items emitted by the source Observable. *

* * * @param values * the items you want the modified Observable to emit first * @param scheduler * the scheduler to emit the prepended values on * @return an Observable that emits the items from {@code values}, on {@code scheduler}, before * it begins to emit items emitted by the source Observable. * @see RxJava Wiki: startWith() * @see MSDN: Observable.StartWith */ public final Observable startWith(T[] values, Scheduler scheduler) { return startWith(Arrays.asList(values), scheduler); } // TODO should this be called `observe` instead of `subscribe`? public final void subscribe(Operator o) { f.call(o); } /** * Subscribe and ignore all events. * * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them */ public final Subscription subscribe() { return protectivelyWrapAndSubscribe(new Observer() { @Override public final void onCompleted() { // do nothing } @Override public final void onError(Throwable e) { throw new OnErrorNotImplementedException(e); } @Override public final void onNext(T args) { // do nothing } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public final void onCompleted() { // do nothing } @Override public final void onError(Throwable e) { throw new OnErrorNotImplementedException(e); } @Override public final void onNext(T args) { onNext.call(args); } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @param onError * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext, final Action1 onError) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } if (onError == null) { throw new IllegalArgumentException("onError can not be null"); } /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on * "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public final void onCompleted() { // do nothing } @Override public final void onError(Throwable e) { onError.call(e); } @Override public final void onNext(T args) { onNext.call(args); } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @param onError * FIXME FIXME FIXME * @param onComplete * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } if (onError == null) { throw new IllegalArgumentException("onError can not be null"); } if (onComplete == null) { throw new IllegalArgumentException("onComplete can not be null"); } /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public final void onCompleted() { onComplete.call(); } @Override public final void onError(Throwable e) { onError.call(e); } @Override public final void onNext(T args) { onNext.call(args); } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @param onError * FIXME FIXME FIXME * @param onComplete * FIXME FIXME FIXME * @param scheduler * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @param onError * FIXME FIXME FIXME * @param scheduler * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. * * @param onNext * FIXME FIXME FIXME * @param scheduler * FIXME FIXME FIXME * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @see RxJava Wiki: onNext, onCompleted, and onError */ public final Subscription subscribe(final Action1 onNext, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. *

* A typical implementation of {@code subscribe} does the following: *

    *
  1. It stores a reference to the Observer in a collection object, such as a {@code List} object.
  2. *
  3. It returns a reference to the {@link Subscription} interface. This enables Observers to * unsubscribe, that is, to stop receiving items and notifications before the Observable stops * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method.
  4. *

* An {@code Observable} instance is responsible for accepting all subscriptions and * notifying all Observers. Unless the documentation for a particular {@code Observable} implementation indicates otherwise, Observers should make no assumptions about the order in * which multiple Observers will receive their notifications. *

* For more information see the * RxJava Wiki * * @param observer * the {@link Observer} * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving * items before the Observable has finished sending them * @throws IllegalArgumentException * if the {@link Observer} provided as the argument to {@code subscribe()} is {@code null} */ public final Subscription subscribe(Observer observer) { // allow the hook to intercept and/or decorate Action1> onSubscribeFunction = hook.onSubscribeStart(this, f); // validate and proceed if (observer == null) { throw new IllegalArgumentException("observer can not be null"); } if (onSubscribeFunction == null) { throw new IllegalStateException("onSubscribe function can not be null."); // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { Operator op = null; /** * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ if (isInternalImplementation(observer)) { op = Operator.create(observer, new CompositeSubscription()); onSubscribeFunction.call(op); } else { // TODO this doesn't seem correct any longer with the Operator and injecting of CompositeSubscription SafeObservableSubscription subscription = new SafeObservableSubscription(op); op = Operator.create(new SafeObserver(subscription, observer), new CompositeSubscription()); onSubscribeFunction.call(op); } return hook.onSubscribeReturn(this, op); } catch (OnErrorNotImplementedException e) { // special handling when onError is not implemented ... we just rethrow throw e; } catch (Throwable e) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(hook.onSubscribeError(this, e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); hook.onSubscribeError(this, r); throw r; } return Subscriptions.empty(); } } /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive * items and notifications from the Observable. *

* A typical implementation of {@code subscribe} does the following: *

    *
  1. It stores a reference to the Observer in a collection object, such as a {@code List} object.
  2. *
  3. It returns a reference to the {@link Subscription} interface. This enables Observers to * unsubscribe, that is, to stop receiving items and notifications before the Observable stops * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method.
  4. *

* An {@code Observable} instance is responsible for accepting all subscriptions and * notifying all Observers. Unless the documentation for a particular {@code Observable} implementation indicates otherwise, Observers should make no assumptions about the order in * which multiple Observers will receive their notifications. *

* For more information see the * RxJava Wiki * * @param observer * the {@link Observer} * @param scheduler * the {@link Scheduler} on which Observers subscribe to the Observable * @return a {@link Subscription} reference with which Observers can stop receiving items and * notifications before the Observable has finished sending them * @throws IllegalArgumentException * if an argument to {@code subscribe()} is {@code null} */ public final Subscription subscribe(Observer observer, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(observer); } /** * Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}. *

* * * @param scheduler * the {@link Scheduler} to perform subscription and unsubscription actions on * @return the source Observable modified so that its subscriptions and unsubscriptions happen * on the specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() */ public final Observable subscribeOn(Scheduler scheduler) { return create(OperationSubscribeOn.subscribeOn(this, scheduler)); } /** * Returns an Observable that extracts a Double from each of the items emitted by the source * Observable via a function you specify, and then emits the sum of these Doubles. *

* * * @param valueExtractor * the function to extract a Double from each item emitted by the source Observable * @return an Observable that emits the Double sum of the Double values corresponding to the * items emitted by the source Observable as transformed by the provided function * @see RxJava Wiki: sumDouble() * @see MSDN: Observable.Sum */ public final Observable sumDouble(Func1 valueExtractor) { return create(new OperationSum.SumDoubleExtractor(this, valueExtractor)); } /** * Returns an Observable that extracts a Float from each of the items emitted by the source * Observable via a function you specify, and then emits the sum of these Floats. *

* * * @param valueExtractor * the function to extract a Float from each item emitted by the source Observable * @return an Observable that emits the Float sum of the Float values corresponding to the items * emitted by the source Observable as transformed by the provided function * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum */ public final Observable sumFloat(Func1 valueExtractor) { return create(new OperationSum.SumFloatExtractor(this, valueExtractor)); } /** * Returns an Observable that extracts an Integer from each of the items emitted by the source * Observable via a function you specify, and then emits the sum of these Integers. *

* * * @param valueExtractor * the function to extract an Integer from each item emitted by the source Observable * @return an Observable that emits the Integer sum of the Integer values corresponding to the * items emitted by the source Observable as transformed by the provided function * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum */ public final Observable sumInteger(Func1 valueExtractor) { return create(new OperationSum.SumIntegerExtractor(this, valueExtractor)); } /** * Returns an Observable that extracts a Long from each of the items emitted by the source * Observable via a function you specify, and then emits the sum of these Longs. *

* * * @param valueExtractor * the function to extract a Long from each item emitted by the source Observable * @return an Observable that emits the Long sum of the Long values corresponding to the items * emitted by the source Observable as transformed by the provided function * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum */ public final Observable sumLong(Func1 valueExtractor) { return create(new OperationSum.SumLongExtractor(this, valueExtractor)); } /** * Returns a new Observable by applying a function that you supply to each item emitted by the * source Observable that returns an Observable, and then emitting the items emitted by the * most recently emitted of these Observables. *

* * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable * @return an Observable that emits the items emitted by the Observable returned from applying {@code func} to the most recently emitted item emitted by the source Observable */ public final Observable switchMap(Func1> func) { return switchOnNext(map(func)); } /** * Wraps the source Observable in another Observable that ensures that the resulting Observable * is chronologically well-behaved. *

* *

* A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of * its {@link Observer}s; it invokes either {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after * invoking either {@code onCompleted} or {@code onError}. {@code synchronize} enforces this, * and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously. * * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its {@link Observer}s * @see RxJava Wiki: synchronize() */ public final Observable synchronize() { return create(OperationSynchronize.synchronize(this)); } /** * Wraps the source Observable in another Observable that ensures that the resulting Observable * is chronologically well-behaved by acquiring a mutual-exclusion lock for the object provided * as the {@code lock} parameter. *

* *

* A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of * its {@link Observer}s; it invokes either {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after * invoking either {@code onCompleted} or {@code onError}. {@code synchronize} enforces this, * and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously. * * @param lock * the lock object to synchronize each observer call on * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its {@link Observer}s * @see RxJava Wiki: synchronize() */ public final Observable synchronize(Object lock) { return create(OperationSynchronize.synchronize(this, lock)); } /** * Returns an Observable that emits only the first {@code num} items emitted by the source * Observable. *

* *

* This method returns an Observable that will invoke a subscribing {@link Observer}'s {@link Observer#onNext onNext} function a maximum of {@code num} times before invoking * {@link Observer#onCompleted onCompleted}. * * @param num * the maximum number of items to emit * @return an Observable that emits only the first {@code num} items emitted by the source * Observable, or all of the items from the source Observable if that Observable emits * fewer than {@code num} items * @see RxJava Wiki: take() */ public final Observable take(final int num) { return bind(new OperatorTake(num)); } /** * Returns an Observable that emits those items emitted by source Observable before a specified * time runs out. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that emits those items emitted by the source Observable before the time * runs out * @see RxJava Wiki: take() */ public final Observable take(long time, TimeUnit unit) { return take(time, unit, Schedulers.computation()); } /** * Returns an Observable that emits those items emitted by source Observable before a specified * time (on a specified scheduler) runs out. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the scheduler used for time source * @return an Observable that emits those items emitted by the source Observable before the time * runs out, according to the specified scheduler * @see RxJava Wiki: take() */ public final Observable take(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperatorTakeTimed.TakeTimed(this, time, unit, scheduler)); } /** * Returns an Observable that emits only the very first item emitted by the source Observable. *

* * * @return an Observable that emits only the very first item emitted by the source Observable, * or an empty Observable if the source Observable completes without emitting a single * item * @see RxJava Wiki: first() * @see MSDN: {@code Observable.firstAsync()} * @deprecated use {@code take(1)} directly */ @Deprecated public final Observable takeFirst() { return take(1); } /** * Returns an Observable that emits only the very first item emitted by the source Observable * that satisfies a specified condition. *

* * * @param predicate * the condition any item emitted by the source Observable has to satisfy * @return an Observable that emits only the very first item emitted by the source Observable * that satisfies the given condition, or that completes without emitting anything if * the source Observable completes without emitting a single condition-satisfying item * @see RxJava Wiki: first() * @see MSDN: {@code Observable.firstAsync()} */ public final Observable takeFirst(Func1 predicate) { return filter(predicate).take(1); } /** * Returns an Observable that emits only the last {@code count} items emitted by the source * Observable. *

* * * @param count * the number of items to emit from the end of the sequence of items emitted by the * source Observable * @return an Observable that emits only the last {@code count} items emitted by the source * Observable * @see RxJava Wiki: takeLast() */ public final Observable takeLast(final int count) { return create(OperationTakeLast.takeLast(this, count)); } /** * Return an Observable that emits at most a specified number of items from the source * Observable that were emitted in a specified window of time before the Observable completed. *

* * * @param count * the maximum number of items to emit * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that emits at most {@code count} items from the source Observable that * were emitted in a specified window of time before the Observable completed. */ public final Observable takeLast(int count, long time, TimeUnit unit) { return takeLast(count, time, unit, Schedulers.computation()); } /** * Return an Observable that emits at most a specified number of items from the source * Observable that were emitted in a specified window of time before the Observable * completed, where the timing information is provided by a given scheduler. *

* * * @param count * the maximum number of items to emit * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the Scheduler that provides the timestamps for the observed items * @return an Observable that emits at most {@code count} items from the source Observable that * were emitted in a specified window of time before the Observable completed, where * the timing information is provided by the given {@code scheduler} */ public final Observable takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) { if (count < 0) { throw new IllegalArgumentException("count >= 0 required"); } return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler)); } /** * Return an Observable that emits the items from the source Observable that were emitted in a * specified window of time before the Observable completed. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that emits the items from the source Observable that were emitted in * the window of time before the Observable completed specified by {@code time} */ public final Observable takeLast(long time, TimeUnit unit) { return takeLast(time, unit, Schedulers.computation()); } /** * Return an Observable that emits the items from the source Observable that were emitted in a * specified window of time before the Observable completed, where the timing information is * provided by a specified scheduler. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the Scheduler that provides the timestamps for the Observed items * @return an Observable that emits the items from the source Observable that were emitted in * the window of time before the Observable completed specified by {@code time}, where * the timing information is provided by {@code scheduler} */ public final Observable takeLast(long time, TimeUnit unit, Scheduler scheduler) { return create(OperationTakeLast.takeLast(this, time, unit, scheduler)); } /** * Return an Observable that emits a single List containing the last {@code count} elements * emitted by the source Observable. *

* * * @param count * the number of items to emit in the list * @return an Observable that emits a single list containing the last {@code count} elements * emitted by the source Observable */ public final Observable> takeLastBuffer(int count) { return takeLast(count).toList(); } /** * Return an Observable that emits a single List containing at most {@code count} items from * the source Observable that were emitted during a specified window of time before the * source Observable completed. *

* * * @param count * the maximum number of items to emit * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that emits a single List containing at most {@code count} items emitted * by the source Observable during the time window defined by {@code time} before the * source Observable completed */ public final Observable> takeLastBuffer(int count, long time, TimeUnit unit) { return takeLast(count, time, unit).toList(); } /** * Return an Observable that emits a single List containing at most {@code count} items from * the source Observable that were emitted during a specified window of time (on a specified * scheduler) before the source Observable completed. *

* * * @param count * the maximum number of items to emit * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the scheduler that provides the timestamps for the observed items * @return an Observable that emits a single List containing at most {@code count} items emitted * by the source Observable during the time window defined by {@code time} before the * source Observable completed */ public final Observable> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) { return takeLast(count, time, unit, scheduler).toList(); } /** * Return an Observable that emits a single List containing those items from the source * Observable that were emitted during a specified window of time before the source Observable * completed. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @return an Observable that emits a single List containing the items emitted by the source * Observable during the time window defined by {@code time} before the source * Observable completed */ public final Observable> takeLastBuffer(long time, TimeUnit unit) { return takeLast(time, unit).toList(); } /** * Return an Observable that emits a single List containing those items from the source * Observable that were emitted during a specified window of time before the source Observable * completed, where the timing information is provided by the given Scheduler. *

* * * @param time * the length of the time window * @param unit * the time unit of {@code time} * @param scheduler * the Scheduler that provides the timestamps for the observed items * @return an Observable that emits a single List containing the items emitted by the source * Observable during the time window defined by {@code time} before the source * Observable completed, where the timing information is provided by {@code scheduler} */ public final Observable> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) { return takeLast(time, unit, scheduler).toList(); } /** * Returns an Observable that emits the items emitted by the source Observable until a second * Observable emits an item. *

* * * @param other * the Observable whose first emitted item will cause {@code takeUntil} to stop * emitting items from the source Observable * @param * the type of items emitted by {@code other} * @return an Observable that emits the items emitted by the source Observable until such time * as {@code other} emits its first item * @see RxJava Wiki: takeUntil() */ public final Observable takeUntil(Observable other) { return OperationTakeUntil.takeUntil(this, other); } /** * Returns an Observable that emits items emitted by the source Observable so long as each item * satisfied a specified condition, and then completes as soon as this condition is not * satisfied. *

* * * @param predicate * a function that evaluates an item emitted by the source Observable and returns a * Boolean * @return an Observable that emits the items from the source Observable so long as each item * satisfies the condition defined by {@code predicate}, then completes * @see RxJava Wiki: takeWhile() */ public final Observable takeWhile(final Func1 predicate) { return create(OperationTakeWhile.takeWhile(this, predicate)); } /** * Returns an Observable that emits the items emitted by a source Observable so long as a given * predicate remains true, where the predicate operates on both the item and its index relative * to the complete sequence of emitted items. *

* * * @param predicate * a function to test each item emitted by the source Observable for a condition; the * second parameter of the function represents the sequential index of the source * item; it returns a Boolean * @return an Observable that emits items from the source Observable so long as the predicate * continues to return {@code true} for each item, then completes * @see RxJava Wiki: takeWhileWithIndex() */ public final Observable takeWhileWithIndex(final Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(this, predicate)); } /** * Matches when the Observable has an available item and projects the item by invoking the * selector function. *

* * * @param selector * selector that will be invoked for items emitted by the source Observable * @return a {@link Plan} that produces the projected results, to be fed (with other Plans) to * the {@link #when} operator * @throws NullPointerException * if {@code selector} is null * @see RxJava Wiki: then() * @see MSDN: Observable.Then */ public final Plan0 then(Func1 selector) { return OperationJoinPatterns.then(this, selector); } /** * Returns an Observable that emits only the first item emitted by the source Observable during * sequential time windows of a specified duration. *

* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. *

* * * @param windowDuration * time to wait before emitting another item after emitting the last item * @param unit * the unit of time of {@code windowDuration} * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleFirst() */ public final Observable throttleFirst(long windowDuration, TimeUnit unit) { return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit)); } /** * Returns an Observable that emits only the first item emitted by the source Observable during * sequential time windows of a specified duration, where the windows are managed by a specified * Scheduler. *

* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. *

* * * @param skipDuration * time to wait before emitting another item after emitting the last item * @param unit * the unit of time of {@code skipDuration} * @param scheduler * the {@link Scheduler} to use internally to manage the timers that handle timeout * for each event * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleFirst() */ public final Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) { return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler)); } /** * Returns an Observable that emits only the last item emitted by the source Observable during * sequential time windows of a specified duration. *

* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval * whereas {@link #throttleFirst} does not tick, it just tracks passage of time. *

* * * @param intervalDuration * duration of windows within which the last item emitted by the source Observable * will be emitted * @param unit * the unit of time of {@code intervalDuration} * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleLast() * @see #sample(long, TimeUnit) */ public final Observable throttleLast(long intervalDuration, TimeUnit unit) { return sample(intervalDuration, unit); } /** * Returns an Observable that emits only the last item emitted by the source Observable during * sequential time windows of a specified duration, where the duration is governed by a * specified Scheduler. *

* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval * whereas {@link #throttleFirst} does not tick, it just tracks passage of time. *

* * * @param intervalDuration * duration of windows within which the last item emitted by the source Observable * will be emitted * @param unit * the unit of time of {@code intervalDuration} * @param scheduler * the {@link Scheduler} to use internally to manage the timers that handle timeout * for each event * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleLast() * @see #sample(long, TimeUnit, Scheduler) */ public final Observable throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) { return sample(intervalDuration, unit, scheduler); } /** * Returns an Observable that only emits those items emitted by the source Observable that are * not followed by another emitted item within a specified time window. *

* Note: If the source Observable keeps emitting items more frequently than the length * of the time window then no items will be emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout * the length of the window of time that must pass after the emission of an item from * the source Observable in which that Observable emits no items in order for the * item to be emitted by the resulting Observable * @param unit * the {@link TimeUnit} of {@code timeout} * @return an Observable that filters out items that are too quickly followed by newer items * @see RxJava Wiki: throttleWithTimeout() * @see #debounce(long, TimeUnit) */ public final Observable throttleWithTimeout(long timeout, TimeUnit unit) { return create(OperationDebounce.debounce(this, timeout, unit)); } /** * Returns an Observable that only emits those items emitted by the source Observable that are * not followed by another emitted item within a specified time window, where the time window * is governed by a specified Scheduler. *

* Note: If the source Observable keeps emitting items more frequently than the length * of the time window then no items will be emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout * the length of the window of time that must pass after the emission of an item from * the source Observable in which that Observable emits no items in order for the * item to be emitted by the resulting Observable * @param unit * the {@link TimeUnit} of {@code timeout} * @param scheduler * the {@link Scheduler} to use internally to manage the timers that handle the * timeout for each item * @return an Observable that filters out items that are too quickly followed by newer items * @see RxJava Wiki: throttleWithTimeout() * @see #debounce(long, TimeUnit, Scheduler) */ public final Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); } /** * Returns an Observable that emits records of the time interval between consecutive items * emitted by the source Observable. *

* * * @return an Observable that emits time interval information items * @see RxJava Wiki: timeInterval() * @see MSDN: Observable.TimeInterval */ public final Observable> timeInterval() { return create(OperationTimeInterval.timeInterval(this)); } /** * Returns an Observable that emits records of the time interval between consecutive items * emitted by the source Observable, where this interval is computed on a specified Scheduler. *

* * * @param scheduler * the {@link Scheduler} used to compute time intervals * @return an Observable that emits time interval information items * @see RxJava Wiki: timeInterval() * @see MSDN: Observable.TimeInterval */ public final Observable> timeInterval(Scheduler scheduler) { return create(OperationTimeInterval.timeInterval(this, scheduler)); } /** * Returns an Observable that completes if either the first item emitted by the source * Observable or any subsequent item don't arrive within time windows defined by other * Observables. *

* * * @param * the first timeout value type (ignored) * @param * the subsequent timeout value type (ignored) * @param firstTimeoutSelector * a function that returns an Observable that determines the timeout window for the * first source item * @param timeoutSelector * a function that returns an Observable for each item emitted by the source * Observable and that determines the timeout window in which the subsequent source * item must arrive in order to continue the sequence * @return an Observable that completes if either the first item or any subsequent item doesn't * arrive within the time windows specified by the timeout selectors */ public final Observable timeout(Func0> firstTimeoutSelector, Func1> timeoutSelector) { if (firstTimeoutSelector == null) { throw new NullPointerException("firstTimeoutSelector"); } return timeout(firstTimeoutSelector, timeoutSelector, Observable. empty()); } /** * Returns an Observable that mirrors the source Observable, but switches to a fallback * Observable if either the first item emitted by the source Observable or any subsequent item * don't arrive within time windows defined by other Observables. *

* * * @param * the first timeout value type (ignored) * @param * the subsequent timeout value type (ignored) * @param firstTimeoutSelector * a function that returns an Observable which determines the timeout window for the * first source item * @param timeoutSelector * a function that returns an Observable for each item emitted by the source * Observable and that determines the timeout window in which the subsequent source * item must arrive in order to continue the sequence * @param other * the fallback Observable to switch to if the source Observable times out * @return an Observable that mirrors the source Observable, but switches to the {@code other} Observable if either the first item emitted by the source Observable or any * subsequent item don't arrive within time windows defined by the timeout selectors */ public final Observable timeout(Func0> firstTimeoutSelector, Func1> timeoutSelector, Observable other) { if (firstTimeoutSelector == null) { throw new NullPointerException("firstTimeoutSelector"); } if (other == null) { throw new NullPointerException("other"); } return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other)); } /** * Returns an Observable that mirrors the source Observable, but completes if an item emitted by * the source Observable doesn't arrive within a window of time after the emission of the * previous item, where that period of time is measured by an Observable that is a function * of the previous item. *

* *

* The arrival of the first source item is never timed out. * * @param * the timeout value type (ignored) * @param timeoutSelector * a function that returns an observable for each item emitted by the source * Observable and that determines the timeout window for the subsequent item * @return an Observable that mirrors the source Observable, but completes if a item emitted by * the source Observable takes longer to arrive than the time window defined by the * selector for the previously emitted item */ public final Observable timeout(Func1> timeoutSelector) { return timeout(timeoutSelector, Observable. empty()); } /** * Returns an Observable that mirrors the source Observable, but that switches to a fallback * Observable if an item emitted by the source Observable doesn't arrive within a window of time * after the emission of the previous item, where that period of time is measured by an * Observable that is a function of the previous item. *

* *

* The arrival of the first source item is never timed out. * * @param * the timeout value type (ignored) * @param timeoutSelector * a function that returns an observable for each item emitted by the source * Observable and that determines the timeout window for the subsequent item * @param other * the fallback Observable to switch to if the source Observable times out * @return an Observable that mirrors the source Observable, but switches to mirroring a * fallback Observable if a item emitted by the source Observable takes longer to arrive * than the time window defined by the selector for the previously emitted item */ public final Observable timeout(Func1> timeoutSelector, Observable other) { if (other == null) { throw new NullPointerException("other"); } return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other)); } /** * Returns an Observable that mirrors the source Observable but applies a timeout policy for * each emitted item. If the next item isn't emitted within the specified timeout duration * starting from its predecessor, the resulting Observable terminates and notifies observers of * a {@code TimeoutException}. *

* * * @param timeout * maximum duration between emitted items before a timeout occurs * @param timeUnit * the unit of time that applies to the {@code timeout} argument. * @return the source Observable modified to notify observers of a {@code TimeoutException} in * case of a timeout * @see RxJava Wiki: timeout() * @see MSDN: Observable.Timeout */ public final Observable timeout(long timeout, TimeUnit timeUnit) { return create(OperationTimeout.timeout(this, timeout, timeUnit)); } /** * Returns an Observable that mirrors the source Observable but applies a timeout policy for * each emitted item. If the next item isn't emitted within the specified timeout duration * starting from its predecessor, the resulting Observable begins instead to mirror a fallback * Observable. *

* * * @param timeout * maximum duration between items before a timeout occurs * @param timeUnit * the unit of time that applies to the {@code timeout} argument * @param other * the fallback Observable to use in case of a timeout * @return the source Observable modified to switch to the fallback Observable in case of a * timeout * @see RxJava Wiki: timeout() * @see MSDN: Observable.Timeout */ public final Observable timeout(long timeout, TimeUnit timeUnit, Observable other) { return create(OperationTimeout.timeout(this, timeout, timeUnit, other)); } /** * Returns an Observable that mirrors the source Observable but applies a timeout policy for * each emitted item using a specified Scheduler. If the next item isn't emitted within the * specified timeout duration starting from its predecessor, the resulting Observable begins * instead to mirror a fallback Observable. *

* * * @param timeout * maximum duration between items before a timeout occurs * @param timeUnit * the unit of time that applies to the {@code timeout} argument * @param other * the Observable to use as the fallback in case of a timeout * @param scheduler * the {@link Scheduler} to run the timeout timers on * @return the source Observable modified so that it will switch to the fallback Observable in * case of a timeout * @see RxJava Wiki: timeout() * @see MSDN: Observable.Timeout */ public final Observable timeout(long timeout, TimeUnit timeUnit, Observable other, Scheduler scheduler) { return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler)); } /** * Returns an Observable that mirrors the source Observable but applies a timeout policy for * each emitted item, where this policy is governed on a specified Scheduler. If the next item * isn't emitted within the specified timeout duration starting from its predecessor, the * resulting Observable terminates and notifies observers of a {@code TimeoutException}. *

* * * @param timeout * maximum duration between items before a timeout occurs * @param timeUnit * the unit of time that applies to the {@code timeout} argument * @param scheduler * the Scheduler to run the timeout timers on * @return the source Observable modified to notify observers of a {@code TimeoutException} in * case of a timeout * @see RxJava Wiki: timeout() * @see MSDN: Observable.Timeout */ public final Observable timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) { return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler)); } /** * Returns an Observable that emits each item emitted by the source Observable, wrapped in a {@link Timestamped} object. *

* * * @return an Observable that emits timestamped items from the source Observable * @see RxJava Wiki: timestamp() * @see MSDN: Observable.Timestamp */ public final Observable> timestamp() { return timestamp(Schedulers.immediate()); } /** * Returns an Observable that emits each item emitted by the source Observable, wrapped in a {@link Timestamped} object whose timestamps are provided by a specified Scheduler. *

* * * @param scheduler * the {@link Scheduler} to use as a time source * @return an Observable that emits timestamped items from the source Observable with timestamps * provided by the {@code scheduler} * @see RxJava Wiki: timestamp() * @see MSDN: Observable.Timestamp */ public final Observable> timestamp(Scheduler scheduler) { return bind(new OperatorTimestamp(scheduler)); } /** * Converts an Observable into a {@link BlockingObservable} (an Observable with blocking * operators). * * @return a {@code BlockingObservable} version of this Observable * @see RxJava Wiki: Blocking Observable Operators */ public final BlockingObservable toBlockingObservable() { return BlockingObservable.from(this); } /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. *

* *

* Normally, an Observable that returns multiple items will do so by invoking its {@link Observer}'s {@link Observer#onNext onNext} method for each such item. You can change * this behavior, instructing the Observable to compose a list of all of these items and then to * invoke the Observer's {@code onNext} function once, passing it the entire list, by calling * the Observable's {@code toList} method prior to calling its {@link #subscribe} method. *

* Be careful not to use this operator on Observables that emit infinite or very large numbers * of items, as you do not have the option to unsubscribe. * * @return an Observable that emits a single item: a List containing all of the items emitted by * the source Observable. * @see RxJava Wiki: toList() */ public final Observable> toList() { return bind(new OperatorToObservableList()); } /** * Return an Observable that emits a single HashMap containing all items emitted by the source * Observable, mapped by the keys returned by a specified {@code keySelector} function. *

* *

* If more than one source item maps to the same key, the HashMap will contain the latest of * those items. * * @param keySelector * the function that extracts the key from a source item to be used in the HashMap * @return an Observable that emits a single item: a HashMap containing the mapped items from * the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToDictionary */ public final Observable> toMap(Func1 keySelector) { return create(OperationToMap.toMap(this, keySelector)); } /** * Return an Observable that emits a single HashMap containing values corresponding to items * emitted by the source Observable, mapped by the keys returned by a specified {@code keySelector} function. *

* *

* If more than one source item maps to the same key, the HashMap will contain a single entry * that corresponds to the latest of those items. * * @param keySelector * the function that extracts the key from a source item to be used in the HashMap * @param valueSelector * the function that extracts the value from a source item to be used in the HashMap * @return an Observable that emits a single item: a HashMap containing the mapped items from * the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToDictionary */ public final Observable> toMap(Func1 keySelector, Func1 valueSelector) { return create(OperationToMap.toMap(this, keySelector, valueSelector)); } /** * Return an Observable that emits a single Map, returned by a specified {@code mapFactory} function, that contains keys and values extracted from the items emitted by the source * Observable. *

* * * @param keySelector * the function that extracts the key from a source item to be used in the Map * @param valueSelector * the function that extracts the value from the source items to be used as value in * the Map * @param mapFactory * the function that returns a Map instance to be used * @return an Observable that emits a single item: a Map that contains the mapped items emitted * by the source Observable * @see RxJava Wiki: toMap() */ public final Observable> toMap(Func1 keySelector, Func1 valueSelector, Func0> mapFactory) { return create(OperationToMap.toMap(this, keySelector, valueSelector, mapFactory)); } /** * Return an Observable that emits a single HashMap that contains an ArrayList of items emitted * by the source Observable keyed by a specified {@code keySelector} function. *

* * * @param keySelector * the function that extracts the key from the source items to be used as key in the * HashMap * @return an Observable that emits a single item: a HashMap that contains an ArrayList of items * mapped from the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToLookup */ public final Observable>> toMultimap(Func1 keySelector) { return create(OperationToMultimap.toMultimap(this, keySelector)); } /** * Return an Observable that emits a single HashMap that contains an ArrayList of values * extracted by a specified {@code valueSelector} function from items emitted by the source * Observable, keyed by a specified {@code keySelector} function. *

* * * @param keySelector * the function that extracts a key from the source items to be used as key in the * HashMap * @param valueSelector * the function that extracts a value from the source items to be used as value in * the HashMap * @return an Observable that emits a single item: a HashMap that contains an ArrayList of items * mapped from the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToLookup */ public final Observable>> toMultimap(Func1 keySelector, Func1 valueSelector) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector)); } /** * Return an Observable that emits a single Map, returned by a specified {@code mapFactory} function, that contains an ArrayList of values, extracted by a specified {@code valueSelector} function * from items emitted by the source Observable and keyed by the {@code keySelector} function. *

* * * @param keySelector * the function that extracts a key from the source items to be used as the key in * the Map * @param valueSelector * the function that extracts a value from the source items to be used as the value * in the Map * @param mapFactory * the function that returns a Map instance to be used * @return an Observable that emits a single item: a Map that contains a list items mapped * from the source Observable * @see RxJava Wiki: toMap() */ public final Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory)); } /** * Return an Observable that emits a single Map, returned by a specified {@code mapFactory} function, that contains a custom collection of values, extracted by a specified {@code valueSelector} * function from items emitted by the source Observable, and keyed by the {@code keySelector} function. *

* * * @param keySelector * the function that extracts a key from the source items to be used as the key in * the Map * @param valueSelector * the function that extracts a value from the source items to be used as the value * in the Map * @param mapFactory * the function that returns a Map instance to be used * @param collectionFactory * the function that returns a Collection instance for a particular key to be used in * the Map * @return an Observable that emits a single item: a Map that contains the collection of mapped * items from the source Observable * @see RxJava Wiki: toMap() */ public final Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory, Func1> collectionFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory, collectionFactory)); } /** * Returns an Observable that emits a list that contains the items emitted by the source * Observable, in a sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all other items in the sequence. *

* * * @throws ClassCastException * if any item emitted by the Observable does not implement {@link Comparable} with * respect to all other items emitted by the Observable * @return an Observable that emits a list that contains the items emitted by the source * Observable in sorted order * @see RxJava Wiki: toSortedList() */ public final Observable> toSortedList() { return bind(new OperatorToObservableSortedList()); } /** * Returns an Observable that emits a list that contains the items emitted by the source * Observable, in a sorted order based on a specified comparison function. *

* * * @param sortFunction * a function that compares two items emitted by the source Observable and returns an * Integer that indicates their sort order * @return an Observable that emits a list that contains the items emitted by the source * Observable in sorted order * @see RxJava Wiki: toSortedList() */ public final Observable> toSortedList(Func2 sortFunction) { return bind(new OperatorToObservableSortedList(sortFunction)); } /** * Returns an Observable that represents a filtered version of the source Observable. *

* * * @param predicate * a function that evaluates an item emitted by the source Observable, returning {@code true} if it passes the filter * @return an Observable that emits only those items emitted by the source Observable that the * filter evaluates as {@code true} * @see RxJava Wiki: where() * @see #filter(Func1) */ @Deprecated public final Observable where(Func1 predicate) { return filter(predicate); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping windows. It emits the current * window and opens a new one when the Observable produced by the specified {@code closingSelector} emits an item. The {@code closingSelector} then creates a new * Observable to generate the closer of the next window. *

* * * @param closingSelector * a {@link Func0} that produces an Observable for every window created. When this * Observable emits an item, {@code window()} emits the associated window and begins * a new one. * @return an Observable that emits connected, non-overlapping windows of items from the source * Observable when {@code closingSelector} emits an item * @see RxJava Wiki: window() */ public final Observable> window(Func0> closingSelector) { return create(OperationWindow.window(this, closingSelector)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping windows, each containing {@code count} items. When the source Observable completes or encounters an error, the * resulting Observable emits the current window and propagates the notification from the source * Observable. *

* * * @param count * the maximum size of each window before it should be emitted * @return an Observable that emits connected, non-overlapping windows, each containing at most {@code count} items from the source Observable * @see RxJava Wiki: window() */ public final Observable> window(int count) { return create(OperationWindow.window(this, count)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits windows every {@code skip} items, each containing * no more than {@code count} items. When the source Observable completes or encounters an * error, the resulting Observable emits the current window and propagates the notification from * the source Observable. *

* * * @param count * the maximum size of each window before it should be emitted * @param skip * how many items need to be skipped before starting a new window. Note that if {@code skip} and {@code count} are equal this is the same operation as {@link #window(int)}. * @return an Observable that emits windows every {@code skip} items containing at most {@code count} items from the source Observable * @see RxJava Wiki: window() */ public final Observable> window(int count, int skip) { return create(OperationWindow.window(this, count, skip)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable starts a new window periodically, as determined by the {@code timeshift} argument. It emits each window after a fixed timespan, specified by the {@code timespan} * argument. When the source Observable completes or Observable completes or * encounters an error, the resulting Observable emits the current window and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted * @param timeshift * the period of time after which a new window will be created * @param unit * the unit of time that applies to the {@code timespan} and {@code timeshift} arguments * @return an Observable that emits new windows periodically as a fixed timespan elapses * @see RxJava Wiki: window() */ public final Observable> window(long timespan, long timeshift, TimeUnit unit) { return create(OperationWindow.window(this, timespan, timeshift, unit)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable starts a new window periodically, as determined by the {@code timeshift} argument. It emits each window after a fixed timespan, specified by the {@code timespan} * argument. When the source Observable completes or Observable completes or * encounters an error, the resulting Observable emits the current window and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted * @param timeshift * the period of time after which a new window will be created * @param unit * the unit of time that applies to the {@code timespan} and {@code timeshift} arguments * @param scheduler * the {@link Scheduler} to use when determining the end and start of a window * @return an Observable that emits new windows periodically as a fixed timespan elapses * @see RxJava Wiki: window() */ public final Observable> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping windows, each of a fixed duration * specified by the {@code timespan} argument. When the source Observable completes or * encounters an error, the resulting Observable emits the current window and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted and * replaced with a new window * @param unit * the unit of time that applies to the {@code timespan} argument * @return an Observable that emits connected, non-overlapping windows represending items * emitted by the source Observable during fixed, consecutive durations * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit) { return create(OperationWindow.window(this, timespan, unit)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping windows, each of a fixed duration * as specified by the {@code timespan} argument or a maximum size as specified by the {@code count} argument (whichever is reached first). When the source Observable completes or * encounters an error, the resulting Observable emits the current window and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted and * replaced with a new window * @param unit * the unit of time that applies to the {@code timespan} argument * @param count * the maximum size of each window before it should be emitted * @return an Observable that emits connected, non-overlapping windows of items from the source * Observable that were emitted during a fixed duration of time or when the window has * reached maximum capacity (whichever occurs first) * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, int count) { return create(OperationWindow.window(this, timespan, unit, count)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. The * resulting Observable emits connected, non-overlapping windows, each of a fixed duration * specified by the {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached first). When the source Observable completes or encounters an * error, the resulting Observable emits the current window and propagates the notification from * the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted and * replaced with a new window * @param unit * the unit of time which applies to the {@code timespan} argument * @param count * the maximum size of each window before it should be emitted * @param scheduler * the {@link Scheduler} to use when determining the end and start of a window * @return an Observable that emits connected, non-overlapping windows of items from the source * Observable that were emitted during a fixed duration of time or when the window has * reached maximum capacity (whichever occurs first) * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) { return create(OperationWindow.window(this, timespan, unit, count, scheduler)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits connected, non-overlapping windows, each of a fixed duration * as specified by the {@code timespan} argument. When the source Observable completes or * encounters an error, the resulting Observable emits the current window and propagates the * notification from the source Observable. *

* * * @param timespan * the period of time each window collects items before it should be emitted and * replaced with a new window * @param unit * the unit of time which applies to the {@code timespan} argument * @param scheduler * the {@link Scheduler} to use when determining the end and start of a window * @return an Observable that emits connected, non-overlapping windows containing items emitted * by the source Observable within a fixed duration * @see RxJava Wiki: window() */ public final Observable> window(long timespan, TimeUnit unit, Scheduler scheduler) { return create(OperationWindow.window(this, timespan, unit, scheduler)); } /** * Returns an Observable that emits windows of items it collects from the source Observable. * The resulting Observable emits windows that contain those items emitted by the source * Observable between the time when the {@code windowOpenings} Observable emits an item and when * the Observable returned by {@code closingSelector} emits an item. *

* * * @param windowOpenings * an Observable that, when it emits an item, causes another window to be created * @param closingSelector * a {@link Func1} that produces an Observable for every window created. When this * Observable emits an item, the associated window is closed and emitted * @return an Observable that emits windows of items emitted by the source Observable that are * governed by the specified window-governing Observables * @see RxJava Wiki: window() */ public final Observable> window(Observable windowOpenings, Func1> closingSelector) { return create(OperationWindow.window(this, windowOpenings, closingSelector)); } /** * Returns an Observable that emits non-overlapping windows of items it collects from the * source observable where the boundary of each window is determined by the items emitted from * a specified boundary-governing Observable. *

* * * @param * the window element type (ignored) * @param boundary * an Observable whose emitted items close and open windows * @return an Observable that emits non-overlapping windows of items it collects from the source * Observable where the boundary of each window is determined by the items emitted from * the {@code boundary} Observable */ public final Observable> window(Observable boundary) { return create(OperationWindow.window(this, boundary)); } /** * Returns an Observable that emits items that are the result of applying a specified function * to pairs of values, one each from the source Observable and a specified Iterable sequence. *

* *

* Note that the {@code other} Iterable is evaluated as items are observed from the source * Observable; it is not pre-consumed. This allows you to zip infinite streams on either side. * * @param * the type of items in the {@code other} Iterable * @param * the type of items emitted by the resulting Observable * @param other * the Iterable sequence * @param zipFunction * a function that combines the pairs of items from the Observable and the Iterable * to generate the items to be emitted by the resulting Observable * @return an Observable that pairs up values from the source Observable and the {@code other} Iterable sequence and emits the results of {@code zipFunction} applied to these pairs */ public final Observable zip(Iterable other, Func2 zipFunction) { return create(OperationZip.zipIterable(this, other, zipFunction)); } /** * Returns an Observable that emits items that are the result of applying a specified function * to pairs of values, one each from the source Observable and another specified Observable. *

* * * @param * the type of items emitted by the {@code other} Observable * @param * the type of items emitted by the resulting Observable * @param other * the other Observable * @param zipFunction * a function that combines the pairs of items from the two Observables to generate * the items to be emitted by the resulting Observable * @return an Observable that pairs up values from the source Observable and the {@code other} Observable and emits the results of {@code zipFunction} applied to these pairs */ public final Observable zip(Observable other, Func2 zipFunction) { return zip(this, other, zipFunction); } /** * An Observable that never sends any information to an {@link Observer}. * * This Observable is useful primarily for testing purposes. * * @param * the type of item (not) emitted by the Observable */ private static class NeverObservable extends Observable { public NeverObservable() { super(new Action1>() { @Override public void call(Operator observer) { // do nothing } }); } } /** * An Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it. * * @param * the type of item (ostensibly) emitted by the Observable */ private static class ThrowObservable extends Observable { public ThrowObservable(final Throwable exception) { super(new Action1>() { /** * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method. * * @param observer * an {@link Observer} of this Observable * @return a reference to the subscription */ @Override public void call(Operator observer) { observer.onError(exception); } }); } } @SuppressWarnings("rawtypes") private final static ConcurrentHashMap internalClassMap = new ConcurrentHashMap(); /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on * "Guideline 6.4: Protect calls to user code from within an operator" *

* Note: If strong reasons for not depending on package names comes up then the implementation * of this method can change to looking for a marker interface. * * @param o * FIXME FIXME FIXME * @return {@code true} if the given function is an internal implementation, and {@code false} otherwise */ private boolean isInternalImplementation(Object o) { if (o == null) { return true; } // prevent double-wrapping (yeah it happens) if (o instanceof SafeObserver) { return true; } Class clazz = o.getClass(); if (internalClassMap.containsKey(clazz)) { //don't need to do reflection return internalClassMap.get(clazz); } else { // we treat the following package as "internal" and don't wrap it Package p = o.getClass().getPackage(); // it can be null Boolean isInternal = (p != null && p.getName().startsWith("rx.operators")); internalClassMap.put(clazz, isInternal); return isInternal; } } }