-
Notifications
You must be signed in to change notification settings - Fork 27.2k
feat(core): Add rxjs operator prevent app stability until an event #56533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /** | ||
| * @license | ||
| * Copyright Google LLC All Rights Reserved. | ||
| * | ||
| * Use of this source code is governed by an MIT-style license that can be | ||
| * found in the LICENSE file at https://angular.io/license | ||
| */ | ||
|
|
||
| import {assertInInjectionContext, PendingTasks, inject, Injector} from '@angular/core'; | ||
| import {MonoTypeOperatorFunction, Observable} from 'rxjs'; | ||
|
|
||
| /** | ||
| * Operator which makes the application unstable until the observable emits, complets, errors, or is unsubscribed. | ||
| * | ||
| * Use this operator in observables whose subscriptions are important for rendering and should be included in SSR serialization. | ||
| * | ||
| * @param injector The `Injector` to use during creation. If this is not provided, the current injection context will be used instead (via `inject`). | ||
| * | ||
| * @experimental | ||
| */ | ||
| export function pendingUntilEvent<T>(injector?: Injector): MonoTypeOperatorFunction<T> { | ||
| if (injector === undefined) { | ||
| assertInInjectionContext(pendingUntilEvent); | ||
| injector = inject(Injector); | ||
| } | ||
| const taskService = injector.get(PendingTasks); | ||
|
|
||
| return (sourceObservable) => { | ||
| return new Observable<T>((originalSubscriber) => { | ||
| // create a new task on subscription | ||
| const removeTask = taskService.add(); | ||
|
|
||
| let cleanedUp = false; | ||
| function cleanupTask() { | ||
| if (cleanedUp) { | ||
| return; | ||
| } | ||
|
|
||
| removeTask(); | ||
| cleanedUp = true; | ||
| } | ||
|
|
||
| const innerSubscription = sourceObservable.subscribe({ | ||
| next: (v) => { | ||
| originalSubscriber.next(v); | ||
|
atscott marked this conversation as resolved.
Outdated
|
||
| cleanupTask(); | ||
| }, | ||
| complete: () => { | ||
| originalSubscriber.complete(); | ||
| cleanupTask(); | ||
| }, | ||
| error: (e) => { | ||
| originalSubscriber.error(e); | ||
| cleanupTask(); | ||
| }, | ||
| }); | ||
| innerSubscription.add(() => { | ||
| originalSubscriber.unsubscribe(); | ||
| cleanupTask(); | ||
| }); | ||
| return innerSubscription; | ||
| }); | ||
| }; | ||
| } | ||
249 changes: 249 additions & 0 deletions
249
packages/core/rxjs-interop/test/pending_until_event_spec.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,249 @@ | ||
| /** | ||
| * @license | ||
| * Copyright Google LLC All Rights Reserved. | ||
| * | ||
| * Use of this source code is governed by an MIT-style license that can be | ||
| * found in the LICENSE file at https://angular.io/license | ||
| */ | ||
|
|
||
| import {EnvironmentInjector, ɵPendingTasks as PendingTasks, ApplicationRef} from '@angular/core'; | ||
| import { | ||
| BehaviorSubject, | ||
| EMPTY, | ||
| Subject, | ||
| catchError, | ||
| delay, | ||
| config, | ||
| finalize, | ||
| firstValueFrom, | ||
| interval, | ||
| of, | ||
| } from 'rxjs'; | ||
|
|
||
| import {pendingUntilEvent} from '@angular/core/rxjs-interop'; | ||
| import {TestBed} from '@angular/core/testing'; | ||
|
|
||
| describe('pendingUntilEvent', () => { | ||
| let taskService: PendingTasks; | ||
| let injector: EnvironmentInjector; | ||
| let appRef: ApplicationRef; | ||
| beforeEach(() => { | ||
| taskService = TestBed.inject(PendingTasks); | ||
| injector = TestBed.inject(EnvironmentInjector); | ||
| appRef = TestBed.inject(ApplicationRef); | ||
| }); | ||
|
|
||
| it('should not block stability until subscription', async () => { | ||
| const originalSource = new BehaviorSubject(0); | ||
| const delayedSource = originalSource.pipe(delay(5), pendingUntilEvent(injector)); | ||
| expect(taskService.hasPendingTasks.value).toEqual(false); | ||
|
|
||
| const emitPromise = firstValueFrom(delayedSource); | ||
| expect(taskService.hasPendingTasks.value).toEqual(true); | ||
|
|
||
| await expectAsync(emitPromise).toBeResolvedTo(0); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| it('runs the subscription body before stability', async () => { | ||
| const source = of(1).pipe(pendingUntilEvent(injector)); | ||
|
|
||
| // stable before subscription | ||
| expect(taskService.hasPendingTasks.value).toEqual(false); | ||
| source.subscribe(() => { | ||
| // unstable within synchronous subscription body | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| }); | ||
| // stable after above synchronous subscription execution | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| it('only blocks stability until first emit', async () => { | ||
| const intervalSource = interval(5).pipe(pendingUntilEvent(injector)); | ||
| expect(taskService.hasPendingTasks.value).toEqual(false); | ||
|
|
||
| await new Promise<void>(async (resolve) => { | ||
| const subscription = intervalSource.subscribe(async (v) => { | ||
| if (v === 0) { | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| } else { | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| } | ||
| if (v === 3) { | ||
| subscription.unsubscribe(); | ||
| resolve(); | ||
| } | ||
| }); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| }); | ||
| }); | ||
|
|
||
| it('should unblock stability on complete (but no emit)', async () => { | ||
| const sub = new Subject(); | ||
| sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| sub.complete(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| it('should unblock stability on unsubscribe before emit', async () => { | ||
| const sub = new Subject(); | ||
| const subscription = sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| subscription.unsubscribe(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| // Note that we cannot execute `finalize` operators that appear _after_ ours before | ||
| // removing the pending task. We need to register the finalize operation on the subscription | ||
| // as soon as the operator executes. A `finalize` operator later on in the stream will | ||
| // be appear later in the finalizers list. These finalizers are both registered and executed | ||
| // serially. We cannot execute our finalizer after other finalizers in the pipeline. | ||
| it('should execute user finalize body before stability (as long as it appears first)', async () => { | ||
| const sub = new Subject(); | ||
| let finalizeExecuted = false; | ||
| const subscription = sub | ||
| .asObservable() | ||
| .pipe( | ||
| finalize(() => { | ||
| finalizeExecuted = true; | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| }), | ||
| pendingUntilEvent(injector), | ||
| ) | ||
| .subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| subscription.unsubscribe(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| expect(finalizeExecuted).toBe(true); | ||
| }); | ||
|
|
||
| it('should not throw if application is destroyed before emit', async () => { | ||
| const sub = new Subject<void>(); | ||
| sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| TestBed.resetTestingModule(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| sub.next(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| it('should unblock stability on error before emit', async () => { | ||
| const sub = new Subject<void>(); | ||
| sub | ||
| .asObservable() | ||
| .pipe( | ||
| pendingUntilEvent(injector), | ||
| catchError(() => EMPTY), | ||
| ) | ||
| .subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| sub.error(new Error('error in pipe')); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| sub.next(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
|
|
||
| it('should unblock stability on error in subscription', async () => { | ||
| function nextUncaughtError() { | ||
| return new Promise((resolve) => { | ||
| config.onUnhandledError = (e) => { | ||
| config.onUnhandledError = null; | ||
| resolve(e); | ||
| }; | ||
| }); | ||
| } | ||
| const sub = new Subject<void>(); | ||
| sub | ||
| .asObservable() | ||
| .pipe(pendingUntilEvent(injector)) | ||
| .subscribe({ | ||
| next: () => { | ||
| throw new Error('oh noes'); | ||
| }, | ||
| }); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| const errorPromise = nextUncaughtError(); | ||
| sub.next(); | ||
| await expectAsync(errorPromise).toBeResolved(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
|
|
||
| const errorPromise2 = nextUncaughtError(); | ||
| sub.next(); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| await expectAsync(errorPromise2).toBeResolved(); | ||
| }); | ||
|
|
||
| it('finalize and complete are delivered correctly', () => { | ||
| const sub = new Subject<void>(); | ||
| let log: string[] = []; | ||
| const obs1 = sub.asObservable().pipe( | ||
| pendingUntilEvent(injector), | ||
| finalize(() => { | ||
| log.push('finalize'); | ||
| }), | ||
| ); | ||
|
|
||
| // complete after subscription | ||
| obs1.subscribe({ | ||
| complete: () => { | ||
| log.push('complete'); | ||
| }, | ||
| }); | ||
| sub.complete(); | ||
| expect(log).toEqual(['complete', 'finalize']); | ||
|
|
||
| // already completed before subscription | ||
| log.length = 0; | ||
| obs1.subscribe({ | ||
| complete: () => { | ||
| log.push('complete'); | ||
| }, | ||
| }); | ||
| expect(log).toEqual(['complete', 'finalize']); | ||
|
|
||
| log.length = 0; | ||
| new Subject() | ||
| .asObservable() | ||
| .pipe( | ||
| pendingUntilEvent(injector), | ||
| finalize(() => { | ||
| log.push('finalize'); | ||
| }), | ||
| ) | ||
| .subscribe({ | ||
| complete: () => { | ||
| log.push('complete'); | ||
| }, | ||
| }) | ||
| .unsubscribe(); | ||
| expect(log).toEqual(['finalize']); | ||
| }); | ||
|
|
||
| it('should block stability for each new subscriber', async () => { | ||
| const sub = new Subject<void>(); | ||
| const observable = sub.asObservable().pipe(delay(5), pendingUntilEvent(injector)); | ||
|
|
||
| observable.subscribe(); | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
| sub.next(); | ||
| observable.subscribe(); | ||
| // first subscription unblocks | ||
| await new Promise((r) => setTimeout(r, 5)); | ||
| // still pending because the other subscribed after the emit | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
|
|
||
| sub.next(); | ||
| await new Promise((r) => setTimeout(r, 3)); | ||
| observable.subscribe(); | ||
| sub.next(); | ||
| // second subscription unblocks | ||
| await new Promise((r) => setTimeout(r, 2)); | ||
| // still pending because third subscription delay not finished | ||
| expect(taskService.hasPendingTasks.value).toBe(true); | ||
|
|
||
| // finishes third subscription | ||
| await new Promise((r) => setTimeout(r, 3)); | ||
| await expectAsync(appRef.whenStable()).toBeResolved(); | ||
| }); | ||
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if an app never stabilizes (because of some bug)? This will never complete I guess? Should we offer a timeout for this one, or just document it for the devs to keep that in mind too?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming you mean what happens when no event (emit, error, unsubscribe, or complete) occurs in the subscription? This operator affects stability rather than app stability affecting the task completion.
I don't believe a timeout is something that should be configured in this operator. If a timeout for the task is desirable, that can be done in a different operator, i.e. the builtin timeout.
edit to clarify: Leaving this open because I do think this is a reasonable request and worth considering.