Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ The list below contains the functionality that contributors are planning to deve
* [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
* [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
* [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
* [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/data-sources/mongodb)
* [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
* [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
* **Offline Stores**
Expand All @@ -204,6 +205,7 @@ The list below contains the functionality that contributors are planning to deve
* [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/offline-stores/clickhouse)
* [x] [Ray (contrib plugin)](https://docs.feast.dev/reference/offline-stores/ray)
* [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/offline-stores/oracle)
* [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/offline-stores/mongodb)
* [x] [Hybrid](https://docs.feast.dev/reference/offline-stores/hybrid)
* [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store)
* **Online Stores**
Expand Down
2 changes: 2 additions & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
* [Athena (contrib)](reference/data-sources/athena.md)
* [Clickhouse (contrib)](reference/data-sources/clickhouse.md)
* [Ray (contrib)](reference/data-sources/ray.md)
* [MongoDB (contrib)](reference/data-sources/mongodb.md)
* [Offline stores](reference/offline-stores/README.md)
* [Overview](reference/offline-stores/overview.md)
* [Dask](reference/offline-stores/dask.md)
Expand All @@ -129,6 +130,7 @@
* [Ray (contrib)](reference/offline-stores/ray.md)
* [Oracle (contrib)](reference/offline-stores/oracle.md)
* [Athena (contrib)](reference/offline-stores/athena.md)
* [MongoDB (contrib)](reference/offline-stores/mongodb.md)
* [Remote Offline](reference/offline-stores/remote-offline-store.md)
* [Hybrid](reference/offline-stores/hybrid.md)
* [Online stores](reference/online-stores/README.md)
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/data-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,7 @@ Please see [Data Source](../../getting-started/concepts/data-ingestion.md) for a
{% content-ref url="ray.md" %}
[ray.md](ray.md)
{% endcontent-ref %}

{% content-ref url="mongodb.md" %}
[mongodb.md](mongodb.md)
{% endcontent-ref %}
113 changes: 113 additions & 0 deletions docs/reference/data-sources/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# MongoDB source (contrib)

## Description

MongoDB data sources are [MongoDB](https://www.mongodb.com/) collections that can be used as a source for feature data. The `MongoDBSource` points at a MongoDB collection and provides the metadata Feast needs to read historical features from the offline store's collection.

## Examples

Defining a MongoDB source:

```python
from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb import (
MongoDBSource,
)

driver_stats_source = MongoDBSource(
name="driver_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created_at",
)
```

The `name` field becomes the `feature_view` discriminator stored in every document in the `feature_history` collection.

Configuration options such as `connection_string`, `database`, and `collection` are inherited from the offline store configuration in `feature_store.yaml`.

The full set of configuration options is available [here](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBSource).

## Vector Search

The MongoDB online store supports [Atlas Vector Search](https://www.mongodb.com/docs/atlas/atlas-vector-search/), enabling similarity search over feature embeddings stored in MongoDB Atlas. This is powered by the `$vectorSearch` aggregation stage and requires MongoDB Atlas (or the `mongodb/mongodb-atlas-local` Docker image for local development).

See [PR #6344](https://github.com/feast-dev/feast/pull/6344) for full implementation details.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this line be included? It links back to code..


### Configuration

Enable vector search in your `feature_store.yaml`:

```yaml
project: my_project
provider: local
online_store:
type: mongodb
connection_string: mongodb+srv://<user>:<pass>@cluster.mongodb.net
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll see in CI, but this may need # pragma: allowlist secret too.

vector_enabled: true
similarity: cosine # cosine | euclidean | dotProduct
vector_index_wait_timeout: 60 # seconds to wait for index to become queryable
vector_index_wait_poll_interval: 1.0 # seconds between polls
```

### Defining a Feature View with Vector Index

Mark embedding fields with `vector_index=True` and specify `vector_length`:

```python
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Array, Float32, Int64, String
from datetime import timedelta

item_embeddings = FeatureView(
name="item_embeddings",
entities=[Entity(name="item_id", join_keys=["item_id"])],
schema=[
Field(
name="embedding",
dtype=Array(Float32),
vector_index=True,
vector_length=384,
vector_search_metric="cosine",
),
Field(name="title", dtype=String),
Field(name="item_id", dtype=Int64),
],
source=FileSource(path="items.parquet", timestamp_field="event_timestamp"),
ttl=timedelta(hours=24),
)
```

When `feast apply` (or `store.update()`) runs with `vector_enabled=True`, Atlas vector search indexes are automatically created for any field with `vector_index=True`. Indexes are also automatically dropped when feature views are removed.

### Retrieving Documents via Vector Search

Use `retrieve_online_documents_v2()` to perform similarity search:

```python
source = FeatureStore(repo_path=".")
results = store.retrieve_online_documents_v2(
config=repo_config,
table=item_embeddings,
requested_features=["embedding", "title"],
embedding=[0.1, 0.2, ...], # query vector
top_k=5,
)

# Each result is a (event_timestamp, entity_key_proto, feature_dict) tuple.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result of FeatureStore's version is OnlineResponse. I'd just remove these lines.

Given the change, I suggest reviewing the comments below. They are about the implementation in MongoDB, which IS what they want to hear about (how it works), just not specific to the function above.

# feature_dict includes a synthetic "distance" key with the vector search score.
for ts, entity_key, features in results:
print(features["title"].string_val, features["distance"].float_val)
```
```

### How It Works

- **Index creation**: `update()` creates an Atlas vector search index named `<feature_view>__<field>__vs_index` for each vector-indexed field. It waits for the index to reach `READY` status before proceeding.
- **Query execution**: `retrieve_online_documents_v2()` builds a `$vectorSearch` aggregation pipeline with `numCandidates = max(top_k * 10, 100)` and the specified `limit`.
- **Score**: Results include a `distance` field populated from `$meta: "vectorSearchScore"`.
- **BSON compatibility**: Query vectors are coerced to native Python floats to avoid numpy serialization issues.
- **Idempotency**: Calling `update()` multiple times will not duplicate indexes.

## Supported Types

MongoDB data sources support all eight primitive types (`bytes`, `string`, `int32`, `int64`, `float32`, `float64`, `bool`, `timestamp`) and their corresponding array types. Complex types such as `Map` and `Struct` are preserved through the MongoDB document model.
For a comparison against other batch data sources, please see [here](overview.md#functionality-matrix).
4 changes: 4 additions & 0 deletions docs/reference/offline-stores/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ Please see [Offline Store](../../getting-started/components/offline-store.md) fo
[clickhouse.md](clickhouse.md)
{% endcontent-ref %}

{% content-ref url="mongodb.md" %}
[mongodb.md](mongodb.md)
{% endcontent-ref %}

{% content-ref url="remote-offline-store.md" %}
[remote-offline-store.md](remote-offline-store.md)
{% endcontent-ref %}
Expand Down
103 changes: 103 additions & 0 deletions docs/reference/offline-stores/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# MongoDB offline store (contrib)

## Description

The MongoDB offline store provides support for reading [MongoDBSource](../data-sources/mongodb.md).
* Uses a single shared collection with a compound index for all FeatureViews, distinguished by a `feature_view` discriminator field.
* Entity dataframes can be provided as a Pandas dataframe. The offline store converts entity identifiers into serialized entity keys for efficient lookup against the collection.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip the description.

## Getting started

In order to use this offline store, you'll need to run `pip install 'feast[mongodb]'`.

## Example

{% code title="feature_store.yaml" %}
```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStore
connection_string: "mongodb+srv://user:pass@cluster.mongodb.net" # pragma: allowlist secret
database: feast
collection: feature_history
online_store:
type: mongodb
connection_string: "mongodb+srv://user:pass@cluster.mongodb.net" # pragma: allowlist secret
database_name: feast_online_store
collection_suffix: latest
client_kwargs: {}
```
{% endcode %}

The full set of configuration options is available in [MongoDBOfflineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStoreConfig).

## Data Model

The offline store uses a single shared collection (by default `feature_history`) that stores append-only historical feature rows for all feature views. Each document represents one observation of one entity for one FeatureView at a specific event timestamp:

```json
{
"entity_id": "Binary(...)",
"feature_view": "driver_stats",
"event_timestamp": "ISODate(2024-01-15T12:00:00Z)",
"created_at": "ISODate(2024-01-15T12:01:00Z)",
"features": {
"conv_rate": 0.72,
"acc_rate": 0.91,
"avg_daily_trips": 14
}
}
```

Key properties:

* **Append-only**: Historical data is treated as immutable; corrections are written as new rows with newer `created_at` timestamps rather than in-place updates.
* **Time-series friendly**: `event_timestamp` represents when the feature value was observed; `created_at` is used as a tie-breaker when multiple observations share the same event timestamp.
* **Feature grouping by FeatureView**: `feature_view` identifies which FeatureView the row belongs to, so a single collection can host multiple FVs.

A single compound index supports all major query patterns:

```
(entity_id ASC, feature_view ASC, event_timestamp DESC, created_at DESC)
```

This index enables efficient range scans over entities and feature views, while ensuring that the most recent observation per `(entity_id, feature_view)` is seen first during aggregation. The index is created lazily on first use and cached per connection string.

## Key Optimizations

* **Scoring vs. training paths**: When each entity appears only once in `entity_df` (scoring/inference — one feature lookup per entity), server-side `$group $first` efficiently returns the single latest value per entity. When the same entity appears at multiple timestamps (training — building a dataset with many historical snapshots per entity), the store retrieves all candidate rows and uses `pd.merge_asof` to select the correct point-in-time value for each request timestamp.
* **Two-level chunking**: `CHUNK_SIZE` (50,000 rows) controls the size of intermediate DataFrames in memory; `MONGO_BATCH_SIZE` (10,000 entity IDs) limits the query size sent to MongoDB.

## Functionality Matrix

The set of functionality supported by offline stores is described in detail [here](overview.md#functionality).
Below is a matrix indicating which functionality is supported by the MongoDB offline store.

| | MongoDB |
| :----------------------------------------------------------------- | :------ |
| `get_historical_features` (point-in-time correct join) | yes |
| `pull_latest_from_table_or_query` (retrieve latest feature values) | yes |
| `pull_all_from_table_or_query` (retrieve a saved dataset) | yes |
| `offline_write_batch` (persist dataframes to offline store) | yes |
| `write_logged_features` (persist logged features to offline store) | no |

Below is a matrix indicating which functionality is supported by `MongoDBRetrievalJob`.

| | MongoDB |
| ----------------------------------------------------- | ------- |
| export to dataframe | yes |
| export to arrow table | yes |
| export to arrow batches | no |
| export to SQL | no |
| export to data lake (S3, GCS, etc.) | no |
| export to data warehouse | no |
| export as Spark dataframe | no |
| local execution of Python-based on-demand transforms | yes |
| remote execution of Python-based on-demand transforms | no |
| persist results in the offline store | yes |
| preview the query plan before execution | no |
| read partitioned data | no |

To compare this set of functionality against other offline stores, please see the full [functionality matrix](overview.md#functionality-matrix).
2 changes: 2 additions & 0 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The list below contains the functionality that contributors are planning to deve
* [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
* [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
* [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
* [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/data-sources/mongodb)
* [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
* [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
* **Offline Stores**
Expand All @@ -39,6 +40,7 @@ The list below contains the functionality that contributors are planning to deve
* [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/offline-stores/clickhouse)
* [x] [Ray (contrib plugin)](https://docs.feast.dev/reference/offline-stores/ray)
* [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/offline-stores/oracle)
* [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/offline-stores/mongodb)
* [x] [Hybrid](https://docs.feast.dev/reference/offline-stores/hybrid)
* [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store)
* **Online Stores**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# MongoDB Offline Store

This offline store lets you train models and run batch scoring directly from it.
All feature views share a single collection (`feature_history`). Reads use
All feature views share a single collection. Reads use
MongoDB aggregation pipelines with a compound index, so per-entity cost is
O(log n_observations) regardless of collection size, and K feature views with the same
entity key collapse into one round-trip instead of K (1 if your data shares a unique id.)
O(log n_observations) regardless of collection size.

## Schema

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

Single-collection schema. Key optimizations:

1. K-collapse: feature views that share the same join key set are batched
into a single ``$match + $sort`` aggregation instead of K separate find
queries. Reduces round-trips from K to |unique join key signatures|.

2. Server-side deduplication (scoring path): when entity_df has unique
1. Server-side deduplication (scoring path): when entity_df has unique
entity IDs the aggregation adds a ``$group`` stage that returns at most
one document per (entity_id, feature_view) pair — O(N×K) transfer
instead of O(N×P×K). The compound index backs the entire pipeline,
Expand Down Expand Up @@ -561,7 +557,7 @@ def get_historical_features(

Training path (repeated entity IDs at different timestamps):
Omits ``$group`` and uses ``merge_asof`` in Python, matching
standard PIT behaviour but still with K-collapsed queries.
standard PIT behaviour.

Args:
strict_pit: When True (default) features whose document timestamp
Expand Down
Loading