diff --git a/.server-changes/bulk-action-cursor-pagination.md b/.server-changes/bulk-action-cursor-pagination.md new file mode 100644 index 00000000000..5f506493d11 --- /dev/null +++ b/.server-changes/bulk-action-cursor-pagination.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix run pagination that could duplicate or skip runs: the query orders by `(created_at, run_id)` but the cursor cut on `run_id` alone, which diverges when run_id order doesn't match created_at order (e.g. bulk replay re-processing runs). Cursors now encode the composite key as an opaque token and cut on the matching tuple; legacy bare-run_id cursors stay supported for in-flight pagination. diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 7a59179c55e..ec46be58764 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -10,6 +10,9 @@ import { convertRunListInputOptionsToFilterRunsOptions, } from "./runsRepository.server"; import parseDuration from "parse-duration"; +import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server"; + +type RunCursorRow = { runId: string; createdAt: number }; export class ClickHouseRunsRepository implements IRunsRepository { constructor(private readonly options: RunsRepositoryOptions) {} @@ -18,25 +21,52 @@ export class ClickHouseRunsRepository implements IRunsRepository { return "clickhouse"; } - async listRunIds(options: ListRunsOptions) { + /** + * Runs the keyset-paginated query and returns `{ runId, createdAt }` rows + * (one extra beyond `page.size` to signal "has more"). The ordering is always + * the composite `(created_at, run_id)`; the cursor predicate must match it. + * + * Composite cursors carry both components, so we cut on the + * `(created_at, run_id)` tuple — sound regardless of how run_id order relates + * to created_at order. Legacy bare-run_id cursors fall back to the old + * `run_id`-only predicate (knowingly unsound) for backwards compatibility + * with in-flight cursors. + */ + private async listRunRows(options: ListRunsOptions): Promise { const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder(); applyRunFiltersToQueryBuilder( queryBuilder, await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma) ); + const forward = options.page.direction === "forward" || !options.page.direction; + if (options.page.cursor) { - if (options.page.direction === "forward" || !options.page.direction) { - queryBuilder - .where("run_id < {runId: String}", { runId: options.page.cursor }) - .orderBy("created_at DESC, run_id DESC") - .limit(options.page.size + 1); + const decoded = decodeRunsCursor(options.page.cursor); + + if (forward) { + if (decoded.kind === "composite") { + queryBuilder.where( + "(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})", + { cursorCreatedAt: decoded.createdAt, runId: decoded.runId } + ); + } else { + queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId }); + } + queryBuilder.orderBy("created_at DESC, run_id DESC"); } else { - queryBuilder - .where("run_id > {runId: String}", { runId: options.page.cursor }) - .orderBy("created_at ASC, run_id ASC") - .limit(options.page.size + 1); + if (decoded.kind === "composite") { + queryBuilder.where( + "(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})", + { cursorCreatedAt: decoded.createdAt, runId: decoded.runId } + ); + } else { + queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId }); + } + queryBuilder.orderBy("created_at ASC, run_id ASC"); } + + queryBuilder.limit(options.page.size + 1); } else { // Initial page - no cursor provided queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1); @@ -48,8 +78,31 @@ export class ClickHouseRunsRepository implements IRunsRepository { throw queryError; } - const runIds = result.map((row) => row.run_id); - return runIds; + return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms })); + } + + async listRunIds(options: ListRunsOptions) { + const rows = await this.listRunRows(options); + return rows.map((row) => row.runId); + } + + /** + * Forward-only batch iteration (bulk actions). Returns up to `page.size` run + * ids plus the composite cursor for the next batch (null when this batch is + * empty). The `created_at` component comes from the same query that orders the + * rows, so the next batch's tuple predicate is always consistent. + */ + async listRunIdsWithCursor( + options: ListRunsOptions + ): Promise<{ runIds: string[]; nextCursor: string | null }> { + const rows = await this.listRunRows(options); + const batch = rows.slice(0, options.page.size); + const last = batch.at(-1); + + return { + runIds: batch.map((row) => row.runId), + nextCursor: last ? encodeRunsCursor(last.createdAt, last.runId) : null, + }; } async listFriendlyRunIds(options: ListRunsOptions) { @@ -76,10 +129,15 @@ export class ClickHouseRunsRepository implements IRunsRepository { } async listRuns(options: ListRunsOptions) { - const runIds = await this.listRunIds(options); + const rows = await this.listRunRows(options); // If there are more runs than the page size, we need to fetch the next page - const hasMore = runIds.length > options.page.size; + const hasMore = rows.length > options.page.size; + + // Cursors carry both (created_at, run_id) so the next/prev page predicate + // matches the composite ordering — see runsCursor.server.ts. + const cursorFor = (row: RunCursorRow | undefined): string | null => + row ? encodeRunsCursor(row.createdAt, row.runId) : null; let nextCursor: string | null = null; let previousCursor: string | null = null; @@ -88,30 +146,31 @@ export class ClickHouseRunsRepository implements IRunsRepository { const direction = options.page.direction ?? "forward"; switch (direction) { case "forward": { - previousCursor = options.page.cursor ? runIds.at(0) ?? null : null; + previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null; if (hasMore) { - // The next cursor should be the last run ID from this page - nextCursor = runIds[options.page.size - 1]; + // The next cursor should be the last run from this page + nextCursor = cursorFor(rows[options.page.size - 1]); } break; } case "backward": { - const reversedRunIds = [...runIds].reverse(); + const reversedRows = [...rows].reverse(); if (hasMore) { - previousCursor = reversedRunIds.at(1) ?? null; - nextCursor = reversedRunIds.at(options.page.size) ?? null; + previousCursor = cursorFor(reversedRows.at(1)); + nextCursor = cursorFor(reversedRows.at(options.page.size)); } else { - nextCursor = reversedRunIds.at(options.page.size - 1) ?? null; + nextCursor = cursorFor(reversedRows.at(options.page.size - 1)); } break; } } - const runIdsToReturn = + const runIdsToReturn = ( options.page.direction === "backward" && hasMore - ? runIds.slice(1, options.page.size + 1) - : runIds.slice(0, options.page.size); + ? rows.slice(1, options.page.size + 1) + : rows.slice(0, options.page.size) + ).map((row) => row.runId); let runs = await this.options.prisma.taskRun.findMany({ where: { diff --git a/apps/webapp/app/services/runsRepository/runsCursor.server.ts b/apps/webapp/app/services/runsRepository/runsCursor.server.ts new file mode 100644 index 00000000000..f16d30a0c05 --- /dev/null +++ b/apps/webapp/app/services/runsRepository/runsCursor.server.ts @@ -0,0 +1,47 @@ +/** + * Cursor encoding for keyset pagination over `(created_at, run_id)`. + * + * The list query orders by the composite key `(created_at, run_id)`, so a sound + * cursor must carry BOTH components — cutting on `run_id` alone re-includes and + * skips rows whenever `run_id` order diverges from `created_at` order. + * + * A cursor is an opaque URL-safe base64 token wrapping `{ c: createdAtMs, r: + * runId }`. Cursors are server-issued (the SDK just echoes + * `pagination.next`/`previous` back), so this format needs no client update. + * + * Legacy cursors were the bare internal run_id (a cuid). They are detected by + * decode failure: a cuid base64-decodes to non-JSON bytes, so it falls through + * to `{ kind: "legacy" }` and the old (knowingly unsound) `run_id`-only + * predicate. In-flight legacy cursors keep working and drain naturally. + */ + +import { z } from "zod"; + +export type DecodedRunsCursor = + | { kind: "composite"; createdAt: number; runId: string } + | { kind: "legacy"; runId: string }; + +// `c` = created_at (ms since epoch), `r` = run_id. Short keys keep the token small. +const CompositeCursor = z.object({ + c: z.number().int(), + r: z.string().min(1), +}); + +export function encodeRunsCursor(createdAtMs: number, runId: string): string { + return Buffer.from(JSON.stringify({ c: createdAtMs, r: runId })).toString("base64url"); +} + +export function decodeRunsCursor(cursor: string): DecodedRunsCursor { + try { + const parsed = CompositeCursor.safeParse( + JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) + ); + if (parsed.success) { + return { kind: "composite", createdAt: parsed.data.c, runId: parsed.data.r }; + } + } catch { + // JSON.parse threw — not a composite cursor. + } + + return { kind: "legacy", runId: cursor }; +} diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 9a0a4a19746..8a86f4ea0be 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -130,6 +130,15 @@ export type TagList = { export interface IRunsRepository { name: string; listRunIds(options: ListRunsOptions): Promise; + /** + * Forward-only batch iteration (bulk actions). Returns up to `page.size` run + * ids and the composite cursor for the next batch (null when the batch is + * empty). Keeping cursor construction here ensures the `created_at` component + * comes from the same source as the ordering. + */ + listRunIdsWithCursor( + options: ListRunsOptions + ): Promise<{ runIds: string[]; nextCursor: string | null }>; /** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */ listFriendlyRunIds(options: ListRunsOptions): Promise; listRuns(options: ListRunsOptions): Promise<{ @@ -169,6 +178,23 @@ export class RunsRepository implements IRunsRepository { ); } + async listRunIdsWithCursor( + options: ListRunsOptions + ): Promise<{ runIds: string[]; nextCursor: string | null }> { + return startActiveSpan( + "runsRepository.listRunIdsWithCursor", + async () => this.clickHouseRunsRepository.listRunIdsWithCursor(options), + { + attributes: { + "repository.name": "clickhouse", + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } + async listFriendlyRunIds(options: ListRunsOptions): Promise { return startActiveSpan( "runsRepository.listFriendlyRunIds", diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 21f5d39db91..00cc2f2bb6a 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -159,8 +159,10 @@ export class BulkActionService extends BaseService { throw new Error(`Bulk action group has invalid query name: ${group.queryName}`); } - // 2. Get the runs to process in this batch - const runIds = await runsRepository.listRunIds({ + // 2. Get the runs to process in this batch, plus the cursor for the next + // batch. The cursor is a composite (created_at, run_id) keyset cursor so the + // next batch can't re-include or skip runs. + const { runIds: runIdsToProcess, nextCursor } = await runsRepository.listRunIdsWithCursor({ ...filters, page: { size: env.BULK_ACTION_BATCH_SIZE, @@ -172,8 +174,6 @@ export class BulkActionService extends BaseService { // 3. Process the runs let successCount = 0; let failureCount = 0; - // Slice because we fetch an extra for the cursor - const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE); switch (group.type) { case BulkActionType.CANCEL: { @@ -292,7 +292,7 @@ export class BulkActionService extends BaseService { const updatedGroup = await this._prisma.bulkActionGroup.update({ where: { id: bulkActionId }, data: { - cursor: runIdsToProcess.at(runIdsToProcess.length - 1), + cursor: nextCursor, successCount: { increment: successCount, }, diff --git a/apps/webapp/test/runsRepositoryCursor.test.ts b/apps/webapp/test/runsRepositoryCursor.test.ts new file mode 100644 index 00000000000..36ff707a2f9 --- /dev/null +++ b/apps/webapp/test/runsRepositoryCursor.test.ts @@ -0,0 +1,299 @@ +import { describe, expect, vi } from "vitest"; + +// Mock the db prisma client +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { containerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; +import { setupClickhouseReplication } from "./utils/replicationUtils"; + +vi.setConfig({ testTimeout: 60_000 }); + +/** + * Regression tests for keyset pagination over `(created_at, run_id)`. + * + * `listRunIds`/`listRuns` order by the composite key `(created_at, run_id)` but + * the old cursor predicate cut on `run_id` alone. That is only sound when + * `run_id` lexicographic order matches `created_at` order. When a burst of runs + * is created such that the two orders diverge (here: deliberately reversed), + * keyset pagination both re-includes already-seen runs (duplicates) and drops + * runs it should have returned (skips). + * + * Each test inserts runs with explicit ids so that `run_id` ascending order is + * the exact reverse of `created_at` ascending order, then walks every page and + * asserts the union is exactly the inserted set with no duplicates. + */ +describe("RunsRepository cursor pagination", () => { + containerTest( + "forward pagination returns every run exactly once when run_id order is the reverse of created_at order", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // run_id ascending: a < b < c < d < e + // created_at ascending: e < d < c < b < a (the exact reverse) + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + "dddddddddddddddddddddddd", + "eeeeeeeeeeeeeeeeeeeeeeee", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + // earliest-created run has the largest run_id (reverse correlation) + createdAt: new Date(base + (ids.length - 1 - i) * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + + const baseOptions = { + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + }; + + // Walk every forward page, size 2, accumulating ids. + const seen: string[] = []; + let cursor: string | undefined = undefined; + for (let guard = 0; guard < 20; guard++) { + const page = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor, direction: cursor ? "forward" : undefined }, + }); + seen.push(...page.runs.map((r) => r.id)); + if (!page.pagination.nextCursor) break; + cursor = page.pagination.nextCursor; + } + + // No duplicates, no skips: every inserted run appears exactly once. + expect(seen.slice().sort()).toEqual(ids.slice().sort()); + expect(new Set(seen).size).toBe(ids.length); + } + ); + + containerTest( + "backward pagination round-trips to the previous page when run_id order is the reverse of created_at order", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + // run_id ascending: a < b < c ; created_at ascending: c < b < a (reversed). + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + createdAt: new Date(base + (ids.length - 1 - i) * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + const baseOptions = { + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + }; + + // Forward order (created_at DESC) is [a, b, c]. First page (size 2) = {a, b}. + const firstPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2 }, + }); + const firstIds = firstPage.runs.map((r) => r.id).sort(); + expect(firstIds).toEqual(["aaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbb"]); + expect(firstPage.pagination.nextCursor).toBeTruthy(); + + // Forward to the second page = {c}; it exposes a previousCursor. + const secondPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor: firstPage.pagination.nextCursor!, direction: "forward" }, + }); + expect(secondPage.runs.map((r) => r.id)).toEqual(["cccccccccccccccccccccccc"]); + expect(secondPage.pagination.previousCursor).toBeTruthy(); + + // Stepping backward from the second page must land back on the first page + // exactly — no duplicated or skipped runs across the boundary. + const backPage = await runsRepository.listRuns({ + ...baseOptions, + page: { size: 2, cursor: secondPage.pagination.previousCursor!, direction: "backward" }, + }); + expect(backPage.runs.map((r) => r.id).sort()).toEqual(firstIds); + } + ); + + containerTest( + "legacy bare run_id cursor still uses the old (run_id-only) predicate for backwards compatibility", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const ids = [ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + "cccccccccccccccccccccccc", + ]; + const base = new Date("2026-06-04T16:55:07.000Z").getTime(); + for (let i = 0; i < ids.length; i++) { + await prisma.taskRun.create({ + data: { + id: ids[i], + createdAt: new Date(base + i * 1000), + friendlyId: `run_${ids[i]}`, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: `trace_${i}`, + spanId: `span_${i}`, + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + } + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ prisma, clickhouse }); + + // A legacy cursor is a bare run_id. The old predicate is `run_id < cursor`, + // so passing the largest run_id must return the two smaller ones. + const page = await runsRepository.listRuns({ + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + page: { size: 10, cursor: "cccccccccccccccccccccccc", direction: "forward" }, + }); + + const returned = page.runs.map((r) => r.id).sort(); + expect(returned).toEqual([ + "aaaaaaaaaaaaaaaaaaaaaaaa", + "bbbbbbbbbbbbbbbbbbbbbbbb", + ]); + } + ); +}); diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 77dca1f7726..01612a6ccdb 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -366,6 +366,10 @@ export function insertRawTaskRunPayloads(ch: ClickhouseWriter, settings?: ClickH export const TaskRunV2QueryResult = z.object({ run_id: z.string(), + // Milliseconds since epoch. Returned as a JSON number because the client sets + // output_format_json_quote_64bit_integers: 0. Used to build composite keyset + // cursors over (created_at, run_id) — see runsRepository.server.ts. + created_at_ms: z.number().int(), }); export type TaskRunV2QueryResult = z.infer; @@ -373,7 +377,8 @@ export type TaskRunV2QueryResult = z.infer; export function getTaskRunsQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) { return ch.queryBuilder({ name: "getTaskRuns", - baseQuery: "SELECT run_id FROM trigger_dev.task_runs_v2 FINAL", + baseQuery: + "SELECT run_id, toUnixTimestamp64Milli(created_at) AS created_at_ms FROM trigger_dev.task_runs_v2 FINAL", schema: TaskRunV2QueryResult, settings, });