-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Add OnlineStore for MongoDB #6025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
67d79f7
4d0f066
18b16b9
a066a99
7c17759
c463f4a
a6db5c7
9ed89f0
40e5ea5
67f2c99
35b66f0
a526a0c
b0d57f5
5a94a11
46898b5
0356261
960881d
52948a2
f2e5dff
7f5a192
619fdd3
1f46d8e
de6b103
7e2a80d
8a82e74
f09faa0
0ba503c
6b96c0e
7c3874b
49022e3
a2c493e
dcb9072
bcb63aa
1cb84dc
c02eebf
3eb6107
01d2e4c
156f17b
762d17b
6147c87
1492a1e
0d80772
7563f11
26ad3bb
2f92af9
577f2a7
e761e10
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -139,6 +139,15 @@ def online_read( | |||||||||||||||||||||||||||||||||
| ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| Read features for a batch of entities. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| config: Feast repo configuration | ||||||||||||||||||||||||||||||||||
| table: FeatureView to read from | ||||||||||||||||||||||||||||||||||
| entity_keys: List of entity keys to read | ||||||||||||||||||||||||||||||||||
| requested_features: Optional list of specific features to read | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| List of tuples (event_timestamp, feature_dict) for each entity key | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| clxn = self._get_collection(config) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
@@ -223,7 +232,15 @@ def teardown( | |||||||||||||||||||||||||||||||||
| assert config.online_store.type == "mongodb" | ||||||||||||||||||||||||||||||||||
| clxn = self._get_collection(repo_config=config) | ||||||||||||||||||||||||||||||||||
| clxn.drop() | ||||||||||||||||||||||||||||||||||
| self._get_client(config).close() | ||||||||||||||||||||||||||||||||||
| if self._client: | ||||||||||||||||||||||||||||||||||
| self._client.close() | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| async def close(self) -> None: | ||||||||||||||||||||||||||||||||||
| """Close the async MongoDB client and release its resources.""" | ||||||||||||||||||||||||||||||||||
| if self._client_async is not None: | ||||||||||||||||||||||||||||||||||
| await self._client_async.close() | ||||||||||||||||||||||||||||||||||
| self._client_async = None | ||||||||||||||||||||||||||||||||||
| self._collection_async = None | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+278
to
+283
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Resource leak: The
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||||||||||||||||||||||||
| # Helpers | ||||||||||||||||||||||||||||||||||
|
|
@@ -288,22 +305,25 @@ def async_supported(self) -> SupportedAsyncMethods: | |||||||||||||||||||||||||||||||||
| return SupportedAsyncMethods(read=True, write=True) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @staticmethod | ||||||||||||||||||||||||||||||||||
| def convert_raw_docs_to_proto_simply( | ||||||||||||||||||||||||||||||||||
| def convert_raw_docs_to_proto_naive( | ||||||||||||||||||||||||||||||||||
| ids: list[bytes], | ||||||||||||||||||||||||||||||||||
| docs: dict[bytes, Any], | ||||||||||||||||||||||||||||||||||
| table: FeatureView, | ||||||||||||||||||||||||||||||||||
| requested_features: Optional[List[str]] = None, | ||||||||||||||||||||||||||||||||||
| ) -> List[Tuple[Optional[datetime], Optional[dict[str, ValueProto]]]]: | ||||||||||||||||||||||||||||||||||
| """Convert values in documents retrieved from MongoDB (BSON) into ValueProto types. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| The table, a FeatureView, provides a map from feature name to proto type. | ||||||||||||||||||||||||||||||||||
| ids is a sorted list of the serialized entity ids used in MongoDBOnlineStore. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| The heavy lifting is done in feast.type_map.python_values_to_proto_values. | ||||||||||||||||||||||||||||||||||
| It is intended to take a list of proto values with a single type (i.e. a column). | ||||||||||||||||||||||||||||||||||
| However, it is intended to take a list of proto values with a single type (i.e. a column). | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| In this version, we simply iterate over ids, calling this method each time. | ||||||||||||||||||||||||||||||||||
| It is naive, but straightforward. # TODO Remove if transforming is faster. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| In this method, we simply iterate over ids, calling this method each time. | ||||||||||||||||||||||||||||||||||
| It is naive, but straightforward. | ||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| ids: sorted list of the serialized entity ids requested. | ||||||||||||||||||||||||||||||||||
| docs: results of collection find. | ||||||||||||||||||||||||||||||||||
| table: The FeatureView of the read, providing the types. | ||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| List of tuples (event_timestamp, feature_dict) for each entity key | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| feature_type_map = { | ||||||||||||||||||||||||||||||||||
| feature.name: feature.dtype.to_value_type() for feature in table.features | ||||||||||||||||||||||||||||||||||
|
|
@@ -336,14 +356,20 @@ def convert_raw_docs_to_proto_transforming( | |||||||||||||||||||||||||||||||||
| ) -> List[Tuple[Optional[datetime], Optional[dict[str, ValueProto]]]]: | ||||||||||||||||||||||||||||||||||
| """Convert values in documents retrieved from MongoDB (BSON) into ValueProto types. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| The table, a FeatureView, provides a map from feature name to proto type. | ||||||||||||||||||||||||||||||||||
| ids is a sorted list of the serialized entity ids used in MongoDBOnlineStore. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| The heavy lifting is done in feast.type_map.python_values_to_proto_values. | ||||||||||||||||||||||||||||||||||
| It is intended to take a list of proto values with a single type (i.e. a column). | ||||||||||||||||||||||||||||||||||
| The issue is that it is column-oriented, expecting a list of proto values with a single type. | ||||||||||||||||||||||||||||||||||
| MongoDB lookups are row-oriented, plus we need to ensure ordering of ids. | ||||||||||||||||||||||||||||||||||
| So we transform twice to minimize calls to the python/proto converter. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| In this method, we simply iterate over ids, calling this method each time. | ||||||||||||||||||||||||||||||||||
| It is naive, but straightforward. | ||||||||||||||||||||||||||||||||||
| Luckily, the table, a FeatureView, provides a map from feature name to proto type | ||||||||||||||||||||||||||||||||||
| so we don't have to infer types for each feature value. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| ids: sorted list of the serialized entity ids requested. | ||||||||||||||||||||||||||||||||||
| docs: results of collection find. | ||||||||||||||||||||||||||||||||||
| table: The FeatureView of the read, providing the types. | ||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| List of tuples (event_timestamp, feature_dict) for each entity key | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| feature_type_map = { | ||||||||||||||||||||||||||||||||||
| feature.name: feature.dtype.to_value_type() for feature in table.features | ||||||||||||||||||||||||||||||||||
|
|
@@ -421,10 +447,20 @@ async def online_read_async( | |||||||||||||||||||||||||||||||||
| for entity_key in entity_keys | ||||||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Query MongoDB asynchronously | ||||||||||||||||||||||||||||||||||
| cursor = clxn.find({"_id": {"$in": ids}}) | ||||||||||||||||||||||||||||||||||
| docs_list = await cursor.to_list(length=None) | ||||||||||||||||||||||||||||||||||
| docs = {doc["_id"]: doc for doc in docs_list} | ||||||||||||||||||||||||||||||||||
| query_filter = {"_id": {"$in": ids}} | ||||||||||||||||||||||||||||||||||
| projection = { | ||||||||||||||||||||||||||||||||||
| "_id": 1, | ||||||||||||||||||||||||||||||||||
| f"event_timestamps.{table.name}": 1, | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| if requested_features: | ||||||||||||||||||||||||||||||||||
| projection.update( | ||||||||||||||||||||||||||||||||||
| {f"features.{table.name}.{x}": 1 for x in requested_features} | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||
| projection[f"features.{table.name}"] = 1 | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| cursor = clxn.find(query_filter, projection=projection) | ||||||||||||||||||||||||||||||||||
| docs = {doc["_id"]: doc async for doc in cursor} | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Convert to proto format | ||||||||||||||||||||||||||||||||||
| return self.convert_raw_docs_to_proto_transforming(ids, docs, table) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.