Skip to content

Commit cb41db9

Browse files
abhizergz
authored and committed
py: implement foreach_chunk method for streaming HTTP output
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent ee8b8f8 commit cb41db9

File tree

5 files changed

+138
-5
lines changed

5 files changed

+138
-5
lines changed

python/feldera/_callback_runner.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from threading import Thread
2+
from typing import Callable
3+
from queue import Queue, Empty
4+
5+
import pandas as pd
6+
from feldera import FelderaClient
7+
from feldera._helpers import dataframe_from_response
8+
from feldera.output_handler import _OutputHandlerInstruction
9+
10+
11+
class CallbackRunner(Thread):
    """
    Thread that listens to the output of one view of a pipeline and passes every received chunk to a
    user-supplied callback.

    The thread is driven by ``_OutputHandlerInstruction`` values that it reads from the queue it is given.
    """

    def __init__(
        self,
        client: FelderaClient,
        pipeline_name: str,
        view_name: str,
        callback: Callable[[pd.DataFrame, int], None],
        queue: Queue,
    ):
        """
        :param client: The client used to listen to the pipeline.
        :param pipeline_name: The name of the pipeline to listen to.
        :param view_name: The name of the view whose output is streamed.
        :param callback: Called with each chunk (as a pandas DataFrame) and its sequence number.
        :param queue: The queue from which instructions for this thread are read.
        """
        super().__init__()
        self.client: FelderaClient = client
        self.pipeline_name: str = pipeline_name
        self.view_name: str = view_name
        self.callback: Callable[[pd.DataFrame, int], None] = callback
        self.queue: Queue = queue

    def run(self):
        """
        Body of the thread: waits for the first instruction, then streams chunks of the view and invokes
        the callback on every chunk that carries data.

        :meta private:
        """
        started = _OutputHandlerInstruction.PipelineStarted
        completed = _OutputHandlerInstruction.RanToCompletion

        # Block until the first instruction arrives.
        first = self.queue.get()

        if first != started:
            # Only a completion instruction is acknowledged here; anything else just ends the thread.
            if first == completed:
                self.queue.task_done()
            return

        # Open the stream before acknowledging the start instruction.
        stream = self.client.listen_to_pipeline(self.pipeline_name, self.view_name, format="json")
        self.queue.task_done()

        for chunk in stream:
            rows = chunk.get("json_data")
            seq_no = chunk.get("sequence_number")

            # Chunks without a "json_data" entry are not forwarded to the callback.
            if rows is not None:
                self.callback(dataframe_from_response([rows]), seq_no)

            # Poll for a new instruction without blocking the stream.
            try:
                instruction = self.queue.get_nowait()
            except Empty:
                continue

            if not instruction:
                continue

            if instruction == completed:
                self.queue.task_done()
                return

            if instruction == started:
                self.queue.task_done()

python/feldera/_helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import pandas as pd
2+
3+
4+
def dataframe_from_response(buffer: list[list[dict]]) -> pd.DataFrame:
    """
    Converts the response from Feldera to a pandas DataFrame.

    :param buffer: A list of chunks, where each chunk is a list of records. Every record is a dict that
        holds the row under either the ``insert`` key or the ``delete`` key.
    :return: A DataFrame with one row per record and an extra ``insert_delete`` column, set to ``1`` for
        inserted rows and ``-1`` for deleted rows.
    :raises KeyError: If a record has neither an ``insert`` nor a ``delete`` key.
    """
    rows = []
    for chunk in buffer:
        for item in chunk:
            if 'insert' in item:
                rows.append({**item['insert'], 'insert_delete': 1})
            elif 'delete' in item:
                rows.append({**item['delete'], 'insert_delete': -1})
            else:
                # Still a KeyError, as before, but the message now says what was actually received.
                raise KeyError(
                    f"expected an 'insert' or 'delete' key in record, got keys: {list(item)}"
                )

    return pd.DataFrame(rows)

python/feldera/output_handler.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from threading import Thread
55
from queue import Queue, Empty
66
from feldera import FelderaClient
7+
from feldera._helpers import dataframe_from_response
78
from enum import Enum
89

910

@@ -66,7 +67,4 @@ def to_pandas(self):
6667
Converts the output of the pipeline to a pandas DataFrame
6768
"""
6869
self.join()
69-
return pd.DataFrame([
70-
{**item['insert'], 'insert_delete': 1} if 'insert' in item else {**item['delete'], 'insert_delete': -1}
71-
for sublist in self.buffer for item in sublist
72-
])
70+
return dataframe_from_response(self.buffer)

python/feldera/sql_context.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import pandas
33
import re
44

5-
from typing import Optional, Dict
5+
from typing import Optional, Dict, Callable
66
from typing_extensions import Self
77
from queue import Queue
88

@@ -14,6 +14,7 @@
1414
from feldera.sql_schema import SQLSchema
1515
from feldera.output_handler import OutputHandler
1616
from feldera.output_handler import _OutputHandlerInstruction
17+
from feldera._callback_runner import CallbackRunner
1718
from enum import Enum
1819

1920

@@ -241,6 +242,9 @@ def listen(self, view_name: str) -> OutputHandler:
241242
Listens to the output of the provided view so that it is available in the notebook / python code.
242243
243244
:param view_name: The name of the view to listen to.
245+
246+
.. note::
247+
- This method must be called before calling :meth:`.run_to_completion`, or :meth:`.start`.
244248
"""
245249

246250
queue = Queue(maxsize=1)
@@ -310,6 +314,32 @@ def connect_sink_delta_table(self, view_name: str, connector_name: str, config:
310314
else:
311315
self.output_connectors_buffer[view_name] = [connector]
312316

317+
def foreach_chunk(self, view_name: str, callback: Callable[[pandas.DataFrame, int], None]):
    """
    Runs the given callback on each chunk of the output of the specified view.

    :param view_name: The name of the view.
    :param callback: The callback to run on each chunk. The callback should take two arguments:

        - **chunk** -> The chunk as a pandas DataFrame
        - **seq_no** -> The sequence number. The sequence number is a monotonically increasing integer that
          starts from 0. Note that the sequence number is unique for each chunk, but not necessarily contiguous.

    .. note::
        - The callback must be thread-safe as it will be run in a separate thread.
        - This method must be called before calling :meth:`.run_to_completion`, or :meth:`.start`.

    """

    # Single-slot queue through which instructions reach the runner thread.
    instructions = Queue(maxsize=1)
    self.views_tx.append({view_name: instructions})

    CallbackRunner(self.client, self.pipeline_name, view_name, callback, instructions).start()
342+
313343
def run_to_completion(self):
314344
"""
315345
.. _run_to_completion:

python/tests/test_wireframes.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,36 @@ def test_two_SQLContexts(self):
7474

7575
assert df.columns.tolist() not in df2.columns.tolist()
7676

77+
def test_foreach_chunk(self):
    """
    Registers two tables and a view, attaches a callback to the view with ``foreach_chunk`` and runs the
    pipeline to completion. The callback prints the sequence number and row count of every chunk.
    """

    def callback(df: pd.DataFrame, seq_no: int):
        print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")

    sql = SQLContext('foreach_chunk', TEST_CLIENT).get_or_create()

    TBL_NAMES = ['students', 'grades']
    view_name = "average_scores"

    # Input data, in the same order as TBL_NAMES.
    frames = [pd.read_csv('students.csv'), pd.read_csv('grades.csv')]

    columns = {
        'students': {"name": "STRING", "id": "INT"},
        'grades': {
            "student_id": "INT",
            "science": "INT",
            "maths": "INT",
            "art": "INT"
        },
    }
    for table in TBL_NAMES:
        sql.register_table(table, SQLSchema(columns[table]))

    query = f"SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC"
    sql.register_view(view_name, query)

    for table, frame in zip(TBL_NAMES, frames):
        sql.connect_source_pandas(table, frame)

    # The callback is attached before the pipeline is run.
    sql.foreach_chunk(view_name, callback)

    sql.run_to_completion()
106+
77107

78108
if __name__ == '__main__':
79109
unittest.main()

0 commit comments

Comments
 (0)