Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions python/feldera/_callback_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(
view_name: str,
callback: Callable[[pd.DataFrame, int], None],
queue: Optional[Queue],
case_sensitive: bool,
):
super().__init__()
self.daemon = True
Expand All @@ -30,6 +31,7 @@ def __init__(
self.callback: Callable[[pd.DataFrame, int], None] = callback
self.queue: Optional[Queue] = queue
self.schema: Optional[dict] = None
self.case_sensitive: bool = case_sensitive

def run(self):
"""
Expand All @@ -50,7 +52,8 @@ def run(self):

if self.schema is None:
raise ValueError(
f"Table or View {self.view_name} not found in the pipeline schema."
f"Table or View {
self.view_name} not found in the pipeline schema."
)

# by default, we assume that the pipeline has been started
Expand All @@ -66,7 +69,10 @@ def run(self):
case _CallbackRunnerInstruction.PipelineStarted:
# listen to the pipeline
gen_obj = self.client.listen_to_pipeline(
self.pipeline_name, self.view_name, format="json"
self.pipeline_name,
self.view_name,
format="json",
case_sensitive=self.case_sensitive,
)

# if there is a queue set up, inform the main thread that the listener has been started, and it can
Expand Down
8 changes: 7 additions & 1 deletion python/feldera/output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(
pipeline_name: str,
view_name: str,
queue: Optional[Queue],
case_sensitive: bool,
):
"""
Initializes the output handler, but doesn't start it.
Expand All @@ -32,7 +33,12 @@ def callback(df: pd.DataFrame, _: int):

# sets up the callback runner
self.handler = CallbackRunner(
self.client, self.pipeline_name, self.view_name, callback, queue
self.client,
self.pipeline_name,
self.view_name,
callback,
queue,
case_sensitive,
)

def start(self):
Expand Down
61 changes: 57 additions & 4 deletions python/feldera/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from datetime import datetime

import pandas
import warnings
import pyarrow

from typing import List, Dict, Callable, Optional, Generator, Mapping, Any
from collections import deque
Expand Down Expand Up @@ -204,7 +206,7 @@ def resume_connector(self, table_name: str, connector_name: str):

self.client.resume_connector(self.name, table_name, connector_name)

def listen(self, view_name: str) -> OutputHandler:
def listen(self, view_name: str, case_sensitive: bool = False) -> OutputHandler:
Comment thread
gz marked this conversation as resolved.
"""
Follow the change stream (i.e., the output) of the provided view.
Returns an output handler to read the changes.
Expand All @@ -215,6 +217,7 @@ def listen(self, view_name: str) -> OutputHandler:
If this method is called once the pipeline has started, you will only get the output from that point onwards.

:param view_name: The name of the view to listen to.
:param case_sensitive: True if the view name is case sensitive.
"""

queue: Optional[Queue] = None
Expand All @@ -223,13 +226,18 @@ def listen(self, view_name: str) -> OutputHandler:
queue = Queue(maxsize=1)
self.views_tx.append({view_name: queue})

handler = OutputHandler(self.client, self.name, view_name, queue)
handler = OutputHandler(
self.client, self.name, view_name, queue, case_sensitive
)
handler.start()

return handler

def foreach_chunk(
self, view_name: str, callback: Callable[[pandas.DataFrame, int], None]
self,
view_name: str,
callback: Callable[[pandas.DataFrame, int], None],
case_sensitive: bool = False,
):
"""
Run the given callback on each chunk of the output of the specified view.
Expand All @@ -244,6 +252,8 @@ def foreach_chunk(
- **seq_no** -> The sequence number. The sequence number is a monotonically increasing integer that
starts from 0. Note that the sequence number is unique for each chunk, but not necessarily contiguous.

:param case_sensitive: True if the view name is case sensitive.

Please note that the callback is run in a separate thread, so it should be thread-safe.
Please note that the callback should not block for a long time, as by default, backpressure is enabled and
will block the pipeline.
Expand All @@ -259,7 +269,9 @@ def foreach_chunk(
queue = Queue(maxsize=1)
self.views_tx.append({view_name: queue})

handler = CallbackRunner(self.client, self.name, view_name, callback, queue)
handler = CallbackRunner(
self.client, self.name, view_name, callback, queue, case_sensitive
)
handler.start()

def wait_for_completion(
Expand Down Expand Up @@ -692,9 +704,50 @@ def query(self, query: str) -> Generator[Mapping[str, Any], None, None]:
:raises FelderaAPIError: If querying a non materialized table or view.
:raises FelderaAPIError: If the query is invalid.
"""
warnings.warn(
"function query is deprecated and will be removed in a future version",
category=DeprecationWarning,
stacklevel=2,
)

return self.client.query_as_json(self.name, query)

def query_pyarrow(self, query: str) -> pyarrow.Table:
    """
    Executes an ad-hoc SQL query on this pipeline and returns a
    :class:`.pyarrow.Table` containing all the results.

    Note:
        You can only ``SELECT`` from materialized tables and views.

    :param query: The SQL query to be executed.
    :return: A :class:`.pyarrow.Table` holding every record produced by the query.

    :raises FelderaAPIError: If the pipeline is not in a RUNNING or PAUSED state.
    :raises FelderaAPIError: If querying a non materialized table or view.
    :raises FelderaAPIError: If the query is invalid.
    """

    # Delegate to the REST client, which streams the Arrow IPC response
    # and materializes it as a single in-memory table.
    return self.client.query_as_pyarrow(self.name, query)

def query_pylist(self, query: str) -> List[Mapping[str, Any]]:
    """
    Executes an ad-hoc SQL query on this pipeline and returns all results
    as a list of Python dictionaries, one mapping per row, keyed by column
    name.

    Note:
        You can only ``SELECT`` from materialized tables and views.

    Note:
        Because rows are keyed by column name, duplicate column names in
        the ``SELECT`` list collapse into a single key. Use
        :meth:`.query_pyarrow` directly if every column must be preserved.

    :param query: The SQL query to be executed.
    :return: A list of mappings, one per result row.

    :raises FelderaAPIError: If the pipeline is not in a RUNNING or PAUSED state.
    :raises FelderaAPIError: If querying a non materialized table or view.
    :raises FelderaAPIError: If the query is invalid.
    """

    # Fix: `Pipeline.query_pyarrow` takes only the SQL string; the previous
    # call `self.query_pyarrow(self.name, query)` passed the pipeline name
    # as an extra positional argument, raising TypeError on every call.
    return self.query_pyarrow(query).to_pylist()

def query_parquet(self, query: str, path: str):
"""
Executes an ad-hoc SQL query on this pipeline and saves the result to the specified path as a parquet file.
Expand Down
61 changes: 52 additions & 9 deletions python/feldera/rest/feldera_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import pathlib
from typing import Any, Dict, Optional
import logging
import warnings
import time
import json
from decimal import Decimal
from typing import Generator

import pyarrow

from feldera.rest.config import Config
from feldera.rest.feldera_config import FelderaConfig
from feldera.rest.errors import FelderaTimeoutError
Expand Down Expand Up @@ -124,9 +127,8 @@ def __wait_for_compilation(self, name: str):
if sql_errors:
err_msg = f"Pipeline {name} failed to compile:\n"
for sql_error in sql_errors:
err_msg += (
f"{sql_error['error_type']}\n{sql_error['message']}\n"
)
err_msg += f"{sql_error['error_type']}\n{
sql_error['message']}\n"
err_msg += f"Code snippet:\n{sql_error['snippet']}"
raise RuntimeError(err_msg)

Expand Down Expand Up @@ -239,7 +241,8 @@ def start_pipeline(self, pipeline_name: str, timeout_s: Optional[float] = 300):
elapsed = time.monotonic() - start_time
if elapsed > timeout_s:
raise TimeoutError(
f"Timed out waiting for pipeline {pipeline_name} to start"
f"Timed out waiting for pipeline {
pipeline_name} to start"
)

resp = self.get_pipeline(pipeline_name)
Expand Down Expand Up @@ -290,7 +293,8 @@ def pause_pipeline(
elapsed = time.monotonic() - start_time
if elapsed > timeout_s:
raise TimeoutError(
f"Timed out waiting for pipeline {pipeline_name} to pause"
f"Timed out waiting for pipeline {
pipeline_name} to pause"
)

resp = self.get_pipeline(pipeline_name)
Expand Down Expand Up @@ -340,7 +344,8 @@ def shutdown_pipeline(self, pipeline_name: str, timeout_s: Optional[float] = 300
time.sleep(0.1)

raise FelderaTimeoutError(
f"timeout error: pipeline '{pipeline_name}' did not shutdown in {timeout_s} seconds"
f"timeout error: pipeline '{
pipeline_name}' did not shutdown in {timeout_s} seconds"
)

def suspend_pipeline(self, pipeline_name: str, timeout_s: Optional[float] = 300):
Expand Down Expand Up @@ -379,7 +384,8 @@ def suspend_pipeline(self, pipeline_name: str, timeout_s: Optional[float] = 300)
time.sleep(0.1)

raise FelderaTimeoutError(
f"timeout error: pipeline '{pipeline_name}' did not suspend in {timeout_s} seconds"
f"timeout error: pipeline '{
pipeline_name}' did not suspend in {timeout_s} seconds"
)

def checkpoint_pipeline(self, pipeline_name: str) -> int:
Expand Down Expand Up @@ -531,6 +537,7 @@ def listen_to_pipeline(
pipeline_name: str,
table_name: str,
format: str,
case_sensitive: bool = False,
backpressure: bool = True,
array: bool = False,
timeout: Optional[float] = None,
Expand Down Expand Up @@ -559,6 +566,9 @@ def listen_to_pipeline(
if format == "json":
params["array"] = _prepare_boolean_input(array)

if case_sensitive:
table_name = f'"{table_name}"'

resp = self.http.post(
path=f"/pipelines/{pipeline_name}/egress/{table_name}",
params=params,
Expand Down Expand Up @@ -650,6 +660,13 @@ def query_as_json(
:param query: The SQL query to be executed.
:return: A generator that yields each row of the result as a Python dictionary, deserialized from JSON.
"""

warnings.warn(
"function query_as_json is deprecated and will be removed in a future version",
category=DeprecationWarning,
stacklevel=2,
)

params = {
"pipeline_name": pipeline_name,
"sql": query,
Expand All @@ -666,6 +683,30 @@ def query_as_json(
if chunk:
yield json.loads(chunk, parse_float=Decimal)

def query_as_pyarrow(self, pipeline_name: str, query: str) -> pyarrow.Table:
    """
    Executes an ad-hoc query on the specified pipeline and returns a pyarrow
    Table consisting of all the data.

    :param pipeline_name: The name of the pipeline to query.
    :param query: The SQL query to be executed.
    :return: A pyarrow.Table consisting of all the records.
    """
    # Request the result in Arrow IPC format and stream it, so the raw byte
    # stream can be handed to the Arrow reader without extra buffering.
    response = self.http.get(
        path=f"/pipelines/{pipeline_name}/query",
        params={
            "pipeline_name": pipeline_name,
            "sql": query,
            "format": "arrow_ipc",
        },
        stream=True,
    )

    reader = pyarrow.ipc.RecordBatchStreamReader(response.raw)
    with reader:
        # read_all() concatenates every record batch into one Table.
        return reader.read_all()

def pause_connector(self, pipeline_name, table_name, connector_name):
"""
Pause the specified input connector.
Expand All @@ -685,7 +726,8 @@ def pause_connector(self, pipeline_name, table_name, connector_name):
"""

self.http.post(
path=f"/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/pause",
path=f"/pipelines/{pipeline_name}/tables/{
table_name}/connectors/{connector_name}/pause",
)

def resume_connector(
Expand All @@ -709,7 +751,8 @@ def resume_connector(
"""

self.http.post(
path=f"/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/start",
path=f"/pipelines/{pipeline_name}/tables/{
table_name}/connectors/{connector_name}/start",
)

def get_config(self) -> FelderaConfig:
Expand Down
1 change: 1 addition & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"numpy>=2.2.4",
"pretty-errors",
"ruff>=0.6.9",
"pyarrow>=20.0.0",
]
[project.urls]
Homepage = "https://www.feldera.com"
Expand Down
29 changes: 29 additions & 0 deletions python/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid
import threading

from decimal import Decimal
from tests import TEST_CLIENT

from feldera.rest.pipeline import Pipeline
Expand Down Expand Up @@ -236,6 +237,34 @@ def test_adhoc_query_json(self):
TEST_CLIENT.shutdown_pipeline(name)
TEST_CLIENT.delete_pipeline(name)

def test_adhoc_query_pyarrow(self):
    """query_as_pyarrow returns every row with DECIMAL, DOUBLE and MAP columns intact."""
    data = "1\n2\n"
    name = str(uuid.uuid4())

    sql = """
    CREATE TABLE tbl(id INT) with ('materialized' = 'true');
    CREATE MATERIALIZED VIEW v0 AS SELECT id, '3.14'::DECIMAL(5, 2) as d, '3.14159'::DOUBLE as dbl, MAP[1, 10] as m FROM tbl;
    """

    pipeline = Pipeline(name, sql, "", "", {}, {})
    pipeline = TEST_CLIENT.create_pipeline(pipeline)

    TEST_CLIENT.start_pipeline(name)

    TEST_CLIENT.push_to_pipeline(name, "tbl", "csv", data)
    resp = TEST_CLIENT.query_as_pyarrow(pipeline.name, "select * from v0")
    expected = [
        {"id": 2, "d": Decimal("3.14"), "dbl": 3.14159, "m": [(1, 10)]},
        {"id": 1, "d": Decimal("3.14"), "dbl": 3.14159, "m": [(1, 10)]},
    ]
    got = resp.to_pylist()

    # Removed leftover debug print(got). Row order is not guaranteed by the
    # query, so compare as unordered multisets.
    self.assertCountEqual(got, expected)

    # NOTE(review): cleanup is skipped if an assertion fails above; sibling
    # tests use the same inline pattern, but addCleanup would be more robust.
    TEST_CLIENT.shutdown_pipeline(name)
    TEST_CLIENT.delete_pipeline(name)


if __name__ == "__main__":
unittest.main()
Loading
Loading