feat: Add OnlineStore for MongoDB #6025
Merged
ntkathole
merged 47 commits into
feast-dev:master
from
caseyclements:INTPYTHON-297-MongoDB-Feast-Integration
Mar 10, 2026
Changes from 1 commit
67d79f7
Initial commit on INTPYTHON-297-MongoDB-Feast-Integration
caseyclements 4d0f066
Added mongodb to project.optional-dependencies in pyproject.toml. Now…
caseyclements 18b16b9
Checkpoint. Passing tests.unit.online_store.test_online_writes.TestOn…
caseyclements a066a99
Handle NaN in dfs for test_online_writes.py. Now all tests in the mod…
caseyclements 7c17759
Removed suffix of implementation: mongodb_openai -> mongodb
caseyclements c463f4a
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
caseyclements a6db5c7
Formatting
caseyclements 9ed89f0
Refactor online_read that converts bson to proto. Left two methods fo…
caseyclements 40e5ea5
Remove file added early during discovery
caseyclements 67f2c99
Formatting
caseyclements 35b66f0
Added version of test that uses testcontainers mongodb instead of ass…
caseyclements a526a0c
Create Make target for universal tests
caseyclements b0d57f5
Cleanup
caseyclements 5a94a11
Removed temporary integration tests requiring one to spin up own mong…
caseyclements 46898b5
Format
caseyclements 0356261
Typing
caseyclements 960881d
Implemented ASync API and Tests
caseyclements 52948a2
Removed offline store stubs. The first PR will only contain the Onlin…
caseyclements f2e5dff
Moved mongodb_online_store out of contrib package.
caseyclements 7f5a192
Add documentation.
caseyclements 619fdd3
Cleanups and docstrings
caseyclements 1f46d8e
Fixed another reference to contrib dir
caseyclements de6b103
Typos
caseyclements 7e2a80d
Made _convert_raw_docs_to_proto staticmethods private
caseyclements 8a82e74
After benchmarking two algorithms for converting read results from bson…
caseyclements f09faa0
Add extra unit tests
caseyclements 0ba503c
Formatting
caseyclements 6b96c0e
Fixes in pyproject.toml
caseyclements 7c3874b
Fixed Detect secrets false positives.
caseyclements 49022e3
Update sdk/python/feast/infra/online_stores/mongodb_online_store/mong…
caseyclements a2c493e
Fix CI: guard pymongo imports and skip test module when pymongo unava…
caseyclements dcb9072
Fix: return (None, None) when entity doc exists but feature view was …
caseyclements bcb63aa
Update pixi.lock after adding mongodb optional dependency to pyprojec…
caseyclements 1cb84dc
fix: catch FeastExtrasDependencyImportError in doctest runner
caseyclements c02eebf
fix: update PYTEST_PLUGINS path for mongodb online store tests
caseyclements 3eb6107
fix: broaden import exception handling in doctest runner to catch Typ…
caseyclements 01d2e4c
fix: pass onerror to pkgutil.walk_packages to suppress non-ImportErro…
caseyclements 156f17b
fix: update stale tests.integration.feature_repos imports to tests.un…
caseyclements 762d17b
feat: add mongodb to ValidOnlineStoreDBStorePersistenceTypes in feast…
caseyclements 6147c87
feat: add Feast driver metadata to MongoDB client instantiations
caseyclements 1492a1e
docs: update MongoDB online store status from alpha to preview
caseyclements 0d80772
fix: add mongodb to kubebuilder Enum annotations for OnlineStoreDBSto…
caseyclements 7563f11
Update +GOLANGCI_LINT_VERSION to fix upstream issue golang/go#74462
caseyclements 26ad3bb
fix: raise ValueError in _get_client_async for invalid config type, c…
caseyclements 2f92af9
fix: Added mongodb to operator yamls
ntkathole 577f2a7
Small change suggested by ntkathole
caseyclements e761e10
Factor out write logic into utility function making sync/async essent…
caseyclements
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
commit c463f4a1c7a6865abe6d60ec1a08eca79b41e37d
Empty file.
3 changes: 3 additions & 0 deletions
sdk/python/feast/infra/online_stores/contrib/mongodb_online_store/__init__.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | @@ -0,0 +1,3 @@ |
| | 1 | + from .mongodb import MongoDBOnlineStore, MongoDBOnlineStoreConfig |
| | 2 | + |
| | 3 | + __all__ = ["MongoDBOnlineStore", "MongoDBOnlineStoreConfig"] |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -14,7 +14,8 @@ | |||||
| from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||||||
| from feast.protos.feast.types.Value_pb2 import Value as ValueProto | ||||||
| from feast.repo_config import FeastConfigBaseModel, RepoConfig | ||||||
| from feast.type_map import python_values_to_proto_values | ||||||
| from feast.type_map import python_values_to_proto_values, feast_value_type_to_python_type | ||||||
|
|
||||||
|
|
||||||
| logger = logging.getLogger(__name__) | ||||||
|
|
||||||
|
|
@@ -30,16 +31,11 @@ class MongoDBOnlineStoreConfig(FeastConfigBaseModel): | |||||
| ] = "mongodb" | ||||||
| """Online store type selector""" | ||||||
| connection_string: str = "mongodb://localhost:27017" | ||||||
| database_name: str = "project" # todo - consider changing to project_name? | ||||||
| collection_suffix: str = "features_latest" | ||||||
| database_name: str = "features" # todo - consider removing, and using repo_config.project | ||||||
| collection_suffix: str = "latest" | ||||||
| client_kwargs: Dict[str, Any] = {} | ||||||
|
|
||||||
|
|
||||||
| def _store_name(project_name: str, collection_suffix: str) -> str: | ||||||
| """OnlineStore Collection's full name.""" | ||||||
| return f"{project_name}_{collection_suffix}" | ||||||
|
|
||||||
|
|
||||||
| class MongoDBOnlineStore(OnlineStore): | ||||||
| """ | ||||||
| MongoDB implementation of Feast OnlineStore. | ||||||
|
|
@@ -72,10 +68,6 @@ class MongoDBOnlineStore(OnlineStore): | |||||
| _client: Optional[MongoClient] = None | ||||||
| _collection: Optional[Collection] = None | ||||||
|
|
||||||
| # ------------------------------------------------------------------ | ||||||
| # Lifecycle | ||||||
| # ------------------------------------------------------------------ | ||||||
|
|
||||||
| def online_write_batch( | ||||||
| self, | ||||||
| config: RepoConfig, | ||||||
|
|
@@ -101,7 +93,7 @@ def online_write_batch( | |||||
| entity_key, proto_values, event_timestamp, created_timestamp = row | ||||||
| entity_id = serialize_entity_key(entity_key) | ||||||
| feature_updates = { | ||||||
| f"features.{table.name}.{field}": value_proto_to_python(val) | ||||||
| f"features.{table.name}.{field}": feast_value_type_to_python_type(val) | ||||||
| for field, val in proto_values.items() | ||||||
| } | ||||||
|
devin-ai-integration[bot] marked this conversation as resolved.
|
||||||
| update = { | ||||||
|
|
@@ -123,8 +115,6 @@ def online_write_batch( | |||||
| if progress: | ||||||
| progress(1) | ||||||
|
|
||||||
| # ------------------------------------------------------------------ | ||||||
|
|
||||||
| def online_read( | ||||||
| self, | ||||||
| config: RepoConfig, | ||||||
|
|
@@ -275,7 +265,7 @@ def update( | |||||
| The Entities are serialized in the _id. No schema needs be adjusted. | ||||||
| """ | ||||||
| if config.online_store.type != "mongodb": | ||||||
|
Suggested change
Good to be consistent across |
||||||
| raise RuntimeError("config.online_store.type must be mongodb. Found ", config.online_store.type) | ||||||
| raise RuntimeError(f"{config.online_store.type = }. It must be mongodb.") | ||||||
|
|
||||||
| clxn = self._get_collection(repo_config=config) | ||||||
|
|
||||||
|
|
@@ -296,105 +286,48 @@ def update( | |||||
|
|
||||||
| def teardown( | ||||||
|
ntkathole marked this conversation as resolved.
|
||||||
| self, | ||||||
| config: RepoConfig, # TODO - Need to resolve configs (Repo and Store) | ||||||
| config: RepoConfig, | ||||||
| tables: Sequence[FeatureView], | ||||||
| entities: Sequence[Entity], | ||||||
| ): | ||||||
| """ | ||||||
| Tear down MongoDB resources (drop collections). | ||||||
| Drop the backing collection and close the client. | ||||||
|
|
||||||
| Args: | ||||||
| config: Feast repository configuration | ||||||
| tables: Feature views whose collections should be dropped | ||||||
| entities: Entities (unused for MongoDB) | ||||||
| As in update, MongoDB requires very little here. | ||||||
| """ | ||||||
| assert config.online_store.type == "mongodb" | ||||||
| online_config: MongoDBOnlineStoreConfig = config.online_store | ||||||
|
|
||||||
| clxn = self._get_collection(repo_config=config) | ||||||
| clxn.drop() | ||||||
| self._get_client(config) | ||||||
| client = MongoClient(config.online_store.uri) | ||||||
| client.close() | ||||||
| self._get_client(config).close() | ||||||
|
|
||||||
|
|
||||||
| # ------------------------------------------------------------------ | ||||||
| # Helpers | ||||||
| # ------------------------------------------------------------------ | ||||||
|
|
||||||
| def _get_client(self, online_config: MongoDBOnlineStoreConfig): | ||||||
| def _get_client(self, config: RepoConfig): | ||||||
| """Returns a connection to the server.""" | ||||||
| online_store_config = config.online_store | ||||||
| if not isinstance(online_store_config, MongoDBOnlineStoreConfig): | ||||||
| raise ValueError(f"config.online_store should be MongoDBOnlineStoreConfig, got {online_store_config}") | ||||||
| if self._client is None: | ||||||
| assert isinstance(online_config, MongoDBOnlineStoreConfig) | ||||||
| online_config = config.online_store | ||||||
| if not isinstance(online_config, MongoDBOnlineStoreConfig): | ||||||
| logger.warning(f"config.online_store passed to _get_client is not a MongoDBOnlineStoreConfig. It's of type {type(online_config)}") | ||||||
| self._client = MongoClient(online_config.connection_string, **online_config.client_kwargs) | ||||||
| return self._client | ||||||
|
|
||||||
| def _get_collection(self, repo_config: RepoConfig) -> Collection: | ||||||
| """Returns a connection to the online store collection.""" | ||||||
| if self._collection is None: | ||||||
| self._client = self._get_client(repo_config) | ||||||
| online_config = repo_config.online_store | ||||||
| self._client = self._get_client(online_config) | ||||||
| db = self._client[online_config.database_name] | ||||||
| clxn_name = f"{repo_config.project}_{online_config.collection_suffix}" | ||||||
| if clxn_name not in db.list_collection_names(): | ||||||
| self._collection = db.create_collection(clxn_name) | ||||||
| self._collection = db[clxn_name] | ||||||
| return self._collection | ||||||
|
|
||||||
| def value_proto_to_python(val: ValueProto): | ||||||
| """Utility to convert Value proto to plain form saved in MongoDB.""" | ||||||
| try: # hasattr(val, "val"): | ||||||
| typ = val.WhichOneof("val") | ||||||
| if typ is None: | ||||||
| return None | ||||||
| val = getattr(val, typ) | ||||||
| if isinstance(val, datetime): | ||||||
| val = val.replace(tzinfo=datetime.UTC) | ||||||
| return val | ||||||
| except: | ||||||
| raise ValueError(f"Unsupported ValueProto: {val}") | ||||||
|
|
||||||
|
|
||||||
| def value_proto_to_python_deprecated(val: ValueProto): | ||||||
| """Utility to convert Value proto to plain form saved in MongoDB.""" | ||||||
| # TODO | ||||||
| # - Check timestamp implementation | ||||||
|
|
||||||
| val = val.WhichOneof("val") | ||||||
| if val == "int32_val": | ||||||
| return val.int32_val | ||||||
| if val == "float_int64_val": | ||||||
| return val.int32_list_val | ||||||
| if val == "int64_val": | ||||||
| return val.int64_val | ||||||
| if val == "int64_list_val": | ||||||
| return val.int64_list_val | ||||||
| if val == "float_val": | ||||||
| return val.float_val | ||||||
| if val == "float_list_val": | ||||||
| return val.float_list_val | ||||||
| if val == "double_val": | ||||||
| return val.double_val | ||||||
| if val == "double_list_val": | ||||||
| return val.double_list_val | ||||||
| if val == "bool_val": | ||||||
| return val.bool_val | ||||||
| if val == "bool_list_val": | ||||||
| return val.bool_list_val | ||||||
| if val == "string_val": | ||||||
| return val.string_val | ||||||
| if val == "bytes_val": | ||||||
| return val.bytes_val | ||||||
| if val == "timestamp_val": | ||||||
| return datetime.fromtimestamp( | ||||||
| val.timestamp_val.seconds + val.timestamp_val.nanos / 1e9, | ||||||
| tz=datetime.timezone.utc, | ||||||
| ) | ||||||
| if val == "null_val": | ||||||
| return None | ||||||
|
|
||||||
| raise ValueError(f"Unsupported ValueProto val: {val}") | ||||||
|
|
||||||
|
|
||||||
|
|
||||||
| # TODO | ||||||
| # - Implement async API | ||||||
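One detail of the `update` hunk above is easy to miss: the old `RuntimeError` call passes the message and the value as two separate arguments, so the exception renders as a tuple, while the new call uses an f-string with the `=` specifier. The sketch below reproduces only that difference. The `config` object is a hypothetical stand-in built with `SimpleNamespace`, not a Feast `RepoConfig`, and `"sqlite"` is an arbitrary example value.

```python
from types import SimpleNamespace

# Hypothetical stand-in for a RepoConfig whose online store is not MongoDB.
config = SimpleNamespace(online_store=SimpleNamespace(type="sqlite"))

# Old form: two positional arguments. str() of the exception is the repr
# of the args tuple, not a formatted sentence.
old = RuntimeError(
    "config.online_store.type must be mongodb. Found ", config.online_store.type
)

# New form: the f-string "=" specifier (Python 3.8+) prints the expression
# text followed by the repr of its value.
new = RuntimeError(f"{config.online_store.type = }. It must be mongodb.")

print(str(old))  # ('config.online_store.type must be mongodb. Found ', 'sqlite')
print(str(new))  # config.online_store.type = 'sqlite'. It must be mongodb.
```

The f-string form keeps the message readable in logs and tracebacks without changing the exception type.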
online_write_batch and online_write_batch_async have duplicated logic written in two different styles. We can extract the shared logic into a static helper that both paths call.
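A minimal sketch of that refactor, under stated assumptions: the row shape is simplified to `(entity_id, values)`, the helper name `build_write_ops` is hypothetical, and the two fake collection classes are in-memory stand-ins so the example runs without a MongoDB server. Only the `features.{table}.{field}` key layout comes from the diff; the helper actually merged in the PR may look different.

```python
import asyncio
from typing import Any, Dict, List, Tuple

# Simplified, hypothetical row: (serialized entity key, {feature name: value}).
Row = Tuple[bytes, Dict[str, Any]]
# One write operation: (filter document, update document).
Op = Tuple[Dict[str, Any], Dict[str, Any]]


def build_write_ops(table_name: str, rows: List[Row]) -> List[Op]:
    """Shared, I/O-free logic: turn rows into (filter, update) pairs."""
    ops: List[Op] = []
    for entity_id, values in rows:
        updates = {
            f"features.{table_name}.{field}": val for field, val in values.items()
        }
        ops.append(({"_id": entity_id}, {"$set": updates}))
    return ops


class FakeCollection:
    """In-memory stand-in for a synchronous collection; it only records calls."""

    def __init__(self) -> None:
        self.calls: list = []

    def update_one(self, flt, update, upsert=False):
        self.calls.append((flt, update, upsert))


class FakeAsyncCollection:
    """In-memory stand-in for an asynchronous collection."""

    def __init__(self) -> None:
        self.calls: list = []

    async def update_one(self, flt, update, upsert=False):
        self.calls.append((flt, update, upsert))


def online_write_batch(collection, table_name: str, rows: List[Row]) -> None:
    # Sync path: only the I/O call differs from the async path.
    for flt, update in build_write_ops(table_name, rows):
        collection.update_one(flt, update, upsert=True)


async def online_write_batch_async(collection, table_name: str, rows: List[Row]) -> None:
    # Async path: same operations, awaited.
    for flt, update in build_write_ops(table_name, rows):
        await collection.update_one(flt, update, upsert=True)


rows = [(b"k1", {"conv_rate": 0.5})]
sync_clxn, async_clxn = FakeCollection(), FakeAsyncCollection()
online_write_batch(sync_clxn, "driver_stats", rows)
asyncio.run(online_write_batch_async(async_clxn, "driver_stats", rows))
```

Because the helper performs no I/O, it can be unit tested on its own, and the sync and async methods cannot drift apart in how they shape documents.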