"""
Code generator for dbt to Feast imports.

This module generates Python code files containing Feast object definitions
(Entity, DataSource, FeatureView) from dbt model metadata.
"""

import keyword
import logging
import re
from typing import Any, List, Optional, Set, Union

from jinja2 import BaseLoader, Environment

from feast.dbt.mapper import map_dbt_type_to_feast_type
from feast.dbt.parser import DbtModel
from feast.types import (
    Array,
    Bool,
    Bytes,
    Float32,
    Float64,
    Int32,
    Int64,
    String,
    UnixTimestamp,
)

logger = logging.getLogger(__name__)

# Template for generating a complete Feast definitions file
FEAST_FILE_TEMPLATE = '''"""
Feast feature definitions generated from dbt models.

Source: {{ manifest_path }}
Project: {{ project_name }}
Generated by: feast dbt import
"""

from datetime import timedelta

from feast import Entity, FeatureView, Field
{% if type_imports %}
from feast.types import {{ type_imports | join(', ') }}
{% endif %}
{% if data_source_type == 'bigquery' %}
from feast.infra.offline_stores.bigquery_source import BigQuerySource
{% elif data_source_type == 'snowflake' %}
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
{% elif data_source_type == 'file' %}
from feast.infra.offline_stores.file_source import FileSource
{% endif %}


# =============================================================================
# Entities
# =============================================================================

{% for entity in entities %}
{{ entity.var_name }} = Entity(
    name="{{ entity.name }}",
    join_keys=["{{ entity.join_key }}"],
    description="{{ entity.description }}",
    tags={{ entity.tags }},
)

{% endfor %}

# =============================================================================
# Data Sources
# =============================================================================

{% for source in data_sources %}
{% if data_source_type == 'bigquery' %}
{{ source.var_name }} = BigQuerySource(
    name="{{ source.name }}",
    table="{{ source.table }}",
    timestamp_field="{{ source.timestamp_field }}",
    description="{{ source.description }}",
    tags={{ source.tags }},
)
{% elif data_source_type == 'snowflake' %}
{{ source.var_name }} = SnowflakeSource(
    name="{{ source.name }}",
    database="{{ source.database }}",
    schema="{{ source.schema }}",
    table="{{ source.table }}",
    timestamp_field="{{ source.timestamp_field }}",
    description="{{ source.description }}",
    tags={{ source.tags }},
)
{% elif data_source_type == 'file' %}
{{ source.var_name }} = FileSource(
    name="{{ source.name }}",
    path="{{ source.path }}",
    timestamp_field="{{ source.timestamp_field }}",
    description="{{ source.description }}",
    tags={{ source.tags }},
)
{% endif %}

{% endfor %}

# =============================================================================
# Feature Views
# =============================================================================

{% for fv in feature_views %}
{{ fv.var_name }} = FeatureView(
    name="{{ fv.name }}",
    entities=[{{ fv.entity_vars | join(', ') }}],
    ttl=timedelta(days={{ fv.ttl_days }}),
    schema=[
{% for field in fv.fields %}
        Field(name="{{ field.name }}", dtype={{ field.dtype }}{% if field.description %}, description="{{ field.description }}"{% endif %}),
{% endfor %}
    ],
    online={{ fv.online }},
    source={{ fv.source_var }},
    description="{{ fv.description }}",
    tags={{ fv.tags }},
)

{% endfor %}
'''

# Data source types the template knows how to render.
_SUPPORTED_DATA_SOURCE_TYPES = ("bigquery", "snowflake", "file")

# Map type objects to their names.
# Note: ImageBytes and PdfBytes are excluded since dbt manifests only expose
# generic BYTES type without semantic information about binary content.
_FEAST_TYPE_NAMES = {
    String: "String",
    Int32: "Int32",
    Int64: "Int64",
    Float32: "Float32",
    Float64: "Float64",
    Bool: "Bool",
    UnixTimestamp: "UnixTimestamp",
    Bytes: "Bytes",
}

# Any character that cannot appear in a Python identifier.
_NON_IDENTIFIER_CHARS = re.compile(r"\W")


def _get_feast_type_name(feast_type: Any) -> str:
    """
    Get the string name of a Feast type for code generation.

    Args:
        feast_type: A Feast type object (primitive or ``Array``).

    Returns:
        The source-code spelling of the type, e.g. ``"Int64"`` or
        ``"Array(String)"``. Types not present in the name map fall back
        to ``"String"``.
    """
    if isinstance(feast_type, Array):
        # Safely get base_type. Should always exist since Array.__init__ sets it.
        # Example: Array(String) -> base_type = String
        base_type = getattr(feast_type, "base_type", None)
        if base_type is None:
            logger.warning(
                "Array type missing 'base_type' attribute. "
                "This indicates a bug in Array initialization. Falling back to String."
            )
            base_type = String
        return f"Array({_get_feast_type_name(base_type)})"

    return _FEAST_TYPE_NAMES.get(feast_type, "String")


def _make_var_name(name: str) -> str:
    """
    Convert a name to a valid Python variable name.

    Every character that is not legal in an identifier (hyphens, spaces,
    dots, slashes, ...) becomes an underscore, a leading digit is prefixed
    with an underscore, and Python keywords get a trailing underscore so the
    generated assignment statement always parses.

    Args:
        name: Arbitrary name (dbt model or column name).

    Returns:
        The sanitized variable name. An empty input yields an empty string.
    """
    var_name = _NON_IDENTIFIER_CHARS.sub("_", name)

    # Ensure it starts with a letter or underscore
    if var_name and var_name[0].isdigit():
        var_name = f"_{var_name}"

    # "class = Entity(...)" would be a syntax error in the generated file
    if keyword.iskeyword(var_name):
        var_name = f"{var_name}_"

    return var_name


def _escape_description(desc: Optional[str]) -> str:
    """
    Escape a description string for use inside a double-quoted Python literal.

    Args:
        desc: Raw description text, or ``None``.

    Returns:
        The text with backslashes and double quotes escaped and every line
        break replaced by a single space; ``""`` when ``desc`` is empty or
        ``None``.
    """
    if not desc:
        return ""

    # A bare "\r" ends a line for the Python tokenizer just like "\n", so all
    # line-break flavours must be flattened or the generated literal breaks.
    single_line = desc.replace("\r\n", "\n").replace("\r", "\n").replace("\n", " ")

    # Escape backslashes first so the quote escapes are not double-escaped
    return single_line.replace("\\", "\\\\").replace('"', '\\"')


class DbtCodeGenerator:
    """
    Generates Python code for Feast objects from dbt models.

    This class creates complete, importable Python files containing
    Entity, DataSource, and FeatureView definitions.

    Example::

        generator = DbtCodeGenerator(
            data_source_type="bigquery",
            timestamp_field="event_timestamp",
            ttl_days=7
        )
        code = generator.generate(
            models=models,
            entity_columns="user_id",
            manifest_path="target/manifest.json",
            project_name="my_project"
        )
        with open("features.py", "w") as f:
            f.write(code)
    """

    def __init__(
        self,
        data_source_type: str = "bigquery",
        timestamp_field: str = "event_timestamp",
        ttl_days: int = 1,
    ):
        """
        Configure the generator.

        Args:
            data_source_type: Type of data source (bigquery, snowflake, file);
                matched case-insensitively.
            timestamp_field: Name of the event timestamp column.
            ttl_days: TTL in days written into every generated FeatureView.
        """
        self.data_source_type = data_source_type.lower()
        self.timestamp_field = timestamp_field
        self.ttl_days = ttl_days

        if self.data_source_type not in _SUPPORTED_DATA_SOURCE_TYPES:
            # The template only emits sources for the supported types, so the
            # feature views would reference undefined source variables.
            logger.warning(
                "Unsupported data_source_type %r; expected one of %s. "
                "Generated code will not define any data sources.",
                self.data_source_type,
                ", ".join(_SUPPORTED_DATA_SOURCE_TYPES),
            )

        # Set up Jinja2 environment
        self.env = Environment(
            loader=BaseLoader(),
            trim_blocks=True,
            lstrip_blocks=True,
        )
        self.template = self.env.from_string(FEAST_FILE_TEMPLATE)

    def generate(
        self,
        models: List[DbtModel],
        entity_columns: Union[str, List[str]],
        manifest_path: str = "",
        project_name: str = "",
        exclude_columns: Optional[List[str]] = None,
        online: bool = True,
    ) -> str:
        """
        Generate Python code for Feast objects from dbt models.

        Models that lack the timestamp column or any entity column are
        skipped (a warning is logged for each).

        Args:
            models: List of DbtModel objects to generate code for
            entity_columns: Entity column name(s) - single string or list of strings
            manifest_path: Path to the dbt manifest (for documentation)
            project_name: dbt project name (for documentation)
            exclude_columns: Columns to exclude from features
            online: Whether to enable online serving

        Returns:
            Generated Python code as a string

        Raises:
            ValueError: If no entity column is given.
        """
        # Normalize entity_columns to list
        entity_cols: List[str] = (
            [entity_columns] if isinstance(entity_columns, str) else entity_columns
        )

        if not entity_cols:
            raise ValueError("At least one entity column must be specified")

        # Note: entity columns should NOT be excluded - FeatureView.__init__
        # expects entity columns to be in the schema and will extract them
        excluded = {self.timestamp_field}
        if exclude_columns:
            excluded.update(exclude_columns)

        # Collect all Feast types used for imports
        type_imports: Set[str] = set()

        # Prepare entity data - create one entity per entity column
        entities = []
        entity_vars = []  # Track variable names for feature views
        for entity_col in entity_cols:
            entity_var = _make_var_name(entity_col)
            entity_vars.append(entity_var)
            entities.append(
                {
                    "var_name": entity_var,
                    "name": entity_col,
                    "join_key": entity_col,
                    "description": "Entity key for dbt models",
                    "tags": {"source": "dbt"},
                }
            )

        # Prepare data sources and feature views
        data_sources = []
        feature_views = []

        for model in models:
            # Check required columns exist
            column_names = {c.name for c in model.columns}
            if self.timestamp_field not in column_names:
                logger.warning(
                    "Skipping dbt model %r: timestamp column %r not found.",
                    model.name,
                    self.timestamp_field,
                )
                continue

            # Skip if ANY entity column is missing
            missing_entities = [e for e in entity_cols if e not in column_names]
            if missing_entities:
                logger.warning(
                    "Skipping dbt model %r: missing entity column(s) %s.",
                    model.name,
                    ", ".join(missing_entities),
                )
                continue

            # Build tags
            tags = {"dbt.model": model.name}
            for tag in model.tags:
                tags[f"dbt.tag.{tag}"] = "true"

            # Data source
            source_var = _make_var_name(f"{model.name}_source")
            source_data = {
                "var_name": source_var,
                "name": f"{model.name}_source",
                "timestamp_field": self.timestamp_field,
                "description": _escape_description(model.description),
                "tags": tags,
            }

            if self.data_source_type == "bigquery":
                source_data["table"] = model.full_table_name
            elif self.data_source_type == "snowflake":
                source_data["database"] = model.database
                source_data["schema"] = model.schema
                source_data["table"] = model.alias
            elif self.data_source_type == "file":
                source_data["path"] = f"/data/{model.name}.parquet"

            data_sources.append(source_data)

            # Feature view fields
            fields = []
            for column in model.columns:
                if column.name in excluded:
                    continue

                feast_type = map_dbt_type_to_feast_type(column.data_type)
                type_name = _get_feast_type_name(feast_type)

                # Track base type for imports. For Array types, import both Array and base type.
                # Example: Array(Int64) requires imports: Array, Int64
                if isinstance(feast_type, Array):
                    type_imports.add("Array")
                    base_type = getattr(feast_type, "base_type", None)
                    if base_type is None:
                        logger.warning(
                            "Array type missing 'base_type' attribute while generating imports. "
                            "This indicates a bug in Array initialization. Falling back to String."
                        )
                        base_type = String
                    type_imports.add(_get_feast_type_name(base_type))
                else:
                    type_imports.add(type_name)

                fields.append(
                    {
                        "name": column.name,
                        "dtype": type_name,
                        "description": _escape_description(column.description),
                    }
                )

            # Feature view
            fv_var = _make_var_name(f"{model.name}_fv")
            feature_views.append(
                {
                    "var_name": fv_var,
                    "name": model.name,
                    "entity_vars": entity_vars,
                    "source_var": source_var,
                    "ttl_days": self.ttl_days,
                    "fields": fields,
                    "online": online,
                    "description": _escape_description(model.description),
                    "tags": tags,
                }
            )

        # Sort type imports for consistent output
        sorted_types = sorted(type_imports)

        # Render template
        return self.template.render(
            manifest_path=manifest_path,
            project_name=project_name,
            data_source_type=self.data_source_type,
            type_imports=sorted_types,
            entities=entities,
            data_sources=data_sources,
            feature_views=feature_views,
        )


def generate_feast_code(
    models: List[DbtModel],
    entity_columns: Union[str, List[str]],
    data_source_type: str = "bigquery",
    timestamp_field: str = "event_timestamp",
    ttl_days: int = 1,
    manifest_path: str = "",
    project_name: str = "",
    exclude_columns: Optional[List[str]] = None,
    online: bool = True,
) -> str:
    """
    Convenience function to generate Feast code from dbt models.

    Builds a ``DbtCodeGenerator`` from the source settings and delegates to
    its ``generate`` method.

    Args:
        models: List of DbtModel objects
        entity_columns: Entity column name(s) - single string or list of strings
        data_source_type: Type of data source (bigquery, snowflake, file)
        timestamp_field: Timestamp column name
        ttl_days: TTL in days for feature views
        manifest_path: Path to manifest for documentation
        project_name: Project name for documentation
        exclude_columns: Columns to exclude from features
        online: Whether to enable online serving

    Returns:
        Generated Python code as a string
    """
    generator = DbtCodeGenerator(
        data_source_type=data_source_type,
        timestamp_field=timestamp_field,
        ttl_days=ttl_days,
    )

    return generator.generate(
        models=models,
        entity_columns=entity_columns,
        manifest_path=manifest_path,
        project_name=project_name,
        exclude_columns=exclude_columns,
        online=online,
    )