Skip to content

Commit 726dedf

Browse files
committed
[WebConsole] WIP Switch ad-hoc queries to arrow_ipc format
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent c312ac1 commit 726dedf

File tree

10 files changed

+330
-137
lines changed

10 files changed

+330
-137
lines changed

crates/adapters/src/format/parquet/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ pub fn relation_to_arrow_fields(fields: &[Field], delta_lake: bool) -> Vec<Arrow
292292
SqlType::Array => {
293293
// SqlType::Array implies c.component.is_some()
294294
let array_component = c.component.as_ref().unwrap();
295-
DataType::LargeList(Arc::new(ArrowField::new_list_field(
295+
// TODO: Replace with LargeList when apache-arrow JS library supports it
296+
DataType::List(Arc::new(ArrowField::new_list_field(
296297
columntype_to_datatype(array_component, delta_lake),
297298
// FIXME: Databricks refuses to understand the `nullable: false` constraint.
298299
delta_lake || array_component.nullable,

web-console/bun.lockb

5.78 KB
Binary file not shown.

web-console/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
"@vincjo/datatables": "^2.5.0",
5555
"@vusion/webfonts-generator": "^0.8.0",
5656
"add": "^2.0.6",
57+
"apache-arrow": "^20.0.0",
5758
"args": "^5.0.3",
5859
"array-join": "^3.2.1",
5960
"autoprefixer": "^10.4.20",

web-console/src/lib/components/adhoc/Query.svelte

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@
189189
{#if result.columns.length}
190190
<thead>
191191
<tr>
192+
<th class="bg-white-dark sticky top-0 z-10 {itemHeight}">#</th>
192193
{#each result.columns as column}
193194
<SqlColumnHeader
194195
{column}
@@ -208,6 +209,7 @@
208209
{@const row = rows[index]}
209210
{#if !row}{:else if 'cells' in row}
210211
<tr {style} class="{itemHeight} whitespace-nowrap odd:bg-white odd:dark:bg-black">
212+
<td class="select-none text-right font-mono">{index}</td>
211213
{#each row.cells as value}
212214
<SQLValue
213215
{value}

web-console/src/lib/components/pipelines/editor/TabAdHocQuery.svelte

Lines changed: 86 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
import { isPipelineInteractive } from '$lib/functions/pipelines/status'
1414
import type { SQLValueJS } from '$lib/types/sql.ts'
1515
import {
16-
CustomJSONParserTransformStream,
17-
parseCancellable
16+
arrowIPCChunkTransform,
17+
BatchProcessor,
18+
arrowIPCTransformStream,
19+
splitStreamByMaxChunk,
20+
type ArrowIpcChunk
1821
} from '$lib/functions/pipelines/changeStream'
1922
import invariant from 'tiny-invariant'
2023
import WarningBanner from '$lib/components/pipelines/editor/WarningBanner.svelte'
@@ -34,8 +37,9 @@
3437
adhocQueries[pipelineName] ??= { queries: [{ query: '' }] }
3538
})
3639
const api = usePipelineManager()
37-
const isDataRow = (record: Record<string, SQLValueJS>) =>
40+
const isJsonRow = (record: Record<string, SQLValueJS>) =>
3841
!(Object.keys(record).length === 1 && ('error' in record || 'warning' in record))
42+
const isArrowRow = (chunk: ArrowIpcChunk) => 'row' in chunk
3943
4044
const onSubmitQuery = (pipelineName: string, i: number) => async (query: string) => {
4145
const request = api.adHocQuery(pipelineName, query)
@@ -58,15 +62,15 @@
5862
return
5963
}
6064
const bufferSize = 1000
61-
const pushChanges = (
65+
const pushChangesJson = (
6266
input: (Record<string, SQLValueJS> | { error: string } | { warning: string })[]
6367
) => {
6468
if (!adhocQueries[pipelineName].queries[i]?.result) {
6569
return
6670
}
6771
if (
6872
adhocQueries[pipelineName].queries[i].result.columns.length === 0 &&
69-
isDataRow(input[0])
73+
isJsonRow(input[0])
7074
) {
7175
adhocQueries[pipelineName].queries[i].result.columns.push(
7276
...Object.keys(input[0]).map((name) => ({
@@ -85,7 +89,7 @@
8589
.push(
8690
...input
8791
.slice(0, bufferSize - previousLength)
88-
.map((v) => (isDataRow(v) ? { cells: Object.values(v) } : v) as Row)
92+
.map((v) => (isJsonRow(v) ? { cells: Object.values(v) } : v) as Row)
8993
)
9094
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
9195
if (input.length > bufferSize - previousLength) {
@@ -104,24 +108,60 @@
104108
}
105109
}
106110
}
107-
const { cancel } = parseCancellable(
108-
result,
111+
const pushChangesArrow = (input: ArrowIpcChunk[]) => {
112+
if (!adhocQueries[pipelineName].queries[i]?.result) {
113+
return
114+
}
115+
if ('header' in input[0]) {
116+
adhocQueries[pipelineName].queries[i].result.columns.push(
117+
...input[0].header.fields.map((field) => ({
118+
name: field.name,
119+
case_sensitive: true,
120+
columntype: { nullable: field.nullable },
121+
unused: false
122+
}))
123+
)
124+
return
125+
}
109126
{
110-
pushChanges,
111-
onBytesSkipped: (skippedBytes) => {
112-
if (!adhocQueries[pipelineName].queries[i]?.result) {
113-
return
114-
}
115-
adhocQueries[pipelineName].queries[i].result.totalSkippedBytes += skippedBytes
116-
},
127+
// Limit result size behavior - ignore all but first bufferSize rows
128+
const previousLength = adhocQueries[pipelineName].queries[i].result.rows().length
129+
adhocQueries[pipelineName].queries[i].result
130+
.rows()
131+
.push(
132+
...input
133+
.slice(0, bufferSize - previousLength)
134+
.map((v) => (isArrowRow(v) ? { cells: v.row } : v) as Row)
135+
)
136+
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
137+
if (input.length > bufferSize - previousLength) {
138+
queueMicrotask(() => {
139+
if (!adhocQueries[pipelineName].queries[i]) {
140+
return
141+
}
142+
if (adhocQueries[pipelineName].queries[i].result?.rows) {
143+
adhocQueries[pipelineName].queries[i].result.rows().push({
144+
warning: `The result contains more rows, but only the first ${bufferSize} are shown`
145+
})
146+
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
147+
}
148+
adhocQueries[pipelineName].queries[i].result?.endResultStream()
149+
})
150+
}
151+
}
152+
}
153+
const batchProcessor = new BatchProcessor(
154+
{ cancel: () => result.cancel() },
155+
{
156+
pushChanges: pushChangesArrow,
117157
onParseEnded: () => {
118158
if (!adhocQueries[pipelineName].queries[i]) {
119159
return
120160
}
121161
// Add field for the next query if the last query did not yield an error right away
122162
if (
123163
adhocQueries[pipelineName].queries.length === i + 1 &&
124-
((row) => !row || isDataRow(row))(
164+
((row) => !row || isJsonRow(row))(
125165
adhocQueries[pipelineName].queries[i].result?.rows().at(0)
126166
)
127167
) {
@@ -130,17 +170,39 @@
130170
adhocQueries[pipelineName].queries[i].progress = false
131171
},
132172
onNetworkError(e, injectValue) {
133-
injectValue({ error: e.message })
173+
// injectValue({ error: e.message })
134174
}
135-
},
136-
new CustomJSONParserTransformStream<Record<string, SQLValueJS>>({
137-
paths: ['$'],
138-
separator: ''
139-
}),
140-
{
141-
bufferSize: 8 * 1024 * 1024
142175
}
143176
)
177+
const cancel = () => {
178+
batchProcessor.cancel()
179+
}
180+
result.stream
181+
.pipeThrough(
182+
splitStreamByMaxChunk({
183+
maxChunkBytes: 100000,
184+
maxChunkBufferSize: 8 * 1024 * 1024,
185+
onBytesSkipped: (skippedBytes) => {
186+
if (!adhocQueries[pipelineName].queries[i]?.result) {
187+
return
188+
}
189+
adhocQueries[pipelineName].queries[i].result.totalSkippedBytes += skippedBytes
190+
}
191+
})
192+
)
193+
.pipeThrough(arrowIPCTransformStream())
194+
.pipeThrough(arrowIPCChunkTransform())
195+
.pipeTo(batchProcessor)
196+
.catch((err) => {
197+
if (!adhocQueries[pipelineName]?.queries[i]?.result) {
198+
return
199+
}
200+
adhocQueries[pipelineName].queries[i].result.rows().push({
201+
error: err instanceof Error ? err.message : JSON.stringify(err)
202+
})
203+
reclosureKey(adhocQueries[pipelineName].queries[i].result, 'rows')
204+
batchProcessor.cancel()
205+
})
144206
adhocQueries[pipelineName].queries[i].result.endResultStream = cancel
145207
}
146208
</script>

web-console/src/lib/components/pipelines/editor/TabChangeStream.svelte

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
pipelinesRelations[pipelineName][relationName].cancelStream = undefined
5555
return undefined
5656
}
57-
const { cancel } = parseCancellable(
58-
result,
57+
const batchProcessor = new BatchProcessor(
58+
{ cancel: () => result.cancel() },
5959
{
6060
pushChanges: (rows: XgressEntry[]) => {
6161
const initialLen = changeStream[pipelineName].rows.length
@@ -94,25 +94,36 @@
9494
.map((i) => i - offset)
9595
.filter((i) => i >= 0)
9696
},
97-
onBytesSkipped: (skippedBytes) => {
98-
pushAsCircularBuffer(
99-
() => changeStream[pipelineName].rows,
100-
bufferSize,
101-
(v) => v
102-
)([{ relationName, skippedBytes }])
103-
changeStream[pipelineName].totalSkippedBytes += skippedBytes
104-
},
10597
onParseEnded: () =>
10698
(pipelinesRelations[pipelineName][relationName].cancelStream = undefined)
107-
},
108-
new CustomJSONParserTransformStream<XgressEntry>({
109-
paths: ['$.json_data.*'],
110-
separator: ''
111-
}),
112-
{
113-
bufferSize: 8 * 1024 * 1024
11499
}
115100
)
101+
const cancel = () => {
102+
batchProcessor.cancel()
103+
}
104+
result.stream
105+
.pipeThrough(
106+
splitStreamByMaxChunk({
107+
maxChunkBytes: 100000,
108+
maxChunkBufferSize: 8 * 1024 * 1024,
109+
onBytesSkipped: (skippedBytes) => {
110+
pushAsCircularBuffer(
111+
() => changeStream[pipelineName].rows,
112+
bufferSize,
113+
(v) => v
114+
)([{ relationName, skippedBytes }])
115+
changeStream[pipelineName].totalSkippedBytes += skippedBytes
116+
}
117+
})
118+
)
119+
.pipeThrough(
120+
new CustomJSONParserTransformStream<XgressEntry>({
121+
paths: ['$.json_data.*'],
122+
separator: ''
123+
})
124+
)
125+
.pipeThrough(createBatchingTransform())
126+
.pipeTo(batchProcessor)
116127
return () => {
117128
cancel()
118129
}
@@ -173,9 +184,11 @@
173184
import { Pane, PaneGroup, PaneResizer } from 'paneforge'
174185
import type { Field, Relation } from '$lib/services/manager'
175186
import {
187+
BatchProcessor,
188+
createBatchingTransform,
176189
CustomJSONParserTransformStream,
177-
parseCancellable,
178-
pushAsCircularBuffer
190+
pushAsCircularBuffer,
191+
splitStreamByMaxChunk
179192
} from '$lib/functions/pipelines/changeStream'
180193
import JSONbig from 'true-json-bigint'
181194
import { groupBy } from '$lib/functions/common/array'

web-console/src/lib/components/pipelines/editor/TabLogs.svelte

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import LogsStreamList from '$lib/components/pipelines/editor/LogsStreamList.svelte'
2727
2828
import {
29-
parseCancellable,
29+
BatchProcessor,
30+
createBatchingTransform,
3031
pushAsCircularBuffer,
31-
SplitNewlineTransformStream
32+
SplitNewlineTransformStream,
33+
splitStreamByMaxChunk
3234
} from '$lib/functions/pipelines/changeStream'
3335
import { type ExtendedPipeline, type PipelineStatus } from '$lib/services/pipelineManager'
3436
import { usePipelineActionCallbacks } from '$lib/compositions/pipelines/usePipelineActionCallbacks.svelte'
@@ -109,8 +111,8 @@
109111
tryRestartStream(pipelineName, 5000)
110112
return
111113
}
112-
const { cancel } = parseCancellable(
113-
result,
114+
const batchProcessor = new BatchProcessor(
115+
{ cancel: () => result.cancel() },
114116
{
115117
pushChanges: (changes: string[]) => {
116118
const droppedNum = pushAsCircularBuffer(
@@ -126,16 +128,25 @@
126128
return
127129
}
128130
tryRestartStream(pipelineName, 5000)
129-
},
130-
onBytesSkipped(bytes) {
131-
streams[pipelineName].totalSkippedBytes += bytes
132131
}
133-
},
134-
new SplitNewlineTransformStream(),
135-
{
136-
bufferSize: 16 * 1024 * 1024
137132
}
138133
)
134+
const cancel = () => {
135+
batchProcessor.cancel()
136+
}
137+
result.stream
138+
.pipeThrough(
139+
splitStreamByMaxChunk({
140+
maxChunkBytes: 100000,
141+
maxChunkBufferSize: 16 * 1024 * 1024,
142+
onBytesSkipped(bytes) {
143+
streams[pipelineName].totalSkippedBytes += bytes
144+
}
145+
})
146+
)
147+
.pipeThrough(new SplitNewlineTransformStream())
148+
.pipeThrough(createBatchingTransform())
149+
.pipeTo(batchProcessor)
139150
streams[pipelineName] = {
140151
firstRowIndex: 0,
141152
stream: { open: result.stream, stop: cancel },
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import type { SQLValueJS } from '$lib/types/sql'
2+
import type { DataType, Field, RecordBatch, TypeMap } from 'apache-arrow'
3+
import { Type } from 'apache-arrow'
4+
import BigNumber from 'bignumber.js'
5+
import Dayjs from 'dayjs'
6+
7+
const arrowIpcValueToJS = <T extends DataType<Type, any>>(arrowType: Field<T>, value: any) => {
8+
switch (arrowType.typeId) {
9+
case Type.Int64: {
10+
return new BigNumber(value)
11+
}
12+
case Type.Uint64: {
13+
return new BigNumber(value)
14+
}
15+
case Type.Timestamp: {
16+
return Dayjs(value)
17+
}
18+
case Type.Date: {
19+
return Dayjs(value)
20+
}
21+
case Type.DateMillisecond: {
22+
return Dayjs(value)
23+
}
24+
default: {
25+
return value
26+
}
27+
}
28+
}
29+
30+
export const arrowIpcBatchToJS = <T extends TypeMap>(
31+
batch: RecordBatch<T>
32+
): { row: SQLValueJS[] }[] => {
33+
const res: { row: SQLValueJS[] }[] = new Array(batch.numRows)
34+
35+
for (let i = 0; i < batch.numRows; i++) {
36+
const row: SQLValueJS[] = []
37+
38+
for (let j = 0; j < batch.schema.fields.length; j++) {
39+
const field = batch.schema.fields[j]
40+
const column = batch.getChildAt(j)
41+
42+
if (column && column.isValid(i)) {
43+
const value = column.get(i)
44+
row[j] = arrowIpcValueToJS(field, value)
45+
}
46+
}
47+
48+
res[i] = { row }
49+
}
50+
return res
51+
}

0 commit comments

Comments
 (0)