Skip to content

Commit 30d9d73

Browse files
committed
Add hash mode for ad-hoc queries.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 341e36c commit 30d9d73

File tree

15 files changed

+167
-17
lines changed

15 files changed

+167
-17
lines changed

Cargo.lock

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ arc-swap = "1.5.1"
5656
arcstr = "1.2.0"
5757
arrow = "55"
5858
arrow-json = "55"
59+
arrow-digest = "55"
5960
ascii_table = "=4.0.2"
6061
async-channel = "2.3.1"
6162
async-std = "1.12.0"
@@ -90,6 +91,7 @@ crossbeam = "0.8.4"
9091
crossbeam-utils = "0.8.6"
9192
csv = "1.2.2"
9293
csv-core = "0.1.10"
94+
dashmap = "6.1.0"
9395
datafusion = "47"
9496
dbsp = { path = "crates/dbsp", version = "0.129.0" }
9597
dbsp_nexmark = { path = "crates/nexmark" }

crates/adapters/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,9 @@ backtrace = { workspace = true }
183183
itertools = { workspace = true }
184184
metrics-process = { version = "2.4.0", default-features = false }
185185
serde_json_path_to_error = { workspace = true }
186-
dashmap = "6.1.0"
187186
smallstr = { workspace = true }
187+
dashmap = { workspace = true }
188+
arrow-digest = { workspace = true }
188189

189190
[package.metadata.cargo-machete]
190191
ignored = ["num-traits"]

crates/adapters/src/adhoc/executor.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use arrow::array::RecordBatch;
22
use arrow::ipc::writer::StreamWriter;
33
use arrow::util::pretty::pretty_format_batches;
4+
use arrow_digest::{RecordDigest, RecordDigestV0};
45
use arrow_json::writer::LineDelimited;
56
use arrow_json::WriterBuilder;
67
use async_stream::{stream, try_stream};
@@ -18,6 +19,7 @@ use parquet::arrow::async_writer::AsyncFileWriter;
1819
use parquet::arrow::AsyncArrowWriter;
1920
use parquet::basic::Compression;
2021
use parquet::file::properties::WriterProperties;
22+
use sha2::Sha256;
2123
use std::convert::Infallible;
2224
use tokio::sync::mpsc::error::SendError;
2325
use tokio::sync::oneshot::Receiver;
@@ -96,6 +98,29 @@ pub(crate) fn stream_text_query(
9698
}
9799
}
98100

101+
pub(crate) async fn hash_query_result(df: DataFrame) -> Result<String, PipelineError> {
102+
let schema = df.schema().inner().clone();
103+
let stream_executor = execute_stream(df)
104+
.await
105+
.map_err(|e| PipelineError::AdHocQueryError {
106+
error: e.to_string(),
107+
df: None,
108+
})?;
109+
let mut stream = stream_executor.map_err(|e| PipelineError::AdHocQueryError {
110+
error: e.to_string(),
111+
df: Some(Box::new(e)),
112+
})?;
113+
114+
let mut digest = RecordDigestV0::<Sha256>::new(&schema);
115+
while let Some(batch) = stream.next().await {
116+
let batch = batch.map_err(PipelineError::from)?;
117+
digest.update(&batch);
118+
}
119+
120+
let hash = digest.finalize();
121+
Ok(format!("{:X}", hash))
122+
}
123+
99124
pub(crate) fn stream_json_query(
100125
df: DataFrame,
101126
) -> impl Stream<Item = Result<ByteString, PipelineError>> {

crates/adapters/src/adhoc/format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use arrow::array::RecordBatch;
22
use arrow::error::ArrowError;
33
use arrow::util::display::{ArrayFormatter, FormatOptions};
44
use comfy_table::{Cell, Table};
5+
56
pub(crate) fn create_table(results: &[RecordBatch]) -> Result<Table, ArrowError> {
67
let options = FormatOptions::default().with_display_error(true);
78
let mut table = Table::new();

crates/adapters/src/adhoc/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
77
use datafusion::execution::SessionStateBuilder;
88
use datafusion::prelude::*;
99
use executor::{
10-
infallible_from_bytestring, stream_arrow_query, stream_json_query, stream_parquet_query,
11-
stream_text_query,
10+
hash_query_result, infallible_from_bytestring, stream_arrow_query, stream_json_query,
11+
stream_parquet_query, stream_text_query,
1212
};
1313
use feldera_adapterlib::errors::journal::ControllerError;
1414
use feldera_types::config::PipelineConfig;
@@ -165,6 +165,20 @@ async fn adhoc_query_handler(
165165
}
166166
}
167167
}
168+
AdHocResultFormat::Hash => {
169+
let hash_result = hash_query_result(df).await;
170+
match hash_result {
171+
Ok(hash) => {
172+
ws_session.text(hash).await?;
173+
}
174+
Err(e) => {
175+
ws_session
176+
.text(serde_json::to_string(&e).unwrap_or(e.to_string()))
177+
.await?;
178+
ws_close(ws_session, CloseCode::Error).await;
179+
}
180+
}
181+
}
168182
}
169183

170184
Ok(())
@@ -302,5 +316,8 @@ pub(crate) async fn stream_adhoc_result(
302316
.content_type(mime::APPLICATION_OCTET_STREAM)
303317
.streaming(stream_parquet_query(df)))
304318
}
319+
AdHocResultFormat::Hash => Ok(HttpResponse::Ok()
320+
.content_type(mime::TEXT_PLAIN)
321+
.body(hash_query_result(df).await?)),
305322
}
306323
}

crates/fda/src/adhoc.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ pub(crate) async fn handle_adhoc_query(
157157
eprintln!("`query` command does not support Prometheus output format");
158158
std::process::exit(1);
159159
}
160+
OutputFormat::Hash => AdHocResultFormat::Hash,
160161
};
161162
let sql = sql.unwrap_or_else(|| {
162163
if stdin {
@@ -228,6 +229,19 @@ pub(crate) async fn handle_adhoc_query(
228229
result_file.flush().unwrap();
229230
println!("Query result saved to '{}'", path.display());
230231
}
232+
AdHocResultFormat::Hash => {
233+
while let Some(chunk) = websocket.next().await {
234+
let mut text: String = String::new();
235+
if let Ok(Message::Text(chunk)) = chunk {
236+
text.push_str(chunk.as_str());
237+
print!("{}", chunk);
238+
} else {
239+
handle_websocket_message_generic(&mut websocket, chunk).await;
240+
break;
241+
}
242+
}
243+
println!()
244+
}
231245
}
232246
}
233247

crates/fda/src/bench/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,10 @@ impl Benchmark {
324324
match format {
325325
OutputFormat::Json => serde_json::to_string_pretty(&self.as_map()).unwrap(),
326326
OutputFormat::Text => self.format_as_text(),
327-
OutputFormat::ArrowIpc | OutputFormat::Parquet | OutputFormat::Prometheus => {
327+
OutputFormat::ArrowIpc
328+
| OutputFormat::Parquet
329+
| OutputFormat::Prometheus
330+
| OutputFormat::Hash => {
328331
warn!("Format '{}' is not supported for benchmark results, falling back to text format", format);
329332
self.format_as_text()
330333
}

crates/fda/src/cli.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ pub enum OutputFormat {
126126
///
127127
/// This format can only be specified for the `metrics` command.
128128
Prometheus,
129+
/// Return the output as a hash of the result.
130+
///
131+
/// This format can only be specified for SQL queries.
132+
Hash,
129133
}
130134

131135
impl Display for OutputFormat {
@@ -136,6 +140,7 @@ impl Display for OutputFormat {
136140
OutputFormat::ArrowIpc => "arrow_ipc",
137141
OutputFormat::Parquet => "parquet",
138142
OutputFormat::Prometheus => "prometheus",
143+
OutputFormat::Hash => "hash",
139144
};
140145
write!(f, "{}", output)
141146
}

crates/fda/src/shell.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@ pub async fn shell(format: OutputFormat, name: String, client: Client) {
153153
OutputFormat::Json => "json",
154154
OutputFormat::ArrowIpc => "arrow",
155155
OutputFormat::Parquet => "parquet",
156-
OutputFormat::Prometheus => "prometheus",
156+
OutputFormat::Hash => "hash",
157+
OutputFormat::Prometheus => {
158+
panic!("Prometheus format is not supported for ad-hoc SQL");
159+
}
157160
};
158161
match client
159162
.pipeline_adhoc_sql()

0 commit comments

Comments (0)