
Commit 4c2fcc8

[py] docs: improve the examples section of the documentation

Fixes: #2623
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>

1 parent e84aeb1

File tree

3 files changed: +203 −6 lines changed

python/docs/examples.rst

Lines changed: 201 additions & 4 deletions

@@ -1,8 +1,205 @@
 Examples
-========
+~~~~~~~~
+
+Connecting to Feldera Sandbox
+=============================
+
+.. code-block:: python
+
+    from feldera import FelderaClient, PipelineBuilder
+
+    client = FelderaClient('https://try.feldera.com', api_key=api_key)
+
+    pipeline = PipelineBuilder(client, name, sql).create()
+
+Connecting to Feldera on localhost
+==================================
+
+.. code-block:: python
+
+    from feldera import FelderaClient, PipelineBuilder
+
+    client = FelderaClient('http://localhost:8080')
+
+    pipeline = PipelineBuilder(client, name, sql).create()
+
+Creating a Pipeline
+===================
+
+.. code-block:: python
+
+    sql = """
+    CREATE TABLE student (
+        name STRING,
+        id INT
+    );
+
+    CREATE TABLE grades (
+        student_id INT,
+        science INT,
+        maths INT,
+        art INT
+    );
+
+    CREATE VIEW average_scores AS
+    SELECT name, ((science + maths + art) / 3) AS average
+    FROM student JOIN grades ON id = student_id
+    ORDER BY average DESC;
+    """
+
+    pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
+
+Starting a Pipeline
+===================
+
+.. code-block:: python
+
+    pipeline.start()
+
+
+Using Pandas DataFrames
+=======================
+
+.. code-block:: python
+
+    import pandas as pd
+
+    # populate pandas dataframes
+    df_students = pd.read_csv('students.csv')
+    df_grades = pd.read_csv('grades.csv')
+
+    # subscribe to listen to outputs from a view
+    out = pipeline.listen("average_scores")
+
+    pipeline.start()
+
+    # feed pandas dataframes as input
+    pipeline.input_pandas("student", df_students)
+    pipeline.input_pandas("grades", df_grades)
+
+    # wait for the pipeline to complete and shut down
+    pipeline.wait_for_completion(True)
+
+    # get the output of the view as a pandas dataframe
+    df = out.to_pandas()
+
+    # delete the pipeline
+    pipeline.delete()
+
+Iterating over Output Chunks
+============================
+
+Use :meth:`.foreach_chunk` to process each chunk of data received from a view or table.
+It takes a callback and invokes it on every chunk of received data.
+
+.. code-block:: python
+
+    # define your callback to run on every chunk of data received;
+    # it must take two parameters: the chunk (DataFrame) and the sequence number
+    def callback(df: pd.DataFrame, seq_no: int):
+        print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")
+
+    pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
+
+    # register the callback for data received from the selected view
+    pipeline.foreach_chunk(view_name, callback)
+
+    # run the pipeline
+    pipeline.start()
+    pipeline.input_pandas(table_name, df)
+
+    # wait for the pipeline to finish and shut down
+    pipeline.wait_for_completion(True)
+    pipeline.delete()
+
+Waiting for Completion
+======================
+
+To block until the pipeline has completed, use :meth:`.Pipeline.wait_for_completion`.
+
+.. code-block:: python
+
+    pipeline.wait_for_completion()
+
+Optionally, to shut down the pipeline after completion:
+
+.. code-block:: python
+
+    pipeline.wait_for_completion(shutdown=True)
+
+.. warning::
+    If the data source is streaming, this will block forever.
+    In such cases, use :meth:`.Pipeline.wait_for_idle` instead.
+
+End-to-End Example with Kafka Sink
+==================================
+
+This example creates and runs a pipeline that reads from Feldera's internal data generator and writes to a Kafka sink.
+
+.. code-block:: python
+
+    from feldera import FelderaClient, PipelineBuilder
+
+    client = FelderaClient('http://localhost:8080')
+
+    sql = """
+    CREATE TABLE Stocks (
+        symbol VARCHAR NOT NULL,
+        price_time BIGINT NOT NULL, -- UNIX timestamp
+        price DOUBLE NOT NULL
+    ) WITH (
+        'connectors' = '[{
+            "transport": {
+                "name": "datagen",
+                "config": {
+                    "plan": [{
+                        "limit": 5,
+                        "rate": 1,
+                        "fields": {
+                            "symbol": { "values": ["AAPL", "GOOGL", "SPY", "NVDA"] },
+                            "price": { "strategy": "uniform", "range": [100, 10000] }
+                        }
+                    }]
+                }
+            }
+        }]'
+    );
+
+    CREATE VIEW googl_stocks
+    WITH (
+        'connectors' = '[{
+            "name": "kafka-3",
+            "transport": {
+                "name": "kafka_output",
+                "config": {
+                    "bootstrap.servers": "localhost:9092",
+                    "topic": "googl_stocks",
+                    "auto.offset.reset": "earliest"
+                }
+            },
+            "format": {
+                "name": "json",
+                "config": {
+                    "update_format": "insert_delete",
+                    "array": false
+                }
+            }
+        }]'
+    )
+    AS SELECT * FROM Stocks WHERE symbol = 'GOOGL';
+    """
+
+    pipeline = PipelineBuilder(client, name="kafka_example", sql=sql).create_or_replace()
+
+    out = pipeline.listen("googl_stocks")
+    pipeline.start()
+
+    # important: `wait_for_completion` will block forever here
+    pipeline.wait_for_idle()
+    pipeline.shutdown()
+    df = out.to_pandas()
+    assert df.shape[0] != 0
+
+    pipeline.delete()
 
 Specifying Data Sources / Sinks
-*******************************
+===============================
 
-To connect Feldera to data sources or sinks, you can specify them in the SQL code.
-Refer to the connector documentation at: https://github.com/feldera/feldera/tree/main/docs/connectors
+To connect Feldera to various data sources or sinks, you can define them in the SQL code.
+Refer to the connector documentation at: https://docs.feldera.com/connectors/
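The Kafka sink in the end-to-end example above emits JSON in the `insert_delete` update format, where each record wraps a row in an `"insert"` or `"delete"` envelope. A minimal decoding sketch in pure Python (the sample records below are hypothetical; no Feldera or Kafka connection is required):

```python
import json

# Hypothetical sample records in the insert_delete update format
records = [
    '{"insert": {"symbol": "GOOGL", "price_time": 1700000000, "price": 172.5}}',
    '{"insert": {"symbol": "GOOGL", "price_time": 1700000001, "price": 173.1}}',
    '{"delete": {"symbol": "GOOGL", "price_time": 1700000000, "price": 172.5}}',
]

rows = []
for rec in records:
    payload = json.loads(rec)
    if "insert" in payload:
        # an insert adds a row to the view's current state
        rows.append(payload["insert"])
    elif "delete" in payload:
        # a delete retracts a previously emitted row
        rows.remove(payload["delete"])

# rows now holds the single surviving GOOGL quote
```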

python/docs/introduction.rst

Lines changed: 1 addition & 1 deletion

@@ -124,6 +124,6 @@ Key Concepts
 
 .. warning::
     If the data source is streaming, this will block forever.
-    In such cases, use :meth:`.Pipeline.start` instead.
+    In such cases, use :meth:`.Pipeline.wait_for_idle` instead.
 
 Check out the :doc:`/examples`.
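The substitution above reflects that `wait_for_idle` returns once the pipeline stops making progress, rather than at an end-of-input that never arrives for a streaming source. A rough sketch of that idea in plain Python (`wait_until_idle` is a hypothetical helper illustrating the polling pattern, not the library's actual implementation):

```python
def wait_until_idle(get_processed_count, idle_polls_required=3):
    """Return the processed-record count once it has stopped changing
    for idle_polls_required consecutive polls."""
    last = get_processed_count()
    idle = 0
    while idle < idle_polls_required:
        current = get_processed_count()
        if current == last:
            idle += 1
        else:
            # progress was made; reset the idle counter
            idle = 0
            last = current
    return last

# Simulate a metric that climbs to 5 records and then goes quiet.
counts = iter([1, 2, 3, 5, 5, 5, 5, 5])
state = {"n": 0}

def get_count():
    state["n"] = next(counts, state["n"])
    return state["n"]

total = wait_until_idle(get_count)
```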

python/tests/test_pipeline_builder.py

Lines changed: 1 addition & 1 deletion

@@ -218,8 +218,8 @@ def callback(df: pd.DataFrame, seq_no: int):
 df_students = pd.read_csv('students.csv')
 df_grades = pd.read_csv('grades.csv')
 
-pipeline.start()
 pipeline.foreach_chunk(view_name, callback)
+pipeline.start()
 
 pipeline.input_pandas(TBL_NAMES[0], df_students)
 pipeline.input_pandas(TBL_NAMES[1], df_grades)
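The test fix above moves `pipeline.start()` after `foreach_chunk`, since a callback registered only after data starts flowing can miss early chunks. A local simulation of that callback contract (pure Python, no Feldera required; the dispatcher is a toy stand-in for the pipeline):

```python
def make_dispatcher():
    """Toy dispatcher mirroring the foreach_chunk contract: each
    callback receives a chunk plus an increasing sequence number."""
    callbacks = []

    def register(cb):
        # registration must happen before data starts flowing,
        # which is why the test now calls foreach_chunk before start()
        callbacks.append(cb)

    def deliver(chunks):
        for seq_no, chunk in enumerate(chunks):
            for cb in callbacks:
                cb(chunk, seq_no)

    return register, deliver

seen = []
register, deliver = make_dispatcher()
register(lambda chunk, seq_no: seen.append((seq_no, len(chunk))))

# two chunks of sizes 3 and 2
deliver([[1, 2, 3], [4, 5]])
```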
