Two Change Stream bugfixes #6215
Conversation
| // the tail is already a skip marker tagged with this relation, just bump
| // its byte count in place instead of pushing another row. Keeps the
| // change-stream view from getting flooded with one-line "Skipped N bytes"
| // entries when backpressure drops sustained traffic.
This is not backpressure. I think "backpressure" is something else: it means blocking the source if you cannot keep up.
| // change-stream view from getting flooded with one-line "Skipped N bytes"
| // entries when backpressure drops sustained traffic.
| const lastRow = cs.rows.at(-1)
| if (lastRow && 'skippedBytes' in lastRow && lastRow.relationName === relationName) {
Who generates this message "skipped bytes"?
It's not obvious to me that you can't miss some of these messages either.
> Who generates this message "skipped bytes"?

The UI itself, when it chooses to discard data to keep up with the throughput.

> It's not obvious to me that you can't miss some of these messages either.

Could you elaborate? I don't understand your point.
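For readers skimming the thread, a minimal sketch of the coalescing behavior the quoted code performs; `recordSkip` and `SkipRow` are illustrative names, not the PR's actual API:

```ts
type SkipRow = { relationName: string; skippedBytes: number }

function recordSkip(rows: object[], relationName: string, skippedBytes: number): void {
  const lastRow = rows.at(-1)
  if (lastRow && 'skippedBytes' in lastRow && (lastRow as SkipRow).relationName === relationName) {
    // The tail is already a skip marker for this relation: grow it in place.
    (lastRow as SkipRow).skippedBytes += skippedBytes
  } else {
    // Otherwise start a new "Skipped N bytes" marker row.
    rows.push({ relationName, skippedBytes })
  }
}
```

This is why sustained drops for one relation collapse into a single marker row instead of flooding the view.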
| export interface StreamingJsonParser<T> {
|   /** Feed text into the parser. Returns the values it produced from this input. */
|   write(input: string): T[]
I think 'parse' would be better.
What happens if the string is only half-parseable? Will it save the last half for the next call?
> I think 'parse' would be better.

I'd prefer to keep write as it highlights the stream semantics of the parser rather than its functionality. It also keeps the usage more in tune with the underlying implementation.

> What happens if the string is only half-parseable? Will it save the last half for the next call?

Yes, exactly. It expects well-formed JSON, arbitrarily cut up into chunks for subsequent write() calls.
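A minimal sketch of that contract, assuming newline-delimited JSON for simplicity; the real parser is an incremental one, and `makeLineParser` is an illustrative name:

```ts
interface StreamingJsonParser<T> {
  /** Feed text into the parser. Returns the values it produced from this input. */
  write(input: string): T[]
  /** Discard all buffered state, e.g. after write() throws. */
  reset(): void
}

function makeLineParser<T>(): StreamingJsonParser<T> {
  let pending = '' // the half-parseable tail, saved for the next write()
  return {
    write(input: string): T[] {
      const lines = (pending + input).split('\n')
      pending = lines.pop() ?? '' // the last piece may be an incomplete document
      return lines.filter((line) => line.length > 0).map((line) => JSON.parse(line) as T)
    },
    reset(): void {
      pending = ''
    },
  }
}

// write('{"a":1}\n{"b"') yields [{a: 1}] and buffers '{"b"';
// a later write(':2}\n') then yields [{b: 2}].
```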
| * line at a time. If `write` throws, the parser is `reset()` — its internal state is
| * discarded, the block is reported as skipped, and the next block starts on a clean
| * parser. Because we never feed the parser anything that isn't a complete document, the
| * structurally-invalid-JSON corruption that crashed the UI in the original
I don't think it's productive to have comments about "the original implementation"; soon no one will know what this is about.
| * @param parser Long-lived parser, typically allocated once per stream.
| * @param options.bufferSize **Main tuning knob.** Per-flush-window byte budget for
| * pieces admitted into the parser. When the running total of admitted bytes in the
| * current window would exceed this, the next ~16KB piece is dropped wholesale (and
would exceed -> exceeds
hardwired 16KB size in comment
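To make the knob concrete, a hedged sketch of the per-flush-window accounting the doc comment describes; the names (`makeByteBudget`, `admitPiece`, `startWindow`) are assumptions, not the PR's actual code, and only `onBytesSkipped` appears in the quoted diffs:

```ts
// Hypothetical sketch: pieces are admitted until the window's byte budget
// would be exceeded; anything past that is dropped wholesale and reported.
function makeByteBudget(maxPendingBytes: number, onBytesSkipped: (bytes: number) => void) {
  let admittedInWindow = 0
  return {
    /** Called on every flush tick: the budget applies per window. */
    startWindow(): void {
      admittedInWindow = 0
    },
    /** Returns true if the piece fits the remaining budget, false if it was dropped. */
    admitPiece(piece: string): boolean {
      if (admittedInWindow + piece.length > maxPendingBytes) {
        onBytesSkipped(piece.length) // surfaced in the UI as a gap marker
        return false
      }
      admittedInWindow += piece.length
      return true
    },
  }
}
```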
| `parseStream(): stream is ${JSON.stringify(source.stream)}`
| )
| const maxPendingBytes = options?.bufferSize ?? 1_000_000
| const flushIntervalMs = options?.flushIntervalMs ?? 100
Frankly, anything below what humans can perceive is a waste; you can't read anything that updates faster than ~1s, so even that would be fine.
The fast refresh rate of the table is intended more as a visual indication of processing speed than as a practical way to view every incoming change; the intended flow for that is pausing the pipeline and viewing the now-static changes table.
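For reference, a call-site sketch with the defaults quoted above. The exact parseStream signature is not shown in this excerpt, so the declared shape here is an assumption; only the two option defaults come from the quoted code:

```ts
// Hypothetical signature, inferred from the excerpt.
declare function parseStream(
  source: { stream: ReadableStream<Uint8Array> },
  parser: unknown,
  options?: { bufferSize?: number; flushIntervalMs?: number }
): Promise<void>

declare const source: { stream: ReadableStream<Uint8Array> }
declare const parser: unknown

parseStream(source, parser, {
  bufferSize: 1_000_000, // per-flush-window byte budget (the default)
  flushIntervalMs: 100, // UI flush cadence; per the review, ~1s would also be fine
})
```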
| * of enqueued, and its byte count is reported via `onBytesSkipped` so the UI can render
| * a gap marker.
| *
| * Why line-aligned drops are correct: the previous implementation dropped the tail of
Write this comment describing what the current implementation does, not what the previous one did wrong.
| })
| }
| // Runs `parseStream` over `chunks` and resolves once the stream has ended and the
"resolves the JavaScript Promise" - implied by context; wiill make the comment clearer
| const isHeader = (row: Row | undefined): boolean => !!row && 'columns' in (row as object)
| // A header-builder usable by the function-under-test that just returns a header for
This function is passed into appendRowsForRelation as a callback argument that receives an optional sample argument; that's the one referred to here. The comment makes sense when working with the code in context.
| const more = Array.from({ length: 5 }, (_, i) => insertRow({ id: 100 + i }))
| appendRowsForRelation(cs, 'rel_A', more, stubHeaderBuilder('rel_A'), 10)
| expect(cs.headers).toEqual([0])
I don't get this business with headers. Once you have computed a header, it can stay the same forever, right? Why would it change?
The "header" rows are inserted inline with all the data rows - of reach batch of changes for a single relation. Since the "sticky header" needs to be displayed programmatically, I am keeping an index of rows which are actually headers of every section. The row buffer is not infinite, it is limited, and the oldest rows get dropped - so this index needs to be maintained when inserting new data into the stream, and occasionally re-inserting the oldest header row (because only a part of the batch was dropped, and the original header row was among the dropped rows).
You can see this in action if you start e.g. the fraud-detection demo pipeline, select multiple views at once, pause the pipeline, and scroll through the change stream.
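A hedged sketch of the bookkeeping described above, under the assumption that cs.headers stores row indices of section headers; `dropOldest` and `rebuildHeader` are illustrative names, not the PR's appendRowsForRelation itself:

```ts
type Row = Record<string, unknown> // header rows carry a `columns` field
interface ChangeStream {
  rows: Row[]
  headers: number[] // indices into rows that are section-header rows
}

function dropOldest(cs: ChangeStream, count: number, rebuildHeader: () => Row): void {
  cs.rows.splice(0, count) // evict the oldest rows from the bounded buffer
  // Shift the surviving header indices; indices of evicted headers fall away.
  cs.headers = cs.headers.filter((i) => i >= count).map((i) => i - count)
  // If the cut landed mid-batch, the oldest surviving rows lost their header:
  // re-insert it so the first batch stays labeled with its relation.
  if (cs.rows.length > 0 && cs.headers[0] !== 0) {
    cs.rows.unshift(rebuildHeader())
    cs.headers = [0, ...cs.headers.map((i) => i + 1)]
  }
}
```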
mythical-fred left a comment:
Solid rewrite with good test coverage. Mihai's inline comments are worth addressing (especially the ones about referencing 'the original implementation' in comments -- write for the future reader, not the archeologist). One commit-message nit below.
| | { relationName: string; columns: Field[] }
| | XgressEntry
| | { skippedBytes: number }
| | { relationName: string; skippedBytes: number }
Nit: commit message body has a typo -- "hight-throughput" should be "high-throughput".
Unfortunately, plain CSS does not enable a clean use of
Fix Change Stream losing section headers on high-throughput streams

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>

Force-pushed from 7930474 to 001ccfd.
There were two major issues:

1. Change Stream crashed the page on high-throughput streams.

The Change Stream is sometimes unable to parse and process every update in a high-throughput change stream, so some form of load-shedding (dropping data to keep up) is needed. Previously this was achieved by simply dropping incoming JSON-lines HTTP chunks, relying on the parser failing on the next non-skipped chunk to effectively skip part of the data. But sometimes the chunk contents aligned in a way where the combined JSON lines were syntactically valid yet represented an invalid JSON shape. This malformed JSON then propagated to the UI and crashed the page (see the sketch below).
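To illustrate the failure mode with made-up data (not the actual wire format): dropping a middle chunk can leave a concatenation that still parses, just as the wrong shape.

```ts
// Hypothetical illustration: the middle chunk of a JSON-lines stream is
// dropped, and the surviving chunks happen to concatenate into valid JSON
// syntax with an unexpected shape.
const kept1 = '{"insert":' // start of record A: an object is expected next
// dropped:   '{"id":1}}\n{"meta":'  (end of record A, start of record B)
const kept2 = '[10,20]}\n' // end of record B: an array

const doc = JSON.parse((kept1 + kept2).trim())
// doc is {"insert":[10,20]}: syntactically valid, but "insert" now holds
// record B's array instead of a row object; UI code that trusted the shape
// crashed on exactly this kind of document.
console.log(doc)
```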
The fix is a fresh implementation of the stream-parsing code (parseStream).

2. Section headers got lost on high-throughput streams.

The Change Stream table displays special rows: section headers for each batch of changes to a single relation. This requires extra bookkeeping of header rows, and that algorithm had problems: in certain cases the first header in the table disappeared, which orphaned the first batch of changes, so you could no longer see which relation they pointed to. The fix was refactoring the algorithm that does this bookkeeping (appendForRelation).

Tests: manual testing of low- and high-throughput streams; added relevant unit tests.