Feldera Python is the Feldera SDK for Python developers.
To install the latest release from PyPI:

    pip install feldera

To install directly from GitHub:

    pip install git+https://github.com/feldera/feldera#subdirectory=python

Similarly, to install from a specific branch:

    pip install git+https://github.com/feldera/feldera@{BRANCH_NAME}#subdirectory=python

Replace {BRANCH_NAME} with the name of the branch you want to install from.
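As a small illustration of the {BRANCH_NAME} substitution, the sketch below builds the string that is passed to pip install. The helper name feldera_pip_spec is hypothetical and not part of the SDK; it only assembles the URLs shown above.

```python
def feldera_pip_spec(branch=None):
    """Build the pip requirement string for installing the SDK from GitHub.

    Hypothetical helper for illustration only; not part of the Feldera SDK.
    """
    base = "git+https://github.com/feldera/feldera"
    # An "@<branch>" suffix selects a branch; without it pip uses the default branch.
    ref = f"@{branch}" if branch else ""
    return f"{base}{ref}#subdirectory=python"


if __name__ == "__main__":
    # Print the argument to pass to `pip install` for a branch named "main".
    print(feldera_pip_spec("main"))
```

The returned string can be passed to pip as is, for example on the command line or through python3 -m pip install.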
If you have cloned the Feldera repo, you can install the Python SDK as follows:

    # the Feldera Python SDK is present inside the python/ directory
    pip install python/

The Python SDK documentation is available at Feldera Python SDK Docs.
To build the HTML documentation, first ensure that Sphinx is installed. If it is not, install it using pip install sphinx.
Then run the following commands:

    cd docs
    sphinx-apidoc -o . ../feldera
    make html

To clean the build, run make clean.
To run unit tests:

    cd python && python3 -m pytest tests/

- This will detect and run all test files that match the pattern test_*.py or *_test.py.
- By default, the tests expect a running Feldera instance at http://localhost:8080. To override the default endpoint, set the FELDERA_BASE_URL environment variable.
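The two defaults above can be sketched in a few lines of Python. This is only an illustration: the helper names resolve_base_url and is_test_file are hypothetical, and the real lookup and discovery are done by the test suite and pytest respectively.

```python
import fnmatch
import os

DEFAULT_BASE_URL = "http://localhost:8080"
TEST_FILE_PATTERNS = ("test_*.py", "*_test.py")


def resolve_base_url(env=None):
    """Return FELDERA_BASE_URL if it is set, otherwise the default endpoint.

    Hypothetical helper; `env` defaults to the process environment.
    """
    env = os.environ if env is None else env
    return env.get("FELDERA_BASE_URL", DEFAULT_BASE_URL)


def is_test_file(filename):
    """Return True if the file name matches one of the discovery patterns."""
    return any(fnmatch.fnmatchcase(filename, p) for p in TEST_FILE_PATTERNS)
```

For instance, a file named test_pipeline.py or pipeline_test.py would be picked up, while helpers.py would not.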
To run tests from a specific file:

    (cd python && python3 -m pytest ./tests/path-to-file.py)

The aggregate tests validate end-to-end correctness of SQL functionality. To run the aggregate tests use:

    cd python
    PYTHONPATH=`pwd` python3 ./tests/aggregate_tests/main.py

To reduce redundant compilation cycles during testing:
- Inherit from SharedTestPipeline instead of unittest.TestCase.
- Define DDLs (e.g., CREATE TABLE, CREATE VIEW) in the docstring of each test method.
  - All DDLs from all test functions in the class are combined and compiled into a single pipeline.
  - If a table or view is already defined in one test, it can be used directly in others without redefinition.
  - Ensure that all table and view names are unique within the class.
- Use @enterprise_only on tests that require Enterprise features. Their DDLs will be skipped on OSS builds.
- Use self.set_runtime_config(...) to override the default pipeline config.
  - Reset it at the end using self.reset_runtime_config().
- Access the shared pipeline via self.pipeline.
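To make the combining step concrete, here is a minimal sketch of how docstring DDLs from several test methods could be merged into one SQL program. It is not the SharedTestPipeline implementation: collect_ddls, the _enterprise_only marker, and this stand-in enterprise_only decorator are all assumptions made for illustration.

```python
import inspect


def enterprise_only(fn):
    # Hypothetical stand-in for the real decorator: it only tags the function.
    fn._enterprise_only = True
    return fn


def collect_ddls(test_class, enterprise=False):
    """Join the docstrings of all test_* methods into one SQL program.

    Methods are visited in name order, because inspect.getmembers sorts by name.
    """
    parts = []
    for name, fn in inspect.getmembers(test_class, inspect.isfunction):
        if not name.startswith("test_"):
            continue
        if getattr(fn, "_enterprise_only", False) and not enterprise:
            continue  # skip Enterprise-only DDLs when not on an Enterprise build
        doc = inspect.getdoc(fn)
        if doc:
            parts.append(doc)
    return "\n".join(parts)


class ExampleTests:
    def test_a(self):
        """CREATE TABLE t(id INT);"""

    def test_b(self):
        """CREATE VIEW v AS SELECT * FROM t;"""

    @enterprise_only
    def test_c(self):
        """CREATE VIEW w AS SELECT id FROM t;"""
```

Because every docstring ends up in the same program, a table declared in one test is visible to views declared in another, which is also why names must be unique within the class.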
For example, a test class that uses the shared pipeline:

    from tests.shared_test_pipeline import SharedTestPipeline

    class TestAverage(SharedTestPipeline):
        def test_average(self):
            """
            CREATE TABLE students(id INT, name STRING);
            CREATE MATERIALIZED VIEW v AS SELECT * FROM students;
            """
            ...
            self.pipeline.start()
            self.pipeline.input_pandas("students", df)
            self.pipeline.wait_for_completion(True)
            ...

Use Ruff to run the lint checks that will be executed by the pre-commit hook when a PR is submitted:

    ruff check python/

To reformat the code in the same way as the pre-commit hook:

    ruff format