Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/bulk-action-cursor-pagination.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix run pagination that could duplicate or skip runs: the query orders by `(created_at, run_id)` but the cursor cut on `run_id` alone, which diverges when run_id order doesn't match created_at order (e.g. bulk replay re-processing runs). Cursors now encode the composite key as an opaque token and cut on the matching tuple; legacy bare-run_id cursors stay supported for in-flight pagination.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Prisma orderBy still uses id:desc while ClickHouse uses (created_at, run_id)

At clickhouseRunsRepository.server.ts:182, the Prisma query orders by id: 'desc' while the ClickHouse query orders by (created_at DESC, run_id DESC). Since id and run_id are the same field (Postgres PK = ClickHouse run_id), this is id DESC vs (created_at DESC, id DESC). When created_at differs between rows, the Prisma ordering may not match the ClickHouse page ordering. This was a pre-existing issue (same orderBy existed before this PR) and the runs are displayed in a list where the frontend likely re-sorts or the difference is negligible, but it's worth noting.

(Refers to lines 181-183)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import {
convertRunListInputOptionsToFilterRunsOptions,
} from "./runsRepository.server";
import parseDuration from "parse-duration";
import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server";

/** One row of the paginated run query: the run id plus its `created_at` as milliseconds since epoch. */
type RunCursorRow = { runId: string; createdAt: number };

export class ClickHouseRunsRepository implements IRunsRepository {
constructor(private readonly options: RunsRepositoryOptions) {}
Expand All @@ -18,25 +21,52 @@ export class ClickHouseRunsRepository implements IRunsRepository {
return "clickhouse";
}

async listRunIds(options: ListRunsOptions) {
/**
* Runs the keyset-paginated query and returns `{ runId, createdAt }` rows
* (one extra beyond `page.size` to signal "has more"). The ordering is always
* the composite `(created_at, run_id)`; the cursor predicate must match it.
*
* Composite cursors carry both components, so we cut on the
* `(created_at, run_id)` tuple — sound regardless of how run_id order relates
* to created_at order. Legacy bare-run_id cursors fall back to the old
* `run_id`-only predicate (knowingly unsound) for backwards compatibility
* with in-flight cursors.
*/
private async listRunRows(options: ListRunsOptions): Promise<RunCursorRow[]> {
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

const forward = options.page.direction === "forward" || !options.page.direction;

if (options.page.cursor) {
if (options.page.direction === "forward" || !options.page.direction) {
queryBuilder
.where("run_id < {runId: String}", { runId: options.page.cursor })
.orderBy("created_at DESC, run_id DESC")
.limit(options.page.size + 1);
const decoded = decodeRunsCursor(options.page.cursor);

if (forward) {
if (decoded.kind === "composite") {
queryBuilder.where(
"(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
);
} else {
queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId });
}
queryBuilder.orderBy("created_at DESC, run_id DESC");
} else {
queryBuilder
.where("run_id > {runId: String}", { runId: options.page.cursor })
.orderBy("created_at ASC, run_id ASC")
.limit(options.page.size + 1);
if (decoded.kind === "composite") {
queryBuilder.where(
"(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
);
} else {
queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId });
}
queryBuilder.orderBy("created_at ASC, run_id ASC");
}

queryBuilder.limit(options.page.size + 1);
} else {
// Initial page - no cursor provided
queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1);
Expand All @@ -48,8 +78,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
throw queryError;
}

const runIds = result.map((row) => row.run_id);
return runIds;
return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms }));
}

/**
 * Returns only the run ids of the rows produced by `listRunRows`, in query
 * order. Nothing is trimmed here, so whatever `listRunRows` returns (including
 * any extra "has more" row) is passed through to the caller.
 */
async listRunIds(options: ListRunsOptions) {
  const runIds: string[] = [];
  for (const { runId } of await this.listRunRows(options)) {
    runIds.push(runId);
  }
  return runIds;
}

/**
 * Batch iteration helper used by bulk actions.
 *
 * Fetches rows via `listRunRows`, keeps at most `page.size` of them, and
 * returns their run ids together with a composite cursor built from the last
 * kept row. `nextCursor` is `null` when no rows were kept.
 *
 * Both cursor components come from the same row the query returned, so the
 * cursor always matches the `(created_at, run_id)` ordering.
 */
async listRunIdsWithCursor(
  options: ListRunsOptions
): Promise<{ runIds: string[]; nextCursor: string | null }> {
  // Trim to the requested page size before deriving ids and the cursor.
  const pageRows = (await this.listRunRows(options)).slice(0, options.page.size);
  const runIds = pageRows.map(({ runId }) => runId);

  const tail = pageRows.at(-1);
  if (!tail) {
    return { runIds, nextCursor: null };
  }

  return { runIds, nextCursor: encodeRunsCursor(tail.createdAt, tail.runId) };
}

async listFriendlyRunIds(options: ListRunsOptions) {
Expand All @@ -76,10 +129,15 @@ export class ClickHouseRunsRepository implements IRunsRepository {
}

async listRuns(options: ListRunsOptions) {
const runIds = await this.listRunIds(options);
const rows = await this.listRunRows(options);

// If there are more runs than the page size, we need to fetch the next page
const hasMore = runIds.length > options.page.size;
const hasMore = rows.length > options.page.size;

// Cursors carry both (created_at, run_id) so the next/prev page predicate
// matches the composite ordering — see runsCursor.server.ts.
const cursorFor = (row: RunCursorRow | undefined): string | null =>
row ? encodeRunsCursor(row.createdAt, row.runId) : null;

let nextCursor: string | null = null;
let previousCursor: string | null = null;
Expand All @@ -88,30 +146,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
const direction = options.page.direction ?? "forward";
switch (direction) {
case "forward": {
previousCursor = options.page.cursor ? runIds.at(0) ?? null : null;
previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null;
if (hasMore) {
// The next cursor should be the last run ID from this page
nextCursor = runIds[options.page.size - 1];
// The next cursor should be the last run from this page
nextCursor = cursorFor(rows[options.page.size - 1]);
}
break;
}
case "backward": {
const reversedRunIds = [...runIds].reverse();
const reversedRows = [...rows].reverse();
if (hasMore) {
previousCursor = reversedRunIds.at(1) ?? null;
nextCursor = reversedRunIds.at(options.page.size) ?? null;
previousCursor = cursorFor(reversedRows.at(1));
nextCursor = cursorFor(reversedRows.at(options.page.size));
} else {
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
nextCursor = cursorFor(reversedRows.at(options.page.size - 1));
}

break;
}
}

const runIdsToReturn =
const runIdsToReturn = (
options.page.direction === "backward" && hasMore
? runIds.slice(1, options.page.size + 1)
: runIds.slice(0, options.page.size);
? rows.slice(1, options.page.size + 1)
: rows.slice(0, options.page.size)
).map((row) => row.runId);

let runs = await this.options.prisma.taskRun.findMany({
where: {
Expand Down
47 changes: 47 additions & 0 deletions apps/webapp/app/services/runsRepository/runsCursor.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Cursor encoding for keyset pagination over `(created_at, run_id)`.
*
* The list query orders by the composite key `(created_at, run_id)`, so a sound
* cursor must carry BOTH components — cutting on `run_id` alone re-includes and
* skips rows whenever `run_id` order diverges from `created_at` order.
*
* A cursor is an opaque URL-safe base64 token wrapping `{ c: createdAtMs, r:
* runId }`. Cursors are server-issued (the SDK just echoes
* `pagination.next`/`previous` back), so this format needs no client update.
*
* Legacy cursors were the bare internal run_id (a cuid). They are detected by
* decode failure: a cuid base64-decodes to non-JSON bytes, so it falls through
* to `{ kind: "legacy" }` and the old (knowingly unsound) `run_id`-only
* predicate. In-flight legacy cursors keep working and drain naturally.
*/

import { z } from "zod";

/**
 * Result of decoding a pagination cursor.
 *
 * - `composite`: the token carried both `createdAt` (ms since epoch) and `runId`.
 * - `legacy`: the token could not be decoded as a composite cursor; `runId` is
 *   the raw cursor string as received.
 */
export type DecodedRunsCursor =
| { kind: "composite"; createdAt: number; runId: string }
| { kind: "legacy"; runId: string };

// Schema for the JSON payload wrapped inside a composite cursor token.
// `c` = created_at (ms since epoch), `r` = run_id. Short keys keep the token small.
// `decodeRunsCursor` treats any payload that fails this schema as a legacy cursor.
const CompositeCursor = z.object({
c: z.number().int(),
r: z.string().min(1),
});

/**
 * Encodes a `(created_at, run_id)` keyset position as an opaque, URL-safe
 * base64 token wrapping the JSON payload `{ c: createdAtMs, r: runId }`.
 *
 * The inputs are checked against the same constraints the decoder's
 * `CompositeCursor` schema enforces (integer `c`, non-empty `r`). Without this
 * check an invalid value (e.g. `NaN`, which JSON serialises as `null`) would
 * still produce a token, but `decodeRunsCursor` would reject the payload and
 * fall back to treating the entire token as a legacy run id.
 *
 * @param createdAtMs - Row `created_at` in milliseconds since epoch; must be an integer.
 * @param runId - Internal run id; must be non-empty.
 * @returns The base64url-encoded cursor token.
 * @throws RangeError if `createdAtMs` is not an integer or `runId` is empty.
 */
export function encodeRunsCursor(createdAtMs: number, runId: string): string {
  if (!Number.isInteger(createdAtMs)) {
    throw new RangeError(
      `encodeRunsCursor: createdAtMs must be an integer, received ${String(createdAtMs)}`
    );
  }
  if (typeof runId !== "string" || runId.length === 0) {
    throw new RangeError("encodeRunsCursor: runId must be a non-empty string");
  }

  const payload = JSON.stringify({ c: createdAtMs, r: runId });
  return Buffer.from(payload).toString("base64url");
}

/**
 * Decodes a cursor string received from a client.
 *
 * The cursor is base64url-decoded, parsed as JSON and validated against
 * `CompositeCursor`. If all three steps succeed the result is a `composite`
 * cursor. If JSON parsing throws or validation fails, the original string is
 * returned unchanged as a `legacy` cursor's `runId`.
 */
export function decodeRunsCursor(cursor: string): DecodedRunsCursor {
  try {
    const json = Buffer.from(cursor, "base64url").toString("utf8");
    const result = CompositeCursor.safeParse(JSON.parse(json));

    if (result.success) {
      const { c: createdAt, r: runId } = result.data;
      return { kind: "composite", createdAt, runId };
    }
  } catch {
    // Not valid JSON once decoded, so this is not a composite cursor.
  }

  // Anything that is not a valid composite token is passed through as-is.
  return { kind: "legacy", runId: cursor };
}
26 changes: 26 additions & 0 deletions apps/webapp/app/services/runsRepository/runsRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ export type TagList = {
export interface IRunsRepository {
name: string;
listRunIds(options: ListRunsOptions): Promise<string[]>;
/**
* Forward-only batch iteration (bulk actions). Returns up to `page.size` run
* ids and the composite cursor for the next batch (null when the batch is
* empty). Keeping cursor construction here ensures the `created_at` component
* comes from the same source as the ordering.
*/
listRunIdsWithCursor(
options: ListRunsOptions
): Promise<{ runIds: string[]; nextCursor: string | null }>;
/** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */
listFriendlyRunIds(options: ListRunsOptions): Promise<string[]>;
listRuns(options: ListRunsOptions): Promise<{
Expand Down Expand Up @@ -169,6 +178,23 @@ export class RunsRepository implements IRunsRepository {
);
}

/**
 * Delegates to the ClickHouse repository's `listRunIdsWithCursor`, wrapped in
 * a `startActiveSpan` call named "runsRepository.listRunIdsWithCursor" with
 * the organization, project and environment ids attached as attributes.
 */
async listRunIdsWithCursor(
  options: ListRunsOptions
): Promise<{ runIds: string[]; nextCursor: string | null }> {
  const fetchBatch = async () => this.clickHouseRunsRepository.listRunIdsWithCursor(options);

  const { organizationId, projectId, environmentId } = options;
  const spanOptions = {
    attributes: {
      "repository.name": "clickhouse",
      organizationId,
      projectId,
      environmentId,
    },
  };

  return startActiveSpan("runsRepository.listRunIdsWithCursor", fetchBatch, spanOptions);
}

async listFriendlyRunIds(options: ListRunsOptions): Promise<string[]> {
return startActiveSpan(
"runsRepository.listFriendlyRunIds",
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ export class BulkActionService extends BaseService {
throw new Error(`Bulk action group has invalid query name: ${group.queryName}`);
}

// 2. Get the runs to process in this batch
const runIds = await runsRepository.listRunIds({
// 2. Get the runs to process in this batch, plus the cursor for the next
// batch. The cursor is a composite (created_at, run_id) keyset cursor so the
// next batch can't re-include or skip runs.
const { runIds: runIdsToProcess, nextCursor } = await runsRepository.listRunIdsWithCursor({
...filters,
page: {
size: env.BULK_ACTION_BATCH_SIZE,
Expand All @@ -172,8 +174,6 @@ export class BulkActionService extends BaseService {
// 3. Process the runs
let successCount = 0;
let failureCount = 0;
// Slice because we fetch an extra for the cursor
const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE);

switch (group.type) {
case BulkActionType.CANCEL: {
Expand Down Expand Up @@ -292,7 +292,7 @@ export class BulkActionService extends BaseService {
const updatedGroup = await this._prisma.bulkActionGroup.update({
where: { id: bulkActionId },
data: {
cursor: runIdsToProcess.at(runIdsToProcess.length - 1),
cursor: nextCursor,
successCount: {
increment: successCount,
},
Expand Down
Loading
Loading