Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 173 additions & 12 deletions crates/adapters/src/adhoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ pub(crate) async fn execute_sql(

/// Plan and translate `sql` against `state`, applying `PREPARE`/`EXECUTE`
/// substitution within the scope of a single ad-hoc request.
///
/// Only the final statement returns rows. Earlier statements may be
/// `PREPARE`s or any non-result-producing statement (e.g. `INSERT`),
/// executed for their side effect.
async fn execute_sql_with_state(
state: SessionState,
sql: &str,
Expand All @@ -315,9 +319,6 @@ async fn execute_sql_with_state(
let mut prepared: HashMap<String, LogicalPlan> = HashMap::new();
let sql_options = SQLOptions::new().with_allow_ddl(false);

// For now, only the final statement may produce a result set. All
// preceding statements must be PREPAREs whose inner plans are stashed
// for a later EXECUTE in the same request.
while statements.len() > 1 {
let stmt = statements.pop_front().unwrap();
let plan = state.statement_to_plan(stmt).await?;
Expand All @@ -326,14 +327,40 @@ async fn execute_sql_with_state(
sql_options.verify_plan(&input)?;
prepared.insert(name, (*input).clone());
}
_ => {
LogicalPlan::Statement(Statement::Execute(Execute { name, parameters })) => {
// `EXECUTE` of a previously-prepared statement, used here
// for its side effects (e.g. a prepared INSERT).
let prepared_plan =
prepared
.remove(&name)
.ok_or_else(|| PipelineError::AdHocQueryError {
error: format!(
"prepared statement '{name}' is not defined in this request"
),
df: None,
})?;
let values = execute_parameters_to_scalars(&parameters)?;
let bound = prepared_plan.replace_params_with_values(&ParamValues::List(values))?;
sql_options.verify_plan(&bound)?;
drain_intermediate_plan(&state, bound).await?;
}
other if is_result_producing_plan(&other) => {
return Err(PipelineError::AdHocQueryError {
error: "only PREPARE statements may precede the final statement \
in a multi-statement ad-hoc query"
error: "only the final statement in a multi-statement \
ad-hoc query may return a result set; \
move SELECTs to the end or split into \
separate requests"
.to_string(),
df: None,
});
}
other => {
// Non-result-producing intermediate statement (a DML plan such
// as INSERT/UPDATE/DELETE, or a statement plan). Run it for its side
Comment thread
gz marked this conversation as resolved.
// effects and discard the per-statement count row.
sql_options.verify_plan(&other)?;
drain_intermediate_plan(&state, other).await?;
}
}
}

Expand Down Expand Up @@ -374,6 +401,26 @@ async fn execute_sql_with_state(
Ok(DataFrame::new(state, final_plan))
}

/// Reports whether `plan` is treated as returning rows to the caller.
///
/// `LogicalPlan::Dml` and `LogicalPlan::Statement` plans are classified as
/// side-effect-only; every other plan variant counts as result-producing.
/// The check exists so that a request such as `SELECT; INSERT` is rejected
/// instead of silently dropping the rows of the early `SELECT`.
fn is_result_producing_plan(plan: &LogicalPlan) -> bool {
    let side_effect_only = matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Statement(_));
    !side_effect_only
}

/// Runs an intermediate statement purely for its side effects.
///
/// The plan is executed to completion against a clone of `state` and the
/// collected batches are discarded. An `INSERT`, for example, yields a
/// one-row count; dropping it here keeps the response stream limited to
/// the rows of the request's final statement.
///
/// # Errors
///
/// Propagates any error raised while executing the plan.
async fn drain_intermediate_plan(
    state: &SessionState,
    plan: LogicalPlan,
) -> Result<(), PipelineError> {
    let discarded = DataFrame::new(state.clone(), plan).collect().await?;
    // The batches are intentionally thrown away; only the side effect matters.
    drop(discarded);
    Ok(())
}

/// Convert `EXECUTE` positional parameters to DataFusion's `ScalarAndMetadata`
/// list, rejecting anything that is not a literal value.
fn execute_parameters_to_scalars(params: &[Expr]) -> Result<Vec<ScalarAndMetadata>, PipelineError> {
Expand Down Expand Up @@ -408,10 +455,38 @@ fn parse_sql_statements(
.with_dialect(dialect.as_ref())
.with_recursion_limit(recursion_limit)
.build()?
.parse_statements()?;
.parse_statements()
.map_err(format_parser_error)?;
Ok(statements)
}

/// Convert a DataFusion error coming out of the SQL parser into a
Comment thread
gz marked this conversation as resolved.
/// `PipelineError` whose message is the parser's `Display`, not its
/// `Debug` form. The parser already appends the location ("at Line: X,
/// Column: Y") to its messages; preserving that string gives the user
/// something like
/// `sql parser error: Expected: end of statement, found: in at Line: 1, Column: 30`
/// instead of the wrapped
/// `SQL error: ParserError("Expected: ... at Line: 1, Column: 30")`.
///
/// The DataFusion parser may wrap its `DataFusionError::SQL` in a
/// `DataFusionError::Diagnostic`; unwrap that here so the inner parser
/// message reaches the user.
fn format_parser_error(error: datafusion::error::DataFusionError) -> PipelineError {
use datafusion::error::DataFusionError;
let inner = match error {
DataFusionError::Diagnostic(_, inner) => *inner,
other => other,
};
match inner {
DataFusionError::SQL(parser_err, _) => PipelineError::AdHocQueryError {
error: parser_err.to_string(),
df: None,
},
other => PipelineError::from(other),
}
}

/// Stream the result of an ad-hoc query using a HTTP streaming response.
pub(crate) async fn stream_adhoc_result(
controller: &Controller,
Expand Down Expand Up @@ -501,6 +576,33 @@ mod tests {
assert!(parse_sql_statements(&state, "SELECT * FROM").is_err());
}

/// Parser errors must include the line/column of the offending token so
Comment thread
gz marked this conversation as resolved.
/// the user can locate the typo without re-reading the query in their
/// head.
#[test]
fn parse_error_message_carries_location() {
let state = test_state();
// 'in' is not a valid statement starter here; the parser stops on the
// token after the column reference, which is at line 1 / column 30.
let err = parse_sql_statements(&state, "select * from foo where bar = in baz")
.expect_err("expected a parser error");
let msg = format!("{err}");
assert!(
msg.contains("Line: 1"),
"missing line number in error message: {msg}"
);
assert!(
msg.contains("Column:"),
"missing column number in error message: {msg}"
);
// The `Debug`-formatted `ParserError("...")` wrapper from earlier
// versions of the message should be gone.
assert!(
!msg.contains("ParserError(\""),
"raw Debug wrapper leaked into error message: {msg}"
);
}

#[test]
fn execute_parameters_to_scalars_rejects_non_literal() {
let expr = Expr::Column(datafusion::common::Column::new_unqualified("foo"));
Expand Down Expand Up @@ -559,15 +661,74 @@ mod tests {
assert_eq!(total_rows, 0);
}

/// An intermediate `SELECT` (or any other result-producing statement)
/// must be rejected: only one result set comes back per request, so
/// executing the earlier SELECT silently would discard its rows.
#[tokio::test]
async fn non_prepare_intermediate_statement_errors() {
async fn intermediate_select_is_rejected() {
let state = test_state();
let err = execute_sql_with_state(state, "SELECT 1; SELECT 2")
.await
.unwrap_err();
assert!(
format!("{err:?}").contains("PREPARE"),
"unexpected error: {err:?}"
);
let msg = format!("{err}");
assert!(msg.contains("final statement"), "unexpected error: {msg}");
}

/// Multiple `INSERT`s followed by a `SELECT` must execute in order,
/// committing each insert's side effect, and only surface the final
/// `SELECT`'s rows.
#[tokio::test]
async fn intermediate_inserts_run_and_final_select_returns_rows() {
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use std::sync::Arc;

// Register a writable in-memory table so DML executes for real.
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
let mem = MemTable::try_new(schema.clone(), vec![vec![]]).unwrap();
let ctx = SessionContext::new_with_state(test_state());
ctx.register_table("t", Arc::new(mem)).unwrap();
let state = ctx.state();

let batches = collect_rows(
state,
"INSERT INTO t VALUES (1); INSERT INTO t VALUES (2); \
SELECT SUM(x) AS s FROM t",
)
.await;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 1);
let col = batches[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.expect("int64 column");
assert_eq!(col.value(0), 3);
}

/// A trailing `INSERT` (no final SELECT) must still execute, and
/// the final statement's count row is surfaced as today.
#[tokio::test]
async fn final_insert_returns_count() {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use std::sync::Arc;

let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
let mem = MemTable::try_new(schema.clone(), vec![vec![]]).unwrap();
let ctx = SessionContext::new_with_state(test_state());
ctx.register_table("t", Arc::new(mem)).unwrap();
let state = ctx.state();

let batches = collect_rows(
state,
"INSERT INTO t VALUES (10); INSERT INTO t VALUES (20)",
)
.await;
// The final INSERT yields a single-row count batch; check only
// that one row came back.
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 1);
}
}
20 changes: 14 additions & 6 deletions crates/fda/src/adhoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use feldera_rest_api::Client;
use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs};
use futures_util::SinkExt;
use futures_util::StreamExt;
use log::{debug, error, trace, warn};
use log::{debug, error, trace};
use reqwest_websocket::{CloseCode, Message, RequestBuilderExt};

use crate::UPGRADE_NOTICE;
Expand Down Expand Up @@ -91,11 +91,16 @@ async fn handle_websocket_message_generic(
if code == CloseCode::Normal {
trace!("Websocket normal closure.");
} else if code == CloseCode::Error {
// A runtime error during query execution closes the WS
// with `CloseCode::Error`. The server already sent the
// error details in a preceding text frame, so we don't
// print another message here; we only need to surface a
// non-zero exit code for scripts and CI.
// Tracks https://github.com/feldera/feldera/issues/4973
if !reason.is_empty() {
warn!("Error encountered during query processing: {}.", reason);
} else {
warn!("Error encountered during query processing.");
eprintln!("ERROR: {}.", reason);
}
std::process::exit(1);
} else {
eprint!("Connection unexpectedly closed by pipeline ({})", code);
if !reason.is_empty() {
Expand Down Expand Up @@ -150,8 +155,11 @@ pub(crate) async fn handle_adhoc_query(
let format = match format {
OutputFormat::Text => AdHocResultFormat::Text,
OutputFormat::Json => {
warn!(
"The JSON format is deprecated for ad-hoc queries, see https://github.com/feldera/feldera/issues/4219 for the tracking issue."
// JSON requested; log deprecation warning to stderr.
eprintln!(
"warning: the JSON format for ad-hoc queries is deprecated. \
Prefer `--format arrow_ipc` or the default text mode. \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer or need?

See https://github.com/feldera/feldera/issues/4219."
);
AdHocResultFormat::Json
}
Expand Down
15 changes: 15 additions & 0 deletions crates/fda/test.bash
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ fda connector p1 example unknown start || true

# Adhoc queries
fda query p1 "SELECT * FROM example"
# Arrow IPC is the recommended format; verify it produces output and the
# text-mode pretty-printer downstream of the parser handles it.
fda --format arrow_ipc query p1 "SELECT * FROM example"

# Runtime errors during query execution must surface as a non-zero exit
# code in WebSocket mode, otherwise scripts have no way to detect a
# failure.
fail_on_success fda query p1 "SELECT 1/0"
fail_on_success fda --format arrow_ipc query p1 "SELECT 1/0"
fail_on_success fda --format json query p1 "SELECT 1/0"

# Intermediate `SELECT`s still error today; we plan to support
# `select; select; ...` once a streaming protocol can frame multiple
# result sets back to the caller.
fail_on_success fda query p1 "SELECT 1; SELECT 2"

# Transaction tests
echo "Testing transaction commands..."
Expand Down
11 changes: 9 additions & 2 deletions python/feldera/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,9 +974,14 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
Executes an ad-hoc SQL query on this pipeline and returns a generator
that yields the rows of the result as Python dictionaries. For
``INSERT`` and ``DELETE`` queries, consider using :meth:`.execute`
instead. All floating-point numbers are deserialized as Decimal objects
instead. All floating-point numbers are deserialized as ``Decimal``
to avoid precision loss.

For new code, prefer :meth:`.query_arrow`: Arrow IPC keeps full
SQL type fidelity, lets ``MAP`` keys be non-string values, and
returns every column even when two share a name. See
https://github.com/feldera/feldera/issues/4219.

Note:
You can only ``SELECT`` from materialized tables and views.

Expand All @@ -994,7 +999,9 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
:raises FelderaAPIError: If the query is invalid.
"""

return self.client.query_as_json(self.name, query)
# Delegate to the non-deprecated internal helper so calls through
# `query()` don't surface a DeprecationWarning to user code.
return self.client._query_json_stream(self.name, query)

def query_parquet(self, query: str, path: str):
"""
Expand Down
Loading
Loading