
Retrieval Augmented Generation (RAG) with Feast

This tutorial demonstrates how to use Feast with Docling and Milvus to build a Retrieval Augmented Generation (RAG) application. You'll learn how to store document embeddings in Feast and retrieve the most relevant documents for a given query.

Overview

Note

This tutorial is also available in the Feast GitHub repository.

RAG is a technique that combines generative models (e.g., LLMs) with retrieval systems to generate contextually relevant output for a particular goal (e.g., question and answering). Feast makes it easy to store and retrieve document embeddings for RAG applications by providing integrations with vector databases like Milvus.

The typical RAG process involves:

  1. Sourcing text data relevant for your application
  2. Transforming each text document into smaller chunks of text
  3. Transforming those chunks of text into embeddings
  4. Inserting those chunks of text along with some identifier for the chunk and document in a database
  5. Retrieving those chunks of text along with the identifiers at run-time to inject that text into the LLM's context
  6. Calling some API to run inference with your LLM to generate contextually relevant output
  7. Returning the output to some end user
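The steps above can be sketched end to end in plain Python. This is a toy illustration only: a bag-of-letters counter stands in for a real embedding model, and a dict stands in for the vector database.

```python
import math

# Steps 1-2: documents already sourced and split into chunks.
chunks = {
    "doc1-0": "Feast stores feature data for machine learning.",
    "doc1-1": "Milvus is a vector database for similarity search.",
    "doc2-0": "Retrieval augmented generation injects context into prompts.",
}

# Step 3: a stand-in embedding -- letter counts (a real system would
# use a sentence-transformer model instead).
def embed(text: str) -> list[float]:
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - ord("a")] += 1.0
    return vec

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

# Step 4: store each chunk's embedding under its chunk ID.
index = {cid: embed(text) for cid, text in chunks.items()}

# Step 5: retrieve the best-matching chunk for a query at run time.
query = "vector database similarity"
ranked = sorted(index, key=lambda cid: cosine(embed(query), index[cid]), reverse=True)
top_chunk = chunks[ranked[0]]

# Steps 6-7: the retrieved text is injected into the LLM prompt.
prompt = f"Answer using this context:\n{top_chunk}\n\nQuestion: {query}"
```

Feast's role in the rest of this tutorial is to replace the toy dict with a managed vector store (Milvus) behind a consistent read/write API.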

Prerequisites

  • Python 3.10 or later
  • Feast installed with Milvus support: pip install 'feast[milvus,nlp]' (quoted so shells like zsh don't expand the brackets)
  • A basic understanding of feature stores and vector embeddings

Step 0: Download, Compute, and Export the Docling Sample Dataset

import os
import io
import pypdf
import logging
import hashlib 
from datetime import datetime
import requests
import pandas as pd
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer

from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.chunking import HybridChunker
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

# Base URL for PDFs
BASE_URL = 'https://raw.githubusercontent.com/DS4SD/docling/refs/heads/main/tests/data/pdf/'
PDF_FILES = [
    '2203.01017v2.pdf', '2305.03393v1-pg9.pdf', '2305.03393v1.pdf',
    'amt_handbook_sample.pdf', 'code_and_formula.pdf', 'picture_classification.pdf',
    'redp5110_sampled.pdf', 'right_to_left_01.pdf', 'right_to_left_02.pdf', 'right_to_left_03.pdf'
]
INPUT_DOC_PATHS = [os.path.join(BASE_URL, pdf_file) for pdf_file in PDF_FILES]

# Configure PDF processing
pipeline_options = PdfPipelineOptions()
pipeline_options.generate_page_images = True

doc_converter = DocumentConverter(
    format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
)

# Load tokenizer and embedding model
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 64  # Small token limit for demonstration
tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
embedding_model = SentenceTransformer(EMBED_MODEL_ID)

chunker = HybridChunker(tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True)

def embed_text(text: str) -> list[float]:
    """Generate an embedding for a given text."""
    return embedding_model.encode([text], normalize_embeddings=True).tolist()[0]

def generate_document_rows(conv_results):
    """
    Generator that yields one row per chunk from each successfully converted document.
    Each yielded dict contains:
      - file_name: Name of the source file.
      - full_document_markdown: The full document serialized to Markdown.
      - raw_chunk_markdown: Serialized text for the chunk.
      - chunk_embedding: The embedding vector for that chunk.
    """
    processed_docs = 0
    for conv_res in conv_results:
        if conv_res.status != ConversionStatus.SUCCESS:
            continue

        processed_docs += 1
        file_name = conv_res.input.file.stem  # file name without extension

        # Extract the converted document object (it provides iterate_items)
        document = conv_res.document
        if document is None:
            _log.warning(f"Document conversion failed for {file_name}")
            continue
        try:
            document_markdown = document.export_to_markdown()
        except Exception:
            _log.warning(f"Markdown export failed for {file_name}")
            document_markdown = ""

        # Process each chunk from the document
        for chunk in chunker.chunk(dl_doc=document):
            raw_chunk = chunker.serialize(chunk=chunk)
            embedding = embed_text(raw_chunk)
            yield {
                "file_name": file_name,
                "full_document_markdown": document_markdown,
                "raw_chunk_markdown": raw_chunk,
                "chunk_embedding": embedding,
            }
    _log.info(f"Processed {processed_docs} documents successfully.")

def generate_chunk_id(file_name: str, raw_chunk_markdown: str) -> str:
    """Generate a unique chunk ID based on file_name and raw_chunk_markdown."""
    unique_string = f"{file_name}-{raw_chunk_markdown}"
    return hashlib.sha256(unique_string.encode()).hexdigest()

conv_results = doc_converter.convert_all(INPUT_DOC_PATHS, raises_on_error=False)

# Build a DataFrame where each row is a unique chunk record
rows = list(generate_document_rows(conv_results))
df = pd.DataFrame.from_records(rows)
output_dict = {}
for file_name in PDF_FILES:
    try:
        r = requests.get(BASE_URL + file_name)
        r.raise_for_status()
        output_dict[file_name] = r.content
    except Exception as e:
        print(f"error downloading {file_name}: {e}")

odf = pd.DataFrame.from_dict(output_dict, orient='index', columns=['bytes']).reset_index()
odf.rename({"index": "file_name"}, axis=1, inplace=True)
odf['file_name'] = odf['file_name'].str.replace('.pdf', '', regex=False)
finaldf = df.merge(odf, on='file_name', how='left')
finaldf["chunk_id"] = finaldf.apply(lambda row: generate_chunk_id(row["file_name"], row["raw_chunk_markdown"]), axis=1)

finaldf['created'] = datetime.now()

# Sanity check that the stored bytes round-trip as a readable PDF
pdf_example = pypdf.PdfReader(io.BytesIO(finaldf['bytes'].values[0]))
finaldf.drop(['full_document_markdown', 'bytes'], axis=1).to_parquet('feature_repo/data/docling_samples.parquet', index=False)
odf.to_parquet('feature_repo/data/metadata_samples.parquet', index=False)
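The chunk IDs produced above are content-addressed: the same file name and chunk text always hash to the same ID, so re-running the export yields stable keys for unchanged chunks. A quick check of the scheme (sample strings are illustrative):

```python
import hashlib

def generate_chunk_id(file_name: str, raw_chunk_markdown: str) -> str:
    """Same helper as above: SHA-256 over file name + chunk text."""
    unique_string = f"{file_name}-{raw_chunk_markdown}"
    return hashlib.sha256(unique_string.encode()).hexdigest()

# Identical inputs -> identical ID; any change -> a different ID.
a = generate_chunk_id("2203.01017v2", "some chunk of text from the paper")
b = generate_chunk_id("2203.01017v2", "some chunk of text from the paper")
c = generate_chunk_id("2203.01017v2", "a different chunk of text")
```

Stable IDs mean that writing the same chunk twice updates one record instead of creating duplicates in the online store.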

Step 1: Configure Milvus in Feast

Create a feature_store.yaml file with the following configuration:

project: docling-rag
provider: local
registry: data/registry.db
online_store:
  type: milvus
  path: data/online_store.db
  vector_enabled: true
  embedding_dim: 384
  index_type: "IVF_FLAT"

offline_store:
  type: file
entity_key_serialization_version: 3
auth:
  type: no_auth

Step 2: Define your Data Sources and Views

Create a feature_repo.py file to define your entities, data sources, and feature views:

from datetime import timedelta

import pandas as pd
from feast import (
    FeatureView,
    Field,
    FileSource,
    Entity,
    RequestSource,
)
from feast.data_format import ParquetFormat
from feast.types import Float64, Array, String, ValueType, PdfBytes
from feast.on_demand_feature_view import on_demand_feature_view
from sentence_transformers import SentenceTransformer
from typing import Dict, Any, List

import hashlib
from docling.datamodel.base_models import DocumentStream

import io
from docling.document_converter import DocumentConverter
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
from docling.chunking import HybridChunker

# Load tokenizer and embedding model
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 64  # Small token limit for demonstration

tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
embedding_model = SentenceTransformer(EMBED_MODEL_ID)
chunker = HybridChunker(tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True)

def embed_text(text: str) -> list[float]:
    """Generate an embedding for a given text."""
    return embedding_model.encode([text], normalize_embeddings=True).tolist()[0]

def generate_chunk_id(file_name: str, raw_chunk_markdown: str="") -> str:
    """Generate a unique chunk ID based on file_name and raw_chunk_markdown."""
    unique_string = f"{file_name}-{raw_chunk_markdown}" if raw_chunk_markdown != "" else f"{file_name}"
    return hashlib.sha256(unique_string.encode()).hexdigest()

# Define entities
chunk = Entity(
    name="chunk_id",
    description="Chunk ID",
    value_type=ValueType.STRING,
    join_keys=["chunk_id"],
)

document = Entity(
    name="document_id",
    description="Document ID",
    value_type=ValueType.STRING,
    join_keys=["document_id"],
)

source = FileSource(
    file_format=ParquetFormat(),
    path="./data/docling_samples.parquet",
    timestamp_field="created",
)

input_request_pdf = RequestSource(
    name="pdf_request_source",
    schema=[
        Field(name="document_id", dtype=String),        
        Field(name="pdf_bytes", dtype=PdfBytes),
        Field(name="file_name", dtype=String),
    ],
)

# Define the view for retrieval
docling_example_feature_view = FeatureView(
    name="docling_feature_view",
    entities=[chunk],
    schema=[
        Field(name="file_name", dtype=String),
        Field(name="raw_chunk_markdown", dtype=String),
        Field(
            name="vector",
            dtype=Array(Float64),
            vector_index=True,
            vector_search_metric="COSINE",
        ),
        Field(name="chunk_id", dtype=String),
    ],
    source=source,
    ttl=timedelta(hours=2),
)

@on_demand_feature_view(
    entities=[chunk, document],
    sources=[input_request_pdf],
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="chunk_id", dtype=String),
        Field(name="chunk_text", dtype=String),
        Field(
            name="vector",
            dtype=Array(Float64),
            vector_index=True,
            vector_search_metric="L2",
        ),
    ],
    mode="python",
    write_to_online_store=True,
    singleton=True,
)
def docling_transform_docs(inputs: dict[str, Any]):
    document_ids, chunks, embeddings, chunk_ids = [], [], [], []
    buf = io.BytesIO(
        inputs["pdf_bytes"],
    )
    doc_source = DocumentStream(name=inputs["file_name"], stream=buf)
    converter = DocumentConverter()
    result = converter.convert(doc_source)
    for i, chunk in enumerate(chunker.chunk(dl_doc=result.document)):
        raw_chunk = chunker.serialize(chunk=chunk)
        embedding = embed_text(raw_chunk)
        chunk_id = f"chunk-{i}"
        document_ids.append(inputs["document_id"])
        chunks.append(raw_chunk)
        chunk_ids.append(chunk_id)
        embeddings.append(embedding)
    return {
        "document_id": document_ids,
        "chunk_id": chunk_ids,
        "vector": embeddings,
        "chunk_text": chunks,
    }

Step 3: Update your Registry

Apply the feature view definitions to the registry:

feast apply

Step 4: Ingest your Data

Process your documents, generate embeddings, and ingest them into the Feast online store:

import pandas as pd 
from feast import FeatureStore

store = FeatureStore(repo_path=".")

df = pd.read_parquet("./data/docling_samples.parquet")
mdf = pd.read_parquet("./data/metadata_samples.parquet")
# The parquet stores embeddings under 'chunk_embedding'; the feature view expects 'vector'
df['vector'] = df['chunk_embedding'].apply(lambda x: x.tolist())
embedding_length = len(df['vector'][0])
print(f'embedding length = {embedding_length}')
df['created'] = pd.Timestamp.now()
mdf['created'] = pd.Timestamp.now()

# Write precomputed embeddings to the feature view that has no associated transformation
store.write_to_online_store(feature_view_name='docling_feature_view', df=df)

# Transformation on writes can be disabled per call by overriding the default
store.write_to_online_store(
    feature_view_name='docling_transform_docs', 
    df=df[df['document_id']!='doc-1'], 
    transform_on_write=False,
)

# Now we can transform a raw PDF on the fly
store.write_to_online_store(
    feature_view_name='docling_transform_docs', 
    df=mdf[mdf['document_id']=='doc-1'], 
    transform_on_write=True, # this is the default
)

Step 5: Retrieve Relevant Documents

Now you can retrieve the most relevant documents for a given query:

from feast import FeatureStore

# Initialize FeatureStore
store = FeatureStore(".")

# Generate the query embedding (embed_text as defined in Step 0)
question = 'Who are the authors of the paper?'
query_embedding = embed_text(question)

# Retrieve similar documents
context_data = store.retrieve_online_documents_v2(
    features=[
        "docling_feature_view:vector",
        "docling_feature_view:file_name",
        "docling_feature_view:raw_chunk_markdown",
        "docling_feature_view:chunk_id",
    ],
    query=query_embedding,
    top_k=3,
    distance_metric='COSINE',
).to_df()

print(context_data)
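A detail worth noting: embed_text normalizes its embeddings, and for unit vectors cosine and Euclidean rankings coincide, since ||a − b||² = 2 − 2·cos(a, b). A quick numerical check, in plain Python with no Feast required:

```python
import math

def normalize(v: list[float]) -> list[float]:
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])

# For unit vectors, cosine similarity is just the dot product ...
cos_ab = sum(x * y for x, y in zip(a, b))
# ... and squared L2 distance is a monotone transform of it.
l2_sq = sum((x - y) ** 2 for x, y in zip(a, b))
assert abs(l2_sq - (2 - 2 * cos_ab)) < 1e-9
```

So with normalized embeddings, COSINE and L2 return the same top-k; the choice of metric mainly matters when vectors are not normalized.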

Step 6: Use Retrieved Documents for Generation

Finally, you can use the retrieved documents as context for an LLM:

from openai import OpenAI
import os

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)

# Format documents for context
def format_documents(context_data, base_prompt):
    documents = "\n".join([f"Document {i+1}: {row['raw_chunk_markdown']}" 
                          for i, row in context_data.iterrows()])
    return f"{base_prompt}\n\nContext documents:\n{documents}"

BASE_PROMPT = """You are a helpful assistant that answers questions based on the provided context."""
FULL_PROMPT = format_documents(context_data, BASE_PROMPT)

# Generate response
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": FULL_PROMPT},
        {"role": "user", "content": question}
    ],
)

print('\n'.join([c.message.content for c in response.choices]))
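To see what the assembled system prompt looks like without calling any APIs, here is a simplified stand-in for format_documents that takes plain dicts instead of a DataFrame (the sample rows are illustrative):

```python
# Toy retrieval results standing in for the context_data DataFrame
rows = [
    {"raw_chunk_markdown": "Feast serves features online."},
    {"raw_chunk_markdown": "Milvus indexes embedding vectors."},
]
BASE_PROMPT = "You are a helpful assistant that answers questions based on the provided context."

def format_documents(rows: list[dict], base_prompt: str) -> str:
    """Number each retrieved chunk and append it under the base prompt."""
    documents = "\n".join(
        f"Document {i+1}: {row['raw_chunk_markdown']}" for i, row in enumerate(rows)
    )
    return f"{base_prompt}\n\nContext documents:\n{documents}"

prompt = format_documents(rows, BASE_PROMPT)
```

The LLM then sees the retrieved chunks as enumerated context in its system message, while the user message carries only the original question.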

Alternative: Using DocEmbedder for Simplified Ingestion

Instead of manually chunking, embedding, and writing documents as shown above, you can use Feast's DocEmbedder class to handle the entire pipeline in a single step. DocEmbedder automates chunking, embedding generation, FeatureView creation, and writing to the online store.

Install Dependencies

pip install 'feast[milvus,rag]'

Set Up and Ingest with DocEmbedder

from feast import DocEmbedder
import pandas as pd

# Prepare your documents as a DataFrame
df = pd.DataFrame({
    "id": ["doc1", "doc2", "doc3"],
    "text": [
        "Aaron is a prophet, high priest, and the brother of Moses...",
        "God at Sinai granted Aaron the priesthood for himself...",
        "His rod turned into a snake. Then he stretched out...",
    ],
})

# DocEmbedder handles everything: generates FeatureView, applies repo,
# chunks text, generates embeddings, and writes to the online store
embedder = DocEmbedder(
    repo_path="feature_repo/",
    feature_view_name="text_feature_view",
)

result = embedder.embed_documents(
    documents=df,
    id_column="id",
    source_column="text",
    column_mapping=("text", "text_embedding"),
)

Retrieve and Query

Once documents are ingested, you can retrieve them the same way as shown in Step 5 above:

from feast import FeatureStore

store = FeatureStore("feature_repo/")

query_embedding = embed_text("Who are the authors of the paper?")
context_data = store.retrieve_online_documents_v2(
    features=[
        "text_feature_view:embedding",
        "text_feature_view:text",
        "text_feature_view:source_id",
    ],
    query=query_embedding,
    top_k=3,
    distance_metric="COSINE",
).to_df()

Customizing the Pipeline

DocEmbedder is extensible at every stage. Below are examples of how to create custom components and wire them together.

Custom Chunker

Subclass BaseChunker to implement your own chunking strategy. The load_parse_and_chunk method receives each document and must return a list of chunk dictionaries.

from feast.chunker import BaseChunker, ChunkingConfig
from typing import Any, Optional

class SentenceChunker(BaseChunker):
    """Chunks text by sentences instead of word count."""

    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        import re

        text = str(source)
        # Split on sentence boundaries
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current_chunk = []
        chunk_index = 0

        for sentence in sentences:
            current_chunk.append(sentence)
            combined = " ".join(current_chunk)

            if len(combined.split()) >= self.config.chunk_size:
                chunks.append({
                    "chunk_id": f"{source_id}_{chunk_index}",
                    "original_id": source_id,
                    source_column: combined,
                    "chunk_index": chunk_index,
                })
                # Keep overlap by retaining the last sentence
                current_chunk = [sentence]
                chunk_index += 1

        # Don't forget the last chunk
        if current_chunk and len(" ".join(current_chunk).split()) >= self.config.min_chunk_size:
            chunks.append({
                "chunk_id": f"{source_id}_{chunk_index}",
                "original_id": source_id,
                source_column: " ".join(current_chunk),
                "chunk_index": chunk_index,
            })

        return chunks

Or simply configure the built-in TextChunker:

from feast import TextChunker, ChunkingConfig

chunker = TextChunker(config=ChunkingConfig(
    chunk_size=200,
    chunk_overlap=50,
    min_chunk_size=30,
    max_chunk_chars=1000,
))
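The chunk_size/chunk_overlap parameters follow the usual sliding-window convention: windows of chunk_size words that advance by chunk_size − chunk_overlap. This toy word-window chunker (illustrative only, not the actual TextChunker implementation) shows the semantics:

```python
def word_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Slide a window of chunk_size words, stepping by chunk_size - chunk_overlap."""
    words = text.split()
    step = chunk_size - chunk_overlap
    return [
        " ".join(words[i:i + chunk_size])
        for i in range(0, max(len(words) - chunk_overlap, 1), step)
    ]

text = " ".join(f"w{i}" for i in range(10))
chunks = word_chunks(text, chunk_size=5, chunk_overlap=2)
# Three windows -- w0..w4, w3..w7, w6..w9 -- each sharing 2 words with its neighbor
```

The overlap keeps sentence fragments that straddle a boundary retrievable from both neighboring chunks, at the cost of some storage duplication.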

Custom Embedder

Subclass BaseEmbedder to use a different embedding model. Register modality handlers in _register_default_modalities and implement the embed method.

from feast.embedder import BaseEmbedder, EmbeddingConfig
from typing import Any, List, Optional
import numpy as np

class OpenAIEmbedder(BaseEmbedder):
    """Embedder that uses the OpenAI API for text embeddings."""

    def __init__(self, model: str = "text-embedding-3-small", config: Optional[EmbeddingConfig] = None):
        self.model = model
        self._client = None
        super().__init__(config)

    def _register_default_modalities(self) -> None:
        self.register_modality("text", self._embed_text)

    @property
    def client(self):
        if self._client is None:
            from openai import OpenAI
            self._client = OpenAI()
        return self._client

    def get_embedding_dim(self, modality: str) -> Optional[int]:
        # text-embedding-3-small produces 1536-dim vectors
        if modality == "text":
            return 1536
        return None

    def embed(self, inputs: List[Any], modality: str) -> np.ndarray:
        if modality not in self._modality_handlers:
            raise ValueError(f"Unsupported modality: '{modality}'")
        return self._modality_handlers[modality](inputs)

    def _embed_text(self, inputs: List[str]) -> np.ndarray:
        response = self.client.embeddings.create(input=inputs, model=self.model)
        return np.array([item.embedding for item in response.data])

Custom Logical Layer Function

The schema transform function transforms the chunked + embedded DataFrame into the exact schema your FeatureView expects. It must accept a pd.DataFrame and return a pd.DataFrame.

import pandas as pd
from datetime import datetime, timezone

def my_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame:
    """Map chunked + embedded columns to the FeatureView schema."""
    return pd.DataFrame({
        "passage_id": df["chunk_id"],
        "text": df["text"],
        "embedding": df["text_embedding"],
        "event_timestamp": [datetime.now(timezone.utc)] * len(df),
        "source_id": df["original_id"],
        # Add any extra columns your FeatureView expects
        "chunk_index": df["chunk_index"],
    })
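Assuming the chunker and embedder produced the columns used above (chunk_id, original_id, text, text_embedding, chunk_index), the transform can be exercised on a toy frame before wiring it into DocEmbedder:

```python
import pandas as pd
from datetime import datetime, timezone

def my_schema_transform_fn(df: pd.DataFrame) -> pd.DataFrame:
    """Same mapping as above: chunked + embedded columns -> FeatureView schema."""
    return pd.DataFrame({
        "passage_id": df["chunk_id"],
        "text": df["text"],
        "embedding": df["text_embedding"],
        "event_timestamp": [datetime.now(timezone.utc)] * len(df),
        "source_id": df["original_id"],
        "chunk_index": df["chunk_index"],
    })

# Toy output of the chunk + embed stage (2-dim embeddings for brevity)
chunked = pd.DataFrame({
    "chunk_id": ["doc1_0", "doc1_1"],
    "original_id": ["doc1", "doc1"],
    "text": ["first chunk", "second chunk"],
    "text_embedding": [[0.1, 0.2], [0.3, 0.4]],
    "chunk_index": [0, 1],
})
out = my_schema_transform_fn(chunked)
```

Checking the output columns this way catches schema mismatches early, before a failed online-store write.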

Putting It All Together

Pass your custom components to DocEmbedder:

from feast import DocEmbedder

embedder = DocEmbedder(
    repo_path="feature_repo/",
    feature_view_name="text_feature_view",
    chunker=SentenceChunker(config=ChunkingConfig(chunk_size=150, min_chunk_size=20)),
    embedder=OpenAIEmbedder(model="text-embedding-3-small"),
    schema_transform_fn=my_schema_transform_fn,
    vector_length=1536,  # Match the OpenAI embedding dimension
)

# Embed and ingest
result = embedder.embed_documents(
    documents=df,
    id_column="id",
    source_column="text",
    column_mapping=("text", "text_embedding"),
)

Note: When using a custom schema_transform_fn, ensure the returned DataFrame columns match your FeatureView schema. When using a custom embedder with a different output dimension, set vector_length accordingly (or let it auto-detect via get_embedding_dim).

For a complete end-to-end example, see the DocEmbedder notebook.

Why Feast for RAG?

Feast makes it remarkably easy to set up and manage a RAG system by:

  1. Simplifying vector database configuration and management
  2. Providing a consistent API for both writing and reading embeddings
  3. Supporting both batch and real-time data ingestion
  4. Enabling versioning and governance of your document repository
  5. Offering seamless integration with multiple vector database backends
  6. Providing a unified API for managing both feature data and document embeddings

For more details on using vector databases with Feast, see the Vector Database documentation.

The complete demo code is available in the GitHub repository.