diff --git a/goldens/public-api/core/rxjs-interop/index.api.md b/goldens/public-api/core/rxjs-interop/index.api.md index b487f39a244c..272442ad7774 100644 --- a/goldens/public-api/core/rxjs-interop/index.api.md +++ b/goldens/public-api/core/rxjs-interop/index.api.md @@ -23,6 +23,9 @@ export function outputFromObservable(observable: Observable, opts?: Output // @public export function outputToObservable(ref: OutputRef): Observable; +// @public +export function pendingUntilEvent(injector?: Injector): MonoTypeOperatorFunction; + // @public export function rxResource(opts: RxResourceOptions): ResourceRef; diff --git a/packages/core/rxjs-interop/src/index.ts b/packages/core/rxjs-interop/src/index.ts index 364ac8233eef..8694c2cca5f1 100644 --- a/packages/core/rxjs-interop/src/index.ts +++ b/packages/core/rxjs-interop/src/index.ts @@ -15,4 +15,5 @@ export { toObservableMicrotask as ɵtoObservableMicrotask, } from './to_observable'; export {toSignal, ToSignalOptions} from './to_signal'; +export {pendingUntilEvent} from './pending_until_event'; export {RxResourceOptions, rxResource} from './rx_resource'; diff --git a/packages/core/rxjs-interop/src/pending_until_event.ts b/packages/core/rxjs-interop/src/pending_until_event.ts new file mode 100644 index 000000000000..6b08625c0684 --- /dev/null +++ b/packages/core/rxjs-interop/src/pending_until_event.ts @@ -0,0 +1,64 @@ +/** + * @license + * Copyright Google LLC All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ + +import {assertInInjectionContext, PendingTasks, inject, Injector} from '@angular/core'; +import {MonoTypeOperatorFunction, Observable} from 'rxjs'; + +/** + * Operator which makes the application unstable until the observable emits, complets, errors, or is unsubscribed. + * + * Use this operator in observables whose subscriptions are important for rendering and should be included in SSR serialization. + * + * @param injector The `Injector` to use during creation. If this is not provided, the current injection context will be used instead (via `inject`). + * + * @experimental + */ +export function pendingUntilEvent(injector?: Injector): MonoTypeOperatorFunction { + if (injector === undefined) { + assertInInjectionContext(pendingUntilEvent); + injector = inject(Injector); + } + const taskService = injector.get(PendingTasks); + + return (sourceObservable) => { + return new Observable((originalSubscriber) => { + // create a new task on subscription + const removeTask = taskService.add(); + + let cleanedUp = false; + function cleanupTask() { + if (cleanedUp) { + return; + } + + removeTask(); + cleanedUp = true; + } + + const innerSubscription = sourceObservable.subscribe({ + next: (v) => { + originalSubscriber.next(v); + cleanupTask(); + }, + complete: () => { + originalSubscriber.complete(); + cleanupTask(); + }, + error: (e) => { + originalSubscriber.error(e); + cleanupTask(); + }, + }); + innerSubscription.add(() => { + originalSubscriber.unsubscribe(); + cleanupTask(); + }); + return innerSubscription; + }); + }; +} diff --git a/packages/core/rxjs-interop/test/pending_until_event_spec.ts b/packages/core/rxjs-interop/test/pending_until_event_spec.ts new file mode 100644 index 000000000000..bf7625769a80 --- /dev/null +++ b/packages/core/rxjs-interop/test/pending_until_event_spec.ts @@ -0,0 +1,249 @@ +/** + * @license + * Copyright Google LLC All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ + +import {EnvironmentInjector, ɵPendingTasks as PendingTasks, ApplicationRef} from '@angular/core'; +import { + BehaviorSubject, + EMPTY, + Subject, + catchError, + delay, + config, + finalize, + firstValueFrom, + interval, + of, +} from 'rxjs'; + +import {pendingUntilEvent} from '@angular/core/rxjs-interop'; +import {TestBed} from '@angular/core/testing'; + +describe('pendingUntilEvent', () => { + let taskService: PendingTasks; + let injector: EnvironmentInjector; + let appRef: ApplicationRef; + beforeEach(() => { + taskService = TestBed.inject(PendingTasks); + injector = TestBed.inject(EnvironmentInjector); + appRef = TestBed.inject(ApplicationRef); + }); + + it('should not block stability until subscription', async () => { + const originalSource = new BehaviorSubject(0); + const delayedSource = originalSource.pipe(delay(5), pendingUntilEvent(injector)); + expect(taskService.hasPendingTasks.value).toEqual(false); + + const emitPromise = firstValueFrom(delayedSource); + expect(taskService.hasPendingTasks.value).toEqual(true); + + await expectAsync(emitPromise).toBeResolvedTo(0); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + it('runs the subscription body before stability', async () => { + const source = of(1).pipe(pendingUntilEvent(injector)); + + // stable before subscription + expect(taskService.hasPendingTasks.value).toEqual(false); + source.subscribe(() => { + // unstable within synchronous subscription body + expect(taskService.hasPendingTasks.value).toBe(true); + }); + // stable after above synchronous subscription execution + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + it('only blocks stability until first emit', async () => { + const intervalSource = interval(5).pipe(pendingUntilEvent(injector)); + expect(taskService.hasPendingTasks.value).toEqual(false); + + await new Promise(async (resolve) => { + const subscription = intervalSource.subscribe(async (v) => { + if (v === 0) { + expect(taskService.hasPendingTasks.value).toBe(true); + } else { + await expectAsync(appRef.whenStable()).toBeResolved(); + } + if (v === 3) { + subscription.unsubscribe(); + resolve(); + } + }); + expect(taskService.hasPendingTasks.value).toBe(true); + }); + }); + + it('should unblock stability on complete (but no emit)', async () => { + const sub = new Subject(); + sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + sub.complete(); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + it('should unblock stability on unsubscribe before emit', async () => { + const sub = new Subject(); + const subscription = sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + subscription.unsubscribe(); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + // Note that we cannot execute `finalize` operators that appear _after_ ours before + // removing the pending task. We need to register the finalize operation on the subscription + // as soon as the operator executes. A `finalize` operator later on in the stream will + // be appear later in the finalizers list. These finalizers are both registered and executed + // serially. We cannot execute our finalizer after other finalizers in the pipeline. + it('should execute user finalize body before stability (as long as it appears first)', async () => { + const sub = new Subject(); + let finalizeExecuted = false; + const subscription = sub + .asObservable() + .pipe( + finalize(() => { + finalizeExecuted = true; + expect(taskService.hasPendingTasks.value).toBe(true); + }), + pendingUntilEvent(injector), + ) + .subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + subscription.unsubscribe(); + await expectAsync(appRef.whenStable()).toBeResolved(); + expect(finalizeExecuted).toBe(true); + }); + + it('should not throw if application is destroyed before emit', async () => { + const sub = new Subject(); + sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + TestBed.resetTestingModule(); + await expectAsync(appRef.whenStable()).toBeResolved(); + sub.next(); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + it('should unblock stability on error before emit', async () => { + const sub = new Subject(); + sub + .asObservable() + .pipe( + pendingUntilEvent(injector), + catchError(() => EMPTY), + ) + .subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + sub.error(new Error('error in pipe')); + await expectAsync(appRef.whenStable()).toBeResolved(); + sub.next(); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); + + it('should unblock stability on error in subscription', async () => { + function nextUncaughtError() { + return new Promise((resolve) => { + config.onUnhandledError = (e) => { + config.onUnhandledError = null; + resolve(e); + }; + }); + } + const sub = new Subject(); + sub + .asObservable() + .pipe(pendingUntilEvent(injector)) + .subscribe({ + next: () => { + throw new Error('oh noes'); + }, + }); + expect(taskService.hasPendingTasks.value).toBe(true); + const errorPromise = nextUncaughtError(); + sub.next(); + await expectAsync(errorPromise).toBeResolved(); + await expectAsync(appRef.whenStable()).toBeResolved(); + + const errorPromise2 = nextUncaughtError(); + sub.next(); + await expectAsync(appRef.whenStable()).toBeResolved(); + await expectAsync(errorPromise2).toBeResolved(); + }); + + it('finalize and complete are delivered correctly', () => { + const sub = new Subject(); + let log: string[] = []; + const obs1 = sub.asObservable().pipe( + pendingUntilEvent(injector), + finalize(() => { + log.push('finalize'); + }), + ); + + // complete after subscription + obs1.subscribe({ + complete: () => { + log.push('complete'); + }, + }); + sub.complete(); + expect(log).toEqual(['complete', 'finalize']); + + // already completed before subscription + log.length = 0; + obs1.subscribe({ + complete: () => { + log.push('complete'); + }, + }); + expect(log).toEqual(['complete', 'finalize']); + + log.length = 0; + new Subject() + .asObservable() + .pipe( + pendingUntilEvent(injector), + finalize(() => { + log.push('finalize'); + }), + ) + .subscribe({ + complete: () => { + log.push('complete'); + }, + }) + .unsubscribe(); + expect(log).toEqual(['finalize']); + }); + + it('should block stability for each new subscriber', async () => { + const sub = new Subject(); + const observable = sub.asObservable().pipe(delay(5), pendingUntilEvent(injector)); + + observable.subscribe(); + expect(taskService.hasPendingTasks.value).toBe(true); + sub.next(); + observable.subscribe(); + // first subscription unblocks + await new Promise((r) => setTimeout(r, 5)); + // still pending because the other subscribed after the emit + expect(taskService.hasPendingTasks.value).toBe(true); + + sub.next(); + await new Promise((r) => setTimeout(r, 3)); + observable.subscribe(); + sub.next(); + // second subscription unblocks + await new Promise((r) => setTimeout(r, 2)); + // still pending because third subscription delay not finished + expect(taskService.hasPendingTasks.value).toBe(true); + + // finishes third subscription + await new Promise((r) => setTimeout(r, 3)); + await expectAsync(appRef.whenStable()).toBeResolved(); + }); +}); diff --git a/packages/core/src/change_detection/scheduling/zoneless_scheduling_impl.ts b/packages/core/src/change_detection/scheduling/zoneless_scheduling_impl.ts index 53c861941cd7..574a3f3bf382 100644 --- a/packages/core/src/change_detection/scheduling/zoneless_scheduling_impl.ts +++ b/packages/core/src/change_detection/scheduling/zoneless_scheduling_impl.ts @@ -219,7 +219,7 @@ export class ChangeDetectionSchedulerImpl implements ChangeDetectionScheduler { } private shouldScheduleTick(force: boolean): boolean { - if (this.disableScheduling && !force) { + if ((this.disableScheduling && !force) || this.appRef.destroyed) { return false; } // already scheduled or running