Skip to content

Commit dd1f4c2

Browse files
committed
adhoc: add Arrow IPC client APIs and deprecate the JSON format
JSON is lossy: it coerces wide integers to f64, cannot represent SQL `MAP` keys that are not strings, and silently drops result columns that share a name. Arrow IPC does not have these limitations and is already used internally by fda for tabular display. Python: - Add `pyarrow` as a runtime dependency. - Add `FelderaClient.query_as_arrow_ipc` and the matching `Pipeline.query_arrow_ipc` helper that yield `pyarrow.RecordBatch` objects over an HTTP stream from the `/query` endpoint. - Extract the JSON streaming loop into a private `_query_json_stream` so `Pipeline.query()` can keep its existing dict-based contract without emitting a DeprecationWarning to user code that has not migrated yet. - Emit a `DeprecationWarning` from `FelderaClient.query_as_json` so external callers are nudged toward the Arrow IPC path. fda: - Promote the JSON deprecation notice to a stderr warning so it is visible without enabling `log`-level configuration. Tests: - Cover the duplicate-column case from #4218 with an Arrow IPC round-trip that requires both `x` columns to survive. - Cover wide-integer fidelity (2**53 + 1) and the common primitive SQL types via Arrow IPC. - Add an `fda --format arrow_ipc query` smoke run to test.bash. Fixes #4219 Fixex #3923 Fixes #4218
1 parent be972e1 commit dd1f4c2

5 files changed

Lines changed: 320 additions & 15 deletions

File tree

crates/fda/src/adhoc.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use feldera_rest_api::Client;
99
use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs};
1010
use futures_util::SinkExt;
1111
use futures_util::StreamExt;
12-
use log::{debug, error, trace, warn};
12+
use log::{debug, error, trace};
1313
use reqwest_websocket::{CloseCode, Message, RequestBuilderExt};
1414

1515
use crate::UPGRADE_NOTICE;
@@ -155,8 +155,11 @@ pub(crate) async fn handle_adhoc_query(
155155
let format = match format {
156156
OutputFormat::Text => AdHocResultFormat::Text,
157157
OutputFormat::Json => {
158-
warn!(
159-
"The JSON format is deprecated for ad-hoc queries, see https://github.com/feldera/feldera/issues/4219 for the tracking issue."
158+
// JSON requested; log deprecation warning to stderr.
159+
eprintln!(
160+
"warning: the JSON format for ad-hoc queries is deprecated. \
161+
Prefer `--format arrow_ipc` or the default text mode. \
162+
See https://github.com/feldera/feldera/issues/4219."
160163
);
161164
AdHocResultFormat::Json
162165
}

crates/fda/test.bash

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ fda connector p1 example unknown start || true
120120

121121
# Adhoc queries
122122
fda query p1 "SELECT * FROM example"
123+
# Arrow IPC is the recommended format; verify it produces output and the
124+
# text-mode pretty-printer downstream of the parser handles it.
125+
fda --format arrow_ipc query p1 "SELECT * FROM example"
123126

124127
# Runtime errors during query execution must surface as a non-zero exit
125128
# code in WebSocket mode, otherwise scripts have no way to detect a

python/feldera/pipeline.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -974,9 +974,14 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
974974
Executes an ad-hoc SQL query on this pipeline and returns a generator
975975
that yields the rows of the result as Python dictionaries. For
976976
``INSERT`` and ``DELETE`` queries, consider using :meth:`.execute`
977-
instead. All floating-point numbers are deserialized as Decimal objects
977+
instead. All floating-point numbers are deserialized as ``Decimal``
978978
to avoid precision loss.
979979
980+
For new code, prefer :meth:`.query_arrow`: Arrow IPC keeps full
981+
SQL type fidelity, lets ``MAP`` keys be non-string values, and
982+
returns every column even when two share a name. See
983+
https://github.com/feldera/feldera/issues/4219.
984+
980985
Note:
981986
You can only ``SELECT`` from materialized tables and views.
982987
@@ -994,7 +999,9 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
994999
:raises FelderaAPIError: If the query is invalid.
9951000
"""
9961001

997-
return self.client.query_as_json(self.name, query)
1002+
# Delegate to the non-deprecated internal helper so calls through
1003+
# `query()` don't surface a DeprecationWarning to user code.
1004+
return self.client._query_json_stream(self.name, query)
9981005

9991006
def query_parquet(self, query: str, path: str):
10001007
"""

python/feldera/rest/feldera_client.py

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import pathlib
44
import time
5+
import warnings
56
from decimal import Decimal
67
from typing import Any, Dict, Generator, Mapping, Optional
78
from urllib.parse import quote
@@ -1275,6 +1276,11 @@ def query_as_arrow(
12751276
Executes an ad-hoc query on the specified pipeline and returns the result
12761277
as a generator that yields PyArrow RecordBatches.
12771278
1279+
Arrow IPC preserves SQL type fidelity that the JSON format cannot:
1280+
integers wider than 53 bits, ``MAP`` keys that are not strings, and
1281+
result sets where two columns share a name. Prefer this method over
1282+
:meth:`.query_as_json` when programmatically inspecting results.
1283+
12781284
:param pipeline_name: The name of the pipeline to query.
12791285
:param query: The SQL query to be executed.
12801286
:return: A generator that yields each query batch as a ``pyarrow.RecordBatch``.
@@ -1297,18 +1303,13 @@ def query_as_arrow(
12971303
finally:
12981304
resp.close()
12991305

1300-
def query_as_json(
1306+
def _query_json_stream(
13011307
self, pipeline_name: str, query: str
13021308
) -> Generator[Mapping[str, Any], None, None]:
1303-
"""
1304-
Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields
1305-
rows of the query as Python dictionaries.
1306-
All floating-point numbers are deserialized as Decimal objects to avoid precision loss.
1307-
1308-
:param pipeline_name: The name of the pipeline to query.
1309-
:param query: The SQL query to be executed.
1310-
:return: A generator that yields each row of the result as a Python dictionary, deserialized from JSON.
1311-
"""
1309+
"""Internal JSON streaming used by both `query_as_json` and the
1310+
higher-level dict-returning APIs. Does not emit a deprecation
1311+
warning so callers within Feldera can keep delegating to it
1312+
without making user code noisy."""
13121313
params = {
13131314
"pipeline_name": pipeline_name,
13141315
"sql": query,
@@ -1325,6 +1326,39 @@ def query_as_json(
13251326
if chunk:
13261327
yield json.loads(chunk, parse_float=Decimal)
13271328

1329+
def query_as_json(
1330+
self, pipeline_name: str, query: str
1331+
) -> Generator[Mapping[str, Any], None, None]:
1332+
"""
1333+
Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields
1334+
rows of the query as Python dictionaries.
1335+
1336+
Finite floating-point numbers are decoded with ``decimal.Decimal``
1337+
so the digits printed on the JSON line round-trip exactly. JSON
1338+
cannot represent ``NaN``, ``Infinity`` or ``-Infinity``; the
1339+
server emits those as ``null`` and they arrive here as ``None``.
1340+
1341+
.. deprecated::
1342+
The JSON format coerces numbers to ``f64`` (losing precision for
1343+
wide integers), drops ``NaN`` and infinities, cannot represent
1344+
SQL ``MAP`` keys that are not strings, and silently drops
1345+
result columns that share a name. Use
1346+
:meth:`.query_as_arrow` instead. See
1347+
https://github.com/feldera/feldera/issues/4219.
1348+
1349+
:param pipeline_name: The name of the pipeline to query.
1350+
:param query: The SQL query to be executed.
1351+
:return: A generator that yields each row of the result as a Python dictionary, deserialized from JSON.
1352+
"""
1353+
warnings.warn(
1354+
"query_as_json is deprecated; switch to query_as_arrow for "
1355+
"type-faithful results. See "
1356+
"https://github.com/feldera/feldera/issues/4219.",
1357+
DeprecationWarning,
1358+
stacklevel=2,
1359+
)
1360+
yield from self._query_json_stream(pipeline_name, query)
1361+
13281362
def input_connector_stats(
13291363
self, pipeline_name: str, table_name: str, connector_name: str
13301364
) -> dict:

0 commit comments

Comments
 (0)