Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions docs/how-to-guides/dbt-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

**Current Limitations**:
- Supported data sources: BigQuery, Snowflake, and File-based sources only
- Single entity per model
- Manual entity column specification required

Breaking changes may occur in future releases.
Expand Down Expand Up @@ -185,6 +184,53 @@ driver_features_fv = FeatureView(
```
{% endcode %}

## Multiple Entity Support

The dbt integration supports feature views with multiple entities, useful for modeling relationships involving multiple keys.

### Usage

Specify multiple entity columns using repeated `-e` flags:

```bash
feast dbt import \
-m target/manifest.json \
-e user_id \
-e merchant_id \
--tag feast \
-o features/transactions.py
```

This creates a FeatureView with both `user_id` and `merchant_id` as entities, useful for:
- Transaction features keyed by both user and merchant
- Interaction features keyed by multiple parties
- Association tables in many-to-many relationships

Single entity usage:
```bash
feast dbt import -m target/manifest.json -e driver_id --tag feast
```

### Requirements

All specified entity columns must exist in each dbt model being imported. Models missing any entity column will be skipped with a warning.

### Generated Code

The `--output` (`-o`) flag generates code like:

```python
user_id = Entity(name="user_id", join_keys=["user_id"], ...)
merchant_id = Entity(name="merchant_id", join_keys=["merchant_id"], ...)

transaction_fv = FeatureView(
name="transactions",
entities=[user_id, merchant_id], # Multiple entities
schema=[...],
...
)
```

## CLI Reference

### `feast dbt list`
Expand Down Expand Up @@ -217,7 +263,7 @@ feast dbt import <manifest_path> [OPTIONS]

| Option | Description | Default |
|--------|-------------|---------|
| `--entity-column`, `-e` | Column to use as entity key | (required) |
| `--entity-column`, `-e` | Entity column name (can be specified multiple times) | (required) |
| `--data-source-type`, `-d` | Data source type: `bigquery`, `snowflake`, `file` | `bigquery` |
| `--tag-filter`, `-t` | Filter models by dbt tag | None |
| `--model`, `-m` | Import specific model(s) only | None |
Expand Down
69 changes: 50 additions & 19 deletions sdk/python/feast/cli/dbt_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ def dbt_cmd():
@click.option(
"--entity-column",
"-e",
"entity_columns",
multiple=True,
required=True,
help="Primary key / entity column name (e.g., driver_id, customer_id)",
help="Entity column name (can be specified multiple times, e.g., -e user_id -e merchant_id)",
)
@click.option(
"--data-source-type",
Expand Down Expand Up @@ -89,7 +91,7 @@ def dbt_cmd():
def import_command(
ctx: click.Context,
manifest_path: str,
entity_column: str,
entity_columns: tuple,
data_source_type: str,
timestamp_field: str,
tag_filter: Optional[str],
Expand Down Expand Up @@ -141,6 +143,28 @@ def import_command(
if parser.project_name:
click.echo(f" Project: {parser.project_name}")

# Convert tuple to list and validate
entity_cols: List[str] = list(entity_columns) if entity_columns else []

# Validation: At least one entity required (redundant with required=True but explicit)
if not entity_cols:
click.echo(
f"{Fore.RED}Error: At least one entity column required{Style.RESET_ALL}",
err=True,
)
raise SystemExit(1)

# Validation: No duplicate entity columns
if len(entity_cols) != len(set(entity_cols)):
duplicates = [col for col in entity_cols if entity_cols.count(col) > 1]
click.echo(
f"{Fore.RED}Error: Duplicate entity columns: {', '.join(set(duplicates))}{Style.RESET_ALL}",
err=True,
)
raise SystemExit(1)

click.echo(f"Entity columns: {', '.join(entity_cols)}")

# Get models with filters
model_list: Optional[List[str]] = list(model_names) if model_names else None
models = parser.get_models(model_names=model_list, tag_filter=tag_filter)
Expand Down Expand Up @@ -188,24 +212,31 @@ def import_command(
)
continue

# Validate entity column exists
if entity_column not in column_names:
# Validate ALL entity columns exist
missing_entities = [e for e in entity_cols if e not in column_names]
if missing_entities:
click.echo(
f"{Fore.YELLOW}Warning: Model '{model.name}' missing entity "
f"column '{entity_column}'. Skipping.{Style.RESET_ALL}"
f"column(s): {', '.join(missing_entities)}. Skipping.{Style.RESET_ALL}"
)
continue

# Create or reuse entity
if entity_column not in entities_created:
entity = mapper.create_entity(
name=entity_column,
description="Entity key for dbt models",
)
entities_created[entity_column] = entity
all_objects.append(entity)
else:
entity = entities_created[entity_column]
# Create or reuse entities (one per entity column)
model_entities: List[Any] = []
for entity_col in entity_cols:
if entity_col not in entities_created:
# Use mapper's internal method for value type inference
entity_value_type = mapper._infer_entity_value_type(model, entity_col)
entity = mapper.create_entity(
name=entity_col,
description="Entity key for dbt models",
value_type=entity_value_type,
)
entities_created[entity_col] = entity
all_objects.append(entity)
else:
entity = entities_created[entity_col]
model_entities.append(entity)

# Create data source
data_source = mapper.create_data_source(
Expand All @@ -218,8 +249,8 @@ def import_command(
feature_view = mapper.create_feature_view(
model=model,
source=data_source,
entity_column=entity_column,
entity=entity,
entity_columns=entity_cols,
entities=model_entities,
timestamp_field=timestamp_field,
ttl_days=ttl_days,
exclude_columns=excluded,
Expand All @@ -242,7 +273,7 @@ def import_command(
m
for m in models
if timestamp_field in [c.name for c in m.columns]
and entity_column in [c.name for c in m.columns]
and all(e in [c.name for c in m.columns] for e in entity_cols)
]

# Summary
Expand All @@ -257,7 +288,7 @@ def import_command(

code = generate_feast_code(
models=valid_models,
entity_column=entity_column,
entity_columns=entity_cols,
data_source_type=data_source_type,
timestamp_field=timestamp_field,
ttl_days=ttl_days,
Expand Down
55 changes: 34 additions & 21 deletions sdk/python/feast/dbt/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

import logging
from typing import Any, List, Optional, Set
from typing import Any, List, Optional, Set, Union

from jinja2 import BaseLoader, Environment

Expand Down Expand Up @@ -106,7 +106,7 @@
{% for fv in feature_views %}
{{ fv.var_name }} = FeatureView(
name="{{ fv.name }}",
entities=[{{ fv.entity_var }}],
entities=[{{ fv.entity_vars | join(', ') }}],
ttl=timedelta(days={{ fv.ttl_days }}),
schema=[
{% for field in fv.fields %}
Expand Down Expand Up @@ -220,7 +220,7 @@ def __init__(
def generate(
self,
models: List[DbtModel],
entity_column: str,
entity_columns: Union[str, List[str]],
manifest_path: str = "",
project_name: str = "",
exclude_columns: Optional[List[str]] = None,
Expand All @@ -231,7 +231,7 @@ def generate(

Args:
models: List of DbtModel objects to generate code for
entity_column: The entity/primary key column name
entity_columns: Entity column name(s) - single string or list of strings
manifest_path: Path to the dbt manifest (for documentation)
project_name: dbt project name (for documentation)
exclude_columns: Columns to exclude from features
Expand All @@ -240,25 +240,36 @@ def generate(
Returns:
Generated Python code as a string
"""
excluded = {entity_column, self.timestamp_field}
# Normalize entity_columns to list
entity_cols: List[str] = (
[entity_columns] if isinstance(entity_columns, str) else entity_columns
)

if not entity_cols:
raise ValueError("At least one entity column must be specified")

excluded = set(entity_cols) | {self.timestamp_field}
if exclude_columns:
excluded.update(exclude_columns)

# Collect all Feast types used for imports
type_imports: Set[str] = set()

# Prepare entity data
# Prepare entity data - create one entity per entity column
entities = []
entity_var = _make_var_name(entity_column)
entities.append(
{
"var_name": entity_var,
"name": entity_column,
"join_key": entity_column,
"description": "Entity key for dbt models",
"tags": {"source": "dbt"},
}
)
entity_vars = [] # Track variable names for feature views
for entity_col in entity_cols:
entity_var = _make_var_name(entity_col)
entity_vars.append(entity_var)
entities.append(
{
"var_name": entity_var,
"name": entity_col,
"join_key": entity_col,
"description": "Entity key for dbt models",
"tags": {"source": "dbt"},
}
)

# Prepare data sources and feature views
data_sources = []
Expand All @@ -269,7 +280,9 @@ def generate(
column_names = [c.name for c in model.columns]
if self.timestamp_field not in column_names:
continue
if entity_column not in column_names:

# Skip if ANY entity column is missing
if not all(e in column_names for e in entity_cols):
continue

# Build tags
Expand Down Expand Up @@ -339,7 +352,7 @@ def generate(
{
"var_name": fv_var,
"name": model.name,
"entity_var": entity_var,
"entity_vars": entity_vars,
"source_var": source_var,
"ttl_days": self.ttl_days,
"fields": fields,
Expand All @@ -366,7 +379,7 @@ def generate(

def generate_feast_code(
models: List[DbtModel],
entity_column: str,
entity_columns: Union[str, List[str]],
data_source_type: str = "bigquery",
timestamp_field: str = "event_timestamp",
ttl_days: int = 1,
Expand All @@ -380,7 +393,7 @@ def generate_feast_code(

Args:
models: List of DbtModel objects
entity_column: Primary key column name
entity_columns: Entity column name(s) - single string or list of strings
data_source_type: Type of data source (bigquery, snowflake, file)
timestamp_field: Timestamp column name
ttl_days: TTL in days for feature views
Expand All @@ -400,7 +413,7 @@ def generate_feast_code(

return generator.generate(
models=models,
entity_column=entity_column,
entity_columns=entity_columns,
manifest_path=manifest_path,
project_name=project_name,
exclude_columns=exclude_columns,
Expand Down
Loading
Loading