Skip to content

Commit 93a6046

Browse files
committed
[adapters] Fix adhoc scan implementation for multihost.
When scanning a table produces multiple batches, it's important to feed all of the batches into a single `StreamWriter`. Until now, the code here used a separate `StreamWriter` for each batch, but that didn't work properly. This wasn't noticed until now because the tests only produced small outputs that had only one batch. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent c1d989e commit 93a6046

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

crates/adapters/src/server.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2579,20 +2579,21 @@ async fn coordination_adhoc_scan(
25792579
// likely.
25802580
let first_batch = match stream.next().await {
25812581
Some(Err(error)) => return Err(error.into()),
2582-
other => other,
2583-
}
2584-
.into_iter();
2582+
other => other.into_iter(),
2583+
};
25852584

25862585
let schema = stream.schema();
2587-
Ok(HttpResponseBuilder::new(StatusCode::OK).streaming(
2588-
futures_util::stream::iter(first_batch).chain(stream).map(
2589-
move |batch| -> Result<Bytes, PipelineError> {
2590-
let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
2591-
writer.write(&batch?)?;
2592-
Ok(writer.into_inner()?.into())
2593-
},
2594-
),
2595-
))
2586+
let response_stream = async_stream::try_stream! {
2587+
let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
2588+
let mut stream = futures_util::stream::iter(first_batch).chain(stream);
2589+
while let Some(batch) = stream.next().await {
2590+
writer.write(&batch?)?;
2591+
yield Bytes::copy_from_slice(writer.get_ref().as_slice());
2592+
writer.get_mut().clear();
2593+
}
2594+
yield writer.into_inner()?.into();
2595+
};
2596+
Ok(HttpResponseBuilder::new(StatusCode::OK).streaming::<_, PipelineError>(response_stream))
25962597
}
25972598

25982599
/// Stream the set of incomplete labels.

0 commit comments

Comments
 (0)