Skip to content

Commit dd7de66

Browse files
committed
[py] add Pipeline.query() and Pipeline.query_tabular() for adhoc queries
Adds methods to run ad-hoc queries against running or paused pipelines. In FelderaClient: adds query(), query_as_text(), query_as_json() and query_as_parquet() methods. In Pipeline: adds query() and query_tabular() methods. A new enum, `QueryResponseFormat`, is also added to represent the different response formats for ad-hoc queries. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 193b770 commit dd7de66

File tree

6 files changed

+245
-19
lines changed

6 files changed

+245
-19
lines changed

python/feldera/enums.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,18 @@ def from_str(value):
184184

185185
def __eq__(self, other):
186186
return self.value == other.value
187+
188+
189+
class QueryResponseFormat(Enum):
    """Output format of the response produced by an ad-hoc query."""

    JSON = 1
    """Rows are serialized as JSON; each response row is deserialized into a
    Python dictionary."""

    PARQUET = 2
    """The result is returned as a binary blob in Parquet format."""

python/feldera/pipeline.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import time
22
import pandas
33

4-
from typing import List, Dict, Callable, Optional
4+
from typing import List, Dict, Callable, Optional, Generator, Mapping, Any
55
from queue import Queue
66

77
from feldera.rest.errors import FelderaAPIError
8-
from feldera.enums import PipelineStatus
8+
from feldera.enums import PipelineStatus, QueryResponseFormat
99
from feldera.rest.pipeline import Pipeline as InnerPipeline
1010
from feldera.rest.feldera_client import FelderaClient
1111
from feldera._callback_runner import _CallbackRunnerInstruction, CallbackRunner
@@ -344,3 +344,39 @@ def get(name: str, client: FelderaClient) -> 'Pipeline':
344344
except FelderaAPIError as err:
345345
if err.status_code == 404:
346346
raise RuntimeError(f"Pipeline with name {name} not found")
347+
348+
def query(self, query: str, fmt: QueryResponseFormat) -> Generator[Mapping[str, Any], None, None]:
    """
    Run an ad-hoc SQL query against this pipeline and return the output in
    the requested format.

    :param query: The SQL text to execute.
    :param fmt: A :class:`.QueryResponseFormat` member selecting the output:

        - `QueryResponseFormat.JSON`: a generator yielding each row as a
          Python dictionary.
        - `QueryResponseFormat.PARQUET`: the result as a binary blob in
          Parquet format.

    :return: A generator of row dictionaries for JSON, or binary data for
        Parquet.
    :raises RuntimeError: If the pipeline is neither running nor paused.
    """

    current = self.status()
    if current != PipelineStatus.RUNNING and current != PipelineStatus.PAUSED:
        raise RuntimeError("Pipeline must be running or paused to run a query")
    # The client dispatches on the format name ("JSON" / "PARQUET").
    return self.client.query(self.name, query, fmt.name)
369+
def query_tabular(self, query: str) -> str:
    """
    Run a SQL query against this pipeline and return the result as a
    human-readable table.

    :param query: The SQL text to execute.
    :return: The query result rendered as a tabular string.
    :raises RuntimeError: If the pipeline is neither running nor paused.
    """

    queryable = (PipelineStatus.RUNNING, PipelineStatus.PAUSED)
    if self.status() not in queryable:
        raise RuntimeError("Pipeline must be running or paused to run a query")
    return self.client.query_as_text(self.name, query)

python/feldera/rest/_httprequests.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def send_request(
6262
timeout=timeout,
6363
headers=headers,
6464
params=params,
65+
stream=stream,
6566
)
6667
elif isinstance(body, bytes):
6768
request = http_method(
@@ -81,8 +82,14 @@ def send_request(
8182
params=params,
8283
stream=stream,
8384
)
84-
if stream:
85-
return request
85+
86+
if stream:
87+
return request
88+
if request.headers.get("content-type") == "text/plain":
89+
return request.text
90+
elif request.headers.get("content-type") == "application/octet-stream":
91+
return request.content
92+
8693
resp = self.__validate(request)
8794
logging.debug("got response: %s", str(resp))
8895
return resp
@@ -95,9 +102,10 @@ def send_request(
95102
def get(self, path: str, params: Optional[Mapping[str, Any]] = None, stream: bool = False) -> Any:
    """
    Issue an HTTP GET request against the Feldera API.

    :param path: Request path handed to :meth:`send_request`.
    :param params: Optional query-string parameters.
    :param stream: When True, the response is streamed rather than read
        eagerly (the raw response object is returned by ``send_request``).
    """
    return self.send_request(requests.get, path, params=params, stream=stream)
101109

102110
def post(
103111
self,

python/feldera/rest/feldera_client.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import time
44
import json
55
from decimal import Decimal
6+
from typing import Generator
67

78
from feldera.rest.config import Config
89
from feldera.rest.pipeline import Pipeline
@@ -375,3 +376,94 @@ def listen_to_pipeline(
375376
break
376377
if chunk:
377378
yield json.loads(chunk, parse_float=Decimal)
379+
380+
def query(self, pipeline_name: str, query: str, fmt: str) -> str | bytes | Generator[dict, None, None]:
381+
"""
382+
Executes an ad-hoc query on the specified data pipeline.
383+
384+
:param pipeline_name: The name of the pipeline to query.
385+
:param query: The SQL query to be executed.
386+
:param fmt: The format in which to return the query result:
387+
388+
- "text": Returns a string in tabular format representing the query result.
389+
- "parquet": Returns a binary blob of data in Parquet format, which can be saved as a file.
390+
- "json": Returns a generator that yields each row of the result as a Python dictionary.
391+
392+
:return: Depending on the format (`fmt`) provided:
393+
394+
- For "text": A string representing the query result in tabular form.
395+
- For "parquet": A binary blob representing the query result in Parquet format.
396+
- For "json": A generator that produces dictionaries for each row in the query result.
397+
"""
398+
399+
match fmt.lower():
400+
case "text":
401+
return self.query_as_text(pipeline_name, query)
402+
case "parquet":
403+
return self.query_as_parquet(pipeline_name, query)
404+
case _:
405+
return self.query_as_json(pipeline_name, query)
406+
407+
def query_as_text(self, pipeline_name: str, query: str) -> str:
    """
    Execute an ad-hoc query on the specified pipeline and return the result
    as a formatted text table.

    :param pipeline_name: The name of the pipeline to query.
    :param query: The SQL text to execute.
    :return: A string containing the query result in tabular format.
    """

    return self.http.get(
        path=f"/pipelines/{pipeline_name}/query",
        params={
            # NOTE(review): the name is also encoded in the path; presumably
            # the server ignores the duplicate query parameter — confirm.
            "pipeline_name": pipeline_name,
            "sql": query,
            "format": "text",
        },
    )
426+
def query_as_parquet(self, pipeline_name: str, query: str) -> bytes:
    """
    Execute an ad-hoc query on the specified pipeline and return the result
    as a Parquet binary blob.

    :param pipeline_name: The name of the pipeline to query.
    :param query: The SQL text to execute.
    :return: The query result in Parquet format, suitable for saving to a
        file.
    """

    return self.http.get(
        path=f"/pipelines/{pipeline_name}/query",
        params={
            "pipeline_name": pipeline_name,
            "sql": query,
            "format": "parquet",
        },
    )
445+
def query_as_json(self, pipeline_name: str, query: str) -> Generator[dict, None, None]:
    """
    Execute an ad-hoc query on the specified pipeline, streaming the result
    back as newline-delimited JSON.

    :param pipeline_name: The name of the pipeline to query.
    :param query: The SQL text to execute.
    :return: A generator yielding one dictionary per result row; floats are
        parsed as :class:`~decimal.Decimal` to avoid precision loss.
    """

    resp = self.http.get(
        path=f"/pipelines/{pipeline_name}/query",
        params={
            "pipeline_name": pipeline_name,
            "sql": query,
            "format": "json",
        },
        stream=True,
    )

    # Read the stream in large chunks; empty keep-alive lines are skipped.
    for line in resp.iter_lines(chunk_size=50000000):
        if not line:
            continue
        yield json.loads(line, parse_float=Decimal)

python/tests/test_pipeline.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def test_get_pipeline_stats(self):
106106
TEST_CLIENT.shutdown_pipeline(name)
107107
TEST_CLIENT.delete_pipeline(name)
108108

109-
def __listener(self, name: str) -> bool:
109+
def __listener(self, name: str):
110110

111111
gen_obj = TEST_CLIENT.listen_to_pipeline(
112112
pipeline_name=name,
@@ -145,6 +145,79 @@ def test_listen_to_pipeline(self):
145145
TEST_CLIENT.shutdown_pipeline(name)
146146
TEST_CLIENT.delete_pipeline(name)
147147

148+
def test_adhoc_query_text(self):
    """An ad-hoc query in "text" format returns the result as an ASCII table."""
    data = "1\n2\n"
    name = str(uuid.uuid4())

    # Plain string: the original f-string had no placeholders (ruff F541).
    sql = """
    CREATE TABLE tbl(id INT) with ('materialized' = 'true');
    """

    pipeline = Pipeline(name, sql, {}, {})
    pipeline = TEST_CLIENT.create_pipeline(pipeline)

    # try/finally so the pipeline is cleaned up even if the query fails;
    # the original leaked the pipeline on any exception before shutdown.
    try:
        TEST_CLIENT.start_pipeline(name)
        TEST_CLIENT.push_to_pipeline(name, "tbl", "csv", data)
        tbl = TEST_CLIENT.query(pipeline.name, "SELECT * FROM tbl", "text")
    finally:
        TEST_CLIENT.shutdown_pipeline(name)
        TEST_CLIENT.delete_pipeline(name)

    # NOTE(review): row order (2 before 1) and column padding are assumed to
    # match the server's rendering — confirm against the running service.
    expected = """+----+
| id |
+----+
| 2 |
| 1 |
+----+"""

    assert tbl == expected
175+
def test_adhoc_query_parquet(self):
    """An ad-hoc query in "parquet" format returns a Parquet binary blob."""
    data = "1\n2\n"
    name = str(uuid.uuid4())

    # Plain string: the original f-string had no placeholders (ruff F541).
    sql = """
    CREATE TABLE tbl(id INT) with ('materialized' = 'true');
    """

    pipeline = Pipeline(name, sql, {}, {})
    pipeline = TEST_CLIENT.create_pipeline(pipeline)

    # try/finally so the pipeline is cleaned up even if the query fails.
    try:
        TEST_CLIENT.start_pipeline(name)
        TEST_CLIENT.push_to_pipeline(name, "tbl", "csv", data)
        got: bytes = TEST_CLIENT.query(pipeline.name, "SELECT * FROM tbl", "parquet")
    finally:
        TEST_CLIENT.shutdown_pipeline(name)
        TEST_CLIENT.delete_pipeline(name)

    # "PAR1" magic number plus the first page-header bytes of the expected
    # encoding; startswith() is the idiomatic form of `find(...) == 0`.
    expected_prefix = b'PAR1\x15\x04\x15\x10\x15\x14L\x15\x04\x15\x00\x12'
    assert got.startswith(expected_prefix)
197+
def test_adhoc_query_json(self):
    """An ad-hoc query in "json" format yields one dictionary per row."""
    data = "1\n2\n"
    name = str(uuid.uuid4())

    # Plain string: the original f-string had no placeholders (ruff F541).
    sql = """
    CREATE TABLE tbl(id INT) with ('materialized' = 'true');
    """

    pipeline = Pipeline(name, sql, {}, {})
    pipeline = TEST_CLIENT.create_pipeline(pipeline)

    # try/finally so the pipeline is cleaned up even if the query fails.
    try:
        TEST_CLIENT.start_pipeline(name)
        TEST_CLIENT.push_to_pipeline(name, "tbl", "csv", data)
        # Materialize the generator before shutting the pipeline down, and
        # compare whole lists: the original's zip(got, expected) truncated to
        # the shorter sequence, so an empty result passed vacuously.
        got = list(TEST_CLIENT.query(pipeline.name, "SELECT * FROM tbl", "json"))
    finally:
        TEST_CLIENT.shutdown_pipeline(name)
        TEST_CLIENT.delete_pipeline(name)

    # NOTE(review): row order (2 before 1) is assumed to match the server.
    expected = [{"id": 2}, {"id": 1}]
    assert got == expected
148221

149222
if __name__ == '__main__':
150223
unittest.main()

python/tests/test_variant.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from tests import TEST_CLIENT
88
from decimal import Decimal
99

10+
1011
class TestVariant(unittest.TestCase):
1112
def test_local(self):
1213
sql = f"""
@@ -33,9 +34,9 @@ def test_local(self):
3334
pipeline = PipelineBuilder(TEST_CLIENT, name="test_variant", sql=sql).create_or_replace()
3435

3536
input_strings = [
36-
{"json":"{\"name\":\"Bob\",\"scores\":[8,10]}"},
37-
{"json":"{\"name\":\"Dunce\",\"scores\":[3,4]}"},
38-
{"json":"{\"name\":\"John\",\"scores\":[9,10]}"}
37+
{"json": "{\"name\":\"Bob\",\"scores\":[8,10]}"},
38+
{"json": "{\"name\":\"Dunce\",\"scores\":[3,4]}"},
39+
{"json": "{\"name\":\"John\",\"scores\":[9,10]}"}
3940
]
4041

4142
input_json = [
@@ -47,25 +48,25 @@ def test_local(self):
4748
expected_strings = [j | {"insert_delete": 1} for j in input_strings]
4849

4950
expected_average = [
50-
{ "name": "Bob", "average": Decimal(9) },
51-
{ "name": "Dunce", "average": Decimal(3.5) },
52-
{ "name": "John", "average": Decimal(9.5) }
51+
{"name": "Bob", "average": Decimal(9)},
52+
{"name": "Dunce", "average": Decimal(3.5)},
53+
{"name": "John", "average": Decimal(9.5)}
5354
]
5455
for datum in expected_average:
5556
datum.update({"insert_delete": 1})
5657

5758
expected_typed = [
58-
{ "name": "Bob", "scores": [8, 10] },
59-
{ "name": "Dunce", "scores": [3, 4] },
60-
{ "name": "John", "scores": [9, 10] }
59+
{"name": "Bob", "scores": [8, 10]},
60+
{"name": "Dunce", "scores": [3, 4]},
61+
{"name": "John", "scores": [9, 10]}
6162
]
6263
for datum in expected_typed:
6364
datum.update({"insert_delete": 1})
6465

6566
expected_variant = [
66-
{"json": { "name": "Bob", "scores": [8, 10] }},
67-
{"json": { "name": "Dunce", "scores": [3, 4] }},
68-
{"json": { "name": "John", "scores": [9, 10] }}
67+
{"json": {"name": "Bob", "scores": [8, 10]}},
68+
{"json": {"name": "Dunce", "scores": [3, 4]}},
69+
{"json": {"name": "John", "scores": [9, 10]}}
6970
]
7071
for datum in expected_variant:
7172
datum.update({"insert_delete": 1})
@@ -98,5 +99,6 @@ def test_local(self):
9899

99100
pipeline.delete()
100101

102+
101103
if __name__ == '__main__':
102104
unittest.main()

0 commit comments

Comments
 (0)