Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion goldens/public-api/core/index.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export interface BaseResourceOptions<T, R> {
equal?: ValueEqualityFn<T>;
injector?: Injector;
params?: (ctx: ResourceParamsContext) => R;
transferCacheKey?: (params: R) => StateKey<T>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Does this implementation result in duplicate transfer state entries (HTTP Client andResource)?
  • Consider id maybe instead of transferCacheKey?
  • Manual key management places a heavy burden on the developer to ensure global uniqueness. To improve the DX, we should explore internal ID generation instead of it being part of the API or something similar to React’s useId or Vue’s composition helpers. Relying on users to manual create unique cache-key feels like a "foot-gun" especially in big organizations; we should ideally handle key construction internally to prevent collisions, this was one of the reasons that we always avoided providing such option in the HTTP transfer cache.

}

// @public
Expand Down Expand Up @@ -1725,7 +1726,7 @@ export type ResourceSnapshot<T> = {
export type ResourceStatus = 'idle' | 'error' | 'loading' | 'reloading' | 'resolved' | 'local';

// @public
export type ResourceStreamingLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<Signal<ResourceStreamItem<T>>>;
export type ResourceStreamingLoader<T, R> = (param: ResourceLoaderParams<R>) => Signal<ResourceStreamItem<T>> | PromiseLike<Signal<ResourceStreamItem<T>>> | undefined;

// @public (undocumented)
export type ResourceStreamItem<T> = {
Expand Down
1 change: 1 addition & 0 deletions packages/common/http/src/resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ class HttpResourceImpl<T>
equal,
debugName,
injector,
undefined,
getInitialStream,
);
this.client = injector.get(HttpClient);
Expand Down
13 changes: 8 additions & 5 deletions packages/core/rxjs-interop/src/rx_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
* found in the LICENSE file at https://angular.dev/license
*/

import {Observable, Subscription} from 'rxjs';
import {
assertInInjectionContext,
BaseResourceOptions,
resource,
ResourceLoaderParams,
ResourceRef,
ResourceStreamItem,
Signal,
signal,
BaseResourceOptions,
ɵRuntimeError,
ɵRuntimeErrorCode,
ResourceStreamItem,
} from '../../src/core';
import {Observable, Subscription} from 'rxjs';
import {encapsulateResourceError} from '../../src/resource/resource';

/**
Expand Down Expand Up @@ -75,8 +75,7 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
resolve = undefined;
}

// TODO(alxhub): remove after g3 updated to rename loader -> stream
const streamFn = opts.stream ?? (opts as {loader?: RxResourceOptions<T, R>['stream']}).loader;
const streamFn = opts.stream;
if (streamFn === undefined) {
throw new ɵRuntimeError(
ɵRuntimeErrorCode.MUST_PROVIDE_STREAM_OPTION,
Expand All @@ -103,6 +102,10 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
},
});

if (resolve === undefined) {
return stream;
}

return promise;
},
});
Expand Down
172 changes: 167 additions & 5 deletions packages/core/rxjs-interop/test/rx_resource_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@
* found in the LICENSE file at https://angular.dev/license
*/

import {of, Observable, BehaviorSubject, throwError} from 'rxjs';
import {BehaviorSubject, EMPTY, Observable, of, Subscriber, throwError} from 'rxjs';
import {ApplicationRef, Injector, makeStateKey, signal, TransferState} from '../../src/core';
import {TestBed} from '../../testing';
import {ApplicationRef, Injector, signal} from '../../src/core';
import {rxResource} from '../src';

describe('rxResource()', () => {
it('should fetch data using an observable loader', async () => {
const injector = TestBed.inject(Injector);
const appRef = TestBed.inject(ApplicationRef);
const res = rxResource({
stream: () => of(1),
injector,
});
await appRef.whenStable();

TestBed.tick();

// Value should be available synchronously (because the observable emits synchronously)
expect(res.value()).toBe(1);
expect(res.status()).toBe('resolved');
});

it('should cancel the fetch when a new request comes in', async () => {
Expand All @@ -29,7 +32,7 @@ describe('rxResource()', () => {
const request = signal(1);
let unsub = false;
let lastSeenRequest: number = 0;
rxResource({
const res = rxResource({
params: request,
stream: ({params: request}) => {
lastSeenRequest = request;
Expand All @@ -47,12 +50,23 @@ describe('rxResource()', () => {
injector,
});

// The stream isn't evaluated eagerly. We have to wait for the effect to run to see the first request.
expect(lastSeenRequest).toBe(0);

TestBed.tick();

expect(res.status()).toBe('loading');
expect(lastSeenRequest).toBe(1);

// Wait for the resource to reach loading state.
await waitFor(() => lastSeenRequest === 1);

// Setting request = 2 should cancel request = 1
request.set(2);
// The stream is updated asynchronously because we're waiting for the effect to fire.
expect(lastSeenRequest).toBe(1);
await appRef.whenStable();
expect(lastSeenRequest).toBe(2);
expect(unsub).toBe(true);
});

Expand Down Expand Up @@ -113,10 +127,158 @@ describe('rxResource()', () => {

expect(() => rxRes.value()).toThrowError(/This is a FooError/);
});

it('should reuse observable', async () => {
let count = 0;
let sub!: Subscriber<number>;
const obs = new Observable<number>((s) => {
sub = s;
count++;
});

const res = rxResource({
stream: () => obs,
injector: TestBed.inject(Injector),
});
// Hasn't subscribed to the observable yet
expect(count).toBe(0);

TestBed.tick();
expect(count).toBe(1);
expect(res.status()).toBe('loading');
sub.next(1);
await new Promise((resolve) => setTimeout(resolve, 0));

expect(count).toBe(1);
});

it('should report error synchronously (after tick)', () => {
const injector = TestBed.inject(Injector);
const res = rxResource({
stream: () => EMPTY,
injector,
});
TestBed.tick();
expect(res.status()).toBe('error');
expect(res.error()).toBeInstanceOf(Error);
expect(() => res.value()).toThrowError(/Resource completed before producing a value/);
});

it('should report sync error synchronously (after tick) ', () => {
const injector = TestBed.inject(Injector);
const res = rxResource({
stream: () => throwError(() => new Error('bad news')),
injector,
});
TestBed.tick();
expect(res.status()).toBe('error');
expect(res.error()).toBeInstanceOf(Error);
expect(() => res.value()).toThrowError(/bad news/);
});

describe('with TransferState', () => {
let transferState: TransferState;

beforeEach(() => {
TestBed.configureTestingModule({providers: [TransferState]});
transferState = TestBed.inject(TransferState);
});

afterEach(() => {
(globalThis as any).ngServerMode = undefined;
});

it('should read from TransferState if a key is present', async () => {
const key = makeStateKey<number>('test-key');
transferState.set(key, 123);

const injector = TestBed.inject(Injector);
const testResource = rxResource({
stream: () => of(456),
transferCacheKey: () => key,
injector,
});

// Should be synchronously resolved from cache
expect(testResource.status()).toBe('resolved');
expect(testResource.value()).toBe(123);

// Should prevent loader from running
await flushMicrotasks();
expect(testResource.value()).toBe(123);
});

it('should write to TransferState on server when resolved (sync)', async () => {
(globalThis as any).ngServerMode = true;
const key = makeStateKey<number>('server-key');

const injector = TestBed.inject(Injector);
const testResource = rxResource({
stream: () => of(789),
transferCacheKey: () => key,
injector,
});

expect(testResource.status()).toBe('loading');

await flushMicrotasks();

expect(testResource.status()).toBe('resolved');
expect(testResource.value()).toBe(789);
expect(transferState.get(key, null!)).toBe(789);
});

it('should write to TransferState on server when resolved (async)', async () => {
(globalThis as any).ngServerMode = true;
const key = makeStateKey<number>('server-async-key');

const injector = TestBed.inject(Injector);
const testResource = rxResource({
stream: () =>
new Observable<number>((sub) => {
Promise.resolve().then(() => {
sub.next(101112);
sub.complete();
});
}),
transferCacheKey: () => key,
injector,
});

expect(testResource.status()).toBe('loading');

await waitFor(() => testResource.status() === 'resolved');

expect(testResource.value()).toBe(101112);
expect(transferState.get(key, null!)).toBe(101112);
});

it('should not write to TransferState on client when resolved', async () => {
(globalThis as any).ngServerMode = false;
const key = makeStateKey<number>('client-key');

const injector = TestBed.inject(Injector);
const testResource = rxResource({
stream: () => of(131415),
transferCacheKey: () => key,
injector,
});

await flushMicrotasks();

expect(testResource.status()).toBe('resolved');
expect(testResource.value()).toBe(131415);
expect(transferState.hasKey(key)).toBeFalse();
});
});
});

async function waitFor(fn: () => boolean): Promise<void> {
while (!fn()) {
await new Promise((resolve) => setTimeout(resolve, 1));
}
}

function flushMicrotasks(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 0));
}
8 changes: 7 additions & 1 deletion packages/core/src/resource/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import {Injector} from '../di/injector';
import {Signal, ValueEqualityFn} from '../render3/reactivity/api';
import {WritableSignal} from '../render3/reactivity/signal';
import {StateKey} from '../transfer_state';

/** Error thrown when a `Resource` dependency of another resource errors. */
export class ResourceDependencyError extends Error {
Expand Down Expand Up @@ -200,7 +201,7 @@ export type ResourceLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLi
*/
export type ResourceStreamingLoader<T, R> = (
param: ResourceLoaderParams<R>,
) => PromiseLike<Signal<ResourceStreamItem<T>>>;
) => Signal<ResourceStreamItem<T>> | PromiseLike<Signal<ResourceStreamItem<T>>> | undefined;

/**
* Options to the `resource` function, for creating a resource.
Expand Down Expand Up @@ -231,6 +232,11 @@ export interface BaseResourceOptions<T, R> {
* Overrides the `Injector` used by `resource`.
*/
injector?: Injector;

/**
* The transfer cache key used to cache the resource data in the `TransferState` during server-side rendering and to retrieve it on the client side.
*/
transferCacheKey?: (params: R) => StateKey<T>;
}

/**
Expand Down
Loading