Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion goldens/public-api/core/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1712,7 +1712,7 @@ export type ResourceSnapshot<T> = {
export type ResourceStatus = 'idle' | 'error' | 'loading' | 'reloading' | 'resolved' | 'local';

// @public
export type ResourceStreamingLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<Signal<ResourceStreamItem<T>>>;
export type ResourceStreamingLoader<T, R> = (param: ResourceLoaderParams<R>) => Signal<ResourceStreamItem<T>> | PromiseLike<Signal<ResourceStreamItem<T>>> | undefined;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we have a helper type in core somewhere that's type Awaitable<T> = T | PromiseLike<T>;? I think this could be useful elsewhere as well. It's too bad there isn't a built in like there is "Awaited"


// @public (undocumented)
export type ResourceStreamItem<T> = {
Expand Down
4 changes: 4 additions & 0 deletions packages/core/rxjs-interop/src/rx_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
},
});

if (resolve === undefined) {
return stream;
}

return promise;
},
});
Expand Down
72 changes: 67 additions & 5 deletions packages/core/rxjs-interop/test/rx_resource_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
* found in the LICENSE file at https://angular.dev/license
*/

import {of, Observable, BehaviorSubject, throwError} from 'rxjs';
import {TestBed} from '../../testing';
import {timeout} from '@angular/private/testing';
import {BehaviorSubject, EMPTY, Observable, of, Subscriber, throwError} from 'rxjs';
import {ApplicationRef, Injector, signal} from '../../src/core';
import {TestBed} from '../../testing';
import {rxResource} from '../src';

describe('rxResource()', () => {
it('should fetch data using an observable loader', async () => {
const injector = TestBed.inject(Injector);
const appRef = TestBed.inject(ApplicationRef);
const res = rxResource({
stream: () => of(1),
injector,
});
await appRef.whenStable();

TestBed.tick();

// Value should be available synchronously (because the observable emits synchronously)
expect(res.value()).toBe(1);
expect(res.status()).toBe('resolved');
});

it('should cancel the fetch when a new request comes in', async () => {
Expand All @@ -30,7 +33,7 @@ describe('rxResource()', () => {
const request = signal(1);
let unsub = false;
let lastSeenRequest: number = 0;
rxResource({
const res = rxResource({
params: request,
stream: ({params: request}) => {
lastSeenRequest = request;
Expand All @@ -48,12 +51,23 @@ describe('rxResource()', () => {
injector,
});

// The stream isn't evaluated eagerly. We have to wait for the effect to run to see the first request.
expect(lastSeenRequest).toBe(0);

TestBed.tick();

expect(res.status()).toBe('loading');
expect(lastSeenRequest).toBe(1);

// Wait for the resource to reach loading state.
await waitFor(() => lastSeenRequest === 1);

// Setting request = 2 should cancel request = 1
request.set(2);
// The stream is updated asynchronously because we're waiting for the effect to fire.
expect(lastSeenRequest).toBe(1);
await appRef.whenStable();
expect(lastSeenRequest).toBe(2);
expect(unsub).toBe(true);
});

Expand Down Expand Up @@ -114,6 +128,54 @@ describe('rxResource()', () => {

expect(() => rxRes.value()).toThrowError(/This is a FooError/);
});

it('should reuse observable', async () => {
let count = 0;
let sub!: Subscriber<number>;
const obs = new Observable<number>((s) => {
sub = s;
count++;
});

const res = rxResource({
stream: () => obs,
injector: TestBed.inject(Injector),
});
// Hasn't subscribed to the observable yet
expect(count).toBe(0);

TestBed.tick();
expect(count).toBe(1);
expect(res.status()).toBe('loading');
sub.next(1);
await new Promise((resolve) => setTimeout(resolve, 0));

expect(count).toBe(1);
});

it('should report error synchronously (after tick)', () => {
Comment thread
JeanMeche marked this conversation as resolved.
const injector = TestBed.inject(Injector);
const res = rxResource({
stream: () => EMPTY,
injector,
});
TestBed.tick();
expect(res.status()).toBe('error');
expect(res.error()).toBeInstanceOf(Error);
expect(() => res.value()).toThrowError(/Resource completed before producing a value/);
});

it('should report sync error synchronously (after tick) ', () => {
Comment thread
JeanMeche marked this conversation as resolved.
const injector = TestBed.inject(Injector);
const res = rxResource({
stream: () => throwError(() => new Error('bad news')),
injector,
});
TestBed.tick();
expect(res.status()).toBe('error');
expect(res.error()).toBeInstanceOf(Error);
expect(() => res.value()).toThrowError(/bad news/);
});
});

async function waitFor(fn: () => boolean): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/resource/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ export type ResourceLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLi
*/
export type ResourceStreamingLoader<T, R> = (
param: ResourceLoaderParams<R>,
) => PromiseLike<Signal<ResourceStreamItem<T>>>;
) => Signal<ResourceStreamItem<T>> | PromiseLike<Signal<ResourceStreamItem<T>>> | undefined;

/**
* Options to the `resource` function, for creating a resource.
Expand Down
41 changes: 28 additions & 13 deletions packages/core/src/resource/resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
* found in the LICENSE file at https://angular.dev/license
*/

import {Signal, ValueEqualityFn} from '../render3/reactivity/api';
import {isSignal, Signal, ValueEqualityFn} from '../render3/reactivity/api';
import {computed} from '../render3/reactivity/computed';
import {effect, EffectRef} from '../render3/reactivity/effect';
import {signal, signalAsReadonlyFn, WritableSignal} from '../render3/reactivity/signal';
import {untracked} from '../render3/reactivity/untracked';
import {
Resource,
ResourceDependencyError,
ResourceLoaderParams,
ResourceOptions,
ResourceParamsStatus,
ResourceSnapshot,
Expand Down Expand Up @@ -422,28 +421,44 @@ export class ResourceImpl<T, R> extends BaseWritableResource<T> implements Resou
// The actual loading is run through `untracked` - only the request side of `resource` is
// reactive. This avoids any confusion with signals tracking or not tracking depending on
// which side of the `await` they are.
const stream = await untracked(() => {
const stream = untracked(() => {
return this.loaderFn({
params: extRequest.request as Exclude<R, undefined>,
abortSignal,
previous: {
status: previousStatus,
},
} as ResourceLoaderParams<R>);
});
});

// If this request has been aborted, or the current request no longer
// matches this load, then we should ignore this resolution.
if (abortSignal.aborted || untracked(this.extRequest) !== extRequest) {
return;
}
const shouldDiscard = () => abortSignal.aborted || untracked(this.extRequest) !== extRequest;

this.state.set({
extRequest,
status: 'resolved',
previousStatus: 'resolved',
stream,
});
if (isSignal(stream)) {
if (shouldDiscard()) {
return;
}

this.state.set({
extRequest,
status: 'resolved',
previousStatus: 'resolved',
stream,
});
} else {
const resolvedStream = await stream;
if (shouldDiscard()) {
return;
}

this.state.set({
extRequest,
status: 'resolved',
previousStatus: 'resolved',
stream: resolvedStream,
});
}
} catch (err) {
rethrowFatalErrors(err);
if (abortSignal.aborted || untracked(this.extRequest) !== extRequest) {
Expand Down
88 changes: 87 additions & 1 deletion packages/core/test/resource/resource_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@

import {
ApplicationRef,
Component,
computed,
createEnvironmentInjector,
effect,
EnvironmentInjector,
Injector,
Input,
inputBinding,
resource,
ResourceRef,
ResourceStatus,
signal,
} from '../../src/core';
import {TestBed} from '../../testing';
import {promiseWithResolvers} from '../../src/util/promise_with_resolvers';
import {TestBed} from '../../testing';

abstract class MockBackend<T, R> {
protected pending = new Map<
Expand Down Expand Up @@ -744,17 +747,100 @@ describe('resource', () => {
});

it('should allow streaming', async () => {
const res = resource({
stream: () => signal({value: 'done'}),
injector: TestBed.inject(Injector),
});

// We rely an a synchronous tick to ensure that the stream is properly initialized, as it runs inside an effect.
TestBed.tick();

expect(res.status()).toBe('resolved');
expect(res.value()).toBe('done');
});

it('should allow streaming with param & sync stream', async () => {
const param = signal('a');
const res = resource({
params: param,
stream: ({params}) => signal({value: params + ' done'}),
injector: TestBed.inject(Injector),
});

// We rely an a synchronous tick to ensure that the stream is properly initialized, as it runs inside an effect.
TestBed.tick();

expect(res.status()).toBe('resolved');
expect(res.value()).toBe('a done');
});

it('should allow async streaming', async () => {
const appRef = TestBed.inject(ApplicationRef);
const res = resource({
stream: async () => signal({value: 'done'}),
injector: TestBed.inject(Injector),
});

expect(res.status()).toBe('loading');
TestBed.tick();
// We're still loading, the promise hasn't resolved yet.
expect(res.status()).toBe('loading');

await appRef.whenStable();
expect(res.status()).toBe('resolved');
expect(res.value()).toBe('done');
});

it('should allow streaming & cancelling with param & sync stream', async () => {
const appRef = TestBed.inject(ApplicationRef);
const param = signal('a');
let streamCount = 0;
const res = resource({
params: param,
stream: ({params}) => {
streamCount++;
return signal({value: params + ' done'});
},
injector: TestBed.inject(Injector),
});

// The stream is not evaluated eagerly, because we have to wait for init (in case we read inputs)
expect(streamCount).toBe(0);

TestBed.tick();

expect(streamCount).toBe(1);
expect(res.status()).toBe('resolved');
expect(res.value()).toBe('a done');
param.set('b');
await appRef.whenStable();
expect(streamCount).toBe(2);
expect(res.status()).toBe('resolved');
expect(res.value()).toBe('b done');
});

it('should allow stream from input()', async () => {
@Component({
selector: 'test',
template: `{{ res.value() }}`,
})
class TestComponent {
@Input() value = '';
res = resource({
params: () => this.value,
stream: ({params}) => signal({value: params + ' done'}),
});
}

const fixture = TestBed.createComponent(TestComponent, {
bindings: [inputBinding('value', signal('a'))],
});

TestBed.tick();

expect(fixture.componentInstance.res.value()).toEqual('a done');
});

it('should error via error()', async () => {
const appRef = TestBed.inject(ApplicationRef);
const res = resource({
Expand Down
Loading