using System; using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Threading.Tasks; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Input; using System.Linq.Expressions; using System.Reactive.Disposables; using Splat; namespace ReactiveUI { public static class ReactiveCommand { /// /// Creates a default ReactiveCommand that has no background action. This /// is probably what you want if you were calling the constructor in /// previous versions of ReactiveUI /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand whose ExecuteAsync just returns the /// CommandParameter immediately. Which you should ignore! public static ReactiveCommand Create(IObservable canExecute = null, IScheduler scheduler = null) { canExecute = canExecute ?? Observable.Return(true); return new ReactiveCommand(canExecute, x => Observable.Return(x), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Observable /// method. Use this method if your background method returns IObservable. /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// Method to call that creates an Observable /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Observable completes. If this /// Observable terminates with OnError, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncObservable(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(canExecute, executeAsync, scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Observable /// method. Use this method if your background method returns IObservable. /// /// Method to call that creates an Observable /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Observable completes. If this /// Observable terminates with OnError, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncObservable(Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(Observable.Return(true), executeAsync, scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method. Use this method if your background method returns Task or uses /// async/await. /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(canExecute, x => executeAsync(x).ToObservable(), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method. Use this method if your background method returns Task or uses /// async/await. /// /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(Observable.Return(true), x => executeAsync(x).ToObservable(), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method. Use this method if your background method returns Task or uses /// async/await. /// /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(Func executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(Observable.Return(true), x => executeAsync(x).ToObservable(), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method. Use this method if your background method returns Task or uses /// async/await. /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(canExecute, x => executeAsync(x).ToObservable(), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method that supports cancellation. Use this method if your background /// method returns Task or uses async/await. /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(canExecute, x => Observable.StartAsync(ct => executeAsync(x, ct)), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method that supports cancellation. Use this method if your background /// method returns Task or uses async/await. /// /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(Func> executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(Observable.Return(true), x => Observable.StartAsync(ct => executeAsync(x,ct)), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method that supports cancellation. Use this method if your background /// method returns Task or uses async/await. /// /// An Observable that determines when the /// Command can Execute. WhenAny is a great way to create this! /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(Func executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(Observable.Return(true), x => Observable.StartAsync(ct => executeAsync(x,ct)), scheduler); } /// /// Creates a ReactiveCommand typed to the given executeAsync Task-based /// method that supports cancellation. Use this method if your background /// method returns Task or uses async/await. /// /// Method to call that creates a Task /// representing an operation to execute in the background. The Command's /// CanExecute will be false until this Task completes. If this /// Task terminates with an Exception, the Exception is marshaled to /// ThrownExceptions. /// The scheduler to deliver events on. /// Defaults to RxApp.MainThreadScheduler. /// A ReactiveCommand which returns all items that are created via /// calling executeAsync as a single stream. public static ReactiveCommand CreateAsyncTask(IObservable canExecute, Func executeAsync, IScheduler scheduler = null) { return new ReactiveCommand(canExecute, x => Observable.StartAsync(ct => executeAsync(x,ct)), scheduler); } /// /// This creates a ReactiveCommand that calls several child /// ReactiveCommands when invoked. Its CanExecute will match the /// combined result of the child CanExecutes (i.e. if any child /// commands cannot execute, neither can the parent) /// /// An Observable that determines whether the /// parent command can execute /// The commands to combine. public static ReactiveCommand CreateCombined(IObservable canExecute, params IReactiveCommand[] commands) { var childrenCanExecute = commands .Select(x => x.CanExecuteObservable) .CombineLatest(latestCanExecute => latestCanExecute.All(x => x != false)); var canExecuteSum = Observable.CombineLatest( canExecute.StartWith(true), childrenCanExecute, (parent, child) => parent && child); var ret = ReactiveCommand.Create(canExecuteSum); ret.Subscribe(x => commands.ForEach(cmd => cmd.Execute(x))); return ret; } /// /// This creates a ReactiveCommand that calls several child /// ReactiveCommands when invoked. Its CanExecute will match the /// combined result of the child CanExecutes (i.e. if any child /// commands cannot execute, neither can the parent) /// /// The commands to combine. public static ReactiveCommand CreateCombined(params IReactiveCommand[] commands) { return CreateCombined(Observable.Return(true), commands); } } /// /// This class represents a Command that can optionally do a background task. /// The results of the background task (or a signal that the Command has been /// invoked) are delivered by Subscribing to the command itself, since /// ReactiveCommand is itself an Observable. The results of individual /// invocations can be retrieved via the ExecuteAsync method. /// public class ReactiveCommand : IReactiveCommand, IReactiveCommand { #if NET_45 public event EventHandler CanExecuteChanged; protected virtual void raiseCanExecuteChanged(EventArgs args) { var handler = this.CanExecuteChanged; if (handler != null) { handler(this, args); } } #else public event EventHandler CanExecuteChanged { add { if (canExecuteDisp == null) canExecuteDisp = canExecute.Connect(); CanExecuteChangedEventManager.AddHandler(this, value); } remove { CanExecuteChangedEventManager.RemoveHandler(this, value); } } protected virtual void raiseCanExecuteChanged(EventArgs args) { CanExecuteChangedEventManager.DeliverEvent(this, args); } #endif readonly Subject executeResults = new Subject(); readonly Subject isExecuting = new Subject(); readonly Func> executeAsync; readonly IScheduler scheduler; readonly ScheduledSubject exceptions; IConnectableObservable canExecute; bool canExecuteLatest = false; IDisposable canExecuteDisp; int inflightCount = 0; /// /// Don't use this, use ReactiveCommand.CreateXYZ instead /// public ReactiveCommand(IObservable canExecute, Func> executeAsync, IScheduler scheduler = null) { this.scheduler = scheduler ?? RxApp.MainThreadScheduler; this.executeAsync = executeAsync; this.canExecute = canExecute.CombineLatest(isExecuting.StartWith(false), (ce, ie) => ce && !ie) .Catch(ex => { exceptions.OnNext(ex); return Observable.Return(false); }) .Do(x => { var fireCanExecuteChanged = (canExecuteLatest != x); canExecuteLatest = x; if (fireCanExecuteChanged) { this.raiseCanExecuteChanged(EventArgs.Empty); } }) .Publish(); if (ModeDetector.InUnitTestRunner()) { this.canExecute.Connect(); } ThrownExceptions = exceptions = new ScheduledSubject(CurrentThreadScheduler.Instance, RxApp.DefaultExceptionHandler); } /// /// Executes a Command and returns the result asynchronously. This method /// makes it *much* easier to test ReactiveCommand, as well as create /// ReactiveCommands who invoke inferior commands and wait on their results. /// /// Note that you **must** Subscribe to the Observable returned by /// ExecuteAsync or else nothing will happen (i.e. ExecuteAsync is lazy) /// /// Note also that the command will be executed, irrespective of the current value /// of the command's canExecute observable. /// /// An Observable representing a single invocation of the Command. /// Don't use this. public IObservable ExecuteAsync(object parameter = null) { var ret = Observable.Create(subj => { if (Interlocked.Increment(ref inflightCount) == 1) { isExecuting.OnNext(true); } var decrement = new SerialDisposable() { Disposable = Disposable.Create(() => { if (Interlocked.Decrement(ref inflightCount) == 0) { isExecuting.OnNext(false); } }) }; var disp = executeAsync(parameter) .ObserveOn(scheduler) .Do( _ => { }, e => decrement.Disposable = Disposable.Empty, () => decrement.Disposable = Disposable.Empty) .Do(executeResults.OnNext, exceptions.OnNext) .Subscribe(subj); return new CompositeDisposable(disp, decrement); }); return ret.Publish().RefCount(); } /// /// Executes a Command and returns the result as a Task. This method /// makes it *much* easier to test ReactiveCommand, as well as create /// ReactiveCommands who invoke inferior commands and wait on their results. /// /// A Task representing a single invocation of the Command. /// Don't use this. /// An optional token that can cancel the operation, if /// the operation supports it. public Task ExecuteAsyncTask(object parameter = null, CancellationToken ct = default(CancellationToken)) { return ExecuteAsync(parameter).ToTask(ct); } /// /// Fires whenever an exception would normally terminate ReactiveUI /// internal state. /// /// The thrown exceptions. public IObservable ThrownExceptions { get; protected set; } /// /// Returns a BehaviorSubject (i.e. an Observable which is guaranteed to /// return at least one value immediately) representing the CanExecute /// state. /// public IObservable CanExecuteObservable { get { var ret = canExecute.StartWith(canExecuteLatest).DistinctUntilChanged(); if (canExecuteDisp != null) return ret; return Observable.Create(subj => { var disp = ret.Subscribe(subj); // NB: We intentionally leak the CanExecute disconnect, it's // cleaned up by the global Dispose. This is kind of a // "Lazy Subscription" to CanExecute by the command itself. canExecuteDisp = canExecute.Connect(); return disp; }); } } public IObservable IsExecuting { get { return isExecuting.StartWith(inflightCount > 0); } } public IDisposable Subscribe(IObserver observer) { return executeResults.Subscribe(observer); } public bool CanExecute(object parameter) { if (canExecuteDisp == null) canExecuteDisp = canExecute.Connect(); return canExecuteLatest; } /// /// Executes a Command. Note that the command will be executed, irrespective of the current value /// of the command's canExecute observable. /// public void Execute(object parameter) { ExecuteAsync(parameter).Catch(Observable.Empty()).Subscribe(); } public virtual void Dispose() { var disp = Interlocked.Exchange(ref canExecuteDisp, null); if (disp != null) disp.Dispose(); } } public static class ReactiveCommandMixins { /// /// ToCommand is a convenience method for returning a new /// ReactiveCommand based on an existing Observable chain. /// /// The scheduler to publish events on - default /// is RxApp.MainThreadScheduler. /// A new ReactiveCommand whose CanExecute Observable is the /// current object. public static ReactiveCommand ToCommand(this IObservable This, IScheduler scheduler = null) { return ReactiveCommand.Create(This, scheduler); } /// /// A utility method that will pipe an Observable to an ICommand (i.e. /// it will first call its CanExecute with the provided value, then if /// the command can be executed, Execute() will be called) /// /// The command to be executed. /// An object that when disposes, disconnects the Observable /// from the command. public static IDisposable InvokeCommand(this IObservable This, ICommand command) { return This.Throttle(x => Observable.FromEventPattern(h => command.CanExecuteChanged += h, h => command.CanExecuteChanged -= h) .Select(_ => Unit.Default) .StartWith(Unit.Default) .Where(_ => command.CanExecute(x))) .Subscribe(x => { command.Execute(x); }); } /// /// A utility method that will pipe an Observable to an ICommand (i.e. /// it will first call its CanExecute with the provided value, then if /// the command can be executed, Execute() will be called) /// /// The command to be executed. /// An object that when disposes, disconnects the Observable /// from the command. public static IDisposable InvokeCommand(this IObservable This, IReactiveCommand command) { return This.Throttle(x => command.CanExecuteObservable.StartWith(command.CanExecute(x)).Where(b => b)) .Select(x => command.ExecuteAsync(x).Catch(Observable.Empty())) .Switch() .Subscribe(); } /// /// A utility method that will pipe an Observable to an ICommand (i.e. /// it will first call its CanExecute with the provided value, then if /// the command can be executed, Execute() will be called) /// /// The root object which has the Command. /// The expression to reference the Command. /// An object that when disposes, disconnects the Observable /// from the command. public static IDisposable InvokeCommand(this IObservable This, TTarget target, Expression> commandProperty) { return This.CombineLatest(target.WhenAnyValue(commandProperty), (val, cmd) => new { val, cmd }) .Throttle(x => Observable.FromEventPattern(h => x.cmd.CanExecuteChanged += h, h => x.cmd.CanExecuteChanged -= h) .Select(_ => Unit.Default) .StartWith(Unit.Default) .Where(_ => x.cmd.CanExecute(x.val))) .Subscribe(x => { x.cmd.Execute(x.val); }); } /// /// A utility method that will pipe an Observable to an ICommand (i.e. /// it will first call its CanExecute with the provided value, then if /// the command can be executed, Execute() will be called) /// /// The root object which has the Command. /// The expression to reference the Command. /// An object that when disposes, disconnects the Observable /// from the command. public static IDisposable InvokeCommand(this IObservable This, TTarget target, Expression>> commandProperty) { return This.CombineLatest(target.WhenAnyValue(commandProperty), (val, cmd) => new { val, cmd }) .Throttle(x => x.cmd.CanExecuteObservable.StartWith(x.cmd.CanExecute(x.val)).Where(b => b)) .Select(x => x.cmd.ExecuteAsync(x.val).Catch(Observable.Empty())) .Switch() .Subscribe(); } /// /// A convenience method for subscribing and creating ReactiveCommands /// in the same call. Equivalent to Subscribing to the command, except /// there's no way to release your Subscription but that's probably fine. /// public static ReactiveCommand OnExecuteCompleted(this ReactiveCommand This, Action onNext, Action onError = null) { if (onError != null) { This.Subscribe(onNext, onError); return This; } else { This.Subscribe(onNext); return This; } } } }