Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
67d79f7
Initial commit on INTPYTHON-297-MongoDB-Feast-Integration
caseyclements Jan 20, 2026
4d0f066
Added mongodb to project.optional-dependencies in pyproject.toml. Now…
caseyclements Jan 20, 2026
18b16b9
Checkpoint. Passing tests.unit.online_store.test_online_writes.TestOn…
caseyclements Feb 12, 2026
a066a99
Handle Nan in dfs for test_online_writes.py. Now all tests in the mod…
caseyclements Feb 13, 2026
7c17759
Removed suffix of implementation: mongodb_openai -> mongodb
caseyclements Feb 13, 2026
c463f4a
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
caseyclements Feb 19, 2026
a6db5c7
Formatting
caseyclements Feb 19, 2026
9ed89f0
Refactor online_read that converts bson to proto. Left two methods fo…
caseyclements Feb 19, 2026
40e5ea5
Remove file added early during discovery
caseyclements Feb 19, 2026
67f2c99
Formatting
caseyclements Feb 19, 2026
35b66f0
Added version of test that uses testcontainers mongodb instead of ass…
caseyclements Feb 20, 2026
a526a0c
Create Make target for universal tests
caseyclements Feb 20, 2026
b0d57f5
Cleanup
caseyclements Feb 20, 2026
5a94a11
Removed temporary integration tests requiring one to spin up own mong…
caseyclements Feb 20, 2026
46898b5
Format
caseyclements Feb 20, 2026
0356261
Typing
caseyclements Feb 20, 2026
960881d
Implemented ASync API and Tests
caseyclements Feb 21, 2026
52948a2
Removed offline store stubs. The first PR will only contain the Onlin…
caseyclements Feb 23, 2026
f2e5dff
Moved mongodb_online_store out of cobtrib package.
caseyclements Feb 23, 2026
7f5a192
Add documentation.
caseyclements Feb 24, 2026
619fdd3
Cleanups and docstrings
caseyclements Feb 24, 2026
1f46d8e
Fixed another reference to contrib dir
caseyclements Feb 24, 2026
de6b103
Typos
caseyclements Feb 25, 2026
7e2a80d
Made _convert_raw_docs_to_proto staticmethods private
caseyclements Feb 25, 2026
8a82e74
After benchmarking two alogithm for conevrting read results from bson…
caseyclements Feb 25, 2026
f09faa0
Add extra unit tests
caseyclements Feb 25, 2026
0ba503c
Formatting
caseyclements Feb 25, 2026
6b96c0e
Fixes in pyproject.toml
caseyclements Feb 27, 2026
7c3874b
Fixed Detect secrets false positives.
caseyclements Mar 2, 2026
49022e3
Update sdk/python/feast/infra/online_stores/mongodb_online_store/mong…
caseyclements Mar 2, 2026
a2c493e
Fix CI: guard pymongo imports and skip test module when pymongo unava…
caseyclements Mar 3, 2026
dcb9072
Fix: return (None, None) when entity doc exists but feature view was …
caseyclements Mar 3, 2026
bcb63aa
Update pixi.lock after adding mongodb optional dependency to pyprojec…
caseyclements Mar 3, 2026
1cb84dc
fix: catch FeastExtrasDependencyImportError in doctest runner
caseyclements Mar 4, 2026
c02eebf
fix: update PYTEST_PLUGINS path for mongodb online store tests
caseyclements Mar 4, 2026
3eb6107
fix: broaden import exception handling in doctest runner to catch Typ…
caseyclements Mar 4, 2026
01d2e4c
fix: pass onerror to pkgutil.walk_packages to suppress non-ImportErro…
caseyclements Mar 4, 2026
156f17b
fix: update stale tests.integration.feature_repos imports to tests.un…
caseyclements Mar 4, 2026
762d17b
feat: add mongodb to ValidOnlineStoreDBStorePersistenceTypes in feast…
caseyclements Mar 5, 2026
6147c87
feat: add Feast driver metadata to MongoDB client instantiations
caseyclements Mar 5, 2026
1492a1e
docs: update MongoDB online store status from alpha to preview
caseyclements Mar 5, 2026
0d80772
fix: add mongodb to kubebuilder Enum annotations for OnlineStoreDBSto…
caseyclements Mar 5, 2026
7563f11
Update +GOLANGCI_LINT_VERSION to fix upstream issue golang/go#74462
caseyclements Mar 6, 2026
26ad3bb
fix: raise ValueError in _get_client_async for invalid config type, c…
caseyclements Mar 6, 2026
2f92af9
fix: Added mongodb to operator yamls
ntkathole Mar 9, 2026
577f2a7
Small change suggested by ntkathole
caseyclements Mar 9, 2026
e761e10
Factor out write logic into utility function making sync/async essent…
caseyclements Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implemented ASync API and Tests
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
  • Loading branch information
caseyclements committed Mar 10, 2026
commit 960881d7fad5da1f1f2c6f673a308a1f6995188a
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from logging import getLogger
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

from pymongo import MongoClient, UpdateOne
from pymongo import MongoClient, AsyncMongoClient, UpdateOne
from pymongo.asynchronous.collection import AsyncCollection
from pymongo.collection import Collection

from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.supported_async_methods import SupportedAsyncMethods
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -73,6 +75,8 @@ class MongoDBOnlineStore(OnlineStore):

_client: Optional[MongoClient] = None
_collection: Optional[Collection] = None
_client_async: Optional[AsyncMongoClient] = None
_collection_async: Optional[AsyncCollection] = None

def online_write_batch(
self,
Expand Down Expand Up @@ -254,6 +258,35 @@ def _get_collection(self, repo_config: RepoConfig) -> Collection:
self._collection = db[clxn_name]
return self._collection

async def _get_client_async(self, config: RepoConfig) -> AsyncMongoClient:
"""Returns an async MongoDB client."""
if self._client_async is None:
online_config = config.online_store
if not isinstance(online_config, MongoDBOnlineStoreConfig):
logger.warning(
f"config.online_store passed to _get_client_async is not a MongoDBOnlineStoreConfig. It's of type {type(online_config)}"
)
self._client_async = AsyncMongoClient(
online_config.connection_string, **online_config.client_kwargs
)
return self._client_async

async def _get_collection_async(self, repo_config: RepoConfig) -> AsyncCollection:
"""Returns an async connection to the online store collection."""
if self._collection_async is None:
self._client_async = await self._get_client_async(repo_config)
assert self._client_async is not None
online_config = repo_config.online_store
db = self._client_async[online_config.database_name]
clxn_name = f"{repo_config.project}_{online_config.collection_suffix}"
self._collection_async = db[clxn_name]
return self._collection_async

@property
def async_supported(self) -> SupportedAsyncMethods:
"""Indicates that this online store supports async operations."""
return SupportedAsyncMethods(read=True, write=True)

@staticmethod
def convert_raw_docs_to_proto_simply(
ids: list[bytes],
Expand Down Expand Up @@ -358,7 +391,96 @@ def convert_raw_docs_to_proto_transforming(
results.append((ts, row_features))
return results

async def online_read_async(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Asynchronously reads feature values from the online store.

Args:
config: Feast repo configuration
table: FeatureView to read from
entity_keys: List of entity keys to read
requested_features: Optional list of specific features to read

Returns:
List of tuples (event_timestamp, feature_dict) for each entity key
"""
clxn = await self._get_collection_async(config)

# Serialize entity keys
ids = [
serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

# Query MongoDB asynchronously
cursor = clxn.find({"_id": {"$in": ids}})
docs_list = await cursor.to_list(length=None)
docs = {doc["_id"]: doc for doc in docs_list}

# Convert to proto format
return self.convert_raw_docs_to_proto_transforming(ids, docs, table)

async def online_write_batch_async(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]] = None,
) -> None:
"""
Asynchronously writes a batch of feature values to the online store.

Args:
config: Feast repo configuration
table: FeatureView to write to
data: List of tuples (entity_key, features, event_ts, created_ts)
progress: Optional progress callback
"""
clxn = await self._get_collection_async(config)
ops = []
for row in data:
entity_key_proto, features, event_ts, created_ts = row
entity_id = serialize_entity_key(
entity_key_proto,
entity_key_serialization_version=config.entity_key_serialization_version,
)

# Convert ValueProto to native Python types
feature_dict = {}
for feature_name, value_proto in features.items():
feature_dict[feature_name] = feast_value_type_to_python_type(value_proto)

# Build update operation
update_doc = {
"$set": {
f"features.{table.name}.{feature_name}": value
for feature_name, value in feature_dict.items()
},
}
update_doc["$set"][f"event_timestamps.{table.name}"] = event_ts
if created_ts:
update_doc["$set"]["created_timestamp"] = created_ts

ops.append(UpdateOne({"_id": entity_id}, update_doc, upsert=True))

# Execute bulk write asynchronously
if ops:
await clxn.bulk_write(ops, ordered=False)

if progress:
progress(len(data))


# TODO
# - Implement async API
# - Vector Search
# - Vector Search (requires atlas image in testcontainers or similar)
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_push_features_and_read(store):


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["dynamodb"])
@pytest.mark.universal_online_stores(only=["dynamodb", "mongodb"])
async def test_push_features_and_read_async(store):
await store.push_async("location_stats_push_source", _ingest_df())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ async def _do_async_retrieval_test(environment, universal_data_sources):

@pytest.mark.asyncio
@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis", "postgres"])
@pytest.mark.universal_online_stores(only=["redis", "postgres", "mongodb"])
async def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
):
Expand Down