Switch ad-hoc queries to arrow_ipc format#4240
Conversation
56b7606 to
9c0a916
Compare
|
It doesn't look like it answers queries (I tried fraud-detection), revision 9c0a916 |
|
I think you encountered this one: #4239 |
9c0a916 to
166c097
Compare
gz
left a comment
There was a problem hiding this comment.
lgtm make sure to do
some testing before merging (different pipelines with different types (variant map etc.) and queries)
|
@Karakatiza666 can we merge this? |
|
Haven't had the chance to do a few tests with complex types, hopefully tomorrow |
94e2e00 to
726dedf
Compare
|
Blocked by #4287 |
mythical-fred
left a comment
There was a problem hiding this comment.
Web-console behavioral changes without tests. Blocked by #4287 per author — but tests are also needed before this lands.
| import BigNumber from 'bignumber.js' | ||
| import Dayjs from 'dayjs' | ||
|
|
||
| const arrowIpcValueToJS = <T extends DataType<Type, any>>(arrowType: Field<T>, value: any) => { |
There was a problem hiding this comment.
New Arrow IPC value conversion logic covers a lot of type cases (int64, timestamps, structs, maps, lists). This is exactly the kind of logic that deserves unit tests — each type case can go wrong independently. Per Gerd's directive (2026-03-04): behavioral changes require tests. Setup: npm install -D vitest @testing-library/svelte jsdom. The pure conversion functions here are ideal for unit testing without any DOM or component setup.
|
@abhizer can we close this? |
|
This is for the UI, cc @Karakatiza666 |
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: #3923 #3792 #4287 #4226 #5814 #4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
|
@Karakatiza666 we shoudl be able to fix/merge this now |
…eldera#4226 feldera#5814 revert) stream_arrow_query used a synchronous Arrow IPC StreamWriter to an async mpsc by spawning one tokio task per std::io::Write::write call: let handle = TOKIO.spawn(async move { tx.send(bytes).await }); self.handles.push(handle); Each StreamWriter::write(&batch) makes ~6 sequential write_all calls The spawned tasks have no ordering relation; on a multi-thread tokio runtime they race to send into the receiver, so bytes arrive in arbitrary order and the resulting Arrow IPC stream gets corrupted. The fix is to not call sync Write from inside an async future at all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains the buffer between batches via std::mem::take(writer.get_mut()), then yields a single ordered Bytes chunk per batch. Memory cost is bounded by one record batch; behaviour matches stream_json_query, which has always used this shape. ChannelWriter retains its AsyncFileWriter impl for the parquet path (AsyncArrowWriter awaits each write future before issuing the next, so ordering there is already safe); the racy std::io::Write impl, the handles vec, and the cfg(test) reordering shim are all removed. Refs: feldera#3923 feldera#3792 feldera#4287 feldera#4226 feldera#5814 feldera#4240 Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
fada2e5 to
8255106
Compare
|
Switching from LargeList to List for ad-hoc queries causes invalidation of all checksums we use in QA. I'll try to add support for LargeList to the upstream JS package first |
|
Tracked in apache/arrow-js#438 |
8af2a11 to
1c0d393
Compare
1c0d393 to
ac58588
Compare
ac58588 to
c397118
Compare
Make ARRAYs serialize as arrow List for ad-hoc queries Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
c397118 to
e41d7c5
Compare
[web-console] Improve ad-hoc error rendering and formatting (remove message duplication) DataFusion errors raised during query setup were not reported over HTTP: the backend had already responded with status 200 and simply interrupted the stream when the error was thrown. Start the query before deciding the HTTP status. If it fails to start, respond with HTTP 400 and the error message so the client can surface it. If it starts cleanly, respond with 200 OK and stream the results as before. This adds no significant delay - only a few extra milliseconds for query planning and SQL types header generation; the initial response does not wait for the first data bytes. Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Switch the web console's ad-hoc query transport from HTTP streaming to the
WebSocket `/v0/pipelines/{name}/query` endpoint. WebSocket reports a runtime
error raised mid-query as a dedicated text frame followed by an error close,
so errors propagate to the UI without having to be encoded into the arrow-ipc
result stream.
`adHocQuery` opens the socket, sends `{sql, format:'arrow_ipc'}`, and pipes
binary frames into a `ReadableStream<Uint8Array>` consumed by the existing
`RecordBatchReader` path. apache-arrow's ReadableStream adapter swallows a
stream error (it cancels and ends the stream rather than rethrowing), so the
byte stream is always closed cleanly and a query error is reported out of band
via `error()`; `TabAdHocQuery` checks it after draining and renders the message
(an up-front failure such as selecting from a non-materialized source produces
no arrow schema, which is now handled rather than crashing).
Browsers cannot set the `Authorization` header on a WebSocket handshake, so the
bearer token and selected tenant are sent as `feldera-bearer.*` /
`feldera-tenant.*` subprotocols (base64url). A new manager middleware,
`promote_websocket_subprotocol_auth`, promotes them back to the standard headers
ahead of the bearer-auth path, so WebSocket handshakes authenticate through the
exact same validator as every other request and API keys keep working too.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ac9f7ef to
6bd33bd
Compare
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Changes:
Testing: manual, added unit test for formatting
Part of #4219: Deprecate the JSON format for ad-hoc queries