From 001ccfd9ceded94a1379281ad87905beea4b1362 Mon Sep 17 00:00:00 2001 From: Karakatiza666 Date: Mon, 11 May 2026 17:21:48 +0000 Subject: [PATCH] [web-console] Fix Change Stream crashes on high-throughput streams Fix Change Stream losing section headers on high-throughput streams Signed-off-by: Karakatiza666 --- .../pipelines/editor/ChangeStream.svelte | 2 +- .../pipelines/editor/TabAdHocQuery.svelte | 15 +- .../pipelines/editor/TabChangeStream.svelte | 99 ++-- .../pipelines/editor/TabPerformance.svelte | 14 +- .../functions/pipelines/changeStream.spec.ts | 347 +++++++++++++ .../lib/functions/pipelines/changeStream.ts | 488 +++++++++++++++++- 6 files changed, 869 insertions(+), 96 deletions(-) create mode 100644 js-packages/web-console/src/lib/functions/pipelines/changeStream.spec.ts diff --git a/js-packages/web-console/src/lib/components/pipelines/editor/ChangeStream.svelte b/js-packages/web-console/src/lib/components/pipelines/editor/ChangeStream.svelte index 4c4b1a0de04..07e1b048afa 100644 --- a/js-packages/web-console/src/lib/components/pipelines/editor/ChangeStream.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/editor/ChangeStream.svelte @@ -3,7 +3,7 @@ export type Row = | { relationName: string; columns: Field[] } | XgressEntry - | { skippedBytes: number } + | { relationName: string; skippedBytes: number } export type ChangeStreamData = { rows: Row[] headers: number[] diff --git a/js-packages/web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte b/js-packages/web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte index 2009737c8ed..e1ab4c7b151 100644 --- a/js-packages/web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte @@ -15,10 +15,7 @@ import Query, { type Row, type QueryData } from '$lib/components/adhoc/Query.svelte' import { isPipelineInteractive } from '$lib/functions/pipelines/status' import type { SQLValueJS } from '$lib/types/sql' - import { - CustomJSONParserTransformStream, - parseCancellable - } from '$lib/functions/pipelines/changeStream' + import { createBigNumberStreamParser, parseStream } from '$lib/functions/pipelines/changeStream' import invariant from 'tiny-invariant' import WarningBanner from '$lib/components/pipelines/editor/WarningBanner.svelte' import { enclosure, reclosureKey } from '$lib/functions/common/function' @@ -117,8 +114,12 @@ } } } - const { cancel } = parseCancellable( + const { cancel } = parseStream( result, + createBigNumberStreamParser>({ + paths: ['$'], + separator: '' + }), { pushChanges, onBytesSkipped: (skippedBytes) => { @@ -147,10 +148,6 @@ injectValue({ error: e.message }) } }, - new CustomJSONParserTransformStream>({ - paths: ['$'], - separator: '' - }), { bufferSize: 8 * 1024 * 1024 } diff --git a/js-packages/web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte b/js-packages/web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte index f1309d66d1d..c8bca9c43a7 100644 --- a/js-packages/web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte @@ -87,66 +87,61 @@ pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined return undefined } - const { cancel } = parseCancellable( + + // Build a section header for `relationName`. Prefers a sample data row to + // determine column order; falls back to the relation schema (e.g. 
for the + // skipped-bytes path, where there's no row to sample). + const buildSectionHeader = (sample?: XgressEntry): Row => { + const fields = pipelinesRelations[tenantName][pipelineName][relationName].fields + const columns: Field[] = sample + ? Object.keys('insert' in sample ? sample.insert : sample.delete).map( + (name) => fields[normalizeCaseIndependentName({ name })] + ) + : Object.values(fields) + return { relationName, columns } + } + + const appendForRelation = (newRows: Row[], sample?: XgressEntry) => + appendRowsForRelation( + changeStream[tenantName][pipelineName], + relationName, + newRows, + buildSectionHeader, + bufferSize, + sample + ) + + const { cancel } = parseStream( result, + createBigNumberStreamParser({ + paths: ['$.json_data.*'], + separator: '' + }), { pushChanges: (rows: XgressEntry[]) => { - const initialLen = changeStream[tenantName][pipelineName].rows.length - const lastRelationName = ((headerIdx) => - headerIdx !== undefined - ? ((header) => (header && 'relationName' in header ? header.relationName : null))( - changeStream[tenantName][pipelineName].rows[headerIdx] - ) - : null)(changeStream[tenantName][pipelineName].headers.at(-1)) - const offset = pushAsCircularBuffer( - () => changeStream[tenantName][pipelineName].rows, - bufferSize, - (v: Row) => v - )( - [ - ...(relationName !== lastRelationName - ? ([ - { - relationName, - columns: Object.keys( - ((row) => ('insert' in row ? row.insert : row.delete))(rows[0]) - ).map((name) => { - return pipelinesRelations[tenantName][pipelineName][relationName].fields[ - normalizeCaseIndependentName({ name }) - ] - }) - } - ] as Row[]) - : []) - ].concat(rows) - ) - if (relationName !== lastRelationName) { - changeStream[tenantName][pipelineName].headers.push(initialLen) - } - changeStream[tenantName][pipelineName].headers = changeStream[tenantName][ - pipelineName - ].headers - .map((i) => i - offset) - .filter((i) => i >= 0) + appendForRelation(rows as unknown as Row[], rows[0]) }, onBytesSkipped: (skippedBytes) => { - pushAsCircularBuffer( - () => changeStream[tenantName][pipelineName].rows, - bufferSize, - (v) => v - )([{ relationName, skippedBytes }]) - changeStream[tenantName][pipelineName].totalSkippedBytes += skippedBytes + const cs = changeStream[tenantName][pipelineName] + // Coalesce consecutive skip markers for the same relation: if the row at + // the tail is already a skip marker tagged with this relation, just bump + // its byte count in place instead of pushing another row. Keeps the + // change-stream view from getting flooded with one-line "Skipped N bytes" + // entries when backpressure drops sustained traffic. 
+ const lastRow = cs.rows.at(-1) + if (lastRow && 'skippedBytes' in lastRow && lastRow.relationName === relationName) { + lastRow.skippedBytes += skippedBytes + } else { + appendForRelation([{ relationName, skippedBytes }]) + } + cs.totalSkippedBytes += skippedBytes }, onParseEnded: () => { pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined } }, - new CustomJSONParserTransformStream({ - paths: ['$.json_data.*'], - separator: '' - }), { - bufferSize: 8 * 1024 * 1024 + bufferSize: 4 * 1024 * 1024 } ) return () => { @@ -247,9 +242,9 @@ import { Pane, PaneGroup, PaneResizer } from 'paneforge' import type { Field, Relation } from '$lib/services/manager' import { - CustomJSONParserTransformStream, - parseCancellable, - pushAsCircularBuffer + appendRowsForRelation, + createBigNumberStreamParser, + parseStream } from '$lib/functions/pipelines/changeStream' import JSONbig from 'true-json-bigint' import { count, groupBy } from '$lib/functions/common/array' diff --git a/js-packages/web-console/src/lib/components/pipelines/editor/TabPerformance.svelte b/js-packages/web-console/src/lib/components/pipelines/editor/TabPerformance.svelte index 6c2cd2f42ee..e613649ae9a 100644 --- a/js-packages/web-console/src/lib/components/pipelines/editor/TabPerformance.svelte +++ b/js-packages/web-console/src/lib/components/pipelines/editor/TabPerformance.svelte @@ -24,8 +24,8 @@ import { formatDateTime, useElapsedTime } from '$lib/functions/format' import type { PipelineMetrics } from '$lib/functions/pipelineMetrics' import { - CustomJSONParserTransformStream, - parseCancellable, + createBigNumberStreamParser, + parseStream, pushAsCircularBuffer } from '$lib/functions/pipelines/changeStream' import { getDeploymentStatusLabel, isMetricsAvailable } from '$lib/functions/pipelines/status' @@ -104,8 +104,12 @@ cancelStream = undefined return undefined } - const { cancel } = parseCancellable( + const { cancel } = parseStream( result, + createBigNumberStreamParser({ + paths: ['$'], + separator: '' + }), { pushChanges: (rows: TimeSeriesEntry[]) => { pushAsCircularBuffer( @@ -134,10 +138,6 @@ } } }, - new CustomJSONParserTransformStream({ - paths: ['$'], - separator: '' - }), { bufferSize: 8 * 1024 * 1024 } diff --git a/js-packages/web-console/src/lib/functions/pipelines/changeStream.spec.ts b/js-packages/web-console/src/lib/functions/pipelines/changeStream.spec.ts new file mode 100644 index 00000000000..2ecc4193794 --- /dev/null +++ b/js-packages/web-console/src/lib/functions/pipelines/changeStream.spec.ts @@ -0,0 +1,347 @@ +/** + * Unit tests covering the two regressions the change-stream rewrite was meant to fix: + * + * 1. **Parsing correctness under overflow shedding.** The original implementation dropped + * arbitrary byte ranges mid-record, which sometimes produced syntactically valid + * but structurally broken JSON and crashed the UI. The current parser must only + * ever see complete top-level documents. + * 2. **Section header preservation under high throughput.** The original + * `pushAsCircularBuffer`-based push lost the only section header when rows shifted + * off the front, leaving orphan data rows. `appendRowsForRelation` must keep every + * surviving section preceded by its header. 
+ */ + +import { BigNumber } from 'bignumber.js' +import { describe, expect, it } from 'vitest' +import type { ChangeStreamData, Row } from '$lib/components/pipelines/editor/ChangeStream.svelte' +import type { XgressEntry } from '$lib/services/pipelineManager' +import { + appendRowsForRelation, + createBigNumberStreamParser, + parseStream, + type StreamingJsonParser +} from './changeStream' + +// --- Mock stream factory --- + +const makeMockStream = (chunks: (Uint8Array | string)[]): ReadableStream => { + const encoder = new TextEncoder() + let i = 0 + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + const c = chunks[i++] + controller.enqueue(typeof c === 'string' ? encoder.encode(c) : c) + } else { + controller.close() + } + } + }) +} + +// Runs `parseStream` over `chunks` and resolves the returned Promise once the stream has ended and the +// last flush has fired. Returns everything the consumer would have observed. +const runParseStream = ( + chunks: (Uint8Array | string)[], + parserOpts: Parameters>[0] = { + paths: ['$'], + separator: '' + }, + options?: Parameters>[3] +) => + new Promise<{ + values: T[] + skipped: number[] + parser: StreamingJsonParser + }>((resolve) => { + const values: T[] = [] + const skipped: number[] = [] + const parser = createBigNumberStreamParser(parserOpts) + parseStream( + { stream: makeMockStream(chunks), cancel: () => {} }, + parser, + { + pushChanges: (vs) => { + for (const v of vs) { + values.push(v) + } + }, + onBytesSkipped: (n) => skipped.push(n), + onParseEnded: () => resolve({ values, skipped, parser }) + }, + options + ) + }) + +// --- Row / Header builders --- + +type DataRow = Extract + +const insertRow = (data: Record): DataRow => + ({ insert: data }) as unknown as DataRow + +// A header row is `{ relationName, columns: Field[] }`. We don't exercise the Field +// shape in any of these tests, so `[] as unknown as Field[]` is fine. +const headerRow = (relationName: string): Row => + ({ relationName, columns: [] as unknown[] }) as unknown as Row + +const skipRow = (relationName: string, skippedBytes: number): Row => + ({ relationName, skippedBytes }) as unknown as Row + +const isHeader = (row: Row | undefined): boolean => !!row && 'columns' in (row as object) + +// A header-builder usable by the function-under-test that just returns a header for +// the relation the test is exercising. The optional `sample` argument is ignored. 
+const stubHeaderBuilder = (relationName: string) => () => headerRow(relationName) + +const emptyChangeStream = (): ChangeStreamData => ({ + rows: [], + headers: [], + totalSkippedBytes: 0 +}) + +describe('parseStream', () => { + it('parses NDJSON delivered in a single network chunk', async () => { + const { values, skipped } = await runParseStream<{ a: BigNumber }>([ + '{"a":1}\n{"a":2}\n{"a":3}\n' + ]) + expect(values.map((v) => v.a.toString())).toEqual(['1', '2', '3']) + expect(skipped).toEqual([]) + }) + + it('parses a line that straddles two network chunks', async () => { + const { values, skipped } = await runParseStream<{ a: BigNumber }>([ + '{"a":1}\n{"a":', + '2}\n{"a":3}\n' + ]) + expect(values.map((v) => v.a.toString())).toEqual(['1', '2', '3']) + expect(skipped).toEqual([]) + }) + + it('does not emit a trailing partial line that lacks a newline', async () => { + const { values, skipped } = await runParseStream<{ a: BigNumber }>(['{"a":1}\n{"a":2}']) + expect(values.map((v) => v.a.toString())).toEqual(['1']) + // The dangling `{"a":2}` is silently discarded (no skipped-bytes report); it's + // the connection-was-cut case, not an overflow-shedding drop. + expect(skipped).toEqual([]) + }) + + it('handles CRLF line terminators (the format the server actually emits)', async () => { + const { values, skipped } = await runParseStream<{ a: BigNumber }>([ + '{"a":1}\r\n{"a":2}\r\n{"a":3}\r\n' + ]) + expect(values.map((v) => v.a.toString())).toEqual(['1', '2', '3']) + expect(skipped).toEqual([]) + }) + + it('skips malformed JSON without poisoning subsequent values', async () => { + // The bad line is in its own chunk so it's parsed as an isolated piece. This is + // the regression test for the original crash: a parse failure must never let a + // partially-built value leak into `pushChanges`. + const { values, skipped } = await runParseStream<{ a: BigNumber }>([ + '{"a":1}\n', + '{not valid json}\n', + '{"a":3}\n' + ]) + expect(values.map((v) => v.a.toString())).toEqual(['1', '3']) + expect(skipped.length).toBeGreaterThan(0) + // Each emitted value is a real object — no partials, no garbage. + for (const v of values) { + expect(v).toBeInstanceOf(Object) + expect(BigNumber.isBigNumber(v.a)).toBe(true) + } + }) + + it('preserves arbitrary-precision numbers as BigNumber', async () => { + const huge = '9999999999999999999.999' + const { values } = await runParseStream<{ v: BigNumber }>([`{"v":${huge}}\n`]) + expect(values.length).toBe(1) + expect(BigNumber.isBigNumber(values[0].v)).toBe(true) + // JS Number would coerce this to 1e19; BigNumber preserves it verbatim. 
+ expect(values[0].v.toFixed()).toBe(huge) + }) +}) + +describe('appendRowsForRelation', () => { + it('adds a section header for the first batch into an empty stream', () => { + const cs = emptyChangeStream() + const data = [insertRow({ id: 1 }), insertRow({ id: 2 }), insertRow({ id: 3 })] + appendRowsForRelation(cs, 'rel_A', data, stubHeaderBuilder('rel_A'), 100) + expect(cs.headers).toEqual([0]) + expect(cs.rows.length).toBe(4) + expect(isHeader(cs.rows[0])).toBe(true) + expect((cs.rows[0] as { relationName: string }).relationName).toBe('rel_A') + }) + + it('does not duplicate the header when continuing the same relation', () => { + const cs = emptyChangeStream() + appendRowsForRelation( + cs, + 'rel_A', + [insertRow({ id: 1 }), insertRow({ id: 2 }), insertRow({ id: 3 })], + stubHeaderBuilder('rel_A'), + 100 + ) + appendRowsForRelation( + cs, + 'rel_A', + [insertRow({ id: 4 }), insertRow({ id: 5 })], + stubHeaderBuilder('rel_A'), + 100 + ) + expect(cs.headers).toEqual([0]) + expect(cs.rows.length).toBe(6) + }) + + it('adds a new section header when the relation changes', () => { + const cs = emptyChangeStream() + appendRowsForRelation( + cs, + 'rel_A', + [insertRow({ id: 1 }), insertRow({ id: 2 }), insertRow({ id: 3 })], + stubHeaderBuilder('rel_A'), + 100 + ) + appendRowsForRelation( + cs, + 'rel_B', + [insertRow({ x: 'a' }), insertRow({ x: 'b' })], + stubHeaderBuilder('rel_B'), + 100 + ) + expect(cs.headers).toEqual([0, 4]) + expect(cs.rows.length).toBe(7) + expect((cs.rows[4] as { relationName: string }).relationName).toBe('rel_B') + }) + + it('re-inserts the dropped header when the front shift would orphan the section', () => { + // Drift regression: bufferSize=10, fill it with a header + 9 data rows, push 5 + // more for the same relation. The shift evicts the original header at index 0 + // along with 4 data rows; without the fix, the remaining 5 old rows and the 5 + // new rows would render with no section header above them. + const cs = emptyChangeStream() + const initial = Array.from({ length: 9 }, (_, i) => insertRow({ id: i })) + appendRowsForRelation(cs, 'rel_A', initial, stubHeaderBuilder('rel_A'), 10) + expect(cs.headers).toEqual([0]) + expect(cs.rows.length).toBe(10) + + const more = Array.from({ length: 5 }, (_, i) => insertRow({ id: 100 + i })) + appendRowsForRelation(cs, 'rel_A', more, stubHeaderBuilder('rel_A'), 10) + + expect(cs.headers).toEqual([0]) + expect(isHeader(cs.rows[0])).toBe(true) + expect((cs.rows[0] as { relationName: string }).relationName).toBe('rel_A') + // Soft cap: every cycle that drops a header re-inserts one, so the buffer can + // sit at bufferSize+1 in steady state. What matters is the invariant: rows[0] + // is always a header. + expect(cs.rows.length).toBeLessThanOrEqual(11) + // No row past index 0 is a header — there's exactly one section. + expect(cs.headers.length).toBe(1) + }) + + it('keeps a header at the front when a single batch overflows the buffer', () => { + // Overflow regression: pushing more rows than fit must still leave a header at + // rows[0]. Previously the prepended header was the *first* item of the values + // array, so `values.slice(-bufferSize)` dropped it along with the overflowing + // tail and the kept rows were left orphan. 
+ const cs = emptyChangeStream() + const data = Array.from({ length: 50 }, (_, i) => insertRow({ id: i })) + appendRowsForRelation(cs, 'rel_A', data, stubHeaderBuilder('rel_A'), 10) + + expect(cs.headers).toEqual([0]) + expect(cs.rows.length).toBe(10) + expect(isHeader(cs.rows[0])).toBe(true) + expect((cs.rows[0] as { relationName: string }).relationName).toBe('rel_A') + + // The 9 kept data rows must be the LATEST 9 of the input (ids 41..49), not the + // earliest — we drop oldest, keep newest. + const keptIds = (cs.rows.slice(1) as unknown as { insert: { id: number } }[]).map( + (r) => r.insert.id + ) + expect(keptIds).toEqual([41, 42, 43, 44, 45, 46, 47, 48, 49]) + }) + + it('re-inserts the most-recent dropped header when multiple sections drop at once', () => { + // Pre-build state: [header_A, 5×A, header_B, 5×B] = 12 rows, headers=[0, 6]. + // With bufferSize=15 and a new C-batch sized so dropCount=7, both A and B + // headers are dropped from the front; the orphan rows that remain belonged to + // B (rows at old indices 7..11), so the helper must re-insert B's header — not + // A's — at the front of the kept tail. + const cs = emptyChangeStream() + appendRowsForRelation( + cs, + 'rel_A', + Array.from({ length: 5 }, (_, i) => insertRow({ id: i })), + stubHeaderBuilder('rel_A'), + 15 + ) + appendRowsForRelation( + cs, + 'rel_B', + Array.from({ length: 5 }, (_, i) => insertRow({ id: 100 + i })), + stubHeaderBuilder('rel_B'), + 15 + ) + expect(cs.headers).toEqual([0, 6]) + expect(cs.rows.length).toBe(12) + + appendRowsForRelation( + cs, + 'rel_C', + Array.from({ length: 9 }, (_, i) => insertRow({ id: 200 + i })), + stubHeaderBuilder('rel_C'), + 15 + ) + + // dropCount = 12 + (1 + 9) - 15 = 7, evicting header_A + 5 A_rows + header_B. + // All 5 B data rows survive at old indices 7..11 → after splice(0,7) at new + // 0..4 → after unshift(header_B) at new 1..5. + // Expected layout: + // rows[0] = header_B (re-inserted; most-recent dropped header) + // rows[1..5] = the 5 surviving B data rows (ids 100..104) + // rows[6] = header_C (new section) + // rows[7..15] = the 9 new C data rows + expect(cs.headers).toEqual([0, 6]) + expect(cs.rows.length).toBe(16) + expect((cs.rows[0] as { relationName: string }).relationName).toBe('rel_B') + expect((cs.rows[6] as { relationName: string }).relationName).toBe('rel_C') + // Confirm the orphans-under-B are actually B's data rows, not A's. + const orphanIds = (cs.rows.slice(1, 6) as unknown as { insert: { id: number } }[]).map( + (r) => r.insert.id + ) + expect(orphanIds).toEqual([100, 101, 102, 103, 104]) + }) + + it('inserts a header before a skip marker when the relation changed', () => { + const cs = emptyChangeStream() + appendRowsForRelation( + cs, + 'rel_A', + [insertRow({ id: 1 }), insertRow({ id: 2 }), insertRow({ id: 3 })], + stubHeaderBuilder('rel_A'), + 100 + ) + // Skip marker arrives for a different relation — must get its own header so + // the marker doesn't render under the wrong section. 
+ appendRowsForRelation(cs, 'rel_B', [skipRow('rel_B', 512)], stubHeaderBuilder('rel_B'), 100) + expect(cs.headers).toEqual([0, 4]) + expect(cs.rows.length).toBe(6) + expect((cs.rows[4] as { relationName: string }).relationName).toBe('rel_B') + expect('skippedBytes' in (cs.rows[5] as object)).toBe(true) + }) + + it('does not add a header for a skip marker in the current relation', () => { + const cs = emptyChangeStream() + appendRowsForRelation( + cs, + 'rel_A', + [insertRow({ id: 1 }), insertRow({ id: 2 }), insertRow({ id: 3 })], + stubHeaderBuilder('rel_A'), + 100 + ) + appendRowsForRelation(cs, 'rel_A', [skipRow('rel_A', 512)], stubHeaderBuilder('rel_A'), 100) + expect(cs.headers).toEqual([0]) + expect(cs.rows.length).toBe(5) + expect('skippedBytes' in (cs.rows[4] as object)).toBe(true) + }) +}) diff --git a/js-packages/web-console/src/lib/functions/pipelines/changeStream.ts b/js-packages/web-console/src/lib/functions/pipelines/changeStream.ts index 20268ad4b11..bdf58a85167 100644 --- a/js-packages/web-console/src/lib/functions/pipelines/changeStream.ts +++ b/js-packages/web-console/src/lib/functions/pipelines/changeStream.ts @@ -1,20 +1,288 @@ import { type JSONParser, type JSONParserOptions, Tokenizer, TokenParser } from '@streamparser/json' import { BigNumber } from 'bignumber.js' import invariant from 'tiny-invariant' +import type { ChangeStreamData, Row } from '$lib/components/pipelines/editor/ChangeStream.svelte' import { findIndex } from '$lib/functions/common/array' import { tuple } from '$lib/functions/common/tuple' +import type { XgressEntry } from '$lib/services/pipelineManager' class BigNumberTokenizer extends Tokenizer { parseNumber = BigNumber as any } +export interface StreamingJsonParser { + /** Feed text into the parser. Returns the values it produced from this input. */ + write(input: string): T[] + /** Drop internal state — call after `write` throws so the next call starts clean. */ + reset(): void +} + +/** + * Long-lived JSON parser that preserves number precision via `BigNumber` (so SQL + * DECIMAL / large-integer columns aren't coerced through JS `Number`). Designed to be + * allocated **once per stream** and fed multi-line blocks: the underlying + * `@streamparser/json` tokenizer + token parser handle a series of concatenated + * documents in a single `write()` call (`separator: ''` mode). + */ +export const createBigNumberStreamParser = ( + opts?: JSONParserOptions +): StreamingJsonParser => { + let buffer: T[] = [] + let tokenizer!: Tokenizer + let tokenParser!: TokenParser + const init = () => { + tokenizer = new BigNumberTokenizer() + tokenParser = new TokenParser(opts) + tokenizer.onToken = tokenParser.write.bind(tokenParser) + tokenParser.onValue = (v) => { + buffer.push(v.value as T) + } + } + init() + return { + write(input: string): T[] { + tokenizer.write(input) + const out = buffer + buffer = [] + return out + }, + reset() { + buffer = [] + init() + } + } +} + +const NEWLINE = 0x0a + +/** + * Soft cap on how many bytes a single not-yet-newline-terminated record may accumulate. + * Anything beyond this is reported as skipped — accepting unbounded leftovers would let + * a runaway record exhaust browser memory. + */ +const MAX_LINE_SIZE = 16 * 1024 * 1024 + +/** + * Soft target for the size of each `parser.write` call. The reader splits each + * line-aligned block at the nearest newline below this size and yields between pieces, + * so a single big network chunk can't monopolize the main thread. 
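+ *
+ * Illustrative arithmetic, not a measured figure: with the 16 KiB target, a 1 MiB
+ * line-aligned block would reach the parser as roughly 64 pieces, with a macrotask
+ * yield after each piece, so the flush timer and input handlers keep firing during
+ * the burst.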
+ */ +const PARSE_PIECE_TARGET_BYTES = 16 * 1024 + +/** + * Yields control back to the event loop so the UI can render and respond to input. + * + * Uses `scheduler.yield()` when available (Chrome 129+) to keep the task's priority, + * otherwise a `MessageChannel` round-trip — both yield in the same macrotask boundary + * that `setTimeout(0)` would, but without the 4ms minimum browsers clamp setTimeout to + * after a few nested calls. That clamp was capping throughput far below what overflow + * shedding actually requires. + */ +const yieldToEventLoop = (): Promise => { + const sched = (globalThis as { scheduler?: { yield?: () => Promise } }).scheduler + if (sched && typeof sched.yield === 'function') { + return sched.yield() + } + if (typeof MessageChannel !== 'undefined') { + return new Promise((resolve) => { + const channel = new MessageChannel() + channel.port1.onmessage = () => { + channel.port1.close() + resolve() + } + channel.port2.postMessage(0) + }) + } + return new Promise((resolve) => setTimeout(resolve)) +} + +/** + * Streams newline-delimited JSON from `source` and dispatches parsed values via + * `pushChanges` on a fixed cadence. + * + * Single sequential reader loop (no `TransformStream` pipe). One `setInterval` drives UI + * updates. Overflow shedding uses a single counter: when the bytes of unflushed values + * exceed `bufferSize`, the next line-aligned block is dropped and reported via + * `onBytesSkipped`. Between network chunks the reader yields to a macrotask so the flush + * timer (and the rest of the page) keep ticking. + * + * The parser is `write`/`reset`-shaped on purpose: each network chunk is split into one + * line-aligned block (everything up to the last `\n`) and passed straight to + * `parser.write(block)`. The parser handles the multi-line block as a series of + * concatenated documents in a single call, which is much cheaper than tokenizing one + * line at a time. If `write` throws, the parser is `reset()` — its internal state is + * discarded, the block is reported as skipped, and the next block starts on a clean + * parser. Because we never feed the parser anything that isn't a complete document, the + * structurally-invalid-JSON corruption that crashed the UI in the original + * implementation cannot occur. + * + * @param parser Long-lived parser, typically allocated once per stream. + * @param options.bufferSize **Main tuning knob.** Per-flush-window byte budget for + * pieces admitted into the parser. When the running total of admitted bytes in the + * current window exceeds this, the next piece (up to `PARSE_PIECE_TARGET_BYTES`) is dropped (and + * reported via `onBytesSkipped`) instead of being parsed. The budget resets on every + * `flushIntervalMs` tick, so in steady state the worst-case stream-to-UI latency is + * approximately `bufferSize / parse_rate`. + * + * - **Too many skipped records?** Raise `bufferSize`. More bytes are admitted before + * dropping kicks in, but the visible data lags further behind real time. + * - **Updates feel too slow / stale?** Lower `bufferSize`. The window's worth of + * CPU/UI work is capped sooner, dropping the excess; perceived latency falls but + * the skipped-bytes counter climbs. + * @param options.flushIntervalMs How often `pushChanges` fires and `bufferSize` is + * released. Lower → smoother UI cadence and a tighter latency cap, but more frequent + * Svelte/DOM updates per second. Default 100ms. 
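+ *
+ * Illustrative call shape, mirroring the TabPerformance.svelte change earlier in this
+ * patch (`result` stands for the caller's `{ stream, cancel }` handle; `rowBuffer`,
+ * `skippedTotal` and `cancelStream` are the caller's own state):
+ *
+ *   const { cancel } = parseStream(
+ *     result,
+ *     createBigNumberStreamParser<TimeSeriesEntry>({ paths: ['$'], separator: '' }),
+ *     {
+ *       pushChanges: (rows) => rowBuffer.push(...rows), // fires every flushIntervalMs
+ *       onBytesSkipped: (n) => { skippedTotal += n },
+ *       onParseEnded: () => { cancelStream = undefined }
+ *     },
+ *     { bufferSize: 8 * 1024 * 1024 }
+ *   )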
+ */ +export const parseStream = ( + source: { + stream: ReadableStream> + cancel: () => void + }, + parser: StreamingJsonParser, + cbs: { + pushChanges: (changes: T[]) => void + onBytesSkipped?: (bytes: number) => void + onParseEnded?: (reason: 'ended' | 'cancelled') => void + onNetworkError?: (e: TypeError, injectValue: (value: T) => void) => void + }, + options?: { bufferSize?: number; flushIntervalMs?: number } +) => { + invariant( + source.stream instanceof ReadableStream, + `parseStream(): stream is ${JSON.stringify(source.stream)}` + ) + const maxPendingBytes = options?.bufferSize ?? 1_000_000 + const flushIntervalMs = options?.flushIntervalMs ?? 100 + + let cancelled = false + let closedReason: null | 'ended' | 'cancelled' = null + let endedReported = false + + const outBuffer: T[] = [] + let pendingBytes = 0 + + const reader = source.stream.getReader() + + const readLoop = async () => { + const decoder = new TextDecoder('utf-8') + let leftover = '' + while (!cancelled) { + let chunk + try { + chunk = await reader.read() + } catch (e) { + cbs.onNetworkError?.(e as TypeError, (v) => outBuffer.push(v)) + return + } + if (chunk.done) { + return + } + const text = leftover + decoder.decode(chunk.value, { stream: true }) + const lastNewline = text.lastIndexOf('\n') + if (lastNewline === -1) { + if (text.length > MAX_LINE_SIZE) { + cbs.onBytesSkipped?.(text.length) + leftover = '' + } else { + leftover = text + } + continue + } + const block = text.slice(0, lastNewline + 1) + leftover = text.slice(lastNewline + 1) + + // Feed the parser line-aligned pieces of bounded size and yield between them. + // The parser is the same instance across all pieces (zero per-piece allocation), + // but bounding the size of each `parser.write` call caps how long the main + // thread is blocked at a stretch. The pending-bytes check is per-piece, not + // per-block: if we admitted/dropped at block granularity, a single network + // chunk larger than `bufferSize` would be dropped even though the + // flush timer resets `pendingBytes` to 0 between yields and most pieces would + // fit individually. Dropping per-piece (each piece is itself line-aligned) + // keeps as much data flowing as the flush rate allows. + let cursor = 0 + while (cursor < block.length) { + let end = Math.min(cursor + PARSE_PIECE_TARGET_BYTES, block.length) + if (end < block.length) { + const nl = block.lastIndexOf('\n', end - 1) + end = nl > cursor ? nl + 1 : block.length + } + const pieceLen = end - cursor + if (pendingBytes + pieceLen > maxPendingBytes) { + cbs.onBytesSkipped?.(pieceLen) + } else { + const piece = block.slice(cursor, end) + try { + const items = parser.write(piece) + for (const item of items) { + outBuffer.push(item) + } + pendingBytes += pieceLen + } catch { + parser.reset() + cbs.onBytesSkipped?.(pieceLen) + } + } + cursor = end + await yieldToEventLoop() + if (cancelled) { + return + } + } + } + } + + setTimeout(async () => { + await readLoop() + closedReason ??= 'ended' + }) + + const flushHandle = setInterval(() => { + // Reset the per-window byte budget unconditionally. `pendingBytes` represents + // "CPU work admitted since the last window", not "bytes currently in outBuffer", + // so even pieces that parsed successfully but produced no values (heartbeat + // lines with no `json_data`, lines that don't match the parser's path filter, + // etc.) need to release their budget on each tick. 
If we only reset alongside + // pushChanges, a stream of no-output pieces would silently exhaust the cap + // and the read loop would drop everything from then on. + pendingBytes = 0 + if (outBuffer.length) { + const batch = outBuffer.slice() + outBuffer.length = 0 + cbs.pushChanges(batch) + } + if (closedReason && !endedReported) { + endedReported = true + clearInterval(flushHandle) + cbs.onParseEnded?.(closedReason) + } + }, flushIntervalMs) + + return { + cancel: () => { + cancelled = true + closedReason = 'cancelled' + reader + .cancel() + .catch(() => {}) + .finally(() => { + try { + source.cancel() + } catch { + /* source.cancel may be a no-op or already cancelled */ + } + }) + } + } +} + /** * * @param stream * @param pushChanges - * @param options.bufferSize Threshold size of the buffer that holds unprocessed JSON chunks. - * If the buffer size exceeds this value - when the new JSON batch arrives previous JSON batches are dropped - * until the buffer size is under the threshold, or only one batch remains. + * @param options.bufferSize Soft byte budget for the line queue between the network reader + * and the JSON parser. When the queue exceeds this, incoming complete lines are dropped + * instead of enqueued — see `splitByNewlineWithOverflowShedding` for why this is sound. * @returns */ export const parseCancellable = >( @@ -35,10 +303,9 @@ export const parseCancellable = void ): TransformStream { + let leftover: Uint8Array | null = null return new TransformStream( { - async transform(chunk, controller) { - let start = 0 - while (start < chunk.length) { - const end = Math.min(chunk.length, start + maxChunkBytes) - if (hasBackpressure(controller, maxChunkBytes)) { + transform(chunk, controller) { + // Scan only backwards from the end of the chunk to find the last newline. + // The bytes before it (plus any prior leftover) form a contiguous run of complete + // lines; the bytes after it are the head of an incomplete record. We never need + // to find the newlines in between — the parser will walk them itself. + let lastNewline = -1 + for (let i = chunk.length - 1; i >= 0; --i) { + if (chunk[i] === NEWLINE) { + lastNewline = i break } - controller.enqueue(chunk.subarray(start, end)) + } - start = end + if (lastNewline === -1) { + // No record terminator yet — extend the leftover. + const accumulated = (leftover?.length ?? 0) + chunk.length + if (accumulated > MAX_LINE_SIZE) { + onBytesSkipped?.(accumulated) + leftover = null + return + } + if (leftover) { + const merged = new Uint8Array(accumulated) + merged.set(leftover, 0) + merged.set(chunk, leftover.length) + leftover = merged + } else { + // Copy so we don't pin the network chunk's full backing buffer for one record. + leftover = chunk.slice() + } + return } - if (start < chunk.length) { - onBytesSkipped?.(chunk.length - start) + + const emitFromChunk = lastNewline + 1 + const leftoverLen = leftover?.length ?? 0 + const emitLen = leftoverLen + emitFromChunk + + if (hasBackpressure(controller)) { + onBytesSkipped?.(emitLen) + } else if (leftoverLen === 0) { + // No prior leftover — enqueue a view into the chunk; saves an allocation. + // The view keeps the chunk's backing buffer alive only until the parser + // consumes it, which happens on the next transform tick. 
+ controller.enqueue(chunk.subarray(0, emitFromChunk)) + } else { + const merged = new Uint8Array(emitLen) + merged.set(leftover!, 0) + merged.set(chunk.subarray(0, emitFromChunk), leftoverLen) + controller.enqueue(merged) + } + leftover = null + + const remainderLen = chunk.length - emitFromChunk + if (remainderLen === 0) { + return + } + if (remainderLen > MAX_LINE_SIZE) { + onBytesSkipped?.(remainderLen) + } else { + // Copy so leftover doesn't pin the (much larger) network chunk buffer. + leftover = chunk.slice(emitFromChunk) + } + }, + flush(controller) { + if (leftover && leftover.length > 0) { + if (hasBackpressure(controller)) { + onBytesSkipped?.(leftover.length) + } else { + controller.enqueue(leftover) + } + leftover = null } } }, {}, - { highWaterMark: maxChunkBufferSize, size: (c) => c?.length ?? 0 } + { highWaterMark: maxBufferedBytes, size: (c) => c?.length ?? 0 } ) } -const hasBackpressure = (controller: TransformStreamDefaultController, offset: number) => { - return controller.desiredSize !== null && controller.desiredSize - offset < 0 +const hasBackpressure = (controller: TransformStreamDefaultController) => { + return controller.desiredSize !== null && controller.desiredSize <= 0 } const mkTransformerParser = ( @@ -176,10 +518,18 @@ class JSONParserTransformer implements Transformer { async transform(chunk: Uint8Array | string) { try { this.parser.write(chunk) - } catch (e) { + } catch { + // Every chunk arrives line-aligned from the upstream splitter, so a parse + // error means the source itself produced a malformed document. Reset the + // parser to drop its partial state and continue with the next chunk. this.parser = mkTransformerParser(this.controller, this.opts) } - await new Promise((resolve) => setTimeout(resolve)) + // Yield to a macrotask between every chunk — not because the parser needs a + // break, but because the WebStreams pipe runs transforms back-to-back via + // microtasks. Without this hop, the 100ms flush timer (and any other + // setTimeout/setInterval) starves behind the microtask queue, which makes + // the UI appear to update only every few seconds on busy streams. + await yieldToEventLoop() } flush() {} @@ -285,3 +635,87 @@ export const pushAsCircularBuffer = arr().push(...vs.slice(0, numNewItems)) return numItemsToDrop } + +/** + * Header-aware circular append for a Change Stream view. Appends `newRows` for + * `relationName` to `cs.rows`, maintaining `cs.headers` so that every section of rows + * is preceded by a header row (`{ relationName, columns }`) and orphan rows never + * appear. + * + * Why this isn't just `pushAsCircularBuffer`: under high-throughput streams, the + * circular buffer can shift the section header off the front of `cs.rows`. The previous + * implementation then filtered the (now-negative) header index out of `cs.headers`, + * leaving the rows that came after that header without a section header above them. + * Two specific failure modes: + * + * - **Batch overflow:** when a single push exceeds `bufferSize`, the trailing tail + * is kept but the prepended header at the front of the input is dropped along + * with the displaced overflow. + * - **Front-shift drift:** every same-relation push shifts existing rows left; once + * enough items accumulate, the only header (at index 0) is dropped, and the rest + * of the section becomes orphaned. 
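+ *
+ * Concrete drift numbers, mirroring the spec above: with `bufferSize` 10 and rows
+ * `[header, 9 data rows]`, appending 5 more same-relation rows shifts 5 items off the
+ * front, including the only header, and the old push then rendered the surviving rows
+ * with no section header above them.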
+ * + * This function handles both: on overflow it always builds a header at the front of + * the kept tail; on front-shift it captures the most-recent header that's about to be + * dropped and prepends it back so the surviving rows still render under their section. + * + * @param buildHeader Callback that returns a header row for `relationName`. Receives + * an optional sample data row so the caller can derive column order from the actual + * data shape (with a schema-based fallback for the skipped-bytes path). + */ +export const appendRowsForRelation = ( + cs: ChangeStreamData, + relationName: string, + newRows: Row[], + buildHeader: (sample?: XgressEntry) => Row, + bufferSize: number, + sample?: XgressEntry +) => { + const lastHeaderIdx = cs.headers.at(-1) + const lastHeader = lastHeaderIdx !== undefined ? cs.rows[lastHeaderIdx] : undefined + const lastRelationName = lastHeader && 'columns' in lastHeader ? lastHeader.relationName : null + const sectionHeader: Row | null = relationName !== lastRelationName ? buildHeader(sample) : null + + const itemsToAdd = (sectionHeader ? 1 : 0) + newRows.length + + // Batch alone fills (or overflows) the buffer. Discard everything else and start + // the kept tail with a fresh header. + if (itemsToAdd >= bufferSize) { + const header = sectionHeader ?? buildHeader(sample) + cs.rows = [header, ...newRows.slice(-(bufferSize - 1))] + cs.headers = [0] + return + } + + const initialLen = cs.rows.length + const dropCount = Math.max(0, initialLen + itemsToAdd - bufferSize) + + // Capture the most recent header that's about to be shifted off the front, so we + // can restore section context for any rows that survive its drop. + let lastDroppedHeader: Row | undefined + for (const hIdx of cs.headers) { + if (hIdx >= dropCount) { + break + } + lastDroppedHeader = cs.rows[hIdx] + } + + cs.rows.splice(0, dropCount) + let newHeaders = cs.headers.map((i) => i - dropCount).filter((i) => i >= 0) + + // If the kept rows now begin with orphan data rows (no header at index 0), + // prepend the captured dropped header so the section is preserved. + if (cs.rows.length > 0 && (newHeaders.length === 0 || newHeaders[0] > 0) && lastDroppedHeader) { + cs.rows.unshift(lastDroppedHeader) + newHeaders = [0, ...newHeaders.map((i) => i + 1)] + } + + if (sectionHeader) { + newHeaders.push(cs.rows.length) + cs.rows.push(sectionHeader) + } + for (const r of newRows) { + cs.rows.push(r) + } + cs.headers = newHeaders +}
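
Usage sketch: how the new helpers compose, loosely following the TabChangeStream.svelte
wiring above (illustrative only; `source`, `relationFields`, and the literal relation
name are placeholders, and the real component additionally normalizes field names and
coalesces consecutive skip markers).

  import type { ChangeStreamData, Row } from '$lib/components/pipelines/editor/ChangeStream.svelte'
  import type { Field } from '$lib/services/manager'
  import type { XgressEntry } from '$lib/services/pipelineManager'
  import {
    appendRowsForRelation,
    createBigNumberStreamParser,
    parseStream
  } from '$lib/functions/pipelines/changeStream'

  declare const source: { stream: ReadableStream<Uint8Array>; cancel: () => void }
  declare const relationFields: Record<string, Field> // placeholder schema lookup
  const relationName = 'example_table' // placeholder
  const maxVisibleRows = 10_000 // row cap for the on-screen buffer

  const cs: ChangeStreamData = { rows: [], headers: [], totalSkippedBytes: 0 }

  // Column order comes from a sample row when one exists; the skipped-bytes path has
  // no row to sample, so it falls back to the schema.
  const buildHeader = (sample?: XgressEntry): Row => ({
    relationName,
    columns: sample
      ? Object.keys('insert' in sample ? sample.insert : sample.delete).map((n) => relationFields[n])
      : Object.values(relationFields)
  })

  const { cancel } = parseStream(
    source,
    createBigNumberStreamParser<XgressEntry>({ paths: ['$.json_data.*'], separator: '' }),
    {
      pushChanges: (rows) =>
        appendRowsForRelation(cs, relationName, rows as unknown as Row[], buildHeader, maxVisibleRows, rows[0]),
      onBytesSkipped: (skippedBytes) => {
        appendRowsForRelation(cs, relationName, [{ relationName, skippedBytes }], buildHeader, maxVisibleRows)
        cs.totalSkippedBytes += skippedBytes
      }
    },
    { bufferSize: 4 * 1024 * 1024 } // per-flush-window byte budget, as in TabChangeStream.svelte
  )
  // Later: cancel() stops the read loop and releases the stream.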