Skip to content

Commit 5f203d6

Browse files
committed
py: docs: include example with runtime config
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 2dbdf68 commit 5f203d6

7 files changed

Lines changed: 131 additions & 15 deletions

File tree

docs.feldera.com/docs/pipelines/configuration.mdx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Pipelines come with a set of configuration settings to toggle features, tune per
88

99
### Editing configuration
1010
.
11-
You can edit all pipeline settings when the pipeline is `Shutdown` and a limited subset when it is `Suspended`.
11+
You can edit all pipeline settings when the pipeline is `Stopped` with storage cleared, and a limited subset when its storage is in use.
1212
<Tabs
1313
defaultValue="web-console"
1414
values={[
@@ -23,7 +23,9 @@ You can edit all pipeline settings when the pipeline is `Shutdown` and a limited
2323
![Configure pipeline in web-console](webconsole_configure_pipeline.png)
2424
</TabItem>
2525
<TabItem value="python">
26-
You can use [feldera_client.create_or_update_pipeline()](pathname:///python/feldera.rest.html#feldera.rest.feldera_client.FelderaClient.create_or_update_pipeline) to update `program_config` and `runtime_config` fields.
26+
You can use `RuntimeConfig.from_dict()` to set the runtime configuration of a pipeline.
27+
28+
Example: [Runtime configuration of a Pipeline](pathname:///python/examples.html#runtime-configuration-of-a-pipeline)
2729
</TabItem>
2830
<TabItem value="fda">
2931
You can toggle the storage for a pipeline with
@@ -55,4 +57,4 @@ The "optimized" compilation profile (default) should be used when running produc
5557

5658
<div className='theme-api-markdown' >
5759
<ApiSchema pointer="#/components/schemas/ProgramConfig"/>
58-
</div>
60+
</div>

python/docs/examples.rst

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,72 @@ Creating a Pipeline (OVERWRITING existing pipelines)
7676
# This will stop and overwrite any existing pipeline with the same name.
7777
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
7878
79+
Creating a Pipeline with Fault Tolerance Enabled
80+
================================================
81+
82+
.. code-block:: python
83+
84+
from feldera.runtime_config import RuntimeConfig
85+
from feldera.enums import FaultToleranceModel
86+
87+
client = FelderaClient.localhost()
88+
runtime_config = RuntimeConfig(
89+
fault_tolerance_model=FaultToleranceModel.AtLeastOnce,
90+
checkpoint_interval_secs=60
91+
)
92+
93+
pipeline = PipelineBuilder(client, name, sql, runtime_config=runtime_config).create()
94+
95+
Runtime configuration of a Pipeline
96+
===================================
97+
98+
.. code-block:: python
99+
100+
from feldera.runtime_config import RuntimeConfig
101+
102+
client = FelderaClient.localhost()
103+
config = {
104+
"workers": 8,
105+
"storage": {
106+
"backend": {
107+
"name": "default"
108+
},
109+
"min_storage_bytes": None,
110+
"min_step_storage_bytes": None,
111+
"compression": "default",
112+
"cache_mib": None
113+
},
114+
"fault_tolerance": {
115+
"model": "at_least_once",
116+
"checkpoint_interval_secs": 60
117+
},
118+
"cpu_profiler": True,
119+
"tracing": False,
120+
"tracing_endpoint_jaeger": "",
121+
"min_batch_size_records": 0,
122+
"max_buffering_delay_usecs": 0,
123+
"resources": {
124+
"cpu_cores_min": None,
125+
"cpu_cores_max": None,
126+
"memory_mb_min": None,
127+
"memory_mb_max": None,
128+
"storage_mb_max": None,
129+
"storage_class": None
130+
},
131+
"clock_resolution_usecs": 1_000_000,
132+
"pin_cpus": [],
133+
"provisioning_timeout_secs": None,
134+
"max_parallel_connector_init": None,
135+
"init_containers": None,
136+
"checkpoint_during_suspend": True,
137+
"dev_tweaks": {}
138+
}
139+
140+
runtime_config = RuntimeConfig.from_dict(config)
141+
142+
pipeline = PipelineBuilder(client, name, sql, runtime_config=runtime_config).create()
143+
144+
79145
Starting a Pipeline
80146
===================
81147

python/feldera/enums.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,38 @@ def from_str(value):
276276

277277
def __eq__(self, other):
278278
return self.value == other.value
279+
280+
281+
class FaultToleranceModel(Enum):
    """
    The fault tolerance model.

    ``str(member)`` yields the snake_case name of the model
    (``"at_least_once"`` / ``"exactly_once"``); :meth:`from_str` performs the
    reverse, case-insensitive lookup.
    """

    AtLeastOnce = 1
    """
    Each record is output at least once. Crashes may duplicate output, but
    no input or output is dropped.
    """

    ExactlyOnce = 2
    """
    Each record is output exactly once. Crashes do not drop or duplicate
    input or output.
    """

    def __str__(self) -> str:
        """
        Return the snake_case string form of this member.
        """

        # Built inside the method on purpose: a dict assigned in the Enum
        # class body would itself be turned into an enum member.
        # An explicit table fails loudly (KeyError) if a member is added
        # without a string form, instead of silently returning None.
        names = {
            FaultToleranceModel.AtLeastOnce: "at_least_once",
            FaultToleranceModel.ExactlyOnce: "exactly_once",
        }
        return names[self]

    @staticmethod
    def from_str(value: str) -> "FaultToleranceModel":
        """
        Return the member whose string form equals ``value``, ignoring case.

        :param value: The string form of the model, e.g. ``"at_least_once"``.

        :raises ValueError: If ``value`` is not a string or does not match
            any member.
        """

        # Non-string input (e.g. None) falls through to the ValueError below
        # rather than failing with an AttributeError on `.lower()`.
        if isinstance(value, str):
            wanted = value.lower()  # normalise once, not per member
            for member in FaultToleranceModel:
                if str(member) == wanted:
                    return member

        raise ValueError(
            f"Unknown value '{value}' for enum {FaultToleranceModel.__name__}"
        )

python/feldera/pipeline.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def input_json(
145145
:param data: The JSON encoded data to be pushed to the pipeline. The data should be in the form:
146146
`{'col1': 'val1', 'col2': 'val2'}` or `[{'col1': 'val1', 'col2': 'val2'}, {'col1': 'val1', 'col2': 'val2'}]`
147147
:param update_format: The update format of the JSON data to be pushed to the pipeline. Must be one of:
148-
"raw", "insert_delete". <https://docs.feldera.com/formats/json#the-insertdelete-format>
148+
"raw", "insert_delete". https://docs.feldera.com/formats/json#the-insertdelete-format
149149
:param force: `True` to push data even if the pipeline is paused. `False` by default.
150150
151151
:raises ValueError: If the update format is invalid.
@@ -180,7 +180,7 @@ def pause_connector(self, table_name: str, connector_name: str):
180180
All connectors are RUNNING by default.
181181
182182
Refer to the connector documentation for more information:
183-
<https://docs.feldera.com/connectors/#input-connector-orchestration>
183+
https://docs.feldera.com/connectors/#input-connector-orchestration
184184
185185
:param table_name: The name of the table that the connector is attached to.
186186
:param connector_name: The name of the connector to pause.
@@ -199,7 +199,7 @@ def resume_connector(self, table_name: str, connector_name: str):
199199
All connectors are RUNNING by default.
200200
201201
Refer to the connector documentation for more information:
202-
<https://docs.feldera.com/connectors/#input-connector-orchestration>
202+
https://docs.feldera.com/connectors/#input-connector-orchestration
203203
204204
:param table_name: The name of the table that the connector is attached to.
205205
:param connector_name: The name of the connector to resume.
@@ -533,15 +533,13 @@ def get(name: str, client: FelderaClient) -> "Pipeline":
533533

534534
def checkpoint(self, wait: bool = False, timeout_s=300) -> int:
535535
"""
536-
Checkpoints this pipeline, if fault-tolerance is enabled.
537-
Fault Tolerance in Feldera:
538-
<https://docs.feldera.com/pipelines/fault-tolerance/>
536+
Checkpoints this pipeline.
539537
540538
:param wait: If true, will block until the checkpoint completes.
541539
:param timeout_s: The maximum time (in seconds) to wait for the
542540
checkpoint to complete.
543541
544-
:raises FelderaAPIError: If checkpointing is not enabled.
542+
:raises FelderaAPIError: If enterprise features are not enabled.
545543
"""
546544

547545
seq = self.client.checkpoint_pipeline(self.name)

python/feldera/pipeline_builder.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ class PipelineBuilder:
1010
"""
1111
A builder for creating a Feldera Pipeline.
1212
13-
:param client: The `.FelderaClient` instance
13+
:param client: The :class:`.FelderaClient` instance
1414
:param name: The name of the pipeline
1515
:param description: The description of the pipeline
1616
:param sql: The SQL code of the pipeline
1717
:param udf_rust: Rust code for UDFs
1818
:param udf_toml: Rust dependencies required by UDFs (in the TOML format)
19-
:param compilation_profile: The compilation profile to use
20-
:param runtime_config: The runtime config to use
19+
:param compilation_profile: The :class:`.CompilationProfile` to use
20+
:param runtime_config: The :class:`.RuntimeConfig` to use. Enables
21+
configuring the runtime behavior of the pipeline such as:
22+
fault tolerance, storage and :class:`.Resources`
2123
"""
2224

2325
def __init__(

python/feldera/rest/feldera_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def clear_storage(self, pipeline_name: str, timeout_s: Optional[float] = 300):
404404

405405
def checkpoint_pipeline(self, pipeline_name: str) -> int:
406406
"""
407-
Checkpoint a fault-tolerant pipeline
407+
Checkpoint a pipeline.
408408
409409
:param pipeline_name: The name of the pipeline to checkpoint
410410
"""

python/feldera/runtime_config.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
from typing import Optional, Any, Mapping
3+
from feldera.enums import FaultToleranceModel
34

45

56
class Resources:
@@ -59,6 +60,11 @@ def __init__(
5960
class RuntimeConfig:
6061
"""
6162
Runtime configuration class to define the configuration for a pipeline.
63+
To create runtime config from a dictionary, use
64+
:meth:`.RuntimeConfig.from_dict`.
65+
66+
Documentation:
67+
https://docs.feldera.com/pipelines/configuration/#runtime-configuration
6268
"""
6369

6470
def __init__(
@@ -74,6 +80,8 @@ def __init__(
7480
provisioning_timeout_secs: Optional[int] = None,
7581
resources: Optional[Resources] = None,
7682
runtime_version: Optional[str] = None,
83+
fault_tolerance_model: Optional[FaultToleranceModel] = None,
84+
checkpoint_interval_secs: Optional[int] = None,
7785
):
7886
self.workers = workers
7987
self.tracing = tracing
@@ -86,6 +94,11 @@ def __init__(
8694
self.runtime_version = runtime_version or os.environ.get(
8795
"FELDERA_RUNTIME_VERSION"
8896
)
97+
if fault_tolerance_model is not None:
98+
self.fault_tolerance = {
99+
"model": str(fault_tolerance_model),
100+
"checkpoint_interval_secs": checkpoint_interval_secs,
101+
}
89102
if resources is not None:
90103
self.resources = resources.__dict__
91104
if isinstance(storage, bool):
@@ -100,7 +113,7 @@ def default() -> "RuntimeConfig":
100113
@classmethod
101114
def from_dict(cls, d: Mapping[str, Any]):
102115
"""
103-
Create a `.RuntimeConfig` object from a dictionary.
116+
Create a :class:`.RuntimeConfig` object from a dictionary.
104117
"""
105118

106119
conf = cls()

0 commit comments

Comments
 (0)