Skip to content

Commit 166c097

Browse files
committed
[WebConsole] WIP Switch ad-hoc queries to arrow_ipc format
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent e8aaa5b commit 166c097

File tree

8 files changed: +325 additions, -135 deletions

web-console/bun.lockb

5.78 KB
Binary file not shown.

web-console/package.json

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
"@vincjo/datatables": "^2.5.0",
5555
"@vusion/webfonts-generator": "^0.8.0",
5656
"add": "^2.0.6",
57+
"apache-arrow": "^20.0.0",
5758
"args": "^5.0.3",
5859
"array-join": "^3.2.1",
5960
"autoprefixer": "^10.4.20",

web-console/src/lib/components/adhoc/Query.svelte

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -189,6 +189,7 @@
189189
{#if result.columns.length}
190190
<thead>
191191
<tr>
192+
<th class="bg-white-dark sticky top-0 z-10 {itemHeight}">#</th>
192193
{#each result.columns as column}
193194
<SqlColumnHeader
194195
{column}
@@ -208,6 +209,7 @@
208209
{@const row = rows[index]}
209210
{#if !row}{:else if 'cells' in row}
210211
<tr {style} class="{itemHeight} whitespace-nowrap odd:bg-white odd:dark:bg-black">
212+
<td class="select-none font-mono text-right">{index}</td>
211213
{#each row.cells as value}
212214
<SQLValue
213215
{value}

web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte

Lines changed: 76 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,11 @@
1313
import { isPipelineInteractive } from '$lib/functions/pipelines/status'
1414
import type { SQLValueJS } from '$lib/types/sql.ts'
1515
import {
16+
BatchProcessor,
1617
CustomJSONParserTransformStream,
17-
parseCancellable
18+
SimpleArrowIPCTransformStream,
19+
splitStreamByMaxChunk,
20+
type ArrowIpcChunk
1821
} from '$lib/functions/pipelines/changeStream'
1922
import invariant from 'tiny-invariant'
2023
import WarningBanner from '$lib/components/pipelines/editor/WarningBanner.svelte'
@@ -34,8 +37,9 @@
3437
adhocQueries[pipelineName] ??= { queries: [{ query: '' }] }
3538
})
3639
const api = usePipelineManager()
37-
const isDataRow = (record: Record<string, SQLValueJS>) =>
40+
const isJsonRow = (record: Record<string, SQLValueJS>) =>
3841
!(Object.keys(record).length === 1 && ('error' in record || 'warning' in record))
42+
const isArrowRow = (chunk: ArrowIpcChunk) => 'row' in chunk
3943
4044
const onSubmitQuery = (pipelineName: string, i: number) => async (query: string) => {
4145
const request = api.adHocQuery(pipelineName, query)
@@ -58,15 +62,15 @@
5862
return
5963
}
6064
const bufferSize = 1000
61-
const pushChanges = (
65+
const pushChangesJson = (
6266
input: (Record<string, SQLValueJS> | { error: string } | { warning: string })[]
6367
) => {
6468
if (!adhocQueries[pipelineName].queries[i]?.result) {
6569
return
6670
}
6771
if (
6872
adhocQueries[pipelineName].queries[i].result.columns.length === 0 &&
69-
isDataRow(input[0])
73+
isJsonRow(input[0])
7074
) {
7175
adhocQueries[pipelineName].queries[i].result.columns.push(
7276
...Object.keys(input[0]).map((name) => ({
@@ -85,7 +89,7 @@
8589
.push(
8690
...input
8791
.slice(0, bufferSize - previousLength)
88-
.map((v) => (isDataRow(v) ? { cells: Object.values(v) } : v) as Row)
92+
.map((v) => (isJsonRow(v) ? { cells: Object.values(v) } : v) as Row)
8993
)
9094
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
9195
if (input.length > bufferSize - previousLength) {
@@ -104,24 +108,62 @@
104108
}
105109
}
106110
}
107-
const { cancel } = parseCancellable(
108-
result,
111+
const pushChangesArrow = (input: ArrowIpcChunk[]) => {
112+
if (!adhocQueries[pipelineName].queries[i]?.result) {
113+
return
114+
}
115+
if ('header' in input[0]) {
116+
console.log('pushChangesArrow header')
117+
adhocQueries[pipelineName].queries[i].result.columns.push(
118+
...input[0].header.fields.map((field) => ({
119+
name: field.name,
120+
case_sensitive: true,
121+
columntype: { nullable: field.nullable },
122+
unused: false
123+
}))
124+
)
125+
return
126+
}
127+
console.log('pushChangesArrow rows', input.length)
109128
{
110-
pushChanges,
111-
onBytesSkipped: (skippedBytes) => {
112-
if (!adhocQueries[pipelineName].queries[i]?.result) {
113-
return
114-
}
115-
adhocQueries[pipelineName].queries[i].result.totalSkippedBytes += skippedBytes
116-
},
129+
// Limit result size behavior - ignore all but first bufferSize rows
130+
const previousLength = adhocQueries[pipelineName].queries[i].result.rows().length
131+
adhocQueries[pipelineName].queries[i].result
132+
.rows()
133+
.push(
134+
...input
135+
.slice(0, bufferSize - previousLength)
136+
.map((v) => (isArrowRow(v) ? { cells: v.row } : v) as Row)
137+
)
138+
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
139+
if (input.length > bufferSize - previousLength) {
140+
queueMicrotask(() => {
141+
if (!adhocQueries[pipelineName].queries[i]) {
142+
return
143+
}
144+
if (adhocQueries[pipelineName].queries[i].result?.rows) {
145+
adhocQueries[pipelineName].queries[i].result.rows().push({
146+
warning: `The result contains more rows, but only the first ${bufferSize} are shown`
147+
})
148+
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
149+
}
150+
adhocQueries[pipelineName].queries[i].result?.endResultStream()
151+
})
152+
}
153+
}
154+
}
155+
const batchProcessor = new BatchProcessor(
156+
{ cancel: () => result.cancel() },
157+
{
158+
pushChanges: pushChangesArrow,
117159
onParseEnded: () => {
118160
if (!adhocQueries[pipelineName].queries[i]) {
119161
return
120162
}
121163
// Add field for the next query if the last query did not yield an error right away
122164
if (
123165
adhocQueries[pipelineName].queries.length === i + 1 &&
124-
((row) => !row || isDataRow(row))(
166+
((row) => !row || isJsonRow(row))(
125167
adhocQueries[pipelineName].queries[i].result?.rows().at(0)
126168
)
127169
) {
@@ -130,17 +172,28 @@
130172
adhocQueries[pipelineName].queries[i].progress = false
131173
},
132174
onNetworkError(e, injectValue) {
133-
injectValue({ error: e.message })
175+
// injectValue({ error: e.message })
134176
}
135-
},
136-
new CustomJSONParserTransformStream<Record<string, SQLValueJS>>({
137-
paths: ['$'],
138-
separator: ''
139-
}),
140-
{
141-
bufferSize: 8 * 1024 * 1024
142177
}
143178
)
179+
const cancel = () => {
180+
batchProcessor.cancel()
181+
}
182+
result.stream
183+
.pipeThrough(
184+
splitStreamByMaxChunk({
185+
maxChunkBytes: 100000,
186+
maxChunkBufferSize: 8 * 1024 * 1024,
187+
onBytesSkipped: (skippedBytes) => {
188+
if (!adhocQueries[pipelineName].queries[i]?.result) {
189+
return
190+
}
191+
adhocQueries[pipelineName].queries[i].result.totalSkippedBytes += skippedBytes
192+
}
193+
})
194+
)
195+
.pipeThrough(new SimpleArrowIPCTransformStream())
196+
.pipeTo(batchProcessor)
144197
adhocQueries[pipelineName].queries[i].result.endResultStream = cancel
145198
}
146199
</script>

web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte

Lines changed: 32 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -54,8 +54,8 @@
5454
pipelinesRelations[pipelineName][relationName].cancelStream = undefined
5555
return undefined
5656
}
57-
const { cancel } = parseCancellable(
58-
result,
57+
const batchProcessor = new BatchProcessor(
58+
{ cancel: () => result.cancel() },
5959
{
6060
pushChanges: (rows: XgressEntry[]) => {
6161
const initialLen = changeStream[pipelineName].rows.length
@@ -94,25 +94,36 @@
9494
.map((i) => i - offset)
9595
.filter((i) => i >= 0)
9696
},
97-
onBytesSkipped: (skippedBytes) => {
98-
pushAsCircularBuffer(
99-
() => changeStream[pipelineName].rows,
100-
bufferSize,
101-
(v) => v
102-
)([{ relationName, skippedBytes }])
103-
changeStream[pipelineName].totalSkippedBytes += skippedBytes
104-
},
10597
onParseEnded: () =>
10698
(pipelinesRelations[pipelineName][relationName].cancelStream = undefined)
107-
},
108-
new CustomJSONParserTransformStream<XgressEntry>({
109-
paths: ['$.json_data.*'],
110-
separator: ''
111-
}),
112-
{
113-
bufferSize: 8 * 1024 * 1024
11499
}
115100
)
101+
const cancel = () => {
102+
batchProcessor.cancel()
103+
}
104+
result.stream
105+
.pipeThrough(
106+
splitStreamByMaxChunk({
107+
maxChunkBytes: 100000,
108+
maxChunkBufferSize: 8 * 1024 * 1024,
109+
onBytesSkipped: (skippedBytes) => {
110+
pushAsCircularBuffer(
111+
() => changeStream[pipelineName].rows,
112+
bufferSize,
113+
(v) => v
114+
)([{ relationName, skippedBytes }])
115+
changeStream[pipelineName].totalSkippedBytes += skippedBytes
116+
}
117+
})
118+
)
119+
.pipeThrough(
120+
new CustomJSONParserTransformStream<XgressEntry>({
121+
paths: ['$.json_data.*'],
122+
separator: ''
123+
})
124+
)
125+
.pipeThrough(createBatchingTransform())
126+
.pipeTo(batchProcessor)
116127
return () => {
117128
cancel()
118129
}
@@ -173,9 +184,11 @@
173184
import { Pane, PaneGroup, PaneResizer } from 'paneforge'
174185
import type { Field, Relation } from '$lib/services/manager'
175186
import {
187+
BatchProcessor,
188+
createBatchingTransform,
176189
CustomJSONParserTransformStream,
177-
parseCancellable,
178-
pushAsCircularBuffer
190+
pushAsCircularBuffer,
191+
splitStreamByMaxChunk
179192
} from '$lib/functions/pipelines/changeStream'
180193
import JSONbig from 'true-json-bigint'
181194
import { groupBy } from '$lib/functions/common/array'

web-console/src/lib/components/pipelines/editor/TabLogs.svelte

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -26,9 +26,11 @@
2626
import LogsStreamList from '$lib/components/pipelines/editor/LogsStreamList.svelte'
2727
2828
import {
29-
parseCancellable,
29+
BatchProcessor,
30+
createBatchingTransform,
3031
pushAsCircularBuffer,
31-
SplitNewlineTransformStream
32+
SplitNewlineTransformStream,
33+
splitStreamByMaxChunk
3234
} from '$lib/functions/pipelines/changeStream'
3335
import { type ExtendedPipeline, type PipelineStatus } from '$lib/services/pipelineManager'
3436
import { usePipelineActionCallbacks } from '$lib/compositions/pipelines/usePipelineActionCallbacks.svelte'
@@ -113,8 +115,8 @@
113115
tryRestartStream(pipelineName, 5000)
114116
return
115117
}
116-
const { cancel } = parseCancellable(
117-
result,
118+
const batchProcessor = new BatchProcessor(
119+
{ cancel: () => result.cancel() },
118120
{
119121
pushChanges: (changes: string[]) => {
120122
const droppedNum = pushAsCircularBuffer(
@@ -130,16 +132,25 @@
130132
return
131133
}
132134
tryRestartStream(pipelineName, 5000)
133-
},
134-
onBytesSkipped(bytes) {
135-
streams[pipelineName].totalSkippedBytes += bytes
136135
}
137-
},
138-
new SplitNewlineTransformStream(),
139-
{
140-
bufferSize: 16 * 1024 * 1024
141136
}
142137
)
138+
const cancel = () => {
139+
batchProcessor.cancel()
140+
}
141+
result.stream
142+
.pipeThrough(
143+
splitStreamByMaxChunk({
144+
maxChunkBytes: 100000,
145+
maxChunkBufferSize: 16 * 1024 * 1024,
146+
onBytesSkipped(bytes) {
147+
streams[pipelineName].totalSkippedBytes += bytes
148+
}
149+
})
150+
)
151+
.pipeThrough(new SplitNewlineTransformStream())
152+
.pipeThrough(createBatchingTransform())
153+
.pipeTo(batchProcessor)
143154
streams[pipelineName] = {
144155
firstRowIndex: 0,
145156
stream: { open: result.stream, stop: cancel },

0 commit comments

Comments
 (0)