Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { logger } from "~/services/logger.server";
import { parseDelay } from "~/utils/delays";
import { handleMetadataPacket } from "~/utils/packets";
import { startSpan } from "~/v3/tracing.server";
import { shouldUseV2RunTable } from "~/v3/runTableV2.server";
import type {
TriggerTaskServiceOptions,
TriggerTaskServiceResult,
Expand Down Expand Up @@ -151,7 +152,17 @@ export class RunEngineTriggerTaskService {
span.setAttribute("taskId", taskId);
span.setAttribute("attempt", attempt);

const runFriendlyId = options?.runFriendlyId ?? RunId.generate().friendlyId;
// The single per-org cutover point: an opted-in org mints a KSUID id
// (routing the run to task_run_v2), everyone else keeps a legacy id
// (TaskRun). The flag is a pure in-memory read of the org's
// featureFlags already loaded on `environment` — no DB query on the
// trigger hot path. Downstream routing is by id format only.
const runFriendlyId =
options?.runFriendlyId ??
(shouldUseV2RunTable(environment.organization.featureFlags)
? RunId.generateKsuid()
: RunId.generate()
).friendlyId;
const triggerRequest = {
taskId,
friendlyId: runFriendlyId,
Expand Down
48 changes: 42 additions & 6 deletions apps/webapp/app/services/runsBackfiller.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ export class RunsBackfillerService {
span.setAttribute("cursor", cursor ?? "");
span.setAttribute("batchSize", batchSize ?? 0);

// Keyset on (createdAt, id). Runs now live across two physical tables
// (legacy TaskRun with cuid ids, task_run_v2 with ksuid ids), and `id`
// alone is not a valid order across them: cuid and ksuid sort in
// different ranges. RunStore merges the two tables only on a time-based
// key, so order by createdAt and tiebreak on id within a timestamp.
const keyset = cursor ? decodeBackfillCursor(cursor) : undefined;

const runs = await runStore.findRuns(
{
where: {
Expand All @@ -51,11 +58,16 @@ export class RunsBackfillerService {
status: {
in: FINAL_RUN_STATUSES,
},
...(cursor ? { id: { gt: cursor } } : {}),
},
orderBy: {
id: "asc",
...(keyset
? {
OR: [
{ createdAt: { gt: keyset.createdAt } },
{ createdAt: keyset.createdAt, id: { gt: keyset.id } },
],
}
: {}),
},
orderBy: [{ createdAt: "asc" }, { id: "asc" }],
take: batchSize,
},
this.prisma
Expand Down Expand Up @@ -94,8 +106,32 @@ export class RunsBackfillerService {
lastRunId: lastRun.id,
});

// Return the last run ID to continue from
return lastRun.id;
// Return a (createdAt, id) cursor to continue from on the next batch.
return encodeBackfillCursor(lastRun.createdAt, lastRun.id);
});
}
}

// The backfill cursor is an opaque "<createdAt ISO>_<id>" string. The admin
// worker passes it back verbatim across batches; only this service interprets
// it. An ISO timestamp contains no "_" and run ids are base62/base36, so the
// first "_" cleanly splits the two halves.
const BACKFILL_CURSOR_SEPARATOR = "_";

export function encodeBackfillCursor(createdAt: Date, id: string): string {
return `${createdAt.toISOString()}${BACKFILL_CURSOR_SEPARATOR}${id}`;
}

export function decodeBackfillCursor(cursor: string): { createdAt: Date; id: string } {
const separatorIndex = cursor.indexOf(BACKFILL_CURSOR_SEPARATOR);
const createdAt = separatorIndex === -1 ? new Date(NaN) : new Date(cursor.slice(0, separatorIndex));
const id = separatorIndex === -1 ? "" : cursor.slice(separatorIndex + 1);

if (Number.isNaN(createdAt.getTime()) || id.length === 0) {
throw new Error(
`RunsBackfillerService: malformed cursor "${cursor}" (expected "<createdAt>_<id>")`
);
}

return { createdAt, id };
}
5 changes: 5 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ export class RunsReplicationService {
slotName: options.slotName,
publicationName: options.publicationName,
table: "TaskRun",
// task_run_v2 is a column-identical clone of TaskRun, so its WAL rows
// flow through the same handler/transform into the same ClickHouse table.
// Co-publishing it keeps the ClickHouse mirror complete once orgs cut over
// to v2 run ids; until then the table is empty and this is a no-op.
additionalTables: ["task_run_v2"],
redisOptions: options.redisOptions,
autoAcknowledge: false,
publicationActions: ["insert", "update", "delete"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,13 @@ export class ClickHouseRunsRepository implements IRunsRepository {
async listRuns(options: ListRunsOptions) {
const { runIds, pagination } = await this.listRunIds(options);

let runs = await runStore.findRuns(
const hydrated = await runStore.findRuns(
{
where: {
id: {
in: runIds,
},
},
orderBy: {
id: "desc",
},
select: {
id: true,
friendlyId: true,
Expand Down Expand Up @@ -216,6 +213,15 @@ export class ClickHouseRunsRepository implements IRunsRepository {
this.options.prisma
);

// ClickHouse already ranked `runIds`. An `IN (...)` hydration comes back
// unordered, and a single SQL `orderBy` can't span the two physical run
// tables (legacy TaskRun + task_run_v2), so restore ClickHouse's ranking
// in memory.
const runById = new Map(hydrated.map((run) => [run.id, run]));
let runs = runIds
.map((id) => runById.get(id))
.filter((run): run is NonNullable<typeof run> => run !== undefined);

// ClickHouse is slightly delayed, so we're going to do in-memory status filtering too
if (options.statuses && options.statuses.length > 0) {
runs = runs.filter((run) => options.statuses!.includes(run.status));
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/featureFlags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export const FEATURE_FLAG = {
computeMigrationFreePercentage: "computeMigrationFreePercentage",
computeMigrationPaidPercentage: "computeMigrationPaidPercentage",
computeMigrationRequireTemplate: "computeMigrationRequireTemplate",
runTableV2: "runTableV2",
} as const;

export const FeatureFlagCatalog = {
Expand Down Expand Up @@ -43,6 +44,12 @@ export const FeatureFlagCatalog = {
// When on, migrated orgs build their compute template in required mode at deploy
// (fails the deploy on error) instead of shadow. Strict boolean (see above).
[FEATURE_FLAG.computeMigrationRequireTemplate]: z.boolean(),
// Per-org cutover to the parallel task_run_v2 table. When on, new runs for the
// org mint a KSUID id (routing them to task_run_v2); off (the default) keeps
// minting legacy ids. Strict boolean (see above): coercing a stringified
// "false" to true would cut an org over by mistake, and runs created on v2
// stay on v2.
[FEATURE_FLAG.runTableV2]: z.boolean(),
};

export type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
Expand Down
28 changes: 28 additions & 0 deletions apps/webapp/app/v3/runTableV2.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";

/**
* Per-org cutover switch for the parallel `task_run_v2` run table.
*
* Read in memory from `Organization.featureFlags` (already loaded on the
* AuthenticatedEnvironment at API-key auth, so this adds no DB query) at the
* single run-id mint site in the trigger path. On → mint a KSUID id, which
* routes the run to `task_run_v2`; off (the default) → mint a legacy id, which
* routes to `TaskRun`.
*
* RunStore never reads this flag: it routes purely by id format. The flag only
* decides which id scheme is minted upstream. Disabling it sends only NEW runs
* back to legacy; runs already created on v2 stay readable there (routed by id).
*/
export function shouldUseV2RunTable(orgFeatureFlags: unknown): boolean {
if (orgFeatureFlags === null || typeof orgFeatureFlags !== "object") {
return false;
}

const override = (orgFeatureFlags as Record<string, unknown>)[FEATURE_FLAG.runTableV2];
if (override === undefined) {
return false;
}

const parsed = FeatureFlagCatalog[FEATURE_FLAG.runTableV2].safeParse(override);
return parsed.success ? parsed.data : false;
}
28 changes: 28 additions & 0 deletions apps/webapp/test/runTableV2.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { describe, expect, it } from "vitest";
import { shouldUseV2RunTable } from "~/v3/runTableV2.server";

describe("shouldUseV2RunTable", () => {
it("defaults to false when the org has no flags", () => {
expect(shouldUseV2RunTable(null)).toBe(false);
expect(shouldUseV2RunTable(undefined)).toBe(false);
expect(shouldUseV2RunTable({})).toBe(false);
});

it("returns true only when the flag is the boolean true", () => {
expect(shouldUseV2RunTable({ runTableV2: true })).toBe(true);
expect(shouldUseV2RunTable({ runTableV2: false })).toBe(false);
});

it("rejects a stringified flag value (strict boolean, no coercion)", () => {
// A stringified "false" must not coerce to true and cut the org over.
expect(shouldUseV2RunTable({ runTableV2: "true" })).toBe(false);
expect(shouldUseV2RunTable({ runTableV2: "false" })).toBe(false);
expect(shouldUseV2RunTable({ runTableV2: 1 })).toBe(false);
});

it("ignores unrelated flags and non-object inputs", () => {
expect(shouldUseV2RunTable({ mollifierEnabled: true })).toBe(false);
expect(shouldUseV2RunTable("runTableV2")).toBe(false);
expect(shouldUseV2RunTable(42)).toBe(false);
});
});
125 changes: 125 additions & 0 deletions apps/webapp/test/runsReplicationService.taskRunV2.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { ClickHouse } from "@internal/clickhouse";
import { replicationContainerTest } from "@internal/testcontainers";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import { setTimeout } from "node:timers/promises";
import { z } from "zod";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import { createInMemoryTracing } from "./utils/tracing";
import { TestReplicationClickhouseFactory } from "./utils/testReplicationClickhouseFactory";

vi.setConfig({ testTimeout: 60_000 });

describe("RunsReplicationService (task_run_v2)", () => {
replicationContainerTest(
"co-publishes task_run_v2 and streams its rows to the same ClickHouse table",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
// Both tables are in the publication; both need FULL identity so the
// delete transform can read the old row. INSERTs (this test) carry the
// full new tuple regardless, but we mirror the production setup.
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`);

const clickhouse = new ClickHouse({
url: clickhouseContainer.getConnectionUrl(),
name: "runs-replication",
compression: { request: true },
logLevel: "warn",
});

const { tracer } = createInMemoryTracing();

const runsReplicationService = new RunsReplicationService({
clickhouseFactory: new TestReplicationClickhouseFactory(clickhouse),
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
ackIntervalSeconds: 5,
tracer,
logLevel: "warn",
});

await runsReplicationService.start();

Comment on lines +48 to +49

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Always stop replication service in finally.

If any assertion throws before Line 122, the service keeps running and can interfere with later tests in the same worker.

Proposed fix
-      await runsReplicationService.start();
-
-      const organization = await prisma.organization.create({
+      await runsReplicationService.start();
+      try {
+        const organization = await prisma.organization.create({
           data: { title: "test", slug: "test" },
-      });
+        });
         // ...existing test body...
-
-      await runsReplicationService.stop();
+      } finally {
+        await runsReplicationService.stop();
+      }

Also applies to: 122-123

const organization = await prisma.organization.create({
data: { title: "test", slug: "test" },
});
const project = await prisma.project.create({
data: {
name: "test",
slug: "test",
organizationId: organization.id,
externalRef: "test",
},
});
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
data: {
slug: "test",
type: "DEVELOPMENT",
projectId: project.id,
organizationId: organization.id,
apiKey: "test",
pkApiKey: "test",
shortcode: "test",
},
});

// A v2 run lives in task_run_v2, keyed by a KSUID id.
const ksuid = RunId.generateKsuid();
const run = await prisma.taskRunV2.create({
data: {
id: ksuid.id,
friendlyId: ksuid.friendlyId,
taskIdentifier: "my-task",
payload: JSON.stringify({ foo: "bar" }),
payloadType: "application/json",
traceId: "v2trace",
spanId: "v2span",
queue: "test",
workerQueue: "us-east-1-next",
region: "us-east-1",
planType: "free",
runtimeEnvironmentId: runtimeEnvironment.id,
projectId: project.id,
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
},
});

await setTimeout(1000);

const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
schema: z.any(),
params: z.object({ runId: z.string() }),
});

const [queryError, result] = await queryRuns({ runId: run.id });

Comment on lines +96 to +106

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Replace fixed sleep with bounded polling to avoid flaky replication timing.

Line 96 uses a fixed setTimeout(1000) and then a single read. Replication lag variance can intermittently fail this test even when behavior is correct.

Proposed fix
-      await setTimeout(1000);
-
       const queryRuns = clickhouse.reader.query({
         name: "runs-replication",
         query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
         schema: z.any(),
         params: z.object({ runId: z.string() }),
       });
 
-      const [queryError, result] = await queryRuns({ runId: run.id });
+      let queryError: unknown = null;
+      let result: unknown[] | undefined;
+      const deadline = Date.now() + 10_000;
+
+      do {
+        [queryError, result] = await queryRuns({ runId: run.id });
+        if (!queryError && result?.length === 1) break;
+        await setTimeout(200);
+      } while (Date.now() < deadline);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await setTimeout(1000);
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
schema: z.any(),
params: z.object({ runId: z.string() }),
});
const [queryError, result] = await queryRuns({ runId: run.id });
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}",
schema: z.any(),
params: z.object({ runId: z.string() }),
});
let queryError: unknown = null;
let result: unknown[] | undefined;
const deadline = Date.now() + 10_000;
do {
[queryError, result] = await queryRuns({ runId: run.id });
if (!queryError && result?.length === 1) break;
await setTimeout(200);
} while (Date.now() < deadline);

expect(queryError).toBeNull();
expect(result?.length).toBe(1);
expect(result?.[0]).toEqual(
expect.objectContaining({
run_id: run.id,
friendly_id: run.friendlyId,
task_identifier: "my-task",
environment_id: runtimeEnvironment.id,
project_id: project.id,
organization_id: organization.id,
environment_type: "DEVELOPMENT",
engine: "V2",
})
);

await runsReplicationService.stop();
}
);
});
4 changes: 4 additions & 0 deletions apps/webapp/test/utils/replicationUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ export async function setupClickhouseReplication({
redisOptions: RedisOptions;
}) {
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
// task_run_v2 is co-published with TaskRun; it needs FULL identity too so
// UPDATE/DELETE WAL events carry the old row (the delete transform reads
// organizationId/environmentType off it). Mirrors the TaskRun line above.
await prisma.$executeRawUnsafe(`ALTER TABLE public."task_run_v2" REPLICA IDENTITY FULL;`);

const clickhouse = new ClickHouse({
url: clickhouseUrl,
Expand Down
Loading
Loading