Skip to content
Merged
Prev Previous commit
[adapters] Fix adhoc scan implementation for multihost.
When scanning a table produces multiple batches, it's important to feed
all of the batches into a single `StreamWriter`.  Until now, the code here
used a separate `StreamWriter` for each batch, but that didn't work
properly.

This wasn't noticed until now because the tests only produced small
outputs that had only one batch.

Signed-off-by: Ben Pfaff <blp@feldera.com>
  • Loading branch information
blp committed Mar 24, 2026
commit 7f2a613bbe4a686a8bca7fbe8a593188b4b95b33
25 changes: 13 additions & 12 deletions crates/adapters/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2579,20 +2579,21 @@ async fn coordination_adhoc_scan(
// likely.
let first_batch = match stream.next().await {
Some(Err(error)) => return Err(error.into()),
other => other,
}
.into_iter();
other => other.into_iter(),
};

let schema = stream.schema();
Ok(HttpResponseBuilder::new(StatusCode::OK).streaming(
futures_util::stream::iter(first_batch).chain(stream).map(
move |batch| -> Result<Bytes, PipelineError> {
let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
writer.write(&batch?)?;
Ok(writer.into_inner()?.into())
},
),
))
let response_stream = async_stream::try_stream! {
let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
let mut stream = futures_util::stream::iter(first_batch).chain(stream);
while let Some(batch) = stream.next().await {
writer.write(&batch?)?;
yield Bytes::copy_from_slice(writer.get_ref().as_slice());
writer.get_mut().clear();
}
yield writer.into_inner()?.into();
};
Ok(HttpResponseBuilder::new(StatusCode::OK).streaming::<_, PipelineError>(response_stream))
}

/// Stream the set of incomplete labels.
Expand Down
Loading