@@ -3,7 +3,7 @@
export type Row =
| { relationName: string; columns: Field[] }
| XgressEntry
| { skippedBytes: number }
| { relationName: string; skippedBytes: number }
Nit: commit message body has a typo -- "hight-throughput" should be "high-throughput".

export type ChangeStreamData = {
rows: Row[]
headers: number[]
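
A note on consuming the widened `Row` union: skip markers now carry the `relationName` they belong to, so they can be attributed and coalesced per relation downstream. The variants can still be told apart with `in` checks, as in the sketch below (the `Field` and `XgressEntry` stand-ins are local placeholders for illustration, not the project's actual definitions):

```ts
// Illustrative stand-ins so the narrowing below type-checks on its own;
// the real Field and XgressEntry types live elsewhere in the app.
type Field = { name: string }
type XgressEntry = { insert: Record<string, unknown> } | { delete: Record<string, unknown> }

type Row =
  | { relationName: string; columns: Field[] } // section header
  | XgressEntry // data row
  | { relationName: string; skippedBytes: number } // skip marker, now tagged per relation

// Skip markers and section headers both carry `relationName`,
// so narrow on `skippedBytes` first, then on `columns`.
const describeRow = (row: Row): string =>
  'skippedBytes' in row
    ? `skipped ${row.skippedBytes} bytes for ${row.relationName}`
    : 'columns' in row
      ? `section header for ${row.relationName}`
      : 'insert' in row
        ? 'insert'
        : 'delete'
```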
@@ -15,10 +15,7 @@
import Query, { type Row, type QueryData } from '$lib/components/adhoc/Query.svelte'
import { isPipelineInteractive } from '$lib/functions/pipelines/status'
import type { SQLValueJS } from '$lib/types/sql'
import {
CustomJSONParserTransformStream,
parseCancellable
} from '$lib/functions/pipelines/changeStream'
import { createBigNumberStreamParser, parseStream } from '$lib/functions/pipelines/changeStream'
import invariant from 'tiny-invariant'
import WarningBanner from '$lib/components/pipelines/editor/WarningBanner.svelte'
import { enclosure, reclosureKey } from '$lib/functions/common/function'
@@ -117,8 +114,12 @@
}
}
}
const { cancel } = parseCancellable(
const { cancel } = parseStream(
result,
createBigNumberStreamParser<Record<string, SQLValueJS>>({
paths: ['$'],
separator: ''
}),
{
pushChanges,
onBytesSkipped: (skippedBytes) => {
Expand Down Expand Up @@ -147,10 +148,6 @@
injectValue({ error: e.message })
}
},
new CustomJSONParserTransformStream<Record<string, SQLValueJS>>({
paths: ['$'],
separator: ''
}),
{
bufferSize: 8 * 1024 * 1024
}
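
For reference, the new call shape: the parser is now constructed up front with `createBigNumberStreamParser` and passed as the second argument to `parseStream`, where `parseCancellable` used to take a `CustomJSONParserTransformStream` after the handler object. The declarations below are only a rough sketch inferred from the call sites in this diff; the module's real signatures, option names, and handler set may differ:

```ts
// Rough shapes inferred from the call sites in this diff, not from
// changeStream.ts itself; treat every name and type here as an assumption.
declare function createBigNumberStreamParser<T>(options: {
  paths: string[] // JSONPath-style selectors, e.g. ['$'] or ['$.json_data.*']
  separator: string
}): TransformStream<Uint8Array, T[]>

declare function parseStream<T>(
  stream: ReadableStream<Uint8Array>,
  parser: TransformStream<Uint8Array, T[]>,
  handlers: {
    pushChanges: (rows: T[]) => void
    onBytesSkipped?: (skippedBytes: number) => void
    onParseEnded?: () => void
  },
  options?: { bufferSize?: number }
): { cancel: () => void }
```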
@@ -87,66 +87,61 @@
pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined
return undefined
}
const { cancel } = parseCancellable(

// Build a section header for `relationName`. Prefers a sample data row to
// determine column order; falls back to the relation schema (e.g. for the
// skipped-bytes path, where there's no row to sample).
const buildSectionHeader = (sample?: XgressEntry): Row => {
const fields = pipelinesRelations[tenantName][pipelineName][relationName].fields
const columns: Field[] = sample
? Object.keys('insert' in sample ? sample.insert : sample.delete).map(
(name) => fields[normalizeCaseIndependentName({ name })]
)
: Object.values(fields)
return { relationName, columns }
}

const appendForRelation = (newRows: Row[], sample?: XgressEntry) =>
appendRowsForRelation(
changeStream[tenantName][pipelineName],
relationName,
newRows,
buildSectionHeader,
bufferSize,
sample
)

const { cancel } = parseStream(
result,
createBigNumberStreamParser<XgressEntry>({
paths: ['$.json_data.*'],
separator: ''
}),
{
pushChanges: (rows: XgressEntry[]) => {
const initialLen = changeStream[tenantName][pipelineName].rows.length
const lastRelationName = ((headerIdx) =>
headerIdx !== undefined
? ((header) => (header && 'relationName' in header ? header.relationName : null))(
changeStream[tenantName][pipelineName].rows[headerIdx]
)
: null)(changeStream[tenantName][pipelineName].headers.at(-1))
const offset = pushAsCircularBuffer(
() => changeStream[tenantName][pipelineName].rows,
bufferSize,
(v: Row) => v
)(
[
...(relationName !== lastRelationName
? ([
{
relationName,
columns: Object.keys(
((row) => ('insert' in row ? row.insert : row.delete))(rows[0])
).map((name) => {
return pipelinesRelations[tenantName][pipelineName][relationName].fields[
normalizeCaseIndependentName({ name })
]
})
}
] as Row[])
: [])
].concat(rows)
)
if (relationName !== lastRelationName) {
changeStream[tenantName][pipelineName].headers.push(initialLen)
}
changeStream[tenantName][pipelineName].headers = changeStream[tenantName][
pipelineName
].headers
.map((i) => i - offset)
.filter((i) => i >= 0)
appendForRelation(rows as unknown as Row[], rows[0])
},
onBytesSkipped: (skippedBytes) => {
pushAsCircularBuffer(
() => changeStream[tenantName][pipelineName].rows,
bufferSize,
(v) => v
)([{ relationName, skippedBytes }])
changeStream[tenantName][pipelineName].totalSkippedBytes += skippedBytes
const cs = changeStream[tenantName][pipelineName]
// Coalesce consecutive skip markers for the same relation: if the row at
// the tail is already a skip marker tagged with this relation, just bump
// its byte count in place instead of pushing another row. Keeps the
// change-stream view from getting flooded with one-line "Skipped N bytes"
// entries when backpressure drops sustained traffic.
Contributor

This is not backpressure; it's when you cannot keep up.

const lastRow = cs.rows.at(-1)
if (lastRow && 'skippedBytes' in lastRow && lastRow.relationName === relationName) {
Contributor

who generates this message "skipped bytes"?
It's not obvious to me you can't miss some of these messages either

Contributor Author

> who generates this message "skipped bytes"?

The UI itself, when it chooses to discard data to keep up with the throughput.

> It's not obvious to me you can't miss some of these messages either

Could you elaborate? I don't understand your point.

lastRow.skippedBytes += skippedBytes
} else {
appendForRelation([{ relationName, skippedBytes }])
}
cs.totalSkippedBytes += skippedBytes
},
onParseEnded: () => {
pipelinesRelations[tenantName][pipelineName][relationName].cancelStream = undefined
}
},
new CustomJSONParserTransformStream<XgressEntry>({
paths: ['$.json_data.*'],
separator: ''
}),
{
bufferSize: 8 * 1024 * 1024
bufferSize: 4 * 1024 * 1024
}
)
return () => {
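
`appendRowsForRelation` itself isn't shown in this diff, but the inline logic it replaces (the old `pushChanges` body above) suggests roughly what it has to do: open a new section when the relation changes, push through the circular buffer, and shift the recorded header indices by however many rows were evicted. A sketch under those assumptions, with the real signature and details likely differing:

```ts
// Sketch only: reconstructed from the inline logic this helper replaces;
// the actual implementation in changeStream.ts may differ.
function appendRowsForRelationSketch(
  data: { rows: Row[]; headers: number[] },
  relationName: string,
  newRows: Row[],
  buildSectionHeader: (sample?: XgressEntry) => Row,
  bufferSize: number,
  sample?: XgressEntry
): void {
  const initialLen = data.rows.length
  // Relation of the most recent section header, if any.
  const lastHeaderIdx = data.headers.at(-1)
  const lastHeader = lastHeaderIdx !== undefined ? data.rows[lastHeaderIdx] : undefined
  const lastRelationName =
    lastHeader && 'relationName' in lastHeader ? lastHeader.relationName : null

  // Open a new section when the relation changes, remembering where its header lands.
  const startNewSection = relationName !== lastRelationName
  const toPush = startNewSection ? [buildSectionHeader(sample), ...newRows] : newRows
  // pushAsCircularBuffer is assumed (as in the replaced code) to return the
  // number of entries evicted from the front of the buffer.
  const evicted = pushAsCircularBuffer(() => data.rows, bufferSize, (v: Row) => v)(toPush)
  if (startNewSection) {
    data.headers.push(initialLen)
  }
  // Shift header indices by the eviction count and drop any that scrolled off.
  data.headers = data.headers.map((i) => i - evicted).filter((i) => i >= 0)
}
```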
@@ -247,9 +242,9 @@
import { Pane, PaneGroup, PaneResizer } from 'paneforge'
import type { Field, Relation } from '$lib/services/manager'
import {
CustomJSONParserTransformStream,
parseCancellable,
pushAsCircularBuffer
appendRowsForRelation,
createBigNumberStreamParser,
parseStream
} from '$lib/functions/pipelines/changeStream'
import JSONbig from 'true-json-bigint'
import { count, groupBy } from '$lib/functions/common/array'
@@ -24,8 +24,8 @@
import { formatDateTime, useElapsedTime } from '$lib/functions/format'
import type { PipelineMetrics } from '$lib/functions/pipelineMetrics'
import {
CustomJSONParserTransformStream,
parseCancellable,
createBigNumberStreamParser,
parseStream,
pushAsCircularBuffer
} from '$lib/functions/pipelines/changeStream'
import { getDeploymentStatusLabel, isMetricsAvailable } from '$lib/functions/pipelines/status'
@@ -104,8 +104,12 @@
cancelStream = undefined
return undefined
}
const { cancel } = parseCancellable(
const { cancel } = parseStream(
result,
createBigNumberStreamParser<TimeSeriesEntry>({
paths: ['$'],
separator: ''
}),
{
pushChanges: (rows: TimeSeriesEntry[]) => {
pushAsCircularBuffer(
@@ -134,10 +138,6 @@
}
}
},
new CustomJSONParserTransformStream<TimeSeriesEntry>({
paths: ['$'],
separator: ''
}),
{
bufferSize: 8 * 1024 * 1024
}
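
`pushAsCircularBuffer` keeps appearing in curried form across these call sites and, judging by the replaced change-stream code, returns how many entries were evicted so callers can shift any indices they track. Its implementation isn't part of this diff; below is a minimal sketch consistent with that usage, treating the limit as an entry count (the real helper may measure it differently):

```ts
// Minimal sketch consistent with the curried call sites in this diff; the
// real helper in changeStream.ts may map, trim, or size the buffer differently.
const pushAsCircularBufferSketch =
  <T, U>(getBuffer: () => U[], capacity: number, map: (v: T) => U) =>
  (items: T[]): number => {
    const buffer = getBuffer()
    buffer.push(...items.map(map))
    // Evict from the front once capacity is exceeded and report how many
    // entries were dropped, so callers can adjust indices they keep elsewhere.
    const evicted = Math.max(0, buffer.length - capacity)
    if (evicted > 0) {
      buffer.splice(0, evicted)
    }
    return evicted
  }

// Illustrative usage:
const rows: number[] = []
const dropped = pushAsCircularBufferSketch(() => rows, 3, (v: number) => v)([1, 2, 3, 4, 5])
// rows is now [3, 4, 5]; dropped === 2
```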