From 8957a01abf9f04f25e0b3dbbf17f99185ccd14e8 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 22 May 2026 17:12:31 +0100 Subject: [PATCH 1/2] fix(webapp): recover from ClickHouse JSON parse failures in runs replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On a `Cannot parse JSON object` rejection, sanitize lone UTF-16 surrogates across the batch via the existing `sanitizeRows` helper and retry once. Drop the batch loudly if the sanitizer found nothing or the retry also fails, so the surrounding `#insertWithRetry` layer doesn't spin on a deterministic failure. Non-parse errors propagate unchanged. Mirrors the pattern shipped for ClickhouseEventRepository in #3659 — same root cause (lone surrogates in user-provided JSON), same recovery shape, same shared helpers. Fixes the customer-facing symptom from TRI-9755: one row's bad output JSON used to kill the COMPLETED updates for its 50+ batch-mates, stranding them in EXECUTING in ClickHouse forever and inflating "Running" counts on the Tasks page. Co-Authored-By: Claude Opus 4.7 --- .../runs-replication-utf16-recovery.md | 20 ++ .../services/runsReplicationService.server.ts | 197 +++++++++++++++--- 2 files changed, 183 insertions(+), 34 deletions(-) create mode 100644 .server-changes/runs-replication-utf16-recovery.md diff --git a/.server-changes/runs-replication-utf16-recovery.md b/.server-changes/runs-replication-utf16-recovery.md new file mode 100644 index 00000000000..4a998f2a31e --- /dev/null +++ b/.server-changes/runs-replication-utf16-recovery.md @@ -0,0 +1,20 @@ +--- +area: webapp +type: fix +--- + +Recover from ClickHouse `JSONEachRow` parse failures in the runs +replication path. `RunsReplicationService` now wraps its task-run and +payload inserts with the same reactive-sanitisation pattern used by +`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`, +sanitize lone UTF-16 surrogates across the batch (via the shared +`sanitizeRows` helper) and retry once. If the sanitiser found nothing +or the retry also fails, the batch is dropped, `permanentlyDroppedBatches` +increments, and a loud error log is emitted — preventing the surrounding +`#insertWithRetry` layer from spinning on the same deterministic +failure. Non-parse errors propagate unchanged. + +Stops the bleeding behind the customer-visible "Tasks page shows a huge +Running count" symptom: one row with bad output JSON used to take down +the COMPLETED updates for its 50+ batch-mates, leaving every one of +them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected). diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 6b62cc76b27..9ff6fa6d68c 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -38,6 +38,11 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { + isClickHouseJsonParseError, + parseRowNumberFromError, + sanitizeRows, +} from "~/v3/eventRepository/sanitizeRowsOnParseError.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -129,6 +134,15 @@ export class RunsReplicationService { private _disablePayloadInsert: boolean; private _disableErrorFingerprinting: boolean; + /** + * Counts batches that hit a ClickHouse `Cannot parse JSON object` failure + * that survived one sanitize-retry. These batches are dropped on the floor + * (returning success-ish to the caller so the retry layer doesn't spin on + * the same deterministic failure), and we track the drop count for + * observability. Counter only — does not gate behaviour. + */ + private _permanentlyDroppedBatches = 0; + // Metrics private _replicationLagHistogram: Histogram; private _batchesFlushedCounter: Counter; @@ -283,6 +297,11 @@ export class RunsReplicationService { this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; } + /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */ + get permanentlyDroppedBatches() { + return this._permanentlyDroppedBatches; + } + public async shutdown() { if (this._isShuttingDown) return; @@ -837,24 +856,29 @@ export class RunsReplicationService { return; } return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { - const [insertError, insertResult] = - await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); - - if (insertError) { - this.logger.error("Error inserting task run inserts attempt", { - error: insertError, - attempt, - }); - - recordSpanError(span, insertError); - throw insertError; - } - - return insertResult; + const doInsert = async () => { + const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays( + taskRunInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } } + ); + if (insertError) { + this.logger.error("Error inserting task run inserts attempt", { + error: insertError, + attempt, + }); + recordSpanError(span, insertError); + throw insertError; + } + return insertResult; + }; + + const outcome = await this.#insertWithJsonParseRecovery( + taskRunInserts, + doInsert, + "task_runs_v2", + attempt + ); + return outcome.kind === "dropped" ? undefined : outcome.insertResult; }); } @@ -867,25 +891,130 @@ export class RunsReplicationService { return; } return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { - const [insertError, insertResult] = - await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); - - if (insertError) { - this.logger.error("Error inserting payload inserts attempt", { - error: insertError, - attempt, - }); + const doInsert = async () => { + const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays( + payloadInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } } + ); + if (insertError) { + this.logger.error("Error inserting payload inserts attempt", { + error: insertError, + attempt, + }); + recordSpanError(span, insertError); + throw insertError; + } + return insertResult; + }; + + const outcome = await this.#insertWithJsonParseRecovery( + payloadInserts, + doInsert, + "raw_task_runs_payload_v1", + attempt + ); + return outcome.kind === "dropped" ? undefined : outcome.insertResult; + }); + } - recordSpanError(span, insertError); - throw insertError; + /** + * Wraps a ClickHouse insert with reactive UTF-16 sanitization for + * `Cannot parse JSON object` rejections. Mirrors the pattern from + * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced + * in #3659 — same root cause (lone UTF-16 surrogates in user-provided + * JSON), same recovery shape: + * + * 1. Try the insert. Healthy batches pay zero scan cost. + * 2. On parse error, walk the whole batch via `sanitizeRows` and + * replace any lone-surrogate string with `"[invalid-utf16]"`. + * 3. Retry once. If the sanitizer found nothing or the retry also + * fails with the same error class, drop the batch loudly and + * return — do NOT rethrow, otherwise the surrounding + * `#insertWithRetry` layer would spin three more times on the + * same deterministic failure. + * 4. Non-parse errors propagate unchanged so the existing + * transient-retry path still handles them. + * + * The whole-batch scan (rather than slicing on the `at row N` hint) is + * deliberate: `at row N` semantics under `input_format_parallel_parsing` + * aren't stable enough to safely skip rows. The cost is bounded because + * `detectBadJsonStrings` exits in O(1) for clean strings. + */ + async #insertWithJsonParseRecovery( + rows: T[], + doInsert: () => Promise, + contextLabel: string, + attempt: number + ): Promise< + | { kind: "inserted"; insertResult: unknown } + | { kind: "sanitized"; insertResult: unknown } + | { kind: "dropped" } + > { + try { + return { kind: "inserted", insertResult: await doInsert() }; + } catch (firstError) { + if (!isClickHouseJsonParseError(firstError)) throw firstError; + + const firstMessage = + typeof firstError === "object" && firstError !== null && "message" in firstError + ? String((firstError as { message?: unknown }).message ?? "") + : String(firstError); + + const rowHint = parseRowNumberFromError(firstMessage); + const { rowsTouched, fieldsSanitized } = sanitizeRows(rows); + + if (fieldsSanitized === 0) { + this._permanentlyDroppedBatches += 1; + this.logger.error( + "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix", + { + contextLabel, + attempt, + batchSize: rows.length, + clickhouseRowHint: rowHint, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + clickhouseError: firstMessage.split("\n")[0], + } + ); + return { kind: "dropped" }; } - return insertResult; - }); + this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", { + contextLabel, + attempt, + batchSize: rows.length, + clickhouseRowHint: rowHint, + rowsTouched, + fieldsSanitized, + clickhouseError: firstMessage.split("\n")[0], + }); + + try { + return { kind: "sanitized", insertResult: await doInsert() }; + } catch (retryError) { + if (!isClickHouseJsonParseError(retryError)) throw retryError; + + this._permanentlyDroppedBatches += 1; + const retryMessage = + typeof retryError === "object" && retryError !== null && "message" in retryError + ? String((retryError as { message?: unknown }).message ?? "") + : String(retryError); + this.logger.error( + "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error", + { + contextLabel, + attempt, + batchSize: rows.length, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + firstError: firstMessage.split("\n")[0], + retryError: retryMessage.split("\n")[0], + } + ); + return { kind: "dropped" }; + } + } } async #prepareRunInserts( From bea811ac41d77d56996648a795be471069d2aed7 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 22 May 2026 17:21:29 +0100 Subject: [PATCH 2/2] fix(webapp): don't count dropped batches as successful inserts in metrics Devin caught this: when #insertWithJsonParseRecovery drops a batch (sanitizer no-op, or sanitize-retry still hit a parse error), #insertTaskRunInserts was previously converting `{kind: "dropped"}` to `undefined`, so #insertWithRetry saw `[null, undefined]` (no error) and #flushBatch ticked `_taskRunsInsertedCounter` / `_payloadsInsertedCounter` for rows that never landed in ClickHouse. Fix: return the recovery wrapper's outcome straight through. #flushBatch now reads the outcome and only increments the success counter when both `!err` AND `outcome?.kind !== "dropped"`. Matches the pattern in ClickhouseEventRepository where the caller explicitly bails on `outcome.kind === "dropped"` before downstream success work. Co-Authored-By: Claude Opus 4.7 --- .../services/runsReplicationService.server.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 9ff6fa6d68c..39bfd379ecb 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -677,7 +677,7 @@ export class RunsReplicationService { combinedTaskRunInserts.push(...group.taskRunInserts); combinedPayloadInserts.push(...group.payloadInserts); - const [trErr] = await this.#insertWithRetry( + const [trErr, trOutcome] = await this.#insertWithRetry( (attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt), "task run inserts", flushId @@ -686,7 +686,7 @@ export class RunsReplicationService { taskRunError = trErr; } - const [plErr] = await this.#insertWithRetry( + const [plErr, plOutcome] = await this.#insertWithRetry( (attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt), "payload inserts", flushId @@ -695,10 +695,14 @@ export class RunsReplicationService { payloadError = plErr; } - if (!trErr) { + // Only count rows that actually landed in ClickHouse. `kind: "dropped"` + // means the recovery wrapper bailed (sanitizer no-op or sanitize-retry + // still failed) — those rows never made it, so they must not show up + // as successful inserts in the per-batch counter. + if (!trErr && trOutcome?.kind !== "dropped") { this._taskRunsInsertedCounter.add(group.taskRunInserts.length); } - if (!plErr) { + if (!plErr && plOutcome?.kind !== "dropped") { this._payloadsInsertedCounter.add(group.payloadInserts.length); } } @@ -872,13 +876,12 @@ export class RunsReplicationService { return insertResult; }; - const outcome = await this.#insertWithJsonParseRecovery( + return await this.#insertWithJsonParseRecovery( taskRunInserts, doInsert, "task_runs_v2", attempt ); - return outcome.kind === "dropped" ? undefined : outcome.insertResult; }); } @@ -907,13 +910,12 @@ export class RunsReplicationService { return insertResult; }; - const outcome = await this.#insertWithJsonParseRecovery( + return await this.#insertWithJsonParseRecovery( payloadInserts, doInsert, "raw_task_runs_payload_v1", attempt ); - return outcome.kind === "dropped" ? undefined : outcome.insertResult; }); }