"""DocEmbedder: chunk and embed documents, then register/write them via Feast.

Provides:
- ``LogicalLayerFn``: protocol for user-supplied DataFrame -> DataFrame
  transforms that adapt chunker/embedder output to a FeatureView schema.
- ``default_logical_layer_fn``: default transform for text embeddings.
- ``generate_repo_file``: emits a ``feast apply``-compatible repo file.
- ``DocEmbedder``: orchestrates chunking, embedding, schema adaptation, and
  writing to the online store.
"""

import inspect
import os
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Optional, Protocol, runtime_checkable

import pandas as pd

from feast.chunker import BaseChunker, TextChunker
from feast.embedder import BaseEmbedder, MultiModalEmbedder

if TYPE_CHECKING:
    from feast.feature_store import FeatureStore


@runtime_checkable
class LogicalLayerFn(Protocol):
    """
    Protocol defining the structure for logical layer functions.

    The logical layer transforms the output of Chunker + Embedder into the
    format expected by the FeatureView schema.
    """

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform chunked + embedded DataFrame to FeatureView schema.

        Args:
            df: Input DataFrame with chunks and embeddings.

        Returns:
            DataFrame with columns matching FeatureView schema.
        """
        ...


def _validate_logical_layer_fn(
    fn: Callable[[pd.DataFrame], pd.DataFrame], arg_name: str
) -> None:
    """
    Validate that ``fn`` is annotated as ``(pd.DataFrame) -> pd.DataFrame``.

    Shared by ``DocEmbedder.__init__`` and ``DocEmbedder.embed_documents`` so
    both entry points enforce exactly the same contract.

    NOTE(review): this compares annotation objects directly, so it requires
    literal ``pd.DataFrame`` annotations; a function defined in a module using
    ``from __future__ import annotations`` (string annotations) would be
    rejected — confirm this is the intended strictness.

    Args:
        fn: Candidate logical layer function.
        arg_name: Parameter name to mention in the error message.

    Raises:
        ValueError: If the signature does not match the expected contract.
    """
    sig = inspect.signature(fn)
    params = list(sig.parameters.values())
    if (
        len(params) != 1
        or params[0].annotation != pd.DataFrame
        or sig.return_annotation != pd.DataFrame
    ):
        raise ValueError(
            f"{arg_name} must be a function that takes a DataFrame and returns a DataFrame"
        )


def default_logical_layer_fn(df: pd.DataFrame) -> pd.DataFrame:
    """
    Default logical layer function that transforms the output of Chunker +
    Embedder into the format expected by the FeatureView schema.

    Expects columns ``chunk_id``, ``text``, ``text_embedding`` and
    ``original_id`` in ``df``; produces the ``passage_id`` / ``text`` /
    ``embedding`` / ``event_timestamp`` / ``source_id`` schema used by the
    generated FeatureView.

    Args:
        df: Chunked + embedded DataFrame.

    Returns:
        DataFrame with columns matching the generated FeatureView schema.
    """
    from datetime import datetime, timezone

    return pd.DataFrame(
        {
            "passage_id": df["chunk_id"],
            "text": df["text"],
            "embedding": df["text_embedding"],
            # One shared timezone-aware UTC timestamp for the whole batch.
            "event_timestamp": [datetime.now(timezone.utc)] * len(df),
            "source_id": df["original_id"],
        }
    )


def generate_repo_file(
    repo_path: str,
    feature_view_name: str = "text_feature_view",
    vector_length: int = 384,
) -> str:
    """
    Generate a Python file with Entity and FeatureView definitions.

    This file is compatible with `feast apply` CLI.

    Args:
        repo_path: Path to the feature repo directory.
        feature_view_name: Name of the feature view to create.
        vector_length: Dimension of the embedding vectors. Should match
            the output dimension of the embedding model being used.
            Defaults to 384 (matching the default all-MiniLM-L6-v2 model).

    Returns:
        Path to generated file.

    Raises:
        ValueError: If ``feature_view_name`` is not a valid identifier.
    """
    from feast.repo_operations import is_valid_name

    if not is_valid_name(feature_view_name) or not feature_view_name.isidentifier():
        raise ValueError(
            f"feature_view_name '{feature_view_name}' is invalid. "
            "It should only contain alphanumeric characters, underscores, "
            "and must not start with an underscore."
        )

    # Sanitized once here instead of repeating the replace chain inline.
    # After the isidentifier() check above this is a no-op, but it is kept
    # as a defensive guard so the generated variable name stays importable.
    safe_name = feature_view_name.replace(" ", "_").replace("-", "_")

    code = f'''"""
Auto-generated by DocEmbedder.
Compatible with `feast apply` CLI.
"""

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Array, Float32, String, ValueType

# Entity
text_entity = Entity(
    name="passage_id",
    join_keys=["passage_id"],
    description="Passage identifier",
    value_type=ValueType.STRING,
)

# Source
{safe_name}_source = FileSource(
    name="{feature_view_name}_source",
    path="data/{feature_view_name}.parquet",
    timestamp_field="event_timestamp",
)

# FeatureView
{safe_name} = FeatureView(
    name="{feature_view_name}",
    entities=[text_entity],
    ttl=timedelta(days=1),
    schema=[
        Field(
            name="text",
            dtype=String,
            description="Document text content",
        ),
        Field(
            name="embedding",
            dtype=Array(Float32),
            description="Vector embedding",
            vector_index=True,
            vector_length={vector_length},
            vector_search_metric="COSINE",
        ),
        Field(
            name="source_id",
            dtype=String,
            description="Source ID",
        ),
    ],
    source={safe_name}_source,
    online=True,
)
'''

    filepath = os.path.join(repo_path, safe_name + ".py")
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(code)
    return filepath


class DocEmbedder:
    """
    DocEmbedder is a class that embeds documents and chunks them into a format
    expected by the FeatureView schema using a Logic Implementation By the user.

    Args:
        repo_path: Path to the feature repo (can be "." for current directory).
        yaml_file: Name of the feature_store.yaml file inside repo_path.
            Defaults to "feature_store.yaml".
        feature_view_name: Name of the feature view to create.
        chunker: Chunker to use for chunking the documents.
        embedder: Embedder to use for embedding the documents.
        logical_layer_fn: Logical layer function to use for transforming the
            output of the chunker and embedder into the format expected by the
            FeatureView schema.
        create_feature_view: Whether to create a feature view in the feature
            repo. By default it will generate a Python file with the
            FeatureView definition.
        vector_length: Explicit embedding dimension for the generated
            FeatureView schema. If None (default), the dimension is
            auto-detected from the embedder via ``get_embedding_dim("text")``.
            Falls back to 384 if detection is not supported by the embedder.
        auto_apply_repo: Whether to apply the repository automatically.
            By default it will apply the repository after creating the
            feature view.
    """

    def __init__(
        self,
        repo_path: str,
        yaml_file: str = "feature_store.yaml",
        feature_view_name: str = "text_feature_view",
        chunker: Optional[BaseChunker] = None,
        embedder: Optional[BaseEmbedder] = None,
        logical_layer_fn: LogicalLayerFn = default_logical_layer_fn,
        create_feature_view: bool = True,
        vector_length: Optional[int] = None,
        auto_apply_repo: bool = True,
    ):
        self.repo_path = repo_path
        self.yaml_path = os.path.join(Path(repo_path).resolve(), yaml_file)
        self.feature_view_name = feature_view_name
        self.chunker = chunker or TextChunker()
        self.embedder = embedder or MultiModalEmbedder()
        # Lazily constructed on first online-store write.
        self.store: Optional[FeatureStore] = None

        # Fail fast on a malformed logical layer rather than at embed time.
        _validate_logical_layer_fn(logical_layer_fn, "logical_layer_fn")
        self.logical_layer_fn = logical_layer_fn

        if create_feature_view:
            resolved_vector_length = self._resolve_vector_length(vector_length, "text")
            generate_repo_file(
                repo_path=repo_path,
                feature_view_name=feature_view_name,
                vector_length=resolved_vector_length,
            )
            if auto_apply_repo:
                self.apply_repo()

    def _resolve_vector_length(
        self, explicit_length: Optional[int], modality: str
    ) -> int:
        """
        Determine the vector length to use for the generated FeatureView.

        Priority:
        1. Explicitly provided vector_length
        2. Auto-detected from embedder via get_embedding_dim(modality)
        3. Default of 384 (matching all-MiniLM-L6-v2)

        Args:
            explicit_length: User-provided vector length, or None.
            modality: Modality key passed to the embedder's
                ``get_embedding_dim`` (e.g. "text").

        Returns:
            The resolved vector length as an integer.
        """
        _DEFAULT_VECTOR_LENGTH = 384

        if explicit_length is not None:
            return explicit_length

        # Best-effort auto-detection: embedders that don't support
        # get_embedding_dim (or raise for this modality) fall through
        # to the default instead of failing construction.
        try:
            dim = self.embedder.get_embedding_dim(modality)
            if dim is not None:
                return dim
        except Exception:
            pass
        return _DEFAULT_VECTOR_LENGTH

    def save_to_online_store(self, df: pd.DataFrame, feature_view_name: str) -> None:
        """
        Save the embedded documents to the online store.

        Args:
            df: DataFrame matching the FeatureView schema.
            feature_view_name: Target feature view to write into.
        """
        from feast.feature_store import FeatureStore

        # Construct the store once and reuse it across writes.
        if self.store is None:
            self.store = FeatureStore(repo_path=self.repo_path)
        self.store.write_to_online_store(
            feature_view_name=feature_view_name,
            df=df,
        )

    # TODO (Future scope): Implement save_to_offline_store to write embedded
    # documents to the offline store. Currently blocked by DaskOfflineStore
    # .offline_write_batch not creating the parquet file if it does not exist.
    # Once that is fixed, add a method that calls:
    #   store = FeatureStore(repo_path=self.repo_path)
    #   store.write_to_offline_store(feature_view_name=feature_view_name, df=df)

    def apply_repo(self) -> None:
        """
        Apply the repository to register feature views in the registry.
        """
        from feast.repo_config import load_repo_config
        from feast.repo_operations import apply_total

        # Defensive cwd guard: nothing here chdirs explicitly, but
        # apply_total may change the working directory as a side effect,
        # so restore it unconditionally.
        original_cwd = None
        try:
            original_cwd = os.getcwd()
            repo_path = Path(self.repo_path).resolve()
            config = load_repo_config(
                repo_path=repo_path,
                fs_yaml_file=Path(self.yaml_path),
            )
            apply_total(
                repo_config=config,
                repo_path=repo_path,
                skip_source_validation=True,
            )
        finally:
            if original_cwd is not None:
                os.chdir(original_cwd)

    def embed_documents(
        self,
        documents: pd.DataFrame,
        id_column: str,
        source_column: str,
        type_column: Optional[str] = None,
        column_mapping: Optional[tuple[str, str]] = None,
        custom_logical_layer_fn: Optional[
            Callable[[pd.DataFrame], pd.DataFrame]
        ] = None,
    ) -> pd.DataFrame:
        """
        Embed a list of documents and chunk them into a format expected by the
        FeatureView schema using a Logic Implementation By the user and save
        the DataFrame to the online store.

        Args:
            documents: DataFrame containing the documents to embed.
            id_column: Column name containing the document IDs.
            source_column: Column name containing the document sources.
            type_column: Column name containing the document types.
            column_mapping: Tuple mapping source columns to
                (modality, output column). Defaults to
                ("text", "text_embedding").
            custom_logical_layer_fn: Custom logical layer function to use for
                transforming the output of the chunker and embedder into the
                format expected by the FeatureView schema. Overrides the
                instance-level logical layer for this call only.

        Returns:
            DataFrame with the embedded documents.

        Raises:
            ValueError: If ``custom_logical_layer_fn`` has a wrong signature,
                or if the column configuration is incompatible with
                ``default_logical_layer_fn``.

        Example:
            documents = pd.DataFrame({
                "id": [1, 2, 3],
                "source": ["source1", "source2", "source3"],
                "type": ["type1", "type2", "type3"],
                "text": ["text1", "text2", "text3"],
            })
            column_mapping = ("text", "text_embedding")
            df = embed_documents(documents=documents,
                                 id_column="id",
                                 source_column="source",
                                 type_column="type",
                                 column_mapping=column_mapping)
        """
        if custom_logical_layer_fn is not None:
            _validate_logical_layer_fn(
                custom_logical_layer_fn, "custom_logical_layer_fn"
            )

        current_logical_layer_fn = (
            custom_logical_layer_fn
            if custom_logical_layer_fn is not None
            else self.logical_layer_fn
        )

        if column_mapping is None:
            column_mapping = ("text", "text_embedding")

        # The default logical layer hard-codes the "text"/"text_embedding"
        # column names; reject incompatible configurations up front.
        if (
            current_logical_layer_fn is default_logical_layer_fn
            and column_mapping[0] == "text"
            and (source_column != "text" or column_mapping[1] != "text_embedding")
        ):
            raise ValueError(
                f"source_column='{source_column}' with output column='{column_mapping[1]}' "
                f"is not compatible with default_logical_layer_fn, which expects "
                f"source_column='text' and column_mapping=('text', 'text_embedding'). "
                f"Provide a custom logical_layer_fn."
            )

        # Only the text modality is chunked; other modalities are embedded
        # directly from the input rows.
        if column_mapping[0] == "text":
            df = self.chunker.chunk_dataframe(
                df=documents,
                id_column=id_column,
                source_column=source_column,
                type_column=type_column,
            )
        else:
            df = documents

        df = self.embedder.embed_dataframe(
            df, column_mapping={source_column: column_mapping}
        )

        if (
            column_mapping[0] == "text"
            or current_logical_layer_fn is not default_logical_layer_fn
        ):
            df = current_logical_layer_fn(df)
        else:
            # Non-text modality with the default logical layer: pass the
            # embedder output through unchanged and warn the caller that the
            # schema is their responsibility.
            warnings.warn(
                f"Modality '{column_mapping[0]}' is not supported by the default logical layer function. "
                f"The output DataFrame will be passed directly to the online store. "
                f"Ensure your FeatureView schema matches the output columns. "
                f"You can provide a custom logical layer function to handle this.",
                UserWarning,
                stacklevel=2,
            )

        self.save_to_online_store(df=df, feature_view_name=self.feature_view_name)
        return df