-
Notifications
You must be signed in to change notification settings - Fork 116
Two Change Stream bugfixes #6215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,66 +87,61 @@ | |
| pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined | ||
| return undefined | ||
| } | ||
| const { cancel } = parseCancellable( | ||
|
|
||
| // Build a section header for `relationName`. Prefers a sample data row to | ||
| // determine column order; falls back to the relation schema (e.g. for the | ||
| // skipped-bytes path, where there's no row to sample). | ||
| const buildSectionHeader = (sample?: XgressEntry): Row => { | ||
| const fields = pipelinesRelations[tenantName][pipelineName][relationName].fields | ||
| const columns: Field[] = sample | ||
| ? Object.keys('insert' in sample ? sample.insert : sample.delete).map( | ||
| (name) => fields[normalizeCaseIndependentName({ name })] | ||
| ) | ||
| : Object.values(fields) | ||
| return { relationName, columns } | ||
| } | ||
|
|
||
| const appendForRelation = (newRows: Row[], sample?: XgressEntry) => | ||
| appendRowsForRelation( | ||
| changeStream[tenantName][pipelineName], | ||
| relationName, | ||
| newRows, | ||
| buildSectionHeader, | ||
| bufferSize, | ||
| sample | ||
| ) | ||
|
|
||
| const { cancel } = parseStream( | ||
| result, | ||
| createBigNumberStreamParser<XgressEntry>({ | ||
| paths: ['$.json_data.*'], | ||
| separator: '' | ||
| }), | ||
| { | ||
| pushChanges: (rows: XgressEntry[]) => { | ||
| const initialLen = changeStream[tenantName][pipelineName].rows.length | ||
| const lastRelationName = ((headerIdx) => | ||
| headerIdx !== undefined | ||
| ? ((header) => (header && 'relationName' in header ? header.relationName : null))( | ||
| changeStream[tenantName][pipelineName].rows[headerIdx] | ||
| ) | ||
| : null)(changeStream[tenantName][pipelineName].headers.at(-1)) | ||
| const offset = pushAsCircularBuffer( | ||
| () => changeStream[tenantName][pipelineName].rows, | ||
| bufferSize, | ||
| (v: Row) => v | ||
| )( | ||
| [ | ||
| ...(relationName !== lastRelationName | ||
| ? ([ | ||
| { | ||
| relationName, | ||
| columns: Object.keys( | ||
| ((row) => ('insert' in row ? row.insert : row.delete))(rows[0]) | ||
| ).map((name) => { | ||
| return pipelinesRelations[tenantName][pipelineName][relationName].fields[ | ||
| normalizeCaseIndependentName({ name }) | ||
| ] | ||
| }) | ||
| } | ||
| ] as Row[]) | ||
| : []) | ||
| ].concat(rows) | ||
| ) | ||
| if (relationName !== lastRelationName) { | ||
| changeStream[tenantName][pipelineName].headers.push(initialLen) | ||
| } | ||
| changeStream[tenantName][pipelineName].headers = changeStream[tenantName][ | ||
| pipelineName | ||
| ].headers | ||
| .map((i) => i - offset) | ||
| .filter((i) => i >= 0) | ||
| appendForRelation(rows as unknown as Row[], rows[0]) | ||
| }, | ||
| onBytesSkipped: (skippedBytes) => { | ||
| pushAsCircularBuffer( | ||
| () => changeStream[tenantName][pipelineName].rows, | ||
| bufferSize, | ||
| (v) => v | ||
| )([{ relationName, skippedBytes }]) | ||
| changeStream[tenantName][pipelineName].totalSkippedBytes += skippedBytes | ||
| const cs = changeStream[tenantName][pipelineName] | ||
| // Coalesce consecutive skip markers for the same relation: if the row at | ||
| // the tail is already a skip marker tagged with this relation, just bump | ||
| // its byte count in place instead of pushing another row. Keeps the | ||
| // change-stream view from getting flooded with one-line "Skipped N bytes" | ||
| // entries when backpressure drops sustained traffic. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not backpressure. |
||
| const lastRow = cs.rows.at(-1) | ||
| if (lastRow && 'skippedBytes' in lastRow && lastRow.relationName === relationName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Who generates this message "skipped bytes"?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The UI itself, when it chooses to discard data to keep up with the throughput.
Could you elaborate? I don't understand your point. |
||
| lastRow.skippedBytes += skippedBytes | ||
| } else { | ||
| appendForRelation([{ relationName, skippedBytes }]) | ||
| } | ||
| cs.totalSkippedBytes += skippedBytes | ||
| }, | ||
| onParseEnded: () => { | ||
| pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined | ||
| } | ||
| }, | ||
| new CustomJSONParserTransformStream<XgressEntry>({ | ||
| paths: ['$.json_data.*'], | ||
| separator: '' | ||
| }), | ||
| { | ||
| bufferSize: 8 * 1024 * 1024 | ||
| bufferSize: 4 * 1024 * 1024 | ||
| } | ||
| ) | ||
| return () => { | ||
|
|
@@ -247,9 +242,9 @@ | |
| import { Pane, PaneGroup, PaneResizer } from 'paneforge' | ||
| import type { Field, Relation } from '$lib/services/manager' | ||
| import { | ||
| CustomJSONParserTransformStream, | ||
| parseCancellable, | ||
| pushAsCircularBuffer | ||
| appendRowsForRelation, | ||
| createBigNumberStreamParser, | ||
| parseStream | ||
| } from '$lib/functions/pipelines/changeStream' | ||
| import JSONbig from 'true-json-bigint' | ||
| import { count, groupBy } from '$lib/functions/common/array' | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: commit message body has a typo -- "hight-throughput" should be "high-throughput".