Skip to content

Commit be4e396

Browse files
committed
fix(webapp): harden the realtime runs backend
Addresses review feedback on the new backend:
- skip cache eviction when updating an existing key at capacity
- treat a concurrency limit of 0 as valid (enforce it instead of returning a 500)
- gate subscribeToRunChanges behind the enable switch
- keep protocol-reserved columns in the hydration projection
- re-clamp a handle-recovered createdAt to the max-age floor
- bulk-hydrate the shadow comparator instead of per-run reads
- log only run id and column on divergence, never raw cell values
1 parent 88e176a commit be4e396

10 files changed

Lines changed: 122 additions & 8 deletions

apps/webapp/app/services/realtime/boundedTtlCache.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ export class BoundedTtlCache<V> {
3434
}
3535

3636
set(key: string, value: V): void {
37-
if (this.#entries.size >= this.maxEntries) {
37+
// Only run capacity eviction when inserting a NEW key — updating an existing key
38+
// doesn't grow the map, so it must never drop an unrelated live entry.
39+
if (!this.#entries.has(key) && this.#entries.size >= this.maxEntries) {
3840
const now = Date.now();
3941
for (const [key, entry] of this.#entries) {
4042
if (entry.expiresAt <= now) {

apps/webapp/app/services/realtime/notifierRealtimeClient.server.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,15 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
238238
}
239239

240240
// Recover the pinned window from the handle so the lower bound never drifts.
241+
// Re-clamp the recovered value to the max-age floor so a stale or crafted handle
242+
// can't widen the lookback past the configured ceiling.
243+
const recoveredMs = this.#filterMsFromHandle(handle);
241244
const filter: RunSetFilter = {
242245
tags,
243246
createdAtAfter: new Date(
244-
this.#filterMsFromHandle(handle) ?? this.#computeCreatedAtFilter(params.createdAt).getTime()
247+
recoveredMs !== undefined
248+
? this.#clampCreatedAtFloor(recoveredMs)
249+
: this.#computeCreatedAtFilter(params.createdAt).getTime()
245250
),
246251
};
247252

@@ -573,6 +578,13 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
573578
return bucket > 0 ? Math.floor(ms / bucket) * bucket : ms;
574579
}
575580

581+
/** Clamp a handle-recovered createdAt lower bound up to the max-age floor (so a
582+
* stale or crafted handle can't widen the window past the ceiling), then re-bucket. */
583+
#clampCreatedAtFloor(ms: number): number {
584+
const floorMs = Date.now() - this.options.maximumCreatedAtFilterAgeMs;
585+
return this.#bucketCreatedAtMs(Math.max(ms, floorMs));
586+
}
587+
576588
#mintListHandle(createdAtFilterMs: number): string {
577589
// Pins the createdAt threshold in the opaque handle so live polls reuse the
578590
// same lower bound even on a working-set cache miss.
@@ -615,7 +627,7 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
615627
DEFAULT_CONCURRENCY_LIMIT
616628
);
617629

618-
if (!concurrencyLimit) {
630+
if (concurrencyLimit == null) {
619631
logger.error("[notifierRealtimeClient] Failed to get concurrency limit", {
620632
organizationId: environment.organizationId,
621633
});

apps/webapp/app/services/realtime/runChangeNotifierInstance.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,8 @@ export function publishManyRunChanged(inputs: RunChangeInput[]): void {
6969

7070
/** Subscribe to the next change for a run via the shared subscriber. */
7171
export function subscribeToRunChanges(runId: string): RunChangeSubscription {
72+
if (!notifierEnabled) {
73+
throw new Error("Run change notifier is disabled");
74+
}
7275
return getRunChangeNotifier().subscribeToRunChanges(runId);
7376
}

apps/webapp/app/services/realtime/runReader.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type Prisma, type PrismaClient } from "@trigger.dev/database";
22
import { BoundedTtlCache } from "./boundedTtlCache";
3-
import { type RealtimeRunRow } from "./electricStreamProtocol.server";
3+
import { RESERVED_COLUMNS, type RealtimeRunRow } from "./electricStreamProtocol.server";
44

55
/**
66
* RunReader — the pluggable read half of the notifier-backed realtime feed.
@@ -52,7 +52,7 @@ export const RUN_HYDRATOR_SELECT = {
5252
* `buildHydratorSelect`), so the replica doesn't ship large `payload`/`output`/
5353
* `metadata`/`error` columns the response will drop anyway.
5454
*/
55-
const ALWAYS_HYDRATED_COLUMNS = new Set<string>(["id", "updatedAt"]);
55+
const ALWAYS_HYDRATED_COLUMNS = new Set<string>(["id", "updatedAt", ...RESERVED_COLUMNS]);
5656

5757
/** Project `RUN_HYDRATOR_SELECT` down to the columns the client didn't skip (plus
5858
* the always-needed ones). An empty skip set returns the full select unchanged. */

apps/webapp/app/services/realtime/shadowCompare.server.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,21 @@ export class RealtimeShadowComparator {
9595
diffs: [],
9696
};
9797

98+
// Bulk-hydrate every emitted run in one query rather than a per-message round
99+
// trip, so shadow mode doesn't inflate the very replica load it's measuring.
100+
const emittedIds = changes
101+
.map((m) => m.value.id)
102+
.filter((id): id is string => typeof id === "string");
103+
const hydrated = await this.options.runReader.hydrateByIds(input.environment.id, emittedIds);
104+
const rowsById = new Map(hydrated.map((row) => [row.id, row]));
105+
98106
for (const message of changes) {
99107
const runId = message.value.id ?? undefined;
100108
if (!runId) {
101109
continue;
102110
}
103111

104-
const row = await this.options.runReader.getRunById(input.environment.id, runId);
112+
const row = rowsById.get(runId);
105113
if (!row) {
106114
// Run no longer readable (deleted / replica miss). Not a serialization divergence.
107115
outcome.serializationSkew++;

apps/webapp/app/services/realtime/shadowRealtimeClient.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ export class ShadowRealtimeClient implements RealtimeStreamClient {
180180
membershipMatch: outcome.membershipMatch,
181181
missingInNotifier: outcome.missingInNotifier?.slice(0, 20),
182182
extraInNotifier: outcome.extraInNotifier?.slice(0, 20),
183-
diffs: outcome.diffs,
183+
// Log only which run/column diverged, never the raw cell values — they can
184+
// include run payload/output/metadata and must not leak into logs.
185+
diffs: outcome.diffs.map(({ runId, column }) => ({ runId, column })),
184186
});
185187
}
186188
}

apps/webapp/test/realtime/boundedTtlCache.test.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ describe("BoundedTtlCache", () => {
2828
expect(cache.size).toBe(0);
2929
});
3030

31+
it("does not evict another entry when updating an existing key at capacity", () => {
32+
const cache = new BoundedTtlCache<number>(60_000, 2);
33+
cache.set("a", 1);
34+
cache.set("b", 2);
35+
// Updating an existing key doesn't grow the map, so it must not drop "b".
36+
cache.set("a", 11);
37+
expect(cache.get("a")).toBe(11);
38+
expect(cache.get("b")).toBe(2);
39+
expect(cache.size).toBe(2);
40+
});
41+
3142
it("drops the oldest entry when full of still-live entries", () => {
3243
const cache = new BoundedTtlCache<number>(60_000, 2);
3344
cache.set("a", 1);

apps/webapp/test/realtime/notifierRunSetCache.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,57 @@ describe("NotifierRealtimeClient tag-list createdAt bucketing", () => {
171171
}
172172
});
173173
});
174+
175+
describe("NotifierRealtimeClient review fixes", () => {
176+
const ready = { changed: Promise.resolve(), unsubscribe() {} };
177+
const liveNotifier = { subscribeToRunChanges: () => ready, subscribeToEnvChanges: () => ready };
178+
179+
it("clamps a stale/crafted handle's createdAt up to the max-age floor", async () => {
180+
const maxAge = 24 * 60 * 60 * 1000;
181+
const { client, resolveSpy } = makeClient({
182+
notifier: liveNotifier,
183+
maximumCreatedAtFilterAgeMs: maxAge,
184+
runSetCreatedAtBucketMs: 0,
185+
livePollTimeoutMs: 50,
186+
});
187+
const before = Date.now();
188+
// Handle encodes createdAt = 1ms epoch, far older than the 24h ceiling.
189+
await client.streamRuns(
190+
"http://localhost:3030/realtime/v1/runs?offset=123_1&live=true&handle=runs_1_7",
191+
ENV,
192+
{ tags: ["t"] },
193+
CURRENT_API_VERSION,
194+
undefined,
195+
"1.0.0"
196+
);
197+
const passed = resolveSpy.mock.calls[0][0].createdAtAfter as Date;
198+
// Clamped to ~now - maxAge, not the epoch value encoded in the handle.
199+
expect(passed.getTime()).toBeGreaterThan(before - maxAge - 1_000);
200+
});
201+
202+
it("enforces a concurrency limit of 0 instead of failing with a 500", async () => {
203+
let limitCheckedWith: number | undefined;
204+
const { client } = makeClient({
205+
notifier: liveNotifier,
206+
cachedLimitProvider: { getCachedLimit: async () => 0 },
207+
limiter: {
208+
incrementAndCheck: async (_env: string, _id: string, limit: number) => {
209+
limitCheckedWith = limit;
210+
return true;
211+
},
212+
decrement: async () => {},
213+
},
214+
livePollTimeoutMs: 50,
215+
});
216+
const res = await client.streamBatch(
217+
"http://localhost:3030/realtime/v1/batches/batch_1?offset=123_1&live=true",
218+
ENV,
219+
"batch_1",
220+
CURRENT_API_VERSION,
221+
undefined,
222+
"1.0.0"
223+
);
224+
expect(res.status).toBe(200);
225+
expect(limitCheckedWith).toBe(0);
226+
});
227+
});

apps/webapp/test/realtime/runReaderProjection.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,24 @@ describe("buildHydratorSelect", () => {
1111
expect(select.error).toBe(true);
1212
});
1313

14+
it("keeps protocol-reserved columns even when asked to skip them", () => {
15+
// Reserved columns are always emitted by the serializer, so hydration must keep
16+
// them regardless of skipColumns or the output is null/incorrect.
17+
const select = buildHydratorSelect([
18+
"status",
19+
"taskIdentifier",
20+
"createdAt",
21+
"friendlyId",
22+
"payload",
23+
]);
24+
expect(select.status).toBe(true);
25+
expect(select.taskIdentifier).toBe(true);
26+
expect(select.createdAt).toBe(true);
27+
expect(select.friendlyId).toBe(true);
28+
// A non-reserved skipped column is still dropped.
29+
expect(select.payload).toBeUndefined();
30+
});
31+
1432
it("drops skipped columns but always keeps id + updatedAt", () => {
1533
const select = buildHydratorSelect(["payload", "output", "metadata", "error"]);
1634
expect(select.payload).toBeUndefined();

apps/webapp/test/realtime/shadowCompare.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ function makeComparator(
4949
resolvedIds: string[] = []
5050
) {
5151
return new RealtimeShadowComparator({
52-
runReader: { getRunById: async (_env: string, id: string) => rowsById[id] ?? null } as any,
52+
runReader: {
53+
getRunById: async (_env: string, id: string) => rowsById[id] ?? null,
54+
hydrateByIds: async (_env: string, ids: string[]) =>
55+
ids.map((id) => rowsById[id]).filter((row): row is RealtimeRunRow => Boolean(row)),
56+
} as any,
5357
runListResolver: { resolveMatchingRunIds: async (_f: RunListFilter) => resolvedIds } as any,
5458
});
5559
}

0 commit comments

Comments
 (0)