diff --git a/.server-changes/runs-replication-utf16-recovery.md b/.server-changes/runs-replication-utf16-recovery.md new file mode 100644 index 0000000000..4a998f2a31 --- /dev/null +++ b/.server-changes/runs-replication-utf16-recovery.md @@ -0,0 +1,20 @@ +--- +area: webapp +type: fix +--- + +Recover from ClickHouse `JSONEachRow` parse failures in the runs +replication path. `RunsReplicationService` now wraps its task-run and +payload inserts with the same reactive-sanitisation pattern used by +`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`, +sanitize lone UTF-16 surrogates across the batch (via the shared +`sanitizeRows` helper) and retry once. If the sanitiser found nothing +or the retry also fails, the batch is dropped, `permanentlyDroppedBatches` +increments, and a loud error log is emitted — preventing the surrounding +`#insertWithRetry` layer from spinning on the same deterministic +failure. Non-parse errors propagate unchanged. + +Stops the bleeding behind the customer-visible "Tasks page shows a huge +Running count" symptom: one row with bad output JSON used to take down +the COMPLETED updates for its 50+ batch-mates, leaving every one of +them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected). diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 6b62cc76b2..39bfd379ec 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -38,6 +38,11 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { + isClickHouseJsonParseError, + parseRowNumberFromError, + sanitizeRows, +} from "~/v3/eventRepository/sanitizeRowsOnParseError.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -129,6 +134,15 @@ export class RunsReplicationService { private _disablePayloadInsert: boolean; private _disableErrorFingerprinting: boolean; + /** + * Counts batches that hit a ClickHouse `Cannot parse JSON object` failure + * that survived one sanitize-retry. These batches are dropped on the floor + * (returning success-ish to the caller so the retry layer doesn't spin on + * the same deterministic failure), and we track the drop count for + * observability. Counter only — does not gate behaviour. + */ + private _permanentlyDroppedBatches = 0; + // Metrics private _replicationLagHistogram: Histogram; private _batchesFlushedCounter: Counter; @@ -283,6 +297,11 @@ export class RunsReplicationService { this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; } + /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */ + get permanentlyDroppedBatches() { + return this._permanentlyDroppedBatches; + } + public async shutdown() { if (this._isShuttingDown) return; @@ -658,7 +677,7 @@ export class RunsReplicationService { combinedTaskRunInserts.push(...group.taskRunInserts); combinedPayloadInserts.push(...group.payloadInserts); - const [trErr] = await this.#insertWithRetry( + const [trErr, trOutcome] = await this.#insertWithRetry( (attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt), "task run inserts", flushId @@ -667,7 +686,7 @@ export class RunsReplicationService { taskRunError = trErr; } - const [plErr] = await this.#insertWithRetry( + const [plErr, plOutcome] = await this.#insertWithRetry( (attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt), "payload inserts", flushId @@ -676,10 +695,14 @@ export class RunsReplicationService { payloadError = plErr; } - if (!trErr) { + // Only count rows that actually landed in ClickHouse. `kind: "dropped"` + // means the recovery wrapper bailed (sanitizer no-op or sanitize-retry + // still failed) — those rows never made it, so they must not show up + // as successful inserts in the per-batch counter. + if (!trErr && trOutcome?.kind !== "dropped") { this._taskRunsInsertedCounter.add(group.taskRunInserts.length); } - if (!plErr) { + if (!plErr && plOutcome?.kind !== "dropped") { this._payloadsInsertedCounter.add(group.payloadInserts.length); } } @@ -837,24 +860,28 @@ export class RunsReplicationService { return; } return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { - const [insertError, insertResult] = - await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); - - if (insertError) { - this.logger.error("Error inserting task run inserts attempt", { - error: insertError, - attempt, - }); - - recordSpanError(span, insertError); - throw insertError; - } - - return insertResult; + const doInsert = async () => { + const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays( + taskRunInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } } + ); + if (insertError) { + this.logger.error("Error inserting task run inserts attempt", { + error: insertError, + attempt, + }); + recordSpanError(span, insertError); + throw insertError; + } + return insertResult; + }; + + return await this.#insertWithJsonParseRecovery( + taskRunInserts, + doInsert, + "task_runs_v2", + attempt + ); }); } @@ -867,25 +894,129 @@ export class RunsReplicationService { return; } return await startSpan(this._tracer, "insertPayloadInserts", async (span) => { - const [insertError, insertResult] = - await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, { - params: { - clickhouse_settings: this.#getClickhouseInsertSettings(), - }, - }); - - if (insertError) { - this.logger.error("Error inserting payload inserts attempt", { - error: insertError, - attempt, - }); + const doInsert = async () => { + const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays( + payloadInserts, + { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } } + ); + if (insertError) { + this.logger.error("Error inserting payload inserts attempt", { + error: insertError, + attempt, + }); + recordSpanError(span, insertError); + throw insertError; + } + return insertResult; + }; + + return await this.#insertWithJsonParseRecovery( + payloadInserts, + doInsert, + "raw_task_runs_payload_v1", + attempt + ); + }); + } - recordSpanError(span, insertError); - throw insertError; + /** + * Wraps a ClickHouse insert with reactive UTF-16 sanitization for + * `Cannot parse JSON object` rejections. Mirrors the pattern from + * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced + * in #3659 — same root cause (lone UTF-16 surrogates in user-provided + * JSON), same recovery shape: + * + * 1. Try the insert. Healthy batches pay zero scan cost. + * 2. On parse error, walk the whole batch via `sanitizeRows` and + * replace any lone-surrogate string with `"[invalid-utf16]"`. + * 3. Retry once. If the sanitizer found nothing or the retry also + * fails with the same error class, drop the batch loudly and + * return — do NOT rethrow, otherwise the surrounding + * `#insertWithRetry` layer would spin three more times on the + * same deterministic failure. + * 4. Non-parse errors propagate unchanged so the existing + * transient-retry path still handles them. + * + * The whole-batch scan (rather than slicing on the `at row N` hint) is + * deliberate: `at row N` semantics under `input_format_parallel_parsing` + * aren't stable enough to safely skip rows. The cost is bounded because + * `detectBadJsonStrings` exits in O(1) for clean strings. + */ + async #insertWithJsonParseRecovery( + rows: T[], + doInsert: () => Promise, + contextLabel: string, + attempt: number + ): Promise< + | { kind: "inserted"; insertResult: unknown } + | { kind: "sanitized"; insertResult: unknown } + | { kind: "dropped" } + > { + try { + return { kind: "inserted", insertResult: await doInsert() }; + } catch (firstError) { + if (!isClickHouseJsonParseError(firstError)) throw firstError; + + const firstMessage = + typeof firstError === "object" && firstError !== null && "message" in firstError + ? String((firstError as { message?: unknown }).message ?? "") + : String(firstError); + + const rowHint = parseRowNumberFromError(firstMessage); + const { rowsTouched, fieldsSanitized } = sanitizeRows(rows); + + if (fieldsSanitized === 0) { + this._permanentlyDroppedBatches += 1; + this.logger.error( + "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix", + { + contextLabel, + attempt, + batchSize: rows.length, + clickhouseRowHint: rowHint, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + clickhouseError: firstMessage.split("\n")[0], + } + ); + return { kind: "dropped" }; } - return insertResult; - }); + this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", { + contextLabel, + attempt, + batchSize: rows.length, + clickhouseRowHint: rowHint, + rowsTouched, + fieldsSanitized, + clickhouseError: firstMessage.split("\n")[0], + }); + + try { + return { kind: "sanitized", insertResult: await doInsert() }; + } catch (retryError) { + if (!isClickHouseJsonParseError(retryError)) throw retryError; + + this._permanentlyDroppedBatches += 1; + const retryMessage = + typeof retryError === "object" && retryError !== null && "message" in retryError + ? String((retryError as { message?: unknown }).message ?? "") + : String(retryError); + this.logger.error( + "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error", + { + contextLabel, + attempt, + batchSize: rows.length, + permanentlyDroppedBatches: this._permanentlyDroppedBatches, + sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024), + firstError: firstMessage.split("\n")[0], + retryError: retryMessage.split("\n")[0], + } + ); + return { kind: "dropped" }; + } + } } async #prepareRunInserts(