Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a86635c
chore(run-store): scaffold @internal/run-store package
d-cs Jun 17, 2026
d4c1ff4
feat(run-store): add shared types and the RunStore interface
d-cs Jun 17, 2026
6d7abab
chore(run-store): use .js extensions in index re-exports for Node16 r…
d-cs Jun 17, 2026
010cf17
feat(run-store): add NoopRunStore test double
d-cs Jun 17, 2026
72a7462
feat(run-store): add PostgresRunStore with createRun
d-cs Jun 17, 2026
2e63223
feat(run-store): implement createCancelledRun and createFailedRun
d-cs Jun 17, 2026
f8456c1
feat(run-store): implement attempt lifecycle, cancel, and fail methods
d-cs Jun 17, 2026
f1ab6ae
feat(run-store): implement expiry, dequeue-lock, version, and checkpo…
d-cs Jun 17, 2026
f66bbad
feat(run-store): implement reschedule, debounce, metadata, idempotenc…
d-cs Jun 17, 2026
56ec707
feat(run-store): wire RunStore into run-engine SystemResources and we…
d-cs Jun 17, 2026
01bbc67
fix(run-store): align create-input types with the columns callers act…
d-cs Jun 17, 2026
de52aaa
refactor(run-engine): route run creation through RunStore
d-cs Jun 17, 2026
4826117
fix(run-store): allow optional machinePreset in recordRetryOutcome (l…
d-cs Jun 17, 2026
8650e40
refactor(run-engine): route attempt lifecycle, cancel, and fail write…
d-cs Jun 17, 2026
d530eb1
refactor(run-engine): route expiry and dequeue-lock writes through Ru…
d-cs Jun 17, 2026
4ec5aab
fix(run-store): allow undefined maxDurationInSeconds in lockRunToWork…
d-cs Jun 17, 2026
109c6a7
refactor(run-engine): route checkpoint, delayed, pending-version, and…
d-cs Jun 17, 2026
2fbdc5d
refactor(webapp): route run metadata, idempotency-key, and reschedule…
d-cs Jun 17, 2026
1a5ccdc
refactor(webapp): route tag and realtime-stream appends through RunStore
d-cs Jun 17, 2026
60565cf
fix(run-store): short-circuit expireRunsBatch on an empty runIds array
d-cs Jun 18, 2026
3c22b32
Merge main into run-store-write-adapter
d-cs Jun 18, 2026
76f3494
fix(webapp): inject runStore into UpdateMetadataService
d-cs Jun 18, 2026
3b809b9
Merge branch 'main' into run-store-write-adapter
d-cs Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
import { runStore } from "~/v3/runStore.server";

// Pull the existing tags out of a buffer entry's serialised payload so
// the buffer-path response can dedup against them, matching the
Expand Down Expand Up @@ -84,14 +85,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
if (newTags.length === 0) {
return json({ message: "No new tags to add" }, { status: 200 });
}
const updated = await prisma.taskRun.update({
where: {
id: taskRun.id,
runtimeEnvironmentId: env.id,
},
data: { runTags: { push: newTags } },
select: { updatedAt: true },
});
const updated = await runStore.pushTags(taskRun.id, newTags, { runtimeEnvironmentId: env.id }, prisma);
// Publish a run-changed record with the NEW tag set so tag feeds reindex
// (no-op unless enabled). updatedAt is the read-your-writes watermark.
publishChangeRecord({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { $replica, prisma } from "~/db.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import { runStore } from "~/v3/runStore.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down Expand Up @@ -87,16 +88,7 @@ const { action } = createActionApiRoute(
}

if (!targetRun.realtimeStreams.includes(params.streamId)) {
await prisma.taskRun.update({
where: {
id: targetRun.id,
},
data: {
realtimeStreams: {
push: params.streamId,
},
},
});
await runStore.pushRealtimeStream(targetRun.id, params.streamId, prisma);
}

const part = await request.text();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
import { runStore } from "~/v3/runStore.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down Expand Up @@ -86,12 +87,7 @@ const { action } = createActionApiRoute(
}

if (!target.realtimeStreams.includes(params.streamId)) {
await prisma.taskRun.update({
where: { id: target.id },
data: {
realtimeStreams: { push: params.streamId },
},
});
await runStore.pushRealtimeStream(target.id, params.streamId, prisma);
}

const realtimeStream = getRealtimeStreamInstance(
Expand Down
17 changes: 9 additions & 8 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
import { runStore } from "~/v3/runStore.server";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
Expand Down Expand Up @@ -190,10 +191,10 @@ export class IdempotencyKeyConcern {
});

// Update the existing run to remove the idempotency key
await this.prisma.taskRun.updateMany({
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});
await runStore.clearIdempotencyKey(
{ byId: { runId: existingRun.id, idempotencyKey } },
this.prisma
);

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}
Expand All @@ -207,10 +208,10 @@ export class IdempotencyKeyConcern {
});

// Update the existing run to remove the idempotency key
await this.prisma.taskRun.updateMany({
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});
await runStore.clearIdempotencyKey(
{ byId: { runId: existingRun.id, idempotencyKey } },
this.prisma
);

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}
Expand Down
48 changes: 25 additions & 23 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Effect, Schedule, Duration, Fiber } from "effect";
import { type RuntimeFiber } from "effect/Fiber";
import { setTimeout } from "timers/promises";
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import type { RunStore } from "@internal/run-store";

const RUN_UPDATABLE_WINDOW_MS = 60 * 60 * 1000; // 1 hour

Expand All @@ -24,6 +25,7 @@ type BufferedRunMetadataChangeOperation = {

export type UpdateMetadataServiceOptions = {
prisma: PrismaClientOrTransaction;
runStore: RunStore;
flushIntervalMs?: number;
flushEnabled?: boolean;
flushLoggingEnabled?: boolean;
Expand All @@ -49,6 +51,7 @@ export class UpdateMetadataService {
private _bufferedOperations: Map<string, BufferedRunMetadataChangeOperation[]> = new Map();
private _flushFiber: RuntimeFiber<void> | null = null;
private readonly _prisma: PrismaClientOrTransaction;
private readonly _runStore: RunStore;
private readonly flushIntervalMs: number;
private readonly flushEnabled: boolean;
private readonly flushLoggingEnabled: boolean;
Expand All @@ -57,6 +60,7 @@ export class UpdateMetadataService {

constructor(private readonly options: UpdateMetadataServiceOptions) {
this._prisma = options.prisma;
this._runStore = options.runStore;
this.flushIntervalMs = options.flushIntervalMs ?? 5000;
this.flushEnabled = options.flushEnabled ?? true;
this.flushLoggingEnabled = options.flushLoggingEnabled ?? true;
Expand Down Expand Up @@ -260,17 +264,16 @@ export class UpdateMetadataService {
const writeTime = new Date();
const result = yield* _(
Effect.tryPromise(() =>
this._prisma.taskRun.updateMany({
where: {
id: runId,
metadataVersion: run.metadataVersion,
},
data: {
metadata: newMetadataPacket.data,
this._runStore.updateMetadata(
runId,
{
metadata: newMetadataPacket.data!,
metadataVersion: { increment: 1 },
updatedAt: writeTime,
},
})
{ expectedMetadataVersion: run.metadataVersion },
this._prisma
)
)
);

Expand Down Expand Up @@ -469,20 +472,19 @@ export class UpdateMetadataService {
// Update with optimistic locking; updatedAt stamped explicitly so the caller can
// publish the exact committed watermark without a follow-up read.
const writeTime = new Date();
const result = await this._prisma.taskRun.updateMany({
where: {
id: runId,
metadataVersion: run.metadataVersion,
},
data: {
metadata: newMetadataPacket.data,
const result = await this._runStore.updateMetadata(
runId,
{
metadata: newMetadataPacket.data!,
metadataType: newMetadataPacket.dataType,
metadataVersion: {
increment: 1,
},
updatedAt: writeTime,
},
});
{ expectedMetadataVersion: run.metadataVersion },
this._prisma
);

if (result.count === 0) {
if (this.flushLoggingEnabled) {
Expand Down Expand Up @@ -564,19 +566,19 @@ export class UpdateMetadataService {
// Update the metadata without version check; updatedAt stamped explicitly so the
// caller can publish the exact committed watermark.
const writeTime = new Date();
await this._prisma.taskRun.update({
where: {
id: runId,
},
data: {
metadata: metadataPacket?.data,
await this._runStore.updateMetadata(
runId,
{
metadata: metadataPacket?.data!,
metadataType: metadataPacket?.dataType,
metadataVersion: {
increment: 1,
},
updatedAt: writeTime,
},
});
{},
this._prisma
);
updatedAtMs = writeTime.getTime();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ import { singleton } from "~/utils/singleton";
import { env } from "~/env.server";
import { UpdateMetadataService } from "./updateMetadata.server";
import { prisma } from "~/db.server";
import { runStore } from "~/v3/runStore.server";
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";

export const updateMetadataService = singleton(
"update-metadata-service",
() =>
new UpdateMetadataService({
prisma,
runStore,
flushIntervalMs: env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS,
flushEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1",
flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
Expand Down
8 changes: 8 additions & 0 deletions apps/webapp/app/v3/runStore.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { PostgresRunStore } from "@internal/run-store";
import { $replica, prisma } from "~/db.server";
import { singleton } from "~/utils/singleton";

export const runStore = singleton(
"PostgresRunStore",
() => new PostgresRunStore({ prisma, readOnlyPrisma: $replica })
);
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/baseService.server.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import { Span, SpanKind } from "@opentelemetry/api";
import type { RunStore } from "@internal/run-store";
import { $replica, PrismaClientOrTransaction, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
import { engine, RunEngine } from "../runEngine.server";
import { runStore as defaultRunStore } from "../runStore.server";
import { ServiceValidationError } from "./common.server";

export { ServiceValidationError };

export abstract class BaseService {
constructor(
protected readonly _prisma: PrismaClientOrTransaction = prisma,
protected readonly _replica: PrismaClientOrTransaction = $replica
protected readonly _replica: PrismaClientOrTransaction = $replica,
protected readonly runStore: RunStore = defaultRunStore
) {}

protected async traceWithEnv<T>(
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,10 @@ export class BatchTriggerV3Service extends BaseService {

// Expire the cached runs that are no longer valid
if (expiredRunIds.size) {
await this._prisma.taskRun.updateMany({
where: { friendlyId: { in: Array.from(expiredRunIds) } },
data: { idempotencyKey: null },
});
await this.runStore.clearIdempotencyKey(
{ byFriendlyIds: Array.from(expiredRunIds) },
this._prisma
);
}

return runs;
Expand Down
11 changes: 5 additions & 6 deletions apps/webapp/app/v3/services/rescheduleTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ export class RescheduleTaskRunService extends BaseService {
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
}

const updatedRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
const updatedRun = await this.runStore.rescheduleRun(
taskRun.id,
{
delayUntil: delay,
queueTimestamp: delay,
},
});
this._prisma
);

if (updatedRun.engine === "V1") {
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);
Expand Down
38 changes: 18 additions & 20 deletions apps/webapp/app/v3/services/resetIdempotencyKey.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ export class ResetIdempotencyKeyService extends BaseService {
taskIdentifier: string,
authenticatedEnv: AuthenticatedEnvironment
): Promise<{ id: string }> {
const { count: pgCount } = await this._prisma.taskRun.updateMany({
where: {
idempotencyKey,
taskIdentifier,
runtimeEnvironmentId: authenticatedEnv.id,
},
data: {
idempotencyKey: null,
idempotencyKeyExpiresAt: null,
const { count: pgCount } = await this.runStore.clearIdempotencyKey(
{
byPredicate: {
idempotencyKey,
taskIdentifier,
runtimeEnvironmentId: authenticatedEnv.id,
},
},
});
this._prisma
);

// Buffer-side reset: the key may belong to a buffered run that
// hasn't materialised yet. The PG updateMany above can't see it.
Expand Down Expand Up @@ -75,17 +74,16 @@ export class ResetIdempotencyKeyService extends BaseService {
// lookup against the writer when there's nothing to find;
// otherwise the exact write the customer asked for (i.e., not
// duplicative — without it the reset is silently lost).
const { count: handoffPgCount } = await this._prisma.taskRun.updateMany({
where: {
idempotencyKey,
taskIdentifier,
runtimeEnvironmentId: authenticatedEnv.id,
},
data: {
idempotencyKey: null,
idempotencyKeyExpiresAt: null,
const { count: handoffPgCount } = await this.runStore.clearIdempotencyKey(
{
byPredicate: {
idempotencyKey,
taskIdentifier,
runtimeEnvironmentId: authenticatedEnv.id,
},
},
});
this._prisma
);
if (handoffPgCount > 0) {
logger.info(
`Reset idempotency key via handoff re-check: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${handoffPgCount} run(s)`
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"@internal/llm-model-catalog": "workspace:*",
"@internal/redis": "workspace:*",
"@internal/run-engine": "workspace:*",
"@internal/run-store": "workspace:*",
"@internal/schedule-engine": "workspace:*",
"@internal/tracing": "workspace:*",
"@internal/tsql": "workspace:*",
Expand Down
Loading
Loading