
Two Change Stream bugfixes#6215

Open
Karakatiza666 wants to merge 1 commit into main from fix-change-stream

Conversation

@Karakatiza666
Contributor

There were two major issues:

  1. Change Stream crashed the page on high-throughput streams.
    Change Stream is sometimes unable to parse and process every update in a high-throughput change stream, so some form of backpressure management is needed. Previously this was achieved by simply dropping incoming HTTP chunks of JSON lines, relying on the JSON parser failing on the next non-skipped chunk to effectively skip part of the data. But sometimes the chunk contents aligned in a way where the combined JSON lines were syntactically valid but represented an invalid JSON shape. This malformed JSON then propagated to the UI and crashed the page.
    The fix involves a fresh implementation of the stream-parsing code (parseStream). It

    • Guarantees that stream bytes are skipped only at line breaks (as a backpressure relief method)
    • Has backpressure relief behavior tuned empirically so that the change stream visually updates as fast on high-throughput streams as it does on lower-throughput ones, while skipping as much data as necessary to avoid significantly slowing down the UI.
  2. Section headers got lost on high-throughput streams.
    The Change Stream table displays special rows: section headers for each batch of changes to a single relation. This requires extra bookkeeping of header rows, and that algorithm had problems: in certain cases the first header in the table disappeared, which orphaned the first batch of changes, so you could no longer see which relation they belonged to. The fix refactors the bookkeeping algorithm (appendForRelation).

Tests: manual testing of low and high throughput streams; added relevant unit tests.
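To illustrate point 1, here is a minimal TypeScript sketch of line-aligned skipping under a byte budget. All names here are hypothetical illustrations, not the PR's actual parseStream code, and the byte-window bookkeeping is deliberately simplified:

```typescript
// Hypothetical sketch: split incoming chunks on newlines so that data is only
// ever dropped at line boundaries. `carry` holds the incomplete tail of the
// previous chunk; `budget` is the per-window byte allowance.
type LineSink = (line: string) => void;
type SkipSink = (bytes: number) => void;

function makeLineAlignedFeeder(budget: number, onLine: LineSink, onSkip: SkipSink) {
  let carry = '';
  let usedBytes = 0;
  return {
    feed(chunk: string) {
      const text = carry + chunk;
      const lines = text.split('\n');
      carry = lines.pop() ?? ''; // incomplete tail waits for the next chunk
      for (const line of lines) {
        if (line.length === 0) continue;
        if (usedBytes + line.length > budget) {
          onSkip(line.length + 1); // a whole line is dropped, never a prefix
        } else {
          usedBytes += line.length;
          onLine(line); // admitted lines are always complete JSON documents
        }
      }
    },
    // Called on each flush tick to open a new byte window.
    resetWindow() {
      usedBytes = 0;
    }
  };
}
```

Because every admitted line is a complete document, downstream JSON parsing can never see two fragments glued into a syntactically valid but structurally wrong value.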

@Karakatiza666 Karakatiza666 requested a review from mihaibudiu May 11, 2026 17:51
@Karakatiza666 Karakatiza666 added labels bug (Something isn't working) and javascript (Pull requests that update Javascript code) May 11, 2026
@mihaibudiu
Contributor

I think "backpressure" is something else: it means blocking the source if you cannot keep up.
you'd think you can just use a sticky row for section headers

// the tail is already a skip marker tagged with this relation, just bump
// its byte count in place instead of pushing another row. Keeps the
// change-stream view from getting flooded with one-line "Skipped N bytes"
// entries when backpressure drops sustained traffic.
Contributor


This is not backpressure; backpressure means blocking the source when you cannot keep up.

// change-stream view from getting flooded with one-line "Skipped N bytes"
// entries when backpressure drops sustained traffic.
const lastRow = cs.rows.at(-1)
if (lastRow && 'skippedBytes' in lastRow && lastRow.relationName === relationName) {
Contributor


who generates this message "skipped bytes"?
It's not obvious to me you can't miss some of these messages either

Contributor Author


who generates this message "skipped bytes"?

The UI itself, when it chooses to discard data to keep up with the throughput.

It's not obvious to me you can't miss some of these messages either

Could you elaborate? I don't understand your point.


export interface StreamingJsonParser<T> {
/** Feed text into the parser. Returns the values it produced from this input. */
write(input: string): T[]
Contributor


I think 'parse' would be better
what happens if the string is only half-parseable? will it save the last half for the next call?

Contributor Author


I think 'parse' would be better

I'd prefer to keep write as it highlights the stream semantics of the parser rather than its functionality. It also keeps the usage more in tune with the underlying implementation.

what happens if the string is only half-parseable? will it save the last half for the next call?

Yes, exactly. It expects well-formed JSON, arbitrarily cut up into chunks for subsequent write() calls.
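That contract can be sketched for the newline-delimited case: an incomplete tail is buffered across write() calls and only complete lines are parsed. Hypothetical illustration only, not the PR's actual parser (which handles arbitrary well-formed JSON):

```typescript
// Hypothetical sketch: a StreamingJsonParser for newline-delimited JSON
// that saves a half-parsed tail for the next write() call.
interface StreamingJsonParser<T> {
  /** Feed text into the parser. Returns the values it produced from this input. */
  write(input: string): T[];
  /** Discard internal state (e.g. after a parse error). */
  reset(): void;
}

function makeNdjsonParser<T>(): StreamingJsonParser<T> {
  let tail = '';
  return {
    write(input: string): T[] {
      const text = tail + input;
      const lines = text.split('\n');
      tail = lines.pop() ?? ''; // incomplete remainder waits for the next write()
      return lines.filter((l) => l.length > 0).map((l) => JSON.parse(l) as T);
    },
    reset() {
      tail = '';
    }
  };
}
```

A document split mid-token across two write() calls is only emitted once its closing newline arrives.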

* line at a time. If `write` throws, the parser is `reset()` — its internal state is
* discarded, the block is reported as skipped, and the next block starts on a clean
* parser. Because we never feed the parser anything that isn't a complete document, the
* structurally-invalid-JSON corruption that crashed the UI in the original
Contributor


I don't think it's productive to have comments about "the original implementation"; soon no one will know what this is about.

* @param parser Long-lived parser, typically allocated once per stream.
* @param options.bufferSize **Main tuning knob.** Per-flush-window byte budget for
* pieces admitted into the parser. When the running total of admitted bytes in the
* current window would exceed this, the next ~16KB piece is dropped wholesale (and
Contributor


would exceed -> exceeds

Contributor


hardwired 16KB size in comment

`parseStream(): stream is ${JSON.stringify(source.stream)}`
)
const maxPendingBytes = options?.bufferSize ?? 1_000_000
const flushIntervalMs = options?.flushIntervalMs ?? 100
Contributor


Frankly, anything below what humans can perceive is a waste: you can't read anything that updates faster than ~1s, so even that would be fine.

Contributor Author


The fast refresh rate of the table is intended more as a visual indication of processing speed than as a practical way to view every incoming change; the intended flow for the latter is to pause the pipeline and view the now-static changes table.

* of enqueued, and its byte count is reported via `onBytesSkipped` so the UI can render
* a gap marker.
*
* Why line-aligned drops are correct: the previous implementation dropped the tail of
Contributor


Write this comment describing what the current implementation does, not what the previous one did wrong.

})
}

// Runs `parseStream` over `chunks` and resolves once the stream has ended and the
Contributor


what is "resolves"?

Contributor Author


"resolves the JavaScript Promise" - implied by context; will make the comment clearer.


const isHeader = (row: Row | undefined): boolean => !!row && 'columns' in (row as object)

// A header-builder usable by the function-under-test that just returns a header for
Contributor


what sample argument?

Contributor Author


This function is passed into appendRowsForRelation as a callback argument, and that callback receives an optional sample argument - the one the comment refers to. The comment makes sense when working with the code in context.

const more = Array.from({ length: 5 }, (_, i) => insertRow({ id: 100 + i }))
appendRowsForRelation(cs, 'rel_A', more, stubHeaderBuilder('rel_A'), 10)

expect(cs.headers).toEqual([0])
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this business with headers, once you have computed a header, it can stay the same forever, right? Why would it change?

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "header" rows are inserted inline with all the data rows, one for each batch of changes to a single relation. Since the "sticky header" needs to be displayed programmatically, I keep an index of the rows that are actually section headers. The row buffer is not infinite but bounded, and the oldest rows get dropped, so this index needs to be maintained when inserting new data into the stream, occasionally re-inserting the oldest header row (because only part of a batch was dropped and the original header row was among the dropped rows).
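A simplified sketch of the bookkeeping described above, with hypothetical names (the PR's real code handles more cases, e.g. dropping several whole sections at once):

```typescript
// Hypothetical sketch: `headers` is a sorted index into `rows` marking which
// rows are section headers. When the bounded row buffer drops its oldest
// entries, the index is shifted down, and if the cut removed a section's
// header while keeping some of its data rows, that header is re-inserted at
// the front so the surviving batch is never orphaned.
type Row = { header?: string; data?: string };

function dropOldest(rows: Row[], headers: number[], count: number): { rows: Row[]; headers: number[] } {
  // Remember the header of the section the cut lands inside, if any.
  let lostHeader: Row | undefined;
  for (const h of headers) {
    if (h < count) lostHeader = rows[h];
  }
  let newRows = rows.slice(count);
  let newHeaders = headers.filter((h) => h >= count).map((h) => h - count);
  const firstIsHeaderless = newRows.length > 0 && (newHeaders.length === 0 || newHeaders[0] > 0);
  if (firstIsHeaderless && lostHeader) {
    newRows = [lostHeader, ...newRows];
    newHeaders = [0, ...newHeaders.map((h) => h + 1)];
  }
  return { rows: newRows, headers: newHeaders };
}
```

The invariant maintained is that every data row in the buffer has a header row at or before it, which is what the first-header-disappearing bug violated.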

Contributor Author


You can see this in action if you start e.g. the fraud-detection demo pipeline, select multiple views at once, pause the pipeline, and scroll through the change stream.


@mythical-fred mythical-fred left a comment


Solid rewrite with good test coverage. Mihai's inline comments are worth addressing (especially the ones about referencing 'the original implementation' in comments -- write for the future reader, not the archeologist). One commit-message nit below.

| { relationName: string; columns: Field[] }
| XgressEntry
| { skippedBytes: number }
| { relationName: string; skippedBytes: number }

Nit: commit message body has a typo -- "hight-throughput" should be "high-throughput".

@Karakatiza666
Contributor Author

you'd think you can just use a sticky row for section headers

Unfortunately, plain CSS does not allow cleanly applying the sticky property to arbitrary HTML table rows, only to the header. Because of this, I have to keep an index of header rows, determine whether a sticky row should be displayed at any given point, and position it manually to simulate the simple stick-to-top behavior.
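The "determine which header should be sticky" part can be sketched like this. Hypothetical names and a fixed-row-height assumption for illustration; this is not the PR's actual code:

```typescript
// Hypothetical sketch: given the sorted index of header rows, a uniform row
// height, and the current scroll offset, pick the header of the topmost
// visible section. The result is the row that gets positioned manually at
// the top of the viewport to simulate position: sticky.
function stickyHeaderIndex(headerRows: number[], rowHeight: number, scrollTop: number): number | undefined {
  const firstVisibleRow = Math.floor(scrollTop / rowHeight);
  let sticky: number | undefined;
  for (const h of headerRows) {
    if (h <= firstVisibleRow) sticky = h; // last header at or above the viewport top
    else break; // headerRows is sorted, nothing later can qualify
  }
  return sticky; // undefined: no section has scrolled past the top yet
}
```

On each scroll event the UI would re-run this and absolutely position the chosen header row over the table.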

Fix Change Stream losing section headers on high-throughput streams

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>

Labels

bug (Something isn't working), javascript (Pull requests that update Javascript code)


3 participants