Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .server-changes/runs-replication-utf16-recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
area: webapp
type: fix
---

Recover from ClickHouse `JSONEachRow` parse failures in the runs
replication path. `RunsReplicationService` now wraps its task-run and
payload inserts with the same reactive-sanitization pattern used by
`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`,
sanitize lone UTF-16 surrogates across the batch (via the shared
`sanitizeRows` helper) and retry once. If the sanitizer finds nothing
to fix, or the retry fails with another JSON parse error, the batch is
dropped, `permanentlyDroppedBatches` increments, and a loud error log is
emitted — preventing the surrounding `#insertWithRetry` layer from
spinning on the same deterministic failure. Non-parse errors (on either
attempt) propagate unchanged.

Stops the bleeding behind the customer-visible "Tasks page shows a huge
Running count" symptom: one row with bad output JSON used to take down
the COMPLETED updates for its 50+ batch-mates, leaving every one of
them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected).
207 changes: 169 additions & 38 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ import EventEmitter from "node:events";
import pLimit from "p-limit";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
import {
isClickHouseJsonParseError,
parseRowNumberFromError,
sanitizeRows,
} from "~/v3/eventRepository/sanitizeRowsOnParseError.server";

interface TransactionEvent<T = any> {
tag: "insert" | "update" | "delete";
Expand Down Expand Up @@ -129,6 +134,15 @@ export class RunsReplicationService {
private _disablePayloadInsert: boolean;
private _disableErrorFingerprinting: boolean;

/**
 * Counts batches dropped after a ClickHouse `Cannot parse JSON object`
 * failure could not be recovered: either the sanitizer found nothing to
 * fix (so no retry was attempted), or the single sanitize-retry hit a
 * JSON parse error again. These batches are dropped on the floor
 * (returning success-ish to the caller so the retry layer doesn't spin on
 * the same deterministic failure), and we track the drop count for
 * observability. Counter only — does not gate behaviour.
 */
private _permanentlyDroppedBatches = 0;

// Metrics
private _replicationLagHistogram: Histogram;
private _batchesFlushedCounter: Counter;
Expand Down Expand Up @@ -283,6 +297,11 @@ export class RunsReplicationService {
this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
}

/**
 * Read-only view of the dropped-batch tally, exposed for tests and
 * metrics: how many batches have been lost to unrecoverable parse errors.
 */
public get permanentlyDroppedBatches(): number {
  return this._permanentlyDroppedBatches;
}

public async shutdown() {
if (this._isShuttingDown) return;

Expand Down Expand Up @@ -658,7 +677,7 @@ export class RunsReplicationService {
combinedTaskRunInserts.push(...group.taskRunInserts);
combinedPayloadInserts.push(...group.payloadInserts);

const [trErr] = await this.#insertWithRetry(
const [trErr, trOutcome] = await this.#insertWithRetry(
(attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt),
"task run inserts",
flushId
Expand All @@ -667,7 +686,7 @@ export class RunsReplicationService {
taskRunError = trErr;
}

const [plErr] = await this.#insertWithRetry(
const [plErr, plOutcome] = await this.#insertWithRetry(
(attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt),
"payload inserts",
flushId
Expand All @@ -676,10 +695,14 @@ export class RunsReplicationService {
payloadError = plErr;
}

if (!trErr) {
// Only count rows that actually landed in ClickHouse. `kind: "dropped"`
// means the recovery wrapper bailed (sanitizer no-op or sanitize-retry
// still failed) — those rows never made it, so they must not show up
// as successful inserts in the per-batch counter.
if (!trErr && trOutcome?.kind !== "dropped") {
this._taskRunsInsertedCounter.add(group.taskRunInserts.length);
}
if (!plErr) {
if (!plErr && plOutcome?.kind !== "dropped") {
this._payloadsInsertedCounter.add(group.payloadInserts.length);
}
}
Expand Down Expand Up @@ -837,24 +860,28 @@ export class RunsReplicationService {
return;
}
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
const [insertError, insertResult] =
await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
});

if (insertError) {
this.logger.error("Error inserting task run inserts attempt", {
error: insertError,
attempt,
});

recordSpanError(span, insertError);
throw insertError;
}

return insertResult;
const doInsert = async () => {
const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays(
taskRunInserts,
{ params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
);
if (insertError) {
this.logger.error("Error inserting task run inserts attempt", {
error: insertError,
attempt,
});
recordSpanError(span, insertError);
throw insertError;
}
return insertResult;
};

return await this.#insertWithJsonParseRecovery(
taskRunInserts,
doInsert,
"task_runs_v2",
attempt
);
});
}

Expand All @@ -867,25 +894,129 @@ export class RunsReplicationService {
return;
}
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
const [insertError, insertResult] =
await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
});

if (insertError) {
this.logger.error("Error inserting payload inserts attempt", {
error: insertError,
attempt,
});
const doInsert = async () => {
const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays(
payloadInserts,
{ params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
);
if (insertError) {
this.logger.error("Error inserting payload inserts attempt", {
error: insertError,
attempt,
});
recordSpanError(span, insertError);
throw insertError;
}
return insertResult;
};

return await this.#insertWithJsonParseRecovery(
payloadInserts,
doInsert,
"raw_task_runs_payload_v1",
attempt
);
});
}

recordSpanError(span, insertError);
throw insertError;
/**
 * Wraps a ClickHouse insert with reactive UTF-16 sanitization for
 * `Cannot parse JSON object` rejections. Mirrors the pattern from
 * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced
 * in #3659 — same root cause (lone UTF-16 surrogates in user-provided
 * JSON), same recovery shape:
 *
 *   1. Try the insert. Healthy batches pay zero scan cost.
 *   2. On parse error, walk the whole batch via `sanitizeRows` and
 *      replace any lone-surrogate string with `"[invalid-utf16]"`.
 *   3. Retry once. If the sanitizer found nothing (no retry is attempted
 *      in that case) or the retry also fails with the same error class,
 *      drop the batch loudly and return — do NOT rethrow, otherwise the
 *      surrounding `#insertWithRetry` layer would spin three more times
 *      on the same deterministic failure.
 *   4. Non-parse errors propagate unchanged so the existing
 *      transient-retry path still handles them.
 *
 * The whole-batch scan (rather than slicing on the `at row N` hint) is
 * deliberate: `at row N` semantics under `input_format_parallel_parsing`
 * aren't stable enough to safely skip rows. The cost is bounded because
 * `detectBadJsonStrings` exits in O(1) for clean strings.
 *
 * NOTE: `doInsert` takes no arguments, so the retry can only see sanitized
 * data if `sanitizeRows` mutates `rows` in place and `doInsert` closes over
 * that same array — callers must pass the array they actually insert.
 *
 * @param rows - The batch being inserted; scanned (and presumably mutated
 *   in place) by `sanitizeRows` when a parse error occurs.
 * @param doInsert - Performs the insert; must throw on failure.
 * @param contextLabel - Identifies the target in log output.
 * @param attempt - Attempt number from the outer retry layer (logged only).
 * @returns `inserted` when the first attempt succeeded, `sanitized` when the
 *   post-sanitization retry succeeded, `dropped` when the batch was abandoned.
 * @throws Rethrows any error (first attempt or retry) that is not a
 *   ClickHouse JSON parse error.
 */
async #insertWithJsonParseRecovery<T extends object>(
  rows: T[],
  doInsert: () => Promise<unknown>,
  contextLabel: string,
  attempt: number
): Promise<
  | { kind: "inserted"; insertResult: unknown }
  | { kind: "sanitized"; insertResult: unknown }
  | { kind: "dropped" }
> {
  let firstError: unknown;
  try {
    return { kind: "inserted", insertResult: await doInsert() };
  } catch (error) {
    // Anything other than a JSON parse rejection belongs to the outer retry layer.
    if (!isClickHouseJsonParseError(error)) throw error;
    firstError = error;
  }

  // Helpers are created only on the failure path so healthy inserts pay nothing.

  // Pulls a printable message out of an arbitrary thrown value.
  const messageOf = (error: unknown): string =>
    typeof error === "object" && error !== null && "message" in error
      ? String((error as { message?: unknown }).message ?? "")
      : String(error);

  // ClickHouse errors can be multi-line; only the first line is logged.
  const firstLine = (message: string) => message.split("\n")[0];

  // Diagnostic sample of the first row, truncated to 1024 characters. This
  // must never throw: `JSON.stringify` raises on BigInt or circular values
  // and can return `undefined`, and an exception here would escape as a
  // non-parse error and be retried by the outer layer instead of the batch
  // being dropped.
  const sampleRow = (): string => {
    try {
      return (JSON.stringify(rows[0] ?? null) ?? "").slice(0, 1024);
    } catch {
      return "[unserializable row]";
    }
  };

  const firstMessage = messageOf(firstError);
  const rowHint = parseRowNumberFromError(firstMessage);
  const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);

  if (fieldsSanitized === 0) {
    // Nothing was changed, so a retry would fail identically — drop now.
    this._permanentlyDroppedBatches += 1;
    this.logger.error(
      "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix",
      {
        contextLabel,
        attempt,
        batchSize: rows.length,
        clickhouseRowHint: rowHint,
        permanentlyDroppedBatches: this._permanentlyDroppedBatches,
        sampleRow: sampleRow(),
        clickhouseError: firstLine(firstMessage),
      }
    );
    return { kind: "dropped" };
  }

  this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
    contextLabel,
    attempt,
    batchSize: rows.length,
    clickhouseRowHint: rowHint,
    rowsTouched,
    fieldsSanitized,
    clickhouseError: firstLine(firstMessage),
  });

  try {
    return { kind: "sanitized", insertResult: await doInsert() };
  } catch (retryError) {
    // A different failure class on the retry is not ours to swallow.
    if (!isClickHouseJsonParseError(retryError)) throw retryError;

    this._permanentlyDroppedBatches += 1;
    this.logger.error(
      "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
      {
        contextLabel,
        attempt,
        batchSize: rows.length,
        permanentlyDroppedBatches: this._permanentlyDroppedBatches,
        sampleRow: sampleRow(),
        firstError: firstLine(firstMessage),
        retryError: firstLine(messageOf(retryError)),
      }
    );
    return { kind: "dropped" };
  }
}

async #prepareRunInserts(
Expand Down