/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx;

import java.util.concurrent.TimeUnit;

import rx.annotations.Experimental;
import rx.functions.*;
import rx.internal.schedulers.*;
import rx.schedulers.Schedulers;

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 */
public abstract class Scheduler {
/*
 * Why is this an abstract class instead of an interface?
 *
 *  : Java doesn't support extension methods and there are many overload methods needing default
 *    implementations.
 *
 *  : Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for
 *    a long time.
 *
 *  : If only an interface were used Scheduler implementations would then need to extend from an
 *    AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
 *    functionality.
 *
 *  : Without virtual extension methods even additive changes are breaking and thus severely impede library
 *    maintenance.
 */

    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     *
     * When the work is completed, the {@code Worker} should be unsubscribed using
     * {@link Scheduler.Worker#unsubscribe()}.
     *
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *
     * @return a Worker representing a serial queue of actions to be executed
     */
    public abstract Worker createWorker();

    /**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     *
     * Unsubscribing the {@link Worker} cancels all outstanding work and allows resource cleanup.
     */
    public abstract static class Worker implements Subscription {
        /**
         * Schedules an Action for execution.
         *
         * @param action
         *            Action to schedule
         * @return a subscription to be able to prevent or cancel the execution of the action
         */
        public abstract Subscription schedule(Action0 action);

        /**
         * Schedules an Action for execution at some point in the future.
         *
         * Note to implementors: a non-positive {@code delayTime} should be regarded as a non-delayed schedule,
         * i.e., as if {@link #schedule(rx.functions.Action0)} was called.
         *
         * @param action
         *            the Action to schedule
         * @param delayTime
         *            time to wait before executing the action; non-positive values indicate a non-delayed
         *            schedule
         * @param unit
         *            the time unit of {@code delayTime}
         * @return a subscription to be able to prevent or cancel the execution of the action
         */
        public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);

        /**
         * Schedules a cancelable action to be executed periodically. This default implementation schedules
         * recursively and waits for actions to complete (instead of potentially executing long-running actions
         * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
         *
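         * The recursive approach described above can be sketched with plain
         * {@code java.util.concurrent} types. This is a simplified illustration, not the code in
         * {@code SchedulePeriodicHelper}: the class {@code RecursivePeriodic}, its methods, and the
         * {@code AtomicBoolean} used as a cancellation flag are all hypothetical names, and the sketch
         * does no drift compensation. The timer is re-armed only after the action returns, so two runs
         * never overlap, and non-positive delays are treated as "no delay".
         *
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; it is not RxJava's SchedulePeriodicHelper.
final class RecursivePeriodic {

    // Runs `action` repeatedly, re-arming the timer only after each run returns.
    // Setting the returned flag to true prevents any further runs.
    static AtomicBoolean schedulePeriodically(ScheduledExecutorService timer, Runnable action,
                                              long initialDelay, long period, TimeUnit unit) {
        AtomicBoolean cancelled = new AtomicBoolean();
        Runnable step = new Runnable() {
            public void run() {
                if (cancelled.get()) {
                    return;
                }
                action.run();
                if (!cancelled.get()) {
                    // A non-positive period means "reschedule without delay".
                    timer.schedule(this, Math.max(0L, period), unit);
                }
            }
        };
        // A non-positive initial delay means "run as soon as possible".
        timer.schedule(step, Math.max(0L, initialDelay), unit);
        return cancelled;
    }

    // Lets the action run at least `target` times, cancels, and returns the run count.
    static int runsBeforeCancel(int target) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(target);
        AtomicBoolean cancelled = schedulePeriodically(timer, () -> {
            runs.incrementAndGet();
            reached.countDown();
        }, 0, 5, TimeUnit.MILLISECONDS);
        try {
            if (!reached.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("periodic action did not run in time");
            }
            cancelled.set(true);
            timer.shutdown();
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runsBeforeCancel(3));
    }
}
```
         *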
         * Note to implementors: non-positive {@code initialDelay} and {@code period} should be regarded as
         * non-delayed scheduling of the first and any subsequent executions.
         *
         * @param action
         *            the Action to execute periodically
         * @param initialDelay
         *            time to wait before executing the action for the first time; non-positive values indicate
         *            a non-delayed schedule
         * @param period
         *            the time interval to wait each time in between executing the action; non-positive values
         *            indicate no delay between repeated schedules
         * @param unit
         *            the time unit of {@code initialDelay} and {@code period}
         * @return a subscription to be able to prevent or cancel the execution of the action
         */
        public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
            return SchedulePeriodicHelper.schedulePeriodically(this, action, initialDelay, period, unit, null);
        }

        /**
         * Gets the current time, in milliseconds, according to this Scheduler.
         *
         * @return the scheduler's notion of current absolute time in milliseconds
         */
        public long now() {
            return System.currentTimeMillis();
        }
    }

    /**
     * Gets the current time, in milliseconds, according to this Scheduler.
     *
     * @return the scheduler's notion of current absolute time in milliseconds
     */
    public long now() {
        return System.currentTimeMillis();
    }

    /**
     * Allows the use of operators for controlling the timing around when
     * actions scheduled on workers are actually done. This makes it possible to
     * layer additional behavior on this {@link Scheduler}. The only parameter
     * is a function that flattens an {@link Observable} of {@link Observable}s
     * of {@link Completable}s into just one {@link Completable}. There must be
     * a chain of operators connecting the returned value to the source
     * {@link Observable}, otherwise any work scheduled on the returned
     * {@link Scheduler} will not be executed.
     *
     * When {@link Scheduler#createWorker()} is invoked, an {@link Observable} of
     * {@link Completable}s is onNext'd to the combinator to be flattened. If
     * the inner {@link Observable} is not immediately subscribed to, calls to
     * {@link Worker#schedule} are buffered. Once the {@link Observable} is
     * subscribed to, actions are then onNext'd as {@link Completable}s.
     *
     * Finally, the actions are scheduled on the parent {@link Scheduler} when the
     * innermost {@link Completable}s are subscribed to.
     *
     * When the {@link Worker} is unsubscribed, the {@link Completable} emits an
     * onComplete and triggers any behavior in the flattening operator. The
     * {@link Observable} and all {@link Completable}s given to the flattening
     * function never onError.
     *
     * Limit the amount of concurrency to two at a time without creating a new
     * fixed-size thread pool:
     *
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *     // use merge max concurrent to limit the number of concurrent
     *     // callbacks to two at a time
     *     return Completable.merge(Observable.merge(workers), 2);
     * });
     *
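     * For readers who want to see the idea without RxJava types, the following is a
     * rough plain-Java analog, not what {@code when} does internally. It wraps an
     * existing {@code Executor} and holds back extra tasks in a queue so that at most
     * {@code maxConcurrent} run at once, without creating a second thread pool. The
     * class {@code LimitedExecutor} and all of its members are hypothetical, and the
     * sketch assumes the delegate never rejects a task.
     *
```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper built on java.util.concurrent only; it is not part of RxJava.
final class LimitedExecutor implements Executor {
    private final Executor delegate;
    private final int maxConcurrent;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int running; // guarded by "this"

    LimitedExecutor(Executor delegate, int maxConcurrent) {
        this.delegate = delegate;
        this.maxConcurrent = maxConcurrent;
    }

    public void execute(Runnable task) {
        synchronized (this) {
            if (running == maxConcurrent) {
                pending.add(task); // buffer until a slot frees up
                return;
            }
            running++;
        }
        dispatch(task);
    }

    private void dispatch(Runnable task) {
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                runNextOrRelease();
            }
        });
    }

    // Hands the freed slot to the next buffered task, or releases it if none is waiting.
    private void runNextOrRelease() {
        Runnable next;
        synchronized (this) {
            next = pending.poll();
            if (next == null) {
                running--;
                return;
            }
        }
        dispatch(next);
    }

    // Runs `tasks` short tasks through a limiter and returns the highest concurrency seen.
    static int peakConcurrency(int tasks, int limit) {
        ExecutorService pool = Executors.newCachedThreadPool();
        LimitedExecutor limited = new LimitedExecutor(pool, limit);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            limited.execute(() -> {
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(8, 2));
    }
}
```
     *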
     * This is a slightly different way to limit the concurrency, but it has some
     * interesting benefits and drawbacks compared to the previous {@code when}
     * example. It works by limiting the number of concurrent {@link Worker}s
     * rather than individual actions. Generally each {@link Observable} uses
     * its own {@link Worker}, so this essentially limits the number of
     * concurrent subscribes. The danger comes from using operators like
     * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
     * subscribing to the first {@link Observable} could deadlock the
     * subscription to the second.
     *
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *     // use merge max concurrent to limit the number of concurrent
     *     // Observables to two at a time
     *     return Completable.merge(Observable.merge(workers, 2));
     * });
     *
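     * The deadlock risk mentioned for {@code zip} has a well-known analog in plain
     * thread pools, sketched here under the assumption that a pool slot stands in for
     * a {@link Worker}: a task that occupies the last free slot and then waits for a
     * second task queued behind it can never make progress. The class
     * {@code StarvationDemo} is hypothetical, and a timeout is used so the sketch
     * reports the stall instead of hanging.
     *
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo built on java.util.concurrent only; it is not part of RxJava.
final class StarvationDemo {

    // Returns true when the inner task could not start while the outer task waited for it.
    static boolean innerStarves(int slots) {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The outer task holds one slot and depends on a second task from the same pool.
                Future<String> inner = pool.submit(() -> "second");
                try {
                    inner.get(300, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    // No free slot: the inner task is still sitting in the queue.
                    return true;
                }
            });
            return outer.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("one slot starves:  " + innerStarves(1));
        System.out.println("two slots starve: " + innerStarves(2));
    }
}
```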
     *
     * Slowing down the rate to no more than one per second. This suffers from
     * the same problem as the one above; I could not find an {@link Observable}
     * operator that limits the rate without dropping the values (aka leaky
     * bucket algorithm).
     *
     * Scheduler slowScheduler = Schedulers.computation().when(workers -> {
     *     // use concatenate to make each worker happen one at a time.
     *     return Completable.concat(workers.map(actions -> {
     *         // delay the starting of the next worker by 1 second.
     *         return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
     *     }));
     * });
*
*
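     * As a point of comparison, rate limiting that buffers rather than drops work can
     * be sketched in plain Java with {@code ScheduledExecutorService#scheduleWithFixedDelay}.
     * This is a different technique from the {@code concat} and {@code delaySubscription}
     * combination shown above, and the class {@code SpacedRunner} and its members are
     * hypothetical. Tasks wait in a queue and a single timer thread starts one per tick,
     * so consecutive starts are separated by at least the configured gap.
     *
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper built on java.util.concurrent only; it is not part of RxJava.
final class SpacedRunner {
    private final ConcurrentLinkedQueue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    SpacedRunner(long gap, TimeUnit unit) {
        // Fixed-delay scheduling waits `gap` after one run ends before starting the next.
        timer.scheduleWithFixedDelay(this::runNext, gap, gap, unit);
    }

    private void runNext() {
        Runnable next = pending.poll();
        if (next == null) {
            return; // nothing queued on this tick
        }
        try {
            next.run();
        } catch (RuntimeException e) {
            // Swallowed so that one failing task does not stop the timer.
        }
    }

    void submit(Runnable task) {
        pending.add(task); // buffered, never dropped
    }

    void shutdown() {
        timer.shutdownNow();
    }

    // Submits tasks that record their start time and returns the gaps between starts in ms.
    static List<Long> observedGapsMillis(int tasks, long gapMillis) {
        SpacedRunner runner = new SpacedRunner(gapMillis, TimeUnit.MILLISECONDS);
        List<Long> starts = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            runner.submit(() -> {
                starts.add(System.nanoTime());
                done.countDown();
            });
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            runner.shutdown();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < starts.size(); i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("gaps in ms: " + observedGapsMillis(3, 30));
    }
}
```
     *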
* @param