/** * Copyright 2013 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package rx; import static org.junit.Assert.*; import static rx.util.functions.Functions.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import rx.joins.Pattern2; import rx.joins.Plan0; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; import rx.operators.OperationAmb; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationAverage; import rx.operators.OperationBuffer; import rx.operators.OperationCache; import rx.operators.OperationCast; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationConditionals; import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import 
rx.operators.OperationGroupBy; import rx.operators.OperationGroupByUntil; import rx.operators.OperationGroupJoin; import rx.operators.OperationInterval; import rx.operators.OperationJoin; import rx.operators.OperationJoinPatterns; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; import rx.operators.OperationParallelMerge; import rx.operators.OperationRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSequenceEqual; import rx.operators.OperationSingle; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; import rx.operators.OperationTimeInterval; import rx.operators.OperationTimeout; import rx.operators.OperationTimer; import rx.operators.OperationTimestamp; import rx.operators.OperationToMap; import rx.operators.OperationToMultimap; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import 
rx.operators.OperationToObservableSortedList; import rx.operators.OperationUsing; import rx.operators.OperationWindow; import rx.operators.OperationZip; import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.OnErrorNotImplementedException; import rx.util.Range; import rx.util.TimeInterval; import rx.util.Timestamped; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2; import rx.util.functions.Func0; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.Func5; import rx.util.functions.Func6; import rx.util.functions.Func7; import rx.util.functions.Func8; import rx.util.functions.Func9; import rx.util.functions.FuncN; import rx.util.functions.Function; import rx.util.functions.Functions; /** * The Observable interface that implements the Reactive Pattern. *

* This class provides overloaded methods for subscribing as well as * delegate methods to the various operators. *

* The documentation for this class makes use of marble diagrams. The * following legend explains these diagrams: *

* *

* For more information see the
 * RxJava Wiki
 *
 * @param <T> the type of the item emitted by the Observable
 */
public class Observable {

    /**
     * Concurrent map shared by all Observable instances (it is static).
     * NOTE(review): its key/value types and its readers are not visible in
     * this part of the file; presumably it caches per-class lookups - confirm.
     */
    private final static ConcurrentHashMap internalClassMap = new ConcurrentHashMap();

    /**
     * Executed when 'subscribe' is invoked.
     */
    private final OnSubscribeFunc onSubscribe;

    /**
     * Function interface for work to be performed when an {@link Observable}
     * is subscribed to via {@link Observable#subscribe(Observer)}.
     */
    public static interface OnSubscribeFunc extends Function {

        /**
         * Called with the {@link Observer} that is subscribing.
         *
         * @param t1 the subscribing {@link Observer}
         * @return the {@link Subscription} handed back for this subscription
         */
        public Subscription onSubscribe(Observer t1);

    }

    /**
     * Observable with Function to execute when subscribed to.
     *

* NOTE: Use {@link #create(OnSubscribeFunc)} to create an Observable
     * instead of this constructor unless you specifically have a need for
     * inheritance.
     *
     * @param onSubscribe {@link OnSubscribeFunc} to be executed when
     *            {@link #subscribe(Observer)} is called
     */
    protected Observable(OnSubscribeFunc onSubscribe) {
        // stored as given; no validation is performed in the constructor
        this.onSubscribe = onSubscribe;
    }

    /**
     * Execution hook, looked up once from {@link RxJavaPlugins} and shared by
     * every Observable instance.
     */
    private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    /**
     * An {@link Observer} must call an Observable's {@code subscribe} method in
     * order to receive items and notifications from the Observable.
     *

* A typical implementation of {@code subscribe} does the following: *

    *
  1. It stores a reference to the Observer in a collection object, such as * a {@code List} object.
  2. *
  3. It returns a reference to the {@link Subscription} interface. This * enables Observers to unsubscribe, that is, to stop receiving items * and notifications before the Observable stops sending them, which * also invokes the Observer's {@link Observer#onCompleted onCompleted} * method.
  4. *

* An Observable<T> instance is responsible for accepting * all subscriptions and notifying all Observers. Unless the documentation * for a particular Observable<T> implementation * indicates otherwise, Observers should make no assumptions about the order * in which multiple Observers will receive their notifications. *

* For more information see the * RxJava Wiki * * @param observer the Observer * @return a {@link Subscription} reference with which the {@link Observer} * can stop receiving items before the Observable has finished * sending them * @throws IllegalArgumentException if the {@link Observer} provided as the * argument to {@code subscribe()} is * {@code null} */ public Subscription subscribe(Observer observer) { // allow the hook to intercept and/or decorate OnSubscribeFunc onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe); // validate and proceed if (observer == null) { throw new IllegalArgumentException("observer can not be null"); } if (onSubscribeFunction == null) { throw new IllegalStateException("onSubscribe function can not be null."); // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { /** * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ if (isInternalImplementation(observer)) { Subscription s = onSubscribeFunction.onSubscribe(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens // we want to gracefully handle it the same as AtomicObservableSubscription does return hook.onSubscribeReturn(this, Subscriptions.empty()); } else { return hook.onSubscribeReturn(this, s); } } else { SafeObservableSubscription subscription = new SafeObservableSubscription(); subscription.wrap(onSubscribeFunction.onSubscribe(new SafeObserver(subscription, observer))); return hook.onSubscribeReturn(this, subscription); } } catch (OnErrorNotImplementedException e) { // special handling when onError is not implemented ... 
we just rethrow throw e; } catch (Throwable e) { // if an unhandled error occurs executing the onSubscribe we will propagate it try { observer.onError(hook.onSubscribeError(this, e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); hook.onSubscribeError(this, r); throw r; } return Subscriptions.empty(); } } /** * An {@link Observer} must call an Observable's {@code subscribe} method in * order to receive items and notifications from the Observable. *

* A typical implementation of {@code subscribe} does the following: *

    *
  1. It stores a reference to the Observer in a collection object, such as * a {@code List} object.
  2. *
  3. It returns a reference to the {@link Subscription} interface. This * enables Observers to unsubscribe, that is, to stop receiving items * and notifications before the Observable stops sending them, which * also invokes the Observer's {@link Observer#onCompleted onCompleted} * method.
  4. *

* An {@code Observable} instance is responsible for accepting all * subscriptions and notifying all Observers. Unless the documentation for a * particular {@code Observable} implementation indicates otherwise, * Observers should make no assumptions about the order in which multiple * Observers will receive their notifications. *

* For more information see the
     * RxJava Wiki
     *
     * @param observer the Observer
     * @param scheduler the {@link Scheduler} on which Observers subscribe to
     *            the Observable
     * @return a {@link Subscription} reference with which Observers can stop
     *         receiving items and notifications before the Observable has
     *         finished sending them
     * @throws IllegalArgumentException if an argument to {@code subscribe()}
     *             is {@code null}
     */
    public Subscription subscribe(Observer observer, Scheduler scheduler) {
        // delegate: subscribe the Observer to the subscribeOn(scheduler) view of this Observable
        return subscribeOn(scheduler).subscribe(observer);
    }

    /**
     * Protects against errors being thrown from Observer implementations and
     * ensures onNext/onError/onCompleted contract compliance.
     *

* See https://github.com/Netflix/RxJava/issues/216 for a discussion on * "Guideline 6.4: Protect calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { SafeObservableSubscription subscription = new SafeObservableSubscription(); return subscription.wrap(subscribe(new SafeObserver(subscription, o))); } /** * Subscribe and ignore all events. * * @return */ public Subscription subscribe() { return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Throwable e) { handleError(e); throw new OnErrorNotImplementedException(e); } @Override public void onNext(T args) { // do nothing } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method * in order to receive items and notifications from the Observable. * * @param onNext * @return * @see RxJava Wiki: onNext, onCompleted, and onError */ public Subscription subscribe(final Action1 onNext) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Throwable e) { handleError(e); throw new OnErrorNotImplementedException(e); } @Override public void onNext(T args) { onNext.call(args); } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in * order to receive items and notifications from the Observable. 
* * @param onNext * @param scheduler * @return * @see RxJava Wiki: onNext, onCompleted, and onError */ public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in * order to receive items and notifications from the Observable. * * @param onNext * @param onError * @return * @see RxJava Wiki: onNext, onCompleted, and onError */ public Subscription subscribe(final Action1 onNext, final Action1 onError) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } if (onError == null) { throw new IllegalArgumentException("onError can not be null"); } /** * Wrapping since raw functions provided by the user are being invoked. * * See https://github.com/Netflix/RxJava/issues/216 for discussion on * "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override public void onCompleted() { // do nothing } @Override public void onError(Throwable e) { handleError(e); onError.call(e); } @Override public void onNext(T args) { onNext.call(args); } }); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in * order to receive items and notifications from the Observable. * * @param onNext * @param onError * @param scheduler * @return * @see RxJava Wiki: onNext, onCompleted, and onError */ public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { return subscribeOn(scheduler).subscribe(onNext, onError); } /** * An {@link Observer} must call an Observable's {@code subscribe} method in * order to receive items and notifications from the Observable. 
*
     * @param onNext action invoked with each emitted item
     * @param onError action invoked with the error, after the error has been
     *            passed to {@code handleError}
     * @param onComplete action invoked on completion
     * @return the {@link Subscription} for the wrapped Observer
     * @throws IllegalArgumentException if any of the three actions is
     *             {@code null}
     * @see RxJava Wiki: onNext, onCompleted, and onError
     */
    public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) {
        // arguments are checked in declaration order, so the first null one is the one reported
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }
        if (onError == null) {
            throw new IllegalArgumentException("onError can not be null");
        }
        if (onComplete == null) {
            throw new IllegalArgumentException("onComplete can not be null");
        }

        /**
         * Wrapping since raw functions provided by the user are being invoked.
         * 
         * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
         */
        return protectivelyWrapAndSubscribe(new Observer() {

            @Override
            public void onCompleted() {
                onComplete.call();
            }

            @Override
            public void onError(Throwable e) {
                // report to the plugin error handler first, then to the caller's action
                handleError(e);
                onError.call(e);
            }

            @Override
            public void onNext(T args) {
                onNext.call(args);
            }

        });
    }

    /**
     * An {@link Observer} must call an Observable's {@code subscribe} method in
     * order to receive items and notifications from the Observable.
     *
     * @param onNext action invoked with each emitted item
     * @param onError action invoked with the error
     * @param onComplete action invoked on completion
     * @param scheduler the {@link Scheduler} passed to {@code subscribeOn}
     *            before subscribing
     * @return the {@link Subscription} returned by the delegated subscribe call
     * @see RxJava Wiki: onNext, onCompleted, and onError
     */
    public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) {
        return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Hides the identity of this observable.
     *
     * @return an Observable hiding the identity of this Observable.
     */
    public Observable asObservable() {
        return create(new OperationAsObservable(this));
    }

    /**
     * Returns a {@link ConnectableObservable} that upon connection causes the
     * source Observable to push results into the specified subject.
* * @param subject the {@link Subject} for the {@link ConnectableObservable} * to push source items into * @param result type * @return a {@link ConnectableObservable} that upon connection causes the * source Observable to push results into the specified * {@link Subject} * @see RxJava Wiki: Observable.publish() and Observable.multicast() */ public ConnectableObservable multicast(Subject subject) { return OperationMulticast.multicast(this, subject); } /** * Returns an observable sequence that contains the elements of a sequence * produced by multicasting the source sequence within a selector function. * * @param subjectFactory the subject factory * @param selector the selector function which can use the multicasted * source sequence subject to the policies enforced by the * created subject * @return the Observable sequence that contains the elements of a sequence * produced by multicasting the source sequence within a selector * function * @see RxJava: Observable.publish() and Observable.multicast() * @see MSDN: Observable.Multicast */ public Observable multicast( final Func0> subjectFactory, final Func1, ? extends Observable> selector) { return OperationMulticast.multicast(this, subjectFactory, selector); } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from * onError. * * @param e */ private void handleError(Throwable e) { // onError should be rare so we'll only fetch when needed RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } /** * An Observable that never sends any information to an {@link Observer}. * * This Observable is useful primarily for testing purposes. 
*
     * @param the type of item emitted by the Observable
     */
    private static class NeverObservable extends Observable {
        public NeverObservable() {
            super(new OnSubscribeFunc() {

                /**
                 * Ignores the Observer: none of its methods are called.
                 *
                 * @param t1 the subscribing {@link Observer} (unused)
                 * @return an empty {@link Subscription}
                 */
                @Override
                public Subscription onSubscribe(Observer t1) {
                    return Subscriptions.empty();
                }

            });
        }
    }

    /**
     * An Observable that invokes {@link Observer#onError onError} when the
     * {@link Observer} subscribes to it.
     *
     * @param the type of item emitted by the Observable
     */
    private static class ThrowObservable extends Observable {

        /**
         * @param exception the error passed to every subscribing Observer's
         *            {@link Observer#onError onError}
         */
        public ThrowObservable(final Throwable exception) {
            super(new OnSubscribeFunc() {

                /**
                 * Accepts an {@link Observer} and calls its
                 * {@link Observer#onError onError} method.
                 * 
                 * @param observer an {@link Observer} of this Observable
                 * @return a reference to the subscription
                 */
                @Override
                public Subscription onSubscribe(Observer observer) {
                    observer.onError(exception);
                    return Subscriptions.empty();
                }

            });
        }

    }

    /**
     * Creates an Observable that will execute the given function when an
     * {@link Observer} subscribes to it.
     *

* *

* Write the function you pass to create so that it behaves as * an Observable: It should invoke the Observer's * {@link Observer#onNext onNext}, {@link Observer#onError onError}, and * {@link Observer#onCompleted onCompleted} methods appropriately. *

* A well-formed Observable must invoke either the Observer's * onCompleted method exactly once or its onError * method exactly once. *

* See Rx Design
     * Guidelines (PDF) for detailed information.
     *
     * @param the type of the items that this Observable emits
     * @param func a function that accepts an {@code Observer}, invokes its
     *            {@code onNext}, {@code onError}, and {@code onCompleted}
     *            methods as appropriate, and returns a {@link Subscription} to
     *            allow the Observer to cancel the subscription
     * @return an Observable that, when an {@link Observer} subscribes to it,
     *         will execute the given function
     * @see RxJava Wiki: create()
     * @see MSDN: Observable.Create
     */
    public static Observable create(OnSubscribeFunc func) {
        // thin factory over the protected constructor; func is not validated here
        return new Observable(func);
    }

    /**
     * Returns an Observable that emits no items to the {@link Observer} and
     * immediately invokes its {@link Observer#onCompleted onCompleted} method.
     *

*
     *
     * @param the type of the items (ostensibly) emitted by the Observable
     * @return an Observable that returns no data to the {@link Observer} and
     *         immediately invokes the {@link Observer}'s
     *         {@link Observer#onCompleted() onCompleted} method
     * @see RxJava Wiki: empty()
     * @see MSDN: Observable.Empty
     */
    public static Observable empty() {
        // implemented as from(Iterable) over a fresh, empty list
        return from(new ArrayList());
    }

    /**
     * Returns an Observable that emits no items to the {@link Observer} and
     * immediately invokes its {@link Observer#onCompleted onCompleted} method
     * with the specified scheduler.
     *

*
     *
     * @param scheduler the scheduler to call the
     *            {@link Observer#onCompleted onCompleted} method
     * @param the type of the items (ostensibly) emitted by the Observable
     * @return an Observable that returns no data to the {@link Observer} and
     *         immediately invokes the {@link Observer}'s
     *         {@link Observer#onCompleted() onCompleted} method with the
     *         specified scheduler
     * @see RxJava Wiki: empty()
     * @see MSDN: Observable.Empty Method (IScheduler)
     */
    public static Observable empty(Scheduler scheduler) {
        // the no-argument empty() Observable, subscribed on the given scheduler
        return Observable. empty().subscribeOn(scheduler);
    }

    /**
     * Returns an Observable that invokes an {@link Observer}'s
     * {@link Observer#onError onError} method when the Observer subscribes to
     * it.
     *

*
     *
     * @param exception the particular error to report
     * @param the type of the items (ostensibly) emitted by the Observable
     * @return an Observable that invokes the {@link Observer}'s
     *         {@link Observer#onError onError} method when the Observer
     *         subscribes to it
     * @see RxJava Wiki: error()
     * @see MSDN: Observable.Throw
     */
    public static Observable error(Throwable exception) {
        // backed by the private ThrowObservable subclass
        return new ThrowObservable(exception);
    }

    /**
     * Returns an Observable that invokes an {@link Observer}'s
     * {@link Observer#onError onError} method with the specified scheduler.
     *

*
     *
     * @param exception the particular error to report
     * @param scheduler the scheduler to call the
     *            {@link Observer#onError onError} method
     * @param the type of the items (ostensibly) emitted by the Observable
     * @return an Observable that invokes the {@link Observer}'s
     *         {@link Observer#onError onError} method with the specified
     *         scheduler
     * @see RxJava Wiki: error()
     * @see MSDN: Observable.Throw
     */
    public static Observable error(Throwable exception, Scheduler scheduler) {
        // the single-argument error() Observable, subscribed on the given scheduler
        return Observable. error(exception).subscribeOn(scheduler);
    }

    /**
     * Converts an {@link Iterable} sequence into an Observable.
     *

* *

* Note: the entire iterable sequence is immediately emitted each time an
     * {@link Observer} subscribes. Since this occurs before the
     * {@link Subscription} is returned, it is not possible to unsubscribe from
     * the sequence before it completes.
     *
     * @param iterable the source {@link Iterable} sequence
     * @param the type of items in the {@link Iterable} sequence and the
     *            type of items to be emitted by the resulting Observable
     * @return an Observable that emits each item in the source {@link Iterable}
     *         sequence
     * @see RxJava Wiki: from()
     */
    public static Observable from(Iterable iterable) {
        // same as the scheduler overload, using Schedulers.currentThread()
        return from(iterable, Schedulers.currentThread());
    }

    /**
     * Converts an {@link Iterable} sequence into an Observable with the
     * specified scheduler.
     *

*
     *
     * @param iterable the source {@link Iterable} sequence
     * @param scheduler the scheduler to emit the items of the iterable
     * @param the type of items in the {@link Iterable} sequence and the
     *            type of items to be emitted by the resulting Observable
     * @return an Observable that emits each item in the source {@link Iterable}
     *         sequence with the specified scheduler
     * @see RxJava Wiki: from()
     * @see MSDN: Observable.ToObservable
     */
    public static Observable from(Iterable iterable, Scheduler scheduler) {
        // the actual emission logic lives in OperationToObservableIterable
        return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
    }

    /**
     * Converts an Array into an Observable that emits the items in the Array.
     *

* *

* Note: the entire array is immediately emitted each time an
     * {@link Observer} subscribes. Since this occurs before the
     * {@link Subscription} is returned, it is not possible to unsubscribe from
     * the sequence before it completes.
     *
     * @param items the source array
     * @param <T> the type of items in the Array and the type of items to be
     *            emitted by the resulting Observable
     * @return an Observable that emits each item in the source Array
     * @see RxJava Wiki: from()
     */
    public static Observable from(T[] items) {
        // Arrays.asList is a view over the array, not a copy
        return from(Arrays.asList(items));
    }

    /**
     * Converts an Array into an Observable.
     *

* *

* Note: the entire array is immediately emitted each time an
     * {@link Observer} subscribes. Since this occurs before the
     * {@link Subscription} is returned, it is not possible to unsubscribe from
     * the sequence before it completes.
     *
     * @param items the source array
     * @param scheduler the scheduler to emit the items of the array
     * @param <T> the type of items in the Array and the type of items to be
     *            emitted by the resulting Observable
     * @return an Observable that emits each item in the source Array
     * @see RxJava Wiki: from()
     */
    public static Observable from(T[] items, Scheduler scheduler) {
        // Arrays.asList is a view over the array, not a copy
        return from(Arrays.asList(items), scheduler);
    }

    /**
     * Converts an item into an Observable that emits that item.
     *

* *

* Note: the item is immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 the item * @param the type of the item, and the type of the item to be * emitted by the resulting Observable * @return an Observable that emits the item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1) { return from(Arrays.asList(t1)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2) { return from(Arrays.asList(t1, t2)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3) { return from(Arrays.asList(t1, t2, t3)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param t4 fourth item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3, T t4) { return from(Arrays.asList(t1, t2, t3, t4)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param t4 fourth item * @param t5 fifth item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3, T t4, T t5) { return from(Arrays.asList(t1, t2, t3, t4, t5)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param t4 fourth item * @param t5 fifth item * @param t6 sixth item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param t4 fourth item * @param t5 fifth item * @param t6 sixth item * @param t7 seventh item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer} * subscribes. Since this occurs before the {@link Subscription} is * returned, it is not possible to unsubscribe from the sequence before it * completes. * * @param t1 first item * @param t2 second item * @param t3 third item * @param t4 fourth item * @param t5 fifth item * @param t6 sixth item * @param t7 seventh item * @param t8 eighth item * @param the type of items, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item * @see RxJava Wiki: from() */ @SuppressWarnings("unchecked") // suppress unchecked because we are using varargs inside the method public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) { return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)); } /** * Converts a series of items into an Observable that emits those items. *

* *

* Note: the items will be immediately emitted each time an {@link Observer}
     * subscribes. Since this occurs before the {@link Subscription} is
     * returned, it is not possible to unsubscribe from the sequence before it
     * completes.
     *
     * @param t1 first item
     * @param t2 second item
     * @param t3 third item
     * @param t4 fourth item
     * @param t5 fifth item
     * @param t6 sixth item
     * @param t7 seventh item
     * @param t8 eighth item
     * @param t9 ninth item
     * @param <T> the type of items, and the type of items to be emitted by the
     *            resulting Observable
     * @return an Observable that emits each item
     * @see RxJava Wiki: from()
     */
    @SuppressWarnings("unchecked")
    // suppress unchecked because we are using varargs inside the method
    public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
        // Wrap the nine arguments in a list (in argument order) and hand it to from(...).
        return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9));
    }

    /**
     * Converts a series of items into an Observable that emits those items.
     *

* *

* @param t1 first item
     * @param t2 second item
     * @param t3 third item
     * @param t4 fourth item
     * @param t5 fifth item
     * @param t6 sixth item
     * @param t7 seventh item
     * @param t8 eighth item
     * @param t9 ninth item
     * @param t10 tenth item
     * @param <T> the type of items, and the type of items to be emitted by the
     *            resulting Observable
     * @return an Observable that emits each item
     * @see RxJava Wiki: from()
     */
    @SuppressWarnings("unchecked")
    // suppress unchecked because we are using varargs inside the method
    public static Observable from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
        // Wrap the ten arguments in a list (in argument order) and hand it to from(...).
        return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10));
    }

    /**
     * Generates an Observable that emits a sequence of Integers within a
     * specified range.
     *

* *

* @param start the value of the first Integer in the sequence
     * @param count the number of sequential Integers to generate
     * @return an Observable that emits a range of sequential Integers
     * @see RxJava Wiki: range()
     * @see MSDN: Observable.Range
     */
    public static Observable range(int start, int count) {
        // The sequence itself comes from Range.createWithCount; from(...) turns it into an Observable.
        return from(Range.createWithCount(start, count));
    }

    /**
     * Generates an Observable that emits a sequence of Integers within a
     * specified range with the specified scheduler.
     *

*
     * @param start the value of the first Integer in the sequence
     * @param count the number of sequential Integers to generate
     * @param scheduler the scheduler to run the generator loop on
     * @return an Observable that emits a range of sequential Integers
     * @see RxJava Wiki: range()
     * @see MSDN: Observable.Range
     */
    public static Observable range(int start, int count, Scheduler scheduler) {
        // Same as range(start, count), but the scheduler is forwarded to from(...) as well.
        return from(Range.createWithCount(start, count), scheduler);
    }

    /**
     * Repeats the observable sequence indefinitely.
     *

*
     * @return an Observable that emits, repeatedly and in sequence, the items
     *         emitted by the source Observable
     * @see RxJava Wiki: repeat()
     * @see MSDN: Observable.Repeat
     */
    public Observable repeat() {
        // No scheduler was supplied, so repeat on the one returned by Schedulers.currentThread().
        final Scheduler currentThreadScheduler = Schedulers.currentThread();
        return repeat(currentThreadScheduler);
    }

    /**
     * Repeats the observable sequence indefinitely, on a particular scheduler.
     *

*
     * @param scheduler the scheduler to send the values on.
     * @return an Observable that emits the items emitted by the source
     *         Observable repeatedly and in sequence
     * @see RxJava Wiki: repeat()
     * @see MSDN: Observable.Repeat
     */
    public Observable repeat(Scheduler scheduler) {
        // The repetition logic is provided by OperationRepeat; this Observable is its source.
        return create(OperationRepeat.repeat(this, scheduler));
    }

    /**
     * Returns an Observable that calls an Observable factory to create its
     * Observable for each new Observer that subscribes. That is, for each
     * subscriber, the actual Observable is determined by the factory function.
     *

* *

* The {@code defer} operator allows you to defer or delay emitting items from an
     * Observable until such time as an Observer subscribes to the Observable.
     * This allows an {@link Observer} to easily obtain updates or a refreshed
     * version of the sequence.
     *
     * @param observableFactory the Observable factory function to invoke for
     *                          each {@link Observer} that subscribes to the
     *                          resulting Observable
     * @param the type of the items emitted by the Observable
     * @return an Observable whose {@link Observer}s trigger an invocation of
     *         the given Observable factory function
     * @see RxJava Wiki: defer()
     */
    public static Observable defer(Func0> observableFactory) {
        // The factory is handed to OperationDefer unchanged; nothing is invoked here.
        return create(OperationDefer.defer(observableFactory));
    }

    /**
     * Returns an Observable that emits a single item and then completes.
     *

* *

* To convert any object into an Observable that emits that object, pass * that object into the just method. *

* This is similar to the {@link #from(java.lang.Object[])} method, except * that from() will convert an {@link Iterable} object into an * Observable that emits each of the items in the Iterable, one at a time, * while the just() method converts an Iterable into an * Observable that emits the entire Iterable as a single item. * * @param value the item to emit * @param the type of that item * @return an Observable that emits a single item and then completes * @see RxJava Wiki: just() */ public static Observable just(T value) { return from(Arrays.asList((value))); } /** * Returns an Observable that emits a single item and then completes, on a * specified scheduler. *

* *

* This is the scheduler-aware variant of {@link Observable#just(Object)}.
     *
     * @param value the item to emit
     * @param <T> the type of that item
     * @param scheduler the scheduler to emit the single item on
     * @return an Observable that emits a single item and then completes, on a
     *         specified scheduler
     * @see RxJava Wiki: just()
     */
    public static Observable just(T value, Scheduler scheduler) {
        // A one-element list is passed to from(...) together with the scheduler.
        return from(Arrays.asList(value), scheduler);
    }

    /**
     * Flattens a sequence of Observables emitted by an Observable into one
     * Observable, without any transformation.
     *

* *

* You can combine the items emitted by multiple Observables so that they
     * act like a single Observable, by using the {@code merge} method.
     *
     * @param source an Observable that emits Observables
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the Observables emitted by the
     *         {@code source} Observable
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    public static Observable merge(Observable> source) {
        // The flattening is provided by OperationMerge; create(...) wraps it as an Observable.
        return create(OperationMerge.merge(source));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2) {
        return create(OperationMerge.merge(t1, t2));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3) {
        return create(OperationMerge.merge(t1, t2, t3));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4) {
        return create(OperationMerge.merge(t1, t2, t3, t4));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) {
        return create(OperationMerge.merge(t1, t2, t3, t4, t5));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) {
        return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @param t7 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) {
        return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @param t7 an Observable to be merged
     * @param t8 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) {
        return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8));
    }

    /**
     * Flattens a series of Observables into one Observable, without any
     * transformation.
     *

* *

* You can combine items emitted by multiple Observables so that they act
     * like a single Observable, by using the {@code merge} method.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @param t7 an Observable to be merged
     * @param t8 an Observable to be merged
     * @param t9 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: merge()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) {
        return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8, t9));
    }

    /**
     * Returns an Observable that emits the items emitted by two or more
     * Observables, one after the other.
     *

*
     * @param observables an Observable that emits Observables
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the Observables that {@code observables}
     *         emits, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    public static Observable concat(Observable> observables) {
        // The concatenation is provided by OperationConcat; create(...) wraps it as an Observable.
        return create(OperationConcat.concat(observables));
    }

    /**
     * Returns an Observable that emits the items emitted by two Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2) {
        return create(OperationConcat.concat(t1, t2));
    }

    /**
     * Returns an Observable that emits the items emitted by three Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3) {
        return create(OperationConcat.concat(t1, t2, t3));
    }

    /**
     * Returns an Observable that emits the items emitted by four Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4) {
        return create(OperationConcat.concat(t1, t2, t3, t4));
    }

    /**
     * Returns an Observable that emits the items emitted by five Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @param t5 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) {
        return create(OperationConcat.concat(t1, t2, t3, t4, t5));
    }

    /**
     * Returns an Observable that emits the items emitted by six Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @param t5 an Observable to be concatenated
     * @param t6 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) {
        return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6));
    }

    /**
     * Returns an Observable that emits the items emitted by seven Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @param t5 an Observable to be concatenated
     * @param t6 an Observable to be concatenated
     * @param t7 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) {
        return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7));
    }

    /**
     * Returns an Observable that emits the items emitted by eight Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @param t5 an Observable to be concatenated
     * @param t6 an Observable to be concatenated
     * @param t7 an Observable to be concatenated
     * @param t8 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) {
        return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8));
    }

    /**
     * Returns an Observable that emits the items emitted by nine Observables,
     * one after the other.
     *

*
     * @param t1 an Observable to be concatenated
     * @param t2 an Observable to be concatenated
     * @param t3 an Observable to be concatenated
     * @param t4 an Observable to be concatenated
     * @param t5 an Observable to be concatenated
     * @param t6 an Observable to be concatenated
     * @param t7 an Observable to be concatenated
     * @param t8 an Observable to be concatenated
     * @param t9 an Observable to be concatenated
     * @return an Observable that emits items that are the result of combining
     *         the items emitted by the given Observables, one after the other
     * @see RxJava Wiki: concat()
     * @see MSDN: Observable.Concat
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable concat(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) {
        return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8, t9));
    }

    /**
     * This behaves like {@link #merge(Observable)} except that if any of the
     * merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param source an Observable that emits Observables
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the Observables emitted by the
     *         {@code source} Observable
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    public static Observable mergeDelayError(Observable> source) {
        // The error-delaying merge is provided by OperationMergeDelayError; create(...) wraps it.
        return create(OperationMergeDelayError.mergeDelayError(source));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable)} except that if
     * any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3));
    }

    /**
     * This behaves like
     * {@link #merge(Observable, Observable, Observable, Observable)} except
     * that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @param t7 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items
     * from all of the source Observables without being interrupted by an error
     * notification from one of them.
     *
     * @param t1 an Observable to be merged
     * @param t2 an Observable to be merged
     * @param t3 an Observable to be merged
     * @param t4 an Observable to be merged
     * @param t5 an Observable to be merged
     * @param t6 an Observable to be merged
     * @param t7 an Observable to be merged
     * @param t8 an Observable to be merged
     * @return an Observable that emits items that are the result of flattening
     *         the items emitted by the given Observables
     * @see RxJava Wiki: mergeDelayError()
     * @see MSDN: Observable.Merge
     */
    @SuppressWarnings("unchecked")
    // suppress because the types are checked by the method signature before using a vararg
    public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) {
        return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8));
    }

    /**
     * This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)}
     * except that if any of the merged Observables notify of an error via
     * {@link Observer#onError onError}, {@code mergeDelayError} will refrain
     * from propagating that error notification until all of the merged
     * Observables have finished emitting items.
     *

* *

* Even if multiple merged Observables send {@code onError} notifications, * {@code mergeDelayError} will only invoke the {@code onError} method of * its Observers once. *

* This method allows an Observer to receive all successfully emitted items * from all of the source Observables without being interrupted by an error * notification from one of them. * * @param t1 an Observable to be merged * @param t2 an Observable to be merged * @param t3 an Observable to be merged * @param t4 an Observable to be merged * @param t5 an Observable to be merged * @param t6 an Observable to be merged * @param t7 an Observable to be merged * @param t8 an Observable to be merged * @param t9 an Observable to be merged * @return an Observable that emits items that are the result of flattening * the items emitted by the {@code source} Observables * @see RxJava Wiki: mergeDelayError() * @see MSDN: Observable.Merge */ @SuppressWarnings("unchecked") // suppress because the types are checked by the method signature before using a vararg public static Observable mergeDelayError(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { return create(OperationMergeDelayError.mergeDelayError(t1, t2, t3, t4, t5, t6, t7, t8, t9)); } /** * Returns an Observable that never sends any items or notifications to an * {@link Observer}. *

* *

* This Observable is useful primarily for testing purposes. * * @param the type of items (not) emitted by the Observable * @return an Observable that never emits any items or sends any * notifications to an {@link Observer} * @see RxJava Wiki: never() */ public static Observable never() { return new NeverObservable(); } /** * Given an Observable that emits Observables, returns an Observable that * emits the items emitted by the most recently emitted of those * Observables. *

* * * @param sequenceOfSequences the source Observable that emits Observables * @return an Observable that emits only the items emitted by the Observable * most recently emitted by the source Observable * @see RxJava Wiki: switchOnNext() * @deprecated use {@link #switchOnNext} */ @Deprecated public static Observable switchDo(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * Given an Observable that emits Observables, returns an Observable that * emits the items emitted by the most recently emitted of those * Observables. *

* * * @param sequenceOfSequences the source Observable that emits Observables * @return an Observable that emits only the items emitted by the Observable * most recently emitted by the source Observable * @see RxJava Wiki: switchOnNext() */ public static Observable switchOnNext(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * Given an Observable that emits Observables, returns an Observable that * emits the items emitted by the most recently emitted of those * Observables. *

* * * @param sequenceOfSequences the source Observable that emits Observables * @return an Observable that emits only the items emitted by the Observable * most recently emitted by the source Observable * @see RxJava Wiki: switchOnNext() * @see #switchOnNext(Observable) */ public static Observable switchLatest(Observable> sequenceOfSequences) { return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** * Return a particular one of several possible Observables based on a case * selector. *

* * * @param the case key type * @param the result value type * @param caseSelector the function that produces a case key when an * Observer subscribes * @param mapOfCases a map that maps a case key to an Observable * @return a particular Observable chosen by key from the map of * Observables, or an empty Observable if no Observable matches the * key */ public static Observable switchCase(Func0 caseSelector, Map> mapOfCases) { return switchCase(caseSelector, mapOfCases, Observable.empty()); } /** * Return a particular one of several possible Observables based on a case * selector and run it on the designated scheduler. *

* * * @param the case key type * @param the result value type * @param caseSelector the function that produces a case key when an * Observer subscribes * @param mapOfCases a map that maps a case key to an Observable * @param scheduler the scheduler where the empty observable is observed * @return a particular Observable chosen by key from the map of * Observables, or an empty Observable if no Observable matches the * key, but one that runs on the designated scheduler in either case */ public static Observable switchCase(Func0 caseSelector, Map> mapOfCases, Scheduler scheduler) { return switchCase(caseSelector, mapOfCases, Observable.empty(scheduler)); } /** * Return a particular one of several possible Observables based on a case * selector, or a default Observable if the case selector does not map to * a particular one. *

* * * @param the case key type * @param the result value type * @param caseSelector the function that produces a case key when an * Observer subscribes * @param mapOfCases a map that maps a case key to an Observable * @param defaultCase the default Observable if the {@code mapOfCases} * doesn't contain a value for the key returned by the * {@code caseSelector} * @return a particular Observable chosen by key from the map of * Observables, or the default case if no Observable matches the key */ public static Observable switchCase(Func0 caseSelector, Map> mapOfCases, Observable defaultCase) { return create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase)); } /** * Return an Observable that replays the emissions from the source * Observable, and then continues to replay them so long as a condition is * true. *

* * * @param postCondition the post condition to test after the source * Observable completes * @return an Observable that replays the emissions from the source * Observable, and then continues to replay them so long as the post * condition is true */ public Observable doWhile(Func0 postCondition) { return create(OperationConditionals.doWhile(this, postCondition)); } /** * Return an Observable that replays the emissions from the source * Observable so long as a condition is true. *

* * * @param preCondition the condition to evaluate before subscribing to or * replaying the source Observable * @return an Observable that replays the emissions from the source * Observable so long as preCondition is true */ public Observable whileDo(Func0 preCondition) { return create(OperationConditionals.whileDo(this, preCondition)); } /** * Return an Observable that emits the emissions from a specified Observable * if a condition evaluates to true, otherwise return an empty Observable. *

* * * @param the result value type * @param condition the condition that decides whether to emit the emissions * from the then Observable * @param then the Observable sequence to emit to if {@code condition} is * {@code true} * @return an Observable that mimics the {@code then} Observable if the * {@code condition} function evaluates to true, or an empty * Observable otherwise */ public static Observable ifThen(Func0 condition, Observable then) { return ifThen(condition, then, Observable.empty()); } /** * Return an Observable that emits the emissions from a specified Observable * if a condition evaluates to true, otherwise return an empty Observable * that runs on a specified Scheduler. *

* * * @param the result value type * @param condition the condition that decides whether to emit the emissions * from the then Observable * @param then the Observable sequence to emit to if {@code condition} is * {@code true} * @param scheduler the Scheduler on which the empty Observable runs if the * condition returns false * @return an Observable that mimics the {@code then} Observable if the * {@code condition} function evaluates to true, or an empty * Observable running on the specified Scheduler otherwise */ public static Observable ifThen(Func0 condition, Observable then, Scheduler scheduler) { return ifThen(condition, then, Observable.empty(scheduler)); } /** * Return an Observable that emits the emissions from one specified * Observable if a condition evaluates to true, or from another specified * Observable otherwise. *

* * * @param the result value type * @param condition the condition that decides which Observable to emit the * emissions from * @param then the Observable sequence to emit to if {@code condition} is * {@code true} * @param orElse the Observable sequence to emit to if {@code condition} is * {@code false} * @return an Observable that mimics either the {@code then} or * {@code orElse} Observables depending on a condition function */ public static Observable ifThen(Func0 condition, Observable then, Observable orElse) { return create(OperationConditionals.ifThen(condition, then, orElse)); } /** * Accepts an Observable and wraps it in another Observable that ensures * that the resulting Observable is chronologically well-behaved. *

* *

* A well-behaved Observable does not interleave its invocations of the * {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, * and {@link Observer#onError onError} methods of its {@link Observer}s; it * invokes {@code onCompleted} or {@code onError} only once; and it never * invokes {@code onNext} after invoking either {@code onCompleted} or * {@code onError}. {@code synchronize} enforces this, and the Observable it * returns invokes {@code onNext} and {@code onCompleted} or {@code onError} * synchronously. * * @return an Observable that is a chronologically well-behaved version of * the source Observable, and that synchronously notifies its * {@link Observer}s * @see RxJava Wiki: synchronize() */ public Observable synchronize() { return create(OperationSynchronize.synchronize(this)); } /** * Accepts an Observable and wraps it in another Observable that ensures * that the resulting Observable is chronologically well-behaved. This is * accomplished by acquiring a mutual-exclusion lock for the object provided * as the lock parameter. *

* *

* A well-behaved Observable does not interleave its invocations of the * {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, * and {@link Observer#onError onError} methods of its {@link Observer}s; it * invokes {@code onCompleted} or {@code onError} only once; and it never * invokes {@code onNext} after invoking either {@code onCompleted} or * {@code onError}. {@code synchronize} enforces this, and the Observable it * returns invokes {@code onNext} and {@code onCompleted} or {@code onError} * synchronously. * * @param lock the lock object to synchronize each observer call on * @return an Observable that is a chronologically well-behaved version of * the source Observable, and that synchronously notifies its * {@link Observer}s * @see RxJava Wiki: synchronize() */ public Observable synchronize(Object lock) { return create(OperationSynchronize.synchronize(this, lock)); } /** * @deprecated use {@link #synchronize()} or {@link #synchronize(Object)} */ @Deprecated public static Observable synchronize(Observable source) { return create(OperationSynchronize.synchronize(source)); } /** * Returns an Observable that emits an item each time interval, containing * a sequential number. *

* * * @param interval interval size in time units (see below) * @param unit time units to use for the interval size * @return an Observable that emits an item each time interval * @see RxJava Wiki: interval() * @see MSDN: Observable.Interval */ public static Observable interval(long interval, TimeUnit unit) { return create(OperationInterval.interval(interval, unit)); } /** * Returns an Observable that emits an item each time interval, containing * a sequential number. *

* * * @param interval interval size in time units (see below) * @param unit time units to use for the interval size * @param scheduler the scheduler to use for scheduling the items * @return an Observable that emits an item each time interval * @see RxJava Wiki: interval() * @see MSDN: Observable.Interval */ public static Observable interval(long interval, TimeUnit unit, Scheduler scheduler) { return create(OperationInterval.interval(interval, unit, scheduler)); } /** * Returns an Observable that emits one item after a given delay, and then * completes. *

* * * @param delay the initial delay before emitting a single 0L * @param unit the time unit of {@code delay} * @return an Observable that emits one item after the given delay, and * then completes * @see RxJava wiki: timer() */ public static Observable timer(long delay, TimeUnit unit) { return timer(delay, unit, Schedulers.threadPoolForComputation()); } /** * Returns an Observable that emits one item after a given delay, and then * completes. *

* * * @param delay the initial delay before emitting a single 0L * @param unit the time unit of {@code delay} * @param scheduler the scheduler to use for scheduling the item * @return an Observable that emits one item after the given delay, and * then completes * @see RxJava wiki: timer() */ public static Observable timer(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OperationTimer.TimerOnce(delay, unit, scheduler)); } /** * Return an Observable which emits a 0L after the {@code initialDelay} and * ever increasing numbers after each {@code period}. *

* * * @param initialDelay the initial delay time to wait before emitting the * first value of 0L * @param period the time period after emitting the subsequent numbers * @param unit the time unit for both initialDelay and * period * @return an Observable which emits a 0L after the {@code initialDelay} and * ever increasing numbers after each {@code period} * @see RxJava Wiki: timer() * @see MSDN: Observable.Timer */ public static Observable timer(long initialDelay, long period, TimeUnit unit) { return timer(initialDelay, period, unit, Schedulers.threadPoolForComputation()); } /** * Return an Observable which emits a 0L after the {@code initialDelay} and * ever increasing numbers after each {@code period} while running on the * given {@code scheduler}. *

* * * @param initialDelay the initial delay time to wait before emitting the * first value of 0L * @param period the time period after emitting the subsequent numbers * @param unit the time unit for both initialDelay and * period * @param scheduler the scheduler on which the waiting happens and value * emissions run * @return an Observable that emits a 0L after the {@code initialDelay} and * ever increasing numbers after each {@code period} while running * on the given {@code scheduler} * @see RxJava Wiki: timer() * @see MSDN: Observable.Timer */ public static Observable timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler)); } /** * Returns an Observable that emits the items emitted by the source * Observable shifted forward in time by a specified delay. Error * notifications from the source Observable are not delayed. *

* * * @param delay the delay to shift the source by * @param unit the {@link TimeUnit} in which {@code delay} is defined * @return the source Observable, but shifted by the specified delay * @see RxJava Wiki: delay() * @see MSDN: Observable.Delay */ public Observable delay(long delay, TimeUnit unit) { return OperationDelay.delay(this, delay, unit, Schedulers.threadPoolForComputation()); } /** * Returns an Observable that emits the items emitted by the source * Observable shifted forward in time by a specified delay. Error * notifications from the source Observable are not delayed. *

* * * @param delay the delay to shift the source by * @param unit the {@link TimeUnit} in which {@code delay} is defined * @param scheduler the {@link Scheduler} to use for delaying * @return the source Observable, but shifted by the specified delay * @see RxJava Wiki: delay() * @see MSDN: Observable.Delay */ public Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { return OperationDelay.delay(this, delay, unit, scheduler); } /** * Return an Observable that delays the subscription to the source * Observable by a given amount of time. *

* * * @param delay the time to delay the subscription * @param unit the time unit * @return an Observable that delays the subscription to the source * Observable by the given amount */ public Observable delaySubscription(long delay, TimeUnit unit) { return delaySubscription(delay, unit, Schedulers.threadPoolForComputation()); } /** * Return an Observable that delays the subscription to the source * Observable by a given amount of time, both waiting and subscribing on * a given Scheduler. *

* * * @param delay the time to delay the subscription * @param unit the time unit * @param scheduler the scheduler on which the waiting and subscription will * happen * @return an Observable that delays the subscription to the source * Observable by a given amount, waiting and subscribing on the * given Scheduler */ public Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { return create(OperationDelay.delaySubscription(this, delay, unit, scheduler)); } /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. *

* Note: If events keep firing faster than the timeout then no items will be * emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout the time each item has to be "the most recent" of those * emitted by the source {@link Observable} to ensure that * it's not dropped * @param unit the {@link TimeUnit} for the timeout * @return an {@link Observable} that filters out items that are too quickly * followed by newer items * @see RxJava Wiki: debounce() * @see #throttleWithTimeout(long, TimeUnit) */ public Observable debounce(long timeout, TimeUnit unit) { return create(OperationDebounce.debounce(this, timeout, unit)); } /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. *

* Note: If events keep firing faster than the timeout then no items will be * emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout the time each item has to be "the most recent" of those * emitted by the source {@link Observable} to ensure that * it's not dropped * @param unit the unit of time for the specified timeout * @param scheduler the {@link Scheduler} to use internally to manage the * timers that handle the timeout for each event * @return an {@link Observable} that filters out items that are too quickly * followed by newer items * @see RxJava Wiki: debounce() * @see #throttleWithTimeout(long, TimeUnit, Scheduler) */ public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); } /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. *

* Note: If events keep firing faster than the timeout then no items will be * emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout the time each item has to be "the most recent" of those * emitted by the source {@link Observable} to ensure that * it's not dropped * @param unit the {@link TimeUnit} for the timeout * @return an {@link Observable} that filters out items that are too quickly * followed by newer items * @see RxJava Wiki: throttleWithTimeout() * @see #debounce(long, TimeUnit) */ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { return create(OperationDebounce.debounce(this, timeout, unit)); } /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. *

* Note: If events keep firing faster than the timeout then no items will be * emitted by the resulting Observable. *

* *

* Information on debounce vs throttle: *

*

* * @param timeout the time each item has to be "the most recent" emitted by * the {@link Observable} to ensure that it's not dropped * @param unit the {@link TimeUnit} for the timeout * @param scheduler the {@link Scheduler} to use internally to manage the * timers that handle the timeout for each item * @return an {@link Observable} that filters out items that are too quickly * followed by newer items * @see RxJava Wiki: throttleWithTimeout() * @see #debounce(long, TimeUnit, Scheduler) */ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); } /** * Throttles by skipping items emitted by the source Observable until * windowDuration passes and then emitting the next item * emitted by the source Observable. *

* This differs from {@link #throttleLast} in that this only tracks passage * of time whereas {@link #throttleLast} ticks at scheduled intervals. *

* * * @param windowDuration time to wait before emitting another item after * emitting the last item * @param unit the unit of time for the specified timeout * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleFirst() */ public Observable throttleFirst(long windowDuration, TimeUnit unit) { return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit)); } /** * Throttles by skipping items emitted by the source Observable until * skipDuration passes and then emitting the next item emitted * by the source Observable. *

* This differs from {@link #throttleLast} in that this only tracks passage * of time whereas {@link #throttleLast} ticks at scheduled intervals. *

* * * @param skipDuration time to wait before emitting another item after * emitting the last item * @param unit the unit of time for the specified timeout * @param scheduler the {@link Scheduler} to use internally to manage the * timers that handle timeout for each event * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleFirst() */ public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) { return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler)); } /** * Throttles by emitting the last item from the source Observable that falls * in each interval defined by intervalDuration. *

* This differs from {@link #throttleFirst} in that this ticks along at a * scheduled interval whereas {@link #throttleFirst} does not tick, it just * tracks passage of time. *

* * * @param intervalDuration duration of windows within which the last item * emitted by the source Observable will be emitted * @param unit the unit of time for the specified interval * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleLast() * @see #sample(long, TimeUnit) */ public Observable throttleLast(long intervalDuration, TimeUnit unit) { return sample(intervalDuration, unit); } /** * Throttles by emitting the last item in each interval defined by * intervalDuration. *

* This differs from {@link #throttleFirst} in that this ticks along at a * scheduled interval whereas {@link #throttleFirst} does not tick, it just * tracks passage of time. *

* * * @param intervalDuration duration of windows within which the last item * emitted by the source Observable will be emitted * @param unit the unit of time for the specified interval * @param scheduler the {@link Scheduler} to use internally to manage the * timers that handle timeout for each event * @return an Observable that performs the throttle operation * @see RxJava Wiki: throttleLast() * @see #sample(long, TimeUnit, Scheduler) */ public Observable throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) { return sample(intervalDuration, unit, scheduler); } /** * Wraps each item emitted by a source Observable in a {@link Timestamped} * object. *

* * * @return an Observable that emits timestamped items from the source * Observable * @see RxJava Wiki: timestamp() * @see MSDN: Observable.Timestamp */ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } /** * Wraps each item emitted by a source Observable in a {@link Timestamped} * object with timestamps provided by the given Scheduler. *

* * * @param scheduler the {@link Scheduler} to use as a time source. * @return an Observable that emits timestamped items from the source * Observable with timestamps provided by the given Scheduler * @see RxJava Wiki: timestamp() * @see MSDN: Observable.Timestamp */ public Observable> timestamp(Scheduler scheduler) { return create(OperationTimestamp.timestamp(this, scheduler)); } /** * Converts a {@link Future} into an Observable. *

* *

* You can convert any object that supports the {@link Future} interface * into an Observable that emits the return value of the {@link Future#get} * method of that object, by passing the object into the {@code from} * method. *

* Important note: This Observable is blocking; you cannot * unsubscribe from it. * * @param future the source {@link Future} * @param the type of object that the {@link Future} returns, and also * the type of item to be emitted by the resulting Observable * @return an Observable that emits the item from the source Future * @see RxJava Wiki: from() */ public static Observable from(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } /** * Converts a {@link Future} into an Observable. *

* *

* You can convert any object that supports the {@link Future} interface * into an Observable that emits the return value of the {@link Future#get} * method of that object, by passing the object into the {@code from} * method. *

* * @param future the source {@link Future} * @param scheduler the {@link Scheduler} to wait for the Future on. Use a * Scheduler such as {@link Schedulers#threadPoolForIO()} * that can block and wait on the future. * @param the type of object that the {@link Future} returns, and also * the type of item to be emitted by the resulting Observable * @return an Observable that emits the item from the source Future * @see RxJava Wiki: from() */ public static Observable from(Future future, Scheduler scheduler) { return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); } /** * Converts a {@link Future} into an Observable with timeout. *

* *

* You can convert any object that supports the {@link Future} interface * into an Observable that emits the return value of the {@link Future#get} * method of that object, by passing the object into the {@code from} * method. *

* Important note: This Observable is blocking; you cannot * unsubscribe from it. * * @param future the source {@link Future} * @param timeout the maximum time to wait before calling get() * @param unit the {@link TimeUnit} of the timeout argument * @param the type of object that the {@link Future} returns, and also * the type of item to be emitted by the resulting Observable * @return an Observable that emits the item from the source {@link Future} * @see RxJava Wiki: from() */ public static Observable from(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } /** * Returns an Observable that emits a Boolean value that indicates whether * two sequences are equal by comparing the elements emitted by each * Observable pairwise. *

* * * @param first the first Observable to compare * @param second the second Observable to compare * @param the type of items emitted by each Observable * @return an Observable that emits a Boolean value that indicates whether * two sequences are equal by comparing the elements pairwise * @see RxJava Wiki: sequenceEqual() */ public static Observable sequenceEqual(Observable first, Observable second) { return sequenceEqual(first, second, new Func2() { @Override public Boolean call(T first, T second) { if(first == null) { return second == null; } return first.equals(second); } }); } /** * Returns an Observable that emits a Boolean value that indicates whether * two sequences are equal by comparing the elements emitted by each * Observable pairwise based on the results of a specified equality * function. *

* * * @param first the first Observable to compare * @param second the second Observable to compare * @param equality a function used to compare items emitted by both * Observables * @param the type of items emitted by each Observable * @return an Observable that emits a Boolean value that indicates whether * two sequences are equal by comparing the elements pairwise * @see RxJava Wiki: sequenceEqual() */ public static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { return OperationSequenceEqual.sequenceEqual(first, second, equality); } /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of two items emitted, in sequence, by * two other Observables. *

* *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by {@code o1} and the first item emitted by * {@code o2}; the second item emitted by the new Observable will be the * result of the function applied to the second item emitted by {@code o1} * and the second item emitted by {@code o2}; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will * invoke {@link Observer#onNext onNext} as many times as the number of * {@code onNext} invocations of the source Observable that emits the fewest * items. * * @param o1 the first source Observable * @param o2 another source Observable * @param zipFunction a function that, when applied to an item emitted by * each of the source Observables, results in an item that will * be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public static Observable zip(Observable o1, Observable o2, Func2 zipFunction) { return create(OperationZip.zip(o1, o2, zipFunction)); } /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of three items emitted, in sequence, by * three other Observables. *

* *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by {@code o1}, the first item emitted by * {@code o2}, and the first item emitted by {@code o3}; the second item * emitted by the new Observable will be the result of the function applied * to the second item emitted by {@code o1}, the second item emitted by * {@code o2}, and the second item emitted by {@code o3}; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will * invoke {@link Observer#onNext onNext} as many times as the number of * {@code onNext} invocations of the source Observable that emits the fewest * items. * * @param o1 the first source Observable * @param o2 a second source Observable * @param o3 a third source Observable * @param zipFunction a function that, when applied to an item emitted by * each of the source Observables, results in an item * that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public static Observable zip(Observable o1, Observable o2, Observable o3, Func3 zipFunction) { return create(OperationZip.zip(o1, o2, o3, zipFunction)); } /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of four items emitted, in sequence, by * four other Observables. *

* *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by {@code o1}, the first item emitted by * {@code o2}, the first item emitted by {@code o3}, and the first item * emitted by {@code o4}; the second item emitted by the new Observable will * be the result of the function applied to the second item emitted by each * of those Observables; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item that will
     *            be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Func4 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
    }

    /**
     * Returns an Observable that emits the results of a function of your
     * choosing applied to combinations of five items emitted, in sequence, by
     * five other Observables.
     *

* *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by {@code o1}, the first item emitted by * {@code o2}, the first item emitted by {@code o3}, the first item emitted * by {@code o4}, and the first item emitted by {@code o5}; the second item * emitted by the new Observable will be the result of the function applied * to the second item emitted by each of those Observables; and so forth. *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param o5 a fifth source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item
     *            that will be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
    }

    /**
     * Returns an Observable that emits the results of a function of your
     * choosing applied to combinations of six items emitted, in sequence, by
     * six other Observables.
     *

* *

* {@code zip} applies this function in strict sequence, so the first item
     * emitted by the new Observable will be the result of the function applied
     * to the first item emitted by each source Observable; the second item
     * emitted by the new Observable will be the result of the function applied
     * to the second item emitted by each of those Observables; and so forth.
     *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param o5 a fifth source Observable
     * @param o6 a sixth source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item
     *            that will be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Func6 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
    }

    /**
     * Returns an Observable that emits the results of a function of your
     * choosing applied to combinations of seven items emitted, in sequence, by
     * seven other Observables.
     *

* *

* {@code zip} applies this function in strict sequence, so the first item
     * emitted by the new Observable will be the result of the function applied
     * to the first item emitted by each source Observable; the second item
     * emitted by the new Observable will be the result of the function applied
     * to the second item emitted by each of those Observables; and so forth.
     *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param o5 a fifth source Observable
     * @param o6 a sixth source Observable
     * @param o7 a seventh source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item
     *            that will be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Func7 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
    }

    /**
     * Returns an Observable that emits the results of a function of your
     * choosing applied to combinations of eight items emitted, in sequence, by
     * eight other Observables.
     *

* *

* {@code zip} applies this function in strict sequence, so the first item
     * emitted by the new Observable will be the result of the function applied
     * to the first item emitted by each source Observable; the second item
     * emitted by the new Observable will be the result of the function applied
     * to the second item emitted by each of those Observables; and so forth.
     *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param o5 a fifth source Observable
     * @param o6 a sixth source Observable
     * @param o7 a seventh source Observable
     * @param o8 an eighth source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item
     *            that will be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Func8 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
    }

    /**
     * Returns an Observable that emits the results of a function of your
     * choosing applied to combinations of nine items emitted, in sequence, by
     * nine other Observables.
     *

* *

* {@code zip} applies this function in strict sequence, so the first item
     * emitted by the new Observable will be the result of the function applied
     * to the first item emitted by each source Observable; the second item
     * emitted by the new Observable will be the result of the function applied
     * to the second item emitted by each of those Observables; and so forth.
     *

* The resulting {@code Observable} returned from {@code zip} will
     * invoke {@link Observer#onNext onNext} as many times as the number of
     * {@code onNext} invocations of the source Observable that emits the fewest
     * items.
     *
     * @param o1 the first source Observable
     * @param o2 a second source Observable
     * @param o3 a third source Observable
     * @param o4 a fourth source Observable
     * @param o5 a fifth source Observable
     * @param o6 a sixth source Observable
     * @param o7 a seventh source Observable
     * @param o8 an eighth source Observable
     * @param o9 a ninth source Observable
     * @param zipFunction a function that, when applied to an item emitted by
     *            each of the source Observables, results in an item
     *            that will be emitted by the resulting Observable
     * @return an Observable that emits the zipped results
     * @see RxJava Wiki: zip()
     */
    public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9, Func9 zipFunction) {
        // Passes the result of OperationZip.zip(...) to create(...) and returns what create(...) yields.
        return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Func2 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Func3 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Func4 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param o5 the fifth source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param o5 the fifth source Observable
     * @param o6 the sixth source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Func6 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param o5 the fifth source Observable
     * @param o6 the sixth source Observable
     * @param o7 the seventh source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Func7 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param o5 the fifth source Observable
     * @param o6 the sixth source Observable
     * @param o7 the seventh source Observable
     * @param o8 the eighth source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Func8 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, combineFunction));
    }

    /**
     * Combines the given Observables, emitting an item that aggregates the
     * latest values of each of the source Observables each time an item is
     * received from any of the source Observables, where this aggregation is
     * defined by a given function.
     *

*
     *
     * @param o1 the first source Observable
     * @param o2 the second source Observable
     * @param o3 the third source Observable
     * @param o4 the fourth source Observable
     * @param o5 the fifth source Observable
     * @param o6 the sixth source Observable
     * @param o7 the seventh source Observable
     * @param o8 the eighth source Observable
     * @param o9 the ninth source Observable
     * @param combineFunction the aggregation function used to combine the
     *            items emitted by the source Observables
     * @return an Observable whose emissions are the result of combining the
     *         emissions of the source Observables with the given aggregation
     *         function
     * @see RxJava Wiki: combineLatest()
     */
    public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9, Func9 combineFunction) {
        // Passes the result of OperationCombineLatest.combineLatest(...) to create(...).
        return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers. It emits the current buffer and replaces it with
     * a new buffer when the Observable produced by the specified
     * bufferClosingSelector emits an item. It then uses the
     * bufferClosingSelector to create a new Observable to observe
     * for the end of the next buffer.
     *

*
     *
     * @param bufferClosingSelector the {@link Func0} which is used to produce
     *            an {@link Observable} for every buffer
     *            created. When this {@link Observable} emits
     *            an item, buffer() emits the
     *            associated buffer and replaces it with a new
     *            one.
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers when the current {@link Observable} created with the
     *         {@code bufferClosingSelector} argument emits an item
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(Func0> bufferClosingSelector) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, bufferClosingSelector));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits buffers that it creates
     * when the specified bufferOpenings Observable emits an item,
     * and closes when the Observable returned from
     * bufferClosingSelector emits an item.
     *

*
     *
     * @param bufferOpenings the {@link Observable} that, when it emits an item,
     *            causes a new buffer to be created
     * @param bufferClosingSelector the {@link Func1} that is used to produce
     *            an {@link Observable} for every buffer
     *            created. When this {@link Observable} emits
     *            an item, the associated buffer is emitted.
     * @return an {@link Observable} that emits buffers that are created and
     *         closed when the specified {@link Observable}s emit items
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers, each containing count items. When
     * the source Observable completes or encounters an error, it emits the
     * current buffer and propagates the notification from the
     * source Observable.
     *

*
     *
     * @param count the maximum number of items in each buffer before it should
     *            be emitted
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers, each containing at most "count" items from the source
     *         Observable
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(int count) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, count));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits buffers every
     * skip items, each containing count items. When
     * the source Observable completes or encounters an error, the resulting
     * Observable emits the current buffer and propagates the notification from
     * the source Observable.
     *

*
     *
     * @param count the maximum size of each buffer before it should be emitted
     * @param skip how many produced items need to be skipped before starting a
     *            new buffer. Note that when skip and
     *            count are equal, this is the same operation as
     *            {@link Observable#buffer(int)}.
     * @return an {@link Observable} that emits buffers every skip
     *         item and containing at most count items
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(int count, int skip) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, count, skip));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers, each of a fixed duration specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, the resulting Observable emits the current buffer
     * and propagates the notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted and replaced with a new buffer
     * @param unit the unit of time which applies to the timespan
     *            argument
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers with a fixed duration
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, TimeUnit unit) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, unit));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers, each of a fixed duration specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, the resulting Observable emits the current buffer
     * and propagates the notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted and replaced with a new buffer
     * @param unit the unit of time which applies to the timespan
     *            argument
     * @param scheduler the {@link Scheduler} to use when determining the end
     *            and start of a buffer
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers with a fixed duration
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, unit, scheduler));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers, each of a fixed duration specified by the
     * timespan argument or a maximum size specified by the
     * count argument (whichever is reached first). When the source
     * Observable completes or encounters an error, the resulting Observable
     * emits the current buffer and propagates the notification from the source
     * Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted and replaced with a new buffer
     * @param unit the unit of time which applies to the timespan
     *            argument
     * @param count the maximum size of each buffer before it should be emitted
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers of items emitted from the source Observable, after a
     *         fixed duration or when the buffer reaches maximum capacity
     *         (whichever occurs first)
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, TimeUnit unit, int count) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, unit, count));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping buffers, each of a fixed duration specified by the
     * timespan argument or a maximum size specified by the
     * count argument (whichever is reached first). When the source
     * Observable completes or encounters an error, the resulting Observable
     * emits the current buffer and propagates the notification from the source
     * Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted and replaced with a new buffer
     * @param unit the unit of time which applies to the timespan
     *            argument
     * @param count the maximum size of each buffer before it should be emitted
     * @param scheduler the {@link Scheduler} to use when determining the end
     *            and start of a buffer
     * @return an {@link Observable} that emits connected, non-overlapping
     *         buffers of items emitted by the source Observable after a fixed
     *         duration or when the buffer reaches maximum capacity (whichever
     *         occurs first)
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, unit, count, scheduler));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable starts a new buffer
     * periodically, as determined by the timeshift argument. It
     * emits each buffer after a fixed timespan, specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, it emits the current buffer and propagates the
     * notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted
     * @param timeshift the period of time after which a new buffer will be
     *            created
     * @param unit the unit of time that applies to the timespan
     *            and timeshift arguments
     * @return an {@link Observable} that emits new buffers of items emitted by
     *         the source Observable periodically after a fixed timespan has
     *         elapsed
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, long timeshift, TimeUnit unit) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, timeshift, unit));
    }

    /**
     * Creates an Observable that emits buffers of items it collects from the
     * source Observable. The resulting Observable starts a new buffer
     * periodically, as determined by the timeshift argument. It
     * emits each buffer after a fixed timespan, specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, the resulting Observable emits the current buffer
     * and propagates the notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each buffer collects items before it
     *            should be emitted
     * @param timeshift the period of time after which a new buffer will be
     *            created
     * @param unit the unit of time that applies to the timespan
     *            and timeshift arguments
     * @param scheduler the {@link Scheduler} to use when determining the end
     *            and start of a buffer
     * @return an {@link Observable} that emits new buffers of items emitted by
     *         the source Observable periodically after a fixed timespan has
     *         elapsed
     * @see RxJava Wiki: buffer()
     */
    public Observable> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
        // Passes the result of OperationBuffer.buffer(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationBuffer.buffer(this, timespan, timeshift, unit, scheduler));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping windows. It emits the current window and opens a new one
     * when the Observable produced by the specified
     * closingSelector emits an item. The
     * closingSelector then creates a new Observable to observe
     * for the end of the next window.
     *

*
     *
     * @param closingSelector the {@link Func0} used to produce an
     *            {@link Observable} for every window created. When this
     *            {@link Observable} emits an item, window() emits
     *            the associated window and begins a new one.
     * @return an {@link Observable} that emits connected, non-overlapping
     *         windows when the current {@link Observable} created with the
     *         closingSelector argument emits an item
     * @see RxJava Wiki: window()
     */
    public Observable> window(Func0> closingSelector) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, closingSelector));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits windows. These windows
     * contain those items emitted by the source Observable between the time
     * when the windowOpenings Observable emits an item and when
     * the Observable returned by closingSelector emits an item.
     *

*
     *
     * @param windowOpenings the {@link Observable} that, when it emits an item,
     *            causes another window to be created
     * @param closingSelector a {@link Func1} that produces an
     *            {@link Observable} for every window created. When
     *            this {@link Observable} emits an item, the
     *            associated window is closed and emitted
     * @return an {@link Observable} that emits windows of items emitted by the
     *         source Observable that are governed by the specified
     *         {@link Observable}s emitting items
     * @see RxJava Wiki: window()
     */
    public Observable> window(Observable windowOpenings, Func1> closingSelector) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, windowOpenings, closingSelector));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping windows, each containing count items. When
     * the source Observable completes or encounters an error, the resulting
     * Observable emits the current window and propagates the notification from
     * the source Observable.
     *

*
     *
     * @param count the maximum size of each window before it should be emitted
     * @return an {@link Observable} that emits connected, non-overlapping
     *         windows containing at most count items
     * @see RxJava Wiki: window()
     */
    public Observable> window(int count) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, count));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits windows every
     * skip items, each containing count items.
     * When the source Observable completes or encounters an error, the
     * resulting Observable emits the current window and propagates the
     * notification from the source Observable.
     *

*
     *
     * @param count the maximum size of each window before it should be emitted
     * @param skip how many items need to be skipped before starting a new
     *            window. Note that if skip and count
     *            are equal this is the same operation as {@link #window(int)}.
     * @return an {@link Observable} that emits windows every {@code skip}
     *         items, each containing at most {@code count} items
     * @see RxJava Wiki: window()
     */
    public Observable> window(int count, int skip) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, count, skip));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping windows, each of a fixed duration specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, the resulting Observable emits the current window
     * and propagates the notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each window collects items before it
     *            should be emitted and replaced with a new window
     * @param unit the unit of time that applies to the timespan
     *            argument
     * @return an {@link Observable} that emits connected, non-overlapping
     *         windows with a fixed duration
     * @see RxJava Wiki: window()
     */
    public Observable> window(long timespan, TimeUnit unit) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, timespan, unit));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping windows, each of a fixed duration as specified by the
     * timespan argument. When the source Observable completes or
     * encounters an error, the resulting Observable emits the current window
     * and propagates the notification from the source Observable.
     *

*
     *
     * @param timespan the period of time each window collects items before it
     *            should be emitted and replaced with a new window
     * @param unit the unit of time which applies to the timespan
     *            argument
     * @param scheduler the {@link Scheduler} to use when determining the end
     *            and start of a window
     * @return an {@link Observable} that emits connected, non-overlapping
     *         windows with a fixed duration
     * @see RxJava Wiki: window()
     */
    public Observable> window(long timespan, TimeUnit unit, Scheduler scheduler) {
        // Passes the result of OperationWindow.window(...), called with this instance
        // as its first argument, to create(...).
        // NOTE(review): the generic type arguments in this signature look truncated
        // (unbalanced '>'); confirm against version control.
        return create(OperationWindow.window(this, timespan, unit, scheduler));
    }

    /**
     * Creates an Observable that emits windows of items it collects from the
     * source Observable. The resulting Observable emits connected,
     * non-overlapping windows, each of a fixed duration as specified by the
     * timespan argument or a maximum size as specified by the
     * count argument (whichever is reached first). When the source
     * Observable completes or encounters an error, the resulting Observable
     * emits the current window and propagates the notification from the source
     * Observable.
     *

* * * @param timespan the period of time each window collects items before it * should be emitted and replaced with a new window * @param unit the unit of time that applies to the timespan * argument * @param count the maximum size of each window before it should be emitted * @return an {@link Observable} that emits connected, non-overlapping * windows after a fixed duration or when the window has reached * maximum capacity (whichever occurs first) * @see RxJava Wiki: window() */ public Observable> window(long timespan, TimeUnit unit, int count) { return create(OperationWindow.window(this, timespan, unit, count)); } /** * Creates an Observable that emits windows of items it collects from the * source Observable. The resulting Observable emits connected, * non-overlapping windows, each of a fixed duration specified by the * timespan argument or a maximum size specified by the * count argument (whichever is reached first). When the source * Observable completes or encounters an error, the resulting Observable * emits the current window and propagates the notification from the source * Observable. *

* * * @param timespan the period of time each window collects items before it * should be emitted and replaced with a new window * @param unit the unit of time which applies to the timespan * argument * @param count the maximum size of each window before it should be emitted * @param scheduler the {@link Scheduler} to use when determining the end * and start of a window. * @return an {@link Observable} that emits connected non-overlapping * windows after a fixed duration or when the window has reached * maximum capacity (whichever occurs first). * @see RxJava Wiki: window() */ public Observable> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) { return create(OperationWindow.window(this, timespan, unit, count, scheduler)); } /** * Creates an Observable that emits windows of items it collects from the * source Observable. The resulting Observable starts a new window * periodically, as determined by the timeshift argument. It * emits each window after a fixed timespan, specified by the * timespan argument. When the source Observable completes or * encounters an error, the resulting Observable * emits the current window and propagates the notification from the source * Observable. *

* * * @param timespan the period of time each window collects items before it * should be emitted * @param timeshift the period of time after which a new window will be * created * @param unit the unit of time that applies to the timespan * and timeshift arguments * @return an {@link Observable} that emits new windows periodically as a * fixed timespan has elapsed * @see RxJava Wiki: window() */ public Observable> window(long timespan, long timeshift, TimeUnit unit) { return create(OperationWindow.window(this, timespan, timeshift, unit)); } /** * Creates an Observable that emits windows of items it collects from the * source Observable. The resulting Observable starts a new window * periodically, as determined by the timeshift argument. It * emits each window after a fixed timespan, specified by the * timespan argument. When the source Observable completes or * encounters an error, the resulting Observable * emits the current window and propagates the notification from the source * Observable. *

* * * @param timespan the period of time each window collects items before it * should be emitted * @param timeshift the period of time after which a new window will be * created * @param unit the unit of time that applies to the timespan * and timeshift arguments * @param scheduler the {@link Scheduler} to use when determining the end * and start of a window * @return an {@link Observable} that emits new windows periodically as a * fixed timespan has elapsed * @see RxJava Wiki: window() */ public Observable> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler)); } /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of n items emitted, in sequence, * by n other Observables that are themselves emitted by a source Observable. *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by all of the source Observables; the second * item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables; and so * forth. *

* The resulting {@code Observable} returned from {@code zip} will * invoke {@code onNext} as many times as the number of {@code onNext} * invocations of the source Observable that emits the fewest items. *

* * * @param ws an Observable of source Observables * @param zipFunction a function that, when applied to an item emitted by * each of the source Observables, results in an item * that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public static Observable zip(Observable> ws, final FuncN zipFunction) { return ws.toList().mergeMap(new Func1>, Observable>() { @Override public Observable call(List> wsList) { return create(OperationZip.zip(wsList, zipFunction)); } }); } /** * Returns an Observable that emits the results of a function of your * choosing applied to combinations of items emitted, in sequence, by a * collection of other Observables. *

* {@code zip} applies this function in strict sequence, so the first item * emitted by the new Observable will be the result of the function applied * to the first item emitted by all of the source Observables; the second * item emitted by the new Observable will be the result of the function * applied to the second item emitted by each of those Observables; and so * forth. *

* The resulting {@code Observable} returned from {@code zip} will invoke * {@code onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. *

* * * @param ws a collection of source Observables * @param zipFunction a function that, when applied to an item emitted by * each of the source Observables, results in an item * that will be emitted by the resulting Observable * @return an Observable that emits the zipped results * @see RxJava Wiki: zip() */ public static Observable zip(Iterable> ws, FuncN zipFunction) { return create(OperationZip.zip(ws, zipFunction)); } /** * Filter items emitted by an Observable. *

* * * @param predicate a function that evaluates the items emitted by the * source Observable, returning {@code true} if they pass * the filter * @return an Observable that emits only those items emitted by the source * Observable that the filter evaluates as {@code true} * @see RxJava Wiki: filter() */ public Observable filter(Func1 predicate) { return create(OperationFilter.filter(this, predicate)); } /** * Returns an Observable that emits all sequentially distinct items * emitted by the source Observable. *

* * * @return an Observable that emits those items from the source Observable * that are sequentially distinct * @see RxJava Wiki: distinctUntilChanged() * @see MSDN: Observable.distinctUntilChanged */ public Observable distinctUntilChanged() { return create(OperationDistinctUntilChanged.distinctUntilChanged(this)); } /** * Returns an Observable that emits all items emitted by the source * Observable that are sequentially distinct according to a key selector * function. *

* * * @param keySelector a function that projects an emitted item to a key * value that is used to decide whether an item is * sequentially distinct from another one or not * @return an Observable that emits those items from the source Observable * whose keys are sequentially distinct * @see RxJava Wiki: distinctUntilChanged() * @see MSDN: Observable.distinctUntilChanged */ public Observable distinctUntilChanged(Func1 keySelector) { return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector)); } /** * Returns an Observable that emits all items emitted by the source * Observable that are distinct. *

* * * @return an Observable that emits only those items emitted by the source * Observable that are distinct from each other * @see RxJava Wiki: distinct() * @see MSDN: Observable.distinct */ public Observable distinct() { return create(OperationDistinct.distinct(this)); } /** * Returns an Observable that emits all items emitted by the source * Observable that are distinct according to a key selector function. *

* * * @param keySelector a function that projects an emitted item to a key * value that is used to decide whether an item is * distinct from another one or not * @return an Observable that emits those items emitted by the source * Observable that have distinct keys * @see RxJava Wiki: distinct() * @see MSDN: Observable.distinct */ public Observable distinct(Func1 keySelector) { return create(OperationDistinct.distinct(this, keySelector)); } /** * Returns an Observable that emits the item at a specified index in a * sequence of emissions from a source Observable. *

* * * @param index the zero-based index of the item to retrieve * @return an Observable that emits the item at the specified position in * the sequence of those emitted by the source Observable * @throws IndexOutOfBoundsException if index is greater than * or equal to the number of items emitted * by the source Observable * @throws IndexOutOfBoundsException if index is less than 0 * @see RxJava Wiki: elementAt() */ public Observable elementAt(int index) { return create(OperationElementAt.elementAt(this, index)); } /** * Returns the item at a specified index in a sequence or the default item * if the index is out of range. *

* * * @param index the zero-based index of the item to retrieve * @param defaultValue the default item * @return an Observable that emits the item at the specified position in * the source sequence, or the default item if the index is outside * the bounds of the source sequence * @throws IndexOutOfBoundsException if index is less than 0 * @see RxJava Wiki: elementAtOrDefault() */ public Observable elementAtOrDefault(int index, T defaultValue) { return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue)); } /** * Returns an {@link Observable} that emits true if any item * emitted by the source {@link Observable} satisfies a specified condition, * otherwise false. Note: this always emits false * if the source {@link Observable} is empty. *

* In Rx.Net this is the any operator but we renamed it in * RxJava to better match Java naming idioms. *

* * * @param predicate the condition to test every item emitted by the source * Observable * @return an Observable that emits a Boolean indicating whether any item * emitted by the source Observable satisfies the predicate * @see RxJava Wiki: exists() * @see MSDN: Observable.Any Note: the description in this page was wrong at the time of this writing. */ public Observable exists(Func1 predicate) { return create(OperationAny.exists(this, predicate)); } /** * Returns an Observable that emits a Boolean that indicates whether the * source Observable emitted a specified item. *

* * * @param element the item to search for in the emissions from the source * Observable * @return an Observable that emits true if the specified item * is emitted by the source Observable, or false if the * source Observable completes without emitting that item * @see RxJava Wiki: contains() * @see MSDN: Observable.Contains */ public Observable contains(final T element) { return exists(new Func1() { public Boolean call(T t1) { return element == null ? t1 == null : element.equals(t1); } }); } /** * Registers an {@link Action0} to be called when this Observable invokes * {@link Observer#onCompleted onCompleted} or * {@link Observer#onError onError}. *

* * * @param action an {@link Action0} to be invoked when the source * Observable finishes * @return an Observable that emits the same items as the source Observable, * then invokes the {@link Action0} * @see RxJava Wiki: finallyDo() * @see MSDN: Observable.Finally */ public Observable finallyDo(Action0 action) { return create(OperationFinally.finallyDo(this, action)); } /** * Creates a new Observable by applying a function that you supply to each * item emitted by the source Observable, where that function returns an * Observable, and then merging those resulting Observables and emitting the * results of this merger. *

* *

* Note: {@code mapMany} and {@code flatMap} are equivalent. * * @param func a function that, when applied to an item emitted by the * source Observable, returns an Observable * @return an Observable that emits the result of applying the * transformation function to each item emitted by the source * Observable and merging the results of the Observables obtained * from this transformation. * @see RxJava Wiki: flatMap() * @see #mapMany(Func1) */ public Observable flatMap(Func1> func) { return mergeMap(func); } /** * Creates a new Observable by applying a function that you supply to each * item emitted by the source Observable, where that function returns an * Observable, and then merging those resulting Observables and emitting the * results of this merger. *

* * * @param func a function that, when applied to an item emitted by the * source Observable, returns an Observable * @return an Observable that emits the result of applying the * transformation function to each item emitted by the source * Observable and merging the results of the Observables obtained * from this transformation. * @see RxJava Wiki: flatMap() * @see #flatMap(Func1) */ public Observable mergeMap(Func1> func) { return merge(map(func)); } /** * Creates a new Observable by applying a function that you supply to each * item emitted by the source Observable, where that function returns an * Observable, and then concatting those resulting Observables and emitting * the results of this concat. *

* * * @param func a function that, when applied to an item emitted by the * source Observable, returns an Observable * @return an Observable that emits the result of applying the * transformation function to each item emitted by the source * Observable and concatting the results of the Observables obtained * from this transformation. */ public Observable concatMap(Func1> func) { return concat(map(func)); } /** * Creates a new Observable by applying a function that you supply to each * item emitted by the source Observable resulting in an Observable of * Observables. Then a {@link #switchLatest(Observable)} / * {@link #switchOnNext(Observable)} is applied. *

* * * @param func a function that, when applied to an item emitted by the * source Observable, returns an Observable * @return an Observable that emits the result of applying the * transformation function to each item emitted by the source * Observable and then applying {@link #switchOnNext(Observable)} to the * resulting Observables */ public Observable switchMap(Func1> func) { return switchOnNext(map(func)); } /** * Filter items emitted by an Observable. *

* * * @param predicate a function that evaluates an item emitted by the source * Observable, returning {@code true} if it passes the * filter * @return an Observable that emits only those items emitted by the source * Observable that the filter evaluates as {@code true} * @see RxJava Wiki: where() * @see #filter(Func1) */ public Observable where(Func1 predicate) { return filter(predicate); } /** * Returns an Observable that applies the given function to each item * emitted by an Observable and emits the results of these function * applications. *

* * * @param func a function to apply to each item emitted by the Observable * @return an Observable that emits the items from the source Observable, * transformed by the given function * @see RxJava Wiki: map() * @see MSDN: Observable.Select */ public Observable map(Func1 func) { return create(OperationMap.map(this, func)); } /** * Returns an Observable that applies the given function to each item * emitted by an Observable and emits the results of these function * applications. *

* * * @param func a function to apply to each item emitted by the Observable * that takes the index of the emitted item as additional * parameter * @return an Observable that emits the items from the source Observable, * transformed by the given function * @see RxJava Wiki: mapWithIndex() * @see MSDN: Observable.Select * @deprecated just use zip with {@link Observable#range(int)} */ public Observable mapWithIndex(Func2 func) { return create(OperationMap.mapWithIndex(this, func)); } /** * Creates a new Observable by applying a function that you supply to each * item emitted by the source Observable, where that function returns an * Observable, and then merging those resulting Observables and emitting * the results of this merger. *

* *

* Note: mapMany and flatMap are equivalent. * * @param func a function that, when applied to an item emitted by the * source Observable, returns an Observable * @return an Observable that emits the result of applying the * transformation function to each item emitted by the source * Observable and merging the results of the Observables obtained * from this transformation. * @see RxJava Wiki: mapMany() * @see #flatMap(Func1) * @deprecated use {@link #flatMap(Func1)} instead */ public Observable mapMany(Func1> func) { return mergeMap(func); } /** * Turns all of the emissions and notifications from a source Observable * into emissions marked with their original types within * {@link Notification} objects. *

* * * @return an Observable whose items are the result of materializing the * items and notifications of the source Observable * @see RxJava Wiki: materialize() * @see MSDN: Observable.materialize */ public Observable> materialize() { return create(OperationMaterialize.materialize(this)); } /** * Asynchronously subscribes and unsubscribes Observers on the specified * {@link Scheduler}. *

* * * @param scheduler the {@link Scheduler} to perform subscription and * unsubscription actions on * @return the source Observable modified so that its subscriptions and * unsubscriptions happen on the specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() */ public Observable subscribeOn(Scheduler scheduler) { return create(OperationSubscribeOn.subscribeOn(this, scheduler)); } /** * Asynchronously notify {@link Observer}s on the specified * {@link Scheduler}. *

* * * @param scheduler the {@link Scheduler} to notify {@link Observer}s on * @return the source Observable modified so that its {@link Observer}s are * notified on the specified {@link Scheduler} * @see RxJava Wiki: observeOn() */ public Observable observeOn(Scheduler scheduler) { return create(OperationObserveOn.observeOn(this, scheduler)); } /** * Returns an Observable that reverses the effect of * {@link #materialize materialize} by transforming the {@link Notification} * objects emitted by the source Observable into the items or notifications * they represent. *

* * * @return an Observable that emits the items and notifications embedded in * the {@link Notification} objects emitted by the source Observable * @throws Throwable if the source Observable is not of type * {@code Observable>} * @see RxJava Wiki: dematerialize() * @see MSDN: Observable.dematerialize */ @SuppressWarnings("unchecked") public Observable dematerialize() { return create(OperationDematerialize.dematerialize((Observable>) this)); } /** * Instruct an Observable to pass control to another Observable rather than * invoking {@link Observer#onError onError} if it encounters an error. *

* *

* By default, when an Observable encounters an error that prevents it from * emitting the expected item to its {@link Observer}, the Observable * invokes its Observer's onError method, and then quits * without invoking any more of its Observer's methods. The * onErrorResumeNext method changes this behavior. If you pass * a function that returns an Observable (resumeFunction) to * onErrorResumeNext, if the original Observable encounters an * error, instead of invoking its Observer's onError method, it * will instead relinquish control to the Observable returned from * resumeFunction, which will invoke the Observer's * {@link Observer#onNext onNext} method if it is able to do so. In such a * case, because no Observable necessarily invokes onError, the * Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback * data should errors be encountered. * * @param resumeFunction a function that returns an Observable that will * take over if the source Observable encounters an * error * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onErrorResumeNext() */ public Observable onErrorResumeNext(final Func1> resumeFunction) { return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction)); } /** * Instruct an Observable to pass control to another Observable rather than * invoking {@link Observer#onError onError} if it encounters an error. *

* *

* By default, when an Observable encounters an error that prevents it from * emitting the expected item to its {@link Observer}, the Observable * invokes its Observer's onError method, and then quits * without invoking any more of its Observer's methods. The * onErrorResumeNext method changes this behavior. If you pass * another Observable (resumeSequence) to an Observable's * onErrorResumeNext method, if the original Observable * encounters an error, instead of invoking its Observer's * onError method, it will instead relinquish control to * resumeSequence which will invoke the Observer's * {@link Observer#onNext onNext} method if it is able to do so. In such a * case, because no Observable necessarily invokes onError, the * Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback * data should errors be encountered. * * @param resumeSequence the Observable that will take over if the source * Observable encounters an error * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onErrorResumeNext() */ public Observable onErrorResumeNext(final Observable resumeSequence) { return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(this, resumeSequence)); } /** * Instruct an Observable to pass control to another Observable rather than * invoking {@link Observer#onError onError} if it encounters an error of * type {@link java.lang.Exception}. *

* This differs from {@link #onErrorResumeNext} in that this one does not * handle {@link java.lang.Throwable} or {@link java.lang.Error} but lets * those continue through. *

* *

* By default, when an Observable encounters an error that prevents it from * emitting the expected item to its {@link Observer}, the Observable * invokes its Observer's onError method, and then quits * without invoking any more of its Observer's methods. The * onExceptionResumeNext method changes this behavior. If you pass * another Observable (resumeSequence) to an Observable's * onExceptionResumeNext method, if the original Observable * encounters an error of type {@link java.lang.Exception}, instead of invoking its Observer's * onError method, it will instead relinquish control to * resumeSequence which will invoke the Observer's * {@link Observer#onNext onNext} method if it is able to do so. In such a * case, because no Observable necessarily invokes onError, * the Observer may never know that an error happened. *

* You can use this to prevent errors from propagating or to supply fallback * data should errors be encountered. * * @param resumeSequence the Observable that will take over if the source * Observable encounters an error * @return the original Observable, with appropriately modified behavior * @see RxJava Wiki: onExceptionResumeNextViaObservable() */ public Observable onExceptionResumeNext(final Observable resumeSequence) { return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence)); } /** * Instruct an Observable to emit an item (returned by a specified function) * rather than invoking {@link Observer#onError onError} if it encounters an * error. *

* *

* By default, when an Observable encounters an error that prevents it from * emitting the expected item to its {@link Observer}, the Observable * invokes its Observer's onError method, and then quits * without invoking any more of its Observer's methods. The * onErrorReturn method changes this behavior. If you pass a * function (resumeFunction) to an Observable's * onErrorReturn method, if the original Observable encounters * an error, instead of invoking its Observer's onError method, * it will instead emit the return value of resumeFunction. *

* You can use this to prevent errors from propagating or to supply fallback * data should errors be encountered. * * @param resumeFunction a function that returns an item that the new * Observable will emit if the source Observable * encounters an error * @return the original Observable with appropriately modified behavior * @see RxJava Wiki: onErrorReturn() */ public Observable onErrorReturn(Func1 resumeFunction) { return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction)); } /** * Returns an Observable that applies a function of your choosing to the * first item emitted by a source Observable, then feeds the result of that * function along with the second item emitted by the source Observable into * the same function, and so on until all items have been emitted by the * source Observable, and emits the final result from the final call to your * function as its sole item. *

* *

* This technique, which is called "reduce" here, is sometimes called * "aggregate," "fold," "accumulate," "compress," or "inject" in other * programming contexts. Groovy, for instance, has an inject * method that does a similar operation on lists. * * @param accumulator an accumulator function to be invoked on each item * emitted by the source Observable, whose result will * be used in the next accumulator call * @return an Observable that emits a single item that is the result of * accumulating the output from the source Observable * @throws IllegalArgumentException if the source Observable emits no items * @see RxJava Wiki: reduce() * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { /* * Discussion and confirmation of implementation at https://github.com/Netflix/RxJava/issues/423#issuecomment-27642532 * * It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty. */ return create(OperationScan.scan(this, accumulator)).last(); } /** * Returns an Observable that emits the count of the total number of items * emitted by the source Observable. *

* * * @return an Observable that emits the number of elements emitted by the * source Observable as its single item * @see RxJava Wiki: count() * @see MSDN: Observable.Count * @see #longCount() */ public Observable count() { return reduce(0, new Func2() { @Override public Integer call(Integer t1, T t2) { return t1 + 1; } }); } /** * Returns an Observable that emits the sum of all the Integers emitted by * the source Observable. *

* * * @param source source Observable to compute the sum of * @return an Observable that emits the sum of all the Integers emitted by * the source Observable as its single item * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum */ public static Observable sumInteger(Observable source) { return OperationSum.sum(source); } /** * @deprecated use {@link #sumInteger(Observable)} instead */ @Deprecated public static Observable sum(Observable source) { return OperationSum.sum(source); } /** * Returns an Observable that emits the sum of all the Longs emitted by the * source Observable. *

* * * @param source source Observable to compute the sum of * @return an Observable that emits the sum of all the Longs emitted by the * source Observable as its single item * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum */ public static Observable sumLong(Observable source) { return OperationSum.sumLongs(source); } /** * Returns an Observable that emits the sum of all the Floats emitted by the * source Observable. *

* * * @param source source Observable to compute the sum of * @return an Observable that emits the sum of all the Floats emitted by the * source Observable as its single item * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum */ public static Observable sumFloat(Observable source) { return OperationSum.sumFloats(source); } /** * Returns an Observable that emits the sum of all the Doubles emitted by * the source Observable. *

* * * @param source source Observable to compute the sum of * @return an Observable that emits the sum of all the Doubles emitted by * the source Observable as its single item * @see RxJava Wiki: sumDouble() * @see MSDN: Observable.Sum */ public static Observable sumDouble(Observable source) { return OperationSum.sumDoubles(source); } /** * Create an Observable that extracts an integer from each of the items * emitted by the source Observable via a function you specify, and then * emits the sum of these integers. *

* * * @param valueExtractor the function to extract an integer from each item * emitted by the source Observable * @return an Observable that emits the integer sum of the integer values * corresponding to the items emitted by the source Observable * transformed by the provided function * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum */ public Observable sumInteger(Func1 valueExtractor) { return create(new OperationSum.SumIntegerExtractor(this, valueExtractor)); } /** * Create an Observable that extracts a long from each of the items emitted * by the source Observable via a function you specify, and then emits the * sum of these longs. *

* * * @param valueExtractor the function to extract a long from each item * emitted by the source Observable * @return an Observable that emits the long sum of the long values * corresponding to the items emitted by the source Observable * transformed by the provided function * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum */ public Observable sumLong(Func1 valueExtractor) { return create(new OperationSum.SumLongExtractor(this, valueExtractor)); } /** * Create an Observable that extracts a float from each of the items emitted * by the source Observable via a function you specify, and then emits the * sum of these floats. *

* * * @param valueExtractor the function to extract a float from each item * emitted by the source Observable * @return an Observable that emits the float sum of the float values * corresponding to the items emitted by the source Observable * transformed by the provided function * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum */ public Observable sumFloat(Func1 valueExtractor) { return create(new OperationSum.SumFloatExtractor(this, valueExtractor)); } /** * Create an Observable that extracts a double from each of the items * emitted by the source Observable via a function you specify, and then * emits the sum of these doubles. *

* * * @param valueExtractor the function to extract a double from each item * emitted by the source Observable * @return an Observable that emits the double sum of the double values * corresponding to the items emitted by the source Observable * transformed by the provided function * @see RxJava Wiki: sumDouble() * @see MSDN: Observable.Sum */ public Observable sumDouble(Func1 valueExtractor) { return create(new OperationSum.SumDoubleExtractor(this, valueExtractor)); } /** * Returns an Observable that computes the average of the Integers emitted * by the source Observable. *

* * * @param source source observable to compute the average of * @return an Observable that emits the average of all the Integers emitted * by the source Observable as its single item * @throws IllegalArgumentException if the source Observable emits no items * @see RxJava Wiki: averageInteger() * @see MSDN: Observable.Average */ public static Observable averageInteger(Observable source) { return OperationAverage.average(source); } /** * @deprecated use {@link #averageInteger(Observable)} instead */ @Deprecated public static Observable average(Observable source) { return OperationAverage.average(source); } /** * Returns an Observable that computes the average of the Longs emitted by * the source Observable. *

*
 *
 * @param source source Observable to compute the average of
 * @return an Observable that emits the average of all the Longs emitted by
 *         the source Observable as its single item
 * @see RxJava Wiki: averageLong()
 * @see MSDN: Observable.Average
 */
public static Observable averageLong(Observable source) {
    // Static form: the source is handed straight to OperationAverage.averageLongs.
    return OperationAverage.averageLongs(source);
}

/**
 * Returns an Observable that computes the average of the Floats emitted by
 * the source Observable.
 *

*
 *
 * @param source source Observable to compute the average of
 * @return an Observable that emits the average of all the Floats emitted by
 *         the source Observable as its single item
 * @see RxJava Wiki: averageFloat()
 * @see MSDN: Observable.Average
 */
public static Observable averageFloat(Observable source) {
    // Static form: the source is handed straight to OperationAverage.averageFloats.
    return OperationAverage.averageFloats(source);
}

/**
 * Returns an Observable that emits the average of the Doubles emitted
 * by the source Observable.
 *

*
 *
 * @param source source Observable to compute the average of
 * @return an Observable that emits the average of all the Doubles emitted
 *         by the source Observable as its single item
 * @see RxJava Wiki: averageDouble()
 * @see MSDN: Observable.Average
 */
public static Observable averageDouble(Observable source) {
    // Static form: the source is handed straight to OperationAverage.averageDoubles.
    return OperationAverage.averageDoubles(source);
}

/**
 * Create an Observable that transforms items emitted by the source
 * Observable into integers by using a function you provide and then emits
 * the integer average of the complete sequence of transformed values.
 *

*
 *
 * @param valueExtractor the function to transform an item emitted by the
 *            source Observable into an integer
 * @return an Observable that emits the integer average of the complete
 *         sequence of items emitted by the source Observable when
 *         transformed into integers by the specified function
 * @see RxJava Wiki: averageInteger()
 * @see MSDN: Observable.Average
 */
public Observable averageInteger(Func1 valueExtractor) {
    // Instance form: the AverageIntegerExtractor operator over this Observable is wrapped by create().
    return create(new OperationAverage.AverageIntegerExtractor(this, valueExtractor));
}

/**
 * Create an Observable that transforms items emitted by the source
 * Observable into longs by using a function you provide and then emits
 * the long average of the complete sequence of transformed values.
 *

*
 *
 * @param valueExtractor the function to transform an item emitted by the
 *            source Observable into a long
 * @return an Observable that emits the long average of the complete
 *         sequence of items emitted by the source Observable when
 *         transformed into longs by the specified function
 * @see RxJava Wiki: averageLong()
 * @see MSDN: Observable.Average
 */
public Observable averageLong(Func1 valueExtractor) {
    // Instance form: the AverageLongExtractor operator over this Observable is wrapped by create().
    return create(new OperationAverage.AverageLongExtractor(this, valueExtractor));
}

/**
 * Create an Observable that transforms items emitted by the source
 * Observable into floats by using a function you provide and then emits
 * the float average of the complete sequence of transformed values.
 *

*
 *
 * @param valueExtractor the function to transform an item emitted by the
 *            source Observable into a float
 * @return an Observable that emits the float average of the complete
 *         sequence of items emitted by the source Observable when
 *         transformed into floats by the specified function
 * @see RxJava Wiki: averageFloat()
 * @see MSDN: Observable.Average
 */
public Observable averageFloat(Func1 valueExtractor) {
    // Instance form: the AverageFloatExtractor operator over this Observable is wrapped by create().
    return create(new OperationAverage.AverageFloatExtractor(this, valueExtractor));
}

/**
 * Create an Observable that transforms items emitted by the source
 * Observable into doubles by using a function you provide and then emits
 * the double average of the complete sequence of transformed values.
 *

*
 *
 * @param valueExtractor the function to transform an item emitted by the
 *            source Observable into a double
 * @return an Observable that emits the double average of the complete
 *         sequence of items emitted by the source Observable when
 *         transformed into doubles by the specified function
 * @see RxJava Wiki: averageDouble()
 * @see MSDN: Observable.Average
 */
public Observable averageDouble(Func1 valueExtractor) {
    // Instance form: the AverageDoubleExtractor operator over this Observable is wrapped by create().
    return create(new OperationAverage.AverageDoubleExtractor(this, valueExtractor));
}

/**
 * Returns an Observable that emits the minimum item emitted by the source
 * Observable. If there is more than one such item, it returns the
 * last-emitted one.
 *

*
 *
 * @param source an Observable to determine the minimum item of
 * @return an Observable that emits the minimum item emitted by the source
 *         Observable
 * @throws IllegalArgumentException if the source is empty
 * @see MSDN: Observable.Min
 */
public static > Observable min(Observable source) {
    // NOTE(review): the signature above is not valid Java as written (a lone '>' precedes
    // the return type); the generic type-parameter list appears to be missing from this
    // copy of the file - confirm against the canonical source.
    // Static form: no comparator is passed to OperationMinMax.min.
    return OperationMinMax.min(source);
}

/**
 * Returns an Observable that emits the minimum item emitted by the source
 * Observable, according to a specified comparator. If there is more than
 * one such item, it returns the last-emitted one.
 *

*
 *
 * @param comparator the comparer used to compare elements
 * @return an Observable that emits the minimum item according to the
 *         specified comparator
 * @throws IllegalArgumentException if the source is empty
 * @see RxJava Wiki: min()
 * @see MSDN: Observable.Min
 */
public Observable min(Comparator comparator) {
    // Instance form: this Observable is the source and the caller's comparator is passed through.
    return OperationMinMax.min(this, comparator);
}

/**
 * Returns an Observable that emits a List of items emitted by the source
 * Observable that have the minimum key value. For a source Observable that
 * emits no items, the resulting Observable emits an empty List.
 *

*
 *
 * @param selector the key selector function
 * @return an Observable that emits a List of the items from the source
 *         Observable that had the minimum key value
 * @see RxJava Wiki: minBy()
 * @see MSDN: Observable.MinBy
 */
public > Observable> minBy(Func1 selector) {
    // NOTE(review): the signature above is not valid Java as written (unbalanced '>');
    // generic type arguments appear to be missing from this copy - confirm against the
    // canonical source.
    // No comparator is passed in this overload.
    return OperationMinMax.minBy(this, selector);
}

/**
 * Returns an Observable that emits a List of items emitted by the source
 * Observable that have the minimum key value according to a given
 * comparator function. For a source Observable that emits no items, the
 * resulting Observable emits an empty List.
 *

*
 *
 * @param selector the key selector function
 * @param comparator the comparator used to compare key values
 * @return an Observable that emits a List of the items emitted by the
 *         source Observable that had the minimum key value according to the
 *         specified comparator
 * @see RxJava Wiki: minBy()
 * @see MSDN: Observable.MinBy
 */
public Observable> minBy(Func1 selector, Comparator comparator) {
    // NOTE(review): "Observable>" in the signature is not valid Java as written; generic
    // type arguments appear to be missing from this copy - confirm against the canonical source.
    return OperationMinMax.minBy(this, selector, comparator);
}

/**
 * Returns an Observable that emits the maximum item emitted by the source
 * Observable. If there is more than one item with the same maximum value,
 * it emits the last-emitted of these.
 *

*
 *
 * @param source an Observable to scan for the maximum emitted item
 * @return an Observable that emits this maximum item from the source
 * @throws IllegalArgumentException if the source is empty
 * @see RxJava Wiki: max()
 * @see MSDN: Observable.Max
 */
public static > Observable max(Observable source) {
    // NOTE(review): the signature above is not valid Java as written (a lone '>' precedes
    // the return type); the generic type-parameter list appears to be missing from this
    // copy of the file - confirm against the canonical source.
    // Static form: no comparator is passed to OperationMinMax.max.
    return OperationMinMax.max(source);
}

/**
 * Returns an Observable that emits the maximum item emitted by the source
 * Observable, according to the specified comparator. If there is more than
 * one item with the same maximum value, it emits the last-emitted of these.
 *

*
 *
 * @param comparator the comparer used to compare items
 * @return an Observable that emits the maximum item emitted by the source
 *         Observable, according to the specified comparator
 * @throws IllegalArgumentException if the source is empty
 * @see RxJava Wiki: max()
 * @see MSDN: Observable.Max
 */
public Observable max(Comparator comparator) {
    // Instance form: this Observable is the source and the caller's comparator is passed through.
    return OperationMinMax.max(this, comparator);
}

/**
 * Returns an Observable that emits a List of items emitted by the source
 * Observable that have the maximum key value. For a source Observable that
 * emits no items, the resulting Observable emits an empty List.
 *

*
 *
 * @param selector the key selector function
 * @return an Observable that emits a List of those items emitted by the
 *         source Observable that had the maximum key value
 * @see RxJava Wiki: maxBy()
 * @see MSDN: Observable.MaxBy
 */
public > Observable> maxBy(Func1 selector) {
    // NOTE(review): the signature above is not valid Java as written (unbalanced '>');
    // generic type arguments appear to be missing from this copy - confirm against the
    // canonical source.
    // No comparator is passed in this overload.
    return OperationMinMax.maxBy(this, selector);
}

/**
 * Returns an Observable that emits a List of items emitted by the source
 * Observable that have the maximum key value according to a specified
 * comparator. For a source Observable that emits no items, the resulting
 * Observable emits an empty List.
 *

*
 *
 * @param selector the key selector function
 * @param comparator the comparator used to compare key values
 * @return an Observable that emits a List of those items emitted by the
 *         source Observable that had the maximum key value according to the
 *         specified comparator
 * @see RxJava Wiki: maxBy()
 * @see MSDN: Observable.MaxBy
 */
public Observable> maxBy(Func1 selector, Comparator comparator) {
    // NOTE(review): "Observable>" in the signature is not valid Java as written; generic
    // type arguments appear to be missing from this copy - confirm against the canonical source.
    return OperationMinMax.maxBy(this, selector, comparator);
}

/**
 * Returns a {@link ConnectableObservable} that shares a single subscription
 * to the underlying Observable that will replay all of its items and
 * notifications to any future {@link Observer}.
 *

*
 *
 * @return a {@link ConnectableObservable} that upon connection causes the
 *         source Observable to emit items to its {@link Observer}s
 * @see RxJava Wiki: replay()
 */
public ConnectableObservable replay() {
    // Multicasts this Observable through a freshly created ReplaySubject.
    return OperationMulticast.multicast(this, ReplaySubject. create());
}

/**
 * Returns a {@link ConnectableObservable} that shares a single subscription
 * to the underlying Observable that will replay all of its items and
 * notifications to any future {@link Observer} on the given scheduler.
 *

*
 *
 * @param scheduler the scheduler on which the Observers will observe the
 *            emitted items
 * @return a {@link ConnectableObservable} that shares a single subscription
 *         to the source Observable that will replay all of its items and
 *         notifications to any future {@link Observer} on the given
 *         scheduler
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(Scheduler scheduler) {
    // A new ReplaySubject is wrapped by createScheduledSubject with the caller's scheduler
    // before being used as the multicast subject.
    return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler));
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the source Observable that replays at most
 * {@code bufferSize} items emitted by that Observable.
 *

*
 *
 * @param bufferSize the buffer size
 * @return a connectable observable sequence that shares a single
 *         subscription to the source Observable and replays at most
 *         {@code bufferSize} items emitted by that Observable
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(int bufferSize) {
    // NOTE(review): unlike the (bufferSize, time, unit, scheduler) overload, bufferSize is
    // not range-checked here; whether replayBuffered validates it is not visible - confirm.
    return OperationMulticast.multicast(this, OperationReplay.replayBuffered(bufferSize));
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the source Observable and replays at most
 * {@code bufferSize} items emitted by that Observable.
 *

*
 *
 * @param bufferSize the buffer size
 * @param scheduler the scheduler on which the Observers will observe the
 *            emitted items
 * @return a connectable observable sequence that shares a single
 *         subscription to the source Observable and replays at most
 *         {@code bufferSize} items emitted by that Observable
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(int bufferSize, Scheduler scheduler) {
    // The buffered replay subject is wrapped by createScheduledSubject with the caller's
    // scheduler before being used as the multicast subject.
    return OperationMulticast.multicast(this,
            OperationReplay.createScheduledSubject(
                    OperationReplay.replayBuffered(bufferSize), scheduler));
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the source Observable and replays all items emitted by
 * that Observable within a time window.
 *

*
 *
 * @param time the window length
 * @param unit the window length time unit
 * @return a connectable observable sequence that shares a single
 *         subscription to the source Observable and that replays all items
 *         emitted by that Observable during the window defined by
 *         {@code time} and {@code unit}
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(long time, TimeUnit unit) {
    // Convenience overload: supplies Schedulers.threadPoolForComputation() as the scheduler.
    return replay(time, unit, Schedulers.threadPoolForComputation());
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the source Observable and replays all items emitted by
 * that Observable within a time window.
 *

*
 *
 * @param time the window length
 * @param unit the window length time unit
 * @param scheduler the scheduler that is used as a time source for the
 *            window
 * @return a connectable observable sequence that shares a single
 *         subscription to the source Observable and replays all items
 *         emitted by that Observable within the window defined by
 *         {@code time} and {@code unit}
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(long time, TimeUnit unit, Scheduler scheduler) {
    // -1 sits in the argument position where the bufferSize overload passes its buffer
    // size; presumably it means "no size limit" - TODO confirm in OperationReplay.replayWindowed.
    return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, -1, scheduler));
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying {@code bufferSize}
 * notifications within window.
 *

*
 *
 * @param bufferSize the buffer size
 * @param time the window length
 * @param unit the window length time unit
 * @return Returns a connectable observable sequence that shares a single
 *         subscription to the underlying sequence replaying bufferSize notifications within window
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit) {
    // Convenience overload: supplies Schedulers.threadPoolForComputation() as the scheduler.
    return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
}

/**
 * Returns a connectable observable sequence that shares a single
 * subscription to the underlying sequence and that replays a maximum of
 * {@code bufferSize} items that are emitted within the window defined by
 * {@code time} and {@code unit}.
 *

*
 *
 * @param bufferSize the buffer size
 * @param time the window length
 * @param unit the window length time unit
 * @param scheduler the scheduler that is used as a time source for the
 *            window
 * @return a connectable observable sequence that shares a single
 *         subscription to the underlying sequence that replays a maximum of
 *         {@code bufferSize} items that are emitted within the window
 *         defined by {@code time} and {@code unit}
 * @throws IllegalArgumentException if {@code bufferSize} is negative
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public ConnectableObservable replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
    // Reject negative sizes up front; the time-only overloads pass -1 to replayWindowed
    // themselves, so a caller-supplied negative value must not reach it through this path.
    if (bufferSize < 0) {
        throw new IllegalArgumentException("bufferSize < 0");
    }
    return OperationMulticast.multicast(this, OperationReplay.replayWindowed(time, unit, bufferSize, scheduler));
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence and starts with initial value.
 *
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence and starts with
 *         initial value
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector) {
    // NOTE(review): the signature and "new Func0>()" are not valid Java as written; generic
    // type arguments appear to be missing from this copy - confirm against the canonical source.
    // The Func0 is a subject factory: each call() returns a new ReplaySubject.
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            return ReplaySubject.create();
        }
    }, selector);
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying all notifications.
*
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param scheduler the scheduler where the replay is observed
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying all
 *         notifications
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) {
    // NOTE(review): the signature and "new Func0>()" are not valid Java as written; generic
    // type arguments appear to be missing from this copy - confirm against the canonical source.
    // Subject factory: each call() wraps a new ReplaySubject with the caller's scheduler.
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            return OperationReplay.createScheduledSubject(ReplaySubject.create(), scheduler);
        }
    }, selector);
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying {@code bufferSize}
 * notifications.
 *
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param bufferSize the buffer size
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying
 *         {@code bufferSize} notifications
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, final int bufferSize) {
    // Subject factory: each call() returns a new buffered replay subject of the given size.
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            return OperationReplay.replayBuffered(bufferSize);
        }
    }, selector);
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying {@code bufferSize}
 * notifications.
 *
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param bufferSize the buffer size
 * @param scheduler the scheduler where the replay is observed
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying
 *         {@code bufferSize} notifications
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) {
    // Subject factory: each call() wraps a new buffered replay subject with the caller's scheduler.
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            return OperationReplay.createScheduledSubject(OperationReplay.replayBuffered(bufferSize), scheduler);
        }
    }, selector);
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying all notifications
 * within window.
*
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param time the window length
 * @param unit the window length time unit
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying all
 *         notifications within window
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, long time, TimeUnit unit) {
    // NOTE(review): the signature is not valid Java as written; generic type arguments
    // appear to be missing from this copy - confirm against the canonical source.
    // Convenience overload: supplies Schedulers.threadPoolForComputation() as the scheduler.
    return replay(selector, time, unit, Schedulers.threadPoolForComputation());
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying all notifications
 * within window.
 *
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param time the window length
 * @param unit the window length time unit
 * @param scheduler the scheduler that is used as a time source for the
 *            window
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying all
 *         notifications within window
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            // -1 sits in the argument position where the bufferSize overload passes its
            // buffer size; presumably "no size limit" - TODO confirm in OperationReplay.
            return OperationReplay.replayWindowed(time, unit, -1, scheduler);
        }
    }, selector);
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying {@code bufferSize}
 * notifications within window.
 *
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param bufferSize the buffer size
 * @param time the window length
 * @param unit the window length time unit
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying
 *         {@code bufferSize} notifications within window
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, int bufferSize, long time, TimeUnit unit) {
    // Convenience overload: supplies Schedulers.threadPoolForComputation() as the scheduler.
    return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
}

/**
 * Returns an observable sequence that is the result of invoking the
 * selector on a connectable observable sequence that shares a single
 * subscription to the underlying sequence replaying {@code bufferSize}
 * notifications within window.
*
 * @param the return element type
 * @param selector the selector function which can use the multicasted
 *            this sequence as many times as needed, without causing
 *            multiple subscriptions to this sequence
 * @param bufferSize the buffer size
 * @param time the window length
 * @param unit the window length time unit
 * @param scheduler the scheduler which is used as a time source for the
 *            window
 * @return an observable sequence that is the result of invoking the
 *         selector on a connectable observable sequence that shares a
 *         single subscription to the underlying sequence replaying
 *         {@code bufferSize} notifications within window
 * @throws IllegalArgumentException if {@code bufferSize} is negative
 * @see RxJava Wiki: replay()
 * @see MSDN: Observable.Replay
 */
public Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
    // NOTE(review): the signature and "new Func0>()" are not valid Java as written; generic
    // type arguments appear to be missing from this copy - confirm against the canonical source.
    // Validated eagerly, before the subject factory is created, so the caller sees the
    // exception at call time rather than when the factory is later invoked.
    if (bufferSize < 0) {
        throw new IllegalArgumentException("bufferSize < 0");
    }
    return OperationMulticast.multicast(this, new Func0>() {
        @Override
        public Subject call() {
            return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
        }
    }, selector);
}

/**
 * Retry subscription to the source Observable when it calls
 * onError up to a certain number of retries.
 *

*
 *
 * If the source Observable calls {@link Observer#onError}, this method will
 * resubscribe to the source Observable for a maximum of
 * retryCount resubscriptions.
 *
 * Any and all items emitted by the source Observable will be emitted by
 * the resulting Observable, even those emitted during failed subscriptions.
 * For example, if an Observable fails at first but emits [1, 2] then
 * succeeds the second time and emits [1, 2, 3, 4, 5] then the complete
 * sequence of emissions and notifications would be
 * [1, 2, 1, 2, 3, 4, 5, onCompleted].
 *
 * @param retryCount number of retry attempts before failing
 * @return the source Observable modified with retry logic
 * @see RxJava Wiki: retry()
 */
public Observable retry(int retryCount) {
    // Bounded form: retryCount is forwarded unchanged to OperationRetry.retry.
    return create(OperationRetry.retry(this, retryCount));
}

/**
 * Retry subscription to the source Observable whenever it calls
 * onError (infinite retry count).
 *

*
 *
 * If the source Observable calls {@link Observer#onError}, this method will
 * resubscribe to the source Observable.
 *
 * Any and all items emitted by the source Observable will be emitted by
 * the resulting Observable, even those emitted during failed subscriptions.
 * For example, if an Observable fails at first but emits [1, 2] then
 * succeeds the second time and emits [1, 2, 3, 4, 5] then the complete
 * sequence of emissions and notifications would be
 * [1, 2, 1, 2, 3, 4, 5, onCompleted].
 *
 * @return the source Observable modified with retry logic
 * @see RxJava Wiki: retry()
 */
public Observable retry() {
    // Unbounded form: uses the OperationRetry.retry overload that takes no retry count.
    return create(OperationRetry.retry(this));
}

/**
 * This method has similar behavior to {@link #replay} except that this
 * auto-subscribes to the source Observable rather than returning a
 * {@link ConnectableObservable}.
 *

*
 *
 * This is useful when you want an Observable to cache responses and you
 * can't control the subscribe/unsubscribe behavior of all the
 * {@link Observer}s.
 *
 * When you call {@code cache()}, it does not yet subscribe to the
 * source Observable. This only happens when {@code subscribe} is called
 * the first time on the Observable returned by {@code cache()}.
 *
 * Note: You sacrifice the ability to unsubscribe from the origin when you
 * use the cache() operator so be careful not to use this
 * operator on Observables that emit an infinite or very large number of
 * items that will use up memory.
 *
 * @return an Observable that, when first subscribed to, caches all of its
 *         items and notifications for the benefit of subsequent observers
 * @see RxJava Wiki: cache()
 */
public Observable cache() {
    // The OperationCache.cache operator over this Observable is wrapped by create().
    return create(OperationCache.cache(this));
}

/**
 * Perform work in parallel by sharding an {@code Observable} on a
 * {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and
 * return an {@code Observable} with the output.
 *

*
 *
 * @param f a {@link Func1} that applies Observable operators to
 *            {@code Observable} in parallel and returns an
 *            {@code Observable}
 * @return an Observable with the output of the {@link Func1} executed on a
 *         {@link Scheduler}
 * @see RxJava Wiki: parallel()
 */
public Observable parallel(Func1, Observable> f) {
    // NOTE(review): "Func1, Observable>" is not valid Java as written; generic type
    // arguments appear to be missing from this copy - confirm against the canonical source.
    // No scheduler is passed in this overload.
    return OperationParallel.parallel(this, f);
}

/**
 * Perform work in parallel by sharding an {@code Observable} on a
 * {@link Scheduler} and return an {@code Observable} with the output.
 *

*
 *
 * @param f a {@link Func1} that applies Observable operators to
 *            {@code Observable} in parallel and returns an
 *            {@code Observable}
 * @param s a {@link Scheduler} to perform the work on
 * @return an Observable with the output of the {@link Func1} executed on a
 *         {@link Scheduler}
 * @see RxJava Wiki: parallel()
 */
public Observable parallel(final Func1, Observable> f, final Scheduler s) {
    // NOTE(review): "Func1, Observable>" is not valid Java as written; generic type
    // arguments appear to be missing from this copy - confirm against the canonical source.
    return OperationParallel.parallel(this, f, s);
}

/**
 * Merges an Observable<Observable<T>> to
 * Observable<Observable<T>> with the number of
 * inner Observables defined by parallelObservables.
 *

* For example, if the original
 * Observable<Observable<T>> has 100 Observables to
 * be emitted and parallelObservables is 8, the 100 will be
 * grouped onto 8 output Observables.
 *
 * This is a mechanism for efficiently processing n number of
 * Observables on a smaller m number of resources (typically CPU
 * cores).
 *
 *
 *
 * @param source the Observable of Observables to merge
 * @param parallelObservables the number of Observables to merge into
 * @return an Observable of Observables constrained in number by
 *         parallelObservables
 * @see RxJava Wiki: parallelMerge()
 */
public static Observable> parallelMerge(Observable> source, int parallelObservables) {
    // NOTE(review): "Observable>" is not valid Java as written; generic type arguments
    // appear to be missing from this copy - confirm against the canonical source.
    return OperationParallelMerge.parallelMerge(source, parallelObservables);
}

/**
 * Merges an Observable<Observable<T>> to
 * Observable<Observable<T>> with the number of
 * inner Observables defined by parallelObservables and runs
 * each Observable on the defined Scheduler.
 *

* For example, if the original
 * Observable<Observable<T>> has 100 Observables to
 * be emitted and parallelObservables is 8, the 100 will be
 * grouped onto 8 output Observables.
 *
 * This is a mechanism for efficiently processing n number of
 * Observables on a smaller m number of resources (typically CPU
 * cores).
 *
 *
 *
 * @param source the Observable of Observables to merge
 * @param parallelObservables the number of Observables to merge into
 * @param scheduler the Scheduler to run each Observable on
 * @return an Observable of Observables constrained in number by
 *         parallelObservables
 * @see RxJava Wiki: parallelMerge()
 */
public static Observable> parallelMerge(Observable> source, int parallelObservables, Scheduler scheduler) {
    // NOTE(review): "Observable>" is not valid Java as written; generic type arguments
    // appear to be missing from this copy - confirm against the canonical source.
    return OperationParallelMerge.parallelMerge(source, parallelObservables, scheduler);
}

/**
 * Returns a {@link ConnectableObservable}, which waits until its
 * {@link ConnectableObservable#connect connect} method is called before it
 * begins emitting items to those {@link Observer}s that have subscribed to
 * it.
 *

*
 *
 * @return a {@link ConnectableObservable} that upon connection causes the
 *         source Observable to emit items to its {@link Observer}s
 * @see RxJava Wiki: publish()
 */
public ConnectableObservable publish() {
    // Multicasts this Observable through a freshly created PublishSubject.
    return OperationMulticast.multicast(this, PublishSubject. create());
}

/**
 * Returns a {@link ConnectableObservable} that emits only the last item
 * emitted by the source Observable.
 *

*
 *
 * @return a {@link ConnectableObservable}
 * @see RxJava Wiki: publishLast()
 */
public ConnectableObservable publishLast() {
    // Multicasts this Observable through a freshly created AsyncSubject.
    return OperationMulticast.multicast(this, AsyncSubject. create());
}

/**
 * Synonymous with reduce().
 *

* * * @see RxJava Wiki: reduce() * @see #reduce(Func2) * @deprecated use #reduce(Func2) */ public Observable aggregate(Func2 accumulator) { return reduce(accumulator); } /** * Returns an Observable that applies a function of your choosing to the * first item emitted by a source Observable, then feeds the result of that * function along with the second item emitted by an Observable into the * same function, and so on until all items have been emitted by the source * Observable, emitting the final result from the final call to your * function as its sole item. *

*
 *
 * This technique, which is called "reduce" here, is sometimes called
 * "aggregate," "fold," "accumulate," "compress," or "inject" in other
 * programming contexts. Groovy, for instance, has an inject
 * method that does a similar operation on lists.
 *
 * @param initialValue the initial (seed) accumulator value
 * @param accumulator an accumulator function to be invoked on each item
 *            emitted by the source Observable, the result of which
 *            will be used in the next accumulator call
 * @return an Observable that emits a single item that is the result of
 *         accumulating the output from the items emitted by the source
 *         Observable
 * @see RxJava Wiki: reduce()
 * @see MSDN: Observable.Aggregate
 * @see Wikipedia: Fold (higher-order function)
 */
public Observable reduce(R initialValue, Func2 accumulator) {
    // Implemented as the seeded scan operator followed by takeLast(1), so only the last
    // value produced by the scan is passed on.
    return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
}

/**
 * Collect values into a single mutable data structure.
 *

* A simplified version of `reduce` that does not need to return the state on each pass.
 *
 *
 * @param state the mutable container; it is used as the seed value for
 *            {@code reduce} and handed to the collector on every call
 * @param collector the action invoked as {@code collector.call(state, value)}
 *            for each value; it is expected to mutate {@code state} in place
 * @return the Observable produced by {@code reduce(state, accumulator)},
 *         where the accumulator always returns the same {@code state}
 *         instance
 */
public Observable collect(R state, final Action2 collector) {
    // Adapt the side-effecting collector to the Func2 shape reduce() expects: run the
    // collector, then return the same state object as the next accumulator value.
    Func2 accumulator = new Func2() {

        @Override
        public R call(R state, T value) {
            collector.call(state, value);
            return state;
        }

    };
    return reduce(state, accumulator);
}

/**
 * Synonymous with reduce().
 *

* * * @see RxJava Wiki: reduce() * @see #reduce(Object, Func2) * @deprecated use #reduce(Object, Func2) */ public Observable aggregate(R initialValue, Func2 accumulator) { return reduce(initialValue, accumulator); } /** * Returns an Observable that applies a function of your choosing to the * first item emitted by a source Observable, then feeds the result of that * function along with the second item emitted by an Observable into the * same function, and so on until all items have been emitted by the source * Observable, emitting the result of each of these iterations. *

* *

* This sort of function is sometimes called an accumulator. * * @param accumulator an accumulator function to be invoked on each item * emitted by the source Observable, whose result will be * emitted to {@link Observer}s via * {@link Observer#onNext onNext} and used in the next * accumulator call * @return an Observable that emits the results of each call to the * accumulator function * @see RxJava Wiki: scan() * @see MSDN: Observable.Scan */ public Observable scan(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)); } /** * Returns an Observable that emits the results of sampling the items * emitted by the source Observable at a specified time interval. *

* * * @param period the sampling rate * @param unit the {@link TimeUnit} in which period is defined * @return an Observable that emits the results of sampling the items * emitted by the source Observable at the specified time interval * @see RxJava Wiki: sample() */ public Observable sample(long period, TimeUnit unit) { return create(OperationSample.sample(this, period, unit)); } /** * Returns an Observable that emits the results of sampling the items * emitted by the source Observable at a specified time interval. *

* * * @param period the sampling rate * @param unit the {@link TimeUnit} in which period is defined * @param scheduler the {@link Scheduler} to use when sampling * @return an Observable that emits the results of sampling the items * emitted by the source Observable at the specified time interval * @see RxJava Wiki: sample() */ public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } /** * Return an Observable that emits the results of sampling the items emitted * by this Observable when the sampler Observable emits an item * or completes. *

* * * @param sampler the Observable to use for sampling the source Observable * @return an Observable that emits the results of sampling the items * emitted by this Observable whenever the sampler * Observable emits an item or completes * @see RxJava Wiki: sample() */ public Observable sample(Observable sampler) { return create(new OperationSample.SampleWithObservable(this, sampler)); } /** * Returns an Observable that applies a function of your choosing to the * first item emitted by a source Observable, then feeds the result of that * function along with the second item emitted by an Observable into the * same function, and so on until all items have been emitted by the source * Observable, emitting the result of each of these iterations. *

* *

* This sort of function is sometimes called an accumulator. *

* Note that when you pass a seed to scan() the resulting * Observable will emit that seed as its first emitted item. * * @param initialValue the initial (seed) accumulator item * @param accumulator an accumulator function to be invoked on each item * emitted by the source Observable, whose result will be * emitted to {@link Observer}s via * {@link Observer#onNext onNext} and used in the next * accumulator call * @return an Observable that emits the results of each call to the * accumulator function * @see RxJava Wiki: scan() * @see MSDN: Observable.Scan */ public Observable scan(R initialValue, Func2 accumulator) { return create(OperationScan.scan(this, initialValue, accumulator)); } /** * Returns an Observable that emits a Boolean that indicates whether all of * the items emitted by the source Observable satisfy a condition. *

* * * @param predicate a function that evaluates an item and returns a Boolean * @return an Observable that emits true if all items emitted * by the source Observable satisfy the predicate; otherwise, * false * @see RxJava Wiki: all() */ public Observable all(Func1 predicate) { return create(OperationAll.all(this, predicate)); } /** * Returns an Observable that skips the first num items emitted * by the source Observable and emits the remainder. *

* * * @param num the number of items to skip * @return an Observable that is identical to the source Observable except * that it does not emit the first num items that the * source Observable emits * @see RxJava Wiki: skip() */ public Observable skip(int num) { return create(OperationSkip.skip(this, num)); } /** * Create an Observable that skips values before the given time elapses. *

* * * This overload delegates to {@link #skip(long, TimeUnit, Scheduler)}, * passing {@code Schedulers.threadPoolForComputation()} as the scheduler. * * @param time the length of the time window * @param unit the time unit * @return an Observable that skips values before the given time elapses * @see RxJava Wiki: skip() */ public Observable skip(long time, TimeUnit unit) { return skip(time, unit, Schedulers.threadPoolForComputation()); } /** * Create an Observable that skips values before the given time elapses * while waiting on the given scheduler. *

* * * @param time the length of the time window * @param unit the time unit * @param scheduler the scheduler where the timed wait happens * @return an Observable that skips values before the given time elapses * while waiting on the given scheduler * @see RxJava Wiki: skip() */ public Observable skip(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperationSkip.SkipTimed(this, time, unit, scheduler)); } /** * If the Observable completes after emitting a single item, return an * Observable containing that item. If it emits more than one item or no * item, throw an IllegalArgumentException. *

* * * @return an Observable that emits the single item emitted by the source * Observable * @throws IllegalArgumentException if the source emits more than one item * or no items * @see RxJava Wiki: single() * @see MSDN: Observable.singleAsync() */ public Observable single() { return create(OperationSingle. single(this)); } /** * If the Observable completes after emitting a single item that matches a * predicate, return an Observable that emits that item. If the source * Observable emits more than one such item or no such items, throw an * IllegalArgumentException. *

* * * @param predicate a predicate function to evaluate items emitted by the * source Observable * @return an Observable that emits the single item emitted by the source * Observable that matches the predicate * @throws IllegalArgumentException if the source Observable emits more than * one item or no items matching the * predicate * @see RxJava Wiki: single() * @see MSDN: Observable.singleAsync() */ public Observable single(Func1 predicate) { return filter(predicate).single(); } /** * If the source Observable completes after emitting a single item, return * an Observable that emits that item. If the source Observable is empty, * return an Observable that emits a default item. If the source Observable * emits more than one item, throw an IllegalArgumentException. *

* * * @param defaultValue a default value to emit if the source Observable * emits no item * @return an Observable that emits the single item emitted by the source * Observable, or default value if the source Observable is empty * @throws IllegalArgumentException if the source emits more than one item * @see RxJava Wiki: single() * @see MSDN: Observable.singleOrDefaultAsync() */ public Observable singleOrDefault(T defaultValue) { return create(OperationSingle. singleOrDefault(this, defaultValue)); } /** * If the Observable completes after emitting a single item that matches a * predicate, return an Observable that emits that item. If the source * Observable emits no such item, return an Observable that emits a default * item. If the source Observable emits more than one such item, throw an * IllegalArgumentException. *

* * * @param defaultValue a default value to emit if the source Observable * emits no matching items * @param predicate a predicate function to evaluate items emitted by the * source Observable * @return an Observable that emits the single item emitted by the source * Observable that matches the predicate, or the default item if no * emitted item matches the predicate * @throws IllegalArgumentException if the source emits more than one item * matching the predicate * @see RxJava Wiki: single() * @see MSDN: Observable.singleOrDefaultAsync() */ public Observable singleOrDefault(T defaultValue, Func1 predicate) { return filter(predicate).singleOrDefault(defaultValue); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable, or an IllegalArgumentException if the * source {@link Observable} is empty. *

* * * @return an Observable that emits only the very first item from the * source, or an IllegalArgumentException if the source {@link Observable} is empty. * @see RxJava Wiki: first() * @see MSDN: Observable.firstAsync() */ public Observable first() { return take(1).single(); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable that satisfies a given condition, or an * IllegalArgumentException if no such items are emitted. *

* * * @param predicate the condition any source emitted item has to satisfy * @return an Observable that emits only the very first item satisfying the * given condition from the source, or an IllegalArgumentException if no such items are emitted. * @see RxJava Wiki: first() * @see MSDN: Observable.firstAsync() */ public Observable first(Func1 predicate) { return takeFirst(predicate).single(); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable, or a default item. *

* * * @param defaultValue the default item to emit if the source Observable * doesn't emit anything * @return an Observable that emits only the very first item from the * source, or a default item if the source Observable completes * without emitting a single item * @see RxJava Wiki: firstOrDefault() * @see MSDN: Observable.firstOrDefaultAsync() */ public Observable firstOrDefault(T defaultValue) { return take(1).singleOrDefault(defaultValue); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable that satisfies a given condition, or a default item * otherwise. *

* * * @param defaultValue the default item to emit if the source Observable * doesn't emit anything that satisfies the given condition * @param predicate the condition any source emitted item has to satisfy * @return an Observable that emits only the very first item from the source * that satisfies the given condition, or a default item otherwise * @see RxJava Wiki: firstOrDefault() * @see MSDN: Observable.firstOrDefaultAsync() */ public Observable firstOrDefault(T defaultValue, Func1 predicate) { return takeFirst(predicate).singleOrDefault(defaultValue); } /** * Returns an Observable that emits the items emitted by the source * Observable or a specified default item if the source Observable is empty. *

* * * @param defaultValue the item to emit if the source Observable emits no * items * @return an Observable that emits either the specified default item if the * source Observable emits no items, or the items emitted by the * source Observable * @see RxJava Wiki: defaultIfEmpty() * @see MSDN: Observable.DefaultIfEmpty */ public Observable defaultIfEmpty(T defaultValue) { return create(OperationDefaultIfEmpty.defaultIfEmpty(this, defaultValue)); } /** * Returns an Observable that emits only the first num items * emitted by the source Observable. *

* *

* This method returns an Observable that will invoke a subscribing * {@link Observer}'s {@link Observer#onNext onNext} function a maximum of * num times before invoking * {@link Observer#onCompleted onCompleted}. * * @param num the number of items to emit * @return an Observable that emits only the first num items * emitted by the source Observable, or all of the items from the * source Observable if that Observable emits fewer than * num items * @see RxJava Wiki: take() */ public Observable take(final int num) { return create(OperationTake.take(this, num)); } /** * Create an Observable that emits the emitted items from the source * Observable before the time runs out. *

* * * This overload delegates to {@link #take(long, TimeUnit, Scheduler)}, * passing {@code Schedulers.threadPoolForComputation()} as the scheduler. * * @param time the length of the time window * @param unit the time unit * @return an Observable that emits the items emitted by the source * Observable before the time runs out * @see RxJava Wiki: take() */ public Observable take(long time, TimeUnit unit) { return take(time, unit, Schedulers.threadPoolForComputation()); } /** * Create an Observable that emits the emitted items from the source * Observable before the time runs out, waiting on the given scheduler. *

* * * @param time the length of the time window * @param unit the time unit * @param scheduler the scheduler used for time source * @return an Observable that emits the emitted items from the source * Observable before the time runs out, waiting on the given * scheduler * @see RxJava Wiki: take() */ public Observable take(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperationTake.TakeTimed(this, time, unit, scheduler)); } /** * Returns an Observable that emits items emitted by the source Observable * so long as a specified condition is true. *

* * * @param predicate a function that evaluates an item emitted by the source * Observable and returns a Boolean * @return an Observable that emits the items from the source Observable so * long as each item satisfies the condition defined by * predicate * @see RxJava Wiki: takeWhile() */ public Observable takeWhile(final Func1 predicate) { return create(OperationTakeWhile.takeWhile(this, predicate)); } /** * Returns an Observable that emits the items emitted by a source Observable * so long as a given predicate remains true, where the predicate can * operate on both the item and its index relative to the complete sequence * of items. *

* * * @param predicate a function to test each item emitted by the source * Observable for a condition; the second parameter of the * function represents the index of the source item * @return an Observable that emits items from the source Observable so long * as the predicate continues to return true for each * item, then completes * @see RxJava Wiki: takeWhileWithIndex() */ public Observable takeWhileWithIndex(final Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(this, predicate)); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable. *

* * * @return an Observable that emits only the very first item from the * source, or an empty Observable if the source Observable completes * without emitting a single item * @deprecated Use take(1) directly. * @see RxJava Wiki: first() * @see MSDN: Observable.firstAsync() */ @Deprecated public Observable takeFirst() { return take(1); } /** * Returns an Observable that emits only the very first item emitted by the * source Observable that satisfies a given condition. *

* * * @param predicate the condition any source emitted item has to satisfy * @return an Observable that emits only the very first item satisfying the * given condition from the source, or an empty Observable if the * source Observable completes without emitting a single matching * item * @see RxJava Wiki: first() * @see MSDN: Observable.firstAsync() */ public Observable takeFirst(Func1 predicate) { return filter(predicate).take(1); } /** * Returns an Observable that emits only the last count items * emitted by the source Observable. *

* * * @param count the number of items to emit from the end of the sequence * emitted by the source Observable * @return an Observable that emits only the last count items * emitted by the source Observable * @see RxJava Wiki: takeLast() */ public Observable takeLast(final int count) { return create(OperationTakeLast.takeLast(this, count)); } /** * Return an Observable which emits the items from the source Observable * that were emitted not before it completed minus a time window. *

* * * This overload delegates to {@link #takeLast(long, TimeUnit, Scheduler)}, * passing {@code Schedulers.threadPoolForComputation()} as the scheduler. * * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @return an Observable that emits the items from the source Observable * that were emitted not before it completed minus a time window */ public Observable takeLast(long time, TimeUnit unit) { return takeLast(time, unit, Schedulers.threadPoolForComputation()); } /** * Return an Observable that emits the items from the source Observable that * were emitted not before the source Observable completed minus a time * window, where the timing information is provided by the given scheduler. *

* * * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @param scheduler the Scheduler that provides the timestamps for the * Observed items * @return an Observable that emits the items from the source Observable * that were emitted not before it completed minus a time window, * where the timing information is provided by the given Scheduler */ public Observable takeLast(long time, TimeUnit unit, Scheduler scheduler) { return create(OperationTakeLast.takeLast(this, time, unit, scheduler)); } /** * Return an Observable that emits at most a specified number of items from * the source Observable that were emitted not before it completed minus a * time window. *

* * * This overload delegates to * {@link #takeLast(int, long, TimeUnit, Scheduler)}, passing * {@code Schedulers.threadPoolForComputation()} as the scheduler. * * @param count the maximum number of items to emit * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @return an Observable that emits at most {@code count} items from the * source Observable which were emitted not before it completed * minus a time window * @throws IllegalArgumentException if {@code count} is negative (raised by * the Scheduler-taking overload this * method delegates to) */ public Observable takeLast(int count, long time, TimeUnit unit) { return takeLast(count, time, unit, Schedulers.threadPoolForComputation()); } /** * Return an Observable that emits at most a specified number of items from * the source Observable that were emitted not before it completed minus a * time window, where the timing information is provided by a given * scheduler. *

* * * @param count the maximum number of items to emit * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @param scheduler the Scheduler that provides the timestamps for the * observed items * @return an Observable that emits at most {@code count} items from the * source Observable which were emitted not before it completed * minus a time window, where the timing information is provided by * the given {@code scheduler} * @throws IllegalArgumentException if {@code count} is negative */ public Observable takeLast(int count, long time, TimeUnit unit, Scheduler scheduler) { if (count < 0) { throw new IllegalArgumentException("count >= 0 required"); } return create(OperationTakeLast.takeLast(this, count, time, unit, scheduler)); } /** * Return an Observable that emits a single List containing the last * {@code count} elements emitted by the source Observable. *

* * * @param count the number of items to take last * @return an Observable that emits a single list containing the last * {@code count} elements emitted by the source Observable */ public Observable> takeLastBuffer(int count) { return takeLast(count).toList(); } /** * Return an Observable that emits single List containing items that were * emitted by the source Observable not before it completed minus a time * window. *

* * * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @return an Observable that emits a single list containing items that were * emitted by the source Observable not before it completed * minus a time window */ public Observable> takeLastBuffer(long time, TimeUnit unit) { return takeLast(time, unit).toList(); } /** * Return an Observable that emits a single List containing items that were * emitted by the source Observable not before it completed minus a time * window, where the timing information is provided by the given Scheduler. *

* * * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @param scheduler the Scheduler that provides the timestamps for the * observed items * @return an Observable that emits a single list containing items that were * emitted by the source Observable not before it completed * minus a time window, where the timing information is provided by * the given scheduler */ public Observable> takeLastBuffer(long time, TimeUnit unit, Scheduler scheduler) { return takeLast(time, unit, scheduler).toList(); } /** * Return an Observable that emits a single List containing at most * {@code count} items from the source Observable that were emitted not * before it completed minus a time window. *

* * * @param count the maximum number of items to emit * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @return an Observable that emits a single List containing at most * {@code count} items emitted by the source Observable not before * it completed minus a time window */ public Observable> takeLastBuffer(int count, long time, TimeUnit unit) { return takeLast(count, time, unit).toList(); } /** * Return an Observable that emits a single List containing at most * {@code count} items from the source Observable that were emitted not * before it completed minus a time window. *

* * * @param count the maximum number of items to emit * @param time the length of the time window, relative to the completion of * the source Observable * @param unit the time unit * @param scheduler the scheduler that provides the timestamps for the * observed items * @return an Observable that emits a single List containing at most * {@code count} items emitted by the source Observable not before * it completed minus a time window, where the timing information * is provided by the given {@code scheduler} * @throws IllegalArgumentException if {@code count} is negative (raised by * {@link #takeLast(int, long, TimeUnit, Scheduler)}) */ public Observable> takeLastBuffer(int count, long time, TimeUnit unit, Scheduler scheduler) { return takeLast(count, time, unit, scheduler).toList(); } /** * Returns an Observable that emits the items from the source Observable * only until the other Observable emits an item. *

* * * @param other the Observable whose first emitted item will cause * takeUntil to stop emitting items from the * source Observable * @param the type of items emitted by other * @return an Observable that emits the items of the source Observable until * such time as other emits its first item * @see RxJava Wiki: takeUntil() */ public Observable takeUntil(Observable other) { return OperationTakeUntil.takeUntil(this, other); } /** * Returns an Observable that bypasses all items from the source Observable * as long as the specified condition holds true, but emits all further * source items as soon as the condition becomes false. *

* * * @param predicate a function to test each item emitted from the source * Observable for a condition. It receives the emitted item * as the first parameter and the index of the emitted item * as a second parameter. * @return an Observable that emits all items from the source Observable as * soon as the condition becomes false * @see RxJava Wiki: skipWhileWithIndex() * @see MSDN: Observable.SkipWhile */ public Observable skipWhileWithIndex(Func2 predicate) { return create(OperationSkipWhile.skipWhileWithIndex(this, predicate)); } /** * Returns an Observable that bypasses all items from the source Observable * as long as a specified condition holds true, but emits all further * source items as soon as the condition becomes false. *

* * * @param predicate a function to test each item emitted from the source * Observable for a condition * @return an Observable that emits all items from the source Observable as * soon as the condition becomes false * @see RxJava Wiki: skipWhile() * @see MSDN: Observable.SkipWhile */ public Observable skipWhile(Func1 predicate) { return create(OperationSkipWhile.skipWhile(this, predicate)); } /** * Bypasses a specified number of items at the end of an Observable * sequence. *

* This operator accumulates a queue long enough to store the first * count items. As more items are received, items are taken * from the front of the queue and emitted by the returned Observable. This * causes such items to be delayed. *

* * * @param count number of items to bypass at the end of the source sequence * @return an Observable that emits the items emitted by the source * Observable except for the bypassed ones at the end * @throws IndexOutOfBoundsException if count is less than zero * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public Observable skipLast(int count) { return create(OperationSkipLast.skipLast(this, count)); } /** * Create an Observable that skips values emitted in a time window before * the source completes. *

* * * This overload delegates to {@link #skipLast(long, TimeUnit, Scheduler)}, * passing {@code Schedulers.threadPoolForComputation()} as the scheduler. * * @param time the length of the time window * @param unit the time unit * @return an Observable that skips values emitted in a time window before * the source completes * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public Observable skipLast(long time, TimeUnit unit) { return skipLast(time, unit, Schedulers.threadPoolForComputation()); } /** * Create an Observable that skips values emitted in a time window before * the source completes by using the given scheduler as time source. *

* * * @param time the length of the time window * @param unit the time unit * @param scheduler the scheduler used for time source * @return an Observable that skips values emitted in a time window before * the source completes by using the given scheduler as time source * @see RxJava Wiki: skipLast() * @see MSDN: Observable.SkipLast */ public Observable skipLast(long time, TimeUnit unit, Scheduler scheduler) { return create(new OperationSkipLast.SkipLastTimed(this, time, unit, scheduler)); } /** * Returns an Observable that emits a single item, a list composed of all * the items emitted by the source Observable. *

* *

* Normally, an Observable that returns multiple items will do so by * invoking its {@link Observer}'s {@link Observer#onNext onNext} method for * each such item. You can change this behavior, instructing the Observable * to compose a list of all of these items and then to invoke the Observer's * onNext function once, passing it the entire list, by calling * the Observable's toList method prior to calling its * {@link #subscribe} method. *

* Be careful not to use this operator on Observables that emit infinite or * very large numbers of items, as you do not have the option to * unsubscribe. * * @return an Observable that emits a single item: a List containing all of * the items emitted by the source Observable. * @see RxJava Wiki: toList() */ public Observable> toList() { return create(OperationToObservableList.toObservableList(this)); } /** * Return an Observable that emits the items emitted by the source * Observable, in a sorted order (each item emitted by the Observable must * implement {@link Comparable} with respect to all other items in the * sequence). *

* * * @return an Observable that emits the items from the source Observable in * sorted order * @throws ClassCastException if any item emitted by the Observable does not * implement {@link Comparable} with respect to * all other items emitted by the Observable * @see RxJava Wiki: toSortedList() */ public Observable> toSortedList() { return create(OperationToObservableSortedList.toSortedList(this)); } /** * Return an Observable that emits the items emitted by the source * Observable, in a sorted order based on a specified comparison function. *

* * * @param sortFunction a function that compares two items emitted by the * source Observable and returns an Integer that * indicates their sort order * @return an Observable that emits the items from the source Observable in * sorted order * @see RxJava Wiki: toSortedList() */ public Observable> toSortedList(Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(this, sortFunction)); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param values Iterable of the items you want the modified Observable to * emit first * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(Iterable values) { return concat(Observable. from(values), this); } /** * Emit a specified set of items with the specified scheduler before * beginning to emit items from the source Observable. *

* * * @param values iterable of the items you want the modified Observable to * emit first * @param scheduler the scheduler to emit the prepended values on * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() * @see MSDN: Observable.StartWith */ public Observable startWith(Iterable values, Scheduler scheduler) { return concat(from(values, scheduler), this); } /** * Emit a specified array of items with the specified scheduler before * beginning to emit items from the source Observable. *

* * * @param values the items you want the modified Observable to emit first * @param scheduler the scheduler to emit the prepended values on * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() * @see MSDN: Observable.StartWith */ public Observable startWith(T[] values, Scheduler scheduler) { return startWith(Arrays.asList(values), scheduler); } /** * Emit a specified item before beginning to emit items from the source * Observable. *

* * * @param t1 item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1) { return concat(Observable. from(t1), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2) { return concat(Observable. from(t1, t2), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @param t3 third item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2, T t3) { return concat(Observable. from(t1, t2, t3), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @param t3 third item to emit * @param t4 fourth item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2, T t3, T t4) { return concat(Observable. from(t1, t2, t3, t4), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @param t3 third item to emit * @param t4 fourth item to emit * @param t5 fifth item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2, T t3, T t4, T t5) { return concat(Observable. from(t1, t2, t3, t4, t5), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @param t3 third item to emit * @param t4 fourth item to emit * @param t5 fifth item to emit * @param t6 sixth item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6) { return concat(Observable. from(t1, t2, t3, t4, t5, t6), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

* * * @param t1 first item to emit * @param t2 second item to emit * @param t3 third item to emit * @param t4 fourth item to emit * @param t5 fifth item to emit * @param t6 sixth item to emit * @param t7 seventh item to emit * @return an Observable that exhibits the modified behavior * @see RxJava Wiki: startWith() */ public Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) { return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7), this); } /** * Emit a specified set of items before beginning to emit items from the * source Observable. *

*
 *
 * @param t1 first item to emit
 * @param t2 second item to emit
 * @param t3 third item to emit
 * @param t4 fourth item to emit
 * @param t5 fifth item to emit
 * @param t6 sixth item to emit
 * @param t7 seventh item to emit
 * @param t8 eighth item to emit
 * @return an Observable that exhibits the modified behavior
 * @see RxJava Wiki: startWith()
 */
public Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
    // Prepend the given items by concatenating them ahead of this Observable.
    return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7, t8), this);
}

/**
 * Emit a specified set of items before beginning to emit items from the
 * source Observable.
 *

*
 *
 * @param t1 first item to emit
 * @param t2 second item to emit
 * @param t3 third item to emit
 * @param t4 fourth item to emit
 * @param t5 fifth item to emit
 * @param t6 sixth item to emit
 * @param t7 seventh item to emit
 * @param t8 eighth item to emit
 * @param t9 ninth item to emit
 * @return an Observable that exhibits the modified behavior
 * @see RxJava Wiki: startWith()
 */
public Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {
    // Prepend the given items by concatenating them ahead of this Observable.
    return concat(Observable. from(t1, t2, t3, t4, t5, t6, t7, t8, t9), this);
}

/**
 * Groups the items emitted by an Observable according to a specified
 * criterion, and emits these grouped items as {@link GroupedObservable}s,
 * one GroupedObservable per group.
 *

*
 *
 * @param keySelector a function that extracts the key from an item
 * @param elementSelector a function to map a source item to an item in a
 *                        {@link GroupedObservable}
 * @param the key type
 * @param the type of items emitted by the resulting
 *        {@link GroupedObservable}s
 * @return an Observable that emits {@link GroupedObservable}s, each of
 *         which corresponds to a unique key value and emits items
 *         representing items from the source Observable that share that key
 *         value
 * @see RxJava Wiki: groupBy
 */
public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) {
    // Grouping is implemented by OperationGroupBy; create(...) exposes it as the returned Observable.
    return create(OperationGroupBy.groupBy(this, keySelector, elementSelector));
}

/**
 * Groups the items emitted by an Observable according to a specified
 * criterion, and emits these grouped items as {@link GroupedObservable}s,
 * one GroupedObservable per group.
 *

*
 *
 * @param keySelector a function that extracts the key for each item
 * @param the key type
 * @return an Observable that emits {@link GroupedObservable}s, each of
 *         which corresponds to a unique key value and emits items
 *         representing items from the source Observable that share that key
 *         value
 * @see RxJava Wiki: groupBy
 */
public Observable> groupBy(final Func1 keySelector) {
    // Key-only variant: no element selector is handed to OperationGroupBy.
    return create(OperationGroupBy.groupBy(this, keySelector));
}

/**
 * Return an Observable that correlates two sequences when they overlap and
 * groups the results.
 *

*
 *
 * @param right the other Observable to correlate items from this Observable
 *              with
 * @param leftDuration function that returns an Observable whose emissions
 *                     indicate the duration of the values of this
 *                     Observable
 * @param rightDuration function that returns an Observable whose emissions
 *                      indicate the duration of the values of the
 *                      right Observable
 * @param resultSelector function that takes an item emitted by each source
 *                       Observable and returns the value to be emitted by
 *                       the resulting Observable
 * @return an Observable that emits grouped items based on overlapping
 *         durations from this and another Observable
 * @see RxJava Wiki: groupJoin
 * @see MSDN: Observable.GroupJoin
 */
public Observable groupJoin(Observable right, Func1> leftDuration,
        Func1> rightDuration,
        Func2, ? extends R> resultSelector) {
    // This Observable is passed as the first (left) source, ahead of right.
    return create(new OperationGroupJoin(this, right, leftDuration, rightDuration, resultSelector));
}

/**
 * Returns an Observable that emits true if the source
 * Observable is empty, otherwise false.
 *

* In Rx.Net this is negated as the any operator but we renamed * this in RxJava to better match Java naming idioms. *

*
 *
 * @return an Observable that emits a Boolean
 * @see RxJava Wiki: isEmpty()
 * @see MSDN: Observable.Any
 */
public Observable isEmpty() {
    // The emptiness check lives in OperationAny.isEmpty.
    return create(OperationAny.isEmpty(this));
}

/**
 * Returns an Observable that emits the last item emitted by the source or
 * notifies observers of an IllegalArgumentException if the
 * source Observable is empty.
 *

*
 *
 * @return an Observable that emits the last item from the source Observable
 *         or notifies observers of an error
 * @see RxJava Wiki: last()
 * @see MSDN: Observable.lastAsync()
 */
public Observable last() {
    // Narrow the sequence to its final item with takeLast(1), then apply single() to the result.
    return takeLast(1).single();
}

/**
 * Returns an Observable that emits only the last item emitted by the source
 * Observable that satisfies a given condition, or an
 * IllegalArgumentException if no such items are emitted.
 *

*
 *
 * @param predicate the condition any source emitted item has to satisfy
 * @return an Observable that emits only the last item satisfying the given
 *         condition from the source, or an IllegalArgumentException if no
 *         such items are emitted
 * @throws IllegalArgumentException if no such items are emitted
 * @see RxJava Wiki: last()
 * @see MSDN: Observable.lastAsync()
 */
public Observable last(Func1 predicate) {
    // Same pipeline as last(), applied to the items that pass the predicate.
    return filter(predicate).takeLast(1).single();
}

/**
 * Returns an Observable that emits only the last item emitted by the source
 * Observable, or a default item if the source is empty.
 *

*
 *
 * @param defaultValue the default item to emit if the source Observable is
 *                     empty
 * @return an Observable that emits only the last item from the source, or a
 *         default item if the source is empty
 * @see RxJava Wiki: lastOrDefault()
 * @see MSDN: Observable.lastOrDefaultAsync()
 */
public Observable lastOrDefault(T defaultValue) {
    // Narrow to the final item with takeLast(1); singleOrDefault receives the fallback value.
    return takeLast(1).singleOrDefault(defaultValue);
}

/**
 * Returns an Observable that emits only the last item emitted by the source
 * Observable that satisfies a given condition, or a default item otherwise.
 *

*
 *
 * @param defaultValue the default item to emit if the source Observable
 *                     doesn't emit anything that satisfies the given
 *                     condition
 * @param predicate the condition any source emitted item has to satisfy
 * @return an Observable that emits only the last item from the source that
 *         satisfies the given condition, or a default item otherwise
 * @see RxJava Wiki: lastOrDefault()
 * @see MSDN: Observable.lastOrDefaultAsync()
 */
public Observable lastOrDefault(T defaultValue, Func1 predicate) {
    // Same pipeline as lastOrDefault(T), applied to the items that pass the predicate.
    return filter(predicate).takeLast(1).singleOrDefault(defaultValue);
}

/**
 * Returns an Observable that counts the total number of items emitted by
 * the source Observable and emits this count as a 64-bit long.
 *

* * * @return an Observable that emits the number of items emitted by the * source Observable as its single, 64-bit long item * @see RxJava Wiki: count() * @see MSDN: Observable.LongCount * @see #count() */ public Observable longCount() { return reduce(0L, new Func2() { @Override public Long call(Long t1, T t2) { return t1 + 1; } }); } /** * Converts an Observable into a {@link BlockingObservable} (an Observable * with blocking operators). * * @return a BlockingObservable version of this Observable * @see RxJava Wiki: Blocking Observable Operators */ public BlockingObservable toBlockingObservable() { return BlockingObservable.from(this); } /** * Converts the items emitted by an Observable to the specified type. *

*
 *
 * @param klass the target class type which the items will be converted to
 * @return an Observable that emits each item from the source Observable
 *         converted to the specified type
 * @see RxJava Wiki: cast()
 * @see MSDN: Observable.Cast
 */
public Observable cast(final Class klass) {
    // The per-item conversion is performed by OperationCast.
    return create(OperationCast.cast(this, klass));
}

/**
 * Filters the items emitted by an Observable based on the specified type.
 *

*
 *
 * @param klass the class type to filter the items emitted by the source
 *              Observable
 * @return an Observable that emits items from the source Observable of
 *         type klass.
 * @see RxJava Wiki: ofType()
 * @see MSDN: Observable.OfType
 */
public Observable ofType(final Class klass) {
    // First keep only the instances of klass, then cast the survivors to klass.
    return filter(new Func1() {
        @Override
        public Boolean call(T t) {
            // Class.isInstance returns false for null, so null items are filtered out as well.
            return klass.isInstance(t);
        }
    }).cast(klass);
}

/**
 * Ignores all items emitted by an Observable and only calls
 * onCompleted or onError.
 *

*
 *
 * @return an empty Observable that only calls onCompleted or
 *         onError
 * @see RxJava Wiki: ignoreElements()
 * @see MSDN: Observable.IgnoreElements
 */
public Observable ignoreElements() {
    // Filter with the alwaysFalse() predicate so that no item passes through.
    return filter(alwaysFalse());
}

/**
 * Applies a timeout policy for each item emitted by the Observable. If the
 * next item isn't observed within the specified timeout duration starting
 * from its predecessor, observers are notified of a TimeoutException.
 *

*
 *
 * @param timeout maximum duration between items before a timeout occurs
 * @param timeUnit the unit of time which applies to the
 *                 timeout argument.
 * @return the source Observable modified to notify observers of a
 *         TimeoutException in case of a timeout
 * @see RxJava Wiki: timeout()
 * @see MSDN: Observable.Timeout
 */
public Observable timeout(long timeout, TimeUnit timeUnit) {
    // No Scheduler and no fallback Observable are passed in this overload.
    return create(OperationTimeout.timeout(this, timeout, timeUnit));
}

/**
 * Applies a timeout policy for each item emitted by the Observable. If the
 * next item isn't observed within the specified timeout duration starting
 * from its predecessor, a specified fallback Observable produces future
 * items and notifications from that point on.
 *

*
 *
 * @param timeout maximum duration between items before a timeout occurs
 * @param timeUnit the unit of time which applies to the
 *                 timeout argument
 * @param other fallback Observable to use in case of a timeout
 * @return the source Observable modified to switch to the fallback
 *         Observable in case of a timeout
 * @see RxJava Wiki: timeout()
 * @see MSDN: Observable.Timeout
 */
public Observable timeout(long timeout, TimeUnit timeUnit, Observable other) {
    // The fallback Observable is forwarded to OperationTimeout; no Scheduler is passed here.
    return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
}

/**
 * Applies a timeout policy for each item emitted by the Observable, using
 * the specified scheduler to run timeout timers. If the next item isn't
 * observed within the specified timeout duration starting from its
 * predecessor, the observer is notified of a TimeoutException.
 *

*
 *
 * @param timeout maximum duration between items before a timeout occurs
 * @param timeUnit the unit of time which applies to the
 *                 timeout argument
 * @param scheduler Scheduler to run the timeout timers on
 * @return the source Observable modified to notify observers of a
 *         TimeoutException in case of a timeout
 * @see RxJava Wiki: timeout()
 * @see MSDN: Observable.Timeout
 */
public Observable timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
    // The caller's Scheduler is forwarded to OperationTimeout; no fallback Observable is passed here.
    return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
}

/**
 * Applies a timeout policy for each item emitted by the Observable, using
 * the specified scheduler to run timeout timers. If the next item isn't
 * observed within the specified timeout duration starting from its
 * predecessor, a specified fallback Observable sequence produces future
 * items and notifications from that point on.
 *

*
 *
 * @param timeout maximum duration between items before a timeout occurs
 * @param timeUnit the unit of time which applies to the
 *                 timeout argument
 * @param other Observable to use as the fallback in case of a timeout
 * @param scheduler Scheduler to run the timeout timers on
 * @return the source Observable modified so that it will switch to the
 *         fallback Observable in case of a timeout
 * @see RxJava Wiki: timeout()
 * @see MSDN: Observable.Timeout
 */
public Observable timeout(long timeout, TimeUnit timeUnit, Observable other, Scheduler scheduler) {
    // Both the fallback Observable and the Scheduler are forwarded to OperationTimeout.
    return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
}

/**
 * Records the time interval between consecutive items emitted by an
 * Observable.
 *

*
 *
 * @return an Observable that emits time interval information items
 * @see RxJava Wiki: timeInterval()
 * @see MSDN: Observable.TimeInterval
 */
public Observable> timeInterval() {
    // No Scheduler is passed in this overload.
    return create(OperationTimeInterval.timeInterval(this));
}

/**
 * Records the time interval between consecutive items emitted by an
 * Observable, using the specified Scheduler to compute time intervals.
 *

*
 *
 * @param scheduler Scheduler used to compute time intervals
 * @return an Observable that emits time interval information items
 * @see RxJava Wiki: timeInterval()
 * @see MSDN: Observable.TimeInterval
 */
public Observable> timeInterval(Scheduler scheduler) {
    // The caller's Scheduler is forwarded to OperationTimeInterval.
    return create(OperationTimeInterval.timeInterval(this, scheduler));
}

/**
 * Constructs an Observable that creates a dependent resource object.
 *

*
 *
 * @param resourceFactory the factory function to obtain a resource object
 *                        that depends on the Observable
 * @param observableFactory the factory function to obtain an Observable
 * @return the Observable whose lifetime controls the lifetime of the
 *         dependent resource object
 * @see RxJava Wiki: using()
 * @see MSDN: Observable.Using
 */
public static Observable using(Func0 resourceFactory, Func1> observableFactory) {
    // Static factory: both factories are handed to OperationUsing unchanged.
    return create(OperationUsing.using(resourceFactory, observableFactory));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @param o5 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4, o5));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @param o5 an Observable competing to react first
 * @param o6 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5,
        Observable o6) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @param o5 an Observable competing to react first
 * @param o6 an Observable competing to react first
 * @param o7 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5,
        Observable o6, Observable o7) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @param o5 an Observable competing to react first
 * @param o6 an Observable competing to react first
 * @param o7 an Observable competing to react first
 * @param o8 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5,
        Observable o6, Observable o7, Observable o8) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param o1 an Observable competing to react first
 * @param o2 an Observable competing to react first
 * @param o3 an Observable competing to react first
 * @param o4 an Observable competing to react first
 * @param o5 an Observable competing to react first
 * @param o6 an Observable competing to react first
 * @param o7 an Observable competing to react first
 * @param o8 an Observable competing to react first
 * @param o9 an Observable competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5,
        Observable o6, Observable o7, Observable o8, Observable o9) {
    // The selection among the sources is delegated to OperationAmb.
    return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9));
}

/**
 * Given multiple Observables, return the one that first emits an item.
 *

*
 *
 * @param sources Observable sources competing to react first
 * @return an Observable that reflects whichever of the given Observables
 *         reacted first
 * @see RxJava Wiki: amb()
 * @see MSDN: Observable.Amb
 */
public static Observable amb(Iterable> sources) {
    // Iterable variant: the whole collection is handed to OperationAmb as-is.
    return create(OperationAmb.amb(sources));
}

/**
 * Invokes an action for each item emitted by the Observable.
 *

*
 *
 * @param observer the action to invoke for each item emitted by the source
 *                 Observable
 * @return the source Observable with the side-effecting behavior applied
 * @see RxJava Wiki: doOnEach()
 * @see MSDN: Observable.Do
 */
public Observable doOnEach(Observer observer) {
    // The supplied Observer is passed straight to OperationDoOnEach together with this source.
    return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
 * Invokes an action if the source Observable calls onError.
 *

*
 *
 * @param onError the action to run, with the Throwable as its argument,
 *                when the source Observable calls onError
 * @return the source Observable with the side-effecting behavior applied
 * @see RxJava Wiki: doOnError()
 * @see MSDN: Observable.Do
 */
public Observable doOnError(final Action1 onError) {
    // Only the error callback does any work; items and completion are ignored by this Observer.
    Observer errorTap = new Observer() {
        @Override
        public void onNext(T ignoredItem) {
        }

        @Override
        public void onError(Throwable failure) {
            onError.call(failure);
        }

        @Override
        public void onCompleted() {
        }
    };
    return create(OperationDoOnEach.doOnEach(this, errorTap));
}

/**
 * Invokes an action when the source Observable calls
 * onCompleted.
 *

*
 *
 * @param onCompleted the action to run when the source Observable calls
 *                    onCompleted
 * @return the source Observable with the side-effecting behavior applied
 * @see RxJava Wiki: doOnCompleted()
 * @see MSDN: Observable.Do
 */
public Observable doOnCompleted(final Action0 onCompleted) {
    // Only the completion callback does any work; items and errors are ignored by this Observer.
    Observer completionTap = new Observer() {
        @Override
        public void onNext(T ignoredItem) {
        }

        @Override
        public void onError(Throwable ignoredFailure) {
        }

        @Override
        public void onCompleted() {
            onCompleted.call();
        }
    };
    return create(OperationDoOnEach.doOnEach(this, completionTap));
}

/**
 * Invokes an action when the source Observable calls
 * onNext.
 *

*
 *
 * @param onNext the action to run, with the emitted item as its argument,
 *               when the source Observable calls onNext
 * @return the source Observable with the side-effecting behavior applied
 * @see RxJava Wiki: doOnNext()
 * @see MSDN: Observable.Do
 */
public Observable doOnNext(final Action1 onNext) {
    // Only the item callback does any work; errors and completion are ignored by this Observer.
    Observer itemTap = new Observer() {
        @Override
        public void onNext(T item) {
            onNext.call(item);
        }

        @Override
        public void onError(Throwable ignoredFailure) {
        }

        @Override
        public void onCompleted() {
        }
    };
    return create(OperationDoOnEach.doOnEach(this, itemTap));
}

/**
 * Invokes an action for each item emitted by the Observable.
 *

*
 *
 * @param onNotification the action to invoke with a {@link Notification}
 *                       for every onNext, onError and onCompleted call
 *                       made by the source Observable
 * @return the source Observable with the side-effecting behavior applied
 * @see RxJava Wiki: doOnEach()
 * @see MSDN: Observable.Do
 */
public Observable doOnEach(final Action1> onNotification) {
    // Each callback wraps its event in a Notification and hands it to the action.
    Observer notificationTap = new Observer() {
        @Override
        public void onNext(T item) {
            onNotification.call(new Notification(item));
        }

        @Override
        public void onError(Throwable failure) {
            onNotification.call(new Notification(failure));
        }

        @Override
        public void onCompleted() {
            onNotification.call(new Notification());
        }
    };
    return create(OperationDoOnEach.doOnEach(this, notificationTap));
}

/**
 * Whether a given {@link Function} is an internal implementation inside
 * rx.* packages or not.
 *

* For why this is being used see * https://github.com/Netflix/RxJava/issues/216 for discussion on * "Guideline 6.4: Protect calls to user code from within an operator" *

* Note: If strong reasons for not depending on package names comes up then * the implementation of this method can change to looking for a marker * interface. * * @param o * @return {@code true} if the given function is an internal implementation, * and {@code false} otherwise. */ private boolean isInternalImplementation(Object o) { if (o == null) { return true; } // prevent double-wrapping (yeah it happens) if (o instanceof SafeObserver) { return true; } Class clazz = o.getClass(); if (internalClassMap.containsKey(clazz)) { //don't need to do reflection return internalClassMap.get(clazz); } else { // we treat the following package as "internal" and don't wrap it Package p = o.getClass().getPackage(); // it can be null Boolean isInternal = (p != null && p.getName().startsWith("rx.operators")); internalClassMap.put(clazz, isInternal); return isInternal; } } /** * Creates a pattern that matches when both Observables emit an item. *

*
 *
 * @param right Observable to match with the source Observable
 * @return Pattern object that matches when both Observables emit an item
 * @throws NullPointerException if right is null
 * @see RxJava Wiki: and()
 * @see MSDN: Observable.And
 */
public Pattern2 and(Observable right) {
    // Returns the Pattern2 from OperationJoinPatterns directly; no create(...) wrapping here.
    return OperationJoinPatterns.and(this, right);
}

/**
 * Matches when the Observable has an available item and projects the item
 * by invoking the selector function.
 *

*
 *
 * @param selector selector that will be invoked for items emitted by the
 *                 source Observable
 * @return a Plan that produces the projected results, to be fed (with other
 *         Plans) to the {@link #when} operator
 * @throws NullPointerException if selector is null
 * @see RxJava Wiki: then()
 * @see MSDN: Observable.Then
 */
public Plan0 then(Func1 selector) {
    // Returns the Plan0 from OperationJoinPatterns directly; no create(...) wrapping here.
    return OperationJoinPatterns.then(this, selector);
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param plans a series of plans created by use of the {@link #then}
 *              operator on patterns
 * @return an Observable that emits the results from matching several
 *         patterns
 * @throws NullPointerException if plans is null
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
public static Observable when(Plan0... plans) {
    // NOTE(review): no null check here, unlike the Iterable overload; presumably
    // OperationJoinPatterns.when rejects a null array - confirm.
    return create(OperationJoinPatterns.when(plans));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param plans a series of plans created by use of the {@link #then}
 *              operator on patterns
 * @return an Observable that emits the results from matching several
 *         patterns
 * @throws NullPointerException if plans is null
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
public static Observable when(Iterable> plans) {
    // Fail fast, using the parameter name as the exception message.
    if (plans == null) {
        throw new NullPointerException("plans");
    }
    return create(OperationJoinPatterns.when(plans));
}

/**
 * Joins the results from a pattern.
 *

*
 *
 * @param p1 the plan to join
 * @return an Observable that emits the results from matching a pattern
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @param p5 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @param p5 a plan
 * @param p6 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @param p5 a plan
 * @param p6 a plan
 * @param p7 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @param p5 a plan
 * @param p6 a plan
 * @param p7 a plan
 * @param p8 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7,
        Plan0 p8) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8));
}

/**
 * Joins together the results from several patterns.
 *

*
 *
 * @param p1 a plan
 * @param p2 a plan
 * @param p3 a plan
 * @param p4 a plan
 * @param p5 a plan
 * @param p6 a plan
 * @param p7 a plan
 * @param p8 a plan
 * @param p9 a plan
 * @return an Observable that emits the results from matching several
 *         patterns
 * @see RxJava Wiki: when()
 * @see MSDN: Observable.When
 */
@SuppressWarnings("unchecked")
public static Observable when(Plan0 p1, Plan0 p2, Plan0 p3, Plan0 p4, Plan0 p5, Plan0 p6, Plan0 p7,
        Plan0 p8, Plan0 p9) {
    // Presumably binds to a varargs OperationJoinPatterns.when, hence the unchecked suppression (TODO confirm).
    return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
}

/**
 * Correlates the items emitted by two Observables based on overlapping
 * durations.
 *

*
 *
 * @param right the second Observable to join items from
 * @param leftDurationSelector a function to select the duration of each
 *                             item emitted by this Observable, used to
 *                             determine overlap
 * @param rightDurationSelector a function to select the duration of each
 *                              item emitted by the right
 *                              Observable, used to determine overlap
 * @param resultSelector a function that computes a result item for any two
 *                       overlapping items emitted by the two Observables
 * @return an Observable that emits result items computed from source items
 *         that have an overlapping duration
 * @see RxJava Wiki: join()
 * @see MSDN: Observable.Join
 */
public Observable join(Observable right, Func1> leftDurationSelector,
        Func1> rightDurationSelector,
        Func2 resultSelector) {
    // This Observable is passed as the first (left) source, ahead of right.
    return create(new OperationJoin(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
}

/**
 * Return an Observable that emits a single HashMap containing all items
 * emitted by the source Observable, mapped by the keys returned by the
 * {@code keySelector} function.
 *

* *

* If more than one source item maps to the same key, the HashMap will
 * contain the latest of those items.
 *
 * @param keySelector the function that extracts the key from the source
 *                    items to be used as keys in the HashMap
 * @return an Observable that emits a single HashMap containing the mapped
 *         items from the source Observable
 * @see RxJava Wiki: toMap()
 * @see MSDN: Observable.ToDictionary
 */
public Observable> toMap(Func1 keySelector) {
    // Key-only variant: no value selector or map factory is handed to OperationToMap.
    return create(OperationToMap.toMap(this, keySelector));
}

/**
 * Return an Observable that emits a single HashMap containing elements with
 * key and value extracted from the items emitted by the source Observable.
 *

* *

* If more than one source item maps to the same key, the HashMap will
 * contain the latest of those items.
 *
 * @param keySelector the function that extracts the key from the source
 *                    items to be used as key in the HashMap
 * @param valueSelector the function that extracts the value from the source
 *                      items to be used as value in the HashMap
 * @return an Observable that emits a single HashMap containing the mapped
 *         items from the source Observable
 * @see RxJava Wiki: toMap()
 * @see MSDN: Observable.ToDictionary
 */
public Observable> toMap(Func1 keySelector, Func1 valueSelector) {
    // Key and value selectors are forwarded; no map factory is passed in this overload.
    return create(OperationToMap.toMap(this, keySelector, valueSelector));
}

/**
 * Return an Observable that emits a single Map, returned by the
 * mapFactory function, containing key and value extracted from
 * the items emitted by the source Observable.
 *

*
 *
 * @param keySelector the function that extracts the key from the source
 *                    items to be used as key in the Map
 * @param valueSelector the function that extracts the value from the source
 *                      items to be used as value in the Map
 * @param mapFactory the function that returns a Map instance to be used
 * @return an Observable that emits a single Map containing the mapped
 *         items emitted by the source Observable
 * @see RxJava Wiki: toMap()
 */
public Observable> toMap(Func1 keySelector, Func1 valueSelector, Func0> mapFactory) {
    // The caller-supplied map factory is forwarded to OperationToMap along with both selectors.
    return create(OperationToMap.toMap(this, keySelector, valueSelector, mapFactory));
}

/**
 * Return an Observable that emits a single HashMap containing an ArrayList
 * of items, emitted by the source Observable and keyed by the
 * keySelector function.
 *

* * * @param keySelector the function that extracts the key from the source * items to be used as key in the HashMap * @return an Observable that emits a single HashMap containing an ArrayList * of items mapped from the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToLookup */ public Observable>> toMultimap(Func1 keySelector) { return create(OperationToMultimap.toMultimap(this, keySelector)); } /** * Return an Observable that emits a single HashMap containing an ArrayList * of values, extracted by the valueSelector function, emitted * by the source Observable and keyed by the keySelector * function. *

* * * @param keySelector the function that extracts the key from the source * items to be used as key in the HashMap * @param valueSelector the function that extracts the value from the source * items to be used as value in the Map * @return an Observable that emits a single HashMap containing an ArrayList * of items mapped from the source Observable * @see RxJava Wiki: toMap() * @see MSDN: Observable.ToLookup */ public Observable>> toMultimap(Func1 keySelector, Func1 valueSelector) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector)); } /** * Return an Observable that emits a single Map, returned by the * mapFactory function, containing an ArrayList of values, * extracted by the valueSelector function, emitted by the * source Observable and keyed by the keySelector function. *

* * * @param keySelector the function that extracts the key from the source * items to be used as key in the Map * @param valueSelector the function that extracts the value from the source * items to be used as value in the Map * @param mapFactory the function that returns an Map instance to be used * @return an Observable that emits a single Map containing the list of * mapped items from the source Observable. * @see RxJava Wiki: toMap() */ public Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory)); } /** * Return an Observable that emits a single Map, returned by the * mapFactory function, that contains a custom collection of * values, extracted by the valueSelector function, emitted by * the source Observable and keyed by the keySelector function. *

* * * @param keySelector the function that extracts the key from the source * items to be used as key in the Map * @param valueSelector the function that extracts the value from the source * items to be used as value in the Map * @param mapFactory the function that returns an Map instance to be used * @param collectionFactory the function that returns a Collection instance * for a particular key to be used in the Map * @return an Observable that emits a single Map containing the collection * of mapped items from the source Observable. * @see RxJava Wiki: toMap() */ public Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory, Func1> collectionFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory, collectionFactory)); } /** * Return an Observable that skips items from the source Observable until * the secondary Observable emits an item. *

*
     * @param <U> the item type of the {@code other} Observable
     * @param other the other Observable that has to emit an item before this
     *            Observable's elements are relayed
     * @return an Observable that skips items from the source Observable
     *         until the secondary Observable emits an item.
     * @see "RxJava Wiki: skipUntil()"
     * @see "MSDN: Observable.SkipUntil"
     */
    public <U> Observable<T> skipUntil(Observable<U> other) {
        return create(new OperationSkipUntil<T, U>(this, other));
    }

    /**
     * Groups the items emitted by an Observable according to a specified key
     * selector function until the duration Observable expires for the key.
     *

* * * @param keySelector a function to extract the key for each item * @param durationSelector a function to signal the expiration of a group * @return an Observable that emits grouped Observables, each of which * corresponds to a key value and emits all items that share that * same key value that were emitted during the key's duration * @see RxJava Wiki: groupByUntil() * @see MSDN: Observable.GroupByUntil */ public Observable> groupByUntil(Func1 keySelector, Func1, ? extends Observable> durationSelector) { return groupByUntil(keySelector, Functions.identity(), durationSelector); } /** * Groups the items emitted by an Observable according to specified key and * value selector functions until the duration Observable expires for the * key. *

* * * @param keySelector a function to extract the key for each item * @param valueSelector a function to map each source item to an item * emitted by an Observable group * @param durationSelector a function to signal the expiration of a group * @return an Observable that emits grouped Observables, each of which * corresponds to a key value and emits all items that share that * same key value that were emitted during the key's duration * @see RxJava Wiki: groupByUntil() * @see MSDN: Observable.GroupByUntil */ public Observable> groupByUntil(Func1 keySelector, Func1 valueSelector, Func1, ? extends Observable> durationSelector) { return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } }