Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 58 additions & 0 deletions crates/adapters/src/adhoc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::PipelineError;
use actix_web::http::header;
use actix_web::HttpResponse;
use arrow::array::RecordBatch;
use arrow::ipc::writer::StreamWriter;
use arrow::util::pretty::pretty_format_batches;
use arrow_json::writer::LineDelimited;
use arrow_json::WriterBuilder;
Expand Down Expand Up @@ -104,6 +105,20 @@ impl AsyncFileWriter for ChannelWriter {
}
}

impl std::io::Write for ChannelWriter {
    /// Copies `buf` into an owned `Bytes` and pushes it onto the async
    /// channel, blocking the current thread until the receiver has capacity.
    ///
    /// NOTE(review): `block_on` here assumes `write` is never invoked from a
    /// thread that is itself driving an async executor — confirm callers run
    /// this on a blocking-safe thread (e.g. inside `spawn_blocking` or the
    /// dbsp runtime worker mentioned below in this file).
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Clone the buffer and send it.
        let bytes = Bytes::copy_from_slice(buf);
        let len = bytes.len();
        // A dropped receiver (client disconnected, stream cancelled) is a
        // normal I/O failure, not a program bug: report it as `BrokenPipe`
        // per the `Write` contract instead of panicking the thread.
        futures::executor::block_on(self.tx.send(bytes))
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
        Ok(len)
    }

    /// No-op: each `write` already hands its bytes to the channel, so there
    /// is nothing buffered locally to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// We execute the dataframe in our dbsp tokio runtime. The reason is that this runtime will
/// have a multi-threaded scheduler that can run things on many cores with work-stealing, whereas
/// the actix-web runtime is single-threaded. This is important for datafusion because it can
Expand Down Expand Up @@ -254,6 +269,49 @@ pub async fn stream_adhoc_result(
}
}
})),
AdHocResultFormat::ArrowIpc => {
let (tx, mut rx) = mpsc::channel(1024);

let mut stream_job = Box::pin(async move {
let mut channel_writer = ChannelWriter::new(tx);
let schema = df.schema().inner().clone();
let mut stream = execute_stream(df).await.expect("unable to receive stream")?;
let mut writer = StreamWriter::try_new(&mut channel_writer, &schema).unwrap();

while let Some(batch) = stream.next().await {
let batch = batch.map_err(DataFusionError::from)?;
writer.write(&batch).map_err(DataFusionError::from)?;
}
writer.flush().map_err(DataFusionError::from)?;
writer.finish().map_err(DataFusionError::from)?;
<datafusion::common::Result<_>>::Ok(())
}.fuse());

Ok(HttpResponse::Ok()
.content_type(mime::APPLICATION_OCTET_STREAM)
.streaming(stream! {
loop {
select! {
stream_res = stream_job.as_mut() => {
match stream_res {
Ok(()) => {}
Err(err) => {
yield Err(err);
}
}
},
maybe_bytes = rx.recv().fuse() => {
if let Some(bytes) = maybe_bytes {
yield Ok(bytes);
} else {
// Channel closed, we're done
break;
}
}
}
}
}))
},
AdHocResultFormat::Parquet => {
let file_name = format!(
"results_{}.parquet",
Expand Down
1 change: 1 addition & 0 deletions crates/fda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = "0.3"
tokio-util = "0.7"
tempfile = "3.15"
rmpv = { version = "1.3", features = ["with-serde"] }
arrow = { version = "54", features = ["ipc", "prettyprint"] }

[build-dependencies]
prettyplease = "0.2.22"
Expand Down
21 changes: 21 additions & 0 deletions crates/fda/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::{Parser, Subcommand, ValueEnum, ValueHint};
use clap_complete::engine::{ArgValueCompleter, CompletionCandidate};
use std::fmt::Display;
use std::path::PathBuf;

use crate::cd::types::{CompilationProfile, ProgramConfig};
Expand Down Expand Up @@ -103,6 +104,26 @@ pub enum OutputFormat {
///
/// This usually corresponds to the exact response returned from the server.
Json,
/// Request the output in Arrow IPC format.
///
/// This format can only be specified for SQL queries.
ArrowIpc,
/// Return the output in Parquet format.
///
/// This format can only be specified for SQL queries.
Parquet,
}

impl Display for OutputFormat {
    /// Renders the variant as the lowercase identifier accepted on the CLI.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            OutputFormat::Text => "text",
            OutputFormat::Json => "json",
            OutputFormat::ArrowIpc => "arrow_ipc",
            OutputFormat::Parquet => "parquet",
        })
    }
}

#[derive(Subcommand)]
Expand Down
Loading