From 3e148a8832c18b0ac3297da4620d2d8c8a78da40 Mon Sep 17 00:00:00 2001 From: Jake Wharton Date: Sun, 10 Apr 2016 17:52:59 -0400 Subject: [PATCH 001/273] Remove unused local. --- src/main/java/rx/subjects/ReplaySubject.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index db8490f731..f1b693dc9a 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -717,8 +717,7 @@ public boolean isEmpty() { @SuppressWarnings("unchecked") public T[] toArray(T[] a) { List list = new ArrayList(); - NodeList.Node l = head(); - NodeList.Node next = l.next; + NodeList.Node next = head().next; while (next != null) { Object o = leaveTransform.call(next.value); @@ -727,7 +726,6 @@ public T[] toArray(T[] a) { } else { list.add((T)o); } - l = next; next = next.next; } return list.toArray(a); From d799ecac4cb496303f774b1864882dbcabbf7ec7 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 14 Apr 2016 19:50:26 +0200 Subject: [PATCH 002/273] 1.x: deanonymize Observable inner classes (#3848) * 1.x: deanonymize Observable inner classes * Further simplification of types * Fix indentation, rename Lambda to Action --- src/main/java/rx/Observable.java | 520 +++--------------- src/main/java/rx/functions/Actions.java | 23 + .../operators/EmptyObservableHolder.java | 46 ++ .../operators/NeverObservableHolder.java | 45 ++ .../internal/operators/OnSubscribeLift.java | 65 +++ .../internal/operators/OnSubscribeThrow.java | 46 ++ .../util/ActionNotificationObserver.java | 48 ++ .../rx/internal/util/ActionSubscriber.java | 51 ++ .../util/InternalObservableUtils.java | 379 +++++++++++++ .../rx/internal/util/ObserverSubscriber.java | 46 ++ 10 files changed, 825 insertions(+), 444 deletions(-) create mode 100644 src/main/java/rx/internal/operators/EmptyObservableHolder.java create mode 100644 src/main/java/rx/internal/operators/NeverObservableHolder.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeLift.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeThrow.java create mode 100644 src/main/java/rx/internal/util/ActionNotificationObserver.java create mode 100644 src/main/java/rx/internal/util/ActionSubscriber.java create mode 100644 src/main/java/rx/internal/util/InternalObservableUtils.java create mode 100644 src/main/java/rx/internal/util/ObserverSubscriber.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index bd96ee4556..92e9ea56fa 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -19,7 +19,6 @@ import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; -import rx.internal.producers.SingleProducer; import rx.internal.util.*; import rx.observables.*; import rx.observers.SafeSubscriber; @@ -191,11 +190,23 @@ public interface Operator extends Func1, Subscriber< */ @Experimental public R extend(Func1, ? extends R> conversion) { - return conversion.call(new OnSubscribe() { - @Override - public void call(Subscriber subscriber) { - subscriber.add(Observable.subscribe(subscriber, Observable.this)); - }}); + return conversion.call(new OnSubscribeExtend(this)); + } + + /** + * Transforms a OnSubscribe.call() into an Observable.subscribe() call. + *

Note: has to be in Observable because it calls the private subscribe() method + * @param the value type + */ + static final class OnSubscribeExtend implements OnSubscribe { + final Observable parent; + OnSubscribeExtend(Observable parent) { + this.parent = parent; + } + @Override + public void call(Subscriber subscriber) { + subscriber.add(subscribe(subscriber, parent)); + } } /** @@ -222,30 +233,7 @@ public void call(Subscriber subscriber) { * @see RxJava wiki: Implementing Your Own Operators */ public final Observable lift(final Operator operator) { - return new Observable(new OnSubscribe() { - @Override - public void call(Subscriber o) { - try { - Subscriber st = hook.onLift(operator).call(o); - try { - // new Subscriber created and being subscribed with so 'onStart' it - st.onStart(); - onSubscribe.call(st); - } catch (Throwable e) { - // localized capture of errors rather than it skipping all operators - // and ending up in the try/catch of the subscribe method which then - // prevents onErrorResumeNext and other similar approaches to error handling - Exceptions.throwIfFatal(e); - st.onError(e); - } - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - // if the lift function failed all we can do is pass the error to the final Subscriber - // as we don't have the operator available to us - o.onError(e); - } - } - }); + return new Observable(new OnSubscribeLift(onSubscribe, operator)); } /** @@ -1253,16 +1241,6 @@ public static Observable defer(Func0> observableFactory) { return create(new OnSubscribeDefer(observableFactory)); } - /** Lazy initialized Holder for an empty observable which just emits onCompleted to any subscriber. */ - private static final class EmptyHolder { - final static Observable INSTANCE = create(new OnSubscribe() { - @Override - public void call(Subscriber subscriber) { - subscriber.onCompleted(); - } - }); - } - /** * Returns an Observable that emits no items to the {@link Observer} and immediately invokes its * {@link Observer#onCompleted onCompleted} method. @@ -1279,9 +1257,8 @@ public void call(Subscriber subscriber) { * {@link Observer}'s {@link Observer#onCompleted() onCompleted} method * @see ReactiveX operators documentation: Empty */ - @SuppressWarnings("unchecked") public static Observable empty() { - return (Observable) EmptyHolder.INSTANCE; + return EmptyObservableHolder.instance(); } /** @@ -1303,7 +1280,7 @@ public static Observable empty() { * @see ReactiveX operators documentation: Throw */ public static Observable error(Throwable exception) { - return new ThrowObservable(exception); + return create(new OnSubscribeThrow(exception)); } /** @@ -2740,7 +2717,7 @@ public final Observable> nest() { * @see ReactiveX operators documentation: Never */ public static Observable never() { - return NeverObservable.instance(); + return NeverObservableHolder.instance(); } /** @@ -2821,17 +2798,9 @@ public static Observable range(int start, int count, Scheduler schedule * @see ReactiveX operators documentation: SequenceEqual */ public static Observable sequenceEqual(Observable first, Observable second) { - return sequenceEqual(first, second, new Func2() { - @Override - public final Boolean call(T first, T second) { - if (first == null) { - return second == null; - } - return first.equals(second); - } - }); + return sequenceEqual(first, second, InternalObservableUtils.OBJECT_EQUALS); } - + /** * Returns an Observable that emits a Boolean value that indicates whether two Observable sequences are the * same by comparing the items emitted by each Observable pairwise based on the results of a specified @@ -3146,16 +3115,9 @@ public static Observable zip(Iterable> ws, FuncN< * @see ReactiveX operators documentation: Zip */ public static Observable zip(Observable> ws, final FuncN zipFunction) { - return ws.toList().map(new Func1>, Observable[]>() { - - @Override - public Observable[] call(List> o) { - return o.toArray(new Observable[o.size()]); - } - - }).lift(new OperatorZip(zipFunction)); + return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip(zipFunction)); } - + /** * Returns an Observable that emits the results of a specified combiner function applied to combinations of * two items emitted, in sequence, by two other Observables. @@ -4016,15 +3978,7 @@ public final Observable cast(final Class klass) { * @see ReactiveX operators documentation: Reduce */ public final Observable collect(Func0 stateFactory, final Action2 collector) { - Func2 accumulator = new Func2() { - - @Override - public final R call(R state, T value) { - collector.call(state, value); - return state; - } - - }; + Func2 accumulator = InternalObservableUtils.createCollectorCaller(collector); /* * Discussion and confirmation of implementation at @@ -4147,12 +4101,7 @@ public final Observable concatWith(Observable t1) { * @see ReactiveX operators documentation: Contains */ public final Observable contains(final Object element) { - return exists(new Func1() { - @Override - public final Boolean call(T t1) { - return element == null ? t1 == null : element.equals(t1); - } - }); + return exists(InternalObservableUtils.equalsWith(element)); } /** @@ -4172,16 +4121,7 @@ public final Boolean call(T t1) { * @see #countLong() */ public final Observable count() { - return reduce(0, CountHolder.INSTANCE); - } - - private static final class CountHolder { - static final Func2 INSTANCE = new Func2() { - @Override - public final Integer call(Integer count, Object o) { - return count + 1; - } - }; + return reduce(0, InternalObservableUtils.COUNTER); } /** @@ -4203,18 +4143,9 @@ public final Integer call(Integer count, Object o) { * @see #count() */ public final Observable countLong() { - return reduce(0L, CountLongHolder.INSTANCE); + return reduce(0L, InternalObservableUtils.LONG_COUNTER); } - private static final class CountLongHolder { - static final Func2 INSTANCE = new Func2() { - @Override - public final Long call(Long count, Object o) { - return count + 1; - } - }; - } - /** * Returns an Observable that mirrors the source Observable, except that it drops items emitted by the * source Observable that are followed by another item within a computed debounce duration. @@ -4340,12 +4271,7 @@ public final Observable debounce(long timeout, TimeUnit unit, Scheduler sched */ public final Observable defaultIfEmpty(final T defaultValue) { //if empty switch to an observable that emits defaultValue and supports backpressure - return switchIfEmpty(Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber subscriber) { - subscriber.setProducer(new SingleProducer(subscriber, defaultValue)); - }})); + return switchIfEmpty(just(defaultValue)); } /** @@ -4676,21 +4602,9 @@ public final Observable distinctUntilChanged(Func1ReactiveX operators documentation: Do */ public final Observable doOnCompleted(final Action0 onCompleted) { - Observer observer = new Observer() { - @Override - public final void onCompleted() { - onCompleted.call(); - } - - @Override - public final void onError(Throwable e) { - } - - @Override - public final void onNext(T args) { - } - - }; + Action1 onNext = Actions.empty(); + Action1 onError = Actions.empty(); + Observer observer = new ActionSubscriber(onNext, onError, onCompleted); return lift(new OperatorDoOnEach(observer)); } @@ -4710,23 +4624,7 @@ public final void onNext(T args) { * @see ReactiveX operators documentation: Do */ public final Observable doOnEach(final Action1> onNotification) { - Observer observer = new Observer() { - @Override - public final void onCompleted() { - onNotification.call(Notification.createOnCompleted()); - } - - @Override - public final void onError(Throwable e) { - onNotification.call(Notification.createOnError(e)); - } - - @Override - public final void onNext(T v) { - onNotification.call(Notification.createOnNext(v)); - } - - }; + Observer observer = new ActionNotificationObserver(onNotification); return lift(new OperatorDoOnEach(observer)); } @@ -4772,21 +4670,9 @@ public final Observable doOnEach(Observer observer) { * @see ReactiveX operators documentation: Do */ public final Observable doOnError(final Action1 onError) { - Observer observer = new Observer() { - @Override - public final void onCompleted() { - } - - @Override - public final void onError(Throwable e) { - onError.call(e); - } - - @Override - public final void onNext(T args) { - } - - }; + Action1 onNext = Actions.empty(); + Action0 onCompleted = Actions.empty(); + Observer observer = new ActionSubscriber(onNext, onError, onCompleted); return lift(new OperatorDoOnEach(observer)); } @@ -4806,21 +4692,9 @@ public final void onNext(T args) { * @see ReactiveX operators documentation: Do */ public final Observable doOnNext(final Action1 onNext) { - Observer observer = new Observer() { - @Override - public final void onCompleted() { - } - - @Override - public final void onError(Throwable e) { - } - - @Override - public final void onNext(T args) { - onNext.call(args); - } - - }; + Action1 onError = Actions.empty(); + Action0 onCompleted = Actions.empty(); + Observer observer = new ActionSubscriber(onNext, onError, onCompleted); return lift(new OperatorDoOnEach(observer)); } @@ -4891,22 +4765,10 @@ public final Observable doOnSubscribe(final Action0 subscribe) { * @see #finallyDo(Action0) */ public final Observable doOnTerminate(final Action0 onTerminate) { - Observer observer = new Observer() { - @Override - public final void onCompleted() { - onTerminate.call(); - } - - @Override - public final void onError(Throwable e) { - onTerminate.call(); - } - - @Override - public final void onNext(T args) { - } - - }; + Action1 onNext = Actions.empty(); + Action1 onError = Actions.toAction1(onTerminate); + + Observer observer = new ActionSubscriber(onNext, onError, onTerminate); return lift(new OperatorDoOnEach(observer)); } @@ -6111,15 +5973,10 @@ public final Observable ignoreElements() { * @return an Observable that emits a Boolean * @see ReactiveX operators documentation: Contains */ - @SuppressWarnings("unchecked") public final Observable isEmpty() { - return lift((OperatorAny) HolderAnyForEmpty.INSTANCE); + return lift(InternalObservableUtils.IS_EMPTY); } - private static class HolderAnyForEmpty { - static final OperatorAny INSTANCE = new OperatorAny(UtilityFunctions.alwaysTrue(), true); - } - /** * Correlates the items emitted by two Observables based on overlapping durations. *

@@ -6459,12 +6316,7 @@ public final Observable observeOn(Scheduler scheduler, boolean delayError, in * @see ReactiveX operators documentation: Filter */ public final Observable ofType(final Class klass) { - return filter(new Func1() { - @Override - public final Boolean call(T t) { - return klass.isInstance(t); - } - }).cast(klass); + return filter(InternalObservableUtils.isInstanceOf(klass)).cast(klass); } /** @@ -6976,18 +6828,7 @@ public final Observable repeat(final long count, Scheduler scheduler) { * @see ReactiveX operators documentation: Repeat */ public final Observable repeatWhen(final Func1, ? extends Observable> notificationHandler, Scheduler scheduler) { - Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { - @Override - public Observable call(Observable> notifications) { - return notificationHandler.call(notifications.map(new Func1, Void>() { - @Override - public Void call(Notification notification) { - return null; - } - })); - } - }; - return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler); + return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler), scheduler); } /** @@ -7010,18 +6851,7 @@ public Void call(Notification notification) { * @see ReactiveX operators documentation: Repeat */ public final Observable repeatWhen(final Func1, ? extends Observable> notificationHandler) { - Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { - @Override - public Observable call(Observable> notifications) { - return notificationHandler.call(notifications.map(new Func1, Void>() { - @Override - public Void call(Notification notification) { - return null; - } - })); - } - }; - return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler); + return OnSubscribeRedo.repeat(this, InternalObservableUtils.createRepeatDematerializer(notificationHandler)); } /** @@ -7072,12 +6902,7 @@ public final ConnectableObservable replay() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector) { - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(); - } - }, selector); + return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this), selector); } /** @@ -7108,12 +6933,7 @@ public ConnectableObservable call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize); - } - }, selector); + return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this, bufferSize), selector); } /** @@ -7192,12 +7012,8 @@ public final Observable replay(Func1, ? extends Obs if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize, time, unit, scheduler); - } - }, selector); + return OperatorReplay.multicastSelector( + InternalObservableUtils.createReplaySupplier(this, bufferSize, time, unit, scheduler), selector); } /** @@ -7230,17 +7046,8 @@ public ConnectableObservable call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(final Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize); - } - }, new Func1, Observable>() { - @Override - public Observable call(Observable t) { - return selector.call(t).observeOn(scheduler); - } - }); + return OperatorReplay.multicastSelector(InternalObservableUtils.createReplaySupplier(this, bufferSize), + InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler)); } /** @@ -7308,12 +7115,8 @@ public final Observable replay(Func1, ? extends Obs * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(time, unit, scheduler); - } - }, selector); + return OperatorReplay.multicastSelector( + InternalObservableUtils.createReplaySupplier(this, time, unit, scheduler), selector); } /** @@ -7343,17 +7146,9 @@ public ConnectableObservable call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(final Func1, ? extends Observable> selector, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { - @Override - public ConnectableObservable call() { - return Observable.this.replay(); - } - }, new Func1, Observable>() { - @Override - public Observable call(Observable t) { - return selector.call(t).observeOn(scheduler); - } - }); + return OperatorReplay.multicastSelector( + InternalObservableUtils.createReplaySupplier(this), + InternalObservableUtils.createReplaySelectorAndObserveOn(selector, scheduler)); } /** @@ -7689,18 +7484,7 @@ public final Observable retry(Func2 predicate) { * @see ReactiveX operators documentation: Retry */ public final Observable retryWhen(final Func1, ? extends Observable> notificationHandler) { - Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { - @Override - public Observable call(Observable> notifications) { - return notificationHandler.call(notifications.map(new Func1, Throwable>() { - @Override - public Throwable call(Notification notification) { - return notification.getThrowable(); - } - })); - } - }; - return OnSubscribeRedo. retry(this, dematerializedNotificationHandler); + return OnSubscribeRedo.retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler)); } /** @@ -7727,18 +7511,7 @@ public Throwable call(Notification notification) { * @see ReactiveX operators documentation: Retry */ public final Observable retryWhen(final Func1, ? extends Observable> notificationHandler, Scheduler scheduler) { - Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { - @Override - public Observable call(Observable> notifications) { - return notificationHandler.call(notifications.map(new Func1, Throwable>() { - @Override - public Throwable call(Notification notification) { - return notification.getThrowable(); - } - })); - } - }; - return OnSubscribeRedo. retry(this, dematerializedNotificationHandler, scheduler); + return OnSubscribeRedo. retry(this, InternalObservableUtils.createRetryDematerializer(notificationHandler), scheduler); } /** @@ -8512,24 +8285,10 @@ public final Observable startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T * @see ReactiveX operators documentation: Subscribe */ public final Subscription subscribe() { - return subscribe(new Subscriber() { - - @Override - public final void onCompleted() { - // do nothing - } - - @Override - public final void onError(Throwable e) { - throw new OnErrorNotImplementedException(e); - } - - @Override - public final void onNext(T args) { - // do nothing - } - - }); + Action1 onNext = Actions.empty(); + Action1 onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED; + Action0 onCompleted = Actions.empty(); + return subscribe(new ActionSubscriber(onNext, onError, onCompleted)); } /** @@ -8554,24 +8313,9 @@ public final Subscription subscribe(final Action1 onNext) { throw new IllegalArgumentException("onNext can not be null"); } - return subscribe(new Subscriber() { - - @Override - public final void onCompleted() { - // do nothing - } - - @Override - public final void onError(Throwable e) { - throw new OnErrorNotImplementedException(e); - } - - @Override - public final void onNext(T args) { - onNext.call(args); - } - - }); + Action1 onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED; + Action0 onCompleted = Actions.empty(); + return subscribe(new ActionSubscriber(onNext, onError, onCompleted)); } /** @@ -8602,24 +8346,8 @@ public final Subscription subscribe(final Action1 onNext, final Actio throw new IllegalArgumentException("onError can not be null"); } - return subscribe(new Subscriber() { - - @Override - public final void onCompleted() { - // do nothing - } - - @Override - public final void onError(Throwable e) { - onError.call(e); - } - - @Override - public final void onNext(T args) { - onNext.call(args); - } - - }); + Action0 onCompleted = Actions.empty(); + return subscribe(new ActionSubscriber(onNext, onError, onCompleted)); } /** @@ -8635,7 +8363,7 @@ public final void onNext(T args) { * @param onError * the {@code Action1} you have designed to accept any error notification from the * Observable - * @param onComplete + * @param onCompleted * the {@code Action0} you have designed to accept a completion notification from the * Observable * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before @@ -8646,35 +8374,18 @@ public final void onNext(T args) { * if {@code onComplete} is null * @see ReactiveX operators documentation: Subscribe */ - public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { + public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onCompleted) { if (onNext == null) { throw new IllegalArgumentException("onNext can not be null"); } if (onError == null) { throw new IllegalArgumentException("onError can not be null"); } - if (onComplete == null) { + if (onCompleted == null) { throw new IllegalArgumentException("onComplete can not be null"); } - return subscribe(new Subscriber() { - - @Override - public final void onCompleted() { - onComplete.call(); - } - - @Override - public final void onError(Throwable e) { - onError.call(e); - } - - @Override - public final void onNext(T args) { - onNext.call(args); - } - - }); + return subscribe(new ActionSubscriber(onNext, onError, onCompleted)); } /** @@ -8695,24 +8406,7 @@ public final Subscription subscribe(final Observer observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber)observer); } - return subscribe(new Subscriber() { - - @Override - public void onCompleted() { - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(T t) { - observer.onNext(t); - } - - }); + return subscribe(new ObserverSubscriber(observer)); } /** @@ -8801,7 +8495,7 @@ public final Subscription subscribe(Subscriber subscriber) { return Observable.subscribe(subscriber, this); } - private static Subscription subscribe(Subscriber subscriber, Observable observable) { + static Subscription subscribe(Subscriber subscriber, Observable observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); @@ -10601,66 +10295,4 @@ public final Observable zipWith(Iterable other, Func2 Observable zipWith(Observable other, Func2 zipFunction) { return (Observable)zip(this, other, zipFunction); } - - /** - * An Observable that never sends any information to an {@link Observer}. - * This Observable is useful primarily for testing purposes. - * - * @param - * the type of item (not) emitted by the Observable - */ - private static class NeverObservable extends Observable { - - private static class Holder { - static final NeverObservable INSTANCE = new NeverObservable(); - } - - /** - * Returns a singleton instance of NeverObservable (cast to the generic type). - * - * @return singleton instance of NeverObservable (cast to the generic type) - */ - @SuppressWarnings("unchecked") - static NeverObservable instance() { - return (NeverObservable) Holder.INSTANCE; - } - - NeverObservable() { - super(new OnSubscribe() { - - @Override - public void call(Subscriber observer) { - // do nothing - } - - }); - } - } - - /** - * An Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it. - * - * @param - * the type of item (ostensibly) emitted by the Observable - */ - private static class ThrowObservable extends Observable { - - public ThrowObservable(final Throwable exception) { - super(new OnSubscribe() { - - /** - * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method. - * - * @param observer - * an {@link Observer} of this Observable - */ - @Override - public void call(Subscriber observer) { - observer.onError(exception); - } - - }); - } - } - } diff --git a/src/main/java/rx/functions/Actions.java b/src/main/java/rx/functions/Actions.java index ea18eaed91..bbf9d0b151 100644 --- a/src/main/java/rx/functions/Actions.java +++ b/src/main/java/rx/functions/Actions.java @@ -432,4 +432,27 @@ public R call(Object... args) { } }; } + + /** + * Wraps an Action0 instance into an Action1 instance where the latter calls + * the former. + * @param action the action to call + * @return the new Action1 instance + */ + public static Action1 toAction1(Action0 action) { + return new Action1CallsAction0(action); + } + + static final class Action1CallsAction0 implements Action1 { + final Action0 action; + + public Action1CallsAction0(Action0 action) { + this.action = action; + } + + @Override + public void call(T t) { + action.call(); + } + } } diff --git a/src/main/java/rx/internal/operators/EmptyObservableHolder.java b/src/main/java/rx/internal/operators/EmptyObservableHolder.java new file mode 100644 index 0000000000..20091ca211 --- /dev/null +++ b/src/main/java/rx/internal/operators/EmptyObservableHolder.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.*; +import rx.Observable.OnSubscribe; + +/** + * Holds a singleton instance of an empty Observable which is stateless and completes + * the child subscriber immediately. + */ +public enum EmptyObservableHolder implements OnSubscribe { + INSTANCE + ; + + /** + * Returns a type-corrected singleton instance of the empty Observable. + * @return a type-corrected singleton instance of the empty Observable. + */ + @SuppressWarnings("unchecked") + public static Observable instance() { + return (Observable)EMPTY; + } + + /** The singleton instance. */ + static final Observable EMPTY = Observable.create(INSTANCE); + + @Override + public void call(Subscriber child) { + child.onCompleted(); + } +} diff --git a/src/main/java/rx/internal/operators/NeverObservableHolder.java b/src/main/java/rx/internal/operators/NeverObservableHolder.java new file mode 100644 index 0000000000..451f31c67b --- /dev/null +++ b/src/main/java/rx/internal/operators/NeverObservableHolder.java @@ -0,0 +1,45 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.*; +import rx.Observable.OnSubscribe; + +/** + * Holds a singleton instance of a never Observable which is stateless doesn't + * call any of the Subscriber's methods. + */ +public enum NeverObservableHolder implements OnSubscribe { + INSTANCE + ; + + /** + * Returns a type-corrected singleton instance of the never Observable. + * @return a type-corrected singleton instance of the never Observable. + */ + @SuppressWarnings("unchecked") + public static Observable instance() { + return (Observable)NEVER; + } + + /** The singleton instance. */ + static final Observable NEVER = Observable.create(INSTANCE); + + @Override + public void call(Subscriber child) { + } +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeLift.java b/src/main/java/rx/internal/operators/OnSubscribeLift.java new file mode 100644 index 0000000000..ba6210f38b --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeLift.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.Observable.*; +import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.plugins.*; + +/** + * Transforms the downstream Subscriber into a Subscriber via an operator + * callback and calls the parent OnSubscribe.call() method with it. + * @param the source value type + * @param the result value type + */ +public final class OnSubscribeLift implements OnSubscribe { + + static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + + final OnSubscribe parent; + + final Operator operator; + + public OnSubscribeLift(OnSubscribe parent, Operator operator) { + this.parent = parent; + this.operator = operator; + } + + @Override + public void call(Subscriber o) { + try { + Subscriber st = hook.onLift(operator).call(o); + try { + // new Subscriber created and being subscribed with so 'onStart' it + st.onStart(); + parent.call(st); + } catch (Throwable e) { + // localized capture of errors rather than it skipping all operators + // and ending up in the try/catch of the subscribe method which then + // prevents onErrorResumeNext and other similar approaches to error handling + Exceptions.throwIfFatal(e); + st.onError(e); + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + // if the lift function failed all we can do is pass the error to the final Subscriber + // as we don't have the operator available to us + o.onError(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OnSubscribeThrow.java b/src/main/java/rx/internal/operators/OnSubscribeThrow.java new file mode 100644 index 0000000000..d39bba5939 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeThrow.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.*; +import rx.Observable.OnSubscribe; + +/** + * An Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it. + * + * @param + * the type of item (ostensibly) emitted by the Observable + */ +public final class OnSubscribeThrow implements OnSubscribe { + + private final Throwable exception; + + public OnSubscribeThrow(Throwable exception) { + this.exception = exception; + } + + /** + * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method. + * + * @param observer + * an {@link Observer} of this Observable + */ + @Override + public void call(Subscriber observer) { + observer.onError(exception); + } +} diff --git a/src/main/java/rx/internal/util/ActionNotificationObserver.java b/src/main/java/rx/internal/util/ActionNotificationObserver.java new file mode 100644 index 0000000000..162d9371b4 --- /dev/null +++ b/src/main/java/rx/internal/util/ActionNotificationObserver.java @@ -0,0 +1,48 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import rx.*; +import rx.functions.*; + +/** + * An Observer that forwards the onXXX method calls to a notification callback + * by transforming each signal type into Notifications. + * @param the value type + */ +public final class ActionNotificationObserver implements Observer { + + final Action1> onNotification; + + public ActionNotificationObserver(Action1> onNotification) { + this.onNotification = onNotification; + } + + @Override + public void onNext(T t) { + onNotification.call(Notification.createOnNext(t)); + } + + @Override + public void onError(Throwable e) { + onNotification.call(Notification.createOnError(e)); + } + + @Override + public void onCompleted() { + onNotification.call(Notification.createOnCompleted()); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/ActionSubscriber.java b/src/main/java/rx/internal/util/ActionSubscriber.java new file mode 100644 index 0000000000..33a88fe5ed --- /dev/null +++ b/src/main/java/rx/internal/util/ActionSubscriber.java @@ -0,0 +1,51 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import rx.Subscriber; +import rx.functions.*; + +/** + * A Subscriber that forwards the onXXX method calls to callbacks. + * @param the value type + */ +public final class ActionSubscriber extends Subscriber { + + final Action1 onNext; + final Action1 onError; + final Action0 onCompleted; + + public ActionSubscriber(Action1 onNext, Action1 onError, Action0 onCompleted) { + this.onNext = onNext; + this.onError = onError; + this.onCompleted = onCompleted; + } + + @Override + public void onNext(T t) { + onNext.call(t); + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onCompleted() { + onCompleted.call(); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/InternalObservableUtils.java b/src/main/java/rx/internal/util/InternalObservableUtils.java new file mode 100644 index 0000000000..0e8e3d6878 --- /dev/null +++ b/src/main/java/rx/internal/util/InternalObservableUtils.java @@ -0,0 +1,379 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.util; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import rx.*; +import rx.Observable.Operator; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.*; +import rx.internal.operators.OperatorAny; +import rx.observables.ConnectableObservable; + +/** + * Holder of named utility classes factored out from Observable to save + * source space and help with debugging with properly named objects. + */ +public enum InternalObservableUtils { + ; + + /** + * A BiFunction that expects an integer as its first parameter and returns +1. + */ + public static final PlusOneFunc2 COUNTER = new PlusOneFunc2(); + + static final class PlusOneFunc2 implements Func2 { + @Override + public Integer call(Integer count, Object o) { + return count + 1; + } + } + + /** + * A BiFunction that expects a long as its first parameter and returns +1. + */ + public static final PlusOneLongFunc2 LONG_COUNTER = new PlusOneLongFunc2(); + + static final class PlusOneLongFunc2 implements Func2 { + @Override + public Long call(Long count, Object o) { + return count + 1; + } + } + + /** + * A bifunction comparing two objects via null-safe equals. + */ + public static final ObjectEqualsFunc2 OBJECT_EQUALS = new ObjectEqualsFunc2(); + + static final class ObjectEqualsFunc2 implements Func2 { + @Override + public Boolean call(Object first, Object second) { + return first == second || (first != null && first.equals(second)); + } + } + + /** + * A function that converts a List of Observables into an array of Observables. + */ + public static final ToArrayFunc1 TO_ARRAY = new ToArrayFunc1(); + + static final class ToArrayFunc1 implements Func1>, Observable[]> { + @Override + public Observable[] call(List> o) { + return o.toArray(new Observable[o.size()]); + } + } + + /** + * Returns a Func1 that checks if its argument is null-safe equals with the given + * constant reference. + * @param other the other object to check against (nulls allowed) + * @return the comparison function + */ + public static Func1 equalsWith(Object other) { + return new EqualsWithFunc1(other); + } + + static final class EqualsWithFunc1 implements Func1 { + final Object other; + + public EqualsWithFunc1(Object other) { + this.other = other; + } + + @Override + public Boolean call(Object t) { + return t == other || (t != null && t.equals(other)); + } + } + + /** + * Returns a Func1 that checks if its argument is an instance of + * the supplied class. + * @param clazz the class to check against + * @return the comparison function + */ + public static Func1 isInstanceOf(Class clazz) { + return new IsInstanceOfFunc1(clazz); + } + + static final class IsInstanceOfFunc1 implements Func1 { + final Class clazz; + + public IsInstanceOfFunc1(Class other) { + this.clazz = other; + } + + @Override + public Boolean call(Object t) { + return clazz.isInstance(t); + } + } + + /** + * Returns a function that dematerializes the notification signal from an Observable and calls + * a notification handler with a null for non-terminal events. + * @param notificationHandler the handler to notify with nulls + * @return the Func1 instance + */ + public static final Func1>, Observable> createRepeatDematerializer(Func1, ? extends Observable> notificationHandler) { + return new RepeatNotificationDematerializer(notificationHandler); + } + + static final class RepeatNotificationDematerializer implements Func1>, Observable> { + + final Func1, ? extends Observable> notificationHandler; + + public RepeatNotificationDematerializer(Func1, ? extends Observable> notificationHandler) { + this.notificationHandler = notificationHandler; + } + + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(RETURNS_VOID)); + } + }; + + static final ReturnsVoidFunc1 RETURNS_VOID = new ReturnsVoidFunc1(); + + static final class ReturnsVoidFunc1 implements Func1 { + @Override + public Void call(Object t) { + return null; + } + } + + /** + * Creates a Func1 which calls the selector function with the received argument, applies an + * observeOn on the result and returns the resulting Observable. + * @param selector the selector function + * @param scheduler the scheduler to apply on the output of the selector + * @return the new Func1 instance + */ + public static Func1, Observable> createReplaySelectorAndObserveOn( + Func1, ? extends Observable> selector, + Scheduler scheduler) { + return new SelectorAndObserveOn(selector, scheduler); + } + + static final class SelectorAndObserveOn implements Func1, Observable> { + final Func1, ? extends Observable> selector; + final Scheduler scheduler; + + public SelectorAndObserveOn(Func1, ? extends Observable> selector, + Scheduler scheduler) { + super(); + this.selector = selector; + this.scheduler = scheduler; + } + + + + @Override + public Observable call(Observable t) { + return selector.call(t).observeOn(scheduler); + } + } + + /** + * Returns a function that dematerializes the notification signal from an Observable and calls + * a notification handler with the Throwable. + * @param notificationHandler the handler to notify with Throwables + * @return the Func1 instance + */ + public static final Func1>, Observable> createRetryDematerializer(Func1, ? extends Observable> notificationHandler) { + return new RetryNotificationDematerializer(notificationHandler); + } + + static final class RetryNotificationDematerializer implements Func1>, Observable> { + final Func1, ? extends Observable> notificationHandler; + + public RetryNotificationDematerializer(Func1, ? extends Observable> notificationHandler) { + this.notificationHandler = notificationHandler; + } + + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(ERROR_EXTRACTOR)); + } + } + + static final NotificationErrorExtractor ERROR_EXTRACTOR = new NotificationErrorExtractor(); + + static final class NotificationErrorExtractor implements Func1, Throwable> { + @Override + public Throwable call(Notification t) { + return t.getThrowable(); + } + } + + /** + * Returns a Func0 that supplies the ConnectableObservable returned by calling replay() on the source. + * @param source the source to call replay on by the supplier function + * @return the new Func0 instance + */ + public static Func0> createReplaySupplier(final Observable source) { + return new ReplaySupplierNoParams(source); + } + + private static final class ReplaySupplierNoParams implements Func0> { + private final Observable source; + + private ReplaySupplierNoParams(Observable source) { + this.source = source; + } + + @Override + public ConnectableObservable call() { + return source.replay(); + } + } + /** + * Returns a Func0 that supplies the ConnectableObservable returned by calling a parameterized replay() on the source. + * @param source the source to call replay on by the supplier function + * @param bufferSize + * the buffer size that limits the number of items the connectable observable can replay + * @return the new Func0 instance + */ + public static Func0> createReplaySupplier(final Observable source, final int bufferSize) { + return new ReplaySupplierBuffer(source, bufferSize); + } + + static final class ReplaySupplierBuffer implements Func0> { + private final Observable source; + private final int bufferSize; + + private ReplaySupplierBuffer(Observable source, int bufferSize) { + this.source = source; + this.bufferSize = bufferSize; + } + + @Override + public ConnectableObservable call() { + return source.replay(bufferSize); + } + } + + /** + * Returns a Func0 that supplies the ConnectableObservable returned by calling a parameterized replay() on the source. + * @param source the source to call replay on by the supplier function + * @param time + * the duration of the window in which the replayed items must have been emitted + * @param unit + * the time unit of {@code time} + * @return the new Func0 instance + */ + public static Func0> createReplaySupplier(final Observable source, final long time, final TimeUnit unit, final Scheduler scheduler) { + return new ReplaySupplierBufferTime(source, time, unit, scheduler); + } + + static final class ReplaySupplierBufferTime implements Func0> { + private final TimeUnit unit; + private final Observable source; + private final long time; + private final Scheduler scheduler; + + private ReplaySupplierBufferTime(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + this.unit = unit; + this.source = source; + this.time = time; + this.scheduler = scheduler; + } + + @Override + public ConnectableObservable call() { + return source.replay(time, unit, scheduler); + } + } + + /** + * Returns a Func0 that supplies the ConnectableObservable returned by calling a parameterized replay() on the source. + * @param source the source to call replay on by the supplier function + * @param bufferSize + * the buffer size that limits the number of items the connectable observable can replay + * @param time + * the duration of the window in which the replayed items must have been emitted + * @param unit + * the time unit of {@code time} + * @return the new Func0 instance + */ + public static Func0> createReplaySupplier(final Observable source, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { + return new ReplaySupplierTime(source, bufferSize, time, unit, scheduler); + } + + static final class ReplaySupplierTime implements Func0> { + private final long time; + private final TimeUnit unit; + private final Scheduler scheduler; + private final int bufferSize; + private final Observable source; + + private ReplaySupplierTime(Observable source, int bufferSize, long time, TimeUnit unit, + Scheduler scheduler) { + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + this.bufferSize = bufferSize; + this.source = source; + } + + @Override + public ConnectableObservable call() { + return source.replay(bufferSize, time, unit, scheduler); + } + } + + /** + * Returns a Func2 which calls a collector with its parameters and returns the first (R) parameter. + * @param collector the collector action to call + * @return the new Func2 instance + */ + public static Func2 createCollectorCaller(Action2 collector) { + return new CollectorCaller(collector); + } + + static final class CollectorCaller implements Func2 { + final Action2 collector; + + public CollectorCaller(Action2 collector) { + this.collector = collector; + } + + @Override + public R call(R state, T value) { + collector.call(state, value); + return state; + } + } + + /** + * Throws an OnErrorNotImplementedException when called. + */ + public static final Action1 ERROR_NOT_IMPLEMENTED = new ErrorNotImplementedAction(); + + static final class ErrorNotImplementedAction implements Action1 { + @Override + public void call(Throwable t) { + throw new OnErrorNotImplementedException(t); + } + } + + public static final Operator IS_EMPTY = new OperatorAny(UtilityFunctions.alwaysTrue(), true); +} diff --git a/src/main/java/rx/internal/util/ObserverSubscriber.java b/src/main/java/rx/internal/util/ObserverSubscriber.java new file mode 100644 index 0000000000..dbf519c263 --- /dev/null +++ b/src/main/java/rx/internal/util/ObserverSubscriber.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.util; + +import rx.*; + +/** + * Wraps an Observer and forwards the onXXX method calls to it. + * @param the value type + */ +public final class ObserverSubscriber extends Subscriber { + final Observer observer; + + public ObserverSubscriber(Observer observer) { + this.observer = observer; + } + + @Override + public void onNext(T t) { + observer.onNext(t); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } +} From 54c6ed2f3fe4fabb00baca3df0f97901dee5b587 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 14 Apr 2016 19:51:04 +0200 Subject: [PATCH 003/273] 1.x: ConcatMap vs ConcatMapIterable perf (#3853) --- .../rx/operators/ConcatMapInterablePerf.java | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 src/perf/java/rx/operators/ConcatMapInterablePerf.java diff --git a/src/perf/java/rx/operators/ConcatMapInterablePerf.java b/src/perf/java/rx/operators/ConcatMapInterablePerf.java new file mode 100644 index 0000000000..40ea778acb --- /dev/null +++ b/src/perf/java/rx/operators/ConcatMapInterablePerf.java @@ -0,0 +1,181 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import rx.Observable; +import rx.functions.Func1; +import rx.jmh.LatchedObserver; + +/** + * Benchmark ConcatMapIterable. + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*ConcatMapInterablePerf.*" + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ConcatMapInterablePerf.*" + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class ConcatMapInterablePerf { + + @Param({"1", "10", "100", "1000", "10000", "100000", "1000000"}) + public int count; + + Observable justPlain; + + Observable justIterable; + + Observable rangePlain; + + Observable rangeIterable; + + Observable xrangePlain; + + Observable xrangeIterable; + + Observable chainPlain; + + Observable chainIterable; + + @Setup + public void setup() { + Integer[] values = new Integer[count]; + for (int i = 0; i < count; i++) { + values[i] = i; + } + + int c = 1000000 / count; + Integer[] xvalues = new Integer[c]; + for (int i = 0; i < c; i++) { + xvalues[i] = i; + } + + Observable source = Observable.from(values); + + justPlain = source.concatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }); + justIterable = source.concatMapIterable(new Func1>() { + @Override + public Iterable call(Integer v) { + return Collections.singleton(v); + } + }); + + final Observable range = Observable.range(1, 2); + final List xrange = Arrays.asList(1, 2); + + rangePlain = source.concatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return range; + } + }); + rangeIterable = source.concatMapIterable(new Func1>() { + @Override + public Iterable call(Integer v) { + return xrange; + } + }); + + final Observable xsource = Observable.from(xvalues); + final List xvaluesList = Arrays.asList(xvalues); + + xrangePlain = source.concatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return xsource; + } + }); + xrangeIterable = source.concatMapIterable(new Func1>() { + @Override + public Iterable call(Integer v) { + return xvaluesList; + } + }); + + chainPlain = xrangePlain.concatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }); + chainIterable = xrangeIterable.concatMapIterable(new Func1>() { + @Override + public Iterable call(Integer v) { + return Collections.singleton(v); + } + }); + } + + @Benchmark + public void justPlain(Blackhole bh) { + justPlain.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void justIterable(Blackhole bh) { + justIterable.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void rangePlain(Blackhole bh) { + rangePlain.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void rangeIterable(Blackhole bh) { + rangeIterable.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void xrangePlain(Blackhole bh) { + xrangePlain.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void xrangeIterable(Blackhole bh) { + xrangeIterable.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void chainPlain(Blackhole bh) { + chainPlain.subscribe(new LatchedObserver(bh)); + } + + @Benchmark + public void chainIterable(Blackhole bh) { + chainIterable.subscribe(new LatchedObserver(bh)); + } + +} From a4298155f8bb1a5677495941a5d0f3ab6cbbbf77 Mon Sep 17 00:00:00 2001 From: Jake Wharton Date: Mon, 18 Apr 2016 06:34:36 -0400 Subject: [PATCH 004/273] Provide factories for creating the default scheduler instances. (#3856) --- .../schedulers/CachedThreadScheduler.java | 7 ++-- .../schedulers/NewThreadScheduler.java | 36 +++++++++++++++++++ .../java/rx/plugins/RxJavaSchedulersHook.java | 29 +++++++++++++++ .../rx/schedulers/NewThreadScheduler.java | 18 +++------- src/main/java/rx/schedulers/Schedulers.java | 15 ++++---- 5 files changed, 81 insertions(+), 24 deletions(-) rename src/main/java/rx/{ => internal}/schedulers/CachedThreadScheduler.java (98%) create mode 100644 src/main/java/rx/internal/schedulers/NewThreadScheduler.java diff --git a/src/main/java/rx/schedulers/CachedThreadScheduler.java b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java similarity index 98% rename from src/main/java/rx/schedulers/CachedThreadScheduler.java rename to src/main/java/rx/internal/schedulers/CachedThreadScheduler.java index 31c6f9288f..8e131e58e5 100644 --- a/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java @@ -13,18 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.schedulers; +package rx.internal.schedulers; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import rx.*; import rx.functions.Action0; -import rx.internal.schedulers.*; import rx.internal.util.RxThreadFactory; import rx.subscriptions.*; -/* package */final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle { +public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle { private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-"; static final RxThreadFactory WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX); @@ -234,4 +233,4 @@ public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } } -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/schedulers/NewThreadScheduler.java b/src/main/java/rx/internal/schedulers/NewThreadScheduler.java new file mode 100644 index 0000000000..19aaedf8db --- /dev/null +++ b/src/main/java/rx/internal/schedulers/NewThreadScheduler.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.schedulers; + +import rx.Scheduler; +import rx.internal.util.RxThreadFactory; + +/** + * Schedules work on a new thread. + */ +public final class NewThreadScheduler extends Scheduler { + + private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-"; + private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); + + public NewThreadScheduler() { + } + + @Override + public Worker createWorker() { + return new NewThreadWorker(THREAD_FACTORY); + } +} diff --git a/src/main/java/rx/plugins/RxJavaSchedulersHook.java b/src/main/java/rx/plugins/RxJavaSchedulersHook.java index 133cdc363a..38d09dede9 100644 --- a/src/main/java/rx/plugins/RxJavaSchedulersHook.java +++ b/src/main/java/rx/plugins/RxJavaSchedulersHook.java @@ -17,7 +17,12 @@ package rx.plugins; import rx.Scheduler; +import rx.annotations.Experimental; import rx.functions.Action0; +import rx.internal.schedulers.CachedThreadScheduler; +import rx.internal.schedulers.EventLoopsScheduler; +import rx.internal.schedulers.NewThreadScheduler; +import rx.schedulers.Schedulers; /** * This plugin class provides 2 ways to customize {@link Scheduler} functionality @@ -35,6 +40,30 @@ */ public class RxJavaSchedulersHook { + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}. + */ + @Experimental + public static Scheduler createComputationScheduler() { + return new EventLoopsScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}. + */ + @Experimental + public static Scheduler createIoScheduler() { + return new CachedThreadScheduler(); + } + + /** + * Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}. + */ + @Experimental + public static Scheduler createNewThreadScheduler() { + return new NewThreadScheduler(); + } + protected RxJavaSchedulersHook() { } diff --git a/src/main/java/rx/schedulers/NewThreadScheduler.java b/src/main/java/rx/schedulers/NewThreadScheduler.java index d8c87bd245..5dc6046268 100644 --- a/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -16,28 +16,18 @@ package rx.schedulers; import rx.Scheduler; -import rx.internal.schedulers.NewThreadWorker; -import rx.internal.util.RxThreadFactory; /** - * Schedules work on a new thread. + * @deprecated This type was never publicly instantiable. Use {@link Schedulers#newThread()}. */ +@Deprecated public final class NewThreadScheduler extends Scheduler { - - private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-"; - private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); - private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); - - /* package */static NewThreadScheduler instance() { - return INSTANCE; - } - private NewThreadScheduler() { - + throw new AssertionError(); } @Override public Worker createWorker() { - return new NewThreadWorker(THREAD_FACTORY); + return null; } } diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index 0ec2d3a273..f81235347c 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -19,6 +19,7 @@ import rx.internal.schedulers.*; import rx.internal.util.RxRingBuffer; import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaSchedulersHook; import java.util.concurrent.Executor; @@ -34,25 +35,27 @@ public final class Schedulers { private static final Schedulers INSTANCE = new Schedulers(); private Schedulers() { - Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler(); + RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); + + Scheduler c = hook.getComputationScheduler(); if (c != null) { computationScheduler = c; } else { - computationScheduler = new EventLoopsScheduler(); + computationScheduler = RxJavaSchedulersHook.createComputationScheduler(); } - Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler(); + Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { - ioScheduler = new CachedThreadScheduler(); + ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } - Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler(); + Scheduler nt = hook.getNewThreadScheduler(); if (nt != null) { newThreadScheduler = nt; } else { - newThreadScheduler = NewThreadScheduler.instance(); + newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler(); } } From 6efc2cfa863ddf45f336249bf554d88ed8fa7f2d Mon Sep 17 00:00:00 2001 From: Artem Zinnatullin Date: Tue, 19 Apr 2016 09:18:23 +0300 Subject: [PATCH 005/273] 1.x: Add Single.toCompletable() (#3866) Closes #3865. --- src/main/java/rx/Single.java | 25 +++++++++++++++++++++++++ src/test/java/rx/SingleTest.java | 23 +++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index b1e2ed4074..e954612b4f 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -2095,6 +2095,31 @@ public final Observable toObservable() { return asObservable(this); } + /** + * Returns a {@link Completable} that discards result of the {@link Single} (similar to + * {@link Observable#ignoreElements()}) and calls {@code onCompleted} when this source {@link Single} calls + * {@code onSuccess}. Error terminal event is propagated. + *

+ * + *

+ *
Scheduler:
+ *
{@code toCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a {@link Completable} that calls {@code onCompleted} on it's subscriber when the source {@link Single} + * calls {@code onSuccess}. + * @see ReactiveX documentation: + * Completable. + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical + * with the release number). + */ + @Experimental + public final Completable toCompletable() { + return Completable.fromSingle(this); + } + /** * Returns a Single that mirrors the source Single but applies a timeout policy for its emitted item. If it * is not emitted within the specified timeout duration, the resulting Single terminates and notifies diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 2952e22bfd..3760330eb4 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -812,6 +812,29 @@ public void testToObservable() { ts.assertCompleted(); } + @Test + public void toCompletableSuccess() { + Completable completable = Single.just("value").toCompletable(); + TestSubscriber testSubscriber = new TestSubscriber(); + completable.subscribe(testSubscriber); + + testSubscriber.assertCompleted(); + testSubscriber.assertNoValues(); + testSubscriber.assertNoErrors(); + } + + @Test + public void toCompletableError() { + TestException exception = new TestException(); + Completable completable = Single.error(exception).toCompletable(); + TestSubscriber testSubscriber = new TestSubscriber(); + completable.subscribe(testSubscriber); + + testSubscriber.assertError(exception); + testSubscriber.assertNoValues(); + testSubscriber.assertNotCompleted(); + } + @Test public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() { Action1 action = mock(Action1.class); From 8af2bc96808267b3e3d07021ab4e95a8467a05b9 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 20 Apr 2016 10:03:04 -0700 Subject: [PATCH 006/273] Fix an unsubscribe race in EventLoopWorker (#3868) There is an unsubscribe race condition similar to #3842 in `CachedThreadScheduler.EventLoopWorker` and `EventLoopsScheduler.EventLoopWorker`. Image the following execution order: | Execution Order | thread 1 | thread 2 | | ------------- | ------------- | ------------- | | 1 | | submit task A | | 2 | | submit task B | | 3 | unsubscribe Worker | | | 4 | unsubscribe task A | | | 5 | | task A won't run as it's unsubscribed | | 6 | | run task B | | 7 | unsubscribe task B | | So task B will run but its previous task A will be skipped. This PR adds a check before running an action and moves `workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace` to `AbstractSchedulerConcurrencyTests` to test all concurrent schedulers. --- .../schedulers/CachedThreadScheduler.java | 12 +++++-- .../schedulers/EventLoopsScheduler.java | 27 +++++++++++++--- .../AbstractSchedulerConcurrencyTests.java | 31 +++++++++++++++++++ .../rx/schedulers/ExecutorSchedulerTest.java | 30 ------------------ 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java index 8e131e58e5..472ade9109 100644 --- a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java @@ -204,13 +204,21 @@ public Subscription schedule(Action0 action) { } @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } - ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit); + ScheduledAction s = threadWorker.scheduleActual(new Action0() { + @Override + public void call() { + if (isUnsubscribed()) { + return; + } + action.call(); + } + }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; diff --git a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index afcf7464ed..1eef164937 100644 --- a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -156,20 +156,37 @@ public boolean isUnsubscribed() { } @Override - public Subscription schedule(Action0 action) { + public Subscription schedule(final Action0 action) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - return poolWorker.scheduleActual(action, 0, null, serial); + return poolWorker.scheduleActual(new Action0() { + @Override + public void call() { + if (isUnsubscribed()) { + return; + } + action.call(); + } + }, 0, null, serial); } + @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - return poolWorker.scheduleActual(action, delayTime, unit, timed); + return poolWorker.scheduleActual(new Action0() { + @Override + public void call() { + if (isUnsubscribed()) { + return; + } + action.call(); + } + }, delayTime, unit, timed); } } @@ -178,4 +195,4 @@ static final class PoolWorker extends NewThreadWorker { super(threadFactory); } } -} \ No newline at end of file +} diff --git a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 2eab100310..9e3fdf94d3 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -423,4 +425,33 @@ public void call(Integer t) { assertEquals(5, count.get()); } + @Test + public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() { + Scheduler scheduler = getScheduler(); + for (int i = 0; i < 1000; i++) { + Worker worker = scheduler.createWorker(); + final Queue q = new ConcurrentLinkedQueue(); + Action0 action1 = new Action0() { + + @Override + public void call() { + q.add(1); + } + }; + Action0 action2 = new Action0() { + + @Override + public void call() { + q.add(2); + } + }; + worker.schedule(action1); + worker.schedule(action2); + worker.unsubscribe(); + if (q.size() == 1 && q.poll() == 2) { + //expect a queue of 1,2 or 1. If queue is just 2 then we have a problem! + fail("wrong order on loop " + i); + } + } + } } diff --git a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java index 0777208cab..ed4e03213d 100644 --- a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java @@ -18,11 +18,9 @@ import static org.junit.Assert.*; import java.lang.management.*; -import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; import org.junit.Test; import rx.*; @@ -277,32 +275,4 @@ public void call() { assertFalse(w.tasks.hasSubscriptions()); } - - @Test - public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() { - Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(1)); - for (int i = 0; i< 1000; i++) { - Worker worker = scheduler.createWorker(); - final Queue q = new ConcurrentLinkedQueue(); - Action0 action1 = new Action0() { - - @Override - public void call() { - q.add(1); - }}; - Action0 action2 = new Action0() { - - @Override - public void call() { - q.add(2); - }}; - worker.schedule(action1); - worker.schedule(action2); - worker.unsubscribe(); - if (q.size()==1 && q.poll() == 2) { - //expect a queue of 1,2 or 1. If queue is just 2 then we have a problem! - Assert.fail("wrong order on loop " + i); - } - } - } } From 3439dd8f73affa9b80277e72777ec3f15baafcfc Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 21 Apr 2016 03:15:37 +1000 Subject: [PATCH 007/273] ensure waiting tasks are cancelled on worker unsubscription (#3867) --- src/main/java/rx/schedulers/ExecutorScheduler.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/rx/schedulers/ExecutorScheduler.java b/src/main/java/rx/schedulers/ExecutorScheduler.java index 70f217e49b..11d10214f9 100644 --- a/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -100,14 +100,17 @@ public void run() { queue.clear(); return; } - ScheduledAction sa = queue.poll(); if (sa == null) { return; } - if (!sa.isUnsubscribed()) { - sa.run(); + if (!tasks.isUnsubscribed()) { + sa.run(); + } else { + queue.clear(); + return; + } } } while (wip.decrementAndGet() != 0); } From 95389c260fe93972421bfe8092bceb283d0db346 Mon Sep 17 00:00:00 2001 From: Jake Wharton Date: Thu, 21 Apr 2016 03:06:35 -0400 Subject: [PATCH 008/273] Deprecate remaining public scheduler types. (#3871) --- .../internal/operators/OperatorObserveOn.java | 2 +- .../schedulers/ExecutorScheduler.java | 5 +- .../GenericScheduledExecutorService.java | 1 - .../schedulers/ImmediateScheduler.java | 73 ++++++++++ .../schedulers/SleepingAction.java | 2 +- .../schedulers/TrampolineScheduler.java | 131 ++++++++++++++++++ .../rx/schedulers/ImmediateScheduler.java | 55 +------- .../rx/schedulers/NewThreadScheduler.java | 1 + src/main/java/rx/schedulers/Schedulers.java | 14 +- .../rx/schedulers/TrampolineScheduler.java | 113 +-------------- .../schedulers/ExecutorSchedulerTest.java | 81 +---------- .../schedulers/ComputationSchedulerTests.java | 4 +- ...chedulerTest.java => IoSchedulerTest.java} | 8 +- .../java/rx/schedulers/SchedulerTests.java | 78 ++++++++++- 14 files changed, 316 insertions(+), 252 deletions(-) rename src/main/java/rx/{ => internal}/schedulers/ExecutorScheduler.java (98%) create mode 100644 src/main/java/rx/internal/schedulers/ImmediateScheduler.java rename src/main/java/rx/{ => internal}/schedulers/SleepingAction.java (98%) create mode 100644 src/main/java/rx/internal/schedulers/TrampolineScheduler.java rename src/test/java/rx/{ => internal}/schedulers/ExecutorSchedulerTest.java (70%) rename src/test/java/rx/schedulers/{CachedThreadSchedulerTest.java => IoSchedulerTest.java} (91%) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 1720e1dfe2..f09a424020 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -22,11 +22,11 @@ import rx.Observable.Operator; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; +import rx.internal.schedulers.*; import rx.internal.util.*; import rx.internal.util.atomic.SpscAtomicArrayQueue; import rx.internal.util.unsafe.*; import rx.plugins.RxJavaPlugins; -import rx.schedulers.*; /** * Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer. diff --git a/src/main/java/rx/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java similarity index 98% rename from src/main/java/rx/schedulers/ExecutorScheduler.java rename to src/main/java/rx/internal/schedulers/ExecutorScheduler.java index 11d10214f9..b40c0b4900 100644 --- a/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java @@ -13,14 +13,13 @@ * License for the specific language governing permissions and limitations under * the License. */ -package rx.schedulers; +package rx.internal.schedulers; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import rx.*; import rx.functions.Action0; -import rx.internal.schedulers.*; import rx.plugins.RxJavaPlugins; import rx.subscriptions.*; @@ -30,7 +29,7 @@ * Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know about the underlying * threading behavior of the executor. */ -/* public */final class ExecutorScheduler extends Scheduler { +public final class ExecutorScheduler extends Scheduler { final Executor executor; public ExecutorScheduler(Executor executor) { this.executor = executor; diff --git a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java index e322945068..3bc60f076b 100644 --- a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java +++ b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java @@ -20,7 +20,6 @@ import rx.Scheduler; import rx.internal.util.RxThreadFactory; -import rx.schedulers.*; /** * A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability. diff --git a/src/main/java/rx/internal/schedulers/ImmediateScheduler.java b/src/main/java/rx/internal/schedulers/ImmediateScheduler.java new file mode 100644 index 0000000000..c3dd7a6f85 --- /dev/null +++ b/src/main/java/rx/internal/schedulers/ImmediateScheduler.java @@ -0,0 +1,73 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.schedulers; + +import java.util.concurrent.TimeUnit; + +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; + +/** + * Executes work immediately on the current thread. + */ +public final class ImmediateScheduler extends Scheduler { + public static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + private ImmediateScheduler() { + } + + @Override + public Worker createWorker() { + return new InnerImmediateScheduler(); + } + + private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription { + + final BooleanSubscription innerSubscription = new BooleanSubscription(); + + InnerImmediateScheduler() { + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + // since we are executing immediately on this thread we must cause this thread to sleep + long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime); + + return schedule(new SleepingAction(action, this, execTime)); + } + + @Override + public Subscription schedule(Action0 action) { + action.call(); + return Subscriptions.unsubscribed(); + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + + } + +} diff --git a/src/main/java/rx/schedulers/SleepingAction.java b/src/main/java/rx/internal/schedulers/SleepingAction.java similarity index 98% rename from src/main/java/rx/schedulers/SleepingAction.java rename to src/main/java/rx/internal/schedulers/SleepingAction.java index bb13734475..c41c01caf0 100644 --- a/src/main/java/rx/schedulers/SleepingAction.java +++ b/src/main/java/rx/internal/schedulers/SleepingAction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.schedulers; +package rx.internal.schedulers; import rx.Scheduler; import rx.functions.Action0; diff --git a/src/main/java/rx/internal/schedulers/TrampolineScheduler.java b/src/main/java/rx/internal/schedulers/TrampolineScheduler.java new file mode 100644 index 0000000000..94fea1964a --- /dev/null +++ b/src/main/java/rx/internal/schedulers/TrampolineScheduler.java @@ -0,0 +1,131 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.schedulers; + +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; + +/** + * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed + * after the current unit of work is completed. + */ +public final class TrampolineScheduler extends Scheduler { + public static final TrampolineScheduler INSTANCE = new TrampolineScheduler(); + + @Override + public Worker createWorker() { + return new InnerCurrentThreadScheduler(); + } + + private TrampolineScheduler() { + } + + private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription { + + final AtomicInteger counter = new AtomicInteger(); + final PriorityBlockingQueue queue = new PriorityBlockingQueue(); + private final BooleanSubscription innerSubscription = new BooleanSubscription(); + private final AtomicInteger wip = new AtomicInteger(); + + InnerCurrentThreadScheduler() { + } + + @Override + public Subscription schedule(Action0 action) { + return enqueue(action, now()); + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(delayTime); + + return enqueue(new SleepingAction(action, this, execTime), execTime); + } + + private Subscription enqueue(Action0 action, long execTime) { + if (innerSubscription.isUnsubscribed()) { + return Subscriptions.unsubscribed(); + } + final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet()); + queue.add(timedAction); + + if (wip.getAndIncrement() == 0) { + do { + final TimedAction polled = queue.poll(); + if (polled != null) { + polled.action.call(); + } + } while (wip.decrementAndGet() > 0); + return Subscriptions.unsubscribed(); + } else { + // queue wasn't empty, a parent is already processing so we just add to the end of the queue + return Subscriptions.create(new Action0() { + + @Override + public void call() { + queue.remove(timedAction); + } + + }); + } + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + + } + + private static final class TimedAction implements Comparable { + final Action0 action; + final Long execTime; + final int count; // In case if time between enqueueing took less than 1ms + + TimedAction(Action0 action, Long execTime, int count) { + this.action = action; + this.execTime = execTime; + this.count = count; + } + + @Override + public int compareTo(TimedAction that) { + int result = execTime.compareTo(that.execTime); + if (result == 0) { + return compare(count, that.count); + } + return result; + } + } + + // because I can't use Integer.compare from Java 7 + static int compare(int x, int y) { + return (x < y) ? -1 : ((x == y) ? 0 : 1); + } + +} diff --git a/src/main/java/rx/schedulers/ImmediateScheduler.java b/src/main/java/rx/schedulers/ImmediateScheduler.java index e480754a58..3be8edda4a 100644 --- a/src/main/java/rx/schedulers/ImmediateScheduler.java +++ b/src/main/java/rx/schedulers/ImmediateScheduler.java @@ -15,63 +15,20 @@ */ package rx.schedulers; -import java.util.concurrent.TimeUnit; - import rx.Scheduler; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; /** - * Executes work immediately on the current thread. + * @deprecated This type was never publicly instantiable. Use {@link Schedulers#immediate()}. */ +@Deprecated +@SuppressWarnings("unused") // Class was part of public API. public final class ImmediateScheduler extends Scheduler { - private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); - - /* package */static ImmediateScheduler instance() { - return INSTANCE; - } - - /* package accessible for unit tests */ImmediateScheduler() { + private ImmediateScheduler() { + throw new AssertionError(); } @Override public Worker createWorker() { - return new InnerImmediateScheduler(); + return null; } - - private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription { - - final BooleanSubscription innerSubscription = new BooleanSubscription(); - - InnerImmediateScheduler() { - } - - @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { - // since we are executing immediately on this thread we must cause this thread to sleep - long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime); - - return schedule(new SleepingAction(action, this, execTime)); - } - - @Override - public Subscription schedule(Action0 action) { - action.call(); - return Subscriptions.unsubscribed(); - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } - - } - } diff --git a/src/main/java/rx/schedulers/NewThreadScheduler.java b/src/main/java/rx/schedulers/NewThreadScheduler.java index 5dc6046268..192f8b29a8 100644 --- a/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -21,6 +21,7 @@ * @deprecated This type was never publicly instantiable. Use {@link Schedulers#newThread()}. */ @Deprecated +@SuppressWarnings("unused") // Class was part of public API. public final class NewThreadScheduler extends Scheduler { private NewThreadScheduler() { throw new AssertionError(); diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index f81235347c..eae594ef08 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -61,21 +61,21 @@ private Schedulers() { /** * Creates and returns a {@link Scheduler} that executes work immediately on the current thread. - * - * @return an {@link ImmediateScheduler} instance + * + * @return a {@link Scheduler} that executes work immediately */ public static Scheduler immediate() { - return ImmediateScheduler.instance(); + return rx.internal.schedulers.ImmediateScheduler.INSTANCE; } /** * Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the * current work completes. - * - * @return a {@link TrampolineScheduler} instance + * + * @return a {@link Scheduler} that queues work on the current thread */ public static Scheduler trampoline() { - return TrampolineScheduler.instance(); + return rx.internal.schedulers.TrampolineScheduler.INSTANCE; } /** @@ -83,7 +83,7 @@ public static Scheduler trampoline() { *

* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}. * - * @return a {@link NewThreadScheduler} instance + * @return a {@link Scheduler} that creates new threads */ public static Scheduler newThread() { return INSTANCE.newThreadScheduler; diff --git a/src/main/java/rx/schedulers/TrampolineScheduler.java b/src/main/java/rx/schedulers/TrampolineScheduler.java index 45bb18546c..5f708cdc23 100644 --- a/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -15,121 +15,20 @@ */ package rx.schedulers; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import rx.Scheduler; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; /** - * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed - * after the current unit of work is completed. + * @deprecated This type was never publicly instantiable. Use {@link Schedulers#trampoline()}. */ +@Deprecated +@SuppressWarnings("unused") // Class was part of public API. public final class TrampolineScheduler extends Scheduler { - private static final TrampolineScheduler INSTANCE = new TrampolineScheduler(); - - /* package */static TrampolineScheduler instance() { - return INSTANCE; + private TrampolineScheduler() { + throw new AssertionError(); } @Override public Worker createWorker() { - return new InnerCurrentThreadScheduler(); - } - - /* package accessible for unit tests */TrampolineScheduler() { + return null; } - - private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription { - - final AtomicInteger counter = new AtomicInteger(); - final PriorityBlockingQueue queue = new PriorityBlockingQueue(); - private final BooleanSubscription innerSubscription = new BooleanSubscription(); - private final AtomicInteger wip = new AtomicInteger(); - - InnerCurrentThreadScheduler() { - } - - @Override - public Subscription schedule(Action0 action) { - return enqueue(action, now()); - } - - @Override - public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { - long execTime = now() + unit.toMillis(delayTime); - - return enqueue(new SleepingAction(action, this, execTime), execTime); - } - - private Subscription enqueue(Action0 action, long execTime) { - if (innerSubscription.isUnsubscribed()) { - return Subscriptions.unsubscribed(); - } - final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet()); - queue.add(timedAction); - - if (wip.getAndIncrement() == 0) { - do { - final TimedAction polled = queue.poll(); - if (polled != null) { - polled.action.call(); - } - } while (wip.decrementAndGet() > 0); - return Subscriptions.unsubscribed(); - } else { - // queue wasn't empty, a parent is already processing so we just add to the end of the queue - return Subscriptions.create(new Action0() { - - @Override - public void call() { - queue.remove(timedAction); - } - - }); - } - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } - - } - - private static final class TimedAction implements Comparable { - final Action0 action; - final Long execTime; - final int count; // In case if time between enqueueing took less than 1ms - - TimedAction(Action0 action, Long execTime, int count) { - this.action = action; - this.execTime = execTime; - this.count = count; - } - - @Override - public int compareTo(TimedAction that) { - int result = execTime.compareTo(that.execTime); - if (result == 0) { - return compare(count, that.count); - } - return result; - } - } - - // because I can't use Integer.compare from Java 7 - static int compare(int x, int y) { - return (x < y) ? -1 : ((x == y) ? 0 : 1); - } - } diff --git a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java similarity index 70% rename from src/test/java/rx/schedulers/ExecutorSchedulerTest.java rename to src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java index ed4e03213d..fad2435ce1 100644 --- a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java @@ -13,22 +13,21 @@ * License for the specific language governing permissions and limitations under * the License. */ -package rx.schedulers; +package rx.internal.schedulers; import static org.junit.Assert.*; -import java.lang.management.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.*; import rx.Scheduler.Worker; import rx.functions.*; -import rx.internal.schedulers.NewThreadWorker; +import rx.internal.schedulers.ExecutorScheduler.ExecutorSchedulerWorker; import rx.internal.util.RxThreadFactory; -import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker; +import rx.schedulers.AbstractSchedulerConcurrencyTests; +import rx.schedulers.SchedulerTests; +import rx.schedulers.Schedulers; public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -49,72 +48,6 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException { - System.out.println("Wait before GC"); - Thread.sleep(1000); - - System.out.println("GC"); - System.gc(); - - Thread.sleep(1000); - - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); - - System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - int n = 500 * 1000; - if (periodic) { - final CountDownLatch cdl = new CountDownLatch(n); - final Action0 action = new Action0() { - @Override - public void call() { - cdl.countDown(); - } - }; - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); - } - - System.out.println("Waiting for the first round to finish..."); - cdl.await(); - } else { - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedule(Actions.empty(), 1, TimeUnit.DAYS); - } - } - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); - System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); - - w.unsubscribe(); - - System.out.println("Wait before second GC"); - Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000); - - System.out.println("Second GC"); - System.gc(); - - Thread.sleep(1000); - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - - if (finish > initial * 5) { - fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); - } - } - @Test(timeout = 30000) public void testCancelledTaskRetention() throws InterruptedException { ExecutorService exec = Executors.newSingleThreadExecutor(); @@ -122,14 +55,14 @@ public void testCancelledTaskRetention() throws InterruptedException { try { Scheduler.Worker w = s.createWorker(); try { - testCancelledRetention(w, false); + SchedulerTests.testCancelledRetention(w, false); } finally { w.unsubscribe(); } w = s.createWorker(); try { - testCancelledRetention(w, true); + SchedulerTests.testCancelledRetention(w, true); } finally { w.unsubscribe(); } diff --git a/src/test/java/rx/schedulers/ComputationSchedulerTests.java b/src/test/java/rx/schedulers/ComputationSchedulerTests.java index 7191f60015..7e9163e7c0 100644 --- a/src/test/java/rx/schedulers/ComputationSchedulerTests.java +++ b/src/test/java/rx/schedulers/ComputationSchedulerTests.java @@ -157,13 +157,13 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru public void testCancelledTaskRetention() throws InterruptedException { Worker w = Schedulers.computation().createWorker(); try { - ExecutorSchedulerTest.testCancelledRetention(w, false); + SchedulerTests.testCancelledRetention(w, false); } finally { w.unsubscribe(); } w = Schedulers.computation().createWorker(); try { - ExecutorSchedulerTest.testCancelledRetention(w, true); + SchedulerTests.testCancelledRetention(w, true); } finally { w.unsubscribe(); } diff --git a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java b/src/test/java/rx/schedulers/IoSchedulerTest.java similarity index 91% rename from src/test/java/rx/schedulers/CachedThreadSchedulerTest.java rename to src/test/java/rx/schedulers/IoSchedulerTest.java index 9abb52b7ec..a16a19a61c 100644 --- a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/rx/schedulers/IoSchedulerTest.java @@ -24,7 +24,7 @@ import rx.Scheduler.Worker; import rx.functions.*; -public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { +public class IoSchedulerTest extends AbstractSchedulerConcurrencyTests { @Override protected Scheduler getScheduler() { @@ -71,16 +71,16 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru public void testCancelledTaskRetention() throws InterruptedException { Worker w = Schedulers.io().createWorker(); try { - ExecutorSchedulerTest.testCancelledRetention(w, false); + SchedulerTests.testCancelledRetention(w, false); } finally { w.unsubscribe(); } w = Schedulers.io().createWorker(); try { - ExecutorSchedulerTest.testCancelledRetention(w, true); + SchedulerTests.testCancelledRetention(w, true); } finally { w.unsubscribe(); } } -} \ No newline at end of file +} diff --git a/src/test/java/rx/schedulers/SchedulerTests.java b/src/test/java/rx/schedulers/SchedulerTests.java index a9146fafde..cd1d0a1912 100644 --- a/src/test/java/rx/schedulers/SchedulerTests.java +++ b/src/test/java/rx/schedulers/SchedulerTests.java @@ -4,14 +4,20 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; +import rx.functions.Action0; +import rx.functions.Actions; +import rx.internal.schedulers.NewThreadWorker; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -final class SchedulerTests { +public final class SchedulerTests { private SchedulerTests() { // No instances. } @@ -23,7 +29,7 @@ private SchedulerTests() { * Schedulers which execute on a separate thread from their calling thread should exhibit this behavior. Schedulers * which execute on their calling thread may not. */ - static void testUnhandledErrorIsDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException { + public static void testUnhandledErrorIsDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException { Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler(); try { CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); @@ -57,7 +63,7 @@ static void testUnhandledErrorIsDeliveredToThreadHandler(Scheduler scheduler) th * This is a companion test to {@link #testUnhandledErrorIsDeliveredToThreadHandler}, and is needed only for the * same Schedulers. */ - static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException { + public static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException { Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler(); try { CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); @@ -88,6 +94,72 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t } } + public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException { + System.out.println("Wait before GC"); + Thread.sleep(1000); + + System.out.println("GC"); + System.gc(); + + Thread.sleep(1000); + + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + int n = 500 * 1000; + if (periodic) { + final CountDownLatch cdl = new CountDownLatch(n); + final Action0 action = new Action0() { + @Override + public void call() { + cdl.countDown(); + } + }; + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); + } + + System.out.println("Waiting for the first round to finish..."); + cdl.await(); + } else { + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + } + } + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long after = memHeap.getUsed(); + System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); + + w.unsubscribe(); + + System.out.println("Wait before second GC"); + Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000); + + System.out.println("Second GC"); + System.gc(); + + Thread.sleep(1000); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + } + } + private static final class CapturingObserver implements Observer { CountDownLatch completed = new CountDownLatch(1); int errorCount = 0; From 3f6c4fd164801fd994034f99e663dc17163617c9 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 21 Apr 2016 21:58:08 +0200 Subject: [PATCH 009/273] 1.x: fix from(Iterable) error handling of Iterable/Iterator (#3862) * 1.x: fix from(Iterable) error handling of Iterable/Iterator * Check dead-on-arrival Subscribers * Use n again to avoid a potential cache-miss with get() --- .../operators/OnSubscribeFromIterable.java | 130 +++++++--- .../OnSubscribeFromIterableTest.java | 244 +++++++++++++++++- 2 files changed, 325 insertions(+), 49 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index b94e35c35c..9389c4480c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -20,6 +20,7 @@ import rx.*; import rx.Observable.OnSubscribe; +import rx.exceptions.Exceptions; /** * Converts an {@code Iterable} sequence into an {@code Observable}. @@ -42,11 +43,25 @@ public OnSubscribeFromIterable(Iterable iterable) { @Override public void call(final Subscriber o) { - final Iterator it = is.iterator(); - if (!it.hasNext() && !o.isUnsubscribed()) - o.onCompleted(); - else - o.setProducer(new IterableProducer(o, it)); + final Iterator it; + boolean b; + + try { + it = is.iterator(); + + b = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, o); + return; + } + + if (!o.isUnsubscribed()) { + if (!b) { + o.onCompleted(); + } else { + o.setProducer(new IterableProducer(o, it)); + } + } } private static final class IterableProducer extends AtomicLong implements Producer { @@ -81,38 +96,58 @@ void slowpath(long n) { final Iterator it = this.it; long r = n; - while (true) { - /* - * This complicated logic is done to avoid touching the - * volatile `requested` value during the loop itself. If - * it is touched during the loop the performance is - * impacted significantly. - */ - long numToEmit = r; - while (true) { + long e = 0; + + for (;;) { + while (e != r) { if (o.isUnsubscribed()) { return; - } else if (it.hasNext()) { - if (--numToEmit >= 0) { - o.onNext(it.next()); - } else - break; - } else if (!o.isUnsubscribed()) { - o.onCompleted(); + } + + T value; + + try { + value = it.next(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, o); return; - } else { - // is unsubscribed + } + + o.onNext(value); + + if (o.isUnsubscribed()) { return; } + + boolean b; + + try { + b = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, o); + return; + } + + if (!b) { + if (!o.isUnsubscribed()) { + o.onCompleted(); + } + return; + } + + e++; } - r = addAndGet(-r); - if (r == 0L) { - // we're done emitting the number requested so - // return - return; + + r = get(); + if (e == r) { + r = BackpressureUtils.produced(this, e); + if (r == 0L) { + break; + } + e = 0L; } - } + } void fastpath() { @@ -120,16 +155,39 @@ void fastpath() { final Subscriber o = this.o; final Iterator it = this.it; - while (true) { + for (;;) { if (o.isUnsubscribed()) { return; - } else if (it.hasNext()) { - o.onNext(it.next()); - } else if (!o.isUnsubscribed()) { - o.onCompleted(); + } + + T value; + + try { + value = it.next(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, o); + return; + } + + o.onNext(value); + + if (o.isUnsubscribed()) { return; - } else { - // is unsubscribed + } + + boolean b; + + try { + b = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwOrReport(ex, o); + return; + } + + if (!b) { + if (!o.isUnsubscribed()) { + o.onCompleted(); + } return; } } diff --git a/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java b/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java index a75e733951..00956b9cae 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java @@ -15,28 +15,21 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.exceptions.TestException; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -313,5 +306,230 @@ public void onNext(Integer t) { }); assertFalse(called.get()); } + + @Test + public void getIteratorThrows() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + throw new TestException("Forced failure"); + } + }; + + TestSubscriber ts = new TestSubscriber(); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void hasNextThrowsImmediately() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + throw new TestException("Forced failure"); + } + + @Override + public Integer next() { + return null; + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void hasNextThrowsSecondTimeFastpath() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (++count >= 2) { + throw new TestException("Forced failure"); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertValues(1); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void hasNextThrowsSecondTimeSlowpath() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (++count >= 2) { + throw new TestException("Forced failure"); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(5); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertValues(1); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + @Test + public void nextThrowsFastpath() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException("Forced failure"); + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void nextThrowsSlowpath() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException("Forced failure"); + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(5); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + + @Test + public void deadOnArrival() { + Iterable it = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Integer next() { + throw new NoSuchElementException(); + } + + @Override + public void remove() { + // ignored + } + }; + } + }; + + TestSubscriber ts = new TestSubscriber(5); + ts.unsubscribe(); + + Observable.from(it).unsafeSubscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + } } From 7b11b1c5e47e22c9f24fc0ee5c3c9e89bb091903 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sat, 23 Apr 2016 08:45:04 +1000 Subject: [PATCH 010/273] remove unused field baseCapacity (#3874) --- .../rx/internal/operators/OperatorOnBackpressureBuffer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index 4f66bbb4d7..04e6e81be9 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -110,7 +110,6 @@ public Subscriber call(final Subscriber child) { private static final class BufferSubscriber extends Subscriber implements BackpressureDrainManager.BackpressureQueueCallback { // TODO get a different queue implementation private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); - private final Long baseCapacity; private final AtomicLong capacity; private final Subscriber child; private final AtomicBoolean saturated = new AtomicBoolean(false); @@ -122,7 +121,6 @@ private static final class BufferSubscriber extends Subscriber implements public BufferSubscriber(final Subscriber child, Long capacity, Action0 onOverflow, BackpressureOverflow.Strategy overflowStrategy) { this.child = child; - this.baseCapacity = capacity; this.capacity = capacity != null ? new AtomicLong(capacity) : null; this.onOverflow = onOverflow; this.manager = new BackpressureDrainManager(this); From 9b0887017a09e3c1ee7e9c2df066d8b2da30db48 Mon Sep 17 00:00:00 2001 From: Daniel Lew Date: Fri, 29 Apr 2016 00:56:57 -0500 Subject: [PATCH 011/273] throwIfFatal() now throws OnCompletedFailedException (#3886) Otherwise, if there's an error in onCompleted, the exception is swallowed and unreported. --- src/main/java/rx/exceptions/Exceptions.java | 3 +++ .../java/rx/exceptions/ExceptionsTest.java | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/main/java/rx/exceptions/Exceptions.java b/src/main/java/rx/exceptions/Exceptions.java index 6c37167c3e..f427018f53 100644 --- a/src/main/java/rx/exceptions/Exceptions.java +++ b/src/main/java/rx/exceptions/Exceptions.java @@ -61,6 +61,7 @@ public static RuntimeException propagate(Throwable t) { *