using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Threading.Tasks;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Input;
using System.Linq.Expressions;
using System.Reactive.Disposables;
using Splat;
namespace ReactiveUI
{
public static class ReactiveCommand
{
/// <summary>
/// Builds a ReactiveCommand that performs no background work: its execute
/// function wraps the command parameter in a single-value Observable, so
/// subscribers to the command simply see each parameter it was invoked with.
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute. When null, a constant true is used.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand whose results are the parameters it was
/// executed with.</returns>
public static ReactiveCommand Create(IObservable canExecute = null, IScheduler scheduler = null)
{
    if (canExecute == null) {
        canExecute = Observable.Return(true);
    }

    return new ReactiveCommand(canExecute, parameter => Observable.Return(parameter), scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around an Observable-returning execute function.
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute.</param>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Observable it returns represents the work. While any
/// invocation is in flight the command reports that it cannot execute, and an
/// OnError from the returned Observable is forwarded to ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the items produced by every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncObservable(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null)
{
    var command = new ReactiveCommand(canExecute, executeAsync, scheduler);
    return command;
}
/// <summary>
/// Builds a ReactiveCommand around an Observable-returning execute function,
/// with a canExecute source that is a constant true.
/// </summary>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Observable it returns represents the work. An OnError
/// from that Observable is forwarded to ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the items produced by every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncObservable(Func> executeAsync, IScheduler scheduler = null)
{
    // Same construction as the canExecute overload, gated by a constant true.
    return CreateAsyncObservable(Observable.Return(true), executeAsync, scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around a Task-returning execute function. Each
/// returned Task is converted to an Observable with ToObservable().
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute.</param>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Task it returns represents the work. While any
/// invocation is in flight the command reports that it cannot execute, and a
/// faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the results of every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null)
{
    var command = new ReactiveCommand(
        canExecute,
        parameter => executeAsync(parameter).ToObservable(),
        scheduler);
    return command;
}
/// <summary>
/// Builds a ReactiveCommand around a Task-returning execute function, with a
/// canExecute source that is a constant true.
/// </summary>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Task it returns is converted to an Observable with
/// ToObservable(). A faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the results of every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncTask(Func> executeAsync, IScheduler scheduler = null)
{
    // Same construction as the canExecute overload, gated by a constant true.
    return CreateAsyncTask(Observable.Return(true), executeAsync, scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around a Task-returning execute function, with a
/// canExecute source that is a constant true.
/// </summary>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Task it returns is converted to an Observable with
/// ToObservable(). A faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that signals each time an executeAsync call
/// produces its result.</returns>
public static ReactiveCommand CreateAsyncTask(Func executeAsync, IScheduler scheduler = null)
{
    // Same construction as the canExecute overload, gated by a constant true.
    return CreateAsyncTask(Observable.Return(true), executeAsync, scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around a Task-returning execute function. Each
/// returned Task is converted to an Observable with ToObservable().
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute.</param>
/// <param name="executeAsync">Function called with the command parameter for
/// each invocation; the Task it returns represents the work. While any
/// invocation is in flight the command reports that it cannot execute, and a
/// faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that signals each time an executeAsync call
/// produces its result.</returns>
public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func executeAsync, IScheduler scheduler = null)
{
    var command = new ReactiveCommand(
        canExecute,
        parameter => executeAsync(parameter).ToObservable(),
        scheduler);
    return command;
}
/// <summary>
/// Builds a ReactiveCommand around a cancellable Task-returning execute
/// function. Each invocation is started through Observable.StartAsync, which
/// supplies the CancellationToken passed to executeAsync.
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute.</param>
/// <param name="executeAsync">Function called with the command parameter and
/// a cancellation token for each invocation; the Task it returns represents
/// the work. While any invocation is in flight the command reports that it
/// cannot execute, and a faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the results of every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null)
{
    var command = new ReactiveCommand(
        canExecute,
        parameter => Observable.StartAsync(token => executeAsync(parameter, token)),
        scheduler);
    return command;
}
/// <summary>
/// Builds a ReactiveCommand around a cancellable Task-returning execute
/// function, with a canExecute source that is a constant true.
/// </summary>
/// <param name="executeAsync">Function called with the command parameter and
/// a cancellation token for each invocation; it is started through
/// Observable.StartAsync. A faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that surfaces the results of every
/// executeAsync call as one stream.</returns>
public static ReactiveCommand CreateAsyncTask(Func> executeAsync, IScheduler scheduler = null)
{
    // Same construction as the canExecute overload, gated by a constant true.
    return CreateAsyncTask(Observable.Return(true), executeAsync, scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around a cancellable Task-returning execute
/// function, with a canExecute source that is a constant true.
/// </summary>
/// <param name="executeAsync">Function called with the command parameter and
/// a cancellation token for each invocation; it is started through
/// Observable.StartAsync. A faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that signals each time an executeAsync call
/// produces its result.</returns>
public static ReactiveCommand CreateAsyncTask(Func executeAsync, IScheduler scheduler = null)
{
    // Same construction as the canExecute overload, gated by a constant true.
    return CreateAsyncTask(Observable.Return(true), executeAsync, scheduler);
}
/// <summary>
/// Builds a ReactiveCommand around a cancellable Task-returning execute
/// function. Each invocation is started through Observable.StartAsync, which
/// supplies the CancellationToken passed to executeAsync.
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute.</param>
/// <param name="executeAsync">Function called with the command parameter and
/// a cancellation token for each invocation; the Task it returns represents
/// the work. While any invocation is in flight the command reports that it
/// cannot execute, and a faulted Task surfaces through ThrownExceptions.</param>
/// <param name="scheduler">Scheduler passed through to the ReactiveCommand
/// constructor, which substitutes RxApp.MainThreadScheduler for null.</param>
/// <returns>A ReactiveCommand that signals each time an executeAsync call
/// produces its result.</returns>
public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func executeAsync, IScheduler scheduler = null)
{
    var command = new ReactiveCommand(
        canExecute,
        parameter => Observable.StartAsync(token => executeAsync(parameter, token)),
        scheduler);
    return command;
}
/// <summary>
/// Builds a parent ReactiveCommand that, when invoked, calls Execute on every
/// child command with the parent's parameter. The parent can execute only
/// while its own canExecute and every child's CanExecuteObservable are true.
/// </summary>
/// <param name="canExecute">Observable that gates the parent command. It is
/// prefixed with true, so the parent is treated as allowed until this source
/// says otherwise.</param>
/// <param name="commands">The child commands to combine.</param>
/// <returns>The parent command. The subscription that forwards invocations
/// to the children is created here and never released.</returns>
public static ReactiveCommand CreateCombined(IObservable canExecute, params IReactiveCommand[] commands)
{
    // True only while no child reports false.
    var childStreams = commands.Select(cmd => cmd.CanExecuteObservable);
    var allChildrenReady = childStreams.CombineLatest(states => states.All(state => state != false));

    var combinedCanExecute = canExecute
        .StartWith(true)
        .CombineLatest(allChildrenReady, (parentReady, childrenReady) => parentReady && childrenReady);

    var combined = ReactiveCommand.Create(combinedCanExecute);
    combined.Subscribe(parameter => commands.ForEach(cmd => cmd.Execute(parameter)));
    return combined;
}
/// <summary>
/// Builds a parent ReactiveCommand that calls every child command when
/// invoked, using a constant true as the parent's own canExecute. The parent
/// can therefore execute whenever all children can.
/// </summary>
/// <param name="commands">The child commands to combine.</param>
/// <returns>The parent command.</returns>
public static ReactiveCommand CreateCombined(params IReactiveCommand[] commands)
{
    var alwaysAllowed = Observable.Return(true);
    return CreateCombined(alwaysAllowed, commands);
}
}
/// <summary>
/// This class represents a Command that can optionally do a background task.
/// The results of the background task (or a signal that the Command has been
/// invoked) are delivered by Subscribing to the command itself, since
/// ReactiveCommand is itself an Observable. The results of individual
/// invocations can be retrieved via the ExecuteAsync method.
/// </summary>
public class ReactiveCommand : IReactiveCommand, IReactiveCommand
{
#if NET_45
/// <summary>
/// Raised when the most recently observed CanExecute value changes.
/// </summary>
public event EventHandler CanExecuteChanged;

/// <summary>
/// Invokes the current CanExecuteChanged subscribers, if there are any.
/// </summary>
/// <param name="args">Event arguments handed to each subscriber.</param>
protected virtual void raiseCanExecuteChanged(EventArgs args)
{
    // Copy to a local so the null check and the call see the same delegate.
    var subscribers = this.CanExecuteChanged;
    if (subscribers == null) {
        return;
    }

    subscribers(this, args);
}
#else
/// <summary>
/// Raised when the most recently observed CanExecute value changes.
/// Handlers are registered with CanExecuteChangedEventManager.
/// </summary>
public event EventHandler CanExecuteChanged
{
    add {
        // Adding a handler connects the canExecute pipeline if nothing has
        // connected it yet, so that changes start being observed.
        if (canExecuteDisp == null) {
            canExecuteDisp = canExecute.Connect();
        }

        CanExecuteChangedEventManager.AddHandler(this, value);
    }
    remove {
        CanExecuteChangedEventManager.RemoveHandler(this, value);
    }
}

/// <summary>
/// Delivers CanExecuteChanged through CanExecuteChangedEventManager.
/// </summary>
/// <param name="args">Event arguments handed to each subscriber.</param>
protected virtual void raiseCanExecuteChanged(EventArgs args)
{
    CanExecuteChangedEventManager.DeliverEvent(this, args);
}
#endif
// Receives every item produced by an invocation (fed from ExecuteAsync);
// Subscribe() hands observers this subject.
readonly Subject executeResults = new Subject();
// Pushed true when the in-flight count goes from zero to one and false when
// it drops back to zero (see ExecuteAsync).
readonly Subject isExecuting = new Subject();
// The function ExecuteAsync calls with the command parameter to start work.
readonly Func> executeAsync;
// Scheduler on which invocation results are observed; set in the constructor.
readonly IScheduler scheduler;
// Receives errors from the canExecute pipeline and from invocations; exposed
// to callers as ThrownExceptions.
readonly ScheduledSubject exceptions;
// Published pipeline combining the caller's canExecute with "not executing".
// It is only connected on demand (CanExecute, CanExecuteObservable, the
// CanExecuteChanged add accessor, or the constructor under a test runner).
IConnectableObservable canExecute;
// Last value the canExecute pipeline produced; false until it produces one.
bool canExecuteLatest = false;
// Handle for the canExecute connection; null until connected, released by Dispose.
IDisposable canExecuteDisp;
// Number of ExecuteAsync subscriptions currently in flight; modified only
// through Interlocked.
int inflightCount = 0;
/// <summary>
/// Don't use this, use ReactiveCommand.CreateXYZ instead.
/// </summary>
/// <param name="canExecute">Observable that gates whether the command can
/// execute. It is combined with the command's own executing state, so the
/// command reports false while an invocation is in flight. If this source
/// fails, the error is pushed to ThrownExceptions and the command reports
/// false from then on.</param>
/// <param name="executeAsync">Function called with the command parameter to
/// start each invocation.</param>
/// <param name="scheduler">Scheduler on which invocation results are
/// observed. RxApp.MainThreadScheduler is used when this is null.</param>
public ReactiveCommand(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null)
{
    this.scheduler = scheduler ?? RxApp.MainThreadScheduler;
    this.executeAsync = executeAsync;

    // Create the exception sink before the canExecute pipeline can be
    // connected. The Catch handler below reports into it, and under a unit
    // test runner the pipeline is connected before this constructor returns;
    // a canExecute that fails during that connect would otherwise find the
    // sink still null and lose the original error.
    ThrownExceptions = exceptions = new ScheduledSubject(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler);

    this.canExecute = canExecute.CombineLatest(isExecuting.StartWith(false), (ce, ie) => ce && !ie)
        .Catch(ex => {
            exceptions.OnNext(ex);
            return Observable.Return(false);
        })
        .Do(x => {
            // Record the latest value first so handlers that query
            // CanExecute from the event see the new state.
            var fireCanExecuteChanged = (canExecuteLatest != x);
            canExecuteLatest = x;

            if (fireCanExecuteChanged) {
                this.raiseCanExecuteChanged(EventArgs.Empty);
            }
        })
        .Publish();

    // Under a test runner the pipeline is connected eagerly. The connection
    // handle returned here is not stored.
    if (ModeDetector.InUnitTestRunner()) {
        this.canExecute.Connect();
    }
}
/// <summary>
/// Executes a Command and returns the result asynchronously. This method
/// makes it *much* easier to test ReactiveCommand, as well as create
/// ReactiveCommands who invoke inferior commands and wait on their results.
///
/// Note that you **must** Subscribe to the Observable returned by
/// ExecuteAsync or else nothing will happen (i.e. ExecuteAsync is lazy)
///
/// Note also that the command will be executed, irrespective of the current value
/// of the command's canExecute observable.
/// </summary>
/// <param name="parameter">Value passed to the executeAsync function. Don't use this.</param>
/// <returns>An Observable representing a single invocation of the Command.</returns>
public IObservable ExecuteAsync(object parameter = null)
{
var ret = Observable.Create(subj => {
// First in-flight invocation flips the executing state to true.
if (Interlocked.Increment(ref inflightCount) == 1) {
isExecuting.OnNext(true);
}
// Holds the "invocation finished" action: decrement the in-flight count
// and, when it reaches zero, flip the executing state back to false.
var decrement = new SerialDisposable() {
Disposable = Disposable.Create(() => {
if (Interlocked.Decrement(ref inflightCount) == 0) {
isExecuting.OnNext(false);
}
})
};
var disp = executeAsync(parameter)
.ObserveOn(scheduler)
// On error or completion, replace the held disposable. Per Rx
// SerialDisposable semantics this disposes the previous one, i.e. runs the
// decrement above, before the termination is passed further down the chain.
.Do(
_ => { },
e => decrement.Disposable = Disposable.Empty,
() => decrement.Disposable = Disposable.Empty)
// Results go to the command's own subscribers, errors to ThrownExceptions;
// both are also forwarded to the caller's observer.
.Do(executeResults.OnNext, exceptions.OnNext)
.Subscribe(subj);
// Unsubscribing early disposes `decrement` too, which runs the decrement if
// the invocation had not already terminated.
return new CompositeDisposable(disp, decrement);
});
// Share a single underlying invocation among subscribers to the returned observable.
return ret.Publish().RefCount();
}
/// <summary>
/// Executes the command and exposes that single invocation as a Task, by
/// converting the Observable from ExecuteAsync with ToTask.
/// </summary>
/// <param name="parameter">Value passed to ExecuteAsync. Don't use this.</param>
/// <param name="ct">Optional token handed to ToTask so the operation can be
/// cancelled, if the operation supports it.</param>
/// <returns>A Task representing a single invocation of the command.</returns>
public Task ExecuteAsyncTask(object parameter = null, CancellationToken ct = default(CancellationToken))
{
    var invocation = ExecuteAsync(parameter);
    return invocation.ToTask(ct);
}
/// <summary>
/// Fires whenever an exception would normally terminate ReactiveUI
/// internal state. In this class it receives errors raised by the
/// canExecute source and by invocations started through ExecuteAsync.
/// </summary>
/// <value>The thrown exceptions.</value>
public IObservable ThrownExceptions { get; protected set; }
/// <summary>
/// Returns an Observable of the CanExecute state that yields a value
/// immediately on subscription (the latest recorded state) and then each
/// distinct change. Subscribing also connects the underlying canExecute
/// pipeline if it has not been connected yet.
/// </summary>
public IObservable CanExecuteObservable {
get {
// NOTE(review): canExecuteLatest is read here, when the getter runs, not when
// the returned observable is subscribed - confirm callers subscribe promptly.
var ret = canExecute.StartWith(canExecuteLatest).DistinctUntilChanged();
// Already connected: hand the stream back directly.
if (canExecuteDisp != null) return ret;
return Observable.Create(subj => {
// Subscribe before connecting so this observer does not miss values that
// the pipeline produces on connect.
var disp = ret.Subscribe(subj);
// NB: We intentionally leak the CanExecute disconnect, it's
// cleaned up by the global Dispose. This is kind of a
// "Lazy Subscription" to CanExecute by the command itself.
canExecuteDisp = canExecute.Connect();
return disp;
});
}
}
/// <summary>
/// Observable of the command's executing state. It first yields whether any
/// invocation is in flight, then true each time the in-flight count rises
/// from zero and false each time it returns to zero.
/// </summary>
public IObservable IsExecuting {
    get {
        // Deferred so the initial value reflects the in-flight count at the
        // moment of subscription, not at the moment this property was read.
        return Observable.Defer(() => isExecuting.StartWith(inflightCount > 0));
    }
}
/// <summary>
/// Subscribes an observer to the results of every invocation of this command.
/// </summary>
/// <param name="observer">Observer that receives each result.</param>
/// <returns>A handle that ends the subscription when disposed.</returns>
public IDisposable Subscribe(IObserver observer)
{
    var subscription = executeResults.Subscribe(observer);
    return subscription;
}
/// <summary>
/// Reports the most recently recorded CanExecute value, connecting the
/// canExecute pipeline first if nothing has connected it yet.
/// </summary>
/// <param name="parameter">Not consulted by this implementation.</param>
/// <returns>The latest CanExecute value.</returns>
public bool CanExecute(object parameter)
{
    if (canExecuteDisp == null) {
        canExecuteDisp = canExecute.Connect();
    }

    return canExecuteLatest;
}
/// <summary>
/// Executes the command, fire-and-forget. The command runs irrespective of
/// the current value of its canExecute observable. An error from the
/// invocation is swallowed for this subscription; ExecuteAsync has already
/// pushed it to ThrownExceptions.
/// </summary>
/// <param name="parameter">Value passed to ExecuteAsync.</param>
public void Execute(object parameter)
{
    var invocation = ExecuteAsync(parameter);
    var withoutErrors = invocation.Catch(Observable.Empty());
    withoutErrors.Subscribe();
}
/// <summary>
/// Releases the canExecute connection, if one was made. The handle is
/// swapped out atomically so it is disposed at most once.
/// </summary>
public virtual void Dispose()
{
    var connection = Interlocked.Exchange(ref canExecuteDisp, null);
    if (connection == null) {
        return;
    }

    connection.Dispose();
}
}
public static class ReactiveCommandMixins
{
/// <summary>
/// Convenience wrapper that builds a ReactiveCommand whose canExecute source
/// is the Observable this method is called on.
/// </summary>
/// <param name="This">Observable used as the command's canExecute.</param>
/// <param name="scheduler">Scheduler passed to ReactiveCommand.Create; the
/// command falls back to RxApp.MainThreadScheduler when this is null.</param>
/// <returns>A new ReactiveCommand gated by the given Observable.</returns>
public static ReactiveCommand ToCommand(this IObservable This, IScheduler scheduler = null)
{
    var command = ReactiveCommand.Create(This, scheduler);
    return command;
}
/// <summary>
/// Pipes an Observable into an ICommand: each value is released once
/// command.CanExecute(value) returns true, and is then passed to Execute.
/// CanExecute is checked immediately and again on every CanExecuteChanged.
/// </summary>
/// <param name="This">Source of values to execute the command with.</param>
/// <param name="command">The command to be executed.</param>
/// <returns>An object that, when disposed, disconnects the Observable from
/// the command.</returns>
public static IDisposable InvokeCommand(this IObservable This, ICommand command)
{
    // The throttle window for a value closes at the first moment the command
    // reports it can execute with that value.
    var whenExecutable = This.Throttle(x =>
        Observable.FromEventPattern(h => command.CanExecuteChanged += h, h => command.CanExecuteChanged -= h)
            .Select(_ => Unit.Default)
            .StartWith(Unit.Default)
            .Where(_ => command.CanExecute(x)));

    return whenExecutable.Subscribe(x => {
        command.Execute(x);
    });
}
/// <summary>
/// Pipes an Observable into a reactive command: each value is released once
/// the command reports it can execute, and is then run through ExecuteAsync.
/// Errors from an invocation are swallowed here, and Switch keeps only the
/// most recent invocation subscribed.
/// </summary>
/// <param name="This">Source of values to execute the command with.</param>
/// <param name="command">The command to be executed.</param>
/// <returns>An object that, when disposed, disconnects the Observable from
/// the command.</returns>
public static IDisposable InvokeCommand(this IObservable This, IReactiveCommand command)
{
    // The throttle window for a value closes at the first true from the
    // command's current or subsequent CanExecute state.
    var whenExecutable = This.Throttle(
        x => command.CanExecuteObservable.StartWith(command.CanExecute(x)).Where(b => b));

    var invocations = whenExecutable.Select(
        x => command.ExecuteAsync(x).Catch(Observable.Empty()));

    return invocations.Switch().Subscribe();
}
/// <summary>
/// Pipes an Observable into the ICommand currently held by a property of
/// <paramref name="target"/>. Each source value is paired with the latest
/// command; the pair is released once that command's CanExecute returns true
/// for the value, and the value is then passed to Execute. Pairs whose
/// command is null are skipped.
/// </summary>
/// <param name="This">Source of values to execute the command with.</param>
/// <param name="target">The root object which has the Command.</param>
/// <param name="commandProperty">The expression to reference the Command.</param>
/// <returns>An object that, when disposed, disconnects the Observable from
/// the command.</returns>
public static IDisposable InvokeCommand(this IObservable This, TTarget target, Expression> commandProperty)
{
    return This.CombineLatest(target.WhenAnyValue(commandProperty), (val, cmd) => new { val, cmd })
        // The watched property may hold no command (for example before it is
        // first assigned). Skip those pairs rather than dereferencing null
        // below, which would fault the whole pipeline.
        .Where(x => x.cmd != null)
        .Throttle(x => Observable.FromEventPattern(h => x.cmd.CanExecuteChanged += h, h => x.cmd.CanExecuteChanged -= h)
            .Select(_ => Unit.Default)
            .StartWith(Unit.Default)
            .Where(_ => x.cmd.CanExecute(x.val)))
        .Subscribe(x => {
            x.cmd.Execute(x.val);
        });
}
/// <summary>
/// Pipes an Observable into the reactive command currently held by a
/// property of <paramref name="target"/>. Each source value is paired with
/// the latest command; the pair is released once that command reports it can
/// execute, and the value is then run through ExecuteAsync. Errors from an
/// invocation are swallowed here, and pairs whose command is null are
/// skipped.
/// </summary>
/// <param name="This">Source of values to execute the command with.</param>
/// <param name="target">The root object which has the Command.</param>
/// <param name="commandProperty">The expression to reference the Command.</param>
/// <returns>An object that, when disposed, disconnects the Observable from
/// the command.</returns>
public static IDisposable InvokeCommand(this IObservable This, TTarget target, Expression>> commandProperty)
{
    return This.CombineLatest(target.WhenAnyValue(commandProperty), (val, cmd) => new { val, cmd })
        // The watched property may hold no command (for example before it is
        // first assigned). Skip those pairs rather than dereferencing null
        // below, which would fault the whole pipeline.
        .Where(x => x.cmd != null)
        .Throttle(x => x.cmd.CanExecuteObservable.StartWith(x.cmd.CanExecute(x.val)).Where(b => b))
        .Select(x => x.cmd.ExecuteAsync(x.val).Catch(Observable.Empty()))
        .Switch()
        .Subscribe();
}
/// <summary>
/// Subscribes to a command's results and returns the same command, so that
/// creation and subscription can be chained in one expression. The
/// subscription handle is discarded, so it cannot be released by the caller.
/// </summary>
/// <param name="This">The command to subscribe to.</param>
/// <param name="onNext">Called with each result the command produces.</param>
/// <param name="onError">Optional error handler; when null, only onNext is
/// subscribed.</param>
/// <returns>The command that was passed in.</returns>
public static ReactiveCommand OnExecuteCompleted(this ReactiveCommand This, Action onNext, Action onError = null)
{
    if (onError == null) {
        This.Subscribe(onNext);
    } else {
        This.Subscribe(onNext, onError);
    }

    return This;
}
}
}