Skip to content

Commit 5cdf6bc

Browse files
atscottdylhunn
authored andcommitted
refactor(core): Synchronously emit the current signal value in toObservable (#49894)
As described in #49681 (reply in thread), if an `Observable` created from a signal with `toObservable` is subscribed to in a template, it will initially have `null` as the value. Immediately after the template is done executing, effects are flushed and this results in the `AsyncPipe` getting a new value before the `checkNoChanges` pass, resulting in `ExpressionChanged` error. ``` template: '{{obs$ | async}}' ... obs$ = toObservable(signal(0)); ``` Instead, this commit updates the `toObservable` to synchronously emit the initial value to the Observable stream. Side note here: We don't exactly encourage this pattern. Instead of using `AsyncPipe`, the template should just read signals. PR Close #49894
1 parent b4531f1 commit 5cdf6bc

File tree

4 files changed

+87
-27
lines changed

4 files changed

+87
-27
lines changed

packages/core/rxjs-interop/src/to_observable.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88

9-
import {assertInInjectionContext, effect, inject, Injector, Signal} from '@angular/core';
10-
import {Observable} from 'rxjs';
9+
import {assertInInjectionContext, DestroyRef, effect, EffectRef, inject, Injector, Signal, untracked} from '@angular/core';
10+
import {Observable, ReplaySubject} from 'rxjs';
1111

1212
/**
1313
* Options for `toObservable`.
@@ -38,20 +38,23 @@ export function toObservable<T>(
3838
): Observable<T> {
3939
!options?.injector && assertInInjectionContext(toObservable);
4040
const injector = options?.injector ?? inject(Injector);
41+
const subject = new ReplaySubject<T>(1);
4142

42-
// Creating a new `Observable` allows the creation of the effect to be lazy. This allows for all
43-
// references to `source` to be dropped if the `Observable` is fully unsubscribed and thrown away.
44-
return new Observable(observer => {
45-
const watcher = effect(() => {
46-
let value: T;
47-
try {
48-
value = source();
49-
} catch (err) {
50-
observer.error(err);
51-
return;
52-
}
53-
observer.next(value);
54-
}, {injector, manualCleanup: true, allowSignalWrites: true});
55-
return () => watcher.destroy();
43+
const watcher = effect(() => {
44+
let value: T;
45+
try {
46+
value = source();
47+
} catch (err) {
48+
untracked(() => subject.error(err));
49+
return;
50+
}
51+
untracked(() => subject.next(value));
52+
}, {injector, manualCleanup: true});
53+
54+
injector.get(DestroyRef).onDestroy(() => {
55+
watcher.destroy();
56+
subject.complete();
5657
});
58+
59+
return subject.asObservable();
5760
}

packages/core/rxjs-interop/test/to_observable_spec.ts

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88

9-
import {Component, computed, Injector, signal} from '@angular/core';
9+
import {Component, computed, createEnvironmentInjector, EnvironmentInjector, Injector, Signal, signal} from '@angular/core';
1010
import {toObservable} from '@angular/core/rxjs-interop';
1111
import {ComponentFixture, TestBed} from '@angular/core/testing';
1212
import {take, toArray} from 'rxjs/operators';
1313

1414
describe('toObservable()', () => {
1515
let fixture!: ComponentFixture<unknown>;
16-
let injector!: Injector;
16+
let injector!: EnvironmentInjector;
1717

1818
@Component({
1919
template: '',
@@ -24,7 +24,7 @@ describe('toObservable()', () => {
2424

2525
beforeEach(() => {
2626
fixture = TestBed.createComponent(Cmp);
27-
injector = TestBed.inject(Injector);
27+
injector = TestBed.inject(EnvironmentInjector);
2828
});
2929

3030
function flushEffects(): void {
@@ -81,7 +81,7 @@ describe('toObservable()', () => {
8181
sub.unsubscribe();
8282
});
8383

84-
it('should not monitor the signal if the Observable is never subscribed', () => {
84+
it('monitors the signal even if the Observable is never subscribed', () => {
8585
let counterRead = false;
8686
const counter = computed(() => {
8787
counterRead = true;
@@ -93,12 +93,12 @@ describe('toObservable()', () => {
9393
// Simply creating the Observable shouldn't trigger a signal read.
9494
expect(counterRead).toBeFalse();
9595

96-
// Nor should the signal be read after effects have run.
96+
// The signal is read after effects have run.
9797
flushEffects();
98-
expect(counterRead).toBeFalse();
98+
expect(counterRead).toBeTrue();
9999
});
100100

101-
it('should not monitor the signal if the Observable has no active subscribers', () => {
101+
it('should still monitor the signal if the Observable has no active subscribers', () => {
102102
const counter = signal(0);
103103

104104
// Tracks how many reads of `counter()` there have been.
@@ -122,15 +122,52 @@ describe('toObservable()', () => {
122122
flushEffects();
123123
expect(readCount).toBe(2);
124124

125-
// Tear down the only subscription and hence the effect that's monitoring the signal.
125+
// Tear down the only subscription.
126126
sub.unsubscribe();
127127

128-
// Now, setting the signal shouldn't trigger any additional reads, as the Observable is no
129-
// longer interested in its value.
128+
// Now, setting the signal still triggers additional reads
129+
counter.set(2);
130+
flushEffects();
131+
expect(readCount).toBe(3);
132+
});
133+
134+
it('stops monitoring the signal once injector is destroyed', () => {
135+
const counter = signal(0);
136+
137+
// Tracks how many reads of `counter()` there have been.
138+
let readCount = 0;
139+
const trackedCounter = computed(() => {
140+
readCount++;
141+
return counter();
142+
});
143+
144+
const childInjector = createEnvironmentInjector([], injector);
145+
toObservable(trackedCounter, {injector: childInjector});
146+
147+
expect(readCount).toBe(0);
130148

149+
flushEffects();
150+
expect(readCount).toBe(1);
151+
152+
// Now, setting the signal shouldn't trigger any additional reads, as the Injector was destroyed
153+
childInjector.destroy();
131154
counter.set(2);
132155
flushEffects();
156+
expect(readCount).toBe(1);
157+
});
133158

134-
expect(readCount).toBe(2);
159+
it('does not track downstream signal reads in the effect', () => {
160+
const counter = signal(0);
161+
const emits = signal(0);
162+
toObservable(counter, {injector}).subscribe(() => {
163+
// Read emits. If we are still tracked in the effect, this will cause an infinite loop by
164+
// triggering the effect again.
165+
emits();
166+
emits.update(v => v + 1);
167+
});
168+
flushEffects();
169+
expect(emits()).toBe(1);
170+
flushEffects();
171+
expect(emits()).toBe(1);
135172
});
136173
});

packages/core/test/render3/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ ts_library(
2525
"//packages/common",
2626
"//packages/compiler",
2727
"//packages/core",
28+
"//packages/core/rxjs-interop",
2829
"//packages/core/src/di/interface",
2930
"//packages/core/src/interface",
3031
"//packages/core/src/util",

packages/core/test/render3/reactivity_spec.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88

9+
import {AsyncPipe} from '@angular/common';
910
import {AfterViewInit, Component, ContentChildren, createComponent, destroyPlatform, effect, EnvironmentInjector, inject, Injector, Input, NgZone, OnChanges, QueryList, signal, SimpleChanges, ViewChild} from '@angular/core';
11+
import {toObservable} from '@angular/core/rxjs-interop';
1012
import {TestBed} from '@angular/core/testing';
1113
import {bootstrapApplication} from '@angular/platform-browser';
1214
import {withBody} from '@angular/private/testing';
@@ -386,4 +388,21 @@ describe('effects', () => {
386388
fixture.detectChanges();
387389
expect(fixture.componentInstance.noOfCmpCreated).toBe(1);
388390
});
391+
392+
it('should allow toObservable subscription in template (with async pipe)', () => {
393+
@Component({
394+
selector: 'test-cmp',
395+
standalone: true,
396+
imports: [AsyncPipe],
397+
template: '{{counter$ | async}}',
398+
})
399+
class Cmp {
400+
counter$ = toObservable(signal(0));
401+
}
402+
403+
const fixture = TestBed.createComponent(Cmp);
404+
expect(() => fixture.detectChanges(true)).not.toThrow();
405+
fixture.detectChanges();
406+
expect(fixture.nativeElement.textContent).toBe('0');
407+
});
389408
});

0 commit comments

Comments
 (0)