diff --git a/Cargo.lock b/Cargo.lock index d55b3c4a378..6e797e83fc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4664,6 +4664,7 @@ dependencies = [ name = "fda" version = "0.42.0" dependencies = [ + "arrow", "chrono", "clap 4.5.31", "clap_complete", diff --git a/crates/adapters/src/adhoc/mod.rs b/crates/adapters/src/adhoc/mod.rs index f37f34c5520..1c3e1462c41 100644 --- a/crates/adapters/src/adhoc/mod.rs +++ b/crates/adapters/src/adhoc/mod.rs @@ -2,6 +2,7 @@ use crate::PipelineError; use actix_web::http::header; use actix_web::HttpResponse; use arrow::array::RecordBatch; +use arrow::ipc::writer::StreamWriter; use arrow::util::pretty::pretty_format_batches; use arrow_json::writer::LineDelimited; use arrow_json::WriterBuilder; @@ -104,6 +105,20 @@ impl AsyncFileWriter for ChannelWriter { } } +impl std::io::Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + // Clone the buffer and send it + let bytes = Bytes::copy_from_slice(buf); + let len = bytes.len(); + futures::executor::block_on(self.tx.send(bytes)).unwrap(); + Ok(len) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + /// We execute the dataframe in our dbsp tokio runtime. The reason is that this runtime will /// have a multi-threaded scheduler that can run things on many cores with work-stealing, whereas /// the actix-web runtime is single-threaded. 
This is important for datafusion because it can @@ -254,6 +269,49 @@ pub async fn stream_adhoc_result( } } })), + AdHocResultFormat::ArrowIpc => { + let (tx, mut rx) = mpsc::channel(1024); + + let mut stream_job = Box::pin(async move { + let mut channel_writer = ChannelWriter::new(tx); + let schema = df.schema().inner().clone(); + let mut stream = execute_stream(df).await.expect("unable to receive stream")?; + let mut writer = StreamWriter::try_new(&mut channel_writer, &schema).unwrap(); + + while let Some(batch) = stream.next().await { + let batch = batch.map_err(DataFusionError::from)?; + writer.write(&batch).map_err(DataFusionError::from)?; + } + writer.flush().map_err(DataFusionError::from)?; + writer.finish().map_err(DataFusionError::from)?; + >::Ok(()) + }.fuse()); + + Ok(HttpResponse::Ok() + .content_type(mime::APPLICATION_OCTET_STREAM) + .streaming(stream! { + loop { + select! { + stream_res = stream_job.as_mut() => { + match stream_res { + Ok(()) => {} + Err(err) => { + yield Err(err); + } + } + }, + maybe_bytes = rx.recv().fuse() => { + if let Some(bytes) = maybe_bytes { + yield Ok(bytes); + } else { + // Channel closed, we're done + break; + } + } + } + } + })) + }, AdHocResultFormat::Parquet => { let file_name = format!( "results_{}.parquet", diff --git a/crates/fda/Cargo.toml b/crates/fda/Cargo.toml index 9f5dc69fcb0..5914f0780c0 100644 --- a/crates/fda/Cargo.toml +++ b/crates/fda/Cargo.toml @@ -31,6 +31,7 @@ futures = "0.3" tokio-util = "0.7" tempfile = "3.15" rmpv = { version = "1.3", features = ["with-serde"] } +arrow = { version = "54", features = ["ipc", "prettyprint"] } [build-dependencies] prettyplease = "0.2.22" diff --git a/crates/fda/src/cli.rs b/crates/fda/src/cli.rs index 3bf899103f1..32de9370f44 100644 --- a/crates/fda/src/cli.rs +++ b/crates/fda/src/cli.rs @@ -1,5 +1,6 @@ use clap::{Parser, Subcommand, ValueEnum, ValueHint}; use clap_complete::engine::{ArgValueCompleter, CompletionCandidate}; +use std::fmt::Display; use 
std::path::PathBuf; use crate::cd::types::{CompilationProfile, ProgramConfig}; @@ -103,6 +104,26 @@ pub enum OutputFormat { /// /// This usually corresponds to the exact response returned from the server. Json, + /// Request the output in Arrow IPC format. + /// + /// This format can only be specified for SQL queries. + ArrowIpc, + /// Return the output in Parquet format. + /// + /// This format can only be specified for SQL queries. + Parquet, +} + +impl Display for OutputFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let output = match self { + OutputFormat::Text => "text", + OutputFormat::Json => "json", + OutputFormat::ArrowIpc => "arrow_ipc", + OutputFormat::Parquet => "parquet", + }; + write!(f, "{}", output) + } } #[derive(Subcommand)] diff --git a/crates/fda/src/main.rs b/crates/fda/src/main.rs index f62157ec139..2545d2e10d2 100644 --- a/crates/fda/src/main.rs +++ b/crates/fda/src/main.rs @@ -3,7 +3,10 @@ use std::convert::Infallible; use std::fs::File; use std::io::{ErrorKind, Read, Write}; +use std::path::PathBuf; +use arrow::ipc::reader::StreamReader; +use arrow::util::pretty::pretty_format_batches; use clap::{CommandFactory, Parser}; use clap_complete::CompleteEnv; use feldera_types::config::{FtConfig, RuntimeConfig, StorageOptions}; @@ -37,6 +40,28 @@ use crate::cd::*; use crate::cli::*; use crate::shell::shell; +/// Creates a unique filename by appending a number to the base name if it already exists. 
/// Creates and returns a file that does not clash with any existing one.
///
/// Tries `{base}.{extension}` first and then `{base}_1.{extension}`,
/// `{base}_2.{extension}`, ... until creation succeeds. On success returns
/// the chosen path together with the freshly created (exclusive) handle;
/// any error other than `AlreadyExists` is propagated to the caller.
fn unique_file(base: &str, extension: &str) -> Result<(PathBuf, File), std::io::Error> {
    let mut candidate = PathBuf::from(format!("{base}.{extension}"));
    // `create_new` is atomic: it fails with `AlreadyExists` rather than
    // truncating, so two concurrent callers can never pick the same name.
    for suffix in 1.. {
        match File::create_new(&candidate) {
            Ok(handle) => return Ok((candidate, handle)),
            Err(err) if err.kind() == ErrorKind::AlreadyExists => {
                candidate = PathBuf::from(format!("{base}_{suffix}.{extension}"));
            }
            Err(err) => return Err(err),
        }
    }
    unreachable!("`1..` never terminates")
}
@@ -842,6 +883,10 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } PipelineAction::Logs { name, watch } => { @@ -939,6 +984,10 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } PipelineAction::Config { name } => { @@ -966,6 +1015,10 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } PipelineAction::SetConfig { name, key, value } => { @@ -1025,6 +1078,10 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } PipelineAction::Program { action } => program(format, action, client).await, @@ -1125,15 +1182,17 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) shell(format, name, client2).await } PipelineAction::Query { name, sql, stdin } => { - let format = match format { + let format_str = match format { OutputFormat::Text => "text", OutputFormat::Json => "json", + OutputFormat::ArrowIpc => "arrow_ipc", + OutputFormat::Parquet => "parquet", }; let response = client .pipeline_adhoc_sql() .pipeline_name(name) - .format(format) + .format(format_str) .sql(sql.unwrap_or_else(|| { if stdin { let mut program_code = String::new(); @@ -1159,20 +1218,56 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) )) .unwrap(); - let mut byte_stream = response.into_inner(); - while let Some(chunk) = byte_stream.next().await { 
- let mut buffer = Vec::new(); - match chunk { - Ok(chunk) => buffer.extend_from_slice(&chunk), - Err(e) => { - eprintln!("ERROR: Unable to read server response: {}", e); - std::process::exit(1); + match format { + OutputFormat::Text | OutputFormat::Json => { + let mut byte_stream = response.into_inner(); + while let Some(chunk) = byte_stream.next().await { + let mut buffer = Vec::new(); + match chunk { + Ok(chunk) => buffer.extend_from_slice(&chunk), + Err(e) => { + eprintln!("ERROR: Unable to read server response: {}", e); + std::process::exit(1); + } + } + let text = String::from_utf8_lossy(&buffer); + print!("{}", text); + } + println!() + } + OutputFormat::ArrowIpc => { + let mut ipc_bytes: Vec = Vec::new(); + let mut byte_stream = response.into_inner(); + while let Some(chunk) = byte_stream.next().await { + match chunk { + Ok(chunk) => ipc_bytes.write_all(chunk.as_ref()).unwrap(), + Err(e) => { + eprintln!("ERROR: Unable to read server response: {}", e); + std::process::exit(1); + } + } } + let reader = StreamReader::try_new(ipc_bytes.as_slice(), None).unwrap(); + let results = reader.collect::, _>>(); + println!("{}", pretty_format_batches(&results.unwrap()).unwrap()); + } + OutputFormat::Parquet => { + let (path, mut result_file) = + unique_file("result", "parquet").expect("Failed to create parquet file"); + let mut byte_stream = response.into_inner(); + while let Some(chunk) = byte_stream.next().await { + match chunk { + Ok(chunk) => result_file.write_all(chunk.as_ref()).unwrap(), + Err(e) => { + eprintln!("ERROR: Unable to read server response: {}", e); + std::process::exit(1); + } + } + } + result_file.flush().unwrap(); + println!("Query result saved to '{}'", path.display()); } - let text = String::from_utf8_lossy(&buffer); - print!("{}", text); } - println!() } } } @@ -1302,6 +1397,10 @@ async fn connector( .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } 
}; @@ -1345,6 +1444,10 @@ async fn program(format: OutputFormat, action: ProgramAction, client: Client) { .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } ProgramAction::Config { name } => { @@ -1372,6 +1475,10 @@ async fn program(format: OutputFormat, action: ProgramAction, client: Client) { .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } ProgramAction::SetConfig { name, profile } => { @@ -1413,6 +1520,10 @@ async fn program(format: OutputFormat, action: ProgramAction, client: Client) { .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } ProgramAction::Status { name } => { @@ -1444,6 +1555,10 @@ async fn program(format: OutputFormat, action: ProgramAction, client: Client) { .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } ProgramAction::Set { @@ -1491,6 +1606,10 @@ async fn program(format: OutputFormat, action: ProgramAction, client: Client) { .expect("Failed to serialize pipeline stats") ); } + _ => { + eprintln!("Unsupported output format: {}", format); + std::process::exit(1); + } } } else { // Already reported error in read_program_code or read_file. 
diff --git a/crates/fda/src/shell.rs b/crates/fda/src/shell.rs index 1e8f6b8cacb..96da4c21a02 100644 --- a/crates/fda/src/shell.rs +++ b/crates/fda/src/shell.rs @@ -150,6 +150,8 @@ pub async fn shell(format: OutputFormat, name: String, client: Client) { let format = match format { OutputFormat::Text => "text", OutputFormat::Json => "json", + OutputFormat::ArrowIpc => "arrow", + OutputFormat::Parquet => "parquet", }; match client .pipeline_adhoc_sql() diff --git a/crates/feldera-types/src/query.rs b/crates/feldera-types/src/query.rs index 68093c7aeef..75cf2666873 100644 --- a/crates/feldera-types/src/query.rs +++ b/crates/feldera-types/src/query.rs @@ -12,6 +12,8 @@ pub enum AdHocResultFormat { Json, /// Download results in a parquet file. Parquet, + /// Stream data in the arrow IPC format. + ArrowIpc, } impl Display for AdHocResultFormat { @@ -20,6 +22,7 @@ impl Display for AdHocResultFormat { AdHocResultFormat::Text => write!(f, "text"), AdHocResultFormat::Json => write!(f, "json"), AdHocResultFormat::Parquet => write!(f, "parquet"), + AdHocResultFormat::ArrowIpc => write!(f, "arrow_ipc"), } } } diff --git a/openapi.json b/openapi.json index fb772436127..7f892b251b9 100644 --- a/openapi.json +++ b/openapi.json @@ -2963,7 +2963,8 @@ "enum": [ "text", "json", - "parquet" + "parquet", + "arrow_ipc" ] }, "AdhocQueryArgs": {