diff --git a/crates/adapters/src/adhoc.rs b/crates/adapters/src/adhoc.rs
index 8e71be879f..21682d5a7c 100644
--- a/crates/adapters/src/adhoc.rs
+++ b/crates/adapters/src/adhoc.rs
@@ -292,14 +292,25 @@ pub(crate) async fn execute_sql(
             .await
             .ok_or_else(|| PipelineError::Initializing)?,
     );
-    execute_sql_with_state(state, sql).await
+    execute_sql_with_state(state, sql, Some(controller)).await
 }
 
 /// Plan and translate `sql` against `state`, applying `PREPARE`/`EXECUTE`
 /// substitution within the scope of a single ad-hoc request.
+///
+/// Only the final statement returns rows. Earlier statements may be
+/// `PREPARE`s or any non-result-producing statement (e.g. `INSERT`),
+/// executed for their side effect. After such an intermediate write
+/// runs, the per-request snapshot is refreshed so the trailing SELECT
+/// sees the just-written rows.
+///
+/// `controller` is optional so unit tests can drive this function
+/// without a running pipeline; in that case the post-write snapshot
+/// refresh is skipped.
 async fn execute_sql_with_state(
-    state: SessionState,
+    mut state: SessionState,
     sql: &str,
+    controller: Option<&Controller>,
 ) -> Result<DataFrame, PipelineError> {
     let mut statements = parse_sql_statements(&state, sql)?;
     if statements.is_empty() {
@@ -315,9 +326,13 @@ async fn execute_sql_with_state(
     let mut prepared: HashMap<String, LogicalPlan> = HashMap::new();
     let sql_options = SQLOptions::new().with_allow_ddl(false);
 
-    // For now, only the final statement may produce a result set. All
-    // preceding statements must be PREPAREs whose inner plans are stashed
-    // for a later EXECUTE in the same request.
+    // Subscribe to `step_watcher` *before* any intermediate writes so the
+    // steps that drain those writes accumulate as unseen changes; calling
+    // `changed()` after the writes return then completes immediately
+    // instead of blocking on a step that may never be triggered.
+    let mut step_watcher = controller.map(|c| c.step_watcher());
+
+    let mut intermediate_wrote_data = false;
     while statements.len() > 1 {
         let stmt = statements.pop_front().unwrap();
         let plan = state.statement_to_plan(stmt).await?;
@@ -326,17 +341,56 @@ async fn execute_sql_with_state(
                 sql_options.verify_plan(&input)?;
                 prepared.insert(name, (*input).clone());
             }
-            _ => {
+            LogicalPlan::Statement(Statement::Execute(Execute { name, parameters })) => {
+                // `EXECUTE` of a previously-prepared statement, used here
+                // for its side effects (e.g. a prepared INSERT).
+                let prepared_plan =
+                    prepared
+                        .remove(&name)
+                        .ok_or_else(|| PipelineError::AdHocQueryError {
+                            error: format!(
+                                "prepared statement '{name}' is not defined in this request"
+                            ),
+                            df: None,
+                        })?;
+                let values = execute_parameters_to_scalars(&parameters)?;
+                let bound = prepared_plan.replace_params_with_values(&ParamValues::List(values))?;
+                sql_options.verify_plan(&bound)?;
+                intermediate_wrote_data |= plan_writes_data(&bound);
+                drain_intermediate_plan(&state, bound).await?;
+            }
+            other if is_result_producing_plan(&other) => {
                 return Err(PipelineError::AdHocQueryError {
-                    error: "only PREPARE statements may precede the final statement \
-                            in a multi-statement ad-hoc query"
+                    error: "only the final statement in a multi-statement \
+                            ad-hoc query may return a result set; \
+                            move SELECTs to the end or split into \
+                            separate requests"
                         .to_string(),
                     df: None,
                 });
             }
+            other => {
+                // Non-result-producing intermediate statement (INSERT,
+                // UPDATE, DELETE, EXPLAIN, ...). Execute it for its side
+                // effects and discard the per-statement count row.
+                sql_options.verify_plan(&other)?;
+                intermediate_wrote_data |= plan_writes_data(&other);
+                drain_intermediate_plan(&state, other).await?;
+            }
         }
     }
 
+    // The snapshot pinned in `state` was captured at request start, before
+    // any intermediate INSERT ran. Refresh it so the trailing SELECT sees
+    // the just-written rows. Tracks
+    // https://github.com/feldera/feldera/issues/6243.
+    if intermediate_wrote_data
+        && let Some(controller) = controller
+        && let Some(watcher) = step_watcher.as_mut()
+    {
+        refresh_snapshot_after_writes(controller, watcher, &mut state).await?;
+    }
+
     let stmt = statements.pop_front().unwrap();
     let plan = state.statement_to_plan(stmt).await?;
 
@@ -374,6 +428,68 @@ async fn execute_sql_with_state(
     Ok(DataFrame::new(state, final_plan))
 }
 
+/// True if executing this plan would surface rows to the caller. Used to
+/// reject queries like `SELECT; INSERT` where the early `SELECT` would
+/// otherwise be silently dropped.
+fn is_result_producing_plan(plan: &LogicalPlan) -> bool {
+    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Statement(_))
+}
+
+/// Execute an intermediate statement for its side effects and drop the
+/// resulting batches. INSERTs produce a one-row count; we keep that
+/// count out of the response stream so only the request's final
+/// statement contributes rows.
+async fn drain_intermediate_plan(
+    state: &SessionState,
+    plan: LogicalPlan,
+) -> Result<(), PipelineError> {
+    let df = DataFrame::new(state.clone(), plan);
+    let _ = df.collect().await?;
+    Ok(())
+}
+
+/// True if executing this plan mutates a table (and therefore needs the
+/// post-write snapshot refresh below). Today the only mutating plan our
+/// SQL options allow is a `LogicalPlan::Dml`.
+fn plan_writes_data(plan: &LogicalPlan) -> bool {
+    matches!(plan, LogicalPlan::Dml(_))
+}
+
+/// Wait for the controller to complete at least one full step after
+/// the intermediate writes returned, then update `state`'s pinned
+/// snapshot to the freshly produced one. The controller updates
+/// `trace_snapshots` at the end of every non-transactional step, so
+/// observing the next `Idle` transition is enough to guarantee that
+/// our writes are visible.
+///
+/// `watcher` must have been created before the intermediate writes
+/// happened so the steps that drain them are already buffered as
+/// unseen changes; otherwise this function would block waiting for
+/// a future step that may never be triggered on an idle pipeline.
+async fn refresh_snapshot_after_writes(
+    controller: &Controller,
+    watcher: &mut tokio::sync::watch::Receiver<StepStatus>,
+    state: &mut SessionState,
+) -> Result<(), PipelineError> {
+    use feldera_types::coordination::StepAction;
+
+    loop {
+        let status = *watcher.borrow_and_update();
+        if matches!(status.action, StepAction::Idle) {
+            break;
+        }
+        if watcher.changed().await.is_err() {
+            break;
+        }
+    }
+    let snapshot = controller
+        .latest_consistent_snapshot()
+        .await
+        .ok_or(PipelineError::Initializing)?;
+    set_snapshot(state, snapshot);
+    Ok(())
+}
+
 /// Convert `EXECUTE` positional parameters to DataFusion's `ScalarAndMetadata`
 /// list, rejecting anything that is not a literal value.
 fn execute_parameters_to_scalars(params: &[Expr]) -> Result<Vec<ScalarAndMetadata>, PipelineError> {
@@ -408,10 +524,38 @@ fn parse_sql_statements(
         .with_dialect(dialect.as_ref())
         .with_recursion_limit(recursion_limit)
         .build()?
-        .parse_statements()?;
+        .parse_statements()
+        .map_err(format_parser_error)?;
     Ok(statements)
 }
 
+/// Convert a DataFusion error coming out of the SQL parser into a
+/// `PipelineError` whose message is the parser's `Display`, not its
+/// `Debug` form. The parser already appends the location ("at Line: X,
+/// Column: Y") to its messages; preserving that string gives the user
+/// something like
+/// `sql parser error: Expected: end of statement, found: in at Line: 1, Column: 30`
+/// instead of the wrapped
+/// `SQL error: ParserError("Expected: ... at Line: 1, Column: 30")`.
+///
+/// The DataFusion parser may wrap its `DataFusionError::SQL` in a
+/// `DataFusionError::Diagnostic`; unwrap that here so the inner parser
+/// message reaches the user.
+fn format_parser_error(error: datafusion::error::DataFusionError) -> PipelineError {
+    use datafusion::error::DataFusionError;
+    let inner = match error {
+        DataFusionError::Diagnostic(_, inner) => *inner,
+        other => other,
+    };
+    match inner {
+        DataFusionError::SQL(parser_err, _) => PipelineError::AdHocQueryError {
+            error: parser_err.to_string(),
+            df: None,
+        },
+        other => PipelineError::from(other),
+    }
+}
+
 /// Stream the result of an ad-hoc query using a HTTP streaming response.
 pub(crate) async fn stream_adhoc_result(
     controller: &Controller,
@@ -501,6 +645,33 @@ mod tests {
         assert!(parse_sql_statements(&state, "SELECT * FROM").is_err());
     }
 
+    /// Parser errors must include the line/column of the offending token so
+    /// the user can locate the typo without re-reading the query in their
+    /// head.
+    #[test]
+    fn parse_error_message_carries_location() {
+        let state = test_state();
+        // 'in' is not a valid statement starter here; the parser stops on the
+        // token after the column reference, which is at line 1 / column 30.
+        let err = parse_sql_statements(&state, "select * from foo where bar = in baz")
+            .expect_err("expected a parser error");
+        let msg = format!("{err}");
+        assert!(
+            msg.contains("Line: 1"),
+            "missing line number in error message: {msg}"
+        );
+        assert!(
+            msg.contains("Column:"),
+            "missing column number in error message: {msg}"
+        );
+        // The `Debug`-formatted `ParserError("...")` wrapper from earlier
+        // versions of the message should be gone.
+        assert!(
+            !msg.contains("ParserError(\""),
+            "raw Debug wrapper leaked into error message: {msg}"
+        );
+    }
+
     #[test]
     fn execute_parameters_to_scalars_rejects_non_literal() {
         let expr = Expr::Column(datafusion::common::Column::new_unqualified("foo"));
@@ -517,7 +688,7 @@ mod tests {
 
     /// A helper that executes a query and returns results.
     async fn collect_rows(state: SessionState, sql: &str) -> Vec<RecordBatch> {
-        execute_sql_with_state(state, sql)
+        execute_sql_with_state(state, sql, None)
             .await
             .unwrap()
             .collect()
@@ -542,7 +713,7 @@ mod tests {
     #[tokio::test]
     async fn execute_without_prepare_errors() {
         let state = test_state();
-        let err = execute_sql_with_state(state, "EXECUTE missing(1)")
+        let err = execute_sql_with_state(state, "EXECUTE missing(1)", None)
            .await
            .unwrap_err();
         assert!(
@@ -559,15 +730,74 @@ mod tests {
         assert_eq!(total_rows, 0);
     }
 
+    /// An intermediate `SELECT` (or any other result-producing statement)
+    /// must be rejected: only one result set comes back per request, so
+    /// executing the earlier SELECT silently would discard its rows.
     #[tokio::test]
-    async fn non_prepare_intermediate_statement_errors() {
+    async fn intermediate_select_is_rejected() {
         let state = test_state();
-        let err = execute_sql_with_state(state, "SELECT 1; SELECT 2")
+        let err = execute_sql_with_state(state, "SELECT 1; SELECT 2", None)
             .await
             .unwrap_err();
-        assert!(
-            format!("{err:?}").contains("PREPARE"),
-            "unexpected error: {err:?}"
-        );
+        let msg = format!("{err}");
+        assert!(msg.contains("final statement"), "unexpected error: {msg}");
     }
+
+    /// Multiple `INSERT`s followed by a `SELECT` must execute in order,
+    /// committing each insert's side effect, and only surface the final
+    /// `SELECT`'s rows.
+    #[tokio::test]
+    async fn intermediate_inserts_run_and_final_select_returns_rows() {
+        use datafusion::arrow::array::Int64Array;
+        use datafusion::arrow::datatypes::{DataType, Field, Schema};
+        use datafusion::datasource::MemTable;
+        use std::sync::Arc;
+
+        // Register a writable in-memory table so DML executes for real.
+        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
+        let mem = MemTable::try_new(schema.clone(), vec![vec![]]).unwrap();
+        let ctx = SessionContext::new_with_state(test_state());
+        ctx.register_table("t", Arc::new(mem)).unwrap();
+        let state = ctx.state();
+
+        let batches = collect_rows(
+            state,
+            "INSERT INTO t VALUES (1); INSERT INTO t VALUES (2); \
+             SELECT SUM(x) AS s FROM t",
+        )
+        .await;
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 1);
+        let col = batches[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("int64 column");
+        assert_eq!(col.value(0), 3);
+    }
+
+    /// A trailing `INSERT` (no final SELECT) must still execute, and
+    /// the final statement's count row is surfaced as today.
+    #[tokio::test]
+    async fn final_insert_returns_count() {
+        use datafusion::arrow::datatypes::{DataType, Field, Schema};
+        use datafusion::datasource::MemTable;
+        use std::sync::Arc;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
+        let mem = MemTable::try_new(schema.clone(), vec![vec![]]).unwrap();
+        let ctx = SessionContext::new_with_state(test_state());
+        ctx.register_table("t", Arc::new(mem)).unwrap();
+        let state = ctx.state();
+
+        let batches = collect_rows(
+            state,
+            "INSERT INTO t VALUES (10); INSERT INTO t VALUES (20)",
+        )
+        .await;
+        // The final INSERT yields a single-row count batch; check only
+        // that one row came back.
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 1);
+    }
 }
diff --git a/crates/adapters/src/controller.rs b/crates/adapters/src/controller.rs
index 2c97597b64..f1f52d9af1 100644
--- a/crates/adapters/src/controller.rs
+++ b/crates/adapters/src/controller.rs
@@ -3022,30 +3022,27 @@ impl CircuitThread {
         self.step_circuit();
 
         let transaction_state = self.controller.get_transaction_state();
-        self.step_sender.send_replace(StepStatus::new(
-            self.step,
-            StepAction::Idle,
-            transaction_state.into_coordination_status(),
-        ));
 
-        // Update `trace_snapshot` to the latest traces.
+        // Update `trace_snapshot` to the latest traces *before* signaling
+        // `Idle` on the step watcher, so subscribers can rely on
+        // `latest_consistent_snapshot()` being current the moment they
+        // observe Idle.
         //
-        // We do this before updating `total_processed_records` so that ad hoc
-        // query results always reflect all data that we have reported
-        // processing; otherwise, there is a race for any code that runs a query
-        // as soon as input has been processed.
-        if transaction_state == TransactionState::None {
-            // Don't update the snapshot until bootstrapping is complete (including the additional post-bootstrap transaction).
-            // This guarantees that:
-            // 1. Ad hoc queries observe a consistent view of the data.
-            // 2. Ad hoc snapshots are up-to-date before the pipeline is marked as running.
-            if !self.controller.status.bootstrap_in_progress() {
-                Span::new("update")
-                    .with_category("Step")
-                    .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
-                    .in_scope(|| self.update_snapshot());
-            }
+        // We also keep this before updating `total_processed_records` so
+        // that ad hoc query results always reflect all data that we have
+        // reported processing.
+        //
+        // Keep `trace_snapshots` current on every step regardless of
+        // bootstrap or transaction state, so ad-hoc queries can read
+        // the freshest data, including read-your-own-writes within one
+        // request. Connectors with `send_snapshot=true` still defer
+        // until bootstrap completes; that gate now lives in
+        // `enqueue_latest_snapshot`.
+        Span::new("update")
+            .with_category("Step")
+            .with_tooltip(|| format!("update ad-hoc tables after step {}", self.step))
+            .in_scope(|| self.update_snapshot());
 
+        if transaction_state == TransactionState::None {
             let bootstrapping = self.circuit.bootstrap_in_progress();
 
             // If bootstrapping has completed, clear self.bootstrapping, but don't update the status flag
@@ -3061,6 +3058,12 @@ impl CircuitThread {
             }
         }
 
+        self.step_sender.send_replace(StepStatus::new(
+            self.step,
+            StepAction::Idle,
+            transaction_state.into_coordination_status(),
+        ));
+
         // Record that we've processed the records, unless there is a transaction in progress,
         // in which case records are ingested by the circuit but are not fully processed.
         let transaction_state = self.controller.get_transaction_state();
@@ -6704,6 +6707,17 @@ impl ControllerInner {
         processed_records: Option<u64>,
         step: Option<Step>,
     ) -> bool {
+        // Defer the initial snapshot delivery until bootstrap completes
+        // and no transaction is in flight; this is the gate that
+        // guarantees `send_snapshot=true` connectors receive a
+        // consistent, fully-formed view. Ad-hoc queries hit
+        // `trace_snapshots` directly and are not subject to this gate.
+        if self.status.bootstrap_in_progress()
+            || self.get_transaction_state() != TransactionState::None
+        {
+            return false;
+        }
+
         // Look up the most recent cached snapshot for this stream. Return early
         // if no snapshot has been produced yet (pipeline hasn't completed a step).
         // This method is only called from the circuit thread (via `push_output`),
diff --git a/crates/fda/src/adhoc.rs b/crates/fda/src/adhoc.rs
index de9f52f152..95fe36190e 100644
--- a/crates/fda/src/adhoc.rs
+++ b/crates/fda/src/adhoc.rs
@@ -9,7 +9,7 @@ use feldera_rest_api::Client;
 use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs};
 use futures_util::SinkExt;
 use futures_util::StreamExt;
-use log::{debug, error, trace, warn};
+use log::{debug, error, trace};
 use reqwest_websocket::{CloseCode, Message, RequestBuilderExt};
 
 use crate::UPGRADE_NOTICE;
@@ -91,11 +91,16 @@ async fn handle_websocket_message_generic(
             if code == CloseCode::Normal {
                 trace!("Websocket normal closure.");
             } else if code == CloseCode::Error {
+                // A runtime error during query execution closes the WS
+                // with `CloseCode::Error`. The server already sent the
+                // error details in a preceding text frame, so we print
+                // only the close reason (if any) and exit non-zero so
+                // scripts and CI can detect the failure.
+ // Tracks https://github.com/feldera/feldera/issues/4973 if !reason.is_empty() { - warn!("Error encountered during query processing: {}.", reason); - } else { - warn!("Error encountered during query processing."); + eprintln!("ERROR: {}.", reason); } + std::process::exit(1); } else { eprint!("Connection unexpectedly closed by pipeline ({})", code); if !reason.is_empty() { @@ -150,8 +155,11 @@ pub(crate) async fn handle_adhoc_query( let format = match format { OutputFormat::Text => AdHocResultFormat::Text, OutputFormat::Json => { - warn!( - "The JSON format is deprecated for ad-hoc queries, see https://github.com/feldera/feldera/issues/4219 for the tracking issue." + // JSON requested; log deprecation warning to stderr. + eprintln!( + "warning: the JSON format for ad-hoc queries is deprecated. \ + Prefer `--format arrow_ipc` or the default text mode. \ + See https://github.com/feldera/feldera/issues/4219." ); AdHocResultFormat::Json } diff --git a/crates/fda/test.bash b/crates/fda/test.bash index 4bf637ab3b..5bd09db4a6 100755 --- a/crates/fda/test.bash +++ b/crates/fda/test.bash @@ -120,6 +120,21 @@ fda connector p1 example unknown start || true # Adhoc queries fda query p1 "SELECT * FROM example" +# Arrow IPC is the recommended format; verify it produces output and the +# text-mode pretty-printer downstream of the parser handles it. +fda --format arrow_ipc query p1 "SELECT * FROM example" + +# Runtime errors during query execution must surface as a non-zero exit +# code in WebSocket mode, otherwise scripts have no way to detect a +# failure. +fail_on_success fda query p1 "SELECT 1/0" +fail_on_success fda --format arrow_ipc query p1 "SELECT 1/0" +fail_on_success fda --format json query p1 "SELECT 1/0" + +# Intermediate `SELECT`s still error today; we plan to support +# `select; select; ...` once a streaming protocol can frame multiple +# result sets back to the caller. +fail_on_success fda query p1 "SELECT 1; SELECT 2" # Transaction tests echo "Testing transaction commands..." diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index 191e7a4d5a..18f0eadd50 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -974,9 +974,14 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]: Executes an ad-hoc SQL query on this pipeline and returns a generator that yields the rows of the result as Python dictionaries. For ``INSERT`` and ``DELETE`` queries, consider using :meth:`.execute` - instead. All floating-point numbers are deserialized as Decimal objects + instead. All floating-point numbers are deserialized as ``Decimal`` to avoid precision loss. + For new code, prefer :meth:`.query_arrow`: Arrow IPC keeps full + SQL type fidelity, lets ``MAP`` keys be non-string values, and + returns every column even when two share a name. See + https://github.com/feldera/feldera/issues/4219. + Note: You can only ``SELECT`` from materialized tables and views. @@ -994,7 +999,9 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]: :raises FelderaAPIError: If the query is invalid. """ - return self.client.query_as_json(self.name, query) + # Delegate to the non-deprecated internal helper so calls through + # `query()` don't surface a DeprecationWarning to user code. 
+ return self.client._query_json_stream(self.name, query) def query_parquet(self, query: str, path: str): """ diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index 6e2ca3da71..56f8869c21 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -2,6 +2,7 @@ import logging import pathlib import time +import warnings from decimal import Decimal from typing import Any, Dict, Generator, Mapping, Optional from urllib.parse import quote @@ -1275,6 +1276,11 @@ def query_as_arrow( Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields PyArrow RecordBatches. + Arrow IPC preserves SQL type fidelity that the JSON format cannot: + integers wider than 53 bits, ``MAP`` keys that are not strings, and + result sets where two columns share a name. Prefer this method over + :meth:`.query_as_json` when programmatically inspecting results. + :param pipeline_name: The name of the pipeline to query. :param query: The SQL query to be executed. :return: A generator that yields each query batch as a ``pyarrow.RecordBatch``. @@ -1297,18 +1303,13 @@ def query_as_arrow( finally: resp.close() - def query_as_json( + def _query_json_stream( self, pipeline_name: str, query: str ) -> Generator[Mapping[str, Any], None, None]: - """ - Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields - rows of the query as Python dictionaries. - All floating-point numbers are deserialized as Decimal objects to avoid precision loss. - - :param pipeline_name: The name of the pipeline to query. - :param query: The SQL query to be executed. - :return: A generator that yields each row of the result as a Python dictionary, deserialized from JSON. - """ + """Internal JSON streaming used by both `query_as_json` and the + higher-level dict-returning APIs. Does not emit a deprecation + warning so callers within Feldera can keep delegating to it + without making user code noisy.""" params = { "pipeline_name": pipeline_name, "sql": query, @@ -1325,6 +1326,39 @@ def query_as_json( if chunk: yield json.loads(chunk, parse_float=Decimal) + def query_as_json( + self, pipeline_name: str, query: str + ) -> Generator[Mapping[str, Any], None, None]: + """ + Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields + rows of the query as Python dictionaries. + + Finite floating-point numbers are decoded with ``decimal.Decimal`` + so the digits printed on the JSON line round-trip exactly. JSON + cannot represent ``NaN``, ``Infinity`` or ``-Infinity``; the + server emits those as ``null`` and they arrive here as ``None``. + + .. deprecated:: + The JSON format coerces numbers to ``f64`` (losing precision for + wide integers), drops ``NaN`` and infinities, cannot represent + SQL ``MAP`` keys that are not strings, and silently drops + result columns that share a name. Use + :meth:`.query_as_arrow` instead. See + https://github.com/feldera/feldera/issues/4219. + + :param pipeline_name: The name of the pipeline to query. + :param query: The SQL query to be executed. + :return: A generator that yields each row of the result as a Python dictionary, deserialized from JSON. + """ + warnings.warn( + "query_as_json is deprecated; switch to query_as_arrow for " + "type-faithful results. 
See "
+            "https://github.com/feldera/feldera/issues/4219.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        yield from self._query_json_stream(pipeline_name, query)
+
     def input_connector_stats(
         self, pipeline_name: str, table_name: str, connector_name: str
     ) -> dict:
diff --git a/python/tests/runtime/test_adhoc_queries.py b/python/tests/runtime/test_adhoc_queries.py
index d7b169a2ae..1bc6cf95e9 100644
--- a/python/tests/runtime/test_adhoc_queries.py
+++ b/python/tests/runtime/test_adhoc_queries.py
@@ -272,3 +272,332 @@ def test_pipeline_adhoc_query_empty(self):
         c = rows[0].get("c")
         print(rows, rows[0], c)
         assert c == 0
+
+
+class TestAdhocQueriesArrow(SharedTestPipeline):
+    """Tests for the Arrow IPC adhoc query path.
+
+    Arrow IPC is the recommended default because it preserves SQL type
+    fidelity that JSON cannot. See
+    https://github.com/feldera/feldera/issues/4219.
+    """
+
+    @property
+    def pipeline(self):
+        return self.p
+
+    @sql("""
+    CREATE TABLE t_dup (
+        x INT NOT NULL,
+        y INT NOT NULL
+    ) WITH (
+        'materialized' = 'true',
+        'connectors' = '[{"transport": {"name": "datagen", "config": {"plan": [{"limit": 1,
+            "fields": {"x": {"strategy": "uniform", "range": [10, 11]}, "y": {"strategy": "uniform", "range": [10, 11]}}}]}}}]'
+    );
+
+    CREATE TABLE s_dup (
+        x INT NOT NULL,
+        y INT NOT NULL
+    ) WITH (
+        'materialized' = 'true',
+        'connectors' = '[{"transport": {"name": "datagen", "config": {"plan": [{"limit": 1,
+            "fields": {"x": {"strategy": "uniform", "range": [20, 21]}, "y": {"strategy": "uniform", "range": [10, 11]}}}]}}}]'
+    );
+    """)
+    def test_arrow_ipc_preserves_duplicate_column_names(self):
+        """https://github.com/feldera/feldera/issues/4218
+
+        ``SELECT T.x, S.x`` produces two output columns that both end up
+        with the unqualified name ``x``. The Arrow IPC schema preserves
+        both as positional fields, whereas JSON would silently drop one.
+        """
+        self.pipeline.start()
+        self.pipeline.wait_for_completion()
+
+        batches = list(
+            self.pipeline.query_arrow(
+                "SELECT t_dup.x, s_dup.x FROM t_dup, s_dup WHERE t_dup.y = s_dup.y"
+            )
+        )
+        assert batches, "expected at least one batch"
+        total_rows = sum(b.num_rows for b in batches)
+        assert total_rows == 1, f"expected exactly one joined row, got {total_rows}"
+        first = batches[0]
+        assert first.num_columns == 2, (
+            f"arrow stream must keep both columns, got {first.num_columns}"
+        )
+        # The arrow schema preserves both fields, even though their names
+        # collide. We don't pin their names because DataFusion may qualify
+        # them; what matters is that two distinct integer columns arrive.
+        assert first.column(0).to_pylist() != first.column(1).to_pylist()
+
+    @sql(
+        """CREATE TABLE all_types (
+        i8_col TINYINT NOT NULL,
+        i16_col SMALLINT NOT NULL,
+        i32_col INT NOT NULL,
+        i64_col BIGINT NOT NULL,
+        u8_col TINYINT UNSIGNED NOT NULL,
+        u16_col SMALLINT UNSIGNED NOT NULL,
+        u32_col INT UNSIGNED NOT NULL,
+        u64_col BIGINT UNSIGNED NOT NULL,
+        real_col REAL NOT NULL,
+        double_col DOUBLE NOT NULL,
+        dec_col DECIMAL(10, 2) NOT NULL,
+        bool_col BOOL NOT NULL,
+        str_col VARCHAR NOT NULL,
+        char_col CHAR(5) NOT NULL,
+        vbin_col VARBINARY NOT NULL,
+        dt_col DATE NOT NULL,
+        tm_col TIME NOT NULL,
+        ts_col TIMESTAMP NOT NULL,
+        uuid_col UUID NOT NULL,
+        arr_col INT ARRAY NOT NULL,
+        map_col MAP<VARCHAR, INT> NOT NULL
+    ) WITH ('materialized' = 'true');"""
+    )
+    def test_arrow_ipc_round_trips_common_sql_types(self):
+        """Smoke-test every commonly-used SQL primitive (and ARRAY plus
+        MAP with VARCHAR keys) through the Arrow IPC pipe.
+
+        Wide integers (``BIGINT``) and high-precision decimals are the
+        cases the deprecated JSON path mishandles; we assert the values
+        come back exactly. ``VARIANT`` is covered by a separate test;
+        ``ROW`` and ``INTERVAL`` are omitted here because they are not
+        plain column-level scalars in Feldera SQL.
+        """
+        self.pipeline.start()
+
+        # Use an integer larger than 2**53 (JSON's f64 precision boundary)
+        # to demonstrate a value JSON cannot represent exactly.
+        wide_int = 9007199254740993  # 2**53 + 1
+        # `execute` drains the generator so the INSERT actually runs.
+        self.pipeline.execute(
+            "INSERT INTO all_types VALUES ("
+            f"1, 2, 3, {wide_int}, "
+            "10, 20, 30, 40, "
+            "1.5, 3.141592653589793, "
+            "12345.67, "
+            "TRUE, 'hello', 'abc', "
+            "x'deadbeef', "
+            "DATE '2024-05-13', "
+            "TIME '12:34:56', "
+            "TIMESTAMP '2024-05-13 12:34:56', "
+            "'c32d330f-5757-4ada-bcf6-1fac2d54e37f', "
+            "ARRAY[10, 20, 30], "
+            "MAP {'a': 1, 'b': 2}"
+            ")"
+        )
+
+        batches = list(
+            self.pipeline.query_arrow("SELECT * FROM all_types ORDER BY i32_col")
+        )
+        assert batches, "expected at least one batch"
+        rows = batches[0].to_pylist()
+        assert len(rows) == 1
+        row = rows[0]
+
+        assert row["i8_col"] == 1
+        assert row["i16_col"] == 2
+        assert row["i32_col"] == 3
+        assert row["i64_col"] == wide_int, (
+            f"BIGINT was {row['i64_col']!r}, expected {wide_int}"
+        )
+        assert row["u8_col"] == 10
+        assert row["u16_col"] == 20
+        assert row["u32_col"] == 30
+        assert row["u64_col"] == 40
+        # REAL is f32; expect exact match on a value with an exact binary
+        # representation.
+        assert row["real_col"] == 1.5
+        assert row["double_col"] == 3.141592653589793
+        # DECIMAL(10, 2) keeps two fractional digits without loss for a
+        # value well within precision.
+        from decimal import Decimal
+
+        assert row["dec_col"] == Decimal("12345.67")
+        assert row["bool_col"] is True
+        assert row["str_col"] == "hello"
+        assert row["char_col"] == "abc"
+        assert row["vbin_col"] == b"\xde\xad\xbe\xef"
+        import datetime
+
+        assert row["dt_col"] == datetime.date(2024, 5, 13)
+        assert row["tm_col"] == datetime.time(12, 34, 56)
+        assert row["ts_col"] == datetime.datetime(2024, 5, 13, 12, 34, 56)
+        # UUIDs come back as their canonical string form.
+        assert row["uuid_col"] == "c32d330f-5757-4ada-bcf6-1fac2d54e37f"
+        assert row["arr_col"] == [10, 20, 30]
+        # MAP arrives as a list of (key, value) tuples (Arrow `MapArray`);
+        # sort before comparing so the assertion does not depend on entry
+        # order.
+        assert sorted(row["map_col"]) == [("a", 1), ("b", 2)]
+
+    @sql(
+        """CREATE TABLE int_keyed_map (
+        m MAP<INT, VARCHAR> NOT NULL
+    ) WITH ('materialized' = 'true');"""
+    )
+    def test_arrow_ipc_map_with_non_string_keys(self):
+        """SQL ``MAP`` can have non-string keys; the JSON output format
+        fails outright because JSON object keys must be strings. Arrow
+        IPC carries the typed key through unchanged.
+        """
+        self.pipeline.start()
+
+        self.pipeline.execute(
+            "INSERT INTO int_keyed_map VALUES (MAP {1: 'one', 2: 'two'})"
+        )
+
+        # Arrow IPC: int keys preserved as ints.
+        batches = list(self.pipeline.query_arrow("SELECT * FROM int_keyed_map"))
+        assert batches, "expected at least one batch"
+        rows = batches[0].to_pylist()
+        assert len(rows) == 1
+        assert sorted(rows[0]["m"]) == [(1, "one"), (2, "two")]
+
+        # JSON: errors because the encoder cannot represent int keys.
+ import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + json_rows = list( + TEST_CLIENT.query_as_json( + self.pipeline.name, "SELECT * FROM int_keyed_map" + ) + ) + assert json_rows, "expected JSON to emit at least an error record" + assert "error" in json_rows[-1], ( + f"expected JSON encoder to surface an error, got {json_rows!r}" + ) + + @sql( + """CREATE TABLE has_variant ( + v VARIANT NOT NULL + ) WITH ('materialized' = 'true');""" + ) + def test_arrow_ipc_reads_variant(self): + """``VARIANT`` is a dynamically-typed JSON-shaped value. The + DataFusion ad-hoc layer rejects ``CAST(.. AS VARIANT)`` in + ``INSERT`` literals, so we populate via the JSON ingress path + and verify the value comes out over Arrow IPC as the canonical + JSON string. + """ + self.pipeline.start() + + self.pipeline.input_json("has_variant", [{"v": {"a": 1, "b": [2, 3]}}]) + + batches = list(self.pipeline.query_arrow("SELECT * FROM has_variant")) + assert batches, "expected at least one batch" + rows = batches[0].to_pylist() + assert len(rows) == 1 + # Arrow surfaces VARIANT as a UTF-8 string carrying the JSON + # representation; whitespace is normalised by the encoder. + assert rows[0]["v"] == '{"a":1,"b":[2,3]}' + + @sql( + """CREATE TABLE floats ( + tag VARCHAR NOT NULL, + d DOUBLE NOT NULL, + r REAL NOT NULL + ) WITH ('materialized' = 'true');""" + ) + def test_arrow_ipc_round_trips_inf_and_nan(self): + """``+Inf``, ``-Inf`` and ``NaN`` survive the Arrow IPC pipe as + Python floats. + """ + import math + + self.pipeline.start() + # SQL has no `Infinity` / `NaN` literals; produce them via + # division so the values reach Feldera as DOUBLE / REAL. + self.pipeline.execute( + "INSERT INTO floats VALUES " + "('pos_inf', 1.0/0.0, CAST( 1.0/0.0 AS REAL))," + "('neg_inf', -1.0/0.0, CAST(-1.0/0.0 AS REAL))," + "('nan', 0.0/0.0, CAST( 0.0/0.0 AS REAL))" + ) + + batches = list(self.pipeline.query_arrow("SELECT * FROM floats ORDER BY tag")) + assert batches, "expected at least one batch" + rows = {r["tag"]: r for r in batches[0].to_pylist()} + assert math.isinf(rows["pos_inf"]["d"]) and rows["pos_inf"]["d"] > 0 + assert math.isinf(rows["pos_inf"]["r"]) and rows["pos_inf"]["r"] > 0 + assert math.isinf(rows["neg_inf"]["d"]) and rows["neg_inf"]["d"] < 0 + assert math.isinf(rows["neg_inf"]["r"]) and rows["neg_inf"]["r"] < 0 + assert math.isnan(rows["nan"]["d"]) + assert math.isnan(rows["nan"]["r"]) + + +class TestAdhocReadAfterWrite(SharedTestPipeline): + """Regression tests for + https://github.com/feldera/feldera/issues/6243 . + + A multi-statement adhoc request must see its own intermediate + INSERTs in the trailing SELECT. The earlier bug was that the SELECT + ran against the snapshot captured before the request started, so + inserts in the same request stayed invisible. 
+ """ + + @property + def pipeline(self): + return self.p + + @sql( + """CREATE TABLE example ( + id INT NOT NULL PRIMARY KEY + ) WITH ('materialized' = 'true');""" + ) + def test_insert_then_select_in_same_request_sees_inserts(self): + self.pipeline.start() + + rows = list( + self.pipeline.query( + "INSERT INTO example VALUES (2222);" + " INSERT INTO example VALUES (3333);" + " SELECT COUNT(*) AS c FROM example" + ) + ) + assert rows, "trailing SELECT must return a row" + assert rows[0].get("c") == 2, ( + f"trailing SELECT should see both intermediate inserts, got {rows!r}" + ) + + @sql( + """CREATE TABLE example2 ( + id INT NOT NULL PRIMARY KEY + ) WITH ('materialized' = 'true');""" + ) + def test_multi_statement_query_during_open_transaction(self): + """An ad-hoc request running inside a user transaction must + still see its own intermediate INSERTs in the trailing SELECT. + Adhoc reads pull from `trace_snapshots`, which updates on every + step regardless of transaction state. + """ + self.pipeline.start() + + # Seed one row outside the transaction as the baseline. + self.pipeline.execute("INSERT INTO example2 VALUES (1)") + + tid = self.pipeline.start_transaction() + rows_during = list( + self.pipeline.query( + "INSERT INTO example2 VALUES (2);" + " INSERT INTO example2 VALUES (3);" + " SELECT COUNT(*) AS c FROM example2" + ) + ) + self.pipeline.commit_transaction(transaction_id=tid, wait=True) + + assert rows_during, "trailing SELECT must return a row" + count_during = rows_during[0].get("c") + assert count_during == 3, ( + f"trailing SELECT inside the transaction must observe all " + f"three rows (baseline + two intermediate inserts), got {count_during}" + ) + + # After commit, a fresh adhoc query still sees the same three rows. + rows_after = list(self.pipeline.query("SELECT COUNT(*) AS c FROM example2")) + assert rows_after and rows_after[0].get("c") == 3, ( + f"after commit, all three inserts must be visible, got {rows_after!r}" + )