Commit 80264d9
committed
stream_arrow_query used a synchronous Arrow IPC StreamWriter to an
async mpsc by spawning one tokio task per std::io::Write::write call:
let handle = TOKIO.spawn(async move { tx.send(bytes).await });
self.handles.push(handle);
Each StreamWriter::write(&batch) makes ~6 sequential write_all calls
The spawned tasks have no ordering relation; on a multi-thread tokio
runtime they race to send into the receiver, so bytes arrive in
arbitrary order and the resulting Arrow IPC stream gets corrupted.
The fix is to not call sync Write from inside an async future at
all. stream_arrow_query now hands StreamWriter a Vec<u8> and drains
the buffer between batches via std::mem::take(writer.get_mut()), then
yields a single ordered Bytes chunk per batch. Memory cost is bounded
by one record batch; behaviour matches stream_json_query, which has
always used this shape.
ChannelWriter retains its AsyncFileWriter impl for the parquet path
(AsyncArrowWriter awaits each write future before issuing the next,
so ordering there is already safe); the racy std::io::Write impl, the
handles vec, and the cfg(test) reordering shim are all removed.
Refs: #3923 #3792 #4287 #4226 #5814 #4240
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 23b56cb commit 80264d9
1 file changed
Lines changed: 142 additions & 187 deletions
0 commit comments