|
1 | 1 | Examples |
2 | | -======== |
| 2 | +~~~~~~~~ |
| 3 | + |
| 4 | +Connecting to Feldera Sandbox |
| 5 | +============================= |
| 6 | + |
| 7 | +.. code-block:: python |
| 8 | +
|
| 9 | + from feldera import FelderaClient, PipelineBuilder |
| 10 | +
|
| 11 | + client = FelderaClient('https://try.feldera.com', api_key=api_key) |
| 12 | +
|
| 13 | + pipeline = PipelineBuilder(client, name, sql).create() |
| 14 | +
|
| 15 | +Connecting to Feldera on localhost |
| 16 | +================================== |
| 17 | + |
| 18 | +.. code-block:: python |
| 19 | +
|
| 20 | + from feldera import FelderaClient, PipelineBuilder |
| 21 | +
|
| 22 | + client = FelderaClient('https://try.feldera.com', api_key=api_key) |
| 23 | +
|
| 24 | + pipeline = PipelineBuilder(client, name, sql).create() |
| 25 | +
|
| 26 | +Creating a Pipeline |
| 27 | +=================== |
| 28 | + |
| 29 | +.. code-block:: python |
| 30 | +
|
| 31 | + sql = """ |
| 32 | + CREATE TABLE student ( |
| 33 | + name STRING, |
| 34 | + id INT |
| 35 | + ); |
| 36 | +
|
| 37 | + CREATE TABLE grades ( |
| 38 | + student_id INT, |
| 39 | + science INT, |
| 40 | + maths INT, |
| 41 | + art INT |
| 42 | + ); |
| 43 | +
|
| 44 | + CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC; |
| 45 | + """ |
| 46 | +
|
| 47 | + pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace() |
| 48 | +
|
| 49 | +Starting a Pipeline |
| 50 | +=================== |
| 51 | + |
| 52 | +.. code-block:: python |
| 53 | +
|
| 54 | + pipeline.start() |
| 55 | +
|
| 56 | +
|
| 57 | +Using Pandas DataFrames |
| 58 | +======================= |
| 59 | + |
| 60 | +.. code-block:: python |
| 61 | +
|
| 62 | + # populate pandas dataframes |
| 63 | + df_students = pd.read_csv('students.csv') |
| 64 | + df_grades = pd.read_csv('grades.csv') |
| 65 | +
|
| 66 | + # subscribe to listen to outputs from a view |
| 67 | + out = pipeline.listen("average_scores") |
| 68 | +
|
| 69 | + pipeline.start() |
| 70 | +
|
| 71 | + # feed pandas dataframes as input |
| 72 | + pipeline.input_pandas(TBL_NAMES[0], df_students) |
| 73 | + pipeline.input_pandas(TBL_NAMES[1], df_grades) |
| 74 | +
|
| 75 | + # wait for the pipeline to complete and shutdown |
| 76 | + pipeline.wait_for_completion(True) |
| 77 | +
|
| 78 | + # get the output of the view as a pandas dataframe |
| 79 | + df = out.to_pandas() |
| 80 | +
|
| 81 | + # delete the pipeline |
| 82 | + pipeline.delete() |
| 83 | +
|
| 84 | +Iterating over Output Chunks |
| 85 | +============================ |
| 86 | + |
| 87 | +Use :meth:`.foreach_chunk` to process each chunk of data from a view or table. |
| 88 | +It takes a callback, and calls the callback on each chunk of received data. |
| 89 | + |
| 90 | +.. code-block:: python |
| 91 | +
|
| 92 | + # define your callback to run on every chunk of data received |
| 93 | + # ensure that it takes two parameters, the chunk (DataFrame) and the sequence number |
| 94 | + def callback(df: pd.DataFrame, seq_no: int): |
| 95 | + print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n") |
| 96 | +
|
| 97 | + pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace() |
| 98 | +
|
| 99 | + # register the callback for data received from the selected view |
| 100 | + pipeline.foreach_chunk(view_name, callback) |
| 101 | +
|
| 102 | + # run the pipeline |
| 103 | + pipeline.start() |
| 104 | + pipeline.input_pandas(table_name, df) |
| 105 | +
|
| 106 | + # wait for the pipeline to finish and shutdown |
| 107 | + pipeline.wait_for_completion(True) |
| 108 | + pipeline.delete() |
| 109 | +
|
| 110 | +Waiting for Completion |
| 111 | +====================== |
| 112 | + |
| 113 | +To wait (block) till the pipeline has been completed, use :meth:`.Pipeline.wait_for_completion`. |
| 114 | + |
| 115 | +.. code-block:: python |
| 116 | +
|
| 117 | + pipeline.wait_for_completion() |
| 118 | +
|
| 119 | +Optionally, to shutdown the pipeline after completion: |
| 120 | + |
| 121 | +.. code-block:: python |
| 122 | +
|
| 123 | + pipeline.wait_for_completion(shutdown=True) |
| 124 | +
|
| 125 | +.. warning:: |
| 126 | + If the data source is streaming, this will block forever. |
| 127 | + |
| 128 | +End-to-End Example with Kafka Sink |
| 129 | +================================== |
| 130 | + |
| 131 | +This example shows creating and running a pipeline with Feldera's internal data generator and writing to a Kafka sink. |
| 132 | + |
| 133 | +.. code-block:: python |
| 134 | +
|
| 135 | + from feldera import FelderaClient, PipelineBuilder |
| 136 | +
|
| 137 | + client = FelderaClient('http://localhost:8080') |
| 138 | +
|
| 139 | + sql = """ |
| 140 | + CREATE TABLE Stocks ( |
| 141 | + symbol VARCHAR NOT NULL, |
| 142 | + price_time BIGINT NOT NULL, -- UNIX timestamp |
| 143 | + price DOUBLE NOT NULL |
| 144 | + ) with ( |
| 145 | + 'connectors' = '[{ |
| 146 | + "transport": { |
| 147 | + "name": "datagen", |
| 148 | + "config": { |
| 149 | + "plan": [{ |
| 150 | + "limit": 5, |
| 151 | + "rate": 1, |
| 152 | + "fields": { |
| 153 | + "symbol": { "values": ["AAPL", "GOOGL", "SPY", "NVDA"] }, |
| 154 | + "price": { "strategy": "uniform", "range": [100, 10000] } |
| 155 | + } |
| 156 | + }] |
| 157 | + } |
| 158 | + } |
| 159 | + }]' |
| 160 | + ); |
| 161 | +
|
| 162 | + CREATE VIEW googl_stocks |
| 163 | + WITH ( |
| 164 | + 'connectors' = '[ |
| 165 | + { |
| 166 | + "name": "kafka-3", |
| 167 | + "transport": { |
| 168 | + "name": "kafka_output", |
| 169 | + "config": { |
| 170 | + "bootstrap.servers": "localhost:9092", |
| 171 | + "topic": "googl_stocks", |
| 172 | + "auto.offset.reset": "earliest" |
| 173 | + } |
| 174 | + }, |
| 175 | + "format": { |
| 176 | + "name": "json", |
| 177 | + "config": { |
| 178 | + "update_format": "insert_delete", |
| 179 | + "array": false |
| 180 | + } |
| 181 | + } |
| 182 | + } |
| 183 | + ]' |
| 184 | + ) |
| 185 | + AS SELECT * FROM Stocks WHERE symbol = 'GOOGL'; |
| 186 | + """ |
| 187 | +
|
| 188 | + pipeline = PipelineBuilder(client, name="kafka_example", sql=sql).create_or_replace() |
| 189 | +
|
| 190 | + out = pipeline.listen("googl_stocks") |
| 191 | + pipeline.start() |
| 192 | +
|
| 193 | + # important: `wait_for_completion` will block forever here |
| 194 | + pipeline.wait_for_idle() |
| 195 | + pipeline.shutdown() |
| 196 | + df = out.to_pandas() |
| 197 | + assert df.shape[0] != 0 |
| 198 | +
|
| 199 | + pipeline.delete() |
3 | 200 |
|
4 | 201 | Specifying Data Sources / Sinks |
5 | | -******************************* |
| 202 | +=============================== |
6 | 203 |
|
7 | | -To connect Feldera to data sources or sinks, you can specify them in the SQL code. |
8 | | -Refer to the connector documentation at: https://github.com/feldera/feldera/tree/main/docs/connectors |
| 204 | +To connect Feldera to various data sources or sinks, you can define them in the SQL code. |
| 205 | +Refer to the connector documentation at: https://docs.feldera.com/connectors/ |
0 commit comments