Skip to content

Commit 0719c06

Browse files
feat: Created DocEmbedder class (#5973)
* Introduced DocEmbedder class for embedding documents and transforming them into the FeatureView schema. * Added BaseChunker and TextChunker classes for document chunking. * Updated pyproject.toml to include the sentence-transformers dependency. * Created a new Jupyter notebook example for using the RAG retriever with document embedding. * Resolved the merge conflict. --------- Signed-off-by: Chaitany patel <patelchaitany93@gmail.com>
1 parent 99008c8 commit 0719c06

24 files changed

+5012
-9835
lines changed

examples/rag-retriever/rag_feast_docembedder.ipynb

Lines changed: 648 additions & 0 deletions
Large diffs are not rendered by default.

pixi.lock

Lines changed: 11 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ qdrant = ["qdrant-client>=1.12.0"]
117117
rag = [
118118
"transformers>=4.36.0",
119119
"datasets>=3.6.0",
120+
"sentence-transformers>=3.0.0",
120121
]
121122
ray = [
122123
'ray>=2.47.0; python_version == "3.10"',

sdk/python/feast/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414

1515
from .aggregation import Aggregation
1616
from .batch_feature_view import BatchFeatureView
17+
from .chunker import BaseChunker, ChunkingConfig, TextChunker
1718
from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource
1819
from .dataframe import DataFrameEngine, FeastDataFrame
20+
from .doc_embedder import DocEmbedder, LogicalLayerFn
21+
from .embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder
1922
from .entity import Entity
2023
from .feature import Feature
2124
from .feature_service import FeatureService
@@ -62,4 +65,12 @@
6265
"OracleSource",
6366
"Project",
6467
"FeastVectorStore",
68+
"DocEmbedder",
69+
"LogicalLayerFn",
70+
"BaseChunker",
71+
"TextChunker",
72+
"ChunkingConfig",
73+
"BaseEmbedder",
74+
"MultiModalEmbedder",
75+
"EmbeddingConfig",
6576
]

sdk/python/feast/chunker.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass
3+
from typing import Any, Optional
4+
5+
import pandas as pd
6+
7+
8+
@dataclass
class ChunkingConfig:
    """Tuning parameters for document chunking.

    Consumed by BaseChunker subclasses; TextChunker interprets the size
    fields in units of whitespace-separated words.
    """

    chunk_size: int = 100  # target chunk length (words, for TextChunker)
    chunk_overlap: int = 20  # words shared between consecutive chunks
    min_chunk_size: int = 20  # chunks shorter than this are dropped
    max_chunk_chars: Optional[int] = 500  # hard character cap per chunk; None/0 disables


class BaseChunker(ABC):
    """
    Abstract base class for document chunking.

    Subclasses implement load_parse_and_chunk() with their own:
    - Loading logic
    - Parsing logic
    - Chunking strategy
    """

    def __init__(self, config: Optional[ChunkingConfig] = None):
        # Fall back to default chunking parameters when none are provided.
        self.config = config or ChunkingConfig()

    @abstractmethod
    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        """
        Load, parse, and chunk a document.

        Args:
            source: File path, raw text, bytes, etc.
            source_id: Document identifier.
            source_column: The column name under which chunk text is stored.
            source_type: Optional type hint.

        Returns:
            List of chunk dicts with keys:
                - chunk_id: str
                - original_id: str
                - <source_column>: str (the chunk text)
                - chunk_index: int
                - (any additional metadata)
        """
        pass

    def chunk_dataframe(
        self,
        df: pd.DataFrame,
        id_column: str,
        source_column: str,
        type_column: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Chunk all documents in a DataFrame.

        Args:
            df: The DataFrame containing the documents to chunk.
            id_column: The column containing the document IDs.
            source_column: The column containing the document sources.
            type_column: Optional column containing the document types.

        Returns:
            One row per produced chunk. When nothing is produced (empty
            input, or every document filtered out), an empty frame with the
            canonical chunk columns is returned so downstream code can rely
            on the schema.
        """
        # Iterate the columns directly instead of itertuples(): itertuples()
        # silently renames column names that are not valid Python identifiers
        # (e.g. "doc id" -> "_1"), which would break attribute lookups.
        type_values: list = (
            df[type_column].tolist() if type_column else [None] * len(df)
        )

        all_chunks: list[dict] = []
        for doc_id, source, source_type in zip(
            df[id_column], df[source_column], type_values
        ):
            all_chunks.extend(
                self.load_parse_and_chunk(
                    source, str(doc_id), source_column, source_type
                )
            )

        if not all_chunks:
            return pd.DataFrame(
                columns=["chunk_id", "original_id", source_column, "chunk_index"]
            )
        return pd.DataFrame(all_chunks)
88+
89+
90+
class TextChunker(BaseChunker):
    """Default chunker for plain text. Chunks by word count."""

    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        """Resolve *source* to raw text, then split it into word-window chunks."""
        raw_text = self._load(source)
        return self._chunk_by_words(raw_text, source_id, source_column)

    def _load(self, source: Any) -> str:
        """Best-effort load: read existing Path / ``.txt`` paths from disk,
        otherwise treat *source* as the raw text itself."""
        from pathlib import Path

        if isinstance(source, Path) and source.exists():
            return Path(source).read_text()
        if (
            isinstance(source, str)
            and source.endswith(".txt")
            and Path(source).exists()
        ):
            return Path(source).read_text()
        # Anything else (including plain strings) is used verbatim as text.
        return str(source)

    def _chunk_by_words(
        self, text: str, source_id: str, source_column: str
    ) -> list[dict]:
        """Split *text* into overlapping word windows per ``self.config``."""
        cfg = self.config
        stride = cfg.chunk_size - cfg.chunk_overlap
        if stride <= 0:
            raise ValueError(
                f"chunk_overlap ({cfg.chunk_overlap}) must be less than "
                f"chunk_size ({cfg.chunk_size})"
            )

        words = text.split()
        results: list[dict] = []
        next_index = 0

        for start in range(0, len(words), stride):
            window = words[start : start + cfg.chunk_size]

            # Windows shorter than the minimum are skipped, not emitted.
            if len(window) < cfg.min_chunk_size:
                continue

            body = " ".join(window)
            if cfg.max_chunk_chars:
                body = body[: cfg.max_chunk_chars]

            results.append(
                {
                    "chunk_id": f"{source_id}_{next_index}",
                    "original_id": source_id,
                    source_column: body,
                    "chunk_index": next_index,
                }
            )
            next_index += 1

        return results

0 commit comments

Comments
 (0)