-
Notifications
You must be signed in to change notification settings - Fork 112
feat(py): A dbt adapter for Feldera (not a Feldera adapter, but a dbt adapter, that is wrapping Feldera) #5950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
mihaibudiu
merged 75 commits into
feldera:main
from
mdrakiburrahman:dev/mdrrahman/dbt-feldera
Apr 16, 2026
Merged
Changes from 1 commit
Commits
Show all changes
75 commits
Select commit
Hold shift + click to select a range
8810072
Checkpoint: Initial adapter bootstrapping and e2e tests with Docker C…
mdrakiburrahman 04438af
Fixed catalog gen and add some steps for accessing dbt UI
mdrakiburrahman 44448ab
Materialize Adventureworks as Delta Table and use DuckDB to assert ro…
mdrakiburrahman 6635196
Adding a Kafka sales table and ensuring it fires IVM
mdrakiburrahman e5af63c
Materialize the Kafka topic as Delta table and use that instead in th…
mdrakiburrahman 709b25a
Use sqlglot instead of brittle RegEx to parse Feldera SQL dialect
mdrakiburrahman 04b7b38
Break apart contributing and customer facing README
mdrakiburrahman a06c699
Update changelog
mdrakiburrahman d8afa71
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 0283a3d
Proofread
mdrakiburrahman 34d0e35
Remove test_string_size override to implement the base behavior
mdrakiburrahman 7ceaa8f
Differentiate between float vs numeric
mdrakiburrahman 3fb8dcc
Update docstring to mention Feldera transactions
mdrakiburrahman 56de322
Use NOW and add a test for it
mdrakiburrahman a870d0b
Update docstring for the expand column types
mdrakiburrahman 22fd48e
Use better type inference in seed via _infer_column_types.
mdrakiburrahman c10e3d0
Add an idempotent seed uploader and downloader
mdrakiburrahman dc4df22
Remove seeds from git in origin
mdrakiburrahman 24c7b86
Remove the models and macros as well
mdrakiburrahman ec8cf4c
Add a gitkeep to the seed folders so they are visible in git tree
mdrakiburrahman 044063c
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 3f6809b
Remove GitHub action changes - no tests in publish and remove CI test…
mdrakiburrahman 0c70a30
Use the ci-post-release.yml sed to change the version
mdrakiburrahman 2f2dcb9
Update changelog
mdrakiburrahman ec20ba2
Use Pipeline client instead of FelderaClient.get_pipeline and a few o…
mdrakiburrahman 8046bf6
compiling_states needs to be a tuple in PipelineStateManager due to P…
mdrakiburrahman a07217d
Attempt to lint for CI pipeline
mdrakiburrahman 2cafef7
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman fd88b16
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 8700b56
fix(py): fix ruff isort import ordering in dbt-feldera
mdrakiburrahman 7e05d2f
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 0280c4d
fix(py): pin ruff==0.9.10 in dbt-feldera to match CI pre-commit
mdrakiburrahman f698020
Update docstring to make it clear that this adapter uses continuous q…
mdrakiburrahman d94f0c8
Add documentation for TYPE_LABELS
mdrakiburrahman b5fb950
Update param docstring for auto_begin
mdrakiburrahman 9ecf1b2
Turn positional tuples into a ColumnDescription NamedTuple
mdrakiburrahman 8413b74
Add a docstring to connection.execute that it delegates to the cursor…
mdrakiburrahman 018cd38
Update docstring for cursor.execute
mdrakiburrahman 64afc65
Add a docstring to SqlIntent to make it clear which enum is supported…
mdrakiburrahman 5c3dd4e
Add a small docstring to DATA_INGRESS
mdrakiburrahman de5b06f
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 1859709
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 4d43b45
Clean .temp dir
mdrakiburrahman 2687e25
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 95a54f7
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman b859260
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 6f68d81
Merge branch 'feldera:main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 0a45510
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman 1d8fdcd
Move DuckDB into a container for tests so we don't need host mount
mdrakiburrahman 749e793
Add adventureworks SQL scripts to git, remove .gitkeep files
mdrakiburrahman 9aa8846
Pass 1 through PR review comments
mdrakiburrahman 37f077e
Implement Generator in sqlglot to override FLOAT and map it to REAL
mdrakiburrahman edf302b
Merge branch 'main' into dev/mdrrahman/dbt-feldera
mdrakiburrahman a64ae10
Bump version to 0.288.0
mdrakiburrahman da6aa5e
Map FLOAT to REAL in dbt macro
mdrakiburrahman ca62160
Update README
mdrakiburrahman 29c6c7c
Remove incremental materialization support, it's not required in Feldera
mdrakiburrahman da853cb
Changed Jinja macro to stored = true
mdrakiburrahman 28fa16f
Remove ambiguous "registered" reference
mdrakiburrahman ac59e4d
Make TestFelderaColumn dynamic
mdrakiburrahman b5482d3
Remove external from FelderaRelationType
mdrakiburrahman 7465e32
Remove test for no columns
mdrakiburrahman a6f2801
Factor out CATALOG_COLUMNS
mdrakiburrahman e4c50f7
Remove INTERVAL mapping in agate and run integration tests green
mdrakiburrahman dc3416c
Add a _wait_for_pipeline_idle to deterministically poll pipeline to p…
mdrakiburrahman b0150a6
Add lattice tests that are data driven
mdrakiburrahman 3f04fb6
Rename test to test_remove_nonexistent_does_not_throw
mdrakiburrahman 393d5cd
Crash test if Kafka topic isn't up and dump logs
mdrakiburrahman 2d8c874
Update docstring for update_with_views
mdrakiburrahman ab70529
Lint
mdrakiburrahman 61123e3
Dupe line
mdrakiburrahman 05e7eb5
Remove NOW and use MD5 hash
mdrakiburrahman e4e07b6
There is no relationship between connectors and views being stored
mdrakiburrahman e1ce519
Add DBT_THREADS for faster integration tests
mdrakiburrahman b0dca82
Ruff the integration test folder
mdrakiburrahman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fixed catalog gen and add some steps for accessing dbt UI
- Loading branch information
commit 04438aff438bbd7f7509e1a665748ecd914cc18f
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| import unittest | ||
| from unittest.mock import MagicMock | ||
|
|
||
| from dbt.adapters.feldera.column import FelderaColumn | ||
| from dbt.adapters.feldera.impl import FelderaAdapter | ||
| from dbt.adapters.feldera.relation import FelderaRelation | ||
|
|
||
| CATALOG_COLUMNS = [ | ||
| "table_database", | ||
| "table_schema", | ||
| "table_name", | ||
| "table_type", | ||
| "table_comment", | ||
| "table_owner", | ||
| "column_name", | ||
| "column_index", | ||
| "column_type", | ||
| "column_comment", | ||
| ] | ||
|
|
||
|
|
||
| class TestGetOneCatalog(unittest.TestCase): | ||
| """Tests for FelderaAdapter._get_one_catalog.""" | ||
|
|
||
| def _build_adapter_mock(self, relations, columns_by_relation): | ||
| """Return a mock that looks enough like a FelderaAdapter.""" | ||
| adapter = MagicMock(spec=FelderaAdapter) | ||
| adapter.Relation = FelderaRelation | ||
| adapter.list_relations_without_caching = MagicMock(return_value=relations) | ||
| adapter.get_columns_in_relation = MagicMock(side_effect=lambda r: columns_by_relation.get(r.identifier, [])) | ||
| adapter._catalog_filter_table = MagicMock(side_effect=lambda table, _: table) | ||
| # Bind the real method so we can call it | ||
| adapter._get_one_catalog = FelderaAdapter._get_one_catalog.__get__(adapter) | ||
| return adapter | ||
|
|
||
| def _make_info_schema(self, database="default"): | ||
| info = MagicMock() | ||
| info.database = database | ||
| return info | ||
|
|
||
| def test_empty_schemas(self): | ||
| """No schemas → empty catalog.""" | ||
| adapter = self._build_adapter_mock([], {}) | ||
| info = self._make_info_schema() | ||
| table = adapter._get_one_catalog(info, set(), frozenset()) | ||
| self.assertEqual(len(table), 0) | ||
| self.assertEqual(list(table.column_names), CATALOG_COLUMNS) | ||
|
|
||
| def test_schema_with_no_relations(self): | ||
| """A schema exists but has no relations → empty catalog.""" | ||
| adapter = self._build_adapter_mock([], {}) | ||
| info = self._make_info_schema() | ||
| table = adapter._get_one_catalog(info, {"my_pipeline"}, frozenset()) | ||
| self.assertEqual(len(table), 0) | ||
|
|
||
| def test_single_relation_with_columns(self): | ||
| """One relation with two columns → two catalog rows with correct data.""" | ||
| rel = FelderaRelation.create(database="default", schema="my_pipeline", identifier="users") | ||
| cols = [ | ||
| FelderaColumn(column="id", dtype="INTEGER"), | ||
|
mdrakiburrahman marked this conversation as resolved.
|
||
| FelderaColumn(column="name", dtype="VARCHAR"), | ||
| ] | ||
| adapter = self._build_adapter_mock([rel], {"users": cols}) | ||
| info = self._make_info_schema() | ||
|
|
||
| table = adapter._get_one_catalog( | ||
| info, | ||
| {"my_pipeline"}, | ||
| frozenset(), | ||
| ) | ||
|
|
||
| self.assertEqual(len(table), 2) | ||
| self.assertEqual(list(table.column_names), CATALOG_COLUMNS) | ||
|
|
||
| row0 = dict(zip(table.column_names, table.rows[0].values())) | ||
| self.assertEqual(row0["table_database"], "default") | ||
| self.assertEqual(row0["table_schema"], "my_pipeline") | ||
| self.assertEqual(row0["table_name"], "users") | ||
| self.assertEqual(row0["column_name"], "id") | ||
| self.assertEqual(row0["column_index"], 1) | ||
| self.assertEqual(row0["column_type"], "INTEGER") | ||
|
|
||
| row1 = dict(zip(table.column_names, table.rows[1].values())) | ||
| self.assertEqual(row1["column_name"], "name") | ||
| self.assertEqual(row1["column_index"], 2) | ||
| self.assertEqual(row1["column_type"], "VARCHAR") | ||
|
|
||
| def test_multiple_relations(self): | ||
| """Two relations → catalog contains rows for both.""" | ||
| rel_users = FelderaRelation.create(database="default", schema="pipe", identifier="users") | ||
| rel_orders = FelderaRelation.create(database="default", schema="pipe", identifier="orders") | ||
| cols_users = [FelderaColumn(column="id", dtype="INTEGER")] | ||
| cols_orders = [ | ||
| FelderaColumn(column="order_id", dtype="BIGINT"), | ||
| FelderaColumn(column="total", dtype="DECIMAL"), | ||
| ] | ||
| adapter = self._build_adapter_mock( | ||
| [rel_users, rel_orders], | ||
| {"users": cols_users, "orders": cols_orders}, | ||
| ) | ||
| info = self._make_info_schema() | ||
|
|
||
| table = adapter._get_one_catalog(info, {"pipe"}, frozenset()) | ||
|
|
||
| self.assertEqual(len(table), 3) | ||
| names = [row["table_name"] for row in table] | ||
| self.assertIn("users", names) | ||
| self.assertIn("orders", names) | ||
|
|
||
| def test_relation_with_no_columns(self): | ||
| """Relation with no columns → no rows for that relation.""" | ||
|
mdrakiburrahman marked this conversation as resolved.
Outdated
|
||
| rel = FelderaRelation.create(database="default", schema="pipe", identifier="empty_table") | ||
| adapter = self._build_adapter_mock([rel], {"empty_table": []}) | ||
| info = self._make_info_schema() | ||
|
|
||
| table = adapter._get_one_catalog(info, {"pipe"}, frozenset()) | ||
| self.assertEqual(len(table), 0) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.