/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.operators.OperationAll;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationGroupBy;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMulticast;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationWhere;
import rx.operators.OperationZip;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.BufferClosing;
import rx.util.BufferOpening;
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.Timestamped;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.FunctionLanguageAdaptor;
import rx.util.functions.Functions;
/**
* The Observable class that implements the Reactive Pattern.
*
* This class provides overloaded methods for subscribing as well as delegate methods to the
* various operators.
*
* The documentation for this interface makes use of marble diagrams. The following legend explains
* these diagrams:
*
*
*
* For more information see the RxJava
* Wiki
*
* @param
*/
public class Observable {
//TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable")
// Execution hook looked up from RxJavaPlugins when this class is initialized; subscribe(Observer)
// calls it at the start of a subscription, on return, and on error.
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
// Function handed to the hook by subscribe(Observer) on every subscription; it is null when the
// no-argument constructor was used.
private final Func1, Subscription> onSubscribe;
/**
* Observable with Function to execute when subscribed to.
*
* NOTE: Use {@link #create(Func1)} to create an Observable instead of this method unless you
* specifically have a need for inheritance.
*
* @param onSubscribe
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
*/
protected Observable(Func1, Subscription> onSubscribe) {
// Stored as-is with no null check here; subscribe(Observer) throws IllegalStateException
// if the function it gets back from the execution hook is null.
this.onSubscribe = onSubscribe;
}
/**
* Creates an Observable without an onSubscribe function by delegating to the other
* constructor with {@code null}.
*
* Calling {@link #subscribe(Observer)} on such an instance throws
* {@link IllegalStateException} when the function returned by the execution hook is
* {@code null}.
*/
protected Observable() {
this(null);
//TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today
}
/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
* receive items and notifications from the Observable.
*
* A typical implementation of {@code subscribe} does the following:
*
* It stores a reference to the Observer in a collection object, such as a
* {@code List} object.
*
* It returns a reference to the {@link Subscription} interface. This enables Observers to
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
* sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted}
* method.
*
* An Observable<T> instance is responsible for accepting all subscriptions
* and notifying all Observers. Unless the documentation for a particular
* Observable<T> implementation indicates otherwise, Observers should make no
* assumptions about the order in which multiple Observers will receive their notifications.
*
* For more information see the
* RxJava Wiki
*
* @param observer the observer
* @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items
* before the Observable has finished sending them
* @throws IllegalArgumentException
* if the {@link Observer} provided as the argument to {@code subscribe()} is
* {@code null}
*/
public Subscription subscribe(Observer observer) {
// allow the hook to intercept and/or decorate
Func1, Subscription> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
// validate and proceed
// NOTE: the hook above has already been invoked by the time these checks run
if (observer == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribeFunction == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
}
try {
/**
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
// NOTE(review): isInternalImplementation is defined outside this view; presumably it identifies
// trusted Observers that do not need the SafeObserver wrapping used in the else branch - confirm
if (isInternalImplementation(observer)) {
// trusted path: the observer is passed to the onSubscribe function unwrapped
Subscription s = onSubscribeFunction.call(observer);
if (s == null) {
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
// we want to gracefully handle it the same as AtomicObservableSubscription does
return hook.onSubscribeReturn(this, Subscriptions.empty());
} else {
return hook.onSubscribeReturn(this, s);
}
} else {
// the SafeObserver wrapping the observer and the Subscription returned to the caller
// share the same SafeObservableSubscription instance
SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new SafeObserver(subscription, observer)));
return hook.onSubscribeReturn(this, subscription);
}
} catch (OnErrorNotImplementedException e) {
// special handling when onError is not implemented ... we just rethrow
throw e;
} catch (Throwable e) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
// the hook sees the error first; whatever it returns is what the observer receives
observer.onError(hook.onSubscribeError(this, e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
// only the message of the original error e is kept; the cause of the new exception is e2
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(this, r);
throw r;
}
// the error was delivered through onError; this return value does not pass through hook.onSubscribeReturn
return Subscriptions.empty();
}
}
/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
* receive items and notifications from the Observable.
*
* A typical implementation of {@code subscribe} does the following:
*
* It stores a reference to the Observer in a collection object, such as a
* {@code List} object.
*
* It returns a reference to the {@link Subscription} interface. This enables Observers to
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
* sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted}
* method.
*
* An {@code Observable} instance is responsible for accepting all subscriptions
* and notifying all Observers. Unless the documentation for a particular
* {@code Observable} implementation indicates otherwise, Observers should make no
* assumptions about the order in which multiple Observers will receive their notifications.
*
* For more information see the
* RxJava Wiki
*
* @param observer the observer
* @param scheduler
* the {@link Scheduler} on which Observers subscribe to the Observable
* @return a {@link Subscription} reference with which Observers can stop receiving items and
* notifications before the Observable has finished sending them
* @throws IllegalArgumentException
* if an argument to {@code subscribe()} is {@code null}
*/
public Subscription subscribe(Observer observer, Scheduler scheduler) {
// Subscribes the observer to the Observable returned by subscribeOn(scheduler) rather than to
// this instance directly; argument validation happens in the single-argument subscribe.
return subscribeOn(scheduler).subscribe(observer);
}
/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer o) {
// One SafeObservableSubscription is shared: it is given to the SafeObserver that wraps o and it
// also wraps the Subscription returned by subscribe(...). The result of wrap(...) is returned.
SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(subscribe(new SafeObserver(subscription, o)));
}
/**
 * Subscribes using callbacks that are looked up by name in the given map.
 *
 * The map must hold a non-null value under {@code "onNext"}; it is converted once with
 * {@code Functions.from} and invoked for every item. The {@code "onError"} and
 * {@code "onCompleted"} entries are optional and are looked up only when the corresponding
 * notification arrives. When an error arrives and no {@code "onError"} entry is present, the
 * created Observer throws {@link OnErrorNotImplementedException} from its {@code onError}.
 *
 * @param callbacks
 *            map from callback name to function object
 * @return the result of {@code protectivelyWrapAndSubscribe} for the created Observer
 * @throws IllegalArgumentException
 *             if {@code callbacks} is {@code null} or holds no non-null {@code "onNext"} value
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map callbacks) {
    // IllegalArgumentException (a RuntimeException subclass) is used so that invalid arguments are
    // reported the same way as in the other subscribe overloads
    if (callbacks == null) {
        throw new IllegalArgumentException("callbacks map can not be null");
    }
    Object _onNext = callbacks.get("onNext");
    if (_onNext == null) {
        throw new IllegalArgumentException("'onNext' key must contain an implementation");
    }
    // lookup and memoize onNext
    final FuncN onNext = Functions.from(_onNext);
    /**
     * Wrapping since raw functions provided by the user are being invoked.
     *
     * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
     */
    return protectivelyWrapAndSubscribe(new Observer() {
        @Override
        public void onCompleted() {
            // optional callback: looked up only when completion actually happens
            Object onComplete = callbacks.get("onCompleted");
            if (onComplete != null) {
                Functions.from(onComplete).call();
            }
        }
        @Override
        public void onError(Throwable e) {
            // the plugin error handler is notified before the user callback runs
            handleError(e);
            Object onError = callbacks.get("onError");
            if (onError != null) {
                Functions.from(onError).call(e);
            } else {
                throw new OnErrorNotImplementedException(e);
            }
        }
        @Override
        public void onNext(Object args) {
            onNext.call(args);
        }
    });
}
/**
* Subscribes with the given callbacks map to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param callbacks
* map of callbacks, passed unchanged to the single-argument map overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Map callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}
/**
 * Subscribes with a single function object that is invoked for every emitted item.
 *
 * An argument that is actually an {@link Observer} is forwarded to the Observer overload.
 * Otherwise the argument is converted with {@code Functions.from}. The Observer created here
 * does nothing on completion, and on error it notifies the plugin error handler and then throws
 * {@link OnErrorNotImplementedException}.
 *
 * @param o
 *            an Observer, or a function object to call with each item
 * @return the resulting {@link Subscription}
 * @throws IllegalArgumentException
 *             if {@code o} is {@code null}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object o) {
    // null is never an instance of Observer, so rejecting it up front changes nothing
    if (o == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (o instanceof Observer) {
        // a dynamic language may not resolve the overloads correctly and hand us an Observer here;
        // forward it to the Observer overload
        return subscribe((Observer) o);
    }
    // resolve the function once instead of on every item
    final FuncN itemHandler = Functions.from(o);
    // Wrapped protectively because a raw user-supplied function is being invoked.
    // See https://github.com/Netflix/RxJava/issues/216 for discussion on
    // "Guideline 6.4: Protect calls to user code from within an operator"
    Observer functionObserver = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to do on completion
        }
        @Override
        public void onError(Throwable error) {
            handleError(error);
            throw new OnErrorNotImplementedException(error);
        }
        @Override
        public void onNext(Object item) {
            itemHandler.call(item);
        }
    };
    return protectivelyWrapAndSubscribe(functionObserver);
}
/**
* Subscribes with the given function object (or Observer) to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param o
* passed unchanged to the single-argument {@code subscribe(Object)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Object o, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(o);
}
/**
* Subscribes with an {@link Action1} that is called with each item passed to {@code onNext}.
*
* The Observer created here does nothing in {@code onCompleted}; its {@code onError} passes
* the error to {@code handleError} and then throws {@link OnErrorNotImplementedException}.
*
* @param onNext
* action called with each item
* @return the result of {@code protectivelyWrapAndSubscribe} for the created Observer
* @throws IllegalArgumentException
* if {@code onNext} is {@code null}
*/
public Subscription subscribe(final Action1 onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Throwable e) {
// no error callback was supplied: report to the plugin error handler, then fail loudly
handleError(e);
throw new OnErrorNotImplementedException(e);
}
@Override
public void onNext(T args) {
onNext.call(args);
}
});
}
/**
* Subscribes with the given {@code onNext} action to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param onNext
* passed unchanged to the single-argument {@code subscribe(Action1)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Action1 onNext, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext);
}
/**
 * Subscribes with function objects for the item and error notifications.
 *
 * The {@code onNext} function is converted with {@code Functions.from} up front. The
 * {@code onError} function is converted only when an error actually arrives, after the plugin
 * error handler has been notified. Completion is ignored.
 *
 * @param onNext
 *            function object called with each item
 * @param onError
 *            function object called with the error
 * @return the resulting {@link Subscription}
 * @throws IllegalArgumentException
 *             if {@code onNext} or {@code onError} is {@code null}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    // resolve the item function once instead of on every item
    final FuncN itemHandler = Functions.from(onNext);
    // Wrapped protectively because raw user-supplied functions are being invoked.
    // See https://github.com/Netflix/RxJava/issues/216 for discussion on
    // "Guideline 6.4: Protect calls to user code from within an operator"
    Observer functionObserver = new Observer() {
        @Override
        public void onCompleted() {
            // nothing to do on completion
        }
        @Override
        public void onError(Throwable error) {
            handleError(error);
            // the error function is resolved lazily, only when an error is delivered
            FuncN errorHandler = Functions.from(onError);
            errorHandler.call(error);
        }
        @Override
        public void onNext(Object item) {
            itemHandler.call(item);
        }
    };
    return protectivelyWrapAndSubscribe(functionObserver);
}
/**
* Subscribes with the given item and error function objects to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param onNext
* passed unchanged to the two-argument {@code subscribe(Object, Object)} overload
* @param onError
* passed unchanged to the two-argument {@code subscribe(Object, Object)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}
/**
* Subscribes with actions for the item and error notifications.
*
* The Observer created here does nothing in {@code onCompleted}; its {@code onError} passes
* the error to {@code handleError} before calling the supplied {@code onError} action.
*
* @param onNext
* action called with each item
* @param onError
* action called with the error
* @return the result of {@code protectivelyWrapAndSubscribe} for the created Observer
* @throws IllegalArgumentException
* if {@code onNext} or {@code onError} is {@code null}
*/
public Subscription subscribe(final Action1 onNext, final Action1 onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
// do nothing
}
@Override
public void onError(Throwable e) {
// the plugin error handler is notified before the user action runs
handleError(e);
onError.call(e);
}
@Override
public void onNext(T args) {
onNext.call(args);
}
});
}
/**
* Subscribes with the given item and error actions to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param onNext
* passed unchanged to the two-argument {@code subscribe(Action1, Action1)} overload
* @param onError
* passed unchanged to the two-argument {@code subscribe(Action1, Action1)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}
/**
 * Subscribes with function objects for the item, error and completion notifications.
 *
 * The {@code onNext} function is converted with {@code Functions.from} up front. The
 * {@code onError} and {@code onComplete} functions are converted only when the corresponding
 * notification arrives. The plugin error handler is notified before the error function runs.
 *
 * @param onNext
 *            function object called with each item
 * @param onError
 *            function object called with the error
 * @param onComplete
 *            function object called without arguments on completion
 * @return the resulting {@link Subscription}
 * @throws IllegalArgumentException
 *             if any of the three arguments is {@code null}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onComplete == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }
    // resolve the item function once instead of on every item
    final FuncN itemHandler = Functions.from(onNext);
    // Wrapped protectively because raw user-supplied functions are being invoked.
    // See https://github.com/Netflix/RxJava/issues/216 for discussion on
    // "Guideline 6.4: Protect calls to user code from within an operator"
    Observer functionObserver = new Observer() {
        @Override
        public void onCompleted() {
            // the completion function is resolved lazily, only when completion is delivered
            FuncN completionHandler = Functions.from(onComplete);
            completionHandler.call();
        }
        @Override
        public void onError(Throwable error) {
            handleError(error);
            // the error function is resolved lazily, only when an error is delivered
            FuncN errorHandler = Functions.from(onError);
            errorHandler.call(error);
        }
        @Override
        public void onNext(Object item) {
            itemHandler.call(item);
        }
    };
    return protectivelyWrapAndSubscribe(functionObserver);
}
/**
* Subscribes with the given item, error and completion function objects to the Observable
* returned by {@code subscribeOn(scheduler)}.
*
* @param onNext
* passed unchanged to the three-argument {@code subscribe(Object, Object, Object)} overload
* @param onError
* passed unchanged to the three-argument {@code subscribe(Object, Object, Object)} overload
* @param onComplete
* passed unchanged to the three-argument {@code subscribe(Object, Object, Object)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}
/**
* Subscribes with actions for the item, error and completion notifications.
*
* The Observer created here forwards each notification to the matching action; its
* {@code onError} passes the error to {@code handleError} before calling the supplied
* {@code onError} action.
*
* @param onNext
* action called with each item
* @param onError
* action called with the error
* @param onComplete
* action called on completion
* @return the result of {@code protectivelyWrapAndSubscribe} for the created Observer
* @throws IllegalArgumentException
* if any of the three arguments is {@code null}
*/
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {
@Override
public void onCompleted() {
onComplete.call();
}
@Override
public void onError(Throwable e) {
// the plugin error handler is notified before the user action runs
handleError(e);
onError.call(e);
}
@Override
public void onNext(T args) {
onNext.call(args);
}
});
}
/**
* Subscribes with the given item, error and completion actions to the Observable returned by
* {@code subscribeOn(scheduler)}.
*
* @param onNext
* passed unchanged to the three-argument {@code subscribe(Action1, Action1, Action0)} overload
* @param onError
* passed unchanged to the three-argument {@code subscribe(Action1, Action1, Action0)} overload
* @param onComplete
* passed unchanged to the three-argument {@code subscribe(Action1, Action1, Action0)} overload
* @param scheduler
* the {@link Scheduler} passed to {@code subscribeOn}
* @return the {@link Subscription} returned by the delegated {@code subscribe} call
*/
public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}
/**
* Returns a {@link ConnectableObservable} that upon connection causes the source Observable to
* push results into the specified subject.
*
* @param subject
* the {@link Subject} for the {@link ConnectableObservable} to push source items
* into
* @param
* result type
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
* push results into the specified {@link Subject}
*/
public ConnectableObservable multicast(Subject subject) {
// Instance convenience form: delegates to the two-argument multicast overload with this
// Observable as the source.
return multicast(this, subject);
}
/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
* @param e
*/
private void handleError(Throwable e) {
    // The handler is fetched from the plugins on every call instead of being cached,
    // because onError is expected to be rare.
    RxJavaErrorHandler errorHandler = RxJavaPlugins.getInstance().getErrorHandler();
    errorHandler.handleError(e);
}
/**
* An Observable that never sends any information to an {@link Observer}.
*
* This Observable is useful primarily for testing purposes.
*
* @param
* the type of item emitted by the Observable
*/
private static class NeverObservable extends Observable {
public NeverObservable() {
super(new Func1, Subscription>() {
@Override
public Subscription call(Observer t1) {
// The observer is never called: no onNext, onError or onCompleted is sent to it.
return Subscriptions.empty();
}
});
}
}
/**
* An Observable that invokes {@link Observer#onError onError} when the {@link Observer}
* subscribes to it.
*
* @param
* the type of item emitted by the Observable
*/
private static class ThrowObservable extends Observable {
public ThrowObservable(final Throwable exception) {
super(new Func1, Subscription>() {
/**
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
*
* @param observer
* an {@link Observer} of this Observable
* @return a reference to the subscription
*/
@Override
public Subscription call(Observer observer) {
// The same exception instance is delivered to every subscriber, synchronously within
// this call and before the Subscription is returned.
observer.onError(exception);
return Subscriptions.empty();
}
});
}
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers. The current buffer is emitted and replaced with a new buffer when the
* Observable produced by the specified {@link Func0} produces a {@link BufferClosing} object. The
* {@link Func0} will then be used to create a new Observable to listen for the end of the next buffer.
*
* @param source
* The source {@link Observable} which produces values.
* @param bufferClosingSelector
* The {@link Func0} which is used to produce an {@link Observable} for every buffer created.
* When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer
* is emitted and replaced with a new one.
* @return
* An {@link Observable} which produces connected non-overlapping buffers, which are emitted
* when the current {@link Observable} created with the {@link Func0} argument produces a
* {@link BufferClosing} object.
*/
public static Observable> buffer(Observable source, Func0> bufferClosingSelector) {
// Pure delegation: the value built by OperationBuffer.buffer(source, bufferClosingSelector)
// is passed to create(...).
return create(OperationBuffer.buffer(source, bufferClosingSelector));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces buffers.
* Buffers are created when the specified "bufferOpenings" Observable produces a {@link BufferOpening} object.
* Additionally the {@link Func0} argument is used to create an Observable which produces {@link BufferClosing}
* objects. When this Observable produces such an object, the associated buffer is emitted.
*
* @param source
* The source {@link Observable} which produces values.
* @param bufferOpenings
* The {@link Observable} which when it produces a {@link BufferOpening} object, will cause
* another buffer to be created.
* @param bufferClosingSelector
* The {@link Func0} which is used to produce an {@link Observable} for every buffer created.
* When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer
* is emitted.
* @return
* An {@link Observable} which produces buffers which are created and emitted when the specified
* {@link Observable}s publish certain objects.
*/
public static Observable> buffer(Observable source, Observable bufferOpenings, Func1> bufferClosingSelector) {
// Pure delegation: the value built by
// OperationBuffer.buffer(source, bufferOpenings, bufferClosingSelector) is passed to create(...).
return create(OperationBuffer.buffer(source, bufferOpenings, bufferClosingSelector));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers, each containing "count" elements. When the source Observable completes or
* encounters an error, the current buffer is emitted, and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param count
* The maximum size of each buffer before it should be emitted.
* @return
* An {@link Observable} which produces connected non-overlapping buffers containing at most
* "count" produced values.
*/
public static Observable> buffer(Observable source, int count) {
// Pure delegation: the value built by OperationBuffer.buffer(source, count) is passed to
// create(...); count is not validated here.
return create(OperationBuffer.buffer(source, count));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces buffers every
* "skip" values, each containing "count" elements. When the source Observable completes or encounters an error,
* the current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param count
* The maximum size of each buffer before it should be emitted.
* @param skip
* How many produced values need to be skipped before starting a new buffer. Note that when "skip" and
* "count" are equal, this is the same operation as {@link Observable#buffer(Observable, int)}.
* @return
* An {@link Observable} which produces buffers every "skipped" values containing at most
* "count" produced values.
*/
public static Observable> buffer(Observable source, int count, int skip) {
// Pure delegation: the value built by OperationBuffer.buffer(source, count, skip) is passed to
// create(...); count and skip are not validated here.
return create(OperationBuffer.buffer(source, count, skip));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source
* Observable completes or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @param unit
* The unit of time which applies to the "timespan" argument.
* @return
* An {@link Observable} which produces connected non-overlapping buffers with a fixed duration.
*/
public static Observable> buffer(Observable source, long timespan, TimeUnit unit) {
// Pure delegation to the OperationBuffer.buffer overload that takes no Scheduler; the result is
// passed to create(...).
return create(OperationBuffer.buffer(source, timespan, unit));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source
* Observable completes or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @param unit
* The unit of time which applies to the "timespan" argument.
* @param scheduler
* The {@link Scheduler} to use when determining the end and start of a buffer.
* @return
* An {@link Observable} which produces connected non-overlapping buffers with a fixed duration.
*/
public static Observable> buffer(Observable source, long timespan, TimeUnit unit, Scheduler scheduler) {
// Pure delegation: the value built by OperationBuffer.buffer(source, timespan, unit, scheduler)
// is passed to create(...).
return create(OperationBuffer.buffer(source, timespan, unit, scheduler));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size
* specified by the "count" argument (whichever is reached first). When the source Observable completes
* or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @param unit
* The unit of time which applies to the "timespan" argument.
* @param count
* The maximum size of each buffer before it should be emitted.
* @return
* An {@link Observable} which produces connected non-overlapping buffers which are emitted after
* a fixed duration or when the buffer has reached maximum capacity (whichever occurs first).
*/
public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count) {
// Pure delegation to the OperationBuffer.buffer overload that takes no Scheduler; the result is
// passed to create(...).
return create(OperationBuffer.buffer(source, timespan, unit, count));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable produces connected
* non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size
* specified by the "count" argument (whichever is reached first). When the source Observable completes
* or encounters an error, the current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted, and
* replaced with a new buffer.
* @param unit
* The unit of time which applies to the "timespan" argument.
* @param count
* The maximum size of each buffer before it should be emitted.
* @param scheduler
* The {@link Scheduler} to use when determining the end and start of a buffer.
* @return
* An {@link Observable} which produces connected non-overlapping buffers which are emitted after
* a fixed duration or when the buffer has reached maximum capacity (whichever occurs first).
*/
public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count, Scheduler scheduler) {
// Pure delegation: the value built by
// OperationBuffer.buffer(source, timespan, unit, count, scheduler) is passed to create(...).
return create(OperationBuffer.buffer(source, timespan, unit, count, scheduler));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
* periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
* current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted.
* @param timeshift
* The period of time after which a new buffer will be created.
* @param unit
* The unit of time which applies to the "timespan" and "timeshift" arguments.
* @return
* An {@link Observable} which produces new buffers periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) {
// Pure delegation to the OperationBuffer.buffer overload that takes no Scheduler; the single
// unit argument is passed along for both timespan and timeshift.
return create(OperationBuffer.buffer(source, timespan, timeshift, unit));
}
/**
* Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
* periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
* current buffer is emitted and the event is propagated.
*
* @param source
* The source {@link Observable} which produces values.
* @param timespan
* The period of time each buffer is collecting values before it should be emitted.
* @param timeshift
* The period of time after which a new buffer will be created.
* @param unit
* The unit of time which applies to the "timespan" and "timeshift" arguments.
* @param scheduler
* The {@link Scheduler} to use when determining the end and start of a buffer.
* @return
* An {@link Observable} which produces new buffers periodically, and these are emitted after
* a fixed timespan has elapsed.
*/
public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
// Pure delegation: the value built by
// OperationBuffer.buffer(source, timespan, timeshift, unit, scheduler) is passed to create(...).
return create(OperationBuffer.buffer(source, timespan, timeshift, unit, scheduler));
}
/**
* Creates an Observable that will execute the given function when an {@link Observer}
* subscribes to it.
*
*
*
* Write the function you pass to create so that it behaves as an Observable: It
* should invoke the Observer's {@link Observer#onNext onNext},
* {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods
* appropriately.
*
* A well-formed Observable must invoke either the Observer's onCompleted method
* exactly once or its onError method exactly once.
*
* See Rx Design Guidelines (PDF)
* for detailed information.
*
* @param
* the type of the items that this Observable emits
* @param func
* a function that accepts an {@code Observer}, invokes its
* {@code onNext}, {@code onError}, and {@code onCompleted} methods
* as appropriate, and returns a {@link Subscription} to allow the Observer to
* cancel the subscription
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given
* function
*/
public static Observable create(Func1, Subscription> func) {
// func is not validated here; a null function only fails later, when subscribe(Observer)
// finds that the function returned by the execution hook is null.
return new Observable(func);
}
/**
* Creates an Observable that will execute the given function when an {@link Observer}
* subscribes to it.
*
* This method accepts {@link Object} to allow different languages to pass in methods using
* {@link FunctionLanguageAdaptor}.
*
* Write the function you pass to {@code create} so that it behaves as an Observable: it
* should invoke the Observer's {@link Observer#onNext onNext},
* {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods
* appropriately.
*
* A well-formed Observable must invoke either the Observer's {@code onCompleted} method
* exactly once or its {@code onError} method exactly once.
*
* See Rx Design Guidelines (PDF)
* for detailed information.
*
* @param
* the type of the items that this Observable emits
* @param func
* a function that accepts an {@code Observer}, invokes its
* {@code onNext}, {@code onError}, and {@code onCompleted} methods
* as appropriate, and returns a {@link Subscription} that allows the Observer to
* cancel the subscription
* @return an Observable that, when an {@link Observer} subscribes to it, will execute the given
* function
*/
public static Observable create(final Object func) {
// Obtain a FuncN for the supplied function object via Functions.from.
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(func);
// Wrap the FuncN in a Func1 and pass that wrapper on to create.
return create(new Func1, Subscription>() {
@Override
public Subscription call(Observer t1) {
// Whatever the FuncN returns is cast to Subscription without further checks.
return (Subscription) _f.call(t1);
}
});
}
/**
* Returns an Observable that emits no data to the {@link Observer} and immediately invokes
* its {@link Observer#onCompleted onCompleted} method.
*
* @param
* the type of the items (ostensibly) emitted by the Observable
* @return an Observable that returns no data to the {@link Observer} and immediately invokes
* the {@link Observer}'s {@link Observer#onCompleted() onCompleted} method
*/
public static Observable empty() {
// Built by converting a freshly created, empty ArrayList with toObservable.
return toObservable(new ArrayList());
}
/**
* Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError}
* method when the Observer subscribes to it.
*
* @param exception
* the particular error to report
* @param
* the type of the items (ostensibly) emitted by the Observable
* @return an Observable that invokes the {@link Observer}'s
* {@link Observer#onError onError} method when the Observer subscribes to it
*/
public static Observable error(Throwable exception) {
// The returned instance is a ThrowObservable constructed with the given exception.
return new ThrowObservable(exception);
}
/**
* Filters an Observable by discarding any items it emits that do not satisfy some predicate.
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning
* {@code true} if they pass the filter
* @return an Observable that emits only those items emitted by the source Observable for which the
* predicate evaluates to {@code true}
*/
public static Observable filter(Observable that, Func1 predicate) {
// The result of OperationFilter.filter(...) is turned into an Observable by create.
return create(OperationFilter.filter(that, predicate));
}
/**
* Filters an Observable by discarding any items it emits that do not satisfy some predicate.
*
* This overload takes the predicate as a plain {@link Object}, converts it with
* {@code Functions.from}, and delegates to {@code filter} with a {@code Func1} wrapper.
*
* @param that
* the Observable to filter
* @param function
* a function that evaluates an item emitted by the source Observable, and
* returns {@code true} if it passes the filter
* @return an Observable that emits only those items emitted by the source Observable for which the
* predicate function evaluates to {@code true}
*/
public static Observable filter(Observable that, final Object function) {
// Obtain a FuncN for the supplied function object.
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return filter(that, new Func1() {
@Override
public Boolean call(T t1) {
// The FuncN result is cast to Boolean without further checks.
return (Boolean) _f.call(t1);
}
});
}
/**
* Filters an Observable by discarding any items it emits that do not satisfy some predicate.
*
* @param that
* the Observable to filter
* @param predicate
* a function that evaluates an item emitted by the source Observable, and
* returns {@code true} if it passes the filter
* @return an Observable that emits only those items emitted by the source Observable for which
* the predicate evaluates to {@code true}
*/
public static Observable where(Observable that, Func1 predicate) {
// Delegates to OperationWhere.where; its result is turned into an Observable by create.
return create(OperationWhere.where(that, predicate));
}
/**
* Converts an {@link Iterable} sequence into an Observable.
*
* Implementation note: the entire iterable sequence will be immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
* it is not possible to unsubscribe from the sequence before it completes.
*
* @param iterable
* the source {@link Iterable} sequence
* @param
* the type of items in the {@link Iterable} sequence and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable} sequence
* @see #toObservable(Iterable)
*/
public static Observable from(Iterable iterable) {
// Simply forwards to toObservable.
return toObservable(iterable);
}
/**
* Converts an Array into an Observable.
*
* Implementation note: the entire array will be immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
* it is not possible to unsubscribe from the sequence before it completes.
*
* @param items
* the source Array
* @param <T>
* the type of items in the Array, and the type of items to be emitted by the
* resulting Observable
* @return an Observable that emits each item in the source Array
* @see #toObservable(Object...)
*/
public static Observable from(T... items) {
// Simply forwards the array to toObservable.
return toObservable(items);
}
/**
* Generates an Observable that emits a sequence of integers within a specified range.
*
* Implementation note: the entire range will be immediately emitted each time an
* {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
* it is not possible to unsubscribe from the sequence before it completes.
*
* @param start
* the value of the first integer in the sequence
* @param count
* the number of sequential integers to generate
* @return an Observable that emits a range of sequential integers
* @see "Observable.Range Method (Int32, Int32)"
*/
public static Observable range(int start, int count) {
// The result of Range.createWithCount(start, count) is converted with from(...).
return from(Range.createWithCount(start, count));
}
/**
* Asynchronously subscribes and unsubscribes Observers on the specified {@link Scheduler}.
*
* @param source
* the source Observable
* @param scheduler
* the {@link Scheduler} to perform subscription and unsubscription actions on
* @param
* the type of the items emitted by the Observable
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
* on the specified {@link Scheduler}
*/
public static Observable subscribeOn(Observable source, Scheduler scheduler) {
// The result of OperationSubscribeOn.subscribeOn(...) is turned into an Observable by create.
return create(OperationSubscribeOn.subscribeOn(source, scheduler));
}
/**
* Asynchronously notifies Observers on the specified {@link Scheduler}.
*
* @param source
* the source Observable
* @param scheduler
* the {@link Scheduler} to notify Observers on
* @param
* the type of the items emitted by the Observable
* @return the source Observable modified so that its Observers are notified on the specified
* {@link Scheduler}
*/
public static Observable observeOn(Observable source, Scheduler scheduler) {
// The result of OperationObserveOn.observeOn(...) is turned into an Observable by create.
return create(OperationObserveOn.observeOn(source, scheduler));
}
/**
* Returns an Observable that calls an Observable factory to create its Observable for each
* new Observer that subscribes. That is, for each subscriber, the actual Observable is determined
* by the factory function.
*
* The defer operator allows you to defer or delay emitting items from an Observable until such
* time as an Observer subscribes to the Observable. This allows an {@link Observer} to easily
* obtain updates or a refreshed version of the sequence.
*
* @param observableFactory
* the Observable factory function to invoke for each {@link Observer} that
* subscribes to the resulting Observable
* @param
* the type of the items emitted by the Observable
* @return an Observable whose {@link Observer}s trigger an invocation of the given Observable
* factory function
*/
public static Observable defer(Func0> observableFactory) {
// The result of OperationDefer.defer(...) is turned into an Observable by create.
return create(OperationDefer.defer(observableFactory));
}
/**
* Returns an Observable that calls an Observable factory to create its Observable for each
* new Observer that subscribes.
*
* The defer operator allows you to defer or delay emitting items from an Observable
* until such time as an {@link Observer} subscribes to the Observable. This allows an Observer
* to easily obtain updates or a refreshed version of the sequence.
*
* This overload takes the factory as a plain {@link Object} and converts it with
* {@code Functions.from} before handing a {@code Func0} wrapper to {@code OperationDefer.defer}.
*
* @param observableFactory
* the Observable factory function to invoke for each {@link Observer} that
* subscribes to the resulting Observable
* @param
* the type of the items emitted by the Observable
* @return an Observable whose {@link Observer}s trigger an invocation of the given Observable
* factory function
*/
public static Observable defer(Object observableFactory) {
// Obtain a FuncN for the supplied factory object.
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(observableFactory);
return create(OperationDefer.defer(new Func0>() {
@Override
@SuppressWarnings("unchecked")
public Observable call() {
// The factory is invoked with no arguments; its result is cast to Observable unchecked.
return (Observable) _f.call();
}
}));
}
/**
* Returns an Observable that emits a single item and then completes.
*
* To convert any object into an Observable that emits that object, pass that object into the
* {@code just} method.
*
* This is similar to the {@link #from(java.lang.Object[])} method, except that
* {@code from()} will convert an {@link Iterable} object into an Observable that emits
* each of the items in the Iterable, one at a time, while the {@code just()} method
* converts an Iterable into an Observable that emits the entire Iterable as a single item.
*
* @param value
* the item to pass to the {@link Observer}'s {@link Observer#onNext onNext} method
* @param <T>
* the type of that item
* @return an Observable that emits a single item and then completes
*/
public static Observable just(T value) {
// Put the value into a one-element list and convert that list with toObservable.
List list = new ArrayList();
list.add(value);
return toObservable(list);
}
/**
* Returns an Observable that applies a function of your choosing to each item emitted by an
* Observable and emits the result.
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items to be emitted by the resulting Observable
* @return an Observable that emits the items from the source Observable as transformed by the
* given function
*/
public static Observable map(Observable sequence, Func1 func) {
// The result of OperationMap.map(...) is turned into an Observable by create.
return create(OperationMap.map(sequence, func));
}
/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable and emits the result.
*
* This overload takes the function as a plain {@link Object}, converts it with
* {@code Functions.from}, and delegates to {@code map} with a {@code Func1} wrapper.
*
* @param sequence
* the source Observable
* @param func
* a function to apply to each item emitted by the source Observable
* @param <T>
* the type of items emitted by the source Observable
* @param <R>
* the type of items to be emitted by the resulting Observable
* @return an Observable that emits the items from the source Observable as transformed by the
* given function
*/
public static Observable map(Observable sequence, final Object func) {
// Obtain a FuncN for the supplied function object.
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(func);
return map(sequence, new Func1() {
@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
// The FuncN result is cast to R unchecked.
return (R) _f.call(t1);
}
});
}
/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
* resulting Observables and emitting the results of this merger.
*
* Note: {@code mapMany} and {@code flatMap} are equivalent.
*
* @param sequence
* the source Observable
* @param func
* a function that, when applied to an item emitted by the source Observable,
* returns an Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items emitted by the Observables that are returned from
* {@code func}
* @return an Observable that emits the result of applying the transformation function to each
* item emitted by the source Observable and merging the results of the Observables
* obtained from this transformation
* @see #flatMap(Observable, Func1)
*/
public static Observable mapMany(Observable sequence, Func1> func) {
// The result of OperationMap.mapMany(...) is turned into an Observable by create.
return create(OperationMap.mapMany(sequence, func));
}
/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
* resulting Observables and emitting the results of this merger.
*
* This overload takes the function as a plain {@link Object} and converts it with
* {@code Functions.from} before delegating to {@code mapMany} with a {@code Func1} wrapper.
*
* Note: {@code mapMany} and {@code flatMap} are equivalent.
*
* @param sequence
* the source Observable
* @param func
* a function that, when applied to each item emitted by the source Observable,
* generates an Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items emitted by the Observables that are returned from
* {@code func}
* @return an Observable that emits the result of applying the transformation function to each
* item emitted by the source Observable and merging the results of the Observables
* obtained from this transformation
*/
public static Observable mapMany(Observable sequence, final Object func) {
// Obtain a FuncN for the supplied function object.
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(func);
// NOTE(review): the wrapper below is declared to return R and casts the FuncN result to R,
// whereas the documentation says func produces an Observable. Confirm that this call binds
// to the Func1 overload of mapMany and does not resolve back to this Object overload.
return mapMany(sequence, new Func1() {
@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return (R) _f.call(t1);
}
});
}
/**
* Turns all of the notifications from a source Observable into {@link Observer#onNext onNext}
* emissions, and marks them with their original notification types within {@link Notification}
* objects.
*
* @param sequence
* the Observable you want to materialize in this way
* @return an Observable that emits items that are the result of materializing the
* notifications of the source Observable
* @see "MSDN: Observable.Materialize"
*/
public static Observable> materialize(final Observable sequence) {
// The result of OperationMaterialize.materialize(...) is turned into an Observable by create.
return create(OperationMaterialize.materialize(sequence));
}
/**
* Reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects emitted by a source Observable into the items or notifications
* they represent.
*
* @param sequence
* an Observable that emits {@link Notification} objects that represent the items and
* notifications emitted by an Observable
* @return an Observable that emits the items and notifications embedded in the
* {@link Notification} objects emitted by the source Observable
* @see "MSDN: Observable.Dematerialize"
*/
public static Observable dematerialize(final Observable> sequence) {
// The result of OperationDematerialize.dematerialize(...) is turned into an Observable by create.
return create(OperationDematerialize.dematerialize(sequence));
}
/**
* Flattens a list of Observables into one Observable, without any transformation.
*
* You can combine the items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param source
* a list of Observables
* @return an Observable that emits items that are the result of flattening the
* {@code source} list of Observables
* @see "MSDN: Observable.Merge"
*/
public static Observable merge(List> source) {
// The result of OperationMerge.merge(...) is turned into an Observable by create.
return create(OperationMerge.merge(source));
}
/**
* Flattens a sequence of Observables emitted by an Observable into one Observable, without any
* transformation.
*
* You can combine the items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param source
* an Observable that emits Observables
* @return an Observable that emits items that are the result of flattening the items emitted
* by the Observables emitted by the {@code source} Observable
* @see "MSDN: Observable.Merge Method"
*/
public static Observable merge(Observable> source) {
// The result of OperationMerge.merge(...) is turned into an Observable by create.
return create(OperationMerge.merge(source));
}
/**
* Flattens a series of Observables into one Observable, without any transformation.
*
* You can combine items emitted by multiple Observables so that they act like a single
* Observable, by using the {@code merge} method.
*
* @param source
* a series of Observables
* @return an Observable that emits items that are the result of flattening the items emitted
* by the {@code source} Observables
* @see "MSDN: Observable.Merge Method"
*/
public static Observable merge(Observable... source) {
// The varargs array is passed as-is to OperationMerge.merge; create wraps the result.
return create(OperationMerge.merge(source));
}
/**
* Returns an Observable that emits the items from the {@code source} Observable until
* the {@code other} Observable emits an item.
*
* @param source
* the source Observable
* @param other
* the Observable whose first emitted item will cause {@code takeUntil} to stop
* emitting items from the {@code source} Observable
* @param
* the type of items emitted by {@code source}
* @param
* the type of items emitted by {@code other}
* @return an Observable that emits the items emitted by {@code source} until such time as
* {@code other} emits its first item
*/
public static Observable takeUntil(final Observable source, final Observable other) {
// Unlike most operators in this class, the operation's result is returned directly
// rather than being passed through create.
return OperationTakeUntil.takeUntil(source, other);
}
/**
* Returns an Observable that emits the items emitted by two or more Observables, one after the
* other.
*
* @param source
* a series of Observables
* @return an Observable that emits items that are the result of combining the items emitted by
* the {@code source} Observables, one after the other
* @see "MSDN: Observable.Concat Method"
*/
public static Observable concat(Observable... source) {
// The varargs array is passed as-is to OperationConcat.concat; create wraps the result.
return create(OperationConcat.concat(source));
}
/**
* Returns an Observable that emits the same items as the source Observable, and then calls
* the given Action after the Observable completes.
*
* @param source
* an Observable
* @param action
* an {@link Action0} to be invoked when the source Observable completes
* or errors
* @return an Observable that emits the same items as the source, then invokes the action
* @see "MSDN: Observable.Finally Method"
*/
public static Observable finallyDo(Observable source, Action0 action) {
// The result of OperationFinally.finallyDo(...) is turned into an Observable by create.
return create(OperationFinally.finallyDo(source, action));
}
/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
* resulting Observables and emitting the results of this merger.
*
* Note: {@code mapMany} and {@code flatMap} are equivalent.
*
* @param sequence
* the source Observable
* @param func
* a function that, when applied to each item emitted by the source Observable,
* generates an Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items emitted by the Observables that are returned from
* {@code func}
* @return an Observable that emits the result of applying the transformation function to each
* item emitted by the source Observable and merging the results of the Observables
* obtained from this transformation
* @see #mapMany(Observable, Func1)
*/
public static Observable flatMap(Observable sequence, Func1> func) {
// Pure alias: both arguments are forwarded unchanged to mapMany.
return mapMany(sequence, func);
}
/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
* resulting Observables and emitting the results of this merger.
*
* Note: {@code mapMany} and {@code flatMap} are equivalent.
*
* @param sequence
* the source Observable
* @param func
* a function that, when applied to each item emitted by the source Observable,
* generates an Observable
* @param
* the type of items emitted by the source Observable
* @param
* the type of items emitted by the Observables that are returned from
* {@code func}
* @return an Observable that emits the result of applying the transformation function to each
* item emitted by the source Observable and merging the results of the Observables
* obtained from this transformation
* @see #mapMany(Observable, Object)
*/
public static Observable flatMap(Observable sequence, final Object func) {
// func is statically typed as Object, so this forwards to the mapMany overload taking an Object.
return mapMany(sequence, func);
}
/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
*
* @param source
* an Observable whose items you want to group
* @param keySelector
* a function that extracts the key for each item emitted by the source Observable
* @param elementSelector
* a function to map each item emitted by the source Observable to an item emitted
* by a {@link GroupedObservable}
* @param
* the key type
* @param
* the type of items emitted by the source Observable
* @param
* the type of items to be emitted by the resulting {@link GroupedObservable}s
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and emits items representing items from the source Observable that
* share that key value
*/
public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) {
// The result of OperationGroupBy.groupBy(...) is turned into an Observable by create.
return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
}
/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
*
*
*
* @param source
* an Observable whose items you want to group
* @param keySelector
* a function that extracts the key for each item omitted by the source Observable
* @param elementSelector
* a function to map each item emitted by the source Observable to an item emitted
* by a {@link GroupedObservable}
* @param
* the key type
* @param
* the type of items emitted by the source Observable
* @param
* the type of items to be emitted by the resulting {@link GroupedObservable}s
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and emits items representing items from the source Observable that
* share that key value
*/
@SuppressWarnings("rawtypes")
public static Observable> groupBy(Observable