Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
67d79f7
Initial commit on INTPYTHON-297-MongoDB-Feast-Integration
caseyclements Jan 20, 2026
4d0f066
Added mongodb to project.optional-dependencies in pyproject.toml. Now…
caseyclements Jan 20, 2026
18b16b9
Checkpoint. Passing tests.unit.online_store.test_online_writes.TestOn…
caseyclements Feb 12, 2026
a066a99
Handle Nan in dfs for test_online_writes.py. Now all tests in the mod…
caseyclements Feb 13, 2026
7c17759
Removed suffix of implementation: mongodb_openai -> mongodb
caseyclements Feb 13, 2026
c463f4a
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
caseyclements Feb 19, 2026
a6db5c7
Formatting
caseyclements Feb 19, 2026
9ed89f0
Refactor online_read that converts bson to proto. Left two methods fo…
caseyclements Feb 19, 2026
40e5ea5
Remove file added early during discovery
caseyclements Feb 19, 2026
67f2c99
Formatting
caseyclements Feb 19, 2026
35b66f0
Added version of test that uses testcontainers mongodb instead of ass…
caseyclements Feb 20, 2026
a526a0c
Create Make target for universal tests
caseyclements Feb 20, 2026
b0d57f5
Cleanup
caseyclements Feb 20, 2026
5a94a11
Removed temporary integration tests requiring one to spin up own mong…
caseyclements Feb 20, 2026
46898b5
Format
caseyclements Feb 20, 2026
0356261
Typing
caseyclements Feb 20, 2026
960881d
Implemented ASync API and Tests
caseyclements Feb 21, 2026
52948a2
Removed offline store stubs. The first PR will only contain the Onlin…
caseyclements Feb 23, 2026
f2e5dff
Moved mongodb_online_store out of contrib package.
caseyclements Feb 23, 2026
7f5a192
Add documentation.
caseyclements Feb 24, 2026
619fdd3
Cleanups and docstrings
caseyclements Feb 24, 2026
1f46d8e
Fixed another reference to contrib dir
caseyclements Feb 24, 2026
de6b103
Typos
caseyclements Feb 25, 2026
7e2a80d
Made _convert_raw_docs_to_proto staticmethods private
caseyclements Feb 25, 2026
8a82e74
After benchmarking two algorithms for converting read results from bson…
caseyclements Feb 25, 2026
f09faa0
Add extra unit tests
caseyclements Feb 25, 2026
0ba503c
Formatting
caseyclements Feb 25, 2026
6b96c0e
Fixes in pyproject.toml
caseyclements Feb 27, 2026
7c3874b
Fixed Detect secrets false positives.
caseyclements Mar 2, 2026
49022e3
Update sdk/python/feast/infra/online_stores/mongodb_online_store/mong…
caseyclements Mar 2, 2026
a2c493e
Fix CI: guard pymongo imports and skip test module when pymongo unava…
caseyclements Mar 3, 2026
dcb9072
Fix: return (None, None) when entity doc exists but feature view was …
caseyclements Mar 3, 2026
bcb63aa
Update pixi.lock after adding mongodb optional dependency to pyprojec…
caseyclements Mar 3, 2026
1cb84dc
fix: catch FeastExtrasDependencyImportError in doctest runner
caseyclements Mar 4, 2026
c02eebf
fix: update PYTEST_PLUGINS path for mongodb online store tests
caseyclements Mar 4, 2026
3eb6107
fix: broaden import exception handling in doctest runner to catch Typ…
caseyclements Mar 4, 2026
01d2e4c
fix: pass onerror to pkgutil.walk_packages to suppress non-ImportErro…
caseyclements Mar 4, 2026
156f17b
fix: update stale tests.integration.feature_repos imports to tests.un…
caseyclements Mar 4, 2026
762d17b
feat: add mongodb to ValidOnlineStoreDBStorePersistenceTypes in feast…
caseyclements Mar 5, 2026
6147c87
feat: add Feast driver metadata to MongoDB client instantiations
caseyclements Mar 5, 2026
1492a1e
docs: update MongoDB online store status from alpha to preview
caseyclements Mar 5, 2026
0d80772
fix: add mongodb to kubebuilder Enum annotations for OnlineStoreDBSto…
caseyclements Mar 5, 2026
7563f11
Update +GOLANGCI_LINT_VERSION to fix upstream issue golang/go#74462
caseyclements Mar 6, 2026
26ad3bb
fix: raise ValueError in _get_client_async for invalid config type, c…
caseyclements Mar 6, 2026
2f92af9
fix: Added mongodb to operator yamls
ntkathole Mar 9, 2026
577f2a7
Small change suggested by ntkathole
caseyclements Mar 9, 2026
e761e10
Factor out write logic into utility function making sync/async essent…
caseyclements Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
  • Loading branch information
caseyclements committed Mar 10, 2026
commit c463f4a1c7a6865abe6d60ec1a08eca79b41e37d
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .mongodb import MongoDBOnlineStore, MongoDBOnlineStoreConfig

__all__ = ["MongoDBOnlineStore", "MongoDBOnlineStoreConfig"]
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.type_map import python_values_to_proto_values
from feast.type_map import python_values_to_proto_values, feast_value_type_to_python_type


logger = logging.getLogger(__name__)

Expand All @@ -30,16 +31,11 @@ class MongoDBOnlineStoreConfig(FeastConfigBaseModel):
] = "mongodb"
"""Online store type selector"""
connection_string: str = "mongodb://localhost:27017"
database_name: str = "project" # todo - consider changing to project_name?
collection_suffix: str = "features_latest"
database_name: str = "features" # todo - consider removing, and using repo_config.project
collection_suffix: str = "latest"
client_kwargs: Dict[str, Any] = {}


def _store_name(project_name: str, collection_suffix: str) -> str:
"""OnlineStore Collection's full name."""
return f"{project_name}_{collection_suffix}"


class MongoDBOnlineStore(OnlineStore):
"""
MongoDB implementation of Feast OnlineStore.
Expand Down Expand Up @@ -72,10 +68,6 @@ class MongoDBOnlineStore(OnlineStore):
_client: Optional[MongoClient] = None
_collection: Optional[Collection] = None

# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------

def online_write_batch(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

online_write_batch and online_write_batch_async have duplicated logic written in two different styles; we can extract the shared logic into a static helper that both paths call.

self,
config: RepoConfig,
Expand All @@ -101,7 +93,7 @@ def online_write_batch(
entity_key, proto_values, event_timestamp, created_timestamp = row
entity_id = serialize_entity_key(entity_key)
feature_updates = {
f"features.{table.name}.{field}": value_proto_to_python(val)
f"features.{table.name}.{field}": feast_value_type_to_python_type(val)
for field, val in proto_values.items()
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
update = {
Expand All @@ -123,8 +115,6 @@ def online_write_batch(
if progress:
progress(1)

# ------------------------------------------------------------------

def online_read(
self,
config: RepoConfig,
Expand Down Expand Up @@ -275,7 +265,7 @@ def update(
The Entities are serialized in the _id. No schema needs be adjusted.
"""
if config.online_store.type != "mongodb":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if config.online_store.type != "mongodb":
if not isinstance(config.online_store, MongoDBOnlineStoreConfig):

Good to be consistent across

raise RuntimeError("config.online_store.type must be mongodb. Found ", config.online_store.type)
raise RuntimeError(f"{config.online_store.type = }. It must be mongodb.")

clxn = self._get_collection(repo_config=config)

Expand All @@ -296,105 +286,48 @@ def update(

def teardown(
Comment thread
ntkathole marked this conversation as resolved.
self,
config: RepoConfig, # TODO - Need to resolve configs (Repo and Store)
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
):
"""
Tear down MongoDB resources (drop collections).
Drop the backing collection and close the client.

Args:
config: Feast repository configuration
tables: Feature views whose collections should be dropped
entities: Entities (unused for MongoDB)
As in update, MongoDB requires very little here.
"""
assert config.online_store.type == "mongodb"
online_config: MongoDBOnlineStoreConfig = config.online_store

clxn = self._get_collection(repo_config=config)
clxn.drop()
self._get_client(config)
client = MongoClient(config.online_store.uri)
client.close()
self._get_client(config).close()


# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _get_client(self, online_config: MongoDBOnlineStoreConfig):
def _get_client(self, config: RepoConfig):
"""Returns a connection to the server."""
online_store_config = config.online_store
if not isinstance(online_store_config, MongoDBOnlineStoreConfig):
raise ValueError(f"config.online_store should be MongoDBOnlineStoreConfig, got {online_store_config}")
if self._client is None:
assert isinstance(online_config, MongoDBOnlineStoreConfig)
online_config = config.online_store
if not isinstance(online_config, MongoDBOnlineStoreConfig):
logger.warning(f"config.online_store passed to _get_client is not a MongoDBOnlineStoreConfig. It's of type {type(online_config)}")
self._client = MongoClient(online_config.connection_string, **online_config.client_kwargs)
return self._client

def _get_collection(self, repo_config: RepoConfig) -> Collection:
"""Returns a connection to the online store collection."""
if self._collection is None:
self._client = self._get_client(repo_config)
online_config = repo_config.online_store
self._client = self._get_client(online_config)
db = self._client[online_config.database_name]
clxn_name = f"{repo_config.project}_{online_config.collection_suffix}"
if clxn_name not in db.list_collection_names():
self._collection = db.create_collection(clxn_name)
self._collection = db[clxn_name]
return self._collection

def value_proto_to_python(val: ValueProto):
    """Convert a Value proto into the plain Python value saved in MongoDB.

    Args:
        val: Message exposing ``WhichOneof("val")`` and one attribute per
            member of that oneof.

    Returns:
        The value held by whichever member of the ``val`` oneof is set, or
        ``None`` when no member is set. ``datetime`` values come back as
        timezone-aware UTC: naive ones are labelled UTC, aware ones are
        converted to UTC.

    Raises:
        ValueError: If ``val`` cannot be inspected as a Value proto.
    """
    # Imported locally under an alias so the datetime handling below works
    # whether the module imported ``datetime`` as the module or as the class.
    import datetime as _dt

    # Only the proto inspection is guarded; anything else should surface as-is
    # instead of being hidden behind a blanket ValueError.
    try:
        member = val.WhichOneof("val")
        if member is None:
            return None
        plain = getattr(val, member)
    except (AttributeError, TypeError, ValueError) as err:
        # ``val`` is never rebound, so the message shows the offending input.
        raise ValueError(f"Unsupported ValueProto: {val}") from err

    if isinstance(plain, _dt.datetime):
        if plain.tzinfo is None:
            plain = plain.replace(tzinfo=_dt.timezone.utc)
        else:
            # Convert rather than relabel, so the instant is preserved.
            plain = plain.astimezone(_dt.timezone.utc)
    return plain


def value_proto_to_python_deprecated(val: ValueProto):
    """Convert a Value proto into the plain form saved in MongoDB.

    Explicit-dispatch variant: only the field names listed below are accepted.

    Args:
        val: Message exposing ``WhichOneof("val")`` and one attribute per
            member of that oneof.

    Returns:
        ``None`` when no member is set or the member is ``null_val``; a
        timezone-aware UTC ``datetime`` for ``timestamp_val``; otherwise the
        attribute named by the set member, returned unchanged.

    Raises:
        ValueError: If the set member is not one of the supported fields.
    """
    # TODO
    # - Check timestamp implementation

    # Imported locally under an alias so the timestamp branch works whether
    # the module imported ``datetime`` as the module or as the class.
    import datetime as _dt

    # Fields whose attribute is returned as-is.
    # NOTE(review): the *_list_val attributes are returned unchanged, as the
    # original branches did - confirm callers do not expect a plain list.
    passthrough_fields = frozenset(
        {
            "int32_val",
            "int32_list_val",
            "int64_val",
            "int64_list_val",
            "float_val",
            "float_list_val",
            "double_val",
            "double_list_val",
            "bool_val",
            "bool_list_val",
            "string_val",
            "bytes_val",
        }
    )

    # Keep the field name in its own variable: the proto itself is still
    # needed afterwards to read the selected attribute.
    kind = val.WhichOneof("val")
    if kind is None or kind == "null_val":
        return None
    if kind == "timestamp_val":
        stamp = val.timestamp_val
        return _dt.datetime.fromtimestamp(
            stamp.seconds + stamp.nanos / 1e9,
            tz=_dt.timezone.utc,
        )
    if kind in passthrough_fields:
        return getattr(val, kind)

    raise ValueError(f"Unsupported ValueProto val: {kind}")



# TODO
# - Implement async API
Loading