Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
67d79f7
Initial commit on INTPYTHON-297-MongoDB-Feast-Integration
caseyclements Jan 20, 2026
4d0f066
Added mongodb to project.optional-dependencies in pyproject.toml. Now…
caseyclements Jan 20, 2026
18b16b9
Checkpoint. Passing tests.unit.online_store.test_online_writes.TestOn…
caseyclements Feb 12, 2026
a066a99
Handle Nan in dfs for test_online_writes.py. Now all tests in the mod…
caseyclements Feb 13, 2026
7c17759
Removed suffix of implementation: mongodb_openai -> mongodb
caseyclements Feb 13, 2026
c463f4a
Moved MongoDBOnlineStore to feast.infra.online_store.contrib
caseyclements Feb 19, 2026
a6db5c7
Formatting
caseyclements Feb 19, 2026
9ed89f0
Refactor online_read that converts bson to proto. Left two methods fo…
caseyclements Feb 19, 2026
40e5ea5
Remove file added early during discovery
caseyclements Feb 19, 2026
67f2c99
Formatting
caseyclements Feb 19, 2026
35b66f0
Added version of test that uses testcontainers mongodb instead of ass…
caseyclements Feb 20, 2026
a526a0c
Create Make target for universal tests
caseyclements Feb 20, 2026
b0d57f5
Cleanup
caseyclements Feb 20, 2026
5a94a11
Removed temporary integration tests requiring one to spin up own mong…
caseyclements Feb 20, 2026
46898b5
Format
caseyclements Feb 20, 2026
0356261
Typing
caseyclements Feb 20, 2026
960881d
Implemented ASync API and Tests
caseyclements Feb 21, 2026
52948a2
Removed offline store stubs. The first PR will only contain the Onlin…
caseyclements Feb 23, 2026
f2e5dff
Moved mongodb_online_store out of cobtrib package.
caseyclements Feb 23, 2026
7f5a192
Add documentation.
caseyclements Feb 24, 2026
619fdd3
Cleanups and docstrings
caseyclements Feb 24, 2026
1f46d8e
Fixed another reference to contrib dir
caseyclements Feb 24, 2026
de6b103
Typos
caseyclements Feb 25, 2026
7e2a80d
Made _convert_raw_docs_to_proto staticmethods private
caseyclements Feb 25, 2026
8a82e74
After benchmarking two alogithm for conevrting read results from bson…
caseyclements Feb 25, 2026
f09faa0
Add extra unit tests
caseyclements Feb 25, 2026
0ba503c
Formatting
caseyclements Feb 25, 2026
6b96c0e
Fixes in pyproject.toml
caseyclements Feb 27, 2026
7c3874b
Fixed Detect secrets false positives.
caseyclements Mar 2, 2026
49022e3
Update sdk/python/feast/infra/online_stores/mongodb_online_store/mong…
caseyclements Mar 2, 2026
a2c493e
Fix CI: guard pymongo imports and skip test module when pymongo unava…
caseyclements Mar 3, 2026
dcb9072
Fix: return (None, None) when entity doc exists but feature view was …
caseyclements Mar 3, 2026
bcb63aa
Update pixi.lock after adding mongodb optional dependency to pyprojec…
caseyclements Mar 3, 2026
1cb84dc
fix: catch FeastExtrasDependencyImportError in doctest runner
caseyclements Mar 4, 2026
c02eebf
fix: update PYTEST_PLUGINS path for mongodb online store tests
caseyclements Mar 4, 2026
3eb6107
fix: broaden import exception handling in doctest runner to catch Typ…
caseyclements Mar 4, 2026
01d2e4c
fix: pass onerror to pkgutil.walk_packages to suppress non-ImportErro…
caseyclements Mar 4, 2026
156f17b
fix: update stale tests.integration.feature_repos imports to tests.un…
caseyclements Mar 4, 2026
762d17b
feat: add mongodb to ValidOnlineStoreDBStorePersistenceTypes in feast…
caseyclements Mar 5, 2026
6147c87
feat: add Feast driver metadata to MongoDB client instantiations
caseyclements Mar 5, 2026
1492a1e
docs: update MongoDB online store status from alpha to preview
caseyclements Mar 5, 2026
0d80772
fix: add mongodb to kubebuilder Enum annotations for OnlineStoreDBSto…
caseyclements Mar 5, 2026
7563f11
Update +GOLANGCI_LINT_VERSION to fix upstream issue golang/go#74462
caseyclements Mar 6, 2026
26ad3bb
fix: raise ValueError in _get_client_async for invalid config type, c…
caseyclements Mar 6, 2026
2f92af9
fix: Added mongodb to operator yamls
ntkathole Mar 9, 2026
577f2a7
Small change suggested by ntkathole
caseyclements Mar 9, 2026
e761e10
Factor out write logic into utility function making sync/async essent…
caseyclements Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cleanups and docstrings
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
  • Loading branch information
caseyclements committed Mar 10, 2026
commit 619fdd371bbfd080f700169529b64567e730e051
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Read features for a batch of entities.

Args:
config: Feast repo configuration
table: FeatureView to read from
entity_keys: List of entity keys to read
requested_features: Optional list of specific features to read

Returns:
List of tuples (event_timestamp, feature_dict) for each entity key
"""
clxn = self._get_collection(config)

Expand Down Expand Up @@ -223,7 +232,15 @@ def teardown(
assert config.online_store.type == "mongodb"
clxn = self._get_collection(repo_config=config)
clxn.drop()
self._get_client(config).close()
if self._client:
self._client.close()
Comment thread
caseyclements marked this conversation as resolved.

async def close(self) -> None:
"""Close the async MongoDB client and release its resources."""
if self._client_async is not None:
await self._client_async.close()
self._client_async = None
self._collection_async = None
Comment on lines +278 to +283
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Resource leak: teardown() never closes async client, close() never closes sync client

The MongoDBOnlineStore maintains two independent clients: _client (sync MongoClient) and _client_async (async AsyncMongoClient). The teardown() method (line 273-276) only closes the sync _client and leaves _client_async open. Conversely, the close() method (line 280-283) only closes _client_async and leaves the sync _client open. If both clients are created during the lifetime of a store instance (e.g., sync writes via online_write_batch followed by async reads via online_read_async, or vice versa), the other client's connection pool will leak when cleanup runs through either path.

Suggested change
async def close(self) -> None:
"""Close the async MongoDB client and release its resources."""
if self._client_async is not None:
await self._client_async.close()
self._client_async = None
self._collection_async = None
async def close(self) -> None:
"""Close both async and sync MongoDB clients and release their resources."""
if self._client_async is not None:
await self._client_async.close()
self._client_async = None
self._collection_async = None
if self._client is not None:
self._client.close()
self._client = None
self._collection = None
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


# ------------------------------------------------------------------
# Helpers
Expand Down Expand Up @@ -288,22 +305,25 @@ def async_supported(self) -> SupportedAsyncMethods:
return SupportedAsyncMethods(read=True, write=True)

@staticmethod
def convert_raw_docs_to_proto_simply(
def convert_raw_docs_to_proto_naive(
ids: list[bytes],
docs: dict[bytes, Any],
table: FeatureView,
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[dict[str, ValueProto]]]]:
"""Convert values in documents retrieved from MongoDB (BSON) into ValueProto types.

The table, a FeatureView, provides a map from feature name to proto type.
ids is a sorted list of the serialized entity ids used in MongoDBOnlineStore.

The heavy lifting is done in feast.type_map.python_values_to_proto_values.
It is intended to take a list of proto values with a single type (i.e. a column).
However, it is intended to take a list of proto values with a single type (i.e. a column).

In this version, we simply iterate over ids, calling this method each time.
It is naive, but straightforward. # TODO Remove if transforming is faster.

In this method, we simply iterate over ids, calling this method each time.
It is naive, but straightforward.
Args:
ids: sorted list of the serialized entity ids requested.
docs: results of collection find.
table: The FeatureView of the read, providing the types.
Returns:
List of tuples (event_timestamp, feature_dict) for each entity key
"""
feature_type_map = {
feature.name: feature.dtype.to_value_type() for feature in table.features
Expand Down Expand Up @@ -336,14 +356,20 @@ def convert_raw_docs_to_proto_transforming(
) -> List[Tuple[Optional[datetime], Optional[dict[str, ValueProto]]]]:
"""Convert values in documents retrieved from MongoDB (BSON) into ValueProto types.

The table, a FeatureView, provides a map from feature name to proto type.
ids is a sorted list of the serialized entity ids used in MongoDBOnlineStore.

The heavy lifting is done in feast.type_map.python_values_to_proto_values.
It is intended to take a list of proto values with a single type (i.e. a column).
The issue is that it is column-oriented, expecting a list of proto values with a single type.
MongoDB lookups are row-oriented, plus we need to ensure ordering of ids.
So we transform twice to minimize calls to the python/proto converter.

In this method, we simply iterate over ids, calling this method each time.
It is naive, but straightforward.
Luckily, the table, a FeatureView, provides a map from feature name to proto type
so we don't have to infer types for each feature value.

Args:
ids: sorted list of the serialized entity ids requested.
docs: results of collection find.
table: The FeatureView of the read, providing the types.
Returns:
List of tuples (event_timestamp, feature_dict) for each entity key
"""
feature_type_map = {
feature.name: feature.dtype.to_value_type() for feature in table.features
Expand Down Expand Up @@ -421,10 +447,20 @@ async def online_read_async(
for entity_key in entity_keys
]

# Query MongoDB asynchronously
cursor = clxn.find({"_id": {"$in": ids}})
docs_list = await cursor.to_list(length=None)
docs = {doc["_id"]: doc for doc in docs_list}
query_filter = {"_id": {"$in": ids}}
projection = {
"_id": 1,
f"event_timestamps.{table.name}": 1,
}
if requested_features:
projection.update(
{f"features.{table.name}.{x}": 1 for x in requested_features}
)
else:
projection[f"features.{table.name}"] = 1

cursor = clxn.find(query_filter, projection=projection)
docs = {doc["_id"]: doc async for doc in cursor}

# Convert to proto format
return self.convert_raw_docs_to_proto_transforming(ids, docs, table)
Expand Down